Ausgabe
Ich arbeite in einem Scala + Spark-Projekt, in dem wir Daten aus einer Datei in PostgreSQL laden. Es läuft gut lokal im Standalone-Modus mit kleinen Testdaten unter Verwendung von jdbc.write.
Da die Produktionsdaten jedoch riesig sind, möchte ich einen Cluster mit mehreren Workern und 1 logischen Prozessorkern pro Ausführender verwenden.
Wie partitioniere ich vor diesem Hintergrund die Daten zwischen allen verfügbaren Kernen im Cluster?
Vielen Dank!
PS: Mit Scala 2.13.9 und Spark 3.3.0
Lösung
Wenn Sie die dynamische Zuordnung verwenden und Ihr Cluster von gleichzeitigen Jobs verwendet wird, kann es schwierig sein, die Anzahl der Partitionen genau gleich der Anzahl der Kerne zu erhalten, die Ihr Job möglicherweise verwendet, da Sie diese Zahl nicht im Voraus kennen und nicht berechnen können es dynamisch.
Sie können versuchen, eine beliebige Zahl herauszufinden und den jdbc-Parameter numPartitions auf die Anzahl der Partitionen zu setzen, die Sie beim Schreiben verwenden möchten. Mit diesem Parameter partitioniert Spark dieses Dataset vor dem Schreiben neu, und Sie erhalten am Ende eine Anzahl von Aufgaben beim Schreiben, die gleich numPartitions ist. Denken Sie daran, dass jede parallel geschriebene Aufgabe = 1 jdbc-Verbindung ist, also seien Sie sich bewusst, dass Ihr PostreSQL überlaufen kann
numPartitions (keine) Die maximale Anzahl von Partitionen, die für Parallelität beim Lesen und Schreiben von Tabellen verwendet werden können . Dies bestimmt auch die maximale Anzahl gleichzeitiger JDBC-Verbindungen . Wenn die Anzahl der zu schreibenden Partitionen dieses Limit überschreitet, verringern wir sie auf dieses Limit, indem wir coalesce(numPartitions) vor dem Schreiben aufrufen.
lesen Schreiben
Beantwortet von – M_S
Antwort geprüft von – David Marino (FixError Volunteer)