Ausgabe
Ich habe eine Funktion, deren Zweck es ist, einen Datensatz in Arrays einer bestimmten Größe zu unterteilen.
Zum Beispiel – ich habe ein Dataset mit 123 Objekten vom Typ Foo, ich gebe der Funktion arraysSize 10 an, also habe ich als Ergebnis ein Dataset[Array[Foo]] mit 12 Arrays mit 10 Foo und 1 Array mit 3 Foo. Im Moment arbeitet die Funktion an gesammelten Daten – ich würde sie aufgrund der Leistung gerne datensatzbasiert ändern, aber ich weiß nicht wie.
Das ist meine aktuelle Lösung:
private def mapToFooArrays(data: Dataset[Foo],
arraysSize: Int): Dataset[Array[Foo]]= {
data.collect().grouped(arraysSize).toSeq.toDS()
}
Der Grund für diese Transformation liegt darin, dass die Daten im Ereignis gesendet werden. Anstatt 1 Million Ereignisse mit Informationen zu 1 Objekt zu senden, sende ich lieber beispielsweise 10.000 Ereignisse mit Informationen zu 100 Objekten
Lösung
IMO, das ist ein seltsamer Anwendungsfall. Ich kann mir keine effiziente Lösung dafür vorstellen, da es viel Mischen erfordern wird, egal wie wir es machen.
Aber das Folgende ist noch besser, da es das Sammeln auf dem Treiberknoten vermeidet und somit besser skalierbar ist.
Dinge zu beachten –
- was ist der wert
data.count()
von - Wie groß ist ein Single
Foo
? - was ist der wert
arraySize
von - Wie ist deine Executor-Konfiguration?
Basierend auf diesen Faktoren können Sie den desiredArraysPerPartition
Wert ermitteln.
val desiredArraysPerPartition = 50
private def mapToFooArrays(
data: Dataset[Foo],
arraysSize: Int
): Dataset[Array[Foo]] = {
val size = data.count()
val numArrays = (size.toDouble / arrarySize).ceil
val numPartitions = (numArrays.toDouble / desiredArraysPerPartition).ceil
data
.repartition(numPartitions)
.mapPartitions(_.grouped(arrarySize).map(_.toArray))
}
Nachdem ich den bearbeiteten Teil gelesen habe, denke ich, dass 100 size
in 10 thousand events with information about 100 objects
nicht wirklich wichtig ist. Wie es genannt wird about 100
. Es kann mehr als ein Ereignis mit weniger als geben 100 Foo's
.
Wenn wir bei dieser Größe nicht sehr streng 100
sind, besteht keine Notwendigkeit für eine Umbildung.
Wir können group
die Foo's
Gegenwart in jeder der Partitionen lokal. Da diese Gruppierung durchgeführt wird, kann locally
dies not globally
zu mehr als einer (möglicherweise einer für jede Partition) Arrays
mit weniger als 100 führen Foo's
.
private def mapToFooArrays(
data: Dataset[Foo],
arraysSize: Int
): Dataset[Array[Foo]] =
data
.mapPartitions(_.grouped(arrarySize).map(_.toArray))
Beantwortet von – sarveshseri
Antwort geprüft von – Marilyn (FixError Volunteer)