Akka(36): Http: Client-Side API, Client Connections

 
Akka-HTTP's client-side API is a programming toolkit for network message exchange centered on HttpRequest operations. We know that Akka-HTTP is built on top of Akka-Stream, so the communication channel between an Akka-HTTP client and a server can be represented by an Akka-Stream Flow. This Flow is obtained by calling Http().outgoingConnection:

 
In earlier discussions we mentioned that Akka-HTTP is a system-integration toolkit, and that it integrates systems through data exchange. Its core capability is therefore the implementation of data exchange: using some open data format and transport standard to conveniently exchange data between heterogeneous systems over the network, covering the whole process of encoding, sending, receiving and parsing. Akka-HTTP supplies generic models and data-type conversion methods for many standard wire formats, letting programmers conveniently construct the Requests and Responses that travel over the network. Real-world data exchange, however, goes far beyond what plain request/response operations can satisfy: it frequently involves uploading and downloading files or database tables. Although the HTTP standard describes how to transfer bulk data with MultiPart message types, the implementation details it touches on (content description, segmentation, message length calculation and so on) are enough to put anyone off. Akka-HTTP is built on Akka-Stream: not only can its processing be expressed with Akka-Stream, it also supports streamed data transfer. As we know, Akka-Stream provides powerful FileIO and data-streaming facilities, so a stream Source can represent a file or a database data source. In short, Akka-HTTP's message payload, HttpEntity, can carry a data stream of theoretically unbounded length. Most valuable of all, this Source is a Reactive-Streams Source with a back-pressure mechanism, so it can cope with different transfer rates at the two Reactive endpoints of a data exchange.

  /**
   * Creates a [[akka.stream.scaladsl.Flow]] representing a prospective HTTP client connection to the given endpoint.
   * Every materialization of the produced flow will attempt to establish a new outgoing connection.
   *
   * To configure additional settings for requests made using this method,
   * use the `akka.http.client` config section or pass in a [[akka.http.scaladsl.settings.ClientConnectionSettings]] explicitly.
   */
  def outgoingConnection(host: String, port: Int = 80,
                         localAddress: Option[InetSocketAddress] = None,
                         settings:     ClientConnectionSettings  = ClientConnectionSettings(system),
                         log:          LoggingAdapter            = system.log): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] =
    _outgoingConnection(host, port, settings.withLocalAddressOverride(localAddress), ConnectionContext.noEncryption(), ClientTransport.TCP, log)

 
Akka-HTTP represents stream-typed message content as a Source[T,_]. Akka-Stream's FileIO object provides plenty of file-IO functions; among them, fromPath builds a Source from a file's contents:

We can see that this function fixes the server endpoint (host + port) and returns a Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]]. The Flow transforms an input HttpRequest into an output HttpResponse, and that transformation includes the request/response message exchange with the server. Below we try using this Flow to send a request to the server and obtain a response:

/**
   * Creates a Source from a files contents.
   * Emitted elements are `chunkSize` sized [[akka.util.ByteString]] elements,
   * except the final element, which will be up to `chunkSize` in size.
   *
   * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
   * set it for a given Source by using [[akka.stream.ActorAttributes]].
   *
   * It materializes a [[Future]] of [[IOResult]] containing the number of bytes read from the source file upon completion,
   * and a possible exception if IO operation was not completed successfully.
   *
   * @param f         the file path to read from
   * @param chunkSize the size of each read operation, defaults to 8192
   */
  def fromPath(f: Path, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]] =
    fromPath(f, chunkSize, startPosition = 0)
  val connFlow: Flow[HttpRequest,HttpResponse,Future[Http.OutgoingConnection]] =
    Http().outgoingConnection("akka.io")

  def sendHttpRequest(req: HttpRequest) = {
    Source.single(req)
      .via(connFlow)
      .runWith(Sink.head)
  }

  sendHttpRequest(HttpRequest(uri="/"))
      .andThen{
        case Success(resp) => println(s"got response: ${resp.status.intValue()}")
        case Failure(err) => println(s"request failed: ${err.getMessage}")
      }
     .andThen {case _ => sys.terminate()}

This function builds a Source[ByteString, Future[IOResult]]; we then need to turn the ByteString into a MessageEntity. First, an implicit instance of type Marshaller[ByteString, MessageEntity] must be available in implicit scope:
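The marshaller instance itself is not shown here, so the following is a minimal pure-Scala sketch of the idea only. The types ByteStr, Entity and Marshal below are illustrative stand-ins for akka-http's ByteString, MessageEntity and Marshaller; in akka-http itself you would typically build the real implicit instance with helpers such as Marshaller.opaque or Marshaller.withFixedContentType.

```scala
// A marshaller is conceptually a typeclass instance that maps a source type
// onto a wire entity. ByteStr/Entity/Marshal are stand-ins, not akka-http types.
final case class ByteStr(bytes: Array[Byte]) {
  def utf8String: String = new String(bytes, "UTF-8")
}
final case class Entity(contentType: String, data: ByteStr)

trait Marshal[A, B] { def apply(a: A): B }

// the "implicit instance in scope" that the text asks for, reduced to its essence
implicit val byteStrToEntity: Marshal[ByteStr, Entity] =
  (bs: ByteStr) => Entity("application/octet-stream", bs)

// complete(...) style call sites resolve the instance implicitly, like this:
def marshal[A, B](a: A)(implicit m: Marshal[A, B]): B = m(a)

val entity = marshal(ByteStr("hello".getBytes("UTF-8")))
```

With the real Marshaller[ByteString, MessageEntity] in implicit scope, a Source[ByteString, _] such as the one produced by FileIO.fromPath can be completed directly as a response entity.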

The pattern above is the so-called Connection-Level Client-Side API. It gives the user the greatest degree of control over how a connection is built and used and over how requests are sent on it. Normally the system closes the connection automatically once the returned response entity has been fully consumed; the API also provides manual ways to close the connection when necessary, as follows:

trait JsonCodec extends Json4sSupport {
  import org.json4s.DefaultFormats
  import org.json4s.ext.JodaTimeSerializers
  implicit val serializer = jackson.Serialization
  implicit val formats = DefaultFormats ++ JodaTimeSerializers.all
}
object JsConverters extends JsonCodec

object ServerStreaming extends App {
  import JsConverters._
...
 //close connection by cancelling response entity
  resp.entity.dataBytes.runWith(Sink.cancelled)
 //close connection by receiving response with close header
  Http().bindAndHandleSync(
    { req ⇒ HttpResponse(headers = headers.Connection("close") :: Nil) },
    "akka.io",
    80)(mat)

We also need JSON streaming support:

The Akka-HTTP client API also offers another practical mode, the Host-Level Client-Side API. This API automatically maintains a connection pool for each endpoint; the user only has to configure the pool, and the system manages the life cycle (creation, termination, activity, idling) of the pooled connections according to that configuration. In the akka.http.host-connection-pool settings, max-connections, max-open-requests and pipelining-limit control the number of connections and in-flight requests and deserve special attention. The pool for a given endpoint is obtained with Http().cachedHostConnectionPool(endPoint); again, what we get back is a client-flow instance. Because the system maintains the pool for us, a client-flow instance can be reused freely, regardless of how often or how far apart it is called. The function (its HTTPS variant is shown below) is defined as follows:

  implicit val jsonStreamingSupport = EntityStreamingSupport.json()
      .withParallelMarshalling(parallelism = 8, unordered = false)
  /**
   * Same as [[#cachedHostConnectionPool]] but for encrypted (HTTPS) connections.
   *
   * If an explicit [[ConnectionContext]] is given then it rather than the configured default [[ConnectionContext]] will be used
   * for encryption on the connections.
   *
   * To configure additional settings for the pool (and requests made using it),
   * use the `akka.http.host-connection-pool` config section or pass in a [[ConnectionPoolSettings]] explicitly.
   */
  def cachedHostConnectionPoolHttps[T](host: String, port: Int = 443,
                                       connectionContext: HttpsConnectionContext = defaultClientHttpsContext,
                                       settings:          ConnectionPoolSettings = defaultConnectionPoolSettings,
                                       log:               LoggingAdapter         = system.log)(implicit fm: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = {
    val cps = ConnectionPoolSetup(settings, connectionContext, log)
    val setup = HostConnectionPoolSetup(host, port, cps)
    cachedHostConnectionPool(setup)
  }
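As the doc comment above notes, pool behavior is tuned under the `akka.http.host-connection-pool` config section. A sketch of the relevant fragment, with illustrative values rather than recommendations:

```
akka.http.host-connection-pool {
  max-connections = 4      # max parallel connections per pool
  max-open-requests = 32   # max in-flight requests; must be a power of 2
  pipelining-limit = 1     # max unanswered requests sent per connection
  idle-timeout = 30s       # idle connections are shut down after this time
}
```

The same settings can be supplied programmatically by passing a ConnectionPoolSettings instance to cachedHostConnectionPool, as the signature above shows.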

FileIO is a blocking operation, so we may prefer to run it on a dedicated dispatcher reserved for blocking operations:

The function's result type is Flow[(HttpRequest,T),(Try[HttpResponse],T),HostConnectionPool]. Because pooled connections send requests and receive responses asynchronously, responses do not necessarily come back in the order the requests were sent; the T element of the tuple therefore tags each request so it can be matched with its response. The pool shuts down automatically after idle-timeout, or manually via HostConnectionPool.shutdown():
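The correlation role of that T tag can be sketched in plain Scala. The Req/Resp types and the data below are made up for illustration; in the real API the tag simply rides along as the second tuple element through the pool flow.

```scala
// Requests are tagged with a marker (here an Int, as in
// cachedHostConnectionPool[Int]); the pool emits (result, tag) pairs,
// possibly out of order, and the tag matches each result to its request.
final case class Req(uri: String)
final case class Resp(status: Int)

val tagged: List[(Req, Int)] =
  List(Req("/a") -> 1, Req("/b") -> 2, Req("/c") -> 3)

// simulated pool output: out of order relative to submission
val results: List[(Resp, Int)] =
  List(Resp(200) -> 3, Resp(404) -> 1, Resp(200) -> 2)

// correlate each response back to its request via the tag
val reqByTag: Map[Int, Req] = tagged.map(_.swap).toMap
val matched: List[(Req, Resp)] =
  results.map { case (resp, tag) => reqByTag(tag) -> resp }
```

This is why the tag type is left to the caller: any value that identifies the request (an index, the request itself, or, as in the queue example later, a Promise to fulfill) can serve as T.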

   FileIO.fromPath(file, 256)
      .withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))
  /**
   * Represents a connection pool to a specific target host and pool configuration.
   */
  final case class HostConnectionPool private[http] (setup: HostConnectionPoolSetup)(
    private[http] val gateway: PoolGateway) { // enable test access

    /**
     * Asynchronously triggers the shutdown of the host connection pool.
     *
     * The produced [[scala.concurrent.Future]] is fulfilled when the shutdown has been completed.
     */
    def shutdown()(implicit ec: ExecutionContextExecutor): Future[Done] = gateway.shutdown()

    private[http] def toJava = new akka.http.javadsl.HostConnectionPool {
      override def setup = HostConnectionPool.this.setup
      override def shutdown(executor: ExecutionContextExecutor): CompletionStage[Done] = HostConnectionPool.this.shutdown()(executor).toJava
    }
  }

Now we can build a Source from a file on the server and turn it into a Response:

We can also shut down every connection pool in the ActorSystem at once with Http().shutdownAllConnectionPools():

  val route =
    get {
      path("files"/Remaining) { name =>
          complete(loadFile(name))
      } 
    }
  def loadFile(path: String) = {
 //   implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")
    val file = Paths.get("/Users/tiger/"+path)
    FileIO.fromPath(file, 256)
      .withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))
      .map(_.utf8String)
  }
  /**
   * Triggers an orderly shutdown of all host connections pools currently maintained by the [[akka.actor.ActorSystem]].
   * The returned future is completed when all pools that were live at the time of this method call
   * have completed their shutdown process.
   *
   * If existing pool client flows are re-used or new ones materialized concurrently with or after this
   * method call the respective connection pools will be restarted and not contribute to the returned future.
   */
  def shutdownAllConnectionPools(): Future[Unit] = {
    val shutdownCompletedPromise = Promise[Done]()
    poolMasterActorRef ! ShutdownAll(shutdownCompletedPromise)
    shutdownCompletedPromise.future.map(_ ⇒ ())(system.dispatcher)
  }

In the same way, we can turn database table rows into an Akka-Stream Source and then into a MessageEntity. The conversion reads the rows with a Query, turns the result into a Reactive Publisher, and then turns the publisher into an Akka-Stream Source, as follows:

We obtain a client-flow instance with cachedHostConnectionPool:

object SlickDAO {
  import slick.jdbc.H2Profile.api._
  val dbConfig: slick.basic.DatabaseConfig[slick.jdbc.H2Profile] = slick.basic.DatabaseConfig.forConfig("slick.h2")
  val db = dbConfig.db

  case class CountyModel(id: Int, name: String)
  case class CountyTable(tag: Tag) extends Table[CountyModel](tag,"COUNTY") {
    def id = column[Int]("ID",O.AutoInc,O.PrimaryKey)
    def name = column[String]("NAME",O.Length(64))
    def * = (id,name)<>(CountyModel.tupled,CountyModel.unapply)
  }
  val CountyQuery = TableQuery[CountyTable]

  def loadTable(filter: String) = {
    //   implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")
    val qry = CountyQuery.filter {_.name.toUpperCase like s"%${filter.toUpperCase}%"}
    val publisher = db.stream(qry.result)
    Source.fromPublisher(publisher = publisher)
      .withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))
  }
}

With the Flow[(HttpRequest,T),(Try[HttpResponse],T),HostConnectionPool] in hand we can carry out the transformation from input HttpRequest to HttpResponse, as in the following example:

Then we perform the conversion to MessageEntity:

  val pooledFlow: Flow[(HttpRequest,Int),(Try[HttpResponse],Int),Http.HostConnectionPool] =
      Http().cachedHostConnectionPool[Int](host="akka.io",port=80)

  def sendPoolRequest(req: HttpRequest, marker: Int) = {
    Source.single(req -> marker)
      .via(pooledFlow)
      .runWith(Sink.head)
  }

  sendPoolRequest(HttpRequest(uri="/"), 1)
    .andThen{
      case Success((tryResp, mk)) =>
        tryResp match {
          case Success(resp) => println(s"got response: ${resp.status.intValue()}")
          case Failure(err) => println(s"request failed: ${err.getMessage}")
        }
      case Failure(err) => println(s"request failed: ${err.getMessage}")
    }
    .andThen {case _ => sys.terminate()}
  val route =
    get {
      path("files"/Remaining) { name =>
          complete(loadFile(name))
      } ~
      path("tables"/Segment) { t =>
        complete(SlickDAO.loadTable(t))
      }
    }

The example above actually runs into the same problem as the Connection-Level API: the number of connections in the pool is still limited, so it can only absorb so much of the request backlog that builds up when the request rate exceeds the response rate. At present the most effective remedy is to buffer requests in a queue and process them one by one:

Below is the complete source code for this demonstration:

    val QueueSize = 10
    // This idea came initially from this blog post:
    // http://kazuhiro.github.io/scala/akka/akka-http/akka-streams/2016/01/31/connection-pooling-with-akka-http-and-source-queue.html
    val poolClientFlow = Http().cachedHostConnectionPool[Promise[HttpResponse]]("akka.io")
    val queue =
      Source.queue[(HttpRequest, Promise[HttpResponse])](QueueSize, OverflowStrategy.dropNew)
        .via(poolClientFlow)
        .toMat(Sink.foreach({
          case ((Success(resp), p)) => p.success(resp)
          case ((Failure(e), p))    => p.failure(e)
        }))(Keep.left)
        .run()

    def queueRequest(request: HttpRequest): Future[HttpResponse] = {
      val responsePromise = Promise[HttpResponse]()
      queue.offer(request -> responsePromise).flatMap {
        case QueueOfferResult.Enqueued    => responsePromise.future
        case QueueOfferResult.Dropped     => Future.failed(new RuntimeException("Queue overflowed. Try again later."))
        case QueueOfferResult.Failure(ex) => Future.failed(ex)
        case QueueOfferResult.QueueClosed => Future.failed(new RuntimeException("Queue was closed (pool shut down) while running the request. Try again later."))
      }
    }

    val responseFuture: Future[HttpResponse] = queueRequest(HttpRequest(uri = "/"))
    responseFuture.andThen {
      case Success(resp) => println(s"got response: ${resp.status.intValue()}")
      case Failure(err) => println(s"request failed: ${err.getMessage}")
    }.andThen {case _ => sys.terminate()}
import java.nio.file._
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.common._
import de.heikoseeberger.akkahttpjson4s.Json4sSupport
import org.json4s.jackson


object SlickDAO {
  import slick.jdbc.H2Profile.api._
  val dbConfig: slick.basic.DatabaseConfig[slick.jdbc.H2Profile] = slick.basic.DatabaseConfig.forConfig("slick.h2")
  val db = dbConfig.db

  case class CountyModel(id: Int, name: String)
  case class CountyTable(tag: Tag) extends Table[CountyModel](tag,"COUNTY") {
    def id = column[Int]("ID",O.AutoInc,O.PrimaryKey)
    def name = column[String]("NAME",O.Length(64))
    def * = (id,name)<>(CountyModel.tupled,CountyModel.unapply)
  }
  val CountyQuery = TableQuery[CountyTable]

  def loadTable(filter: String) = {
    //   implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")
    val qry = CountyQuery.filter {_.name.toUpperCase like s"%${filter.toUpperCase}%"}
    val publisher = db.stream(qry.result)
    Source.fromPublisher(publisher = publisher)
      .withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))
  }
}

trait JsonCodec extends Json4sSupport {
  import org.json4s.DefaultFormats
  import org.json4s.ext.JodaTimeSerializers
  implicit val serializer = jackson.Serialization
  implicit val formats = DefaultFormats ++ JodaTimeSerializers.all
}
object JsConverters extends JsonCodec

object ServerStreaming extends App {
  import JsConverters._

  implicit val httpSys = ActorSystem("httpSystem")
  implicit val httpMat = ActorMaterializer()
  implicit val httpEC = httpSys.dispatcher

  implicit val jsonStreamingSupport = EntityStreamingSupport.json()
      .withParallelMarshalling(parallelism = 8, unordered = false)



  val (port, host) = (8011,"localhost")

  val route =
    get {
      path("files"/Remaining) { name =>
          complete(loadFile(name))
      } ~
      path("tables"/Segment) { t =>
        complete(SlickDAO.loadTable(t))
      }
    }

  def loadFile(path: String) = {
 //   implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")
    val file = Paths.get("/Users/tiger/"+path)
    FileIO.fromPath(file, 256)
      .withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))
      .map(_.utf8String)
  }

  val bindingFuture = Http().bindAndHandle(route,host,port)

  println(s"Server running at $host $port. Press any key to exit ...")

  scala.io.StdIn.readLine()

  bindingFuture.flatMap(_.unbind())
    .onComplete(_ => httpSys.terminate())

}

Below is the complete source code for this discussion of Akka-HTTP client-side connections:

 

import akka.actor._
import akka.http.javadsl.{HostConnectionPool, OutgoingConnection}
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._

import scala.concurrent._
import scala.util._

object ClientApiDemo extends App {
  implicit val sys = ActorSystem("ClientSys")
  implicit val mat = ActorMaterializer()
  implicit val ec = sys.dispatcher
/*
  val connFlow: Flow[HttpRequest,HttpResponse,Future[Http.OutgoingConnection]] =
    Http().outgoingConnection("www.sina.com")

  def sendHttpRequest(req: HttpRequest) = {
    Source.single(req)
      .via(connFlow)
      .runWith(Sink.head)
  }

  sendHttpRequest(HttpRequest(uri="/"))
      .andThen{
        case Success(resp) =>
          //close connection by cancelling response entity
          resp.entity.dataBytes.runWith(Sink.cancelled)
          println(s"got response: ${resp.status.intValue()}")
        case Failure(err) => println(s"request failed: ${err.getMessage}")
      }
  //   .andThen {case _ => sys.terminate()}


 //close connection by receiving response with close header
  Http().bindAndHandleSync(
    { req ⇒ HttpResponse(headers = headers.Connection("close") :: Nil) },
    "akka.io",
    80)(mat)

  val pooledFlow: Flow[(HttpRequest,Int),(Try[HttpResponse],Int),Http.HostConnectionPool] =
      Http().cachedHostConnectionPool[Int](host="akka.io",port=80)

  def sendPoolRequest(req: HttpRequest, marker: Int) = {
    Source.single(req -> marker)
      .via(pooledFlow)
      .runWith(Sink.head)
  }

  sendPoolRequest(HttpRequest(uri="/"), 1)
    .andThen{
      case Success((tryResp, mk)) =>
        tryResp match {
          case Success(resp) => println(s"got response: ${resp.status.intValue()}")
          case Failure(err) => println(s"request failed: ${err.getMessage}")
        }
      case Failure(err) => println(s"request failed: ${err.getMessage}")
    }
    .andThen {case _ => sys.terminate()}
*/

    val QueueSize = 10
    // This idea came initially from this blog post:
    // http://kazuhiro.github.io/scala/akka/akka-http/akka-streams/2016/01/31/connection-pooling-with-akka-http-and-source-queue.html
    val poolClientFlow = Http().cachedHostConnectionPool[Promise[HttpResponse]]("akka.io")
    val queue =
      Source.queue[(HttpRequest, Promise[HttpResponse])](QueueSize, OverflowStrategy.dropNew)
        .via(poolClientFlow)
        .toMat(Sink.foreach({
          case ((Success(resp), p)) => p.success(resp)
          case ((Failure(e), p))    => p.failure(e)
        }))(Keep.left)
        .run()

    def queueRequest(request: HttpRequest): Future[HttpResponse] = {
      val responsePromise = Promise[HttpResponse]()
      queue.offer(request -> responsePromise).flatMap {
        case QueueOfferResult.Enqueued    => responsePromise.future
        case QueueOfferResult.Dropped     => Future.failed(new RuntimeException("Queue overflowed. Try again later."))
        case QueueOfferResult.Failure(ex) => Future.failed(ex)
        case QueueOfferResult.QueueClosed => Future.failed(new RuntimeException("Queue was closed (pool shut down) while running the request. Try again later."))
      }
    }

    val responseFuture: Future[HttpResponse] = queueRequest(HttpRequest(uri = "/"))
    responseFuture.andThen {
      case Success(resp) => println(s"got response: ${resp.status.intValue()}")
      case Failure(err) => println(s"request failed: ${err.getMessage}")
    }.andThen {case _ => sys.terminate()}

}