Saltar a contenido

SilverPromoter

DKOps.ingestion.silver_promoter.SilverPromoter

Bases: LoggableMixin

Promueve datos desde Bronze hacia Silver aplicando la estrategia del contrato.

Uso

promoter = SilverPromoter(spark, env, ops)
rows = promoter.promote(contract, src_contract, dst_contract)
Source code in src/DKOps/ingestion/silver_promoter.py
class SilverPromoter(LoggableMixin):
    """
    Promote datasets from the Bronze layer to the Silver layer.

    The promotion algorithm is selected from ``contract.strategy`` and
    delegated to the matching ``BasePromotionStrategy`` subclass
    (see ``_build_strategy``).

    Usage
    -----
        promoter = SilverPromoter(spark, env, ops)
        rows = promoter.promote(contract, src_contract, dst_contract)
    """

    def __init__(
        self,
        spark: SparkSession,
        env:   EnvironmentConfig,
        ops:   IngestionOpsLogger | None = None,
    ) -> None:
        """
        Parameters
        ----------
        spark : SparkSession handed to every promotion strategy.
        env   : EnvironmentConfig, stored as ``_env``. NOTE(review): no
                method of this class reads it — confirm it is still needed.
        ops   : optional IngestionOpsLogger. When None, run bookkeeping
                (log_start / log_success / log_failure) is skipped.
        """
        self._spark = spark
        self._env   = env
        self._ops   = ops

    def promote(
        self,
        contract:     IngestionContract,
        src_contract: TableContract,
        dst_contract: TableContract,
    ) -> int:
        """
        Run the promotion of a single dataset.

        Parameters
        ----------
        contract     : IngestionContract holding the strategy and its config
        src_contract : TableContract of the Bronze (source) table
        dst_contract : TableContract of the Silver (destination) table

        Returns
        -------
        int : value returned by the strategy's ``execute()``; it is also
              reported to the ops logger as ``rows_written``.

        Raises
        ------
        ValueError : if ``contract.strategy`` is None, or is not a strategy
                     handled by ``_build_strategy``.
        Exception  : any error raised while building or executing the
                     strategy is reported via ``log_failure`` (when an ops
                     logger is configured) and re-raised unchanged.
        """
        # Checked before log_start, so no ops run is opened for this error.
        if contract.strategy is None:
            raise ValueError(
                f"[{contract.name}] SilverPromoter requiere 'strategy' en el contrato."
            )

        run_id = self._ops.log_start(f"silver.{contract.name}") if self._ops else "local"

        try:
            strategy = self._build_strategy(contract, src_contract, dst_contract)
            rows     = strategy.execute()

            if self._ops:
                self._ops.log_success(
                    run_id, f"silver.{contract.name}", rows_written=rows
                )

            return rows

        except Exception as exc:
            if self._ops:
                self._ops.log_failure(run_id, f"silver.{contract.name}", exc)
            raise

    def promote_all(
        self,
        contracts:     list[IngestionContract],
        src_contracts: dict[str, TableContract],
        dst_contracts: dict[str, TableContract],
    ) -> list[str]:
        """
        Promote several datasets, continuing past individual failures.

        A dataset whose source or destination TableContract is missing from
        ``src_contracts`` / ``dst_contracts`` (looked up by ``contract.name``)
        is skipped with a warning. Skipped datasets are not part of the
        returned list, but they are reported in the final summary and
        suppress the "completed without errors" message.

        Returns
        -------
        list[str] : names of the datasets whose ``promote`` call raised.
        """
        failed:  list[str] = []
        skipped: list[str] = []
        for c in contracts:
            src = src_contracts.get(c.name)
            dst = dst_contracts.get(c.name)
            if src is None or dst is None:
                self.log.warning(f"[{c.name}] Falta src o dst contract — omitido")
                skipped.append(c.name)
                continue
            try:
                self.promote(c, src, dst)
            except Exception as exc:
                # Deliberate best-effort: record the failure and keep going
                # with the remaining datasets.
                self.log.error(f"[{c.name}] Error en promoción: {exc} — continuando")
                failed.append(c.name)

        if failed:
            self.log.warning(f"Datasets Silver fallidos: {failed}")
        if skipped:
            # Previously skipped datasets were invisible in the summary and the
            # run was still reported as error-free.
            self.log.warning(
                f"Datasets Silver omitidos (falta src o dst contract): {skipped}"
            )
        if not failed and not skipped:
            self.log.info("Promoción Silver completada sin errores ✔")
        return failed

    def _build_strategy(
        self,
        contract:     IngestionContract,
        src_contract: TableContract,
        dst_contract: TableContract,
    ) -> BasePromotionStrategy:
        """
        Instantiate the promotion strategy named by ``contract.strategy``.

        Every strategy receives the same keyword arguments: ``spark``,
        ``contract``, ``src_contract`` and ``dst_contract``.

        Raises
        ------
        ValueError : if ``contract.strategy`` is not one of the handled
                     ``SilverStrategy`` members.
        """
        kwargs = dict(
            spark        = self._spark,
            contract     = contract,
            src_contract = src_contract,
            dst_contract = dst_contract,
        )

        strategy_map: dict[SilverStrategy, type[BasePromotionStrategy]] = {
            SilverStrategy.FULL_MERGE:          FullMergeStrategy,
            SilverStrategy.INCREMENTAL_REPLACE: IncrementalReplaceStrategy,
            SilverStrategy.APPEND_DEDUP:        AppendDedupStrategy,
        }

        if contract.strategy in strategy_map:
            return strategy_map[contract.strategy](**kwargs)

        # NOTE(review): CDC_MERGE is dispatched outside the table above with the
        # same kwargs; it could likely be folded into strategy_map.
        if contract.strategy == SilverStrategy.CDC_MERGE:
            return CdcMergeStrategy(**kwargs)

        raise ValueError(
            f"Estrategia '{contract.strategy}' no reconocida. "
            f"Válidas: {[s.value for s in SilverStrategy]}"
        )

