Saltar a contenido

Estrategias Silver

FullMergeStrategy

DKOps.ingestion.strategies.full_merge.FullMergeStrategy

Bases: BasePromotionStrategy

MERGE INTO con deduplicación — mantiene el registro más reciente por clave.

Source code in src/DKOps/ingestion/strategies/full_merge.py
class FullMergeStrategy(BasePromotionStrategy):
    """Promote Bronze to Silver with MERGE INTO, keeping one row per merge key.

    When the contract's watermark column is present in the Bronze data, the row
    with the highest watermark value per key wins; otherwise an arbitrary row
    per key is kept via ``dropDuplicates``.
    """

    def execute(self) -> int:
        """Deduplicate Bronze, upsert it into Silver and return the Silver row count.

        Returns:
            Total number of rows in the Silver table after the merge (not the
            number of rows merged in this run).
        """
        contract = self._contract
        keys = list(contract.merge_keys)
        self.log.info(
            f"[{contract.name}] FullMerge | "
            f"keys={keys} | "
            f"watermark={contract.watermark_col}"
        )

        latest = self._dedup(self._read_bronze())

        if contract.metadata.add_silver_timestamps:
            # Record when the row was written to Silver.
            latest = latest.withColumn("_silver_modified_at", F.current_timestamp())

        # Project onto the Silver column set (Bronze metadata is excluded).
        silver_ready = self._select_for_silver(latest)

        self._writer.upsert(silver_ready, keys=keys)

        silver_rows = self._dst_reader.read().count()
        self.log.info(f"[{contract.name}] FullMerge completado | silver_rows={silver_rows:,}")
        return silver_rows

    def _dedup(self, df: DataFrame) -> DataFrame:
        """Return ``df`` reduced to one row per merge key.

        With a watermark column present in ``df``, keeps the row with the greatest
        watermark per key (ties are resolved arbitrarily by ``row_number``);
        otherwise falls back to ``dropDuplicates`` on the keys.
        """
        keys = list(self._contract.merge_keys)
        wcol = self._contract.watermark_col

        if not wcol or wcol not in df.columns:
            # Nothing to order by: keep any single row per key.
            return df.dropDuplicates(keys)

        newest_first = Window.partitionBy(*keys).orderBy(F.col(wcol).desc())
        ranked = df.withColumn("_row_num", F.row_number().over(newest_first))
        return ranked.filter(F.col("_row_num") == 1).drop("_row_num")

Functions

execute()

Source code in src/DKOps/ingestion/strategies/full_merge.py
def execute(self) -> int:
    """Deduplicate Bronze, upsert it into Silver and return the Silver row count.

    Returns:
        Total number of rows in the Silver table after the merge (not the
        number of rows merged in this run).
    """
    contract = self._contract
    keys = list(contract.merge_keys)
    self.log.info(
        f"[{contract.name}] FullMerge | "
        f"keys={keys} | "
        f"watermark={contract.watermark_col}"
    )

    latest = self._dedup(self._read_bronze())

    if contract.metadata.add_silver_timestamps:
        # Record when the row was written to Silver.
        latest = latest.withColumn("_silver_modified_at", F.current_timestamp())

    # Project onto the Silver column set (Bronze metadata is excluded).
    silver_ready = self._select_for_silver(latest)

    self._writer.upsert(silver_ready, keys=keys)

    silver_rows = self._dst_reader.read().count()
    self.log.info(f"[{contract.name}] FullMerge completado | silver_rows={silver_rows:,}")
    return silver_rows

CdcMergeStrategy

DKOps.ingestion.strategies.cdc_merge.CdcMergeStrategy

Bases: BasePromotionStrategy

Aplica eventos CDC (I/U/D) desde Bronze hacia Silver. Mantiene el estado actual de cada entidad con soft deletes.

