Example time windowed feature

Install odap package containing the time windowed library

%pip install odap

Import time_windows module

from pyspark.sql import functions as f

# Time windows module
from odap.feature_factory import time_windows as tw

Create widgets for passing in timestamp

dbutils.widgets.text("timestamp", "")

Create a WindowedDataFrame instance from source data Dataframe, a time column for time window filtering and a list of time windows

wdf = tw.WindowedDataFrame(
    df=spark.read.table("odap_digi_sdm_l2.web_visits"),
    time_column="visit_timestamp",
    time_windows=["14d", "30d", "90d"],
)

Add timestamp column to the dataframe

wdf = wdf.withColumn("timestamp", f.lit(dbutils.widgets.get("timestamp")).cast("timestamp"))

Create a callback function to define time windowed columns

def product_agg_features(time_window: str):    
    return [
        tw.sum_windowed(
            f"loans_web_visits_count_in_last_{time_window}",
            f.lower("url").contains("loans").cast("integer"),
        )
    ]

Use time_windowed method of the WindowedDataFrame to calculate windowed columns

df_final = wdf.time_windowed(group_keys=["customer_id", "timestamp"], agg_columns_function=product_agg_features)