Saltar a contenido

BronzeIngestor

DKOps.ingestion.bronze_ingestor.BronzeIngestor

Bases: LoggableMixin

Ingesta datos desde Landing hasta Bronze.

Uso

ingestor = BronzeIngestor(spark, env, ops)
rows = ingestor.ingest(contract, dst_contract)

El BronzeIngestor no carga contratos — eso lo hace IngestionEngine.

Source code in src/DKOps/ingestion/bronze_ingestor.py
class BronzeIngestor(LoggableMixin):
    """
    Ingesta datos desde Landing hasta Bronze.

    Uso
    ---
        ingestor = BronzeIngestor(spark, env, ops)
        rows = ingestor.ingest(contract, dst_contract)

    El BronzeIngestor no carga contratos — eso lo hace IngestionEngine.
    """

    def __init__(
        self,
        spark:       SparkSession,
        env:         EnvironmentConfig,
        ops:         IngestionOpsLogger | None = None,
        schema_root: str | None               = None,
        kafka_creds: dict | None              = None,
    ) -> None:
        self._spark       = spark
        self._env         = env
        self._ops         = ops
        self._schema_root = schema_root
        self._kafka_creds = kafka_creds
        self._enricher    = MetadataEnricher()

    def ingest(
        self,
        contract:     IngestionContract,
        dst_contract: TableContract,
    ) -> int:
        """
        Ejecuta la ingesta de un dataset.

        Parámetros
        ----------
        contract     : IngestionContract (fuente + config de ingesta)
        dst_contract : TableContract destino en Bronze

        Devuelve
        --------
        Número de filas escritas (estimado desde streaming o exacto desde batch).
        """
        run_id = self._ops.log_start(contract.name) if self._ops else "local"

        try:
            reader = SourceReaderFactory.create(
                contract    = contract,
                spark       = self._spark,
                env         = self._env,
                kafka_creds = self._kafka_creds,
                schema_root = self._schema_root,
            )

            df = reader.read()
            df = self._enricher.enrich(df, contract.metadata, contract.source.format)

            if df.isStreaming:
                rows = self._write_stream(df, contract, dst_contract)
            else:
                rows = self._write_batch(df, contract, dst_contract)

            if self._ops:
                self._ops.log_success(run_id, contract.name, rows_written=rows)

            return rows

        except Exception as exc:
            if self._ops:
                self._ops.log_failure(run_id, contract.name, exc)
            raise

    def ingest_all(
        self,
        contracts:     list[IngestionContract],
        dst_contracts: dict[str, TableContract],
    ) -> list[str]:
        """
        Ingesta múltiples datasets. Devuelve lista de nombres que fallaron.
        Continúa en caso de error por dataset (fail-continue).
        """
        failed = []
        for c in contracts:
            dst = dst_contracts.get(c.name)
            if dst is None:
                self.log.warning(f"[{c.name}] Sin TableContract destino — omitido")
                continue
            try:
                self.ingest(c, dst)
            except Exception as exc:
                self.log.error(f"[{c.name}] Error: {exc} — continuando")
                failed.append(c.name)

        if failed:
            self.log.warning(f"Datasets fallidos: {failed}")
        else:
            self.log.info("Ingesta batch completada sin errores ✔")
        return failed

    # ── Escritura batch ───────────────────────────────────────────────────

    def _write_batch(
        self,
        df:           DataFrame,
        contract:     IngestionContract,
        dst_contract: TableContract,
    ) -> int:
        """
        Escribe en Bronze usando *partition overwrite* sobre la columna de
        ingesta declarada en el contrato (por defecto ``_ingested_date``).

        Patrón: cada corrida sobreescribe únicamente la partición del día
        actual, dejando el histórico de otras particiones intacto. Esto
        garantiza idempotencia: ejecutar el motor N veces al día siempre
        deja la versión más reciente de los datos en esa partición.

        Si la tabla Bronze no declara ninguna columna de partición de ingesta,
        se usa ``append`` como fallback (con advertencia).

        El ``load_type`` del contrato (FULL / INCREMENTAL / CDC) es una
        etiqueta semántica para Silver — no afecta el mecanismo de escritura
        en Bronze.
        """
        validator = SchemaValidator(dst_contract)
        validator.validate(df).raise_if_critical()

        writer         = TableWriter(dst_contract, strict_columns=False)
        partition_col  = self._find_ingestion_partition(dst_contract)

        if partition_col and partition_col in df.columns:
            # Partition overwrite idempotente — solo la ventana de hoy
            today = str(_date.today())
            writer.overwrite_partition(df, {partition_col: today})
            self.log.info(
                f"[{contract.name}] partition overwrite OK | "
                f"table={dst_contract.full_name} | "
                f"{partition_col}={today} | load_type={contract.load_type.value}"
            )
        else:
            self.log.warning(
                f"[{contract.name}] Sin columna de partición de ingesta en el "
                f"TableContract → usando append. Considera añadir '_ingested_date' "
                f"a partition_columns en {dst_contract.name}.json"
            )
            writer.append(df)

        count = df.count()
        self.log.info(
            f"[{contract.name}] rows={count:,} | table={dst_contract.full_name}"
        )
        return count

    @staticmethod
    def _find_ingestion_partition(dst_contract: TableContract) -> str | None:
        """
        Devuelve la columna de partición de ingesta del contrato Bronze.

        Busca en orden: ``_ingested_date`` → ``_ingested_at``.
        Si ninguna está declarada como columna de partición, devuelve None.
        """
        for candidate in ("_ingested_date", "_ingested_at"):
            if candidate in dst_contract.partition_columns:
                return candidate
        return None

    # ── Escritura streaming ───────────────────────────────────────────────

    def _write_stream(
        self,
        df:           DataFrame,
        contract:     IngestionContract,
        dst_contract: TableContract,
    ) -> int:
        checkpoint = self._resolve_checkpoint(contract)
        self.log.info(
            f"[{contract.name}] streaming write | "
            f"table={dst_contract.full_name} | checkpoint={checkpoint}"
        )

        writer = (
            df.writeStream
                .format("delta")
                .option("checkpointLocation", checkpoint)
                .option("mergeSchema", "true")
        )

        if dst_contract.partition_columns:
            writer = writer.partitionBy(*dst_contract.partition_columns)

        if contract.trigger == StreamTrigger.AVAILABLE_NOW:
            writer = writer.trigger(availableNow=True)

        if self._env._is_databricks:
            query = writer.toTable(dst_contract.full_name)
        else:
            delta_path = dst_contract.location or self._local_delta_path(dst_contract)
            Path(delta_path).mkdir(parents=True, exist_ok=True)
            query = writer.option("path", delta_path).start()

        query.awaitTermination()

        # Registrar tabla local si no existe aún
        if not self._env._is_databricks:
            self._register_local_table(dst_contract)

        try:
            return self._spark.read.format("delta").load(
                dst_contract.location or self._local_delta_path(dst_contract)
            ).count()
        except Exception:
            return -1

    def _resolve_checkpoint(self, contract: IngestionContract) -> str:
        """Construye la ruta del checkpoint según entorno."""
        if self._env.has_path("checkpoint"):
            base = self._env.get_path("checkpoint")
        else:
            warehouse = self._spark.conf.get(
                "spark.sql.warehouse.dir", "/tmp/spark-warehouse"
            )
            base = os.path.join(warehouse, "_checkpoints")

        return os.path.join(base, contract.checkpoint_suffix)

    def _local_delta_path(self, dst_contract: TableContract) -> str:
        warehouse = self._spark.conf.get(
            "spark.sql.warehouse.dir", "/tmp/spark-warehouse"
        )
        return os.path.join(
            warehouse, dst_contract.schema, dst_contract.name
        )

    def _register_local_table(self, dst_contract: TableContract) -> None:
        """Registra la tabla en el catálogo local de Spark si no existe."""
        delta_path = dst_contract.location or self._local_delta_path(dst_contract)
        table_ref  = f"{dst_contract.schema}.{dst_contract.name}"
        try:
            self._spark.sql(
                f"CREATE TABLE IF NOT EXISTS {table_ref} "
                f"USING DELTA LOCATION '{delta_path}'"
            )
        except Exception as exc:
            self.log.debug(f"Tabla ya registrada o error menor: {exc}")

