AgentSkillsCN

gold-layer-merge-patterns

Provides Delta MERGE deduplication patterns that prevent duplicate-source-key errors in Gold layer tables. Use when merging from Silver to Gold tables, especially when the Silver table may contain duplicate business keys due to incremental DLT streaming, CDC patterns, SCD Type 2 tracking, or repeated batch loads. Deduplicating before the merge prevents the DELTA_MULTIPLE_SOURCE_ROW_MATCHING_TARGET_ROW_IN_MERGE error.

SKILL.md
--- frontmatter
name: gold-layer-merge-patterns
description: Provides production-grade patterns for Gold layer MERGE operations from Silver to Gold tables. Covers column mapping, schema evolution, SCD Type 1/2 patterns, fact table aggregation, and preventing variable naming conflicts with PySpark functions. Use when creating Gold layer MERGE operations, handling column name differences between Silver and Gold, implementing SCD Type 1/2 dimensions, aggregating fact tables, or troubleshooting MERGE errors. Triggers on "Gold merge", "MERGE operation", "upsert Gold", "SCD Type 1", "SCD Type 2", "fact table merge", "Silver to Gold", "column mapping", "schema evolution".
license: Apache-2.0
metadata:
  author: prashanth subrahmanyam
  version: "1.0.0"
  domain: gold
  role: worker
  pipeline_stage: 4
  pipeline_stage_name: gold-implementation
  called_by:
    - gold-layer-setup
  standalone: true
  last_verified: "2026-02-07"
  volatility: low
  upstream_sources: []  # Stable MERGE SQL patterns

Gold Layer MERGE Patterns

Core Principle: Schema-Aware Transformations

Gold layer merge operations read from Silver and must handle:

  1. Column name differences
  2. Data type transformations
  3. Business logic calculations
  4. SCD Type 2 tracking

Column Name Mapping Pattern

Problem: Column Names Differ Between Layers

Example: Silver has company_rcn, but Gold expects company_retail_control_number

❌ DON'T: Reference non-existent columns

python
updates_df = (
    silver_df
    .select(
        "store_number",
        "company_retail_control_number",  # ❌ This column doesn't exist in Silver!
        # ...
    )
)

✅ DO: Map columns explicitly with withColumn

python
updates_df = (
    silver_df
    # Map Silver column to Gold column name
    .withColumn("company_retail_control_number", col("company_rcn"))
    .select(
        "store_number",
        "company_retail_control_number",  # ✅ Now this exists
        # ...
    )
)

Variable Naming Conflicts

Problem: Import Conflicts with Local Variables

Critical Rule: NEVER name local variables the same as imported PySpark functions.

❌ DON'T: Shadow imported functions

python
from pyspark.sql.functions import count

def merge_data():
    # Later in the code...
    count = updates_df.count()  # ❌ Shadows the imported 'count' function!
    
    # This will fail:
    df.agg(count("*"))  # Error: 'int' object is not callable

✅ DO: Use descriptive variable names

python
from pyspark.sql.functions import count

def merge_data():
    record_count = updates_df.count()  # ✅ No conflict
    
    # This works:
    df.agg(count("*"))  # ✅ Uses the imported function

Common PySpark Functions to Avoid as Variable Names

  • count → use record_count, row_count, num_records
  • sum → use total, sum_value, aggregated_sum
  • min → use min_value, minimum
  • max → use max_value, maximum
  • round → use rounded_value, result
  • filter → use filtered_df, subset

Merge Operation Patterns

SCD Type 1 (Overwrite)

Use for: Dimension tables where history doesn't matter

Template: See assets/templates/scd-type1-merge.py for complete pattern.

python
from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp

def merge_dim_product(spark: SparkSession, catalog: str, silver_schema: str, gold_schema: str):
    """Merge dim_product from Silver to Gold (SCD Type 1)."""
    
    silver_table = f"{catalog}.{silver_schema}.silver_product_dim"
    gold_table = f"{catalog}.{gold_schema}.dim_product"
    
    silver_df = spark.table(silver_table)
    
    # Prepare updates with column mappings
    updates_df = (
        silver_df
        .withColumn("product_key", col("upc_code"))  # Business key
        .withColumn("record_updated_timestamp", current_timestamp())
        .select(
            "product_key", "upc_code", "product_description",
            # ... other columns
            "record_updated_timestamp"
        )
    )
    
    delta_gold = DeltaTable.forName(spark, gold_table)
    
    # SCD Type 1: Update all fields when matched
    delta_gold.alias("target").merge(
        updates_df.alias("source"),
        "target.product_key = source.product_key"
    ).whenMatchedUpdateAll(
    ).whenNotMatchedInsertAll(
    ).execute()
    
    record_count = updates_df.count()  # ✅ Good variable name
    print(f"✓ Merged {record_count} records into dim_product")

SCD Type 2 (Historical Tracking)

Use for: Dimension tables where you need to track changes over time

Template: See assets/templates/scd-type2-merge.py for complete pattern.

python
from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat_ws, current_timestamp, lit, md5, when

def merge_dim_store(spark: SparkSession, catalog: str, silver_schema: str, gold_schema: str):
    """Merge dim_store from Silver to Gold (SCD Type 2)."""
    
    silver_table = f"{catalog}.{silver_schema}.silver_store_dim"
    gold_table = f"{catalog}.{gold_schema}.dim_store"
    
    silver_df = spark.table(silver_table)
    
    updates_df = (
        silver_df
        # Generate surrogate key
        .withColumn("store_key", md5(concat_ws("||", col("store_id"), col("processed_timestamp"))))
        
        # SCD Type 2 columns
        .withColumn("effective_from", col("processed_timestamp"))
        .withColumn("effective_to", lit(None).cast("timestamp"))
        .withColumn("is_current", lit(True))
        
        # Derived columns
        .withColumn("store_status", 
                   when(col("close_date").isNotNull(), "Closed").otherwise("Active"))
        
        # Column mappings
        .withColumn("company_retail_control_number", col("company_rcn"))
        
        # Timestamps
        .withColumn("record_created_timestamp", current_timestamp())
        .withColumn("record_updated_timestamp", current_timestamp())
        
        .select(
            "store_key", "store_number", "store_name",
            "company_retail_control_number",  # Mapped column
            "effective_from", "effective_to", "is_current",
            # ... other columns
        )
    )
    
    delta_gold = DeltaTable.forName(spark, gold_table)
    
    # SCD Type 2 (simplified here): only refresh the timestamp on current records.
    # See assets/templates/scd-type2-merge.py for the full expire-and-insert handling.
    delta_gold.alias("target").merge(
        updates_df.alias("source"),
        "target.store_number = source.store_number AND target.is_current = true"
    ).whenMatchedUpdate(set={
        "record_updated_timestamp": "source.record_updated_timestamp"
    }).whenNotMatchedInsertAll(
    ).execute()
    
    record_count = updates_df.count()
    print(f"✓ Merged {record_count} records into dim_store")

Fact Table Aggregation

Use for: Pre-aggregated fact tables from transactional Silver data

Template: See assets/templates/fact-table-aggregation-merge.py for complete pattern.

python
from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, current_timestamp, when
from pyspark.sql.functions import sum as spark_sum  # aliased to avoid shadowing built-in sum

def merge_fact_sales_daily(spark: SparkSession, catalog: str, silver_schema: str, gold_schema: str):
    """Merge fact_sales_daily from Silver to Gold."""
    
    silver_table = f"{catalog}.{silver_schema}.silver_transactions"
    gold_table = f"{catalog}.{gold_schema}.fact_sales_daily"
    
    transactions = spark.table(silver_table)
    
    # Aggregate daily sales
    daily_sales = (
        transactions
        .groupBy("store_number", "upc_code", "transaction_date")
        .agg(
            spark_sum(when(col("quantity_sold") > 0, col("final_sales_price")).otherwise(0)).alias("gross_revenue"),
            spark_sum(col("final_sales_price")).alias("net_revenue"),
            spark_sum(when(col("quantity_sold") > 0, col("quantity_sold")).otherwise(0)).alias("units_sold"),
            count("*").alias("transaction_count"),  # ✅ PySpark function
            # ... more aggregations
        )
        .withColumn("record_created_timestamp", current_timestamp())
        .withColumn("record_updated_timestamp", current_timestamp())
    )
    
    delta_gold = DeltaTable.forName(spark, gold_table)
    
    delta_gold.alias("target").merge(
        daily_sales.alias("source"),
        """target.store_number = source.store_number 
           AND target.upc_code = source.upc_code 
           AND target.transaction_date = source.transaction_date"""
    ).whenMatchedUpdate(set={
        "net_revenue": "source.net_revenue",
        "units_sold": "source.units_sold",
        "transaction_count": "source.transaction_count",
        "record_updated_timestamp": "source.record_updated_timestamp"
    }).whenNotMatchedInsertAll(
    ).execute()
    
    record_count = daily_sales.count()  # ✅ Good variable name
    print(f"✓ Merged {record_count} records into fact_sales_daily")

Schema Evolution Handling

Data Type Changes

python
# Example: INT to BIGINT migration
.withColumn("quantity_sold", col("quantity_sold").cast("bigint"))

# Example: DECIMAL to DOUBLE migration
.withColumn("price", col("price").cast("double"))
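
When new columns appear in Silver, Delta can also evolve the Gold schema during MERGE itself. A minimal sketch, assuming Delta Lake's automatic schema evolution setting (verify the option name and availability on your Delta runtime version):

```python
# Delta Lake setting (verify on your runtime): with this enabled,
# whenMatchedUpdateAll / whenNotMatchedInsertAll can add new source
# columns to the target table instead of failing on schema mismatch.
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
```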

Adding Derived Columns

python
# Always calculate in the merge script, not in the target table
.withColumn("total_discount",
           coalesce(col("multi_unit_discount"), lit(0)) +
           coalesce(col("coupon_discount"), lit(0)) +
           coalesce(col("loyalty_discount"), lit(0)))

Error Handling Pattern

python
def main():
    """Main entry point for Gold layer MERGE operations."""
    
    catalog, silver_schema, gold_schema = get_parameters()
    
    spark = SparkSession.builder.appName("Gold Layer MERGE").getOrCreate()
    
    try:
        # Merge dimensions
        merge_dim_store(spark, catalog, silver_schema, gold_schema)
        merge_dim_product(spark, catalog, silver_schema, gold_schema)
        merge_dim_date(spark, catalog, silver_schema, gold_schema)
        
        # Merge facts
        merge_fact_sales_daily(spark, catalog, silver_schema, gold_schema)
        merge_fact_inventory_snapshot(spark, catalog, silver_schema, gold_schema)
        
        print("\n" + "=" * 80)
        print("✓ Gold layer MERGE completed successfully!")
        print("=" * 80)
        
    except Exception as e:
        print(f"\n❌ Error during Gold layer MERGE: {str(e)}")
        raise
        
    finally:
        spark.stop()

Validation Checklist

Before deploying Gold merge scripts:

  • All Silver column references exist
  • Column mappings are explicit (using .withColumn())
  • No variable names shadow PySpark functions
  • MERGE conditions use correct join keys
  • SCD Type 2 includes is_current filter
  • Timestamps are added (record_created_timestamp, record_updated_timestamp)
  • Error handling with try/except
  • Meaningful print statements for monitoring

Common Errors and Solutions

Error: Column 'X' does not exist

Solution: Check Silver table schema. Add explicit column mapping if names differ.

Error: 'int' object is not callable

Solution: Variable name shadows a PySpark function. Rename the variable.

Error: Cartesian product detected

Solution: MERGE condition is missing or incorrect. Add proper join keys.

Error: Schema mismatch during MERGE

Solution: Cast columns to match target table schema explicitly.

Reference Files

This skill includes the following template files:

  • assets/templates/scd-type1-merge.py - Complete SCD Type 1 merge pattern template
  • assets/templates/scd-type2-merge.py - Complete SCD Type 2 merge pattern template
  • assets/templates/fact-table-aggregation-merge.py - Fact table aggregation merge template