这是自个儿在安利的经历(干货)

 
 在面前几篇琢磨里大家都事关过:Akka-http是一项系统融为一体工具库。它是以数据互换的花样开展系统融为一体的。所以,Akka-http的骨干功能应该是数据沟通的兑现了:应该能透过某种公开的多少格式和传导标准相比便利的落实包括异类系统之间通过网上举行的数据交流。覆盖包括:数据编码、发送和多少接受、解析全经过。Akka-http提供了诸多网上传输标准数据的概括模型以及数据类型转换方法,可以使编程人士很便宜的构建网上往来的Request和Response。但是,现实中的数据沟通远远不止针对request和response操作可以满足的。系统里头数据交流通常提到文件或者数据库表类型的数量上传下载。尽管在Http标准中讲述了怎么着通过MultiPart音讯类型举行批量数额的传输,但是这个标准提到的贯彻细节包括数据内容叙述、数据分段模式、音讯数据长度统计等等简直可以顿时令人却步。Akka-http是按照Akka-stream开发的:不但它的行事流程可以用Akka-stream来发表,它还扶助stream化的数量传输。我们领悟:Akka-stream提供了成效强大的FileIO和Data-Streaming,可以用Stream-Source代表文件或数据库数据源。简单的说:Akka-http的音信数据内容HttpEntity可以支撑理论上无与伦比长度的data-stream。最珍奇的是:这个Source是个Reactive-Stream-Source,具备了back-pressure机制,可以有效应付数据互换参预两方Reactive端点不同的数据传输速率。

   
 其实这篇小说是专为硕士写的,社会上不少直销,以前我们都叫它传销,后来因为军队的特大,也无能为力一下子避免,国家给予了他们一个名分,美其名约:直销。对于直销的性质大部分人仍旧知道为传销,因为都是在推人,做下线。不过自己或者希望我们都能理性的自查自纠直销,因为微商也是中间一种。那么明天说的安利,其实自己个人认为是有别与直销的,因为它更吓人。它的吓人在于其中培训实在是太棒了,棒到就是你是个卓殊理性的人,也会不时往它这里走,那么究竟这里有什么样吸引我的地点吗?

 
Akka-http的stream类型数据内容是以Source[T,_]花色表示的。首先,Akka-stream通过FileIO对象提供了丰裕多的file-io操作函数,其中有个fromPath函数能够用某个文件内容数据构建一个Source类型:

1.本身刚进来安利是因为高校协会里的一个好爱人,因为咱们玩的很好,高校后如故保持联系,有一天她打来电话和我聊起她的近况,说自己现在每日去听公益课,会做猪蹄,会做蔓越莓饼干,披萨等等,而且他说话的感动和兴奋,使我坚决走进了这几个条件。首次去的时候,我是随着公益课去的,比如会有老师教您什么样养生,如何发展,怎么样让投机生活过的幽雅,课程体系丰裕多彩,这一个科目都万分适合刚毕业的大学生,因为先生说了在这些环境里你能源源学习,不断大力,不断遭遇更好的亲善。听完课之后都有一个分享:要简明介绍自己,说下是什么人带你进这么些环境,明天的觉悟。回来之后难掩内心的欢欣,觉得自己简直是捡到宝贝了,觉得一节公益课涉及到的学时2个钟头才收15-20元,是不是很合算,这钱交过去是AA场所费的。

/**
   * Creates a Source from a files contents.
   * Emitted elements are `chunkSize` sized [[akka.util.ByteString]] elements,
   * except the final element, which will be up to `chunkSize` in size.
   *
   * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
   * set it for a given Source by using [[akka.stream.ActorAttributes]].
   *
   * It materializes a [[Future]] of [[IOResult]] containing the number of bytes read from the source file upon completion,
   * and a possible exception if IO operation was not completed successfully.
   *
   * @param f         the file path to read from
   * @param chunkSize the size of each read operation, defaults to 8192
   */
  def fromPath(f: Path, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]] =
    fromPath(f, chunkSize, startPosition = 0)

2.次之次去是为着学习护肤化妆方面的文化,当时去的时候是一节完全的卸妆爱慕课,其实就是推销安利雅姿。刚过去的时候,我的好爱人只是告诉我听课,我想才十几块钱的课,听听对自己有补益,就没推脱。但是后来去了才察觉这一整节课下来需要200,因为示范的民办教授说了,只好用安利的卸妆,加上他们的一套拨筋护肤手法,才能有机能。当时我在心尖暗暗低估,这多少个不是变相传销吧,为啥好情人没说了然。没有主意,想想依然买了吧,于是花了近乎500多。此后,我有些有点不乐意去了,因为感觉老师在灌输安利的事物。

