Install the odap package, which contains the time windows library
%pip install odap
Import the time_windows module
from pyspark.sql import functions as f
# Time windows module
from odap.feature_factory import time_windows as tw
Create a text widget for passing in the reference timestamp
dbutils.widgets.text("timestamp", "")
Create a WindowedDataFrame instance from a source DataFrame, a time column used for time window filtering, and a list of time windows
wdf = tw.WindowedDataFrame(
    df=spark.read.table("odap_digi_sdm_l2.web_visits"),
    time_column="visit_timestamp",  # column compared against each time window
    time_windows=["14d", "30d", "90d"],
)
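The time windows above are given as strings such as "14d". The plain-Python sketch below shows one way such strings could map to durations. It is not odap's parser: the helper name parse_time_window is hypothetical, and the "h" and "w" suffixes are assumptions (this notebook only uses "d" for days).

```python
import re
from datetime import timedelta

# Assumed unit suffixes; only "d" (days) is confirmed by this notebook.
_UNITS = {"h": "hours", "d": "days", "w": "weeks"}


def parse_time_window(window: str) -> timedelta:
    """Hypothetical helper: convert a window string like "14d" into a timedelta."""
    match = re.fullmatch(r"(\d+)([hdw])", window)
    if match is None:
        raise ValueError(f"Unsupported time window: {window!r}")
    value, unit = match.groups()
    return timedelta(**{_UNITS[unit]: int(value)})


# Durations for the windows used in this notebook.
windows = {w: parse_time_window(w) for w in ["14d", "30d", "90d"]}
```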
Add a timestamp column to the DataFrame, cast from the widget's string value
wdf = wdf.withColumn("timestamp", f.lit(dbutils.widgets.get("timestamp")).cast("timestamp"))
Create a callback function that returns the time-windowed column definitions for a given time window
def product_agg_features(time_window: str):
    # Count visits to URLs containing "loans" (case-insensitive) within the given window
    return [
        tw.sum_windowed(
            f"loans_web_visits_count_in_last_{time_window}",
            f.lower("url").contains("loans").cast("integer"),
        )
    ]
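Because the callback takes the window string as an argument and embeds it in the column name, one column is produced per configured window. The sketch below models that expansion in plain Python. Both product_agg_features_sketch and expand_callback are hypothetical stand-ins: they return (name, row predicate) pairs instead of Spark Column expressions and do not reproduce how odap invokes the callback internally.

```python
from typing import Callable, List, Tuple

Feature = Tuple[str, Callable[[dict], int]]


def product_agg_features_sketch(time_window: str) -> List[Feature]:
    """Hypothetical plain-Python analogue of product_agg_features."""
    return [
        (
            f"loans_web_visits_count_in_last_{time_window}",
            # 1 if the URL mentions "loans" (case-insensitive), else 0
            lambda row: int("loans" in row["url"].lower()),
        )
    ]


def expand_callback(callback, time_windows) -> List[Feature]:
    """Call the callback once per window and collect every column definition."""
    columns: List[Feature] = []
    for window in time_windows:
        columns.extend(callback(window))
    return columns


cols = expand_callback(product_agg_features_sketch, ["14d", "30d", "90d"])
print([name for name, _ in cols])
```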
Use the time_windowed method of the WindowedDataFrame to calculate the windowed columns
df_final = wdf.time_windowed(
    group_keys=["customer_id", "timestamp"],
    agg_columns_function=product_agg_features,
)
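Since timestamp is a single literal taken from the widget, grouping by customer_id and timestamp yields one row per customer, with one count column per window. The plain-Python model below illustrates what we assume those counts mean. It is not odap's implementation: the function name loans_visit_counts and the sample visits are hypothetical, the inclusive bounds (timestamp - window <= visit_timestamp <= timestamp) are an assumption, and null URLs, which Spark would treat differently, are not modelled.

```python
from datetime import datetime, timedelta

# Window lengths matching time_windows=["14d", "30d", "90d"]
WINDOWS = {"14d": timedelta(days=14), "30d": timedelta(days=30), "90d": timedelta(days=90)}


def loans_visit_counts(visits, timestamp):
    """Hypothetical model: per (customer_id, timestamp), count "loans" visits per window."""
    features = {}
    for customer_id in sorted({v["customer_id"] for v in visits}):
        row = {}
        for name, length in WINDOWS.items():
            row[f"loans_web_visits_count_in_last_{name}"] = sum(
                int("loans" in v["url"].lower())
                for v in visits
                if v["customer_id"] == customer_id
                # Assumed inclusive bounds; visits after the timestamp are ignored
                and timestamp - length <= v["visit_timestamp"] <= timestamp
            )
        features[(customer_id, timestamp)] = row
    return features


ts = datetime(2023, 6, 30)
visits = [
    {"customer_id": 1, "url": "https://bank.example/Loans/apply", "visit_timestamp": datetime(2023, 6, 25)},
    {"customer_id": 1, "url": "https://bank.example/loans", "visit_timestamp": datetime(2023, 6, 10)},
    {"customer_id": 1, "url": "https://bank.example/cards", "visit_timestamp": datetime(2023, 6, 20)},
    {"customer_id": 1, "url": "https://bank.example/loans", "visit_timestamp": datetime(2023, 7, 2)},
    {"customer_id": 2, "url": "https://bank.example/loans", "visit_timestamp": datetime(2023, 4, 15)},
]
result = loans_visit_counts(visits, ts)
```

Customer 1's 20-day-old visit only appears in the 30d and 90d columns, and the visit dated after the reference timestamp is excluded from every window.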