Creating a custom counter in Spark based on dataframe conditions

Current Dataset

+---+-----+-----+-----+----+
| ID|Event|Index|start| end|
+---+-----+-----+-----+----+
| 1| run| 0|start|null|
| 1| run| 1| null|null|
| 1| run| 2| null|null|
| 1| swim| 3| null| end|
| 1| run| 4|start|null|
| 1| swim| 5| null|null|
| 1| swim| 6| null| end|
| 1| run| 7|start|null|
| 1| run| 8| null|null|
| 1| run| 9| null|null|
| 1| swim| 10| null| end|
| 1| run| 11|start|null|
| 1| run| 12| null|null|
| 1| run| 13| null| end|
| 2| run| 14|start|null|
| 2| run| 15| null|null|
| 2| run| 16| null|null|
| 2| swim| 17| null| end|
| 2| run| 18|start|null|
| 2| swim| 19| null|null|
| 2| swim| 20| null|null|
| 2| swim| 21| null|null|
| 2| swim| 22| null| end|
| 2| run| 23|start|null|
| 2| run| 24| null|null|
| 2| run| 25| null| end|
| 3| run| 26|start|null|
| 3| run| 27| null|null|
| 3| swim| 28| null|null|
+---+-----+-----+-----+----+

Dataset I'm After

+---+-----+-----+-----+----+-------+
| ID|Event|Index|start| end|EventID|
+---+-----+-----+-----+----+-------+
| 1| run| 0|start|null| 1|
| 1| run| 1| null|null| 1|
| 1| run| 2| null|null| 1|
| 1| swim| 3| null| end| 1|
| 1| run| 4|start|null| 2|
| 1| swim| 5| null|null| 2|
| 1| swim| 6| null| end| 2|
| 1| run| 7|start|null| 3|
| 1| run| 8| null|null| 3|
| 1| run| 9| null|null| 3|
| 1| swim| 10| null| end| 3|
| 1| run| 11|start|null| 4|
| 1| run| 12| null|null| 4|
| 1| run| 13| null| end| 4|
| 2| run| 14|start|null| 1|
| 2| run| 15| null|null| 1|
| 2| run| 16| null|null| 1|
| 2| swim| 17| null| end| 1|
| 2| run| 18|start|null| 2|
| 2| swim| 19| null|null| 2|
| 2| swim| 20| null|null| 2|
| 2| swim| 21| null|null| 2|
| 2| swim| 22| null| end| 2|
| 2| run| 23|start|null| 3|
| 2| run| 24| null|null| 3|
| 2| run| 25| null| end| 3|
| 3| run| 26|start|null| 1|
| 3| run| 27| null|null| 1|
| 3| swim| 28| null|null| 1|
+---+-----+-----+-----+----+-------+

I am trying to create the above EventID Column. Is there a way to create a counter inside of a udf that updates based on column conditions? Note, I'm not sure if a UDF is the best approach here.

Here is my current thinking-logic:

  • When a "start" value is seen, start counting.
  • When an "end" value is seen, end counting
  • Every time a new ID is seen, reset the counter to 1

Thank you all for any assistance.

Here is the raw code to produce the current dataframe:

# Current Dataset
data = [ (1, "run", 0, 'start', None), (1, "run", 1, None, None), (1, "run", 2, None, None), (1, "swim", 3, None, 'end'), (1, "run", 4, 'start',None), (1, "swim", 5, None, None), (1, "swim", 6, None, 'end'), (1, "run",7, 'start', None), (1, "run",8, None, None), (1, "run",9, None, None), (1, "swim",10, None, 'end'), (1, "run",11, 'start', None), (1, "run",12, None, None), (1, "run",13, None, 'end'), (2, "run",14, 'start', None), (2, "run",15, None, None), (2, "run",16, None, None), (2, "swim",17, None, 'end'), (2, "run",18, 'start', None), (2, "swim",19, None, None), (2, "swim",20, None, None), (2, "swim",21, None, None), (2, "swim",22, None, 'end'), (2, "run",23, 'start', None), (2, "run",24, None, None), (2, "run",25, None, 'end'), (3, "run",26, 'start', None), (3, "run",27, None, None), (3, "swim",28, None, None) ]
schema = StructType([ StructField('ID', IntegerType(),True), \ StructField('Event', StringType(),True), \ StructField('Index', IntegerType(),True), \ StructField('start', StringType(),True), \ StructField('end', StringType(),True)
])
df = spark.createDataFrame(data=data, schema=schema)
df.show(30)
1

