Saltar a contenido

Create Writer

DKOps.table_governance.writers.create_writer

create_writer.py

Escritura full load — CREATE OR REPLACE TABLE. Idempotente en ambos runtimes.

Databricks → CREATE OR REPLACE TABLE DDL + saveAsTable + ALTER TABLE para restaurar comentarios y owner (saveAsTable sobrescribe la tabla borrando metadata del DDL) Local PC → DROP + recrear path Delta + registrar en catálogo

Classes

CreateWriter

Bases: BaseWriter

Carga full (CREATE OR REPLACE TABLE).

Uso
CreateWriter(contract).write(df)
Source code in src/DKOps/table_governance/writers/create_writer.py
class CreateWriter(BaseWriter):
    """
    Carga full (CREATE OR REPLACE TABLE).

    Uso
    ---
        CreateWriter(contract).write(df)
    """

    @log_operation("create_or_replace")
    def write(self, df: DataFrame, **kwargs) -> None:
        self.log.info(f"Iniciando CREATE OR REPLACE | tabla='{self._table_name}'")

        self._validate(df)
        df = self._apply_defaults(df)
        df = self._reorder_columns(df)

        if self._dry_run:
            self._log_dry_run("create_or_replace")
            return

        if self._env._is_databricks:
            ddl = self._build_create_ddl(or_replace=True)
            self.log.debug(f"DDL:\n{ddl}")
            self._spark.sql(ddl)
        else:
            self._drop_local_table_if_exists()

        row_count = df.count()
        self._write_df(df, mode="overwrite", overwrite_schema=True)

        # Post-escritura — saveAsTable borra la metadata del DDL previo.
        # Hay que reaplicar comentarios, masks, owner y permisos después de escribir.
        self._apply_table_comment()
        self._apply_column_comments()
        self._apply_column_masks()

        if self._env._is_databricks and self._contract.owner:
            try:
                self._spark.sql(
                    f"ALTER TABLE {self._table_name} "
                    f"SET OWNER TO `{self._contract.owner}`"
                )
            except Exception as exc:
                self.log.warning("set_owner", f"No se pudo asignar owner: {exc}")

        self._apply_permissions()
        self.log_write_ok(
            "create_or_replace",
            rows=row_count,
            target=self._table_name,
            mode="overwrite",
        )

    def _drop_local_table_if_exists(self) -> None:
        """En local PC elimina el registro del catálogo y los datos del path."""
        try:
            self._spark.sql(
                f"DROP TABLE IF EXISTS "
                f"`{self._contract.schema}`.`{self._contract.name}`"
            )
        except Exception:
            pass

        try:
            import shutil, os
            if self._table_path and os.path.exists(self._table_path):
                shutil.rmtree(self._table_path)
                self.log.debug(f"Path anterior eliminado: {self._table_path}")
        except Exception as exc:
            self.log.debug(f"No se pudo limpiar path anterior: {exc}")
Functions
write(df, **kwargs)
Source code in src/DKOps/table_governance/writers/create_writer.py
@log_operation("create_or_replace")
def write(self, df: DataFrame, **kwargs) -> None:
    self.log.info(f"Iniciando CREATE OR REPLACE | tabla='{self._table_name}'")

    self._validate(df)
    df = self._apply_defaults(df)
    df = self._reorder_columns(df)

    if self._dry_run:
        self._log_dry_run("create_or_replace")
        return

    if self._env._is_databricks:
        ddl = self._build_create_ddl(or_replace=True)
        self.log.debug(f"DDL:\n{ddl}")
        self._spark.sql(ddl)
    else:
        self._drop_local_table_if_exists()

    row_count = df.count()
    self._write_df(df, mode="overwrite", overwrite_schema=True)

    # Post-escritura — saveAsTable borra la metadata del DDL previo.
    # Hay que reaplicar comentarios, masks, owner y permisos después de escribir.
    self._apply_table_comment()
    self._apply_column_comments()
    self._apply_column_masks()

    if self._env._is_databricks and self._contract.owner:
        try:
            self._spark.sql(
                f"ALTER TABLE {self._table_name} "
                f"SET OWNER TO `{self._contract.owner}`"
            )
        except Exception as exc:
            self.log.warning("set_owner", f"No se pudo asignar owner: {exc}")

    self._apply_permissions()
    self.log_write_ok(
        "create_or_replace",
        rows=row_count,
        target=self._table_name,
        mode="overwrite",
    )

Functions