如何使用Play 2.5在分块响应中将Anorm大型查询结果流式传输到客户端

问题描述:

我有一个非常大的结果集(60k +记录列),我从数据库中提取并使用Anorm进行分析(尽管我可以使用play的默认数据访问根据需要返回ResultSet的模块)。我需要将这些结果直接转换并传输到客户端(不需要将它们保存在内存中的大列表中),然后直接将它们下载到客户端计算机上的文件中。如何使用Play 2.5在分块响应中将Anorm大型查询结果流式传输到客户端

我一直在参考ScalaStream 2.5.x Play文档中分块响应部分中演示的内容。我在实现它显示的“getDataStream”部分时遇到了困难。

我也一直在参考ScalaAnorm 2.5.x Play文档中的Streaming Results和Iteratee部分演示的内容。我曾尝试通过管道将结果像这里返回什么枚举:

val resultsEnumerator = Iteratees.from(SQL"SELECT * FROM Test", SqlParser.str("colName")) 

val dataContent = Source.fromPublisher(Streams.enumeratorToPublisher(resultsEnumerator)) 
Ok.chunked(dataContent).withHeaders(("ContentType","application/x-download"),("Content-disposition","attachment; filename=myDataFile.csv")) 

但生成的文件/内容是空的。

我无法找到如何将函数返回像这样的数据服务转换任何示例代码或引用:

@annotation.tailrec 
def go(c: Option[Cursor], l: List[String]): List[String] = c match { 
    case Some(cursor) => { 
    if (l.size == 10000000) l // custom limit, partial processing 
    else { 
     go(cursor.next, l :+ cursor.row[String]("VBU_NUM")) 
    } 
    } 
    case _ => l 
} 

val sqlString = s"select colName FROM ${tableName} WHERE ${whereClauseStr}" 

val results : Either[List[Throwable], List[String]] = SQL(sqlString).withResult(go(_, List.empty[String])) 
results 

到的东西我可以传递给Ok.chunked()。

所以基本上我的问题是,我应该如何将数据库中的每条记录提取到一个流中,我可以将其转换并发送到客户端作为可以下载到文件的分块响应?

我不希望为此使用Slick。但是我可以使用不使用Anorm的解决方案,只使用返回原始java.sql.ResultSet对象并使用它的播放dbApi对象。

+0

没有必要使用转换'Iteratee'与ANORM /播放2.5.X:https://github.com/playframework/anorm/blob/master/ docc/manual/working/scalaAnorm.md#akka-stream – cchantep

+0

@cchantep是正确的,您想要使用Akka Streams支持而不是Iteratees(在'anorm-akka' 2.5.2)。不幸的是,一些有用的部分(例如,获取完成值以允许您关闭数据库连接)仍在Anorm 2.6.0-SNAPSHOT中,尚未发布。 – Mikesname

+0

对于资源管理来说,Iteratee支持并不是更好,所以这不会改变这一点,直接使用Akka支持会更简单。 – cchantep

在参考了Anorm Akka Support文档以及大量的试验和错误之后,我能够实现我想要的解决方案。我不得不添加这些依赖关系

"com.typesafe.play" % "anorm_2.11" % "2.5.2", 
"com.typesafe.play" % "anorm-akka_2.11" % "2.5.2", 
"com.typesafe.akka" %% "akka-stream" % "2.4.4" 

to by build.sbt for Play 2.5。

和我实现了这样的事情

//...play imports 
import anorm.SqlParser._ 
import anorm._ 

import akka.actor.ActorSystem 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.{Sink, Source} 

... 

private implicit val akkaActorSystem = ActorSystem("MyAkkaActorSytem") 
private implicit val materializer = ActorMaterializer() 

def streamedAnormResultResponse() = Action { 
    implicit val connection = db.getConnection() 

    val parser: RowParser[...] = ... 
    val sqlQuery: SqlQuery = SQL("SELECT * FROM table") 

    val source: Source[Map[String, Any] = AkkaStream.source(sqlQuery, parser, ColumnAliaser.empty).alsoTo(Sink.onComplete({ 
    case Success(v) => 
     connection.close() 
    case Failure(e) => 
     println("Info from the exception: " + e.getMessage) 
     connection.close() 
    })) 

    Ok.chunked(source) 
} 
+0

像这样@elephantopus使用'alsoTo'安全吗?我们最想知道的是流的完成权?我想知道这种方法是否有任何副作用,如性能问题等。 – endertunc

+0

是的,它的唯一目的是在完成流时关闭连接,因为它是异步的,对性能没有影响(我注意到了)。虽然我没有彻底测试过 – Elephantopus