阿卡流+阿卡的Http传递参数
问题描述:
我有下面的代码片段:阿卡流+阿卡的Http传递参数
case class SomeClass(param1:String,param2:String,param3:String)
val someClassActorSource: Source[SomeClass, ActorRef] = Source
.actorPublisher[SomeClass](Props[SomeClassActorPublisher])
val someFlow: ActorRef = Flow[SomeClass]
.mapAsync(3)(f=> getDocumentById(f))
.map(f =>{
val request = HttpRequest(method = HttpMethods.POST, uri = "http://localhost:8000/test")
.withEntity(ContentTypes.`text/xml(UTF-8)`, ByteString(f.a)
)
(request,request)
}).via(connection)
//Parsing Response
.mapAsync(3){
case (Success(HttpResponse(status, _, entity, _)),request)=>
entity.dataBytes.runFold(ByteString(""))(_ ++ _)
}
.map(resp =>parse(resp.utf8String,?????????????))
.to(Sink.someSink{....})
.runWith(someClassActorSource)
def parse(resp:String,parseParam:String)=????
,并在某处,我发短信给流量代码:
someflow ! SomeClass("a","b","c")
someflow ! SomeClass("a1","b1","c1")
我的问题是该方法解析应该从原来如此类
所以对于第一条消息使用参数2应该是
parse(response,"b")
和第二条消息应该是
parse(response,"b1")
所以现在的问题是,我怎么能取从我提交给流方法的参数?
答
假设您的connection
值正在通过
val connection = Http().cachedHostConnectionPool(...)
实例可以使用该连接发生在一个元组的事实,而不是简单地传递request
两次元组可以在输入SomeClass
通过。此SomeClass
实例将不得不经过您的每个Flow
值才能进入解析阶段。
修改你的代码位:
val getDocumentFlow =
Flow[SomeClass].mapAsync(3)(f => getSomDocumentById(f).map(d => d -> f))
你的问题没有说明从getDocumentById
返回类型,所以我只是用Document
:
val documentToRequest =
Flow[(Document, SomeClass)] map { case (document, someClass) =>
val request = ...
(request, someClass)
}
val parseResponse =
Flow[(Try[HttpResponse], SomeClass)].mapAsync(3){
case (Success(HttpResponse(status, _, entity, _)), someClass) =>
entity
.dataBytes
.runFold(ByteString(""))(_ ++ _)
.map(e => e -> someClass)
}
val parseEntity = Flow[(ByteString, SomeClass)] map {
case (entity, someClass) => parse(entity.utf8String, someClass)
}
这些流可以被用来作为在问题中描述:
val someFlow =
someClassActorSource
.via(getDocumentFlow)
.via(documentToRequest)
.via(connection)
.via(parseResponse)
.via(parseEntity)
.to(Sink.someSink{...})
.run()