Primeiros Passos (Getting Started)¶
Requisitos¶
- Python ≥ 3.10
- Poetry (recomendado) ou
pip
Instalação¶
Dependências Opcionais¶
pandas: Suporte ao Pandas ePandasTransformComponentspark: Suporte ao PySparkplugins: Adaptadores REST, PostgreSQL, Parquet I/Oai: Servidor Model Context Protocol (MCP) para IAall: Instala tudo
Verificando a Instalação¶
Rode no terminal:
Saída esperada:
Usage: aptdata [OPTIONS] COMMAND [ARGS]...
Smart Data – framework declarativo de pipelines.
Options:
--help Exibe esta mensagem e encerra.
Commands:
monitor Abre o dashboard TUI de monitoramento.
run Executa um sistema registrado.
scaffold Inicializa um novo projeto aptdata.
mesh Orquestra componentes via mesh.yaml.
Construindo seu Primeiro Sistema¶
O fluxo lógico de orquestração do framework segue cinco etapas principais:
flowchart LR
%% Estilos Customizados (Design Premium)
classDef default fill:#0b132b,stroke:#ff6a00,stroke-width:1px,color:#fff,rx:8px,ry:8px;
DS["1️⃣ Dataset\nLeitura / Escrita (IDataset)"]
CO["2️⃣ Component\nTransformação (IComponent)"]
FL["3️⃣ Flow\nConexões Condicionais (IFlow)"]
SY["4️⃣ System\nOrquestrador Base (ISystem)"]
RG["5️⃣ Execução\nCLI ou Plugin Registry"]
DS --> CO --> FL --> SY --> RG
1. Criar um Dataset¶
Um Dataset implementa as operações de leitura e escrita através do contrato IDataset. Herdando de BaseDataset, você recebe injeção de estado e validação via Pydantic.
from pydantic.dataclasses import dataclass as pydantic_dataclass
from aptdata.core import BaseDataset
@pydantic_dataclass
class MemoryDataset(BaseDataset): # (1)!
"""Dataset em memória para propósitos de teste."""
def __post_init__(self) -> None: # (2)!
self._data = None
def read(self): # (3)!
return self._data
def write(self, data) -> None:
self._data = data
- A herança de
BaseDatasetgarante injeção do esquema Pydantic para validação robusta. - O método
__post_init__é o local ideal para definir propriedades mutáveis ou privadas que não devem ser validadas como input do construtor. - O método
read()é onde a lógica de extração real (ex: chamar a API do Pandas ou do Spark) acontece.
2. Criar um Componente¶
Um Componente (implementa IComponent) recebe uma lista de inputs validados, os processa, e retorna uma lista de outputs (permitindo múltiplas saídas ou fluxos paralelos).
from pydantic.dataclasses import dataclass as pydantic_dataclass
from aptdata.core import BaseComponent, ComponentMeta, ComponentKind, IDataset
@pydantic_dataclass
class DoubleComponent(BaseComponent):
"""Duplica todos os valores numéricos da lista."""
def validate_inputs(self, inputs: list[IDataset]) -> bool:
return len(inputs) == 1
def execute(self, inputs: list[IDataset]) -> list[IDataset]:
data = inputs[0].read()
out = MemoryDataset(uri="memory://output")
out.write([x * 2 for x in data])
return [out]
comp = DoubleComponent(
component_id="double",
metadata=ComponentMeta(kind=ComponentKind.TRANSFORM, tags=["math"]),
)
Dica de DX
Com a API Declarativa (decorators como @pandas_component), a instanciação manual do InMemoryDataset é feita pelo framework "por debaixo dos panos". Você codifica apenas a função recebendo e devolvendo DataFrames.
3. Criar um Fluxo (Flow)¶
Um Fluxo liga componentes em um Grafo Direcionado. Herdando de BaseFlow, as arestas suportam condicionais nativas (FlowEdge).
from pydantic.dataclasses import dataclass as pydantic_dataclass
from aptdata.core import BaseFlow, IComponent, IDataset, FlowEdge, FlowNode
from typing import Callable
@pydantic_dataclass
class SimpleFlow(BaseFlow):
def __post_init__(self) -> None:
self._nodes: dict[str, FlowNode] = {}
self._edges: list[FlowEdge] = []
self._order: list[str] = []
def add_component(self, c: IComponent) -> None:
self._nodes[c.component_id] = FlowNode(component=c, flow=self)
def connect(self, src: str, tgt: str, condition: Callable | None = None) -> None:
self._edges.append(FlowEdge(source_id=src, target_id=tgt, condition=condition))
def compile(self) -> None:
targets = {e.target_id for e in self._edges}
roots = [cid for cid in self._nodes if cid not in targets]
queue = list(roots)
while queue:
current = queue.pop(0)
self._order.append(current)
for e in self._edges:
if e.source_id == current:
queue.append(e.target_id)
def run(self, inputs: list[IDataset]) -> list[IDataset]:
outputs = inputs
for cid in self._order:
comp = self._nodes[cid].component
if comp.validate_inputs(outputs):
outputs = comp.execute(outputs)
return outputs
4. Criar e Registrar um Sistema¶
from pydantic.dataclasses import dataclass as pydantic_dataclass
from aptdata.core import BaseSystem, IFlow
from aptdata.plugins import registry
@pydantic_dataclass
class MySystem(BaseSystem):
def __post_init__(self) -> None:
self._flows: list[IFlow] = []
def register_flow(self, flow: IFlow) -> None:
self._flows.append(flow)
def run(self) -> None:
ds = MemoryDataset(uri="memory://input")
ds.write([1, 2, 3])
inputs = [ds]
for flow in self._flows:
inputs = flow.run(inputs)
# Registre no Plugin Registry global
registry.register("my_system", MySystem)
5. Executar via CLI¶
Com o seu componente registrado no entrypoint do módulo, execute:
Eventos de ciclo de vida automáticos (pre_execute, on_success) são emitidos internamente pelo EventBus.
Saída esperada em formato JSON Lines:
{"event": "pipeline.started", "pipeline": "my_system", "env": "dev", "dry_run": false}
{"event": "pipeline.completed", "pipeline": "my_system", "env": "dev", "dry_run": false, "elapsed_seconds": 0.001}
Opções Úteis da CLI¶
| Opção | Default | Descrição |
|---|---|---|
name |
(obrigatório) | Nome do sistema registrado (registry.register) |
--env, -e |
dev |
Variável de ambiente alvo da execução |
--dry-run |
false |
Instancia componentes mas não dispara o run() |
| Opção | Default | Descrição |
|---|---|---|
--refresh, -r |
1.0 |
Intervalo em segundos de auto-atualização do terminal |
Integração MCP com Agentes de IA¶
O framework inclui um servidor Model Context Protocol (MCP) (aptdata mcp-start) que permite a agentes de IA (Claude, Copilot, Devin) descobrirem e interagirem com seus pipelines.
Adicione ao claude_desktop_config.json:
Agentes podem consultar metadados com as URIs schema://datasets/{name} e chamar tools como run_flow sem alucinações de schema.