2 Answers

You can use a window function:

import pyspark.sql.functions as F
from pyspark.sql.window import Window
w = Window.partitionBy('ID').rowsBetween(Window.unboundedPreceding,0).orderBy('index')
df.withColumn('EventId', F.sum(F.when(F.col('start') == 'start', 1).otherwise(0))\ .over(w)).orderBy('ID', 'Index').show(100)

results in

+---+-----+-----+-----+----+-------+
| ID|Event|Index|start| end|EventId|
+---+-----+-----+-----+----+-------+
| 1| run| 0|start|null| 1|
| 1| run| 1| null|null| 1|
| 1| run| 2| null|null| 1|
| 1| swim| 3| null| end| 1|
| 1| run| 4|start|null| 2|
| 1| swim| 5| null|null| 2|
| 1| swim| 6| null| end| 2|
| 1| run| 7|start|null| 3|
| 1| run| 8| null|null| 3|
| 1| run| 9| null|null| 3|
| 1| swim| 10| null| end| 3|
| 1| run| 11|start|null| 4|
| 1| run| 12| null|null| 4|
| 1| run| 13| null| end| 4|
| 2| run| 14|start|null| 1|
| 2| run| 15| null|null| 1|
| 2| run| 16| null|null| 1|
| 2| swim| 17| null| end| 1|
| 2| run| 18|start|null| 2|
| 2| swim| 19| null|null| 2|
| 2| swim| 20| null|null| 2|
| 2| swim| 21| null|null| 2|
| 2| swim| 22| null| end| 2|
| 2| run| 23|start|null| 3|
| 2| run| 24| null|null| 3|
| 2| run| 25| null| end| 3|
| 3| run| 26|start|null| 1|
| 3| run| 27| null|null| 1|
| 3| swim| 28| null|null| 1|
+---+-----+-----+-----+----+-------+

You can calculate dense_rank based on the most recent start time:

from pyspark.sql import functions as F, Window
df2 = df.withColumn( 'laststart', F.last(F.when(F.col('start') == 'start', F.col('Index')), True).over(Window.partitionBy('ID').orderBy('Index'))
).withColumn( 'EventID', F.dense_rank().over(Window.partitionBy('ID').orderBy('laststart'))
)
df2.show(999)
+---+-----+-----+-----+----+---------+-------+
| ID|Event|Index|start| end|laststart|EventID|
+---+-----+-----+-----+----+---------+-------+
| 1| run| 0|start|null| 0| 1|
| 1| run| 1| null|null| 0| 1|
| 1| run| 2| null|null| 0| 1|
| 1| swim| 3| null| end| 0| 1|
| 1| run| 4|start|null| 4| 2|
| 1| swim| 5| null|null| 4| 2|
| 1| swim| 6| null| end| 4| 2|
| 1| run| 7|start|null| 7| 3|
| 1| run| 8| null|null| 7| 3|
| 1| run| 9| null|null| 7| 3|
| 1| swim| 10| null| end| 7| 3|
| 1| run| 11|start|null| 11| 4|
| 1| run| 12| null|null| 11| 4|
| 1| run| 13| null| end| 11| 4|
| 2| run| 14|start|null| 14| 1|
| 2| run| 15| null|null| 14| 1|
| 2| run| 16| null|null| 14| 1|
| 2| swim| 17| null| end| 14| 1|
| 2| run| 18|start|null| 18| 2|
| 2| swim| 19| null|null| 18| 2|
| 2| swim| 20| null|null| 18| 2|
| 2| swim| 21| null|null| 18| 2|
| 2| swim| 22| null| end| 18| 2|
| 2| run| 23|start|null| 23| 3|
| 2| run| 24| null|null| 23| 3|
| 2| run| 25| null| end| 23| 3|
| 3| run| 26|start|null| 26| 1|
| 3| run| 27| null|null| 26| 1|
| 3| swim| 28| null|null| 26| 1|
+---+-----+-----+-----+----+---------+-------+

Your Answer

Sign up or log in

Sign up using Google Sign up using Facebook Sign up using Email and Password

Post as a guest

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge that you have read and understand our privacy policy and code of conduct.

You Might Also Like