如何在Scala中使用Akka打开基于TLS的TCP连接
我想编写一个Scala客户端,该客户端通过基于TLS的TCP连接使用一种专有协议进行通信。如何在Scala中使用Akka打开基于TLS的TCP连接?
基本上,我想用Scala重写下面这段Node.js代码:
// Connection options for the TLS socket: the target host and port.
var conn_options = {
host: endpoint,
port: port
};
// Open the TLS connection; the callback below performs the protocol handshake.
tlsSocket = tls.connect(conn_options, function() {
// Only proceed with the handshake when the socket reports itself as authorized.
if (tlsSocket.authorized) {
logger.info('Successfully established a connection');
// Now that the connection has been established, let's perform the handshake
// Identification frame:
// 1 | I | id_size | id
var idFrameTypeAndVersion = "1I";
var clientIdString = "foorbar";
// Buffer layout: frame type/version string, one byte holding the id length, then the id itself.
var idDataBuffer = new Buffer(idFrameTypeAndVersion.length + 1 + clientIdString.length);
// Write the frame type/version at offset 0.
idDataBuffer.write(idFrameTypeAndVersion, 0 ,
idFrameTypeAndVersion.length);
// Write the id length as a 1-byte big-endian unsigned integer right after the type/version.
idDataBuffer.writeUIntBE(clientIdString.length,
idFrameTypeAndVersion.length, 1);
// Write the client id after the length byte.
idDataBuffer.write(clientIdString, idFrameTypeAndVersion.length + 1, clientIdString.length);
// Send the identification frame to Logmet
tlsSocket.write(idDataBuffer);
}
// (the rest of the callback is elided in the original snippet)
...
}
我在Akka文档(akka documentation)中找到了一个很好的基于纯TCP的示例,但我不知道如何改进该示例以使用TLS套接字连接。一些较早版本的文档中有SSL/TLS的示例(with ssl/tls),但在较新版本中已经找不到了。
我找到了关于Akka中TLS对象的文档,但是没有找到任何与之相关的好示例。
提前非常感谢!
我用下面的代码让它工作起来了,在此分享一下。
基本上,我是从Akka社区提供的TcpTlsEcho.java入手的。
我参考了akka-streams的文档。另一个展示并说明Akka Streams用法的很好的例子可以在这篇博客文章(blog post)中找到。
连接的建立过程和流(flow)的结构如下:
/**
+---------------------------+ +---------------------------+
| Flow | | tlsConnectionFlow |
| | | |
| +------+ +------+ | | +------+ +------+ |
| | SRC | ~Out~> | | ~~> O2 -- I1 ~~> | | ~O1~> | | |
| | | | LOGG | | | | TLS | | CONN | |
| | SINK | <~In~ | | <~~ I2 -- O2 <~~ | | <~I2~ | | |
| +------+ +------+ | | +------+ +------+ |
+---------------------------+ +---------------------------+
**/
// The plain TCP connection to the server; TLS is layered on top of it below.
val connection = Tcp().outgoingConnection(address, port)
// Ignore the received data for now. Swap in a different Sink to act on incoming bytes.
val sink = Sink.ignore
// Create a source backed by an actor reference (buffer of 1000 elements, fail on overflow).
// The element type is given explicitly so the source is not inferred as Source[Nothing, ActorRef].
val source = Source.actorRef[ByteString](1000, OverflowStrategy.fail)
// Join the TLS BidiFlow (see tlsStage) with the connection.
val tlsConnectionFlow = tlsStage(TLSRole.client).join(connection)
// Run the source with the TLS connection flow, joined with a logging step that prints
// the bytes that are sent to and received from the connection.
val sourceActor = tlsConnectionFlow.join(logging).to(sink).runWith(source)
// Messages sent to sourceActor are emitted by the Source and therefore written to the connection.
sourceActor ! ByteString("<message>")
TLS连接流是一个BidiFlow。我的第一个简单示例忽略了所有证书校验,从而避免了管理信任库和密钥库。如何正确地完成这些配置,可以参考上面提到的.java示例。
/**
 * Builds the TLS stage as a `BidiFlow` that accepts and emits plain `ByteString`s on the
 * application side and is stacked on top of Akka's `TLS` stage.
 *
 * WARNING: the SSL context created here uses a trust manager that accepts every certificate
 * (including self-signed ones). This disables peer authentication and must not be used
 * in production; supply a context with a proper trust store instead.
 *
 * @param role   whether this side of the connection acts as TLS client or server
 * @param system actor system used to look up the `AkkaSSLConfig` settings
 * @return a `BidiFlow` wrapping outgoing bytes in `SendBytes` and unwrapping incoming
 *         `SessionBytes`, placed atop the TLS stage
 */
def tlsStage(role: TLSRole)(implicit system: ActorSystem) = {
  val sslConfig = AkkaSSLConfig.get(system)
  val config = sslConfig.config

  // SECURITY: this context trusts ANY certificate chain. It is only here to keep the example
  // free of trust/key store management.
  val sslContext: SSLContext = {
    object WideOpenX509TrustManager extends X509TrustManager {
      override def checkClientTrusted(chain: Array[X509Certificate], authType: String): Unit = ()
      override def checkServerTrusted(chain: Array[X509Certificate], authType: String): Unit = ()
      override def getAcceptedIssuers: Array[X509Certificate] = Array[X509Certificate]()
    }
    val context = SSLContext.getInstance("TLS")
    // No key managers; null selects the default SecureRandom (Java API convention).
    context.init(Array[KeyManager](), Array(WideOpenX509TrustManager), null)
    context
  }

  // Restrict the context's default protocols according to the ssl-config settings.
  val defaultParams = sslContext.getDefaultSSLParameters()
  val protocols = sslConfig.configureProtocols(defaultParams.getProtocols(), config)
  defaultParams.setProtocols(protocols)

  // Restrict the default cipher suites according to the ssl-config settings.
  val cipherSuites = sslConfig.configureCipherSuites(defaultParams.getCipherSuites(), config)
  defaultParams.setCipherSuites(cipherSuites)

  val baseSession = new TLSProtocol.NegotiateNewSession(None, None, None, None)
    .withCipherSuites(cipherSuites: _*)
    .withProtocols(protocols: _*)
    .withParameters(defaultParams)

  // NegotiateNewSession is immutable: withClientAuth returns a NEW instance, so the result
  // must be kept. (Previously the result was discarded and client auth was never applied.)
  val firstSession = getClientAuth(config.sslParametersConfig.clientAuth)
    .fold(baseSession)(baseSession.withClientAuth(_))

  val tls = TLS.apply(sslContext, firstSession, role)

  // Outbound: wrap raw bytes in SendBytes. Inbound: keep only SessionBytes payloads and
  // drop every other TLS-level message. The payload is already a ByteString, so no copy is needed.
  val tlsSupport = BidiFlow.fromFlows(
    Flow[ByteString].map(TLSProtocol.SendBytes),
    Flow[TLSProtocol.SslTlsInbound].collect {
      case TLSProtocol.SessionBytes(_, sb) => sb
    })

  tlsSupport.atop(tls)
}
/**
 * Translates the ssl-config client-auth setting into the matching Akka `TLSClientAuth` value.
 *
 * @param auth the client-auth mode read from the ssl-config settings
 * @return `Some` of the corresponding `TLSClientAuth` for want/need/none, otherwise `None`
 */
def getClientAuth(auth: ClientAuth) = {
  // Lookup table from the configured mode to its Akka counterpart, checked in order.
  val translations = List(
    ClientAuth.want -> TLSClientAuth.want,
    ClientAuth.need -> TLSClientAuth.need,
    ClientAuth.none -> TLSClientAuth.none)

  translations.collectFirst {
    case (configured, translated) if auth.equals(configured) => translated
  }
}
为了完整起见,下面是同样以BidiFlow实现的日志记录阶段。
/**
 * A pass-through `BidiFlow` that prints every chunk travelling in either direction.
 *
 * Chunks handled by the first (top) function are printed with the prefix "< ", chunks handled
 * by the second (bottom) function with the prefix "> ". The chunks themselves are
 * forwarded unchanged.
 */
def logging: BidiFlow[ByteString, ByteString, ByteString, ByteString, NotUsed] = {
  // Prints the chunk decoded as UTF-8 behind the given marker, then hands the chunk back.
  def printAndForward(marker: String)(bytes: ByteString): ByteString = {
    println(marker + bytes.utf8String)
    bytes
  }

  val topDirection: ByteString => ByteString = printAndForward("< ")
  val bottomDirection: ByteString => ByteString = printAndForward("> ")

  BidiFlow.fromFunctions(topDirection, bottomDirection)
}
我会进一步尝试改进和更新答案。希望有所帮助。
欢迎使用堆栈溢出!感谢您分享您的答案。这对我来说太具体了,但可能会帮助其他人。 –
我真的很喜欢Jeremias Werner的回答,因为它让我达到了我想要的目标。不过,我想提供下面的代码(受他的回答影响很大),作为一个可以直接复制粘贴的解决方案,用尽可能少的代码连接一个实际的TLS服务器。
import javax.net.ssl.SSLContext
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.TLSProtocol.NegotiateNewSession
import akka.stream.scaladsl.{BidiFlow, Flow, Sink, Source, TLS, Tcp}
import akka.stream.{ActorMaterializer, OverflowStrategy, TLSProtocol, TLSRole}
import akka.util.ByteString
/**
 * Minimal TLS client built on Akka Streams: opens a TCP connection, layers TLS on top of it
 * and exposes the encrypted channel through an actor-backed source.
 */
object TlsClient {

  /**
   * Flow needed for TLS, including the mapping between the TLS stage's message types and
   * plain `ByteString`s.
   *
   * Uses the JVM's default SSL context and default session negotiation. Embellish this as
   * needed by constructing your own `SSLContext` and `NegotiateNewSession` instances.
   *
   * @return a `BidiFlow` taking/emitting plain bytes on the application side, stacked atop TLS
   */
  def tlsClientLayer: BidiFlow[ByteString, ByteString, ByteString, ByteString, NotUsed] = {
    val tls = TLS(SSLContext.getDefault, NegotiateNewSession.withDefaults, TLSRole.client)

    // Outbound: wrap raw bytes in SendBytes. Inbound: keep only SessionBytes payloads and
    // drop every other TLS-level message. The payload is already a ByteString, so no copy is needed.
    val tlsSupport = BidiFlow.fromFlows(
      Flow[ByteString].map(TLSProtocol.SendBytes),
      Flow[TLSProtocol.SslTlsInbound].collect {
        case TLSProtocol.SessionBytes(_, sb) => sb
      })

    tlsSupport.atop(tls)
  }

  /**
   * Very simple logger: a pass-through `BidiFlow` that prints every chunk in both directions.
   *
   * Chunks handled by the first (top) function are prefixed with "< ", chunks handled by the
   * second (bottom) function with "> ".
   */
  def logging: BidiFlow[ByteString, ByteString, ByteString, ByteString, NotUsed] = {
    // Takes a ByteString, prints it (decoded as UTF-8) behind a fixed prefix and returns it unchanged.
    def logger(prefix: String): ByteString => ByteString = { chunk =>
      println(prefix + chunk.utf8String)
      chunk
    }

    val inputLogger = logger("> ")
    val outputLogger = logger("< ")

    // One logger function per direction of the BidiFlow.
    BidiFlow.fromFunctions(outputLogger, inputLogger)
  }

  /** Connects to www.google.com:443 over TLS and sends a minimal HTTP request as a demo. */
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("sip-client")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    // Element type given explicitly so the source is not inferred as Source[Nothing, ActorRef].
    val source = Source.actorRef[ByteString](1000, OverflowStrategy.fail)
    val connection = Tcp().outgoingConnection("www.google.com", 443)
    val tlsFlow = tlsClientLayer.join(connection)
    val srcActor = tlsFlow.join(logging).to(Sink.ignore).runWith(source)

    // HTTP is shown here, but send/receive your own protocol over this actor.
    // The request line must be "METHOD SP request-target SP HTTP-version"; without the
    // spaces the request is malformed. The original snippet expects the server to answer
    // with a 302 (Found) and a small explanatory HTML message.
    srcActor ! ByteString("GET / HTTP/1.1\r\nHost: www.google.com\r\n\r\n")
  }
}
TLS支持的使用示例可以在其单元测试中找到。 https://github.com/akka/akka/blob/master/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala。希望能帮助到你! –