Source code in src/DKOps/ingestion/strategies/cdc_merge.py
class CdcMergeStrategy(BasePromotionStrategy):
    """
    Apply CDC events (I/U/D) from Bronze to Silver.

    Only the most recent event per merge key is applied (ordered by the
    contract's watermark column when it is present in Bronze). Insert/update
    events are upserted; delete events become soft deletes
    (``is_deleted = True``) when ``soft_delete`` is enabled and the Silver
    contract declares an ``is_deleted`` column, and hard deletes otherwise.
    """

    def __init__(
        self,
        spark:        SparkSession,
        contract:     IngestionContract,
        src_contract: TableContract,
        dst_contract: TableContract,
        op_col:       str  = "op_type",
        soft_delete:  bool = True,
    ) -> None:
        """
        Args:
            spark, contract, src_contract, dst_contract: forwarded unchanged
                to the base strategy's ``__init__``.
            op_col: Bronze column holding the CDC operation code
                (``"I"``, ``"U"`` or ``"D"``).
            soft_delete: apply deletes as ``is_deleted = True`` upserts when the
                Silver contract has an ``is_deleted`` column.
        """
        super().__init__(spark, contract, src_contract, dst_contract)
        self._op_col      = op_col
        self._soft_delete = soft_delete

    def execute(self) -> int:
        """Apply the latest CDC event per key and return the Silver row count.

        Events whose operation code is not ``"I"``, ``"U"`` or ``"D"`` are ignored.

        Returns:
            Total number of rows in Silver after applying the events.
        """
        self.log.info(
            f"[{self._contract.name}] CdcMerge | "
            f"keys={list(self._contract.merge_keys)} | "
            f"op_col={self._op_col} | soft_delete={self._soft_delete}"
        )

        bronze_df = self._read_bronze()
        latest_df = self._keep_latest_per_key(bronze_df)

        # Split inserts/updates from deletes. latest_df holds one event per key,
        # so a key lands in at most one of the two frames.
        deletes = latest_df.filter(F.col(self._op_col) == "D")
        upserts = latest_df.filter(F.col(self._op_col).isin("I", "U"))

        if upserts.count() > 0:
            if self._contract.metadata.add_silver_timestamps:
                upserts = upserts.withColumn("_silver_modified_at", F.current_timestamp())

            # If Silver tracks deletions but Bronze does not carry the flag,
            # live (inserted/updated) rows are explicitly marked not deleted.
            if (
                "is_deleted" in self._dst_contract.column_names
                and "is_deleted" not in upserts.columns
            ):
                upserts = upserts.withColumn("is_deleted", F.lit(False))

            # Keep only Silver columns (drop Bronze metadata).
            upserts = self._select_for_silver(upserts)

            self._writer.upsert(
                upserts,
                keys = list(self._contract.merge_keys),
            )

        if deletes.count() > 0:
            self._apply_deletes(deletes)

        count = self._dst_reader.read().count()
        self.log.info(f"[{self._contract.name}] CdcMerge completado | silver_rows={count:,}")
        return count

    def _keep_latest_per_key(self, df: DataFrame) -> DataFrame:
        """Keep only the most recent event per business key.

        Orders by the watermark column (descending) when it exists in ``df``;
        otherwise keeps an arbitrary event per key via ``dropDuplicates``.
        """
        keys  = list(self._contract.merge_keys)
        wcol  = self._contract.watermark_col

        if wcol and wcol in df.columns:
            window = Window.partitionBy(*keys).orderBy(F.col(wcol).desc())
            return (
                df.withColumn("_row_num", F.row_number().over(window))
                  .filter(F.col("_row_num") == 1)
                  .drop("_row_num")
            )
        return df.dropDuplicates(keys)

    def _apply_deletes(self, deletes: DataFrame) -> None:
        """Apply delete events to Silver.

        Soft delete (``soft_delete`` enabled and Silver has ``is_deleted``):
        upsert the delete rows with ``is_deleted = True``.
        NOTE(review): the whole delete row is upserted, so Silver's non-key
        columns take whatever values the delete event carries — confirm the
        CDC source emits full row images for deletes.

        Hard delete: remove every Silver row whose complete merge key matches a
        delete event. Events with a null key component are skipped and logged.

        Raises:
            ValueError: a hard delete is required but no merge keys are configured.
        """
        keys = list(self._contract.merge_keys)

        if self._soft_delete and "is_deleted" in self._dst_contract.column_names:
            soft = deletes.withColumn("is_deleted", F.lit(True))

            # A soft delete is a modification too, so it gets the Silver timestamp.
            if self._contract.metadata.add_silver_timestamps:
                soft = soft.withColumn("_silver_modified_at", F.current_timestamp())

            # Keep only Silver columns (drop Bronze metadata).
            soft = self._select_for_silver(soft)

            self._writer.upsert(soft, keys=keys)
            return

        if not keys:
            raise ValueError(
                f"[{self._contract.name}] CdcMerge: hard delete requires merge_keys"
            )

        # Match on the FULL composite key: filtering on keys[0] alone would also
        # delete rows that merely share the first key component.
        key_rows = deletes.select(*keys).distinct().collect()
        complete = [row for row in key_rows if all(row[k] is not None for k in keys)]
        skipped = len(key_rows) - len(complete)
        if skipped:
            self.log.warning(
                f"[{self._contract.name}] CdcMerge: {skipped} evento(s) D "
                f"con clave nula ignorado(s)"
            )
        if not complete:
            return

        # NOTE(review): the predicate grows with the number of deleted keys; very
        # large delete batches might be better served by a MERGE-based delete.
        self._writer.delete(self._build_delete_condition(keys, complete))

    @staticmethod
    def _quote_identifier(name: str) -> str:
        """Backtick-quote a column name for use in a Spark SQL expression."""
        return "`" + name.replace("`", "``") + "`"

    @staticmethod
    def _quote_literal(value: object) -> str:
        """Render ``value`` as a single-quoted Spark SQL string literal.

        Backslashes and single quotes are backslash-escaped, which is Spark SQL's
        default literal escaping (``spark.sql.parser.escapedStringLiterals`` off),
        so a key value containing a quote cannot break or alter the predicate.
        Non-string keys are compared through a string literal and rely on
        Spark's implicit cast to the column type.
        """
        text = str(value).replace("\\", "\\\\").replace("'", "\\'")
        return f"'{text}'"

    @classmethod
    def _build_delete_condition(cls, keys: list[str], key_rows: list) -> str:
        """Build a predicate matching exactly the given key combinations.

        One key yields ``col IN (v1, v2, ...)``; composite keys yield
        ``(k1 = v1 AND k2 = v2) OR ...``. ``key_rows`` must be non-empty, indexable
        by key name, and contain no null key values.
        """
        if len(keys) == 1:
            column = cls._quote_identifier(keys[0])
            values = ", ".join(cls._quote_literal(row[keys[0]]) for row in key_rows)
            return f"{column} IN ({values})"

        return " OR ".join(
            "(" + " AND ".join(
                f"{cls._quote_identifier(k)} = {cls._quote_literal(row[k])}" for k in keys
            ) + ")"
            for row in key_rows
        )

