Akka fluxo de Entrada (`Em`) como de Saída ("Out")

0

Pergunta

Eu estou tentando escrever um trecho de código que faz o seguinte:-

  1. Lê-se um grande arquivo csv de origem remota, como s3.
  2. Processar o arquivo de registro por registro.
  3. Enviar notificação para o usuário
  4. Escrever a saída para um local remoto

Exemplo de registro de entrada de csv:

recordId,name,salary
1,Aiden,20000
2,Tom,18000
3,Jack,25000

O meu de entrada caso a classe que representa um recorde na entrada csv:

case class InputRecord(recordId: String, name: String, salary: Long)

Exemplo de registro na saída csv (que precisa ser escrito):

recordId,name,designation
1,Aiden,Programmer
2,Tom,Web Developer
3,Jack,Manager

Meu caso de saída de classe que representa um recorde na entrada csv:

case class OutputRecord(recordId: String, name: String, designation: String)

A leitura de um registro usando o akka fluxo de csv (usa Alpakka reativa s3 https://doc.akka.io/docs/alpakka/current/s3.html):

def readAsCSV: Future[Source[Map[String, ByteString], NotUsed]] = 
S3.download(s3Object.bucket, s3Object.path)
      .runWith(Sink.head)
// This is then converted to csv

Agora eu tenho uma função para processar os registros:

def process(input: InputRecord): OutputRecord =
//if salary > avg(salary) then Manager
//else Programmer

Função para gravar a OutputRecord como csv

def writeOutput:Sink[ByteString, Future[MultipartUploadResult]] = 
S3.multipartUpload(s3Object.bucket,
                       s3Object.path,
                       metaHeaders = MetaHeaders(Map())

Função para enviar o e-mail de notificação:

def notify : Flow[OutputRecord, PushResult, NotUsed]
//if notification is sent successfully PushResult has some additional info

Costura-lo todos juntos

readAsCSV.flatMap { recordSource =>
  recordSource.map { record
    val outputRecord = process(record)
    outputRecord
  }
  .via(notify) //Error: Line 15
  .to(writeOutput) //Error: Line 16
  .run()
}

Na Linha 15 e 16 eu estou recebendo um erro, eu sou capaz de adicionar a Linha 15 ou 16 da Linha, mas não tanto, pois tanto notify & writeOutput necessidades outputRecord. Depois de notificar é chamado de eu perder a minha outputRecord.

Existe uma maneira que eu possa adicionar notify e writeOutput para um mesmo gráfico?

Eu não estou olhando para a execução em paralelo, como eu quero para a primeira chamada notify e, em seguida, apenas writeOutput. Portanto, este não é útil: https://doc.akka.io/docs/akka/current/stream/stream-parallelism.html#parallel-processing

O caso de uso, parece muito simples para mim, mas de alguma forma eu não sou capaz de encontrar uma solução limpa.

akka akka-stream alpakka amazon-s3
2021-11-23 22:36:54
1

Melhor resposta

1

A saída de notify é um PushResultmas a entrada de writeOutput é ByteString. Uma vez que você a mudança que ele irá compilar. No caso de você precisar ByteString, obter o mesmo a partir de OutputRecord.

BTW, no exemplo de código que você forneceu, um erro semelhante existe em readCSV e process.

2021-11-24 03:36:16

Em outros idiomas

Esta página está em outros idiomas

Русский
..................................................................................................................
Italiano
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Türk
..................................................................................................................
Česk
..................................................................................................................
ไทย
..................................................................................................................
中文
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................