Saltar a contenido

Upsert Writer

DKOps.table_governance.writers.upsert_writer

upsert_writer.py

Escritura incremental con actualizaciones — MERGE INTO.

MERGE INTO funciona en ambos runtimes (Delta lo soporta nativamente). En local PC la tabla debe existir previamente — usa CreateWriter primero.

Uso

UpsertWriter(contract).write(df, merge_keys=["vuelo_id"])

Classes

UpsertWriter

Bases: BaseWriter

MERGE INTO — inserta filas nuevas, actualiza las existentes.

Uso
UpsertWriter(contract).write(
    df,
    merge_keys=["vuelo_id"],
    update_columns=["retraso_min"],   # opcional — default: todas las no-key
)
Source code in src/DKOps/table_governance/writers/upsert_writer.py
class UpsertWriter(BaseWriter):
    """
    MERGE INTO — inserta filas nuevas, actualiza las existentes.

    Uso
    ---
        UpsertWriter(contract).write(
            df,
            merge_keys=["vuelo_id"],
            update_columns=["retraso_min"],   # opcional — default: todas las no-key
        )
    """

    @log_operation("upsert")
    def write(
        self,
        df:             DataFrame,
        merge_keys:     list[str] | None = None,
        update_columns: list[str] | None = None,
        **kwargs,
    ) -> None:
        if not merge_keys:
            raise ValueError(
                "UpsertWriter requiere 'merge_keys'. "
                "Ejemplo: writer.write(df, merge_keys=['vuelo_id'])"
            )

        contract_cols = set(self._contract.column_names)
        for key in merge_keys:
            if key not in contract_cols:
                raise ValueError(
                    f"merge_key '{key}' no definida en contrato de "
                    f"'{self._table_name}'.\n"
                    f"Columnas disponibles: {sorted(contract_cols)}"
                )

        self.log.info(
            f"Iniciando UPSERT | tabla='{self._table_name}' | "
            f"merge_keys={merge_keys}"
        )

        self._validate(df)
        df = self._apply_defaults(df)
        df = self._reorder_columns(df)

        if self._dry_run:
            self._log_dry_run("upsert")
            return

        # Primer run: si la tabla no existe, hacer carga inicial via overwrite
        # (equivalente a upsert donde todos los registros son inserciones)
        if not self._table_exists():
            self.log.info(
                f"[upsert] Tabla '{self._table_name}' no existe — "
                f"carga inicial via overwrite (todos son inserciones nuevas)"
            )
            # Seleccionar solo las columnas del contrato Silver (excluir metadata Bronze)
            df_cols = set(df.columns)
            silver_cols = [c for c in self._contract.column_names if c in df_cols]
            self._write_df(df.select(*silver_cols), mode="overwrite")
            return

        # Vista temporal para el MERGE
        tmp_view = f"_tmp_upsert_{self._contract.name}"
        df.createOrReplaceTempView(tmp_view)

        join_cond = " AND ".join(
            f"target.`{k}` = source.`{k}`" for k in merge_keys
        )

        all_cols = self._contract.column_names
        cols_to_update = (
            [c for c in update_columns if c not in merge_keys]
            if update_columns
            else [c for c in all_cols if c not in merge_keys]
        )

        set_clause    = ", ".join(f"target.`{c}` = source.`{c}`" for c in cols_to_update)
        insert_cols   = ", ".join(f"`{c}`" for c in all_cols)
        insert_values = ", ".join(f"source.`{c}`" for c in all_cols)

        merge_sql = f"""
        MERGE INTO {self._table_name} AS target
        USING {tmp_view} AS source
        ON {join_cond}
        WHEN MATCHED THEN UPDATE SET {set_clause}
        WHEN NOT MATCHED THEN INSERT ({insert_cols}) VALUES ({insert_values})
        """

        self.log.debug(f"MERGE SQL:\n{merge_sql.strip()}")
        result = self._spark.sql(merge_sql)

        try:
            metrics  = result.collect()[0].asDict()
            inserted = metrics.get("num_inserted_rows", "?")
            updated  = metrics.get("num_updated_rows", "?")
            self.log.success(
                f"✔ UPSERT completado | tabla='{self._table_name}' | "
                f"insertados={inserted} | actualizados={updated}"
            )
        except Exception:
            self.log.success(f"✔ UPSERT completado | tabla='{self._table_name}'")
Functions
write(df, merge_keys=None, update_columns=None, **kwargs)
Source code in src/DKOps/table_governance/writers/upsert_writer.py
@log_operation("upsert")
def write(
    self,
    df:             DataFrame,
    merge_keys:     list[str] | None = None,
    update_columns: list[str] | None = None,
    **kwargs,
) -> None:
    if not merge_keys:
        raise ValueError(
            "UpsertWriter requiere 'merge_keys'. "
            "Ejemplo: writer.write(df, merge_keys=['vuelo_id'])"
        )

    contract_cols = set(self._contract.column_names)
    for key in merge_keys:
        if key not in contract_cols:
            raise ValueError(
                f"merge_key '{key}' no definida en contrato de "
                f"'{self._table_name}'.\n"
                f"Columnas disponibles: {sorted(contract_cols)}"
            )

    self.log.info(
        f"Iniciando UPSERT | tabla='{self._table_name}' | "
        f"merge_keys={merge_keys}"
    )

    self._validate(df)
    df = self._apply_defaults(df)
    df = self._reorder_columns(df)

    if self._dry_run:
        self._log_dry_run("upsert")
        return

    # Primer run: si la tabla no existe, hacer carga inicial via overwrite
    # (equivalente a upsert donde todos los registros son inserciones)
    if not self._table_exists():
        self.log.info(
            f"[upsert] Tabla '{self._table_name}' no existe — "
            f"carga inicial via overwrite (todos son inserciones nuevas)"
        )
        # Seleccionar solo las columnas del contrato Silver (excluir metadata Bronze)
        df_cols = set(df.columns)
        silver_cols = [c for c in self._contract.column_names if c in df_cols]
        self._write_df(df.select(*silver_cols), mode="overwrite")
        return

    # Vista temporal para el MERGE
    tmp_view = f"_tmp_upsert_{self._contract.name}"
    df.createOrReplaceTempView(tmp_view)

    join_cond = " AND ".join(
        f"target.`{k}` = source.`{k}`" for k in merge_keys
    )

    all_cols = self._contract.column_names
    cols_to_update = (
        [c for c in update_columns if c not in merge_keys]
        if update_columns
        else [c for c in all_cols if c not in merge_keys]
    )

    set_clause    = ", ".join(f"target.`{c}` = source.`{c}`" for c in cols_to_update)
    insert_cols   = ", ".join(f"`{c}`" for c in all_cols)
    insert_values = ", ".join(f"source.`{c}`" for c in all_cols)

    merge_sql = f"""
    MERGE INTO {self._table_name} AS target
    USING {tmp_view} AS source
    ON {join_cond}
    WHEN MATCHED THEN UPDATE SET {set_clause}
    WHEN NOT MATCHED THEN INSERT ({insert_cols}) VALUES ({insert_values})
    """

    self.log.debug(f"MERGE SQL:\n{merge_sql.strip()}")
    result = self._spark.sql(merge_sql)

    try:
        metrics  = result.collect()[0].asDict()
        inserted = metrics.get("num_inserted_rows", "?")
        updated  = metrics.get("num_updated_rows", "?")
        self.log.success(
            f"✔ UPSERT completado | tabla='{self._table_name}' | "
            f"insertados={inserted} | actualizados={updated}"
        )
    except Exception:
        self.log.success(f"✔ UPSERT completado | tabla='{self._table_name}'")

Functions