[FIXED] Spark – Generieren Sie Fenster mit bestimmten Schlüsseln

Ausgabe

Ich möchte meinem eine Spalte hinzufügen df_ordored, die Fenster basierend auf der Statusspalte identifiziert. Ich möchte eine ID für alle Protokolle zwischen “Öffnen” und “Schließen” wie folgt generieren:

df_ordored = 
+----+---------+------+
|date|word     |status|
+----+---------+------+
|1   |un       |      |
|2   |itnane   |open  |
|3   |tres     |      |
|4   |four     |close |
|4.1 |four     |other |
|5   |fünf     |open  |
|6   |Liù      |null  |
|7   |Sette    |any   |
|8   |vosem    |      |
|9   |Shinchaku|close |
+----+---------+------+
df_expected =
+----+---------+------+--+
|date|word     |status|id|
+----+---------+------+--+
|1   |un       |      |  |
|2   |itnane   |open  |a |
|3   |tres     |      |a |
|4   |four     |close |a |
|4.1 |four     |other |  |
|5   |fünf     |open  |b |
|6   |Liù      |null  |b |
|7   |Sette    |any   |b |
|8   |vosem    |      |b |
|9   |Shinchaku|close |b |
+----+---------+------+--+

Ist es möglich, dies in der Dataframe/Dataset-Abstraktion und ohne das Sammeln von Daten über den Treiber zu tun?

Lösung

Sie können eine Fensterfunktion verwenden, um inkrementelle Ganzzahlen als IDs von Öffnungs-/Schließsequenzen zu generieren:

# Window to compute cumulative sums
cumsum_window = (
    Window
    # .partitionBy('something')  # if you can use a column to partition the data, is a good idea to use it to improve performance in the case of DataFrames with a lot of data/rows
    .orderBy('date')
    .rangeBetween(Window.unboundedPreceding, 0)
)

# find row with status open
df = df.withColumn('is_open', F.when(F.col('status') == 'open', 1).otherwise(0))

# mark both open and close, with 1 and -1 respectively
df = df.withColumn('is_open_close', F.when(F.col('status') == 'close', -1).otherwise(F.col('is_open')))

# A sequence is composed by rows between open and close (included).
# So, row belongs to a sequence if one of the following holds:
#    1. have close status or 1 in the cumulative sum 
#    2. its cumulative sum of the column is_open_close is 1
#    IS_OPEN_CLOSE: 0 0 0 ... 1 (open) 0 0 ... 0 -1 (close) 0 1 (open) ...
#    CUMSUM:        0 0 0 ... 1 (open) 1 1 ... 1  0 (close) 0 1 (open) ...
df = df.withColumn('is_in_sequence', (F.col('status') == 'close') | (F.sum('is_open_close').over(cumsum_window).cast(T.BooleanType())))

# Compute an id for rows in a sequence as the ordinal of their correspondent
# open in the is_open column. Use the cumulative sum of is_open to compute it.
#    IS_OPEN: 0 0 0 ... 1 (open) 0 0 ... 0  0 (close) 0 1 (open) ...
#    CUMSUM:  0 0 0 ... 1 (open) 1 1 ... 1  1 (close) 1 2 (open) ...
# Assign the just created ID only to rows belonging to a sequence
df = df.withColumn('sequence_id', F.when(F.col('is_in_sequence'), F.sum('is_open').over(cumsum_window)))

# remove temporary columns
df = df.drop('is_open')
df = df.drop('is_open_close')
df = df.drop('is_in_sequence')

Das ist was du bekommst:

+----+---------+------+-----------+
|date|     word|status|sequence_id|
+----+---------+------+-----------+
|   1|       un|  null|       null|
|   2|   itnane|  open|          1|
|   3|     tres|  null|          1|
|   4|     four| close|          1|
| 4.1|     four| other|       null|
|   5|     fünf|  open|          2|
|   6|      Liù|  null|          2|
|   7|    Sette|   any|          2|
|   8|    vosem|  null|          2|
|   9|Shinchaku| close|          2|
+----+---------+------+-----------+

Wenn Sie eine Partitionsspalte im Fenster verwenden, ist die ID einer Sequenz offensichtlich das Paar, das aus der sequence_idund dieser Spalte besteht. Sie können es leicht in eine insgesamt eindeutige ID übersetzen, indem Sie sie kombinieren. Angenommen, Ihr DataFrame ist der folgende:

+-------------------+---------+------+
|               date|     word|status|
+-------------------+---------+------+
|2022-01-10 12:00:00|       un|  null|
|2022-01-10 13:00:00|   itnane|  open|
|2022-01-10 14:00:00|     tres|  null|
|2022-01-10 15:00:00|     four| close|
|2022-01-10 16:00:00|     four| other|
|2022-01-10 17:00:00|     fünf|  open|
|2022-01-10 18:00:00|      Liù|  null|
|2022-01-10 18:00:00|    Sette|   any|
|2022-01-10 20:00:00|    vosem|  null|
|2022-01-10 21:00:00|Shinchaku| close|
|2022-01-13 09:00:00|       ve|  null|
|2022-01-13 10:00:00|      col|  open|
|2022-01-13 11:00:00|     bias|  null|
|2022-01-13 12:00:00|       no| close|
+-------------------+---------+------+

und Sie möchten nach Tag partitionieren, dann können Sie das folgende Fenster verwenden:

cumsum_window = (
    Window
    .partitionBy(F.date_trunc('day', 'date'))
    .orderBy('date')
    .rangeBetween(Window.unboundedPreceding, 0)
)

und fügen Sie als letzten Schritt den folgenden Code hinzu:

df = df.withColumn(
    'unique_sequence_id',
    F.when(
        F.col('sequence_id').isNotNull(),
        F.concat_ws('_', F.date_format('date', 'yyyy-MM-dd'), 'sequence_id')
    )
)

Das ist das Ergebnis:

+-------------------+---------+------+-----------+------------------+
|               date|     word|status|sequence_id|unique_sequence_id|
+-------------------+---------+------+-----------+------------------+
|2022-01-10 12:00:00|       un|  null|       null|              null|
|2022-01-10 13:00:00|   itnane|  open|          1|      2022-01-10_1|
|2022-01-10 14:00:00|     tres|  null|          1|      2022-01-10_1|
|2022-01-10 15:00:00|     four| close|          1|      2022-01-10_1|
|2022-01-10 16:00:00|     four| other|       null|              null|
|2022-01-10 17:00:00|     fünf|  open|          2|      2022-01-10_2|
|2022-01-10 18:00:00|      Liù|  null|          2|      2022-01-10_2|
|2022-01-10 18:00:00|    Sette|   any|          2|      2022-01-10_2|
|2022-01-10 20:00:00|    vosem|  null|          2|      2022-01-10_2|
|2022-01-10 21:00:00|Shinchaku| close|          2|      2022-01-10_2|
|2022-01-13 09:00:00|       ve|  null|       null|              null|
|2022-01-13 10:00:00|      col|  open|          1|      2022-01-13_1|
|2022-01-13 11:00:00|     bias|  null|          1|      2022-01-13_1|
|2022-01-13 12:00:00|       no| close|          1|      2022-01-13_1|
+-------------------+---------+------+-----------+------------------+


Beantwortet von –
PieCot


Antwort geprüft von –
Clifford M. (FixError Volunteer)

0 Shares:
Leave a Reply

Your email address will not be published. Required fields are marked *

You May Also Like