Ausgabe
Ich möchte meinem eine Spalte hinzufügen df_ordored
, die Fenster basierend auf der Statusspalte identifiziert. Ich möchte eine ID für alle Protokolle zwischen “Öffnen” und “Schließen” wie folgt generieren:
df_ordored =
+----+---------+------+
|date|word |status|
+----+---------+------+
|1 |un | |
|2 |itnane |open |
|3 |tres | |
|4 |four |close |
|4.1 |four |other |
|5 |fünf |open |
|6 |Liù |null |
|7 |Sette |any |
|8 |vosem | |
|9 |Shinchaku|close |
+----+---------+------+
df_expected =
+----+---------+------+--+
|date|word |status|id|
+----+---------+------+--+
|1 |un | | |
|2 |itnane |open |a |
|3 |tres | |a |
|4 |four |close |a |
|4.1 |four |other | |
|5 |fünf |open |b |
|6 |Liù |null |b |
|7 |Sette |any |b |
|8 |vosem | |b |
|9 |Shinchaku|close |b |
+----+---------+------+--+
Ist es möglich, dies in der Dataframe/Dataset-Abstraktion und ohne das Sammeln von Daten über den Treiber zu tun?
Lösung
Sie können eine Fensterfunktion verwenden, um inkrementelle Ganzzahlen als IDs von Öffnungs-/Schließsequenzen zu generieren:
# Window to compute cumulative sums
cumsum_window = (
Window
# .partitionBy('something') # if you can use a column to partition the data, is a good idea to use it to improve performance in the case of DataFrames with a lot of data/rows
.orderBy('date')
.rangeBetween(Window.unboundedPreceding, 0)
)
# find row with status open
df = df.withColumn('is_open', F.when(F.col('status') == 'open', 1).otherwise(0))
# mark both open and close, with 1 and -1 respectively
df = df.withColumn('is_open_close', F.when(F.col('status') == 'close', -1).otherwise(F.col('is_open')))
# A sequence is composed by rows between open and close (included).
# So, row belongs to a sequence if one of the following holds:
# 1. have close status or 1 in the cumulative sum
# 2. its cumulative sum of the column is_open_close is 1
# IS_OPEN_CLOSE: 0 0 0 ... 1 (open) 0 0 ... 0 -1 (close) 0 1 (open) ...
# CUMSUM: 0 0 0 ... 1 (open) 1 1 ... 1 0 (close) 0 1 (open) ...
df = df.withColumn('is_in_sequence', (F.col('status') == 'close') | (F.sum('is_open_close').over(cumsum_window).cast(T.BooleanType())))
# Compute an id for rows in a sequence as the ordinal of their correspondent
# open in the is_open column. Use the cumulative sum of is_open to compute it.
# IS_OPEN: 0 0 0 ... 1 (open) 0 0 ... 0 0 (close) 0 1 (open) ...
# CUMSUM: 0 0 0 ... 1 (open) 1 1 ... 1 1 (close) 1 2 (open) ...
# Assign the just created ID only to rows belonging to a sequence
df = df.withColumn('sequence_id', F.when(F.col('is_in_sequence'), F.sum('is_open').over(cumsum_window)))
# remove temporary columns
df = df.drop('is_open')
df = df.drop('is_open_close')
df = df.drop('is_in_sequence')
Das ist was du bekommst:
+----+---------+------+-----------+
|date| word|status|sequence_id|
+----+---------+------+-----------+
| 1| un| null| null|
| 2| itnane| open| 1|
| 3| tres| null| 1|
| 4| four| close| 1|
| 4.1| four| other| null|
| 5| fünf| open| 2|
| 6| Liù| null| 2|
| 7| Sette| any| 2|
| 8| vosem| null| 2|
| 9|Shinchaku| close| 2|
+----+---------+------+-----------+
Wenn Sie eine Partitionsspalte im Fenster verwenden, ist die ID einer Sequenz offensichtlich das Paar, das aus der sequence_id
und dieser Spalte besteht. Sie können es leicht in eine insgesamt eindeutige ID übersetzen, indem Sie sie kombinieren. Angenommen, Ihr DataFrame ist der folgende:
+-------------------+---------+------+
| date| word|status|
+-------------------+---------+------+
|2022-01-10 12:00:00| un| null|
|2022-01-10 13:00:00| itnane| open|
|2022-01-10 14:00:00| tres| null|
|2022-01-10 15:00:00| four| close|
|2022-01-10 16:00:00| four| other|
|2022-01-10 17:00:00| fünf| open|
|2022-01-10 18:00:00| Liù| null|
|2022-01-10 18:00:00| Sette| any|
|2022-01-10 20:00:00| vosem| null|
|2022-01-10 21:00:00|Shinchaku| close|
|2022-01-13 09:00:00| ve| null|
|2022-01-13 10:00:00| col| open|
|2022-01-13 11:00:00| bias| null|
|2022-01-13 12:00:00| no| close|
+-------------------+---------+------+
und Sie möchten nach Tag partitionieren, dann können Sie das folgende Fenster verwenden:
cumsum_window = (
Window
.partitionBy(F.date_trunc('day', 'date'))
.orderBy('date')
.rangeBetween(Window.unboundedPreceding, 0)
)
und fügen Sie als letzten Schritt den folgenden Code hinzu:
df = df.withColumn(
'unique_sequence_id',
F.when(
F.col('sequence_id').isNotNull(),
F.concat_ws('_', F.date_format('date', 'yyyy-MM-dd'), 'sequence_id')
)
)
Das ist das Ergebnis:
+-------------------+---------+------+-----------+------------------+
| date| word|status|sequence_id|unique_sequence_id|
+-------------------+---------+------+-----------+------------------+
|2022-01-10 12:00:00| un| null| null| null|
|2022-01-10 13:00:00| itnane| open| 1| 2022-01-10_1|
|2022-01-10 14:00:00| tres| null| 1| 2022-01-10_1|
|2022-01-10 15:00:00| four| close| 1| 2022-01-10_1|
|2022-01-10 16:00:00| four| other| null| null|
|2022-01-10 17:00:00| fünf| open| 2| 2022-01-10_2|
|2022-01-10 18:00:00| Liù| null| 2| 2022-01-10_2|
|2022-01-10 18:00:00| Sette| any| 2| 2022-01-10_2|
|2022-01-10 20:00:00| vosem| null| 2| 2022-01-10_2|
|2022-01-10 21:00:00|Shinchaku| close| 2| 2022-01-10_2|
|2022-01-13 09:00:00| ve| null| null| null|
|2022-01-13 10:00:00| col| open| 1| 2022-01-13_1|
|2022-01-13 11:00:00| bias| null| 1| 2022-01-13_1|
|2022-01-13 12:00:00| no| close| 1| 2022-01-13_1|
+-------------------+---------+------+-----------+------------------+
Beantwortet von – PieCot
Antwort geprüft von – Clifford M. (FixError Volunteer)