Functions

__init__(spark, contract, src_contract, dst_contract, op_col='op_type', soft_delete=True)

Source code in src/DKOps/ingestion/strategies/cdc_merge.py
def __init__(
    self,
    spark:        SparkSession,
    contract:     IngestionContract,
    src_contract: TableContract,
    dst_contract: TableContract,
    op_col:       str  = "op_type",
    soft_delete:  bool = True,
) -> None:
    """Initialise the base strategy, then store the CDC-specific options.

    Args:
        spark, contract, src_contract, dst_contract: forwarded unchanged to the
            base strategy's ``__init__``.
        op_col: name of the Bronze column carrying the CDC operation code.
        soft_delete: whether deletes should be applied as soft deletes when the
            Silver contract allows it.
    """
    super().__init__(spark, contract, src_contract, dst_contract)
    self._op_col, self._soft_delete = op_col, soft_delete

execute()

Source code in src/DKOps/ingestion/strategies/cdc_merge.py
def execute(self) -> int:
    """Apply the latest CDC event per key to Silver and return the Silver row count.

    Insert (``"I"``) and update (``"U"``) events are upserted; delete (``"D"``)
    events are handed to ``_apply_deletes``. Any other operation code is ignored.
    """
    contract = self._contract
    self.log.info(
        f"[{contract.name}] CdcMerge | "
        f"keys={list(contract.merge_keys)} | "
        f"op_col={self._op_col} | soft_delete={self._soft_delete}"
    )

    latest_df = self._keep_latest_per_key(self._read_bronze())

    op = F.col(self._op_col)
    deletes = latest_df.filter(op == "D")
    upserts = latest_df.filter(op.isin("I", "U"))

    if upserts.count() > 0:
        if contract.metadata.add_silver_timestamps:
            upserts = upserts.withColumn("_silver_modified_at", F.current_timestamp())

        # Silver expects an is_deleted flag that Bronze does not provide.
        needs_flag = (
            "is_deleted" in self._dst_contract.column_names
            and "is_deleted" not in upserts.columns
        )
        if needs_flag:
            upserts = upserts.withColumn("is_deleted", F.lit(False))

        # Project onto Silver columns (Bronze metadata is excluded) and merge.
        self._writer.upsert(
            self._select_for_silver(upserts),
            keys=list(contract.merge_keys),
        )

    if deletes.count() > 0:
        self._apply_deletes(deletes)

    silver_rows = self._dst_reader.read().count()
    self.log.info(f"[{contract.name}] CdcMerge completado | silver_rows={silver_rows:,}")
    return silver_rows

