Akka(36): Http: Client-side-Api, Client-Connections

 
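 The Akka-http client-side API is a toolkit for exchanging messages over the network, with HttpRequest operations as its main axis. As we know, Akka-http is built on top of Akka-stream, so the channel an Akka-http client opens to a server can also be represented as an Akka-stream Flow. This Flow is obtained by calling Http().outgoingConnection: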

 
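We have said that Akka-http is a good system-integration tool, and that integration is achieved through data exchange. HTTP is a standard protocol for sending and receiving data over the network, so before using Akka-http we should understand the details of HTTP data exchange. The two parties exchange data through the HTTP message types Request and Response, which correspond to HttpRequest and HttpResponse in Akka-http. Both types carry an HttpEntity that holds the data to be exchanged. On the wire, data is always a sequence of bytes, so the Entity in a Request or Response must also be expressed as bytes. In Akka-http the data to be transmitted is converted into a ByteString and sent over the network; the receiver then converts the ByteString in the received Entity into data of the target type. This conversion is what Akka-http calls Marshalling and Unmarshalling. Let's start with the HttpEntity constructor functions to understand how it is defined: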

  /**
   * Creates a [[akka.stream.scaladsl.Flow]] representing a prospective HTTP client connection to the given endpoint.
   * Every materialization of the produced flow will attempt to establish a new outgoing connection.
   *
   * To configure additional settings for requests made using this method,
   * use the `akka.http.client` config section or pass in a [[akka.http.scaladsl.settings.ClientConnectionSettings]] explicitly.
   */
  def outgoingConnection(host: String, port: Int = 80,
                         localAddress: Option[InetSocketAddress] = None,
                         settings:     ClientConnectionSettings  = ClientConnectionSettings(system),
                         log:          LoggingAdapter            = system.log): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] =
    _outgoingConnection(host, port, settings.withLocalAddressOverride(localAddress), ConnectionContext.noEncryption(), ClientTransport.TCP, log)
object HttpEntity {
  implicit def apply(string: String): HttpEntity.Strict = apply(ContentTypes.`text/plain(UTF-8)`, string)
  implicit def apply(bytes: Array[Byte]): HttpEntity.Strict = apply(ContentTypes.`application/octet-stream`, bytes)
  implicit def apply(data: ByteString): HttpEntity.Strict = apply(ContentTypes.`application/octet-stream`, data)
  def apply(contentType: ContentType.NonBinary, string: String): HttpEntity.Strict =
    if (string.isEmpty) empty(contentType) else apply(contentType, ByteString(string.getBytes(contentType.charset.nioCharset)))
  def apply(contentType: ContentType, bytes: Array[Byte]): HttpEntity.Strict =
    if (bytes.length == 0) empty(contentType) else apply(contentType, ByteString(bytes))
  def apply(contentType: ContentType, data: ByteString): HttpEntity.Strict =
    if (data.isEmpty) empty(contentType) else HttpEntity.Strict(contentType, data)

  def apply(contentType: ContentType, contentLength: Long, data: Source[ByteString, Any]): UniversalEntity =
    if (contentLength == 0) empty(contentType) else HttpEntity.Default(contentType, contentLength, data)
  def apply(contentType: ContentType, data: Source[ByteString, Any]): HttpEntity.Chunked =
    HttpEntity.Chunked.fromData(contentType, data)
...
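The marshalling round trip described above can be sketched without any network traffic. The following is a minimal illustration, assuming the default String marshaller and unmarshaller that ship with Akka-http; the object name `MarshalRoundTrip` and the helper `roundTrip` are made up for this example.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model.MessageEntity
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical helper object, not part of Akka-http
object MarshalRoundTrip {

  // String -> entity (ByteString inside) -> String
  def roundTrip(text: String): String = {
    implicit val system: ActorSystem = ActorSystem("MarshalRoundTrip")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    val result = for {
      entity  <- Marshal(text).to[MessageEntity] // marshalling: value to entity
      decoded <- Unmarshal(entity).to[String]    // unmarshalling: entity to value
    } yield decoded

    try Await.result(result, 5.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(roundTrip("Hello World!"))
}
```

Blocking with Await is only for the demonstration; in application code the Future would normally be composed further.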

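As we can see, this function sets the server address (host + port) and returns a Flow[HttpRequest,HttpResponse,Future[OutgoingConnection]]. The Flow transforms an input HttpRequest into an output HttpResponse, and the transformation includes the Request/Response exchange with the server. Below we use this Flow to send a request to the server and obtain the response: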

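Clearly, HttpEntity falls into two broad kinds. One is Strict, whose data is a ByteString. The other is streamed (for example HttpEntity.Default, returned above as UniversalEntity, and HttpEntity.Chunked), whose dataBytes is a Source[ByteString,Any]. Either way, what finally travels over the wire is ByteString. The ContentType of an HttpEntity declares the format of the transmitted data. The predefined ones are: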

  val connFlow: Flow[HttpRequest,HttpResponse,Future[Http.OutgoingConnection]] =
    Http().outgoingConnection("akka.io")

  def sendHttpRequest(req: HttpRequest) = {
    Source.single(req)
      .via(connFlow)
      .runWith(Sink.head)
  }

  sendHttpRequest(HttpRequest(uri="/"))
      .andThen{
        case Success(resp) => println(s"got response: ${resp.status.intValue()}")
        case Failure(err) => println(s"request failed: ${err.getMessage}")
      }
     .andThen {case _ => sys.terminate()}
object ContentTypes {
  val `application/json` = ContentType(MediaTypes.`application/json`)
  val `application/octet-stream` = ContentType(MediaTypes.`application/octet-stream`)
  val `text/plain(UTF-8)` = MediaTypes.`text/plain` withCharset HttpCharsets.`UTF-8`
  val `text/html(UTF-8)` = MediaTypes.`text/html` withCharset HttpCharsets.`UTF-8`
  val `text/xml(UTF-8)` = MediaTypes.`text/xml` withCharset HttpCharsets.`UTF-8`
  val `text/csv(UTF-8)` = MediaTypes.`text/csv` withCharset HttpCharsets.`UTF-8`

  // used for explicitly suppressing the rendering of Content-Type headers on requests and responses
  val NoContentType = ContentType(MediaTypes.NoMediaType)
}
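To make the distinction between strict and streamed entities concrete, here is a small sketch that builds one entity of each kind with the constructors listed earlier and pattern matches on the result. The object `EntityKinds` and the function `describe` are illustrative names, not part of the library.

```scala
import akka.http.scaladsl.model._
import akka.stream.scaladsl.Source
import akka.util.ByteString

// Hypothetical helper object, not part of Akka-http
object EntityKinds {

  // Report which kind of entity we are holding
  def describe(entity: HttpEntity): String = entity match {
    case HttpEntity.Strict(ct, data)    => s"Strict($ct, ${data.length} bytes in memory)"
    case HttpEntity.Default(ct, len, _) => s"Default($ct, $len bytes, streamed)"
    case HttpEntity.Chunked(ct, _)      => s"Chunked($ct, streamed, length unknown)"
    case other                          => s"Other(${other.contentType})"
  }

  // String constructor: text/plain(UTF-8), data held as one ByteString
  val fromString: HttpEntity = HttpEntity("Hello World!")

  // Source plus a known content length: a streamed Default entity
  val withLength: HttpEntity =
    HttpEntity(ContentTypes.`application/octet-stream`, 3L, Source.single(ByteString("abc")))

  // Source without a length: a Chunked entity
  val chunked: HttpEntity =
    HttpEntity(ContentTypes.`application/octet-stream`, Source.single(ByteString("abc")))

  def main(args: Array[String]): Unit =
    List(fromString, withLength, chunked).map(describe).foreach(println)
}
```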

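The approach above is the so-called Connection-Level Client-Side API. It gives the user the greatest degree of control over how a connection is established and used and how requests are sent over it. Generally, once the entity of the returned response has been completely consumed, the system closes the connection automatically. The API also provides ways to close the connection manually when needed, as follows: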

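Note: ContentType is only an annotation. It does not change how the data is represented on the wire, which is always ByteString. application/octet-stream simply denotes raw binary data; it is the default for entities built from Array[Byte] or ByteString, and the streaming examples later use it with a Source[ByteString,Any]. Next we use a client-side example to get to know HttpEntity. Here is a client test function: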

 //close connection by cancelling response entity
  resp.entity.dataBytes.runWith(Sink.cancelled)
 //close connection by receiving response with close header
  Http().bindAndHandleSync(
    { req ⇒ HttpResponse(headers = headers.Connection("close") :: Nil) },
    "akka.io",
    80)(mat)
  def runService(request: HttpRequest, rentity: RequestEntity) = {
   val futResp = for {
     entity <- Future.successful(rentity)
     resp <- Http(sys).singleRequest(
       request.copy(entity = rentity)
     )
   } yield resp

   futResp
    .andThen {
      case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
        entity.dataBytes.map(_.utf8String).runForeach(println)
      case Success(r@HttpResponse(code, _, _, _)) =>
        println(s"Download request failed, response code: $code")
        r.discardEntityBytes()
      case Success(_) => println("Unable to download rows!")
      case Failure(err) => println(s"Download failed: ${err.getMessage}")

    }
  }

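The Akka-http client API also offers a practical Host-Level Client-Side API. This API automatically maintains a connection pool for each endpoint, and the user only needs to configure the pool. The system manages the life cycle of the pooled connections according to that configuration. In the akka.http.host-connection-pool section, settings such as max-connections, max-open-requests and pipelining-limit determine the number of connections and in-flight requests and deserve particular attention. The pool for a given endpoint is obtained through Http().cachedHostConnectionPool(endPoint), which likewise returns a client-flow instance. Because the system maintains the pool automatically, the client-flow instance can be used freely, regardless of how many times or at what intervals it is called. The definition of its HTTPS variant, cachedHostConnectionPoolHttps(), is shown below: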

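We only need to pass a RequestEntity to this function to learn many details about the Entity in the returned Response. First we ask the server to send a plain string, Hello World. The server code is as follows: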

  /**
   * Same as [[#cachedHostConnectionPool]] but for encrypted (HTTPS) connections.
   *
   * If an explicit [[ConnectionContext]] is given then it rather than the configured default [[ConnectionContext]] will be used
   * for encryption on the connections.
   *
   * To configure additional settings for the pool (and requests made using it),
   * use the `akka.http.host-connection-pool` config section or pass in a [[ConnectionPoolSettings]] explicitly.
   */
  def cachedHostConnectionPoolHttps[T](host: String, port: Int = 443,
                                       connectionContext: HttpsConnectionContext = defaultClientHttpsContext,
                                       settings:          ConnectionPoolSettings = defaultConnectionPoolSettings,
                                       log:               LoggingAdapter         = system.log)(implicit fm: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = {
    val cps = ConnectionPoolSetup(settings, connectionContext, log)
    val setup = HostConnectionPoolSetup(host, port, cps)
    cachedHostConnectionPool(setup)
  }
 } ~ path("text") {
      get {
        complete("Hello World!")
      } ~

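The function's result type is Flow[(HttpRequest,T),(Try[HttpResponse],T),HostConnectionPool]. Because the pooled connections send requests and receive responses asynchronously, responses do not necessarily come back in the order the requests were sent, so the T in the Tuple2 is needed as a marker to match each response with its request. The pool shuts down automatically according to idle-timeout, and it can also be shut down manually through HostConnectionPool.shutdown():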

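Although complete("Hello World!") looks a little opaque, complete must have performed some conversion from String to ByteString. We can verify this with the runService function above. Here is how the example is invoked: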

  /**
   * Represents a connection pool to a specific target host and pool configuration.
   */
  final case class HostConnectionPool private[http] (setup: HostConnectionPoolSetup)(
    private[http] val gateway: PoolGateway) { // enable test access

    /**
     * Asynchronously triggers the shutdown of the host connection pool.
     *
     * The produced [[scala.concurrent.Future]] is fulfilled when the shutdown has been completed.
     */
    def shutdown()(implicit ec: ExecutionContextExecutor): Future[Done] = gateway.shutdown()

    private[http] def toJava = new akka.http.javadsl.HostConnectionPool {
      override def setup = HostConnectionPool.this.setup
      override def shutdown(executor: ExecutionContextExecutor): CompletionStage[Done] = HostConnectionPool.this.shutdown()(executor).toJava
    }
  }
  val reqText = HttpRequest(uri = s"http://localhost:8011/text")
  runService(reqText,HttpEntity.Empty)
    .andThen{case _ => sys.terminate()}
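Why the marker T matters can be shown without a real server. The sketch below replaces the pool with a hypothetical stand-in flow of the same shape (`fakePool`, built on mapAsyncUnordered) in which later requests finish sooner, then uses the marker to pair each response with its request. All names here are invented for the illustration.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.model._
import akka.pattern.after
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.{Success, Try}

// Hypothetical demo object, not part of Akka-http
object MarkerDemo {

  // Stand-in for a pool flow: echoes the request path, later markers answer faster
  def fakePool(implicit system: ActorSystem): Flow[(HttpRequest, Int), (Try[HttpResponse], Int), NotUsed] = {
    import system.dispatcher
    Flow[(HttpRequest, Int)].mapAsyncUnordered(4) { case (req, marker) =>
      after((40 - 10 * marker).millis, system.scheduler) {
        Future.successful((Try(HttpResponse(entity = req.uri.path.toString)), marker))
      }
    }
  }

  // Read the body of a response whose entity is already strict
  private def body(resp: HttpResponse): String = resp.entity match {
    case HttpEntity.Strict(_, data) => data.utf8String
    case _                          => "<streamed>"
  }

  // Returns marker -> response body, independent of arrival order
  def run(): Map[Int, String] = {
    implicit val system: ActorSystem = ActorSystem("MarkerDemo")
    implicit val mat: ActorMaterializer = ActorMaterializer()

    val requests = List("/a", "/b", "/c").zipWithIndex.map {
      case (path, i) => HttpRequest(uri = path) -> (i + 1)
    }
    val done = Source(requests).via(fakePool).runWith(Sink.seq)

    try {
      Await.result(done, 5.seconds)
        .collect { case (Success(resp), marker) => marker -> body(resp) }
        .toMap
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

The arrival order of the three results is not fixed, but the marker still identifies which request each response belongs to.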

All connection pools in the ActorSystem can also be shut down in one go with Http().shutdownAllConnectionPools():

From the output we can conclude that entity.dataBytes.map(_.utf8String) in runService has converted ByteString into String, which means the data in the Entity sent by the server is ByteString.

  /**
   * Triggers an orderly shutdown of all host connections pools currently maintained by the [[akka.actor.ActorSystem]].
   * The returned future is completed when all pools that were live at the time of this method call
   * have completed their shutdown process.
   *
   * If existing pool client flows are re-used or new ones materialized concurrently with or after this
   * method call the respective connection pools will be restarted and not contribute to the returned future.
   */
  def shutdownAllConnectionPools(): Future[Unit] = {
    val shutdownCompletedPromise = Promise[Done]()
    poolMasterActorRef ! ShutdownAll(shutdownCompletedPromise)
    shutdownCompletedPromise.future.map(_ ⇒ ())(system.dispatcher)
  }
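The ByteString-to-String step noted earlier can be tried in isolation. This sketch builds a streamed entity from a few chunks, collects it with toStrict and decodes the bytes once at the end. The object `EntityToString` and the function `decode` are names made up for the example, and the timeouts are arbitrary.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical helper object, not part of Akka-http
object EntityToString {

  def decode(chunks: List[String]): String = {
    implicit val system: ActorSystem = ActorSystem("EntityToString")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    // A streamed (Chunked) entity: its dataBytes is a Source[ByteString, Any]
    val entity = HttpEntity(
      ContentTypes.`text/plain(UTF-8)`,
      Source(chunks.map(s => ByteString(s)))
    )

    // Collect all chunks into one Strict entity, then decode the bytes as UTF-8
    val text = entity.toStrict(3.seconds).map(_.data.utf8String)

    try Await.result(text, 5.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(decode(List("Hello ", "World!")))
}
```

Decoding after collection avoids one pitfall of calling utf8String on every chunk: a multi-byte character that straddles two chunks would otherwise be decoded incorrectly.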

Now let's try sending some data to the server and have the server return the result through the response entity:

We use cachedHostConnectionPool to obtain a client-flow instance.

    } ~ path("text") {
      get {
        complete("Hello World!")
      } ~
        post {
          withoutSizeLimit {
            extractDataBytes { bytes =>
              val data = bytes.runFold(ByteString())(_ ++ _)
              onComplete(data) { t =>
                complete(t)
              }
            }
          }
        }

With a Flow[(HttpRequest,T),(Try[HttpResponse],T),HostConnectionPool] in hand, we can transform input HttpRequests into HttpResponses, as in the following example:

We can see that the server handles the request entity as ByteString. A client request that uploads a string looks like this:

  val pooledFlow: Flow[(HttpRequest,Int),(Try[HttpResponse],Int),Http.HostConnectionPool] =
      Http().cachedHostConnectionPool[Int](host="akka.io",port=80)

  def sendPoolRequest(req: HttpRequest, marker: Int) = {
    Source.single(req -> marker)
      .via(pooledFlow)
      .runWith(Sink.head)
  }

  sendPoolRequest(HttpRequest(uri="/"), 1)
    .andThen{
      case Success((tryResp, mk)) =>
        tryResp match {
          case Success(resp) => println(s"got response: ${resp.status.intValue()}")
          case Failure(err) => println(s"request failed: ${err.getMessage}")
        }
      case Failure(err) => println(s"request failed: ${err.getMessage}")
    }
    .andThen {case _ => sys.terminate()}
  val postText = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/text")
  val uploadText = HttpEntity(
    ContentTypes.`text/plain(UTF-8)`,
    // the payload is a single strict ByteString
    ByteString("hello world again")
  )
  runService(postText,uploadText)
    .andThen{case _ => sys.terminate()}

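The example above actually runs into the same problem as the Connection-Level API, because the number of connections in the pool is still limited. The pool can only ease the backlog that builds up when requests are issued faster than responses come back. For now the most effective approach is still to use a queue that buffers requests and then processes them one by one: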

We can see that the data put into the entity is a ByteString.

    val QueueSize = 10
    // This idea came initially from this blog post:
    // http://kazuhiro.github.io/scala/akka/akka-http/akka-streams/2016/01/31/connection-pooling-with-akka-http-and-source-queue.html
    val poolClientFlow = Http().cachedHostConnectionPool[Promise[HttpResponse]]("akka.io")
    val queue =
      Source.queue[(HttpRequest, Promise[HttpResponse])](QueueSize, OverflowStrategy.dropNew)
        .via(poolClientFlow)
        .toMat(Sink.foreach({
          case ((Success(resp), p)) => p.success(resp)
          case ((Failure(e), p))    => p.failure(e)
        }))(Keep.left)
        .run()

    def queueRequest(request: HttpRequest): Future[HttpResponse] = {
      val responsePromise = Promise[HttpResponse]()
      queue.offer(request -> responsePromise).flatMap {
        case QueueOfferResult.Enqueued    => responsePromise.future
        case QueueOfferResult.Dropped     => Future.failed(new RuntimeException("Queue overflowed. Try again later."))
        case QueueOfferResult.Failure(ex) => Future.failed(ex)
        case QueueOfferResult.QueueClosed => Future.failed(new RuntimeException("Queue was closed (pool shut down) while running the request. Try again later."))
      }
    }

    val responseFuture: Future[HttpResponse] = queueRequest(HttpRequest(uri = "/"))
    responseFuture.andThen {
      case Success(resp) => println(s"got response: ${resp.status.intValue()}")
      case Failure(err) => println(s"request failed: ${err.getMessage}")
    }.andThen {case _ => sys.terminate()}

We know that Akka-http is based on Akka-Stream and therefore has Reactive-Streams characteristics. Below we demonstrate how to upload and download a stream. First we define a Source:

Below is the demonstration source code for this discussion of Akka-http client-side connections:

  val numbers = Source.fromIterator(() =>
    Iterator.continually(Random.nextInt()))
    .map(n => ByteString(s"$n\n"))
  // make it conform to the withoutSizeLimit constraint
  val source = limitableByteSource(numbers)
import akka.actor._
import akka.http.javadsl.{HostConnectionPool, OutgoingConnection}
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._

import scala.concurrent._
import scala.util._

object ClientApiDemo extends App {
  implicit val sys = ActorSystem("ClientSys")
  implicit val mat = ActorMaterializer()
  implicit val ec = sys.dispatcher
/*
  val connFlow: Flow[HttpRequest,HttpResponse,Future[Http.OutgoingConnection]] =
    Http().outgoingConnection("www.sina.com")

  def sendHttpRequest(req: HttpRequest) = {
    Source.single(req)
      .via(connFlow)
      .runWith(Sink.head)
  }

  sendHttpRequest(HttpRequest(uri="/"))
      .andThen{
        case Success(resp) =>
          //close connection by cancelling response entity
          resp.entity.dataBytes.runWith(Sink.cancelled)
          println(s"got response: ${resp.status.intValue()}")
        case Failure(err) => println(s"request failed: ${err.getMessage}")
      }
  //   .andThen {case _ => sys.terminate()}


 //close connection by receiving response with close header
  Http().bindAndHandleSync(
    { req ⇒ HttpResponse(headers = headers.Connection("close") :: Nil) },
    "akka.io",
    80)(mat)

  val pooledFlow: Flow[(HttpRequest,Int),(Try[HttpResponse],Int),Http.HostConnectionPool] =
      Http().cachedHostConnectionPool[Int](host="akka.io",port=80)

  def sendPoolRequest(req: HttpRequest, marker: Int) = {
    Source.single(req -> marker)
      .via(pooledFlow)
      .runWith(Sink.head)
  }

  sendPoolRequest(HttpRequest(uri="/"), 1)
    .andThen{
      case Success((tryResp, mk)) =>
        tryResp match {
          case Success(resp) => println(s"got response: ${resp.status.intValue()}")
          case Failure(err) => println(s"request failed: ${err.getMessage}")
        }
      case Failure(err) => println(s"request failed: ${err.getMessage}")
    }
    .andThen {case _ => sys.terminate()}
*/

    val QueueSize = 10
    // This idea came initially from this blog post:
    // http://kazuhiro.github.io/scala/akka/akka-http/akka-streams/2016/01/31/connection-pooling-with-akka-http-and-source-queue.html
    val poolClientFlow = Http().cachedHostConnectionPool[Promise[HttpResponse]]("akka.io")
    val queue =
      Source.queue[(HttpRequest, Promise[HttpResponse])](QueueSize, OverflowStrategy.dropNew)
        .via(poolClientFlow)
        .toMat(Sink.foreach({
          case ((Success(resp), p)) => p.success(resp)
          case ((Failure(e), p))    => p.failure(e)
        }))(Keep.left)
        .run()

    def queueRequest(request: HttpRequest): Future[HttpResponse] = {
      val responsePromise = Promise[HttpResponse]()
      queue.offer(request -> responsePromise).flatMap {
        case QueueOfferResult.Enqueued    => responsePromise.future
        case QueueOfferResult.Dropped     => Future.failed(new RuntimeException("Queue overflowed. Try again later."))
        case QueueOfferResult.Failure(ex) => Future.failed(ex)
        case QueueOfferResult.QueueClosed => Future.failed(new RuntimeException("Queue was closed (pool shut down) while running the request. Try again later."))
      }
    }

    val responseFuture: Future[HttpResponse] = queueRequest(HttpRequest(uri = "/"))
    responseFuture.andThen {
      case Success(resp) => println(s"got response: ${resp.status.intValue()}")
      case Failure(err) => println(s"request failed: ${err.getMessage}")
    }.andThen {case _ => sys.terminate()}

}

The server also uses an HttpEntity to carry this Source and sends it to the client through the HttpResponse:

 

  path("random") {
      get {
        complete(
          HttpEntity(
            ContentTypes.`application/octet-stream`,
            // transform each number to a chunk of bytes
            source.take(10000)
          )
        )
      } ~

 

On the client we still use runService to examine the incoming entity. Since we are receiving a very large Source, the receiving code needs a small change:

 

   futResp
    .andThen {
      case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
        val futEnt = entity.dataBytes.map(_.utf8String).runForeach(println)
             Await.result(futEnt, Duration.Inf) // block until the whole stream has been printed
             println("End of stream!!!")
      case Success(r@HttpResponse(code, _, _, _)) =>
        println(s"Download request failed, response code: $code")
        r.discardEntityBytes()
      case Success(_) => println("Unable to download rows!")
      case Failure(err) => println(s"Download failed: ${err.getMessage}")

    }

 

Invoke it as follows:

 

  val reqRandom = HttpRequest(uri = s"http://localhost:8011/random")
    runService(reqRandom,HttpEntity.Empty)
     .andThen{case _ => sys.terminate()}

 

Next we demonstrate uploading data from the client with a Source. Server code:

 

       post {
          withoutSizeLimit {
            extractDataBytes { bytes =>
              val data = bytes.runFold(ByteString())(_ ++ _)
              onComplete(data) { t =>
                complete(t)
              }
            }
          }
        }
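The runFold step in the route above can be tried on its own. The following sketch feeds a few chunks through the same fold outside of any route; `FoldChunks` and `collect` are names invented for this example.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical helper object, not part of Akka-http
object FoldChunks {

  // Concatenate every incoming chunk into a single ByteString
  def collect(chunks: List[String]): ByteString = {
    implicit val system: ActorSystem = ActorSystem("FoldChunks")
    implicit val mat: ActorMaterializer = ActorMaterializer()

    val folded = Source(chunks.map(s => ByteString(s)))
      .runFold(ByteString.empty)(_ ++ _) // same fold as in the post handler

    try Await.result(folded, 5.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(collect(List("hello ", "world ", "again")).utf8String)
}
```

Note that this fold keeps the whole body in memory. Combined with withoutSizeLimit that is acceptable for a demonstration, but a large upload would more safely be processed chunk by chunk.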

 

Client upload example:

 val numbers = Source.fromIterator(() =>
    Iterator.continually(Random.nextInt()))
    .map(n => ByteString(s"$n\n"))
  // make it conform to the withoutSizeLimit constraint
  val source = limitableByteSource(numbers)

  val bytes = HttpEntity(
    ContentTypes.`application/octet-stream`,
    // transform each number to a chunk of bytes
    source.take(10000)
  )
  val postRandom = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/random")
  runService(postRandom,bytes)
    .andThen{case _ => sys.terminate()}

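From the discussion above we have learned that underneath Marshal and Unmarshal there are only ByteString operations and conversions. Below is the demonstration source code for this discussion: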

Server:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.model._
import akka.util.ByteString
import akka.http.scaladsl.model.HttpEntity._

import scala.util.Random

object ServerEntity extends App {

  implicit val httpSys = ActorSystem("httpSystem")
  implicit val httpMat = ActorMaterializer()
  implicit val httpEC = httpSys.dispatcher


  val numbers = Source.fromIterator(() =>
    Iterator.continually(Random.nextInt()))
    .map(n => ByteString(s"$n\n"))
  // make it conform to the withoutSizeLimit constraint
  val source = limitableByteSource(numbers)



  val route =
    path("random") {
      get {
        withoutSizeLimit {
          complete(
            HttpEntity(
              ContentTypes.`application/octet-stream`,
              // transform each number to a chunk of bytes
              source.take(1000))
          )
        }
      } ~
        post {
          withoutSizeLimit {
            extractDataBytes { bytes =>
              val data = bytes.runFold(ByteString())(_ ++ _)
              onComplete(data) { t =>
                complete(t)
              }
            }
          }
        }
    } ~ path("text") {
      get {
        complete("Hello World!")
      } ~
        post {
          withoutSizeLimit {
            extractDataBytes { bytes =>
              val data = bytes.runFold(ByteString())(_ ++ _)
              onComplete(data) { t =>
                complete(t)
              }
            }
          }
        }
    }


  val (port, host) = (8011,"localhost")

  val bindingFuture = Http().bindAndHandle(route,host,port)

  println(s"Server running at $host $port. Press any key to exit ...")

  scala.io.StdIn.readLine()

  bindingFuture.flatMap(_.unbind())
    .onComplete(_ => httpSys.terminate())

}

Client:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpEntity.limitableByteSource
import akka.http.scaladsl.model._

import scala.concurrent.duration._
import akka.util.ByteString

import scala.concurrent._
import scala.util._

object ClientEntity extends App {

  implicit val sys = ActorSystem("ClientSys")
  implicit val mat = ActorMaterializer()
  implicit val ec = sys.dispatcher

  def runService(request: HttpRequest, rentity: RequestEntity) = {
   val futResp = for {
     entity <- Future.successful(rentity)
     resp <- Http(sys).singleRequest(
       request.copy(entity = rentity)
     )
   } yield resp

   futResp
    .andThen {
      case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
        val futEnt = entity.dataBytes.map(_.utf8String).runForeach(println)
             Await.result(futEnt, Duration.Inf) // block until the whole stream has been printed
             println("End of stream!!!")
      case Success(r@HttpResponse(code, _, _, _)) =>
        println(s"Download request failed, response code: $code")
        r.discardEntityBytes()
      case Success(_) => println("Unable to download rows!")
      case Failure(err) => println(s"Download failed: ${err.getMessage}")

    }
  }

  val reqText = HttpRequest(uri = s"http://localhost:8011/text")
//  runService(reqText,HttpEntity.Empty)
//    .andThen{case _ => sys.terminate()}

  val postText = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/text")
  val uploadText = HttpEntity(
    ContentTypes.`text/plain(UTF-8)`,
    // the payload is a single strict ByteString
    ByteString("hello world again")
  )
//  runService(postText,uploadText)
//    .andThen{case _ => sys.terminate()}

  val reqRandom = HttpRequest(uri = s"http://localhost:8011/random")
 //   runService(reqRandom,HttpEntity.Empty)
 //    .andThen{case _ => sys.terminate()}

  val numbers = Source.fromIterator(() =>
    Iterator.continually(Random.nextInt()))
    .map(n => ByteString(s"$n\n"))
  // make it conform to the withoutSizeLimit constraint
  val source = limitableByteSource(numbers)

  val bytes = HttpEntity(
    ContentTypes.`application/octet-stream`,
    // transform each number to a chunk of bytes
    source.take(10000)
  )
  val postRandom = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/random")
  runService(postRandom,bytes)
    .andThen{case _ => sys.terminate()}


}