A PySpark feature notebook has to contain a timestamp widget:
dbutils.widgets.text("timestamp", "2020-12-12")
The feature DataFrame has to be assigned to the df_final variable.
Create the timestamp widget:
dbutils.widgets.text("timestamp", "2020-12-12")
Add the required imports:
from pyspark.sql import functions as f
Add feature metadata in the form of a Python dictionary:
metadata = {
    "category": "customer",
    "table": "customer_features",
    "features": {
        "customer_email": {
            "description": "User's email",
            "tags": ["email", "sensitive"],
            "fillna_with": "",
        }
    }
}
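The "fillna_with" field declares a default value to substitute for missing entries of a feature. As a minimal plain-Python sketch (the fillna_defaults helper below is hypothetical, not part of any framework API), the metadata can be turned into a column-to-default mapping of the shape DataFrame.fillna accepts:

```python
metadata = {
    "category": "customer",
    "table": "customer_features",
    "features": {
        "customer_email": {
            "description": "User's email",
            "tags": ["email", "sensitive"],
            "fillna_with": "",
        }
    },
}


def fillna_defaults(meta):
    """Map each feature name to its declared fillna_with value."""
    return {
        name: spec["fillna_with"]
        for name, spec in meta["features"].items()
        if "fillna_with" in spec
    }


print(fillna_defaults(metadata))  # {'customer_email': ''}
```

A dict like this could then be passed as `df_final.fillna(fillna_defaults(metadata))` to apply the declared defaults, assuming the framework does not already do so for you.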
Assign the result of your operations to the variable df_final:
df_load = spark.read.table("dev_odap_offline_sdm_l2.transactions")
def calculate_features(df):
    return (
        df.groupby("client_id")
        .agg(
            f.sum("cardtr_amount_czk").alias("sum_amount"),
            f.avg("cardtr_amount_czk").alias("avg_amount"),
        )
        .withColumn("timestamp", f.lit(dbutils.widgets.get("timestamp")))
    )
df_final = df_load.transform(calculate_features)
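To make the aggregation concrete, here is a plain-Python sketch of what calculate_features computes per client_id (the sum and average of cardtr_amount_czk); the sample rows are made up for illustration and do not come from the real transactions table:

```python
from collections import defaultdict

# Made-up sample transactions, mirroring the two columns the
# aggregation reads: client_id and cardtr_amount_czk.
rows = [
    {"client_id": 1, "cardtr_amount_czk": 100.0},
    {"client_id": 1, "cardtr_amount_czk": 300.0},
    {"client_id": 2, "cardtr_amount_czk": 50.0},
]

# Group amounts by client, as groupby("client_id") would.
amounts = defaultdict(list)
for r in rows:
    amounts[r["client_id"]].append(r["cardtr_amount_czk"])

# One row per client with sum_amount and avg_amount, matching
# the f.sum(...) / f.avg(...) aggregations above.
features = {
    cid: {"sum_amount": sum(v), "avg_amount": sum(v) / len(v)}
    for cid, v in amounts.items()
}
print(features[1])  # {'sum_amount': 400.0, 'avg_amount': 200.0}
```

In the real notebook the timestamp column from the widget is then attached to every row via withColumn, so each feature row is tagged with the run's timestamp.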