Saltar a contenido

Partition Writer

DKOps.table_governance.writers.partition_writer

partition_writer.py

Reemplaza una partición específica sin tocar el resto de la tabla.

Databricks → dynamic partition overwrite + saveAsTable. Local PC → dynamic partition overwrite + save(path) + refresh del catálogo.

Classes

PartitionWriter

Bases: BaseWriter

Overwrite de partición específica — idempotente en esa partición.

Uso
PartitionWriter(contract).write(df, partition={"fecha": "2024-01-15"})
Source code in src/DKOps/table_governance/writers/partition_writer.py
class PartitionWriter(BaseWriter):
    """
    Writer that overwrites partitions of the target table using Spark's
    dynamic partition overwrite mode.

    Usage
    -----
        PartitionWriter(contract).write(df, partition={"fecha": "2024-01-15"})
    """

    @log_operation("overwrite_partition")
    def write(
        self,
        df:        DataFrame,
        partition: dict[str, str] | None = None,
        **kwargs,
    ) -> None:
        """
        Overwrite the partitions present in ``df`` on the target table.

        Parameters
        ----------
        df:
            Data to write. It is validated, completed with defaults and
            reordered through the writer's own helpers before writing.
        partition:
            Mapping of partition column name to value. Required. Every key
            must be one of the contract's declared partition columns.
        **kwargs:
            Accepted for signature compatibility; not used by this method.

        Raises
        ------
        ValueError
            If ``partition`` is missing/empty, or if any of its keys is not
            a declared partition column.

        Notes
        -----
        When the writer is in dry-run mode the method logs and returns
        before touching the Spark configuration or writing anything.

        NOTE(review): ``partition`` is only checked against the declared
        partition columns and used in log messages; it is not used to filter
        ``df``. Presumably callers pass a ``df`` that contains only that
        partition -- confirm.
        """
        if not partition:
            raise ValueError(
                "PartitionWriter requiere 'partition'. "
                "Ejemplo: writer.write(df, partition={'fecha': '2024-01-15'})"
            )

        declared = set(self._contract.partition_columns)
        for col in partition:
            if col not in declared:
                raise ValueError(
                    f"Columna '{col}' no es columna de partición en "
                    f"'{self._table_name}'.\n"
                    f"Particiones declaradas: {sorted(declared)}"
                )

        self.log.info(
            f"Iniciando OVERWRITE PARTITION | tabla='{self._table_name}' | "
            f"partición={partition}"
        )

        self._validate(df)
        df = self._apply_defaults(df)
        df = self._reorder_columns(df)

        if self._dry_run:
            self._log_dry_run("overwrite_partition")
            return

        # Dynamic partition overwrite -- only the partitions present in the
        # DataFrame are replaced. The setting is session-wide, so remember
        # the value that was active and put it back even if the write fails;
        # otherwise a failed write would leave the session in dynamic mode.
        mode_key = "spark.sql.sources.partitionOverwriteMode"
        spark_conf = self._spark.conf
        previous_mode = spark_conf.get(mode_key, "static")
        spark_conf.set(mode_key, "dynamic")
        try:
            row_count = df.count()
            self._write_df(df, mode="overwrite")
        finally:
            spark_conf.set(mode_key, previous_mode)

        self.log_write_ok(
            "overwrite_partition",
            rows=row_count,
            target=self._table_name,
            mode=f"overwrite partition={partition}",
        )
Functions
write(df, partition=None, **kwargs)
Source code in src/DKOps/table_governance/writers/partition_writer.py
@log_operation("overwrite_partition")
def write(
    self,
    df:        DataFrame,
    partition: dict[str, str] | None = None,
    **kwargs,
) -> None:
    """
    Overwrite the partitions present in ``df`` on the target table.

    Parameters
    ----------
    df:
        Data to write. It is validated, completed with defaults and
        reordered through the writer's own helpers before writing.
    partition:
        Mapping of partition column name to value. Required. Every key
        must be one of the contract's declared partition columns.
    **kwargs:
        Accepted for signature compatibility; not used by this method.

    Raises
    ------
    ValueError
        If ``partition`` is missing/empty, or if any of its keys is not
        a declared partition column.

    Notes
    -----
    When the writer is in dry-run mode the method logs and returns
    before touching the Spark configuration or writing anything.

    NOTE(review): ``partition`` is only checked against the declared
    partition columns and used in log messages; it is not used to filter
    ``df``. Presumably callers pass a ``df`` that contains only that
    partition -- confirm.
    """
    if not partition:
        raise ValueError(
            "PartitionWriter requiere 'partition'. "
            "Ejemplo: writer.write(df, partition={'fecha': '2024-01-15'})"
        )

    declared = set(self._contract.partition_columns)
    for col in partition:
        if col not in declared:
            raise ValueError(
                f"Columna '{col}' no es columna de partición en "
                f"'{self._table_name}'.\n"
                f"Particiones declaradas: {sorted(declared)}"
            )

    self.log.info(
        f"Iniciando OVERWRITE PARTITION | tabla='{self._table_name}' | "
        f"partición={partition}"
    )

    self._validate(df)
    df = self._apply_defaults(df)
    df = self._reorder_columns(df)

    if self._dry_run:
        self._log_dry_run("overwrite_partition")
        return

    # Dynamic partition overwrite -- only the partitions present in the
    # DataFrame are replaced. The setting is session-wide, so remember
    # the value that was active and put it back even if the write fails;
    # otherwise a failed write would leave the session in dynamic mode.
    mode_key = "spark.sql.sources.partitionOverwriteMode"
    spark_conf = self._spark.conf
    previous_mode = spark_conf.get(mode_key, "static")
    spark_conf.set(mode_key, "dynamic")
    try:
        row_count = df.count()
        self._write_df(df, mode="overwrite")
    finally:
        spark_conf.set(mode_key, previous_mode)

    self.log_write_ok(
        "overwrite_partition",
        rows=row_count,
        target=self._table_name,
        mode=f"overwrite partition={partition}",
    )

Functions