[FIXED] Scala – Unterteilen Sie den Datensatz in Datensätze von Arrays mit fester Größe

Ausgabe

Ich habe eine Funktion, deren Zweck es ist, einen Datensatz in Arrays einer bestimmten Größe zu unterteilen.

Zum Beispiel – ich habe ein Dataset mit 123 Objekten vom Typ Foo, ich gebe der Funktion arraysSize 10 an, also habe ich als Ergebnis ein Dataset[Array[Foo]] mit 12 Arrays mit 10 Foo und 1 Array mit 3 Foo.
Im Moment arbeitet die Funktion an gesammelten Daten – ich würde sie aufgrund der Leistung gerne datensatzbasiert ändern, aber ich weiß nicht wie.


Das ist meine aktuelle Lösung:

  private def mapToFooArrays(data: Dataset[Foo],
                                         arraysSize: Int): Dataset[Array[Foo]]= {
data.collect().grouped(arraysSize).toSeq.toDS()
  }

Der Grund für diese Transformation liegt darin, dass die Daten im Ereignis gesendet werden. Anstatt 1 Million Ereignisse mit Informationen zu 1 Objekt zu senden, sende ich lieber beispielsweise 10.000 Ereignisse mit Informationen zu 100 Objekten

Lösung

IMO, das ist ein seltsamer Anwendungsfall. Ich kann mir keine effiziente Lösung dafür vorstellen, da es viel Mischen erfordern wird, egal wie wir es machen.

Aber das Folgende ist noch besser, da es das Sammeln auf dem Treiberknoten vermeidet und somit besser skalierbar ist.

Dinge zu beachten –

  • was ist der wert data.count()von
  • Wie groß ist ein Single Foo?
  • was ist der wert arraySizevon
  • Wie ist deine Executor-Konfiguration?

Basierend auf diesen Faktoren können Sie den desiredArraysPerPartitionWert ermitteln.


val desiredArraysPerPartition = 50

private def mapToFooArrays(
    data: Dataset[Foo],
    arraysSize: Int
): Dataset[Array[Foo]] = {
  val size = data.count()
  val numArrays = (size.toDouble / arrarySize).ceil
  val numPartitions = (numArrays.toDouble / desiredArraysPerPartition).ceil
  
  data
    .repartition(numPartitions)
    .mapPartitions(_.grouped(arrarySize).map(_.toArray))
}

Nachdem ich den bearbeiteten Teil gelesen habe, denke ich, dass 100 sizein 10 thousand events with information about 100 objectsnicht wirklich wichtig ist. Wie es genannt wird about 100. Es kann mehr als ein Ereignis mit weniger als geben 100 Foo's.

Wenn wir bei dieser Größe nicht sehr streng 100sind, besteht keine Notwendigkeit für eine Umbildung.

Wir können groupdie Foo'sGegenwart in jeder der Partitionen lokal. Da diese Gruppierung durchgeführt wird, kann locallydies not globallyzu mehr als einer (möglicherweise einer für jede Partition) Arraysmit weniger als 100 führen Foo's.

private def mapToFooArrays(
    data: Dataset[Foo],
    arraysSize: Int
): Dataset[Array[Foo]] = 
  data
    .mapPartitions(_.grouped(arrarySize).map(_.toArray))


Beantwortet von –
sarveshseri


Antwort geprüft von –
Marilyn (FixError Volunteer)

0 Shares:
Leave a Reply

Your email address will not be published. Required fields are marked *

You May Also Like