03 - ETL with SparkSQL and Python

Querying Files

Direct File Querying

Query files directly with SQL by prefixing the path (wrapped in backticks) with the file format; the path can point to a single file or a whole directory.

SELECT * FROM csv.`/path/to/file.csv`;
SELECT * FROM csv.`dbfs:/location/csv_files/`;
  • External Files and Raw Content: Register a table directly over external CSV files
CREATE TABLE my_table
USING CSV
OPTIONS (path '/path/to/file.csv');
  • External Source Configuration:
    • Use CTAS to load external data into a Delta Lake table.
CREATE TABLE delta_table
AS SELECT * FROM source_table;
  • Limitation: CTAS does not support declaring a custom schema; it infers the schema from the query results.

CTAS vs. Registering Tables on External Data Sources

  • Registering a table on an external data source (manual schema declaration):
    CREATE TABLE table_name (col_name1 col_type1, ...)
    USING data_source
    OPTIONS (path 'location');

  • CTAS (CREATE TABLE AS SELECT):
    • Automatically infers schema information from the query results.
    • Does not support manual schema declaration.
    • Useful for ingesting external data that already has a well-defined schema.
  • Data Sync Limitation:
    • Tables registered on external sources are not Delta tables, so their data may fall out of sync with the underlying storage; Delta tables guarantee consistency.
  • Alternative Options:
    • Define a temporary view over the external source, then use CTAS to load it into a Delta Lake table (see the sketch below).
    CREATE TEMP VIEW temp_view AS
    SELECT * FROM source_table;
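
A minimal sketch of that pattern, assuming a hypothetical directory of CSV files with a header row (the view and table names are illustrative):

CREATE OR REPLACE TEMP VIEW csv_temp_view
USING CSV
OPTIONS (path '/path/to/csv_files/', header 'true');

CREATE OR REPLACE TABLE customers_delta AS
SELECT * FROM csv_temp_view;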
    

Writing to Tables

Create or Replace Table

Completely overwrites the table each time.

CREATE OR REPLACE TABLE my_table AS
SELECT * FROM source_table;

Insert Overwrite Table

Replaces all existing data in the table with the query results; the target table must already exist and the new data must match its schema.

INSERT OVERWRITE TABLE my_table
SELECT * FROM new_data;

Merge Operation

Performs inserts, updates, and deletes in a single transaction.

CREATE OR REPLACE TEMP VIEW Customer_updates AS
SELECT * FROM json.`${dataset}/Customer-json-needs`;

MERGE INTO Customer C
USING Customer_updates U
ON C.customer_id = U.customer_id
WHEN MATCHED AND C.email IS NULL AND U.email IS NOT NULL THEN
  UPDATE SET email = U.email, updated = U.updated
WHEN NOT MATCHED THEN INSERT *;
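
The statement above only updates and inserts; MERGE can also delete matched rows in the same transaction. A minimal sketch, assuming a hypothetical delete_request flag in the updates view that marks customers to remove:

MERGE INTO Customer C
USING Customer_updates U
ON C.customer_id = U.customer_id
WHEN MATCHED AND U.delete_request = true THEN
  DELETE
WHEN NOT MATCHED THEN
  INSERT *;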


Advanced Transformation

  • Parsing JSON Objects with Struct Types: from_json converts a JSON string into a struct type, based on a schema definition that matches the JSON.
    SELECT from_json(profile, 'STRUCT<name: STRING, age: INT, email: STRING>') AS profile_struct
    FROM customers;

    • Use dot (.) notation to access fields of a struct type (see the example below).
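
A minimal sketch of dot notation, reusing the parsed profile_struct column from the query above (the subquery alias is illustrative):

SELECT profile_struct.name, profile_struct.age, profile_struct.email
FROM (
  SELECT from_json(profile, 'STRUCT<name: STRING, age: INT, email: STRING>') AS profile_struct
  FROM customers
) AS parsed;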

explode: This function transforms an array or map into multiple rows, one per element, allowing detailed analysis of individual elements. In Databricks Spark SQL it can be used directly in the SELECT list or together with LATERAL VIEW.
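
A minimal sketch of both forms, assuming a hypothetical orders table with an items array column:

-- One output row per element of the items array
SELECT order_id, explode(items) AS item
FROM orders;

-- Equivalent form using LATERAL VIEW
SELECT order_id, item
FROM orders
LATERAL VIEW explode(items) exploded AS item;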

collect_set: This function aggregates distinct elements into a set, providing a list of unique values for each group. It’s useful for eliminating duplicates and summarizing data.
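
A minimal sketch, assuming the same hypothetical orders table with an order_status column:

-- One row per customer with the distinct set of statuses seen
SELECT customer_id, collect_set(order_status) AS statuses
FROM orders
GROUP BY customer_id;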

  • Operations:
    • UNION Operation:
      SELECT * FROM table1
      UNION
      SELECT * FROM table2;
      
    • PIVOT:
      SELECT * FROM (
        SELECT column1, column2 FROM source_table
      ) PIVOT (
        SUM(column2) FOR column1 IN ('value1', 'value2')
      );
      

Higher Order Functions

  • Description:
    • Manipulate hierarchical data like arrays and maps.
    • Functions: FILTER, TRANSFORM (a FILTER sketch follows after this list)
    SELECT TRANSFORM(array_column, x -> x + 1) AS incremented FROM source_table;
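
FILTER follows the same lambda pattern; a minimal sketch, assuming a hypothetical order_totals array column:

-- Keep only the array elements greater than 100
SELECT FILTER(order_totals, x -> x > 100) AS large_orders
FROM source_table;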
    

User Defined Functions (UDFs)

  • Extend built-in functionality with custom logic. Note that Python UDFs cannot be optimized by Spark's Catalyst optimizer, so prefer built-in functions where possible.
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# Plain Python function containing the custom logic
def add_one(x):
    return x + 1

# Wrap the function as a UDF, declaring its return type
add_one_udf = udf(add_one, IntegerType())

# withColumn returns a new DataFrame; assign it to keep the result
df = df.withColumn('new_column', add_one_udf(df['column']))

Upsert

  • Syntax:
    • Combines INSERT and UPDATE operations.
    MERGE INTO target_table
    USING source_table
    ON target_table.id = source_table.id
    WHEN MATCHED THEN
      UPDATE SET target_table.value = source_table.value
    WHEN NOT MATCHED THEN
      INSERT (id, value) VALUES (source_table.id, source_table.value);
    

These detailed notes and examples cover essential concepts in Spark SQL and the ETL process, focusing on table management, data transformation, and advanced functions, crucial for the Databricks Certified Data Engineer Associate exam.