Governança (Governance)¶
O aptdata possui uma camada nativa (first-class) de governança de dados que atua de forma transversal sobre a execução dos seus pipelines, garantindo rastreabilidade e conformidade.
A governança é dividida em quatro pilares principais:
-
Data Lineage
Grafo de proveniência rastreando cada leitura, transformação e escrita (
LineageGraph). -
Data Quality & Contracts
Contratos estáticos de schema e suite de expectativas (
SchemaContract). -
Business Rules Registry
Catálogo versionado de regras de negócio com trilha de auditoria (
RuleRegistry). -
Dataset Catalog
Repositório pesquisável de metadados e classificação de dados sensíveis (
DatasetCatalog).
graph LR
%% Estilos Customizados (Design Premium)
classDef default fill:#0b132b,stroke:#ff6a00,stroke-width:1px,color:#fff,rx:8px,ry:8px;
L["📈 Data Lineage\nLineageGraph + LineageStore"]
Q["✅ Schema Contracts\nSchemaContract + Expectations"]
R["📋 Business Rules\nRuleRegistry + AuditLog"]
C["🗂 Dataset Catalog\nDatasetCatalog + CatalogEntry"]
D["🔒 Classification\nColumnClassification + Policy"]
L --- C
Q --- C
R --- C
D --- Q
Data Lineage (Linhagem de Dados)¶
Visão Geral¶
O subsistema de linhagem reside em aptdata.core.lineage. Toda execução de workflow produz um LineageGraph contendo uma lista ordenada e imutável de objetos LineageNode.
flowchart LR
%% Estilos Customizados (Design Premium)
classDef default fill:#0b132b,stroke:#ff6a00,stroke-width:1px,color:#fff,rx:8px,ry:8px;
R["📥 READ\ns3://raw/orders.parquet\n50.000 rows"]
T["⚙️ TRANSFORM\ns3://clean/orders.parquet\n48.500 rows\nrevenue = price × quantity"]
W["📤 WRITE\ns3://final/orders.parquet"]
R --> T --> W
from aptdata.core.lineage import (
ColumnLineage,
LineageEventType,
LineageGraph,
LineageNode,
)
# Inicializa o grafo para uma execução específica
graph = LineageGraph(run_id="run-20240101", workflow_name="etl_pipeline")
read_node = LineageNode(
dataset_uri="s3://raw/orders.parquet",
event_type=LineageEventType.READ,
workflow_name="etl_pipeline",
rows_out=50_000,
)
graph.add_node(read_node)
transform_node = LineageNode(
dataset_uri="s3://clean/orders.parquet",
event_type=LineageEventType.TRANSFORM,
engine="pandas",
rows_in=50_000,
rows_out=48_500,
parent_node_ids=[read_node.node_id],
column_lineage=[
ColumnLineage(
target_column="revenue",
source_columns=["price", "quantity"],
transformation="price * quantity",
)
],
)
graph.add_node(transform_node)
# Navegação no grafo
upstream = graph.get_upstream(transform_node.node_id) # Retorna [read_node]
downstream = graph.get_downstream(read_node.node_id) # Retorna [transform_node]
# Serialização limpa (pronta para JSON Lines)
d = graph.to_dict()
Lineage Store¶
A classe LineageStore (em aptdata.plugins.governance) é utilizada para persistir e consultar grafos em memória ou enviar para um banco de metadados.
from aptdata.plugins.governance import LineageStore
store = LineageStore()
store.save(graph)
loaded = store.load("run-20240101")
runs = store.list_runs()
graphs = store.query_by_dataset("s3://raw/orders.parquet")
Emissão Automática via Event Bus
Como definido na ADR 001, a linhagem é preferencialmente extraída de forma automática via hooks do EventBus durante a compilação e execução do BaseFlow, poupando o desenvolvedor de instanciar os nós manualmente.
Registro de Regras de Negócio (Business Rules Registry)¶
Gerencia e audita políticas aplicadas sobre os dados.
from aptdata.plugins.governance import (
BusinessRule,
RuleAuditEntry,
RuleRegistry,
RuleStatus,
)
registry = RuleRegistry()
# 1. Registro da Regra
registry.register(BusinessRule(
rule_id="BR-001",
name="Revenue must be positive",
owner="finance-team",
expression="revenue > 0",
tags=["finance", "revenue"],
))
# 2. Consultas
rule = registry.get("BR-001")
finance_rules = registry.list_rules(tag="finance")
# 3. Trilha de Auditoria (Logs)
registry.record_audit(RuleAuditEntry(
rule_id="BR-001",
status=RuleStatus.APPLIED,
workflow_name="etl_pipeline",
trace_id="abc123",
rows_affected=48_500,
))
log = registry.get_audit_log(rule_id="BR-001")
Catálogo de Datasets (Dataset Catalog)¶
Repositório central para busca e descoberta de dados.
from aptdata.plugins.governance import DatasetCatalog, DatasetCatalogEntry
from aptdata.plugins.governance.classification import ColumnClassification
catalog = DatasetCatalog()
catalog.register(DatasetCatalogEntry(
uri="s3://datalake/orders.parquet",
name="Orders",
description="Registros de pedidos do sistema OLTP principal.",
owner="data-engineering",
tags=["orders", "finance"],
classification=ColumnClassification.CONFIDENTIAL,
))
# Recuperação e Busca
entry = catalog.get("s3://datalake/orders.parquet")
results = catalog.search(owner="data-engineering", tag="finance")
Políticas de Classificação de Dados¶
Definições formais sobre tempo de retenção, necessidade de criptografia e perfis de acesso baseados na sensibilidade da informação.
from aptdata.plugins.governance.classification import (
ColumnClassification,
DataClassificationPolicy,
)
policy = DataClassificationPolicy(
name="LGPD PII Policy",
description="Controles rígidos para colunas contendo dados pessoais identificáveis.",
pii_columns=["email", "phone", "full_name"],
retention_days=365,
encryption_required=True,
access_roles=["data-engineers", "privacy-team"],
)
Integração com Quality Contracts¶
O catálogo de metadados acopla-se diretamente aos Contratos de Schema (ver Quality):
from aptdata.plugins.quality import ColumnClassification, ColumnContract, SchemaContract
from aptdata.plugins.governance import DatasetCatalog, DatasetCatalogEntry
contract = SchemaContract(
name="orders_v1",
version="1.0.0",
owner="data-engineering",
columns=[
ColumnContract(name="id", dtype="int64", nullable=False),
ColumnContract(name="email", dtype="str", nullable=False, pii=True,
classification=ColumnClassification.PII),
],
)
catalog = DatasetCatalog()
catalog.register(DatasetCatalogEntry(
uri="s3://datalake/orders.parquet",
name="Orders",
schema_contract=contract,
))
# Extrai metadados sensíveis diretamente do contrato
pii_cols = contract.get_pii_columns()
Fluxo de Validação de Contratos (Data Contracts)¶
flowchart LR
%% Estilos Customizados (Design Premium)
classDef default fill:#0b132b,stroke:rgba(255,255,255,0.2),stroke-width:1px,color:#fff,rx:8px,ry:8px;
classDef accent fill:#ff6a00,stroke:#ff6a00,stroke-width:1px,color:#fff,rx:8px,ry:8px;
classDef success fill:#0d2b18,stroke:#178a3f,stroke-width:1px,color:#fff,rx:8px,ry:8px;
classDef danger fill:#2b0d0d,stroke:#8a1717,stroke-width:1px,color:#fff,rx:8px,ry:8px;
classDef interface fill:transparent,stroke:rgba(255,255,255,0.4),stroke-width:1px,stroke-dasharray: 5 5,color:#fff;
%% Nós do Sistema
A[(Fonte de Dados)] --> B{Validar Contrato<br>Pydantic}
%% Ramificações Condicionais
B -- Contrato Válido --> C[Processamento<br>aptdata Engine]:::success
B -- Falha na Validação --> D[Dead Letter Queue<br>Quarentena]:::danger
%% Roteamento Dinâmico
C --> E{Roteamento<br>por Metadados}:::accent
E -- Qualidade Alta --> F[Data Warehouse<br>Gold Layer]
E -- Necessita Tratamento --> G[Limpeza Avançada]
%% Agrupamento (Subgrafos com linha tracejada)
subgraph Governança Estrita
B
D
end
class Governança Estrita interface;