Im meio novo para Scala e Akka Fluxo e estou tentando obter JSON Seqüência de mensagens a partir de um websocket e empurrá-los para um Kafka tópico.
Por agora estou só trabalhando em "obter mensagens de um ws" parte.
Mensagens provenientes de websocket parecido com este :
{
"bitcoin":"6389.06534240",
"ethereum":"192.93111286",
"monero":"108.90302506",
"litecoin":"52.25484165"
}
Eu quero dividir esta mensagem JSON para várias mensagens :
{"coin": "bitcoin", "price": "6389.06534240"}
{"coin": "ethereum", "price": "192.93111286"}
{"coin": "monero", "price": "108.90302506"}
{"coin": "litecoin", "price": "52.25484165"}
E, em seguida, empurre cada uma dessas mensagens para um kafka tópico.
Aqui está o que eu consegui até agora :
val message_decomposition: Flow[Message, String, NotUsed] = Flow[Message].mapConcat(
msg => msg.toString.replaceAll("[{})(]", "").split(",")
).map( msg => {
val splitted = msg.split(":")
s"{'coin': ${splitted(0)}, 'price': ${splitted(1)}}"
})
val sink: Sink[String, Future[Done]] = Sink.foreach[String](println)
val flow: Flow[Message, Message, Promise[Option[Message]]] =
Flow.fromSinkAndSourceMat(
message_decomposition.to(sink),
Source.maybe[Message])(Keep.right)
val (upgradeResponse, promise) = Http().singleWebSocketRequest(
WebSocketRequest("wss://ws.coincap.io/prices?assets=ALL"),
flow)
Ele está trabalhando, estou tendo o resultado esperado mensagem Json, mas eu queria saber se eu poderia escrever este produtor de uma forma mais "Akka-ish", estilo, como o uso de GraphDSL. Então, eu tenho algumas perguntas :
- É possível continuamente consumir um WebSocket usando um GraphDSL ? Se sim, você pode me mostrar um exemplo, por favor ?
- É uma boa idéia para consumir o WS usando um GraphDSL ?
- Eu deveria decompor o recebeu Mensagem Json como estou fazendo antes de enviá-lo para kafka ? Ou é melhor enviá-lo como ele é para diminuir a latência ?
- Depois de produzir a mensagem de Kafka, estou planejando para consumi-lo usando o Apache Tempestade, é uma boa idéia ? Ou devo ficar com Akka ?
Obrigado por ler-me, Atenciosamente, Arès