Quickstart
1. Installation
# Clone the repository
git clone https://github.com/brrsanchezfi/DKOps
cd DKOps
# Local development (includes PySpark + Delta)
pip install -e ".[local]"
# For Databricks Connect
pip install -e ".[databricks-connect]"
2. config.json
The config.json file defines the infrastructure configuration: environment, paths, catalogs, and logging. It lives at config/config.json in each project and is excluded from git.
{
  "EXECUTION_ENVIRONMENT": "local",
  "SPARK_APP_NAME": "MiPipeline",
  "SPARK_WAREHOUSE_DIR": "/tmp/mi-pipeline/warehouse",
  "DELTA_VERSION": "3.2.0",
  "LOG_LEVEL": "INFO",
  "LOG_DIR": "/tmp/logs",
  "LOG_ROTATION": "10 MB",
  "LOG_RETENTION": "7 days",
  "environments": {
    "local": {
      "env": "local",
      "env_short": "l",
      "catalogs": {
        "bronze": "bronze",
        "silver": "silver",
        "gold": "gold"
      },
      "paths": {
        "landing": "/tmp/mi-pipeline/landing",
        "bronze": "/tmp/mi-pipeline/bronze",
        "silver": "/tmp/mi-pipeline/silver",
        "gold": "/tmp/mi-pipeline/gold",
        "checkpoint": "/tmp/mi-pipeline/checkpoints",
        "ops": "/tmp/mi-pipeline/ops"
      }
    }
  }
}
The placeholders {catalog.bronze}, {path.landing}, and {env} are resolved in every JSON contract when it is loaded.
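The substitution can be pictured with a few lines of Python. This is a minimal sketch and not DKOps code: the helper name `resolve_placeholders` is hypothetical, and the mapping of `{catalog.X}` and `{path.X}` onto the `catalogs` and `paths` keys of the active environment block is an assumption inferred from the config shown above.

```python
import re
from typing import Any

# Hypothetical helper: NOT part of the DKOps API, only an illustration.
_PLACEHOLDER = re.compile(r"\{(catalog|path)\.(\w+)\}|\{(env|env_short)\}")


def resolve_placeholders(text: str, config: dict[str, Any]) -> str:
    """Replace {catalog.X}, {path.X}, {env} and {env_short} in `text`."""
    # Assumption: the active block is selected by EXECUTION_ENVIRONMENT.
    active = config["environments"][config["EXECUTION_ENVIRONMENT"]]

    def substitute(match: re.Match) -> str:
        group, key, scalar = match.groups()
        if scalar is not None:
            return str(active[scalar])
        section = "catalogs" if group == "catalog" else "paths"
        return str(active[section][key])  # raises KeyError on an unknown name

    return _PLACEHOLDER.sub(substitute, text)


# Trimmed-down copy of the config above, enough for the demonstration.
config = {
    "EXECUTION_ENVIRONMENT": "local",
    "environments": {
        "local": {
            "env": "local",
            "env_short": "l",
            "catalogs": {"bronze": "bronze"},
            "paths": {"landing": "/tmp/mi-pipeline/landing"},
        }
    },
}

resolved = resolve_placeholders(
    '{"table": "{catalog.bronze}.ventas_raw", "source": "{path.landing}/ventas"}',
    config,
)
print(resolved)
```

Failing loudly on an unknown placeholder, as the `KeyError` does here, is one reasonable design choice: a typo in a contract surfaces at load time instead of producing a wrong table name.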
3. Full pipeline: Landing → Bronze → Silver → Gold
Recommended file structure
mi-pipeline/
├── config/
│   └── config.json
├── ingestion/
│   ├── batch/          ← Landing → Bronze contracts (batch)
│   │   └── ventas.json
│   ├── streaming/      ← Landing → Bronze contracts (streaming)
│   │   └── eventos.json
│   └── silver/         ← Bronze → Silver contracts
│       └── ventas_current.json
└── tables/
    ├── bronze/
    │   └── ventas_raw.json
    ├── silver/
    │   └── ventas_current.json
    └── gold/
        └── kpis_ventas.json
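If it helps, the directory skeleton can be created with a short script. This is only a convenience sketch: `scaffold` is a hypothetical helper, not something DKOps ships, and it creates empty directories only. The contract files (ventas.json and so on) still have to be written by hand.

```python
from pathlib import Path
import tempfile

# Directories taken from the recommended layout above.
LAYOUT = [
    "config",
    "ingestion/batch",
    "ingestion/streaming",
    "ingestion/silver",
    "tables/bronze",
    "tables/silver",
    "tables/gold",
]


def scaffold(root: Path) -> list[Path]:
    """Create the directory skeleton under `root` (hypothetical helper)."""
    created = []
    for relative in LAYOUT:
        target = root / relative
        # exist_ok=True makes the script safe to re-run on an existing project.
        target.mkdir(parents=True, exist_ok=True)
        created.append(target)
    return created


# Demonstrated in a throwaway directory; pass a real project path instead.
with tempfile.TemporaryDirectory() as tmp:
    dirs = scaffold(Path(tmp) / "mi-pipeline")
    print(len(dirs), "directories created")
```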
pipeline.py
from DKOps.launcher import Launcher
from DKOps.ingestion.engine import IngestionEngine
from DKOps.table_governance import load_contract, TableWriter
# 1. Initialize: detects the runtime automatically (local / Databricks)
launcher = Launcher("config/config.json")
# 2. Ingestion engine
engine = IngestionEngine.from_spark(
    spark=launcher.spark,
    env=launcher.env,
    bronze_contracts_dir="ingestion/batch",
    streaming_contracts_dir="ingestion/streaming",
    silver_contracts_dir="ingestion/silver",
    tables_base_dir=".",
    ops_path="/tmp/mi-pipeline/ops/control",
)
# 3. Landing → Bronze (batch)
engine.ingest_bronze()
# 4. Landing → Bronze (streaming, availableNow)
engine.run_streaming()
# 5. Bronze → Silver (declarative strategies)
engine.promote_silver()
# 6. Silver → Gold (SQL + TableWriter)
ct_gold = load_contract("tables/gold/kpis_ventas.json")
ct_silver = load_contract("tables/silver/ventas_current.json")
df_kpis = launcher.spark.sql(f"""
SELECT canal, COUNT(*) AS total_ventas, SUM(precio_total) AS revenue
FROM {ct_silver.effective_name}
WHERE is_deleted IS NULL OR NOT is_deleted
GROUP BY canal
""")
TableWriter(ct_gold).overwrite(df_kpis)
# 7. Status and operational control
engine.status()
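The filter `is_deleted IS NULL OR NOT is_deleted` in step 6 keeps rows whose soft-delete flag was never set. A bare `NOT is_deleted` would drop them, because `NOT NULL` evaluates to NULL in SQL and a NULL predicate rejects the row. The sketch below shows the difference using the standard-library `sqlite3` module as a stand-in for Spark SQL; the table and its three rows are made up for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE ventas (venta_id INTEGER, canal TEXT, is_deleted INTEGER)"
)
conn.executemany(
    "INSERT INTO ventas VALUES (?, ?, ?)",
    [
        (1, "web", 0),       # explicitly not deleted
        (2, "web", None),    # flag never set
        (3, "tienda", 1),    # soft-deleted
    ],
)

# Naive filter: silently loses the row whose flag is NULL.
naive = conn.execute(
    "SELECT venta_id FROM ventas WHERE NOT is_deleted ORDER BY venta_id"
).fetchall()

# NULL-safe filter, as used in the Gold query.
null_safe = conn.execute(
    "SELECT venta_id FROM ventas "
    "WHERE is_deleted IS NULL OR NOT is_deleted ORDER BY venta_id"
).fetchall()

print(naive)      # [(1,)]
print(null_safe)  # [(1,), (2,)]
```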
4. table_governance only (Silver → Gold)
If you already have data in Silver and only need the governance module:
from DKOps.launcher import Launcher
from DKOps.table_governance import load_contract, TableWriter, TableReader
launcher = Launcher("config/config.json")
contract = load_contract("tables/silver/ventas_current.json")
# Writes (df, df_delta and df_nuevos are Spark DataFrames you already have)
TableWriter(contract).overwrite(df)
TableWriter(contract).upsert(df_delta, keys=["venta_id"])
TableWriter(contract).append(df_nuevos)
TableWriter(contract).overwrite_partition(df, {"canal": "web"})
TableWriter(contract).delete("is_deleted = true")
# Reads
reader = TableReader(contract)
df = reader.read()
df = reader.read(filter="estado = 'activo'", columns=["venta_id", "precio_total"])
df = reader.read_partition({"canal": "web"})
df = reader.read_cdf(starting_version=1) # Change Data Feed
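For readers new to the term, an upsert keyed on `venta_id` conventionally means: rows whose key already exists in the target are updated, and the rest are inserted. The sketch below shows that semantics on plain Python dictionaries. It is an illustration of the general pattern only; whether `TableWriter.upsert` replaces whole rows or handles duplicate keys this way is an assumption the source does not confirm, and the standalone `upsert` function here is hypothetical.

```python
from typing import Any

Row = dict[str, Any]


def upsert(target: list[Row], incoming: list[Row], keys: list[str]) -> list[Row]:
    """Return `target` with `incoming` merged in, matching rows on `keys`."""
    # Index existing rows by their key tuple; dicts preserve insertion order.
    index = {tuple(row[k] for k in keys): dict(row) for row in target}
    for row in incoming:
        # Existing key: the row is replaced. New key: the row is appended.
        index[tuple(row[k] for k in keys)] = dict(row)
    return list(index.values())


target = [
    {"venta_id": 1, "precio_total": 10.0},
    {"venta_id": 2, "precio_total": 20.0},
]
incoming = [
    {"venta_id": 2, "precio_total": 25.0},  # updates an existing sale
    {"venta_id": 3, "precio_total": 30.0},  # inserts a new sale
]

result = upsert(target, incoming, keys=["venta_id"])
print(result)
```

Running the same `incoming` batch twice leaves `result` unchanged, which is the main reason to prefer an upsert over `append` when a load may be retried.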
5. SafeMigrator: schema evolution without data loss
from DKOps.table_governance import load_contract, SafeMigrator
contract = load_contract("tables/silver/ventas_current.json")
# Plan (dry_run: nothing is executed)
SafeMigrator(contract, dry_run=True).apply()
# Apply changes
SafeMigrator(contract, dry_run=False).apply()
SafeMigrator compares the JSON contract against the actual state of the Delta table and generates the minimal ALTER TABLE plan: it adds new columns and updates comments and properties. It never drops columns.
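The idea of an additive-only plan can be sketched as a diff between two column maps. This is not the SafeMigrator implementation: the function `plan_additive_migration`, the shape of the column dictionaries, and the example columns are all hypothetical, and the sketch covers columns and comments only, not table properties.

```python
from typing import Optional

# Column map shape assumed for this sketch: name -> {"type": ..., "comment": ...}
Columns = dict[str, dict[str, Optional[str]]]


def plan_additive_migration(
    table_name: str, contract: Columns, actual: Columns
) -> list[str]:
    """Build ALTER TABLE statements that add or re-comment columns, never drop."""
    statements = []
    for name, spec in contract.items():
        current = actual.get(name)
        if current is None:
            # Column is in the contract but not in the table: add it.
            column = f"{name} {spec['type']}"
            if spec.get("comment"):
                # Sketch only: the comment text is not escaped.
                column += f" COMMENT '{spec['comment']}'"
            statements.append(f"ALTER TABLE {table_name} ADD COLUMNS ({column})")
        elif spec.get("comment") and spec["comment"] != current.get("comment"):
            statements.append(
                f"ALTER TABLE {table_name} ALTER COLUMN {name} "
                f"COMMENT '{spec['comment']}'"
            )
    # Columns present only in `actual` are deliberately ignored: nothing is dropped.
    return statements


contract = {
    "venta_id": {"type": "BIGINT", "comment": "Sale id"},
    "canal": {"type": "STRING", "comment": "Sales channel"},
    "descuento": {"type": "DOUBLE", "comment": None},
}
actual = {
    "venta_id": {"type": "BIGINT", "comment": "Sale id"},
    "canal": {"type": "STRING", "comment": None},
    "legacy_col": {"type": "STRING", "comment": None},
}

plan = plan_additive_migration("silver.ventas_current", contract, actual)
for statement in plan:
    print(statement)
```

Returning the statements as a list rather than executing them mirrors the dry-run idea: the plan can be reviewed first and run only once it looks right.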
6. Running the demos
Each demo is a complete, self-contained pipeline:
# Demo 1 (Aeronautics): writers + SafeMigrator
python demos/demo_1/pipeline.py
# Demo 2 (Manufacturing): declarative DQ + testable transformations
python demos/demo_2/pipeline.py
# Demo 3 (E-commerce): full_merge + cdc_merge + append_dedup + streaming
python demos/demo_3/pipeline.py
# Demo 4 (Retail/Inventory): read_cdf() + read_stream() + SafeMigrator
python demos/demo_4/pipeline.py
# Demo 5 (Marketplace): cdc_merge + full_merge + Gold revenue/engagement
python demos/demo_5/pipeline.py
The first run downloads the Delta JARs (about 30 s). Subsequent runs start immediately.