Arquitectura
Visión general
DKOps implementa la arquitectura Medallion (Landing → Bronze → Silver → Gold) con un motor de orquestación declarativo que separa la configuración del comportamiento.
flowchart TB
subgraph src["Fuentes externas"]
F1["JSON / CSV\nParquet / Avro"]
F2["Kafka / Event Hub\nStreaming"]
end
subgraph landing["☁ Landing Zone"]
L["Archivos crudos\nDepositados por Data Factory\nKafka Connector, FTP, etc."]
end
subgraph bronze["🥉 Bronze — Raw + metadata"]
B1["ventas_raw\n_ingested_at · _ingested_date\n_source_file"]
B2["clientes_raw\nop_type: I/U/D"]
B3["eventos_raw\nStreaming → Batch"]
end
subgraph silver["🥈 Silver — Estado actual"]
S1["ventas_current\ncdc_merge"]
S2["clientes_current\nfull_merge"]
S3["eventos_current\nappend_dedup"]
end
subgraph gold["🥇 Gold — KPIs y agregados"]
G1["revenue_diario"]
G2["engagement_clientes"]
G3["alertas_criticas"]
end
F1 --> L
F2 --> L
L -->|"ingest_bronze()\nPartition overwrite idempotente"| B1
L -->|"run_streaming()\navailableNow"| B3
L --> B2
B1 -->|"cdc_merge"| S1
B2 -->|"full_merge"| S2
B3 -->|"append_dedup"| S3
S1 & S2 & S3 -->|"TableWriter SQL"| G1 & G2 & G3
Módulos del framework
Módulo 1: ingestion (Landing → Silver)
flowchart LR
subgraph contracts["Contratos JSON"]
BC["ingestion/batch/\n*.json"]
SC["ingestion/streaming/\n*.json"]
SV["ingestion/silver/\n*.json"]
end
subgraph engine["IngestionEngine"]
IE["from_spark()"]
BI["BronzeIngestor\ningest_bronze()"]
SS["run_streaming()\navailableNow"]
SP["SilverPromoter\npromote_silver()"]
end
subgraph strategies["Estrategias Silver"]
FM["full_merge\nMERGE INTO\nSCD1 completo"]
CM["cdc_merge\nI/U/D + soft delete\nis_deleted"]
IR["incremental_replace\nReemplaza por watermark"]
AD["append_dedup\nAnti-join append"]
end
BC --> BI
SC --> SS
SV --> SP
IE --> BI & SS & SP
SP --> FM & CM & IR & AD
BronzeIngestor — Landing → Bronze
- Lee archivos del directorio
source.path del contrato
- Añade metadatos:
_ingested_at, _ingested_date, _source_file
- Escribe con partition overwrite por
_ingested_date → idempotente
- Lee Bronze completo (o filtrado)
- Deduplica por
merge_keys según watermark_col
- Aplica la estrategia declarada en el contrato
- Filtra columnas con
_select_for_silver() — Bronze metadata no pasa a Silver
- Añade
_silver_modified_at si el contrato lo pide
Módulo 2: table_governance (Silver → Gold)
table_governance/
├── contracts/
│ ├── loader.py # JSON → TableContract (frozen dataclass)
│ └── validator.py # SchemaValidator — tipos y nulabilidad
├── writers/
│ ├── table_writer.py # ★ Fachada pública
│ ├── base_writer.py # Bridge local ↔ Databricks + merge_schema + masks
│ ├── create_writer.py # CREATE OR REPLACE TABLE + SET MASK
│ ├── append_writer.py # INSERT INTO (soporta mergeSchema)
│ ├── upsert_writer.py # MERGE INTO (SCD1)
│ ├── partition_writer.py# overwrite_partition (soporta mergeSchema)
│ └── delete_writer.py # DELETE WHERE
└── migrations/
└── safe_migrator.py # Compara contrato vs estado real → ALTER TABLE
Flujo de una escritura
sequenceDiagram
participant P as Pipeline
participant TW as TableWriter
participant SV as SchemaValidator
participant BW as BaseWriter
participant D as Delta Lake
P->>TW: overwrite(df)
TW->>SV: validate(df, contract)
SV-->>TW: OK / ValidationError
TW->>BW: _write_df(df, "overwrite")
BW->>D: DataFrameWriter.format("delta").mode("overwrite")
D-->>BW: commit
BW->>D: ALTER COLUMN COMMENT
BW->>D: SET TBLPROPERTIES
BW->>D: SET MASK (solo Databricks)
D-->>P: done
sequenceDiagram
participant SP as SilverPromoter
participant CM as CdcMergeStrategy
participant B as Bronze (Delta)
participant S as Silver (Delta)
SP->>CM: execute()
CM->>B: read() — lee todos los eventos
CM->>CM: filter op_type IN (I, U) → upserts
CM->>CM: filter op_type = D → deletes
CM->>CM: withColumn("is_deleted", False)
CM->>CM: withColumn("_silver_modified_at", now())
CM->>CM: _select_for_silver() — excluye _ingested_at, _source_file
CM->>S: MERGE INTO (upserts)
CM->>CM: withColumn("is_deleted", True) — soft delete
CM->>S: MERGE INTO (soft deletes)
S-->>SP: filas en Silver
Runtime-agnóstico: local ↔ Databricks
flowchart LR
CF["config.json\nEXECUTION_ENVIRONMENT"] --> L["Launcher"]
L --> E{{"is_databricks?"}}
E -->|"false"| LC["SparkSession local\ncatalog = schema.name\npaths = /tmp/..."]
E -->|"true"| DC["Databricks Connect\ncatalog = catalog.schema.name\npaths = abfss://..."]
LC & DC --> TW["TableWriter / IngestionEngine\n(mismo código)"]
El Launcher se instancia una vez como singleton del proceso. Todos los writers, readers, ingestors y el SafeMigrator obtienen spark y env internamente vía Launcher.current().
Descripción de componentes
Core
| Módulo |
Responsabilidad |
Launcher |
Punto de entrada único. Detecta runtime, crea SparkSession, se registra como singleton. |
EnvironmentConfig |
Resuelve placeholders {catalog.bronze}, {path.silver} según el ambiente activo. |
LoggerConfig |
Logging estructurado con loguru. Mixin LoggableMixin inyecta self.log en cualquier clase. |
Ingestion
| Módulo |
Responsabilidad |
IngestionEngine |
Orquestador. Factory from_spark(). Métodos ingest_bronze(), run_streaming(), promote_silver(), status(). |
BronzeIngestor |
Lee archivos Landing, añade metadata, escribe Bronze con partition overwrite. |
SilverPromoter |
Aplica la estrategia declarada en el contrato Silver. |
FileReader |
Lectura batch de archivos (JSON, CSV, Parquet, Delta). |
FileStreamReader |
Lectura streaming — infiere schema desde archivos existentes si no se declara uno. |
FullMergeStrategy |
MERGE INTO Silver con dedup por watermark. SCD Type 1. |
CdcMergeStrategy |
Aplica I/U/D desde Bronze. Soft delete via is_deleted. |
IncrementalReplaceStrategy |
Filtra la partición más reciente del Bronze y hace upsert. |
AppendDedupStrategy |
Anti-join: solo inserta registros que no existen en Silver. |
Table Governance
| Módulo |
Responsabilidad |
TableContract |
Dataclass inmutable (frozen). Representa el estado deseado de una tabla. |
SchemaValidator |
Compara tipos Spark del DataFrame contra el contrato. Soporta widening. |
TableWriter |
Fachada pública: overwrite, append, upsert, overwrite_partition, delete. |
TableReader |
Lectura gobernada: read(), read_partition(), read_stream(), read_cdf(). |
SafeMigrator |
Compara contrato vs. tabla real. Genera plan ALTER TABLE sin pérdida de datos. |