Como preencher o Mapa[string,Dataframe] como uma coluna em uma Dataframe em scala

0

Pergunta

Eu tenho um Map[String, Dataframe]. Eu quero combinar todos os dados dentro desse Mapa em um único Dataframe. Pode uma dataframe tem uma coluna do Mapa de tipo de dados?

def sample(dfs : Map[String,Dataframe]): Dataframe =
{
.........
}

Exemplo:

DF1

id name age
1  aaa  23
2  bbb  34

DF2

game  time  score
ludo  10    20
rummy 30    40 

Eu passo os dois acima DFs como Mapa para a função. Em seguida, colocar os dados de cada dataframes em uma única coluna da saída dataframe como formato json.

fora do DF

+---------------------------------------------------------------------------------------+
| column1                                                                              |
+---------------------------------------------------------------------------------------+
| [{"id":"1","name":"aaa","age":"23"},{"id":21","name":"bbb","age":"24"}]               |
| [{"game":"ludo","time":"10","score":"20"},{"game":"rummy","time":"30","score":"40"}]  |
+---------------------------------------------------------------------------------------+
apache-spark dataframe dictionary scala
2021-11-23 13:42:20
2

Melhor resposta

1

Você está pedindo para gerar uma linha por dataframe. Cuidado, se um dos dataframes é grande o suficiente para que ele não pode ser contida em um único executor, esse código irá quebrar.

Primeiro vamos gerar dados e o mapa dfs do tipo Map[String, DataFrame].

val df1 = Seq((1, "aaa", 23), (2, "bbb", 34)).toDF("id", "name", "age")
val df2 = Seq(("ludo", 10, 20), ("rummy", 10, 40)).toDF("game", "time", "score")
dfs = Seq(df1, df2)

Em seguida, para cada dataframe do mapa, que geram duas colunas. big_map associa cada nome de coluna do dataframe o seu valor (cast, na seqüência de ter uma consistente tipo). df simplesmente contém o nome do dataframe. Temos, então, a união de todos os dataframes com reduce e grupo de name (essa é a parte onde todos dataframe acaba inteiramente em uma linha, e, portanto, um executor).

dfs
    .toSeq
    .map{ case (name, df) => df
        .select(map(
             df.columns.flatMap(c => Seq(lit(c), col(c).cast("string"))) : _*
        ) as "big_map")
        .withColumn("df", lit(name))}
    .reduce(_ union _)
    .groupBy("df")
    .agg(collect_list('big_map) as "column1")
    .show(false)
+---+-----------------------------------------------------------------------------------+
|df |column1                                                                            |
+---+-----------------------------------------------------------------------------------+
|df0|[{id -> 1, name -> aaa, age -> 23}, {id -> 2, name -> bbb, age -> 34}]             |
|df1|[{game -> ludo, time -> 10, score -> 20}, {game -> rummy, time -> 10, score -> 40}]|
+---+-----------------------------------------------------------------------------------+
2021-11-24 07:05:52
0

Aqui é uma solução específica para o seu caso de uso:

import org.apache.spark.sql._

def sample(dfs : Map[String, DataFrame])(implicit spark: SparkSession): DataFrame =
  dfs
    .values
    .foldLeft(spark.emptyDataFrame)((acc, df) => acc.union(df))

A vela de sessão é necessária para criar o vazio DataFrame acumulador de dobra.

Alternativamente, se você pode garantir a Map não é vazia.

def sample(dfs : Map[String, DataFrame]): DataFrame =
  dfs
    .values
    .reduce((acc, df) => acc.union(df))
2021-11-23 14:30:01

você pode por favor verificar a questão de eu ter adicionado um exemplo agora.Cada entrada dataframe que eu recebo tem comparação esquemas então eu quero que os dados de uma entrada inteira dataframe para ser preenchido como uma coluna ,por isso a minha saída dataframe tem dados de cada entrada dataframe em uma coluna
minnu

Em outros idiomas

Esta página está em outros idiomas

Русский
..................................................................................................................
Italiano
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Türk
..................................................................................................................
Česk
..................................................................................................................
ไทย
..................................................................................................................
中文
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................