IncrementalReplaceStrategy

DKOps.ingestion.strategies.incremental_replace.IncrementalReplaceStrategy

Bases: BasePromotionStrategy

Reemplaza la partición más reciente de Silver con el snapshot de Bronze.

Source code in src/DKOps/ingestion/strategies/incremental_replace.py
class IncrementalReplaceStrategy(BasePromotionStrategy):
    """Overwrite the newest Silver partition with the matching Bronze snapshot.

    The partition column is the contract's watermark column; if that is unset,
    the first Silver partition column; and failing that, ``_ingested_date``.
    """

    def execute(self) -> int:
        """Replace the partition holding the maximum partition-column value.

        Returns:
            Rows written to that partition, or 0 when Bronze has no non-null
            partition value.
        """
        partition_col = self._contract.watermark_col
        if not partition_col:
            dst_partitions = self._dst_contract.partition_columns
            partition_col = dst_partitions[0] if dst_partitions else "_ingested_date"

        self.log.info(
            f"[{self._contract.name}] IncrementalReplace | partition_col={partition_col}"
        )

        bronze_df = self._read_bronze()

        # Newest partition value available in Bronze (a global aggregate).
        agg_rows = bronze_df.select(F.max(F.col(partition_col)).alias("max_val")).collect()
        max_val = agg_rows[0]["max_val"] if agg_rows else None
        if max_val is None:
            self.log.warning(f"[{self._contract.name}] Bronze vacío, nada que procesar")
            return 0

        snapshot = bronze_df.filter(F.col(partition_col) == max_val)

        # Stamp rows before projecting onto the Silver column set.
        if self._contract.metadata.add_silver_timestamps:
            snapshot = snapshot.withColumn("_silver_modified_at", F.current_timestamp())

        # Keep only Silver columns (Bronze metadata is excluded).
        snapshot = self._select_for_silver(snapshot)

        self._writer.overwrite_partition(snapshot, partition={partition_col: str(max_val)})

        written = snapshot.count()
        self.log.info(
            f"[{self._contract.name}] IncrementalReplace completado | "
            f"partition={max_val} | rows={written:,}"
        )
        return written

Functions

execute()

Source code in src/DKOps/ingestion/strategies/incremental_replace.py
def execute(self) -> int:
    """Replace the Silver partition holding the maximum partition-column value.

    The partition column is the contract's watermark column; if that is unset,
    the first Silver partition column; and failing that, ``_ingested_date``.

    Returns:
        Rows written to that partition, or 0 when Bronze has no non-null
        partition value.
    """
    partition_col = self._contract.watermark_col
    if not partition_col:
        dst_partitions = self._dst_contract.partition_columns
        partition_col = dst_partitions[0] if dst_partitions else "_ingested_date"

    self.log.info(
        f"[{self._contract.name}] IncrementalReplace | partition_col={partition_col}"
    )

    bronze_df = self._read_bronze()

    # Newest partition value available in Bronze (a global aggregate).
    agg_rows = bronze_df.select(F.max(F.col(partition_col)).alias("max_val")).collect()
    max_val = agg_rows[0]["max_val"] if agg_rows else None
    if max_val is None:
        self.log.warning(f"[{self._contract.name}] Bronze vacío, nada que procesar")
        return 0

    snapshot = bronze_df.filter(F.col(partition_col) == max_val)

    # Stamp rows before projecting onto the Silver column set.
    if self._contract.metadata.add_silver_timestamps:
        snapshot = snapshot.withColumn("_silver_modified_at", F.current_timestamp())

    # Keep only Silver columns (Bronze metadata is excluded).
    snapshot = self._select_for_silver(snapshot)

    self._writer.overwrite_partition(snapshot, partition={partition_col: str(max_val)})

    written = snapshot.count()
    self.log.info(
        f"[{self._contract.name}] IncrementalReplace completado | "
        f"partition={max_val} | rows={written:,}"
    )
    return written

AppendDedupStrategy

DKOps.ingestion.strategies.append_dedup.AppendDedupStrategy

Bases: BasePromotionStrategy

Inserta registros de Bronze que no existen en Silver (por merge_keys).