Functions

__init__(spark, env, ops=None, schema_root=None, kafka_creds=None)

Source code in src/DKOps/ingestion/bronze_ingestor.py
def __init__(
    self,
    spark:       SparkSession,
    env:         EnvironmentConfig,
    ops:         IngestionOpsLogger | None = None,
    schema_root: str | None               = None,
    kafka_creds: dict | None              = None,
) -> None:
    self._spark       = spark
    self._env         = env
    self._ops         = ops
    self._schema_root = schema_root
    self._kafka_creds = kafka_creds
    self._enricher    = MetadataEnricher()

ingest(contract, dst_contract)

Ejecuta la ingesta de un dataset.

Parámetros

contract : IngestionContract (fuente + config de ingesta) dst_contract : TableContract destino en Bronze

Devuelve

Número de filas escritas (estimado desde streaming o exacto desde batch).

Source code in src/DKOps/ingestion/bronze_ingestor.py
def ingest(
    self,
    contract:     IngestionContract,
    dst_contract: TableContract,
) -> int:
    """
    Ejecuta la ingesta de un dataset.

    Parámetros
    ----------
    contract     : IngestionContract (fuente + config de ingesta)
    dst_contract : TableContract destino en Bronze

    Devuelve
    --------
    Número de filas escritas (estimado desde streaming o exacto desde batch).
    """
    run_id = self._ops.log_start(contract.name) if self._ops else "local"

    try:
        reader = SourceReaderFactory.create(
            contract    = contract,
            spark       = self._spark,
            env         = self._env,
            kafka_creds = self._kafka_creds,
            schema_root = self._schema_root,
        )

        df = reader.read()
        df = self._enricher.enrich(df, contract.metadata, contract.source.format)

        if df.isStreaming:
            rows = self._write_stream(df, contract, dst_contract)
        else:
            rows = self._write_batch(df, contract, dst_contract)

        if self._ops:
            self._ops.log_success(run_id, contract.name, rows_written=rows)

        return rows

    except Exception as exc:
        if self._ops:
            self._ops.log_failure(run_id, contract.name, exc)
        raise

ingest_all(contracts, dst_contracts)

Ingesta múltiples datasets. Devuelve lista de nombres que fallaron. Continúa en caso de error por dataset (fail-continue).

Source code in src/DKOps/ingestion/bronze_ingestor.py
def ingest_all(
    self,
    contracts:     list[IngestionContract],
    dst_contracts: dict[str, TableContract],
) -> list[str]:
    """
    Ingesta múltiples datasets. Devuelve lista de nombres que fallaron.
    Continúa en caso de error por dataset (fail-continue).
    """
    failed = []
    for c in contracts:
        dst = dst_contracts.get(c.name)
        if dst is None:
            self.log.warning(f"[{c.name}] Sin TableContract destino — omitido")
            continue
        try:
            self.ingest(c, dst)
        except Exception as exc:
            self.log.error(f"[{c.name}] Error: {exc} — continuando")
            failed.append(c.name)

    if failed:
        self.log.warning(f"Datasets fallidos: {failed}")
    else:
        self.log.info("Ingesta batch completada sin errores ✔")
    return failed