class BaseWriter(LoggableMixin, ABC):
"""
Base para todos los writers del módulo table_governance.
Parámetros
----------
contract : TableContract cargado por ContractLoader.
strict_columns : si True, columnas extra en el DF generan WARNING.
fail_on_warning : si True, WARNINGs también bloquean la escritura.
dry_run : si True, valida pero no escribe nada.
Notas
-----
Spark y EnvironmentConfig se resuelven automáticamente desde el
Launcher activo (Launcher.current()).
"""
def __init__(
self,
contract: TableContract,
strict_columns: bool = True,
fail_on_warning: bool = False,
dry_run: bool = False,
) -> None:
launcher = Launcher.current()
self._spark = launcher.spark
self._env = launcher.env
self._contract = contract
self._strict_columns = strict_columns
self._fail_on_warning = fail_on_warning
self._dry_run = dry_run
self._validator = SchemaValidator(contract, strict_columns)
# Nombre efectivo según runtime
self._table_name = (
contract.full_name
if self._env._is_databricks
else f"{contract.schema}.{contract.name}"
)
# Path físico en warehouse — solo usado en local PC
self._table_path = self._resolve_table_path()
self.log.debug(
f"Writer listo | tabla='{self._table_name}' | "
f"runtime={'databricks' if self._env._is_databricks else 'local-pc'} | "
f"dry_run={dry_run}"
)
# ── API pública ───────────────────────────────────────────────────────
@abstractmethod
def write(self, df: DataFrame, **kwargs) -> None:
...
# ── Escritura abstracta — el corazón del bridge local/databricks ──────
def _write_df(
self,
df: DataFrame,
mode: str,
overwrite_schema: bool = False,
) -> None:
"""
Escribe el DataFrame según el runtime.
Las particiones se pasan SIEMPRE al DataFrameWriter via .partitionBy()
— no depender del DDL previo porque saveAsTable lo ignora al hacer
overwrite y recrea la tabla sin particiones.
Databricks → df.write.partitionBy(...).saveAsTable(full_name)
Local PC → df.write.partitionBy(...).save(path) + registro catálogo
"""
writer = (
df.write
.format(self._contract.format.lower())
.mode(mode)
)
if overwrite_schema:
writer = writer.option("overwriteSchema", "true")
elif self._contract.merge_schema:
writer = writer.option("mergeSchema", "true")
# Particiones — crítico pasarlas aquí, no solo en el DDL
if self._contract.partition_columns:
writer = writer.partitionBy(*self._contract.partition_columns)
self.log.debug(
f"Particionando por: {self._contract.partition_columns}"
)
if self._env._is_databricks:
writer.saveAsTable(self._table_name)
else:
writer.save(self._table_path)
self._register_local_table()
def _register_local_table(self) -> None:
"""
Registra (o refresca) la tabla en el catálogo local de Spark.
Crea el schema si no existe. Usa LOCATION para apuntar al path Delta.
"""
schema = self._contract.schema
name = self._contract.name
self._spark.sql(f"CREATE DATABASE IF NOT EXISTS `{schema}`")
# Verificar si la tabla ya está registrada
try:
self._spark.sql(f"DESCRIBE TABLE `{schema}`.`{name}`")
# Ya existe — refrescar metadatos
self._spark.sql(f"REFRESH TABLE `{schema}`.`{name}`")
except Exception:
# No existe — registrarla apuntando al path
self._spark.sql(
f"CREATE TABLE `{schema}`.`{name}` "
f"USING DELTA "
f"LOCATION '{self._table_path}'"
)
def _resolve_table_path(self) -> str:
"""
Resuelve el path físico de la tabla en el warehouse local.
Solo relevante en local PC — en Databricks Unity Catalog gestiona el path.
"""
if self._env._is_databricks:
return "" # no aplica
warehouse_dir = self._spark.conf.get(
"spark.sql.warehouse.dir", "/tmp/spark-warehouse"
)
return os.path.join(
warehouse_dir,
self._contract.schema,
self._contract.name,
)
# ── Validación ────────────────────────────────────────────────────────
def _validate(self, df: DataFrame) -> ValidationResult:
result = self._validator.validate(df)
result.raise_if_critical()
if self._fail_on_warning and result.warnings:
lines = "\n ".join(str(e) for e in result.warnings)
raise ValueError(
f"Escritura cancelada por warnings en '{self._table_name}' "
f"(fail_on_warning=True):\n {lines}"
)
return result
# ── Columnas con default ──────────────────────────────────────────────
def _apply_defaults(self, df: DataFrame) -> DataFrame:
df_col_names = set(df.columns)
for col_def in self._contract.default_columns:
if col_def.name not in df_col_names:
self.log.debug(
f"Añadiendo columna con default | "
f"col='{col_def.name}' | expr='{col_def.default}'"
)
df = df.withColumn(col_def.name, F.expr(col_def.default))
return df
# ── Reordenar columnas ────────────────────────────────────────────────
def _reorder_columns(self, df: DataFrame) -> DataFrame:
contract_cols = self._contract.column_names
df_cols = set(df.columns)
ordered = [c for c in contract_cols if c in df_cols]
extra = [c for c in df.columns if c not in set(contract_cols)]
return df.select(*ordered, *extra)
# ── Comentarios ───────────────────────────────────────────────────────
def _apply_column_comments(self) -> None:
"""
Aplica los comentarios de columna definidos en el contrato
via ALTER TABLE ALTER COLUMN SET COMMENT.
Necesario porque saveAsTable sobrescribe la tabla ignorando
el DDL previo — los comentarios del CREATE TABLE se pierden.
Solo aplica columnas que tengan comentario definido en el contrato.
"""
cols_con_comment = [
col for col in self._contract.columns if col.comment
]
if not cols_con_comment:
return
self.log.debug(
f"Aplicando comentarios a {len(cols_con_comment)} columnas "
f"en '{self._table_name}'"
)
for col in cols_con_comment:
sql = (
f"ALTER TABLE {self._table_name} "
f"ALTER COLUMN `{col.name}` "
f"COMMENT '{col.comment.replace(chr(39), chr(39)*2)}'"
)
try:
self._spark.sql(sql)
except Exception as exc:
self.log.warning(
"apply_column_comments",
f"No se pudo aplicar comentario en '{col.name}': {exc}",
)
def _apply_column_masks(self) -> None:
"""
Aplica políticas de enmascaramiento de columna via ALTER TABLE.
Solo aplica en Databricks (Unity Catalog). Se ignora en local PC.
"""
cols_with_mask = self._contract.masked_columns
if not cols_with_mask:
return
if not self._env._is_databricks:
self.log.debug("Column masks omitidas — solo aplican en Databricks Unity Catalog")
return
if self._dry_run:
self.log.info("dry_run=True → column masks no aplicadas")
return
self.log.debug(
f"Aplicando masks a {len(cols_with_mask)} columnas "
f"en '{self._table_name}'"
)
for col in cols_with_mask:
sql = (
f"ALTER TABLE {self._table_name} "
f"ALTER COLUMN `{col.name}` "
f"SET MASK {col.mask}"
)
self.log.debug(f"Mask SQL: {sql}")
try:
self._spark.sql(sql)
except Exception as exc:
self.log.warning(
"apply_column_masks",
f"No se pudo aplicar mask en '{col.name}': {exc}",
)
def _apply_table_comment(self) -> None:
"""
Aplica el comentario de tabla via ALTER TABLE SET TBLPROPERTIES.
Mismo problema que los comentarios de columna — saveAsTable los borra.
"""
if not self._contract.comment:
return
sql = (
f"COMMENT ON TABLE {self._table_name} "
f"IS '{self._contract.comment.replace(chr(39), chr(39)*2)}'"
)
try:
self._spark.sql(sql)
except Exception as exc:
self.log.warning(
"apply_table_comment",
f"No se pudo aplicar comentario de tabla: {exc}",
)
# ── Permisos ──────────────────────────────────────────────────────────
def _apply_permissions(self) -> None:
if not self._contract.permissions:
return
if not self._env._is_databricks:
self.log.debug("Permisos omitidos — solo aplican en Databricks")
return
if self._dry_run:
self.log.info("dry_run=True → permisos no aplicados")
return
for perm in self._contract.permissions:
sql = (
f"{perm.operation} {perm.action} "
f"ON TABLE {self._table_name} "
f"TO `{perm.principal}`"
)
self.log.debug(f"Aplicando permiso: {sql}")
try:
self._spark.sql(sql)
except Exception as exc:
self.log.warning("apply_permissions", f"Error: {sql} | {exc}")
# ── DDL helpers ───────────────────────────────────────────────────────
def _table_exists(self) -> bool:
try:
self._spark.sql(f"DESCRIBE TABLE {self._table_name}")
return True
except Exception:
return False
def _build_create_ddl(self, or_replace: bool = False) -> str:
"""
DDL de CREATE TABLE.
En local PC omite LOCATION (se gestiona vía _register_local_table).
En Databricks añade LOCATION si la tabla es EXTERNAL.
"""
c = self._contract
replace = "OR REPLACE " if or_replace else ""
col_defs = []
for col in c.columns:
parts = [f"`{col.name}` {col.type}"]
if not col.nullable:
parts.append("NOT NULL")
if col.comment:
parts.append(f"COMMENT '{col.comment}'")
col_defs.append(" ".join(parts))
cols_sql = ",\n ".join(col_defs)
ddl = f"CREATE {replace}TABLE {self._table_name} (\n {cols_sql}\n)"
ddl += f"\nUSING {c.format}"
if c.comment:
ddl += f"\nCOMMENT '{c.comment}'"
if c.partition_columns:
parts = ", ".join(f"`{p}`" for p in c.partition_columns)
ddl += f"\nPARTITIONED BY ({parts})"
if c.clustering and c.clustering.columns:
cols = ", ".join(f"`{col}`" for col in c.clustering.columns)
ddl += f"\nCLUSTER BY ({cols})"
effective_props: dict[str, str] = dict(c.properties)
if c.change_data_feed:
effective_props["delta.enableChangeDataFeed"] = "true"
if effective_props:
props = ", ".join(f"'{k}' = '{v}'" for k, v in effective_props.items())
ddl += f"\nTBLPROPERTIES ({props})"
if self._env._is_databricks and c.is_external() and c.location:
ddl += f"\nLOCATION '{c.location}'"
return ddl
# ── dry_run guard ─────────────────────────────────────────────────────
def _log_dry_run(self, operation: str) -> None:
self.log.warning(
operation,
f"dry_run=True → simulado, nada escrito en '{self._table_name}'",
)