Functions

__init__(spark, env, ops=None)

Source code in src/DKOps/ingestion/silver_promoter.py
def __init__(
    self,
    spark: SparkSession,
    env:   EnvironmentConfig,
    ops:   IngestionOpsLogger | None = None,
) -> None:
    """
    Keep references to the collaborators the promoter works with.

    spark : SparkSession forwarded to each promotion strategy.
    env   : EnvironmentConfig kept on the instance as ``_env``.
    ops   : optional IngestionOpsLogger; passing None turns off run
            bookkeeping (log_start / log_success / log_failure).
    """
    # The three assignments are independent of each other.
    self._ops = ops
    self._env = env
    self._spark = spark

promote(contract, src_contract, dst_contract)

Ejecuta la promoción de un dataset.

Parámetros

contract : IngestionContract con la estrategia y config
src_contract : TableContract de la tabla Bronze (fuente)
dst_contract : TableContract de la tabla Silver (destino)

Source code in src/DKOps/ingestion/silver_promoter.py
def promote(
    self,
    contract:     IngestionContract,
    src_contract: TableContract,
    dst_contract: TableContract,
) -> int:
    """
    Promote one dataset from Bronze to Silver.

    Parameters
    ----------
    contract     : IngestionContract carrying the strategy and its config
    src_contract : TableContract of the Bronze (source) table
    dst_contract : TableContract of the Silver (destination) table

    Returns
    -------
    int : whatever the strategy's ``execute()`` returns; it is reported to
          the ops logger as ``rows_written``.

    Raises
    ------
    ValueError : when ``contract.strategy`` is None (raised before any ops
                 run is opened).
    Exception  : errors from building or executing the strategy are sent to
                 ``log_failure`` (if an ops logger exists) and re-raised.
    """
    if contract.strategy is None:
        raise ValueError(
            f"[{contract.name}] SilverPromoter requiere 'strategy' en el contrato."
        )

    ops = self._ops
    # Without an ops logger, a fixed placeholder stands in for the run id.
    if ops:
        run_id = ops.log_start(f"silver.{contract.name}")
    else:
        run_id = "local"

    try:
        written = self._build_strategy(
            contract, src_contract, dst_contract
        ).execute()
        if ops:
            ops.log_success(run_id, f"silver.{contract.name}", rows_written=written)
    except Exception as err:
        if ops:
            ops.log_failure(run_id, f"silver.{contract.name}", err)
        raise
    return written

promote_all(contracts, src_contracts, dst_contracts)

Promueve múltiples datasets. Devuelve lista de nombres que fallaron.

Source code in src/DKOps/ingestion/silver_promoter.py
def promote_all(
    self,
    contracts:     list[IngestionContract],
    src_contracts: dict[str, TableContract],
    dst_contracts: dict[str, TableContract],
) -> list[str]:
    """
    Promote several datasets, continuing past individual failures.

    A dataset whose source or destination TableContract is missing from
    ``src_contracts`` / ``dst_contracts`` (looked up by ``contract.name``)
    is skipped with a warning. Skipped datasets are not part of the
    returned list, but they are reported in the final summary and
    suppress the "completed without errors" message.

    Returns
    -------
    list[str] : names of the datasets whose ``promote`` call raised.
    """
    failed:  list[str] = []
    skipped: list[str] = []
    for c in contracts:
        src = src_contracts.get(c.name)
        dst = dst_contracts.get(c.name)
        if src is None or dst is None:
            self.log.warning(f"[{c.name}] Falta src o dst contract — omitido")
            skipped.append(c.name)
            continue
        try:
            self.promote(c, src, dst)
        except Exception as exc:
            # Deliberate best-effort: record the failure and keep going
            # with the remaining datasets.
            self.log.error(f"[{c.name}] Error en promoción: {exc} — continuando")
            failed.append(c.name)

    if failed:
        self.log.warning(f"Datasets Silver fallidos: {failed}")
    if skipped:
        # Previously skipped datasets were invisible in the summary and the
        # run was still reported as error-free.
        self.log.warning(
            f"Datasets Silver omitidos (falta src o dst contract): {skipped}"
        )
    if not failed and not skipped:
        self.log.info("Promoción Silver completada sin errores ✔")
    return failed