[FIXED] Wie erstelle ich einen Rang basierend auf dem Schwellenwert in Spark?

Ausgabe

Angenommen, ich habe einen Datenrahmen:

val df = Seq(
    (1,"A"),
    (1,"B"),
    (1,"C"),
    (1,"D"),
    (1,"E"),
    (1,"F"),
    (1,"G"),
    (1,"H"),
    (2,"I"),
    (2,"J"),
    (2,"J"),
    (2,"J"),
    (3,"K"),
).toDF("id", "code")

Ich muss es basierend auf IDs und in Bezug auf einen Schwellenwert einordnen. Beispiel:

Schwelle = 3

id code rank
1  A    1
1  B    1
1  C    1 -- threshold has been reached
1  D    2  
1  E    2
1  F    2 -- threshold has been reached
1  G    3  
1  H    3

2  I    1
2  J    1
2  J    1 -- threshold has been reached
2  J    2

3  K    1

Wie kann ich es tun?

Ich kann einen einfachen Rang erstellen:

df.withColumn("rank", dense_rank().over(Window.orderBy("id")))

Aber wie teilt man Ranggruppen nach Schwellenwerten auf?

Lösung

Eine Lösung, bei der nicht alle Daten in eine Partition verschoben werden müssen:

//get the largest number of equal ids
val maxGroupSize = df.groupBy("id").count().agg(max("count")).first().getLong(0)

val threshold = 3

var f = maxGroupSize
while( f % threshold>0) f=f+1

df.withColumn("tmp1", 'id* f)
  .withColumn("tmp2", dense_rank().over(Window.partitionBy("id").orderBy("code"))-1)
  .withColumn("tmp3", 'tmp1+'tmp2)
  .withColumn("rank", ('tmp3 / threshold).cast("int"))

Ergebnis:

+---+----+----+----+----+----+
| id|code|tmp1|tmp2|tmp3|rank|
+---+----+----+----+----+----+
|  1|   A|   9|   0|   9|   3|
|  1|   B|   9|   1|  10|   3|
|  1|   C|   9|   2|  11|   3|
|  1|   D|   9|   3|  12|   4|
|  1|   E|   9|   4|  13|   4|
|  1|   F|   9|   5|  14|   4|
|  1|   G|   9|   6|  15|   5|
|  1|   H|   9|   7|  16|   5|
|  2|   I|  18|   0|  18|   6|
|  2|   J|  18|   1|  19|   6|
|  3|   K|  27|   0|  27|   9|
+---+----+----+----+----+----+

Der Nachteil dieses Ansatzes besteht darin, dass die Ränge nicht aufeinander folgen. Das könnte man mit einem anderen Fenster beheben

df.withColumn("rank2", dense_rank().over(Window.orderBy("rank")))

aber dies würde wieder alle Daten zu einem einzelnen Executor verschieben.


Beantwortet von –
werner


Antwort geprüft von –
Dawn Plyler (FixError Volunteer)

0 Shares:
Leave a Reply

Your email address will not be published. Required fields are marked *

You May Also Like