以此函数构建了Source[ByteString,Future[IOResult]],我们需要把ByteString转化成MessageEntity。首先需要在implicit-scope内提供马尔斯haller[ByteString,MessageEntity]花色的隐式实例:

3.第两次去的时候是仇敌打来说想要做蔓越莓饼干和猪蹄给自家吃,一听吃的本身就兴奋了,而且开会所的讲师不在这里,我的小心心就弱了,反正都是AA,自己也不亏,于是自己又跑过去。不过过去美食还没起来多长时间,会所的良师就死灰复燃了,然后因为自己不会做食品,好情人在做,无聊时刻,老师就接待我坐在茶几旁边喝茶聊天,给我讲了好多养生方面的事情,然后还看自己的指头,看手相,他们叫手诊。然后说要多补血巴拉巴拉一堆话,然后还带我参观安利产品,然后说了友好也在吃,举了几个实例,说安利是个高大的事业,巴拉巴拉。终于等到蔓越莓和猪蹄了,开吃的时候氛围很好,我们都很吃的开。也有说有笑,不说其他的。

trait JsonCodec extends Json4sSupport {
  import org.json4s.DefaultFormats
  import org.json4s.ext.JodaTimeSerializers
  implicit val serilizer = jackson.Serialization
  implicit val formats = DefaultFormats ++ JodaTimeSerializers.all
}
object JsConverters extends JsonCodec

object ServerStreaming extends App {
  import JsConverters._
...

4.本人三姑知道自家去了某些次,但是本人都没说去安利,只说去学学。然后久了我妈就说怎么学习那么迟回家,我才说出去安利。于是他大怒,说我家隔壁老邻居在此以前的兄长吃了蛋白粉没长高的事务。还说安利很吓人。这我更要去看下安利内部到底是如何是好的。安利人士大部分白天去开发意向客户,清晨才会开展会议和学习。本次是去看家用产品演示,可能因为上次办了张卡,所以进一步信任自己了。而其又买了一瓶浓缩家用洗洁精和牙膏牙刷。本次看了无数比照试验,其实这在此外行业也很常见,就不具体描述了。

俺们还亟需Json-Streaming帮助:

5.第两回去的时候,是去听FQ解说,我只得说安利培育的人才个个解说口才了得,没有两把刷子还真上持续台。而且演说的良师从没给您体面罐鸡汤,画大饼,而是通过有些事例来注解安利的好,也就是她们的奖金制度,因为这多少个制度是延续的,即便您辞世了,你孩子可以拿走你的财产,就是其一事业是足以延续的。当时听来好刺激哦,怎么如此棒。

  implicit val jsonStreamingSupport = EntityStreamingSupport.json()
      .withParallelMarshalling(parallelism = 8, unordered = false)

6.第六次我带了自我的好情人过去,我就像以前带本人进安利的百般好对象一样,和名师们配合,起先“引”好友进入这些世界,猜猜看我带的是何人?—就是我邻居家的父兄。他即便不是很了然那么些,然则他说了友好后边的经验,老师的答复是先前的直销人士都没培训好,现在不一致了。而且还说了无数事物,具体内容大概是安利和原先不同了,现在有互联网+,安利的前景无限量等等。可是这次之后,我再也没去,一是因为日子的题材。二是自己认为自己终于自律性不错的人,学习完全可以去体育场馆或者上网,途径很多。再说我们刚毕业地基还没牢,别一心想着即刻挣快钱。

FileIO是blocking操作,我们还是可以拔取独立的线程供blocking操作使用:

总结:我不可以说哪些直销不佳或者好,害人或者不害人。不过纵观经济和政治,大国间的国策是我们这多少个小人物不懂的,就像在此以前传闻说安利的蛋白粉都是转基因,然而有人会反驳,这干什么中国引进安利,为啥奥运会运动健儿还吃呢?一,中国在引入前常有不知晓安利是什么性质,而且安利也不容许说出它是哪些性质。引进安利可以化解就业,可以……肯定有裨益才引进。运动员吃这多少个是互补体能,运动员的高强度运动需要能量维持,而我们是普通人,至于是否长久用,用了有没有影响,我们不得而知,大家要理性对待,不吃安利的依然会采取任何牌子的,为何不选一个进来中国较久的牌子,也许……说实话,作为中华人,我要么会微微偏向中华,什么人不想协调的国家强大,什么人不想自己的子孙繁衍生息,宁愿错过一个,也无法让祥和国家的利益受损。更何况关乎民族的一体利益。当然既然存在了,这东西一定有市场.安利的清洁剂和洗洁精,牙膏等产品都是极度正确的,我家至少6个亲属在用.不过关乎吃的,我们中国人也许更偏向于食疗.当然作为国人,不管网上传言想用改变基因破话种的业务是不是真正,我想不需要我们去定夺,怕就别吃.指出大学生可以去安利那个平台锻炼下,毕竟它提倡的无数市值观面的东西都非凡积极,而且培训的始末也足以说都是相比实用的.加不投入安利,这些是私房采用题材,我尚未身份干涉。只盼望看到这篇的人都能转化下,不需要打赏。

