Bases: LoggableMixin
DELETE FROM tabla WHERE condición.
No hereda BaseWriter porque DELETE no recibe un DataFrame —
opera directamente sobre la tabla con SQL.
Cuándo usar
- GDPR / derecho al olvido.
- Corrección de datos erróneos por ID o rango de fechas.
Notas
Spark y EnvironmentConfig se resuelven automáticamente desde el
Launcher activo (Launcher.current()).
Source code in src/DKOps/table_governance/writers/delete_writer.py
| class DeleteWriter(LoggableMixin):
"""
DELETE FROM tabla WHERE condición.
No hereda BaseWriter porque DELETE no recibe un DataFrame —
opera directamente sobre la tabla con SQL.
Cuándo usar
-----------
- GDPR / derecho al olvido.
- Corrección de datos erróneos por ID o rango de fechas.
Notas
-----
Spark y EnvironmentConfig se resuelven automáticamente desde el
Launcher activo (Launcher.current()).
"""
def __init__(
self,
contract: TableContract,
dry_run: bool = False,
) -> None:
launcher = Launcher.current()
self._spark = launcher.spark
self._env = launcher.env
self._contract = contract
self._dry_run = dry_run
self._table_name = (
contract.full_name
if self._env._is_databricks
else f"{contract.schema}.{contract.name}"
)
@log_operation("delete")
def delete(self, condition: str, preview: bool = False) -> int:
"""
Parámetros
----------
condition : expresión SQL WHERE (sin la palabra WHERE).
Ej: "fecha < '2023-01-01'"
preview : si True, muestra las filas a eliminar antes de borrar.
Devuelve
--------
Número de filas afectadas.
"""
if not condition or not condition.strip():
raise ValueError(
"DeleteWriter requiere una condición SQL no vacía.\n"
"Para borrar TODA la tabla usa CreateWriter con un DF vacío."
)
self.log.info(
f"DELETE preparado | tabla='{self._table_name}' | "
f"condición='{condition}'"
)
affected = 0
if preview:
preview_df = self._spark.sql(
f"SELECT * FROM {self._table_name} WHERE {condition}"
)
affected = preview_df.count()
self.log.warning(
"delete_preview",
f"Filas a eliminar: {affected:,} | condición='{condition}'",
)
preview_df.show(20, truncate=False)
if self._dry_run:
self.log.warning(
"delete",
f"dry_run=True → DELETE simulado, nada eliminado de '{self._table_name}'",
)
return affected
result = self._spark.sql(
f"DELETE FROM {self._table_name} WHERE {condition}"
)
try:
metrics = result.collect()[0].asDict()
affected = int(metrics.get("num_deleted_rows", affected))
except Exception:
pass
self.log.success(
f"✔ DELETE completado | tabla='{self._table_name}' | "
f"filas_eliminadas={affected:,} | condición='{condition}'"
)
return affected
|
Functions
__init__(contract, dry_run=False)
Source code in src/DKOps/table_governance/writers/delete_writer.py
| def __init__(
self,
contract: TableContract,
dry_run: bool = False,
) -> None:
launcher = Launcher.current()
self._spark = launcher.spark
self._env = launcher.env
self._contract = contract
self._dry_run = dry_run
self._table_name = (
contract.full_name
if self._env._is_databricks
else f"{contract.schema}.{contract.name}"
)
|
delete(condition, preview=False)
Parámetros
condition : expresión SQL WHERE (sin la palabra WHERE).
Ej: "fecha < '2023-01-01'"
preview : si True, muestra las filas a eliminar antes de borrar.
Devuelve
Número de filas afectadas.
Source code in src/DKOps/table_governance/writers/delete_writer.py
| @log_operation("delete")
def delete(self, condition: str, preview: bool = False) -> int:
"""
Parámetros
----------
condition : expresión SQL WHERE (sin la palabra WHERE).
Ej: "fecha < '2023-01-01'"
preview : si True, muestra las filas a eliminar antes de borrar.
Devuelve
--------
Número de filas afectadas.
"""
if not condition or not condition.strip():
raise ValueError(
"DeleteWriter requiere una condición SQL no vacía.\n"
"Para borrar TODA la tabla usa CreateWriter con un DF vacío."
)
self.log.info(
f"DELETE preparado | tabla='{self._table_name}' | "
f"condición='{condition}'"
)
affected = 0
if preview:
preview_df = self._spark.sql(
f"SELECT * FROM {self._table_name} WHERE {condition}"
)
affected = preview_df.count()
self.log.warning(
"delete_preview",
f"Filas a eliminar: {affected:,} | condición='{condition}'",
)
preview_df.show(20, truncate=False)
if self._dry_run:
self.log.warning(
"delete",
f"dry_run=True → DELETE simulado, nada eliminado de '{self._table_name}'",
)
return affected
result = self._spark.sql(
f"DELETE FROM {self._table_name} WHERE {condition}"
)
try:
metrics = result.collect()[0].asDict()
affected = int(metrics.get("num_deleted_rows", affected))
except Exception:
pass
self.log.success(
f"✔ DELETE completado | tabla='{self._table_name}' | "
f"filas_eliminadas={affected:,} | condición='{condition}'"
)
return affected
|