Ausgabe
Angenommen, ich habe einen Datenrahmen:
val df = Seq(
(1,"A"),
(1,"B"),
(1,"C"),
(1,"D"),
(1,"E"),
(1,"F"),
(1,"G"),
(1,"H"),
(2,"I"),
(2,"J"),
(2,"J"),
(2,"J"),
(3,"K"),
).toDF("id", "code")
Ich muss es basierend auf IDs und in Bezug auf einen Schwellenwert einordnen. Beispiel:
Schwelle = 3
id code rank
1 A 1
1 B 1
1 C 1 -- threshold has been reached
1 D 2
1 E 2
1 F 2 -- threshold has been reached
1 G 3
1 H 3
2 I 1
2 J 1
2 J 1 -- threshold has been reached
2 J 2
3 K 1
Wie kann ich es tun?
Ich kann einen einfachen Rang erstellen:
df.withColumn("rank", dense_rank().over(Window.orderBy("id")))
Aber wie teilt man Ranggruppen nach Schwellenwerten auf?
Lösung
Eine Lösung, bei der nicht alle Daten in eine Partition verschoben werden müssen:
//get the largest number of equal ids
val maxGroupSize = df.groupBy("id").count().agg(max("count")).first().getLong(0)
val threshold = 3
var f = maxGroupSize
while( f % threshold>0) f=f+1
df.withColumn("tmp1", 'id* f)
.withColumn("tmp2", dense_rank().over(Window.partitionBy("id").orderBy("code"))-1)
.withColumn("tmp3", 'tmp1+'tmp2)
.withColumn("rank", ('tmp3 / threshold).cast("int"))
Ergebnis:
+---+----+----+----+----+----+
| id|code|tmp1|tmp2|tmp3|rank|
+---+----+----+----+----+----+
| 1| A| 9| 0| 9| 3|
| 1| B| 9| 1| 10| 3|
| 1| C| 9| 2| 11| 3|
| 1| D| 9| 3| 12| 4|
| 1| E| 9| 4| 13| 4|
| 1| F| 9| 5| 14| 4|
| 1| G| 9| 6| 15| 5|
| 1| H| 9| 7| 16| 5|
| 2| I| 18| 0| 18| 6|
| 2| J| 18| 1| 19| 6|
| 3| K| 27| 0| 27| 9|
+---+----+----+----+----+----+
Der Nachteil dieses Ansatzes besteht darin, dass die Ränge nicht aufeinander folgen. Das könnte man mit einem anderen Fenster beheben
df.withColumn("rank2", dense_rank().over(Window.orderBy("rank")))
aber dies würde wieder alle Daten zu einem einzelnen Executor verschieben.
Beantwortet von – werner
Antwort geprüft von – Dawn Plyler (FixError Volunteer)