Concatinating dois Fluxos em Akka stream

0

Pergunta

Eu estou tentando concat dois Fluxos e eu não sou capaz de explicar a saída do meu implementação.

val source = Source(1 to 10)
val sink = Sink.foreach(println)

val flow1 = Flow[Int].map(s => s + 1)
val flow2 = Flow[Int].map(s => s * 10)

val flowGraph = Flow.fromGraph(
    GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val concat = builder.add(Concat[Int](2))
      val broadcast = builder.add(Broadcast[Int](2))

      broadcast ~> flow1 ~> concat.in(0)
      broadcast ~> flow2 ~> concat.in(1)

      FlowShape(broadcast.in, concat.out)
    }
  )

source.via(flowGraph).runWith(sink)

Espero que o seguinte resultado deste código.

2
3
4
.
.
.
11
10
20
.
.
.
100

Em vez disso, vejo apenas "2" que está sendo impresso. Você pode por favor explicar o que está errado na minha implmentation e como devo mudar o programa para obter a saída desejada.

akka akka-stream scala
2021-10-21 17:29:00
2

Melhor resposta

3

A partir de Akka Stream API do google docs:

Concat:

Emite-se quando o fluxo atual tem um elemento disponível; se a corrente de entrada for concluída, ele tenta o seguinte

Broadcast:

Emite quando todas as saídas de pára backpressuring e há um elemento de entrada disponível

Os dois operadores não funcionam em conjunto, como há um conflito em como eles funcionam -- Concat tenta puxar todos os elementos de uma das Broadcast's saídas antes de mudar para o outro, enquanto Broadcast não emitem menos que há demanda para TODAS as suas saídas.

Para o que você precisa, você pode concatenar usando concat como sugerido pelos comentadores:

source.via(flow1).concat(source.via(flow2)).runWith(sink)

ou equivalentemente, use Source.combine como abaixo:

Source.combine(source.via(flow1), source.via(flow2))(Concat[Int](_)).runWith(sink)
2021-10-21 22:34:04
0

Usando GraphDSL, que é uma versão simplificada da implementação de Origem.combinar:

val sg = Source.fromGraph(
  GraphDSL.create(){ implicit builder =>
    import GraphDSL.Implicits._

    val concat = builder.add(Concat[Int](2))

    source ~> flow1 ~> concat
    source ~> flow2 ~> concat

    SourceShape(concat.out)
  }
)

sg.runWith(sink)
2021-10-26 19:23:56

Em outros idiomas

Esta página está em outros idiomas

Русский
..................................................................................................................
Italiano
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Türk
..................................................................................................................
Česk
..................................................................................................................
ไทย
..................................................................................................................
中文
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................