Akka Fluxo continuamente consumir websocket

0

Pergunta

Im meio novo para Scala e Akka Fluxo e estou tentando obter JSON Seqüência de mensagens a partir de um websocket e empurrá-los para um Kafka tópico.

Por agora estou só trabalhando em "obter mensagens de um ws" parte.

Mensagens provenientes de websocket parecido com este :

{  
   "bitcoin":"6389.06534240",
   "ethereum":"192.93111286",
   "monero":"108.90302506",
   "litecoin":"52.25484165"
}

Eu quero dividir esta mensagem JSON para várias mensagens :

   {"coin": "bitcoin", "price": "6389.06534240"}
   {"coin": "ethereum", "price": "192.93111286"}
   {"coin": "monero", "price": "108.90302506"}
   {"coin": "litecoin", "price": "52.25484165"}

E, em seguida, empurre cada uma dessas mensagens para um kafka tópico.

Aqui está o que eu consegui até agora :

val message_decomposition: Flow[Message, String, NotUsed] = Flow[Message].mapConcat(
    msg => msg.toString.replaceAll("[{})(]", "").split(",")
  ).map( msg => {
    val splitted = msg.split(":")
    s"{'coin': ${splitted(0)}, 'price': ${splitted(1)}}"
  })

val sink: Sink[String, Future[Done]] = Sink.foreach[String](println)

val flow: Flow[Message, Message, Promise[Option[Message]]] =
    Flow.fromSinkAndSourceMat(
      message_decomposition.to(sink),
      Source.maybe[Message])(Keep.right)

val (upgradeResponse, promise) = Http().singleWebSocketRequest(
      WebSocketRequest("wss://ws.coincap.io/prices?assets=ALL"),
      flow)

Ele está trabalhando, estou tendo o resultado esperado mensagem Json, mas eu queria saber se eu poderia escrever este produtor de uma forma mais "Akka-ish", estilo, como o uso de GraphDSL. Então, eu tenho algumas perguntas :

  • É possível continuamente consumir um WebSocket usando um GraphDSL ? Se sim, você pode me mostrar um exemplo, por favor ?
  • É uma boa idéia para consumir o WS usando um GraphDSL ?
  • Eu deveria decompor o recebeu Mensagem Json como estou fazendo antes de enviá-lo para kafka ? Ou é melhor enviá-lo como ele é para diminuir a latência ?
  • Depois de produzir a mensagem de Kafka, estou planejando para consumi-lo usando o Apache Tempestade, é uma boa idéia ? Ou devo ficar com Akka ?

Obrigado por ler-me, Atenciosamente, Arès

akka akka-stream apache-kafka scala
2021-11-20 14:01:02
1

Melhor resposta

1

Que código é bastante Akka-ish: scaladsl é tão Akka como o GraphDSL ou a implementação de um personalizado GraphStage. A única razão, IMO/E, para ir para o GraphDSL é se a forma do gráfico não estão expressas no scaladsl.

Eu poderia ir pessoalmente a rota de definição de uma CoinPrice classe para fazer o modelo explícito

case class CoinPrice(coin: String, price: BigDecimal)

E, em seguida, ter uma Flow[Message, CoinPrice, NotUsed] que analisa 1 mensagem recebida em zero ou mais CoinPrices. Algo (usando Jogar JSON aqui) como:

val toCoinPrices =
  Flow[Message]
    .mapConcat { msg =>
      Json.parse(msg.toString)
        .asOpt[JsObject]
        .toList
        .flatMap { json =>
          json.underlying.flatMap { kv =>
            import scala.util.Try

            kv match {
              case (coin, JsString(priceStr)) =>
                Try(BigDecimal(priceStr)).toOption
                  .map(p => CoinPrice(coin, p))                

              case (coin, JsNumber(price)) => Some(CoinPrice(coin, price))
              case _ => None
            }
          }
        }
    }

Você pode, dependendo do que o tamanho da JSONs na mensagem são, querem quebrar que em diferentes sequência de fases para permitir um assíncrono fronteira entre o JSON analisar e a extração em CoinPrices. Por exemplo,

Flow[Message]
  .mapConcat { msg =>
    Json.parse(msg.toString).asOpt[JsObject].toList
  }
  .async
  .mapConcat { json =>
    json.underlying.flatMap { kv =>
      import scala.util.Try

      kv match {
        case (coin, JsString(priceStr)) =>
          Try(BigDecimal(priceStr)).toOption
            .map(p => CoinPrice(coin, p))

        case (coin, JsNumber(price)) => Some(CoinPrice(coin, price))
        case _ => None
      }
    }
  }

No exemplo acima, as fases em ambos os lados da async limite será executado em separado atores e, assim, possivelmente, simultaneamente (se houver um número suficiente de núcleos de CPU disponível, etc.), o custo de sobrecarga extra para os atores, para coordenar e trocar mensagens. Extra coordenação/sobrecarga de comunicação (cf. Gunther Universal Escalabilidade da Lei), só vai valer a pena se a objetos JSON são suficientemente grandes e chegando de forma suficientemente rápida (consistentemente vindo antes que a anterior tenha acabado de processamento).

Se a sua intenção é consumir o websocket até que o programa pára, você pode encontrá-lo mais claro para só usar Source.never[Message].

2021-11-21 12:42:30

Obrigado por responder, é muito clara, eu só tenho uma pergunta tho. Como posso quebrar a minha resposta em diferentes sequência de fases ? Você pode apenas me mostre um pouco de exemplo, por favor ? Ou orientar-me para a boa parte da documentação ?
Arès

Em outros idiomas

Esta página está em outros idiomas

Русский
..................................................................................................................
Italiano
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Türk
..................................................................................................................
Česk
..................................................................................................................
ไทย
..................................................................................................................
中文
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................