Ausgabe
Ich habe einen Datenrahmen wie folgt:
Rowkey timestamp col_1 col_2 col_3.... col_n
1234 165789 20 null 30 ... null
1234 155789 20 20 null ... 40
1234 145789 20 10 30 ... 50
und außer es in den folgenden Datenrahmen umzuwandeln:
Rowkey timestamp col_1 col_2 col_3.... col_n
1234 165789 20 20 30 ... 40
Ich möchte den neuesten Zeitstempel. Auch wenn eine Zelle ist null
und die folgende Zelle mit demselben Rowkey
einen Wert hat, dann sollte dieser Wert verwendet werden.
Ich verwende Spark mit Scala.
Lösung
Hier ist meine Meinung:
Verwenden Sie eine Fensterfunktion, um den ersten Nicht-Null-Wert jeder Rowkey
Partition auszuwählen, sortiert nach timestamp
– und löschen Sie dann Duplikate, um nur eine Zeile pro zu haben Rowkey
.
import spark.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val simpleData: Seq[(String, Integer,Integer,Integer,Integer,Integer)] = Seq(
("1234",165789,20,null,30, null),
("1234",155789,10,20,null, 40),
("1234",145789,2,10,30, 50),
("123e4",145789,2,10,30, 50)
)
val someDF = simpleData.toDF("Rowkey","timestamp","col_1","col_2","col_3","col_4")
someDF.show()
val listCols= List("Rowkey","timestamp","col_1","col_2","col_3","col_4")
val windowSpec = Window.partitionBy("Rowkey").orderBy($"timestamp".desc).rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
someDF.select(
listCols.map(m=> first(m, true)
.over(windowSpec).alias(m)
) :_*
)
.dropDuplicates()
.show()
Ergebnis:
+------+---------+-----+-----+-----+-----+
|Rowkey|timestamp|col_1|col_2|col_3|col_4|
+------+---------+-----+-----+-----+-----+
| 1234| 165789| 20| null| 30| null|
| 1234| 155789| 10| 20| null| 40|
| 1234| 145789| 2| 10| 30| 50|
| 123e4| 145789| 2| 10| 30| 50|
+------+---------+-----+-----+-----+-----+
+------+---------+-----+-----+-----+-----+
|Rowkey|timestamp|col_1|col_2|col_3|col_4|
+------+---------+-----+-----+-----+-----+
| 1234| 165789| 20| 20| 30| 40|
| 123e4| 145789| 2| 10| 30| 50|
+------+---------+-----+-----+-----+-----+
Beantwortet von – Stéphane Guichard
Antwort geprüft von – Marie Seifert (FixError Admin)