[FIXED] Unterabfrage vs. Dataframe-Filterfunktion in Spark

Ausgabe

Ich führe das folgende Spark-SQL mit der Unterabfrage aus.

val df = spark.sql("""select * from employeesTableTempview where dep_id in (select dep_id from departmentTableTempview)""")
df.count()

Ich führe dasselbe auch mit Hilfe der Datenrahmenfunktion wie unten aus. Nehmen wir an, wir lesen die Mitarbeitertabelle und die Abteilungstabelle als Datenrahmen und ihre Namen sollten empDF bzw. DepDF lauten.

val depidList = DepDF.map(x=>x(0).string).collect().toList()
val empdf2 = empDF.filter(col("dep_id").isin(depidList:_*))
empdf2.count

Welches bietet in diesen beiden oben genannten Szenarien eine bessere Leistung und warum? Bitte helfen Sie mir, diese Szenarien in Spark Scala zu verstehen.

Lösung

Ich kann Ihnen klassische Antwort geben: es kommt darauf an 😀

Schauen wir uns den ersten Fall an. Ich habe ein ähnliches Beispiel vorbereitet:

import org.apache.spark.sql.functions._

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

val data = Seq(("test", "3"),("test", "3"), ("test2", "5"), ("test3", "7"), ("test55", "86"))
val data2 = Seq(("test", "3"),("test", "3"), ("test2", "5"), ("test3", "6"), ("test33", "76"))

val df1 = data.toDF("name", "dep_id")
val df2 = data2.toDF("name", "dep_id")

df1.createOrReplaceTempView("employeesTableTempview")
df2.createOrReplaceTempView("departmentTableTempview")

val result = spark.sql("select * from employeesTableTempview where dep_id in (select dep_id from departmentTableTempview)")
result.count

Ich setze autoBroadcastJoinThreshold auf -1, weil ich annehme, dass Ihre Datensätze für diesen Parameter größer als die standardmäßigen 10 MB sein werden

Diese SQL-Abfrage generiert diesen Plan:

Geben Sie hier die Bildbeschreibung ein

Wie Sie sehen können, führt Spark eine SMJ durch, was meistens bei Datensätzen mit mehr als 10 MB der Fall sein wird. Dies erfordert, dass Daten gemischt und dann sortiert werden, damit sie leise und schwer arbeiten

Lassen Sie uns nun Option2 überprüfen (die ersten Codezeilen sind die gleichen wie zuvor):

val depidList = df1.map(x=>x.getString(1)).collect().toList
val empdf2 = df2.filter(col("dep_id").isin(depidList:_*))
empdf2.count

Bei diesem Optionsplan ist das anders. Sie haben offensichtlich keinen Join, aber es gibt zwei separate SQLs. Zuerst wird der DepDF-Datensatz gelesen und dann eine Spalte als Liste gesammelt. In der zweiten SQL wird diese Liste verwendet, um die Daten im empDF-Datensatz zu filtern.

Wenn DepDF relativ klein ist, sollte es in Ordnung sein, aber wenn Sie eine allgemeinere Lösung benötigen, können Sie sich an die Unterabfrage halten, die zum Verbinden aufgelöst wird. Sie können Join auch direkt in Ihren Datenrahmen mit Spark df api verwenden


Beantwortet von –
M_S


Antwort geprüft von –
Dawn Plyler (FixError Volunteer)

0 Shares:
Leave a Reply

Your email address will not be published. Required fields are marked *

You May Also Like