00 - ETL with SparkSQL and Python

Querying Files

  • Direct File Querying:

    • Query files directly using SQL.
    SELECT * FROM csv.`/path/to/file.csv`;
    
  • External Files and Raw Content:

    • Create tables with CSV source.
    CREATE TABLE my_table
    USING CSV
    OPTIONS (path '/path/to/file.csv');
    
  • External Source Configuration:

    • Use CTAS (CREATE TABLE AS SELECT) to create Delta Lake tables.
    CREATE TABLE delta_table
    AS SELECT * FROM source_table;
    
    • Limitation: CTAS infers the schema from the query results and does not support manual schema declaration; a common workaround is sketched below.
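    • A minimal sketch of the workaround, assuming a hypothetical source_table: declare the schema explicitly with CREATE TABLE, then load the data with INSERT.
    CREATE TABLE delta_table_with_schema (id INT, name STRING, signup_date DATE);
    
    INSERT INTO delta_table_with_schema
    SELECT id, name, signup_date FROM source_table;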

Registering Tables on External Data Sources

  • Create Table Syntax:

    CREATE TABLE table_name (col_name1 col_type1, ...)
    USING data_source
    OPTIONS (path 'location');
    
  • Data Sync Limitation:

    • Tables registered over non-Delta external sources are not guaranteed to stay in sync with the underlying storage; Spark may serve stale cached results after the files change (see the REFRESH TABLE sketch at the end of this list). Delta tables guarantee consistency.
  • Alternative Options:

    • Use temporary views or Delta Lake tables instead.
    CREATE TEMP VIEW temp_view AS
    SELECT * FROM source_table;
    
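  • Refreshing External Tables:

    • A minimal sketch, assuming the external my_table registered above: REFRESH TABLE invalidates cached metadata and data so the next query re-reads the underlying files.
    REFRESH TABLE my_table;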

Advanced Transformation

  • Parsing JSON Objects with Struct Types: Convert a JSON string column into a structured (STRUCT) data type based on a schema definition for the JSON.
    SELECT from_json(profile, 'STRUCT<name: STRING, age: INT, email: STRING>') AS profile_struct
    FROM customers;
    
    • Use dot (.) notation to access individual fields of a struct column, as shown below.
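    • A minimal sketch, assuming the customers table and profile column from above: select individual struct fields with dot notation.
    SELECT profile_struct.name, profile_struct.age
    FROM (
      SELECT from_json(profile, 'STRUCT<name: STRING, age: INT, email: STRING>') AS profile_struct
      FROM customers
    ) AS parsed;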

explode: This function is used to transform an array or map into multiple rows, allowing for detailed analysis of individual elements. In Databricks Spark SQL, you use it with LATERAL VIEW.

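A minimal sketch of explode with LATERAL VIEW, assuming a hypothetical orders table with an items array column: each array element becomes its own row.

  SELECT order_id, item
  FROM orders
  LATERAL VIEW explode(items) exploded AS item;
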
collect_set: This function aggregates distinct elements into a set, providing a list of unique values for each group. It’s useful for eliminating duplicates and summarizing data.

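A minimal sketch of collect_set, assuming a hypothetical orders table with one purchased item per row: it returns the distinct items per customer.

  SELECT customer_id, collect_set(item) AS unique_items
  FROM orders
  GROUP BY customer_id;
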
  • Operations:
    • UNION Operation:
      SELECT * FROM table1
      UNION
      SELECT * FROM table2;
      
    • PIVOT:
      SELECT * FROM (
        SELECT column1, column2 FROM table
      ) PIVOT (
        SUM(column2) FOR column1 IN ('value1', 'value2')
      );
      

Higher Order Functions

  • Description:

    • Manipulate hierarchical data like arrays and maps.
    • Functions: FILTER, TRANSFORM
    SELECT TRANSFORM(array_column, x -> x + 1) FROM table;
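    -- A sketch of FILTER on the same assumed array_column: keep only elements greater than zero
    SELECT FILTER(array_column, x -> x > 0) FROM table;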
    
  • User Defined Functions (UDFs):

    • Extend built-in functionality with custom logic; note that Python UDFs are opaque to the Catalyst optimizer, so prefer built-in functions where possible.
    from pyspark.sql.functions import udf
    from pyspark.sql.types import IntegerType
    
    # Plain Python function to wrap as a UDF
    def add_one(x):
        return x + 1
    
    # Register the function as a UDF with an explicit return type
    add_one_udf = udf(add_one, IntegerType())
    
    # withColumn returns a new DataFrame, so capture the result
    df = df.withColumn('new_column', add_one_udf(df['column']))
    

Upsert

  • Syntax:
    • Combines INSERT and UPDATE (and optionally DELETE) operations into a single atomic statement.
    MERGE INTO target_table
    USING source_table
    ON target_table.id = source_table.id
    WHEN MATCHED THEN
      UPDATE SET target_table.value = source_table.value
    WHEN NOT MATCHED THEN
      INSERT (id, value) VALUES (source_table.id, source_table.value);
    

These detailed notes and examples cover essential concepts in Spark SQL and the ETL process, focusing on table management, data transformation, and advanced functions, crucial for the Databricks Certified Data Engineer Associate exam.

Writing to Tables

  • Create or Replace Table:

    • Completely replaces the table's contents and schema each time it runs.
    CREATE OR REPLACE TABLE my_table AS
    SELECT * FROM source_table;
    
  • Insert Overwrite Table:

    • Replaces the existing data while keeping the table's current schema; it fails if the new data does not match that schema.
    INSERT OVERWRITE TABLE my_table
    SELECT * FROM new_data;
    
  • Merge Operation:

    • Performs inserts, updates, and deletes in a single transaction.
    CREATE OR REPLACE TEMP VIEW Customer_updates AS
    SELECT * FROM json.`${dataset}/Customer-json-needs`;
    
    MERGE INTO Customer C
    USING Customer_updates U
    ON C.customer_id = U.customer_id
    WHEN MATCHED AND C.email IS NULL AND U.email IS NOT NULL THEN
      UPDATE SET email = U.email, updated = U.updated
    WHEN NOT MATCHED THEN INSERT *;
    
