05 - Code Snippets
-- Rebuild current_books from the books_silver rows whose `current` flag is true,
-- keeping only the book_id, title, author and price columns.
CREATE OR REPLACE TABLE current_books AS
SELECT
  book_id,
  title,
  author,
  price
FROM books_silver
WHERE current IS TRUE
Unpack JSON to Schema Example
from pyspark.sql import functions as F
# DDL-style schema used to parse the JSON text held in the `value` column.
schema = (
    "customer_id STRING, email STRING, first_name STRING, last_name STRING, "
    "gender STRING, street STRING, city STRING, country_code STRING, "
    "row_status STRING, row_time timestamp"
)

# Rows of the `bronze` table that belong to the 'customers' topic.
bronze_customers = spark.table("bronze").filter("topic = 'customers'")

# Cast `value` to a string and parse it as JSON against the schema; the
# resulting struct is aliased "v" so its fields can be expanded with "v.*".
parsed_value = F.from_json(F.col("value").cast("string"), schema).alias("v")
flattened = bronze_customers.select(parsed_value).select("v.*")

# Keep only the rows whose row_status is "insert" or "update".
customers_df = flattened.filter(F.col("row_status").isin(["insert", "update"]))

display(customers_df)