Source code in src/DKOps/ingestion/strategies/append_dedup.py
class AppendDedupStrategy(BasePromotionStrategy):
    """Append Bronze records whose merge keys do not exist in Silver yet."""

    def execute(self) -> int:
        """Append the Bronze rows that are new with respect to Silver.

        Returns:
            Number of rows appended (0 when there is nothing new).

        Raises:
            Any error raised by the anti-join against an existing Silver table.
            Such errors are deliberately not swallowed: falling back to
            "append everything" would duplicate rows already in Silver.
        """
        keys = list(self._contract.merge_keys)
        self.log.info(
            f"[{self._contract.name}] AppendDedup | keys={keys}"
        )

        bronze_df = self._read_bronze()

        # On the first run Silver does not exist yet, so every Bronze row is new.
        # Only the read itself is guarded; the anti-join runs in the else branch.
        # NOTE(review): the reader's "table missing" exception type is not visible
        # here, so any read error is treated as "Silver does not exist" — consider
        # narrowing this to that specific exception.
        try:
            silver_df = self._dst_reader.read()
        except Exception as exc:
            self.log.info(
                f"[{self._contract.name}] Silver aún no existe — "
                f"insertando todos los registros de Bronze como registros nuevos "
                f"({type(exc).__name__}: {exc})"
            )
            new_records = bronze_df
        else:
            new_records = self._filter_new(bronze_df, silver_df, keys)

        # NOTE(review): rows sharing a key *within* this Bronze batch are all
        # appended; add dropDuplicates(keys) if merge_keys must be unique in Silver.
        count = new_records.count()

        if count == 0:
            self.log.info(f"[{self._contract.name}] AppendDedup: sin registros nuevos")
            return 0

        # Stamp rows before projecting onto the Silver column set.
        if self._contract.metadata.add_silver_timestamps:
            new_records = new_records.withColumn("_silver_modified_at", F.current_timestamp())

        # Keep only Silver columns (Bronze metadata is excluded).
        new_records = self._select_for_silver(new_records)
        self._writer.append(new_records)
        self.log.info(f"[{self._contract.name}] AppendDedup completado | new_rows={count:,}")
        return count

    @staticmethod
    def _filter_new(
        source: DataFrame,
        target: DataFrame,
        keys:   list[str],
    ) -> DataFrame:
        """Anti-join: return the rows of ``source`` whose ``keys`` are absent from ``target``.

        Rows with a null key component never satisfy the equality join, so they
        are always returned as new.
        """
        return source.join(target.select(*keys), on=keys, how="left_anti")

Functions

execute()

Source code in src/DKOps/ingestion/strategies/append_dedup.py
def execute(self) -> int:
    """Append the Bronze rows whose merge keys are not yet in Silver.

    Returns:
        Number of rows appended (0 when there is nothing new).

    Raises:
        Any error raised by the anti-join against an existing Silver table.
        Such errors are deliberately not swallowed: falling back to
        "append everything" would duplicate rows already in Silver.
    """
    keys = list(self._contract.merge_keys)
    self.log.info(
        f"[{self._contract.name}] AppendDedup | keys={keys}"
    )

    bronze_df = self._read_bronze()

    # On the first run Silver does not exist yet, so every Bronze row is new.
    # Only the read itself is guarded; the anti-join runs in the else branch.
    # NOTE(review): the reader's "table missing" exception type is not visible
    # here, so any read error is treated as "Silver does not exist" — consider
    # narrowing this to that specific exception.
    try:
        silver_df = self._dst_reader.read()
    except Exception as exc:
        self.log.info(
            f"[{self._contract.name}] Silver aún no existe — "
            f"insertando todos los registros de Bronze como registros nuevos "
            f"({type(exc).__name__}: {exc})"
        )
        new_records = bronze_df
    else:
        new_records = self._filter_new(bronze_df, silver_df, keys)

    count = new_records.count()

    if count == 0:
        self.log.info(f"[{self._contract.name}] AppendDedup: sin registros nuevos")
        return 0

    # Stamp rows before projecting onto the Silver column set.
    if self._contract.metadata.add_silver_timestamps:
        new_records = new_records.withColumn("_silver_modified_at", F.current_timestamp())

    # Keep only Silver columns (Bronze metadata is excluded).
    new_records = self._select_for_silver(new_records)
    self._writer.append(new_records)
    self.log.info(f"[{self._contract.name}] AppendDedup completado | new_rows={count:,}")
    return count