Eu estou tentando escrever um trecho de código que faz o seguinte:-
- Lê-se um grande arquivo csv de origem remota, como s3.
- Processar o arquivo de registro por registro.
- Enviar notificação para o usuário
- Escrever a saída para um local remoto
Exemplo de registro de entrada de csv:
recordId,name,salary
1,Aiden,20000
2,Tom,18000
3,Jack,25000
O meu de entrada caso a classe que representa um recorde na entrada csv:
case class InputRecord(recordId: String, name: String, salary: Long)
Exemplo de registro na saída csv (que precisa ser escrito):
recordId,name,designation
1,Aiden,Programmer
2,Tom,Web Developer
3,Jack,Manager
Meu caso de saída de classe que representa um recorde na entrada csv:
case class OutputRecord(recordId: String, name: String, designation: String)
A leitura de um registro usando o akka fluxo de csv (usa Alpakka reativa s3 https://doc.akka.io/docs/alpakka/current/s3.html):
def readAsCSV: Future[Source[Map[String, ByteString], NotUsed]] =
S3.download(s3Object.bucket, s3Object.path)
.runWith(Sink.head)
// This is then converted to csv
Agora eu tenho uma função para processar os registros:
def process(input: InputRecord): OutputRecord =
//if salary > avg(salary) then Manager
//else Programmer
Função para gravar a OutputRecord como csv
def writeOutput:Sink[ByteString, Future[MultipartUploadResult]] =
S3.multipartUpload(s3Object.bucket,
s3Object.path,
metaHeaders = MetaHeaders(Map())
Função para enviar o e-mail de notificação:
def notify : Flow[OutputRecord, PushResult, NotUsed]
//if notification is sent successfully PushResult has some additional info
Costura-lo todos juntos
readAsCSV.flatMap { recordSource =>
recordSource.map { record
val outputRecord = process(record)
outputRecord
}
.via(notify) //Error: Line 15
.to(writeOutput) //Error: Line 16
.run()
}
Na Linha 15 e 16 eu estou recebendo um erro, eu sou capaz de adicionar a Linha 15 ou 16 da Linha, mas não tanto, pois tanto notify
& writeOutput
necessidades outputRecord
. Depois de notificar é chamado de eu perder a minha outputRecord
.
Existe uma maneira que eu possa adicionar notify
e writeOutput
para um mesmo gráfico?
Eu não estou olhando para a execução em paralelo, como eu quero para a primeira chamada notify
e, em seguida, apenas writeOutput
. Portanto, este não é útil: https://doc.akka.io/docs/akka/current/stream/stream-parallelism.html#parallel-processing
O caso de uso, parece muito simples para mim, mas de alguma forma eu não sou capaz de encontrar uma solução limpa.