   FileIO.fromPath(file, 256)
      .withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))

前几天大家得以从在server上用一个文件构建Source然后再转成Response:

  val route =
    get {
      path("files"/Remaining) { name =>
          complete(loadFile(name))
      } 
    }
  def loadFile(path: String) = {
 //   implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")
    val file = Paths.get("/Users/tiger/"+path)
    FileIO.fromPath(file, 256)
      .withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))
      .map(_.utf8String)
  }

无异于,我们也可以把数据库表内数据转成Akka-Stream-Source,然后再落实到MessageEntity的变换。转换过程包括用Query读取数据库表内数据后转成Reactive-Publisher,然后把publisher转成Akka-Stream-Source,如下:

object SlickDAO {
  import slick.jdbc.H2Profile.api._
  val dbConfig: slick.basic.DatabaseConfig[slick.jdbc.H2Profile] = slick.basic.DatabaseConfig.forConfig("slick.h2")
  val db = dbConfig.db

  case class CountyModel(id: Int, name: String)
  case class CountyTable(tag: Tag) extends Table[CountyModel](tag,"COUNTY") {
    def id = column[Int]("ID",O.AutoInc,O.PrimaryKey)
    def name = column[String]("NAME",O.Length(64))
    def * = (id,name)<>(CountyModel.tupled,CountyModel.unapply)
  }
  val CountyQuery = TableQuery[CountyTable]

  def loadTable(filter: String) = {
    //   implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")
    val qry = CountyQuery.filter {_.name.toUpperCase like s"%${filter.toUpperCase}%"}
    val publisher = db.stream(qry.result)
    Source.fromPublisher(publisher = publisher)
      .withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))
  }
}

然后开展到MessageEntity的变换:

  val route =
    get {
      path("files"/Remaining) { name =>
          complete(loadFile(name))
      } ~
      path("tables"/Segment) { t =>
        complete(SlickDAO.loadTable(t))
      }
    }

下面是这一次示范的一体化源代码:

import java.nio.file._
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.common._
import de.heikoseeberger.akkahttpjson4s.Json4sSupport
import org.json4s.jackson


object SlickDAO {
  import slick.jdbc.H2Profile.api._
  val dbConfig: slick.basic.DatabaseConfig[slick.jdbc.H2Profile] = slick.basic.DatabaseConfig.forConfig("slick.h2")
  val db = dbConfig.db

  case class CountyModel(id: Int, name: String)
  case class CountyTable(tag: Tag) extends Table[CountyModel](tag,"COUNTY") {
    def id = column[Int]("ID",O.AutoInc,O.PrimaryKey)
    def name = column[String]("NAME",O.Length(64))
    def * = (id,name)<>(CountyModel.tupled,CountyModel.unapply)
  }
  val CountyQuery = TableQuery[CountyTable]

  def loadTable(filter: String) = {
    //   implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")
    val qry = CountyQuery.filter {_.name.toUpperCase like s"%${filter.toUpperCase}%"}
    val publisher = db.stream(qry.result)
    Source.fromPublisher(publisher = publisher)
      .withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))
  }
}

trait JsonCodec extends Json4sSupport {
  import org.json4s.DefaultFormats
  import org.json4s.ext.JodaTimeSerializers
  implicit val serilizer = jackson.Serialization
  implicit val formats = DefaultFormats ++ JodaTimeSerializers.all
}
object JsConverters extends JsonCodec

object ServerStreaming extends App {
  import JsConverters._

  implicit val httpSys = ActorSystem("httpSystem")
  implicit val httpMat = ActorMaterializer()
  implicit val httpEC = httpSys.dispatcher

  implicit val jsonStreamingSupport = EntityStreamingSupport.json()
      .withParallelMarshalling(parallelism = 8, unordered = false)



  val (port, host) = (8011,"localhost")

  val route =
    get {
      path("files"/Remaining) { name =>
          complete(loadFile(name))
      } ~
      path("tables"/Segment) { t =>
        complete(SlickDAO.loadTable(t))
      }
    }

  def loadFile(path: String) = {
 //   implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")
    val file = Paths.get("/Users/tiger/"+path)
    FileIO.fromPath(file, 256)
      .withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))
      .map(_.utf8String)
  }

  val bindingFuture = Http().bindAndHandle(route,host,port)

  println(s"Server running at $host $port. Press any key to exit ...")

  scala.io.StdIn.readLine()

  bindingFuture.flatMap(_.unbind())
    .onComplete(_ => httpSys.terminate())

}