Skip to content

Motores de Transformação (Transform Engines)

O aptdata fornece wrappers agnósticos de engine que se integram perfeitamente com a classe Workflow, emitem rastreamento (spans) para o OpenTelemetry e lidam com serialização transparente para o InMemoryDataset.

Para utilizá-los, instale o grupo opcional correspondente:

pip install "aptdata[pandas]"
pip install "aptdata[spark]"

Imports Preguiçosos (Lazy Imports)

O framework isola dependências pesadas. A importação do módulo (from aptdata.plugins.transform import PandasTransformer) sempre funciona, e o PluginDependencyError só é levantado caso você tente instanciar a classe e o pacote falte no ambiente.


Comparativo de Engines

O PandasTransformer empacota funções puras do tipo (pd.DataFrame) -> pd.DataFrame, interceptando o input/output para conversão automática quando necessário.

import pandas as pd
from aptdata.plugins.transform import PandasTransformer
from aptdata.core.workflow import Workflow

def clean(df: pd.DataFrame) -> pd.DataFrame:
    """Função de negócio pura, isolada do framework."""
    return df.dropna().drop_duplicates()

transformer = PandasTransformer("clean_records", clean)

wf = Workflow("my_pipeline")
wf.add_step(transformer.transform)
result = wf.execute(my_dataset)

Resolução Automática de Tipos:

Input Recebido Comportamento do Wrapper
InMemoryDataset Extrai .read() para DataFrame, roda a função e repacota a saída no InMemoryDataset.
pd.DataFrame Executa diretamente; Retorna um pd.DataFrame.
list[dict] Converte para DataFrame temporário; Retorna um pd.DataFrame.

O PySparkTransformer empacota funções do tipo (SparkSession, DataFrame) -> DataFrame.

from pyspark.sql import functions as F
from aptdata.plugins.transform import PySparkTransformer
from aptdata.core.workflow import Workflow

def compute_revenue(spark, df):
    return df.withColumn("revenue", F.col("price") * F.col("quantity"))

transformer = PySparkTransformer("compute_revenue", compute_revenue, app_name="ETL_Job")

wf = Workflow("spark_pipeline")
wf.add_step(transformer.transform)
result = wf.execute(spark_df)

Atributos de Telemetria (OTel)

A injeção dos transformadores no Workflow garante observabilidade imediata de performance.

Atributo Padrão OTel Descrição
aptdata.transformer.name Identificador humano do step.
aptdata.transformer.engine Engine detectada (pandas ou pyspark).
aptdata.transformer.rows_in Contagem de linhas antes da transformação.
aptdata.transformer.rows_out Contagem de linhas após a transformação.
aptdata.transformer.compute_time_ms Tempo efetivo de execução da função encapsulada.

Atributos exclusivos do Spark:

Atributo OTel Descrição
aptdata.spark.app_name Nome da aplicação (Session).
aptdata.spark.ui_url URL do Spark UI (se em execução).

Orquestração (Workflow)

Ambos implementam a abstração BaseTransformer e são compatíveis com Workflow.add_step() de forma intercalada a validadores de qualidade.

flowchart LR
    DS["📥 Dataset\n(InMemoryDataset)"]
    PT["⚙️ PandasTransformer\nou PySparkTransformer"]
    QV["✅ QualityValidator"]
    OUT["📤 Dataset Limpo"]

    DS --> PT --> QV --> OUT