
Core API

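The aptdata.core package exposes a two-layer contract system for its four foundational types (dataset, component, flow, and system): an abstract interface (I*) that declares the required methods, and a Pydantic-validated base class (Base*) that adds fields and default behaviour.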


Dataset

IDataset

aptdata.core.dataset.IDataset dataclass

Bases: ABC, Generic[T]

Dataclass interface for dataset types.

All dataset contracts must implement read() and write(). No concrete fields are defined here; field declarations live in BaseDataset and its subclasses.

Source code in aptdata/core/dataset.py
@dataclass
class IDataset(ABC, Generic[T]):
    """Dataclass interface for dataset types.

    All dataset contracts must implement :meth:`read` and :meth:`write`.
    No concrete fields are defined here – field declarations live in
    :class:`BaseDataset` and its subclasses.
    """

    @abstractmethod
    def read(self) -> T:
        """Read and return data from the dataset."""
        raise NotImplementedError

    @abstractmethod
    def write(self, data: T) -> None:
        """Write data to the dataset."""
        raise NotImplementedError

Functions

read() abstractmethod

Read and return data from the dataset.

Source code in aptdata/core/dataset.py
@abstractmethod
def read(self) -> T:
    """Read and return data from the dataset."""
    raise NotImplementedError

write(data) abstractmethod

Write data to the dataset.

Source code in aptdata/core/dataset.py
@abstractmethod
def write(self, data: T) -> None:
    """Write data to the dataset."""
    raise NotImplementedError

BaseDataset

aptdata.core.dataset.BaseDataset dataclass

Bases: IDataset[Any]

Base dataset with Pydantic-validated fields.

Provides the canonical uri and schema_metadata fields. Concrete dataset implementations must inherit from this class and implement the read() and write() abstract methods inherited from IDataset.

Source code in aptdata/core/dataset.py
@pydantic_dataclass
class BaseDataset(IDataset[Any]):
    """Base dataset with Pydantic-validated fields.

    Provides the canonical ``uri`` and ``schema_metadata`` fields.
    Concrete dataset implementations must inherit from this class and
    implement the :meth:`read` and :meth:`write` abstract methods
    inherited from :class:`IDataset`.
    """

    uri: str
    schema_metadata: dict[str, Any] = field(default_factory=dict)
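
As a minimal sketch of the two-layer pattern, the block below re-declares simplified stand-ins for IDataset and BaseDataset using only the standard library, then adds a concrete subclass. The real BaseDataset is a Pydantic dataclass, so field validation is not reproduced here. InMemoryDataset and its _rows field are hypothetical names used for illustration; they are not part of aptdata.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Generic, TypeVar

T = TypeVar("T")


@dataclass
class IDataset(ABC, Generic[T]):
    """Stand-in for the interface layer: behaviour only, no fields."""

    @abstractmethod
    def read(self) -> T: ...

    @abstractmethod
    def write(self, data: T) -> None: ...


@dataclass
class BaseDataset(IDataset[Any]):
    """Stand-in for the base layer: declares the shared fields."""

    uri: str
    schema_metadata: dict[str, Any] = field(default_factory=dict)


@dataclass
class InMemoryDataset(BaseDataset):
    """Hypothetical concrete dataset that keeps its rows in a list."""

    _rows: list[Any] = field(default_factory=list, repr=False)

    def read(self) -> list[Any]:
        # Return a copy so callers cannot mutate the stored rows.
        return list(self._rows)

    def write(self, data: list[Any]) -> None:
        self._rows = list(data)


ds = InMemoryDataset(uri="memory://example")
ds.write([1, 2, 3])
print(ds.read())
```

Because read and write stay abstract on the base layer, instantiating the BaseDataset stand-in directly raises TypeError; only subclasses that implement both methods can be constructed.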

Component

ComponentKind

aptdata.core.system.ComponentKind

Bases: str, Enum

Supported processing paradigms for a BaseComponent.

Source code in aptdata/core/system.py
class ComponentKind(str, Enum):
    """Supported processing paradigms for a :class:`BaseComponent`."""

    TRANSFORM = "transform"
    FILTER = "filter"
    AGGREGATE = "aggregate"
    EXTRACT = "extract"
    LOAD = "load"
    CUSTOM = "custom"
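
Because ComponentKind mixes in str, its members can be looked up from, and compared with, plain strings. The sketch below copies the enum so that it runs on its own; the variable names are illustrative only.

```python
from enum import Enum


class ComponentKind(str, Enum):
    """Copy of the enum listed above, so the snippet is self-contained."""

    TRANSFORM = "transform"
    FILTER = "filter"
    AGGREGATE = "aggregate"
    EXTRACT = "extract"
    LOAD = "load"
    CUSTOM = "custom"


# Look a member up from its serialized value, e.g. one read from a config file.
kind = ComponentKind("filter")

# Members compare equal to plain strings because of the str mixin.
is_filter = kind == "filter"

# .value gives the raw string, which is what the telemetry wrapper records.
raw = kind.value
```

Looking up a value that is not one of the six members raises ValueError, which is standard Enum behaviour.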

ComponentMeta

aptdata.core.system.ComponentMeta dataclass

Rich metadata describing a component's role and branching behaviour.

Attributes:

kind (ComponentKind): The processing paradigm this component belongs to.
tags (list[str]): Arbitrary string labels for filtering, grouping, or discovery.
branch_on (str): When non-empty, names the output field or condition key on which the flow should branch after this component executes.
description (str): Human-readable summary of what this component does.
extra (dict[str, Any]): Open-ended mapping for framework extensions or user-defined metadata.

Source code in aptdata/core/system.py
@dataclass
class ComponentMeta:
    """Rich metadata describing a component's role and branching behaviour.

    Attributes
    ----------
    kind:
        The processing paradigm this component belongs to.
    tags:
        Arbitrary string labels for filtering, grouping, or discovery.
    branch_on:
        When non-empty, names the output field or condition key on which the
        flow should branch after this component executes.
    description:
        Human-readable summary of what this component does.
    extra:
        Open-ended mapping for framework extensions or user-defined metadata.
    """

    kind: ComponentKind = ComponentKind.CUSTOM
    tags: list[str] = field(default_factory=list)
    branch_on: str = ""
    description: str = ""
    extra: dict[str, Any] = field(default_factory=dict)
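
Every ComponentMeta field has a default, so metadata can be built incrementally. The following sketch uses a trimmed copy of ComponentKind and a stdlib copy of the dataclass; the tag and description values are made up for illustration.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Any


class ComponentKind(str, Enum):
    """Trimmed copy of the enum, enough for this example."""

    FILTER = "filter"
    CUSTOM = "custom"


@dataclass
class ComponentMeta:
    """Copy of the dataclass listed above."""

    kind: ComponentKind = ComponentKind.CUSTOM
    tags: list[str] = field(default_factory=list)
    branch_on: str = ""
    description: str = ""
    extra: dict[str, Any] = field(default_factory=dict)


# All defaults: a CUSTOM component with no tags and no branching key.
default_meta = ComponentMeta()

# A filter that asks the flow to branch on a (hypothetical) "is_valid" key.
filter_meta = ComponentMeta(
    kind=ComponentKind.FILTER,
    tags=["pii", "nightly"],
    branch_on="is_valid",
    description="Drops rows that fail validation.",
)

# default_factory gives each instance its own list, so this append does
# not leak into other ComponentMeta objects.
default_meta.tags.append("scratch")
other_meta = ComponentMeta()
```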

IComponent

aptdata.core.system.IComponent dataclass

Bases: ABC

Interface for a reusable unit of work.

A component receives a list of IDataset inputs, validates them, executes its logic, and returns a list of IDataset outputs. Unlike the legacy IStep, it may produce multiple output datasets to support branching flows.
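
To illustrate the "multiple outputs" point, here is a hedged sketch of a component that splits one input into two outputs. Component is a hypothetical stdlib stand-in for IComponent (the context and meta properties are omitted), and plain lists stand in for IDataset objects.

```python
from abc import ABC, abstractmethod


class Component(ABC):
    """Hypothetical stand-in for IComponent: validation plus execution."""

    @abstractmethod
    def validate_inputs(self, inputs: list) -> bool: ...

    @abstractmethod
    def execute(self, inputs: list) -> list: ...


class SplitBySign(Component):
    """Takes one list of numbers and returns two lists: >= 0 and < 0."""

    def validate_inputs(self, inputs: list) -> bool:
        # Exactly one input, and every element in it must be numeric.
        return len(inputs) == 1 and all(
            isinstance(v, (int, float)) for v in inputs[0]
        )

    def execute(self, inputs: list) -> list:
        values = inputs[0]
        non_negative = [v for v in values if v >= 0]
        negative = [v for v in values if v < 0]
        # Two outputs: a flow could route each one down a different branch.
        return [non_negative, negative]


splitter = SplitBySign()
batch = [[3, -1, 0, -7]]
outputs = splitter.execute(batch) if splitter.validate_inputs(batch) else []
```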

Source code in aptdata/core/system.py
@dataclass
class IComponent(ABC):
    """Interface for a reusable unit of work.

    A component receives a list of :class:`~aptdata.core.dataset.IDataset`
    inputs, validates them, executes its logic, and returns a list of
    :class:`~aptdata.core.dataset.IDataset` outputs.  Unlike the legacy
    ``IStep``, it may produce *multiple* output datasets to support branching
    flows.
    """

    @property
    @abstractmethod
    def context(self) -> IContext | None:
        """The execution context injected by the orchestrator."""
        pass

    @context.setter
    @abstractmethod
    def context(self, value: IContext | None) -> None:
        """Set the execution context."""
        pass

    @property
    @abstractmethod
    def meta(self) -> ComponentMeta:
        """Metadata describing this component."""

    @abstractmethod
    def validate_inputs(self, inputs: list[IDataset]) -> bool:
        """Return ``True`` when *inputs* are valid for this component."""

    @abstractmethod
    def execute(self, inputs: list[IDataset]) -> list[IDataset]:
        """Execute the component logic and return its output datasets."""

Attributes

context: IContext | None abstractmethod property writable

The execution context injected by the orchestrator.

meta: ComponentMeta abstractmethod property

Metadata describing this component.

Functions

validate_inputs(inputs) abstractmethod

Return True when inputs are valid for this component.

Source code in aptdata/core/system.py
@abstractmethod
def validate_inputs(self, inputs: list[IDataset]) -> bool:
    """Return ``True`` when *inputs* are valid for this component."""

execute(inputs) abstractmethod

Execute the component logic and return its output datasets.

Source code in aptdata/core/system.py
@abstractmethod
def execute(self, inputs: list[IDataset]) -> list[IDataset]:
    """Execute the component logic and return its output datasets."""

BaseComponent

aptdata.core.system.BaseComponent dataclass

Bases: IComponent

Base component with Pydantic-validated identity, metadata, and event hooks.

Concrete component implementations must inherit from this class and implement the validate_inputs() and execute() abstract methods inherited from IComponent.

When a context with an event bus is attached, execute automatically emits pre_execute first, then on_success or on_failure, and finally post_execute to that bus. Without an event bus, no events are dispatched.

Parameters:

component_id (str): A unique identifier for this component within a flow. Required.
metadata (ComponentMeta): A ComponentMeta instance describing the component's role. Default: a new ComponentMeta().

Source code in aptdata/core/system.py
@pydantic_dataclass
class BaseComponent(IComponent):
    """Base component with Pydantic-validated identity, metadata, and event hooks.

    Concrete component implementations must inherit from this class and
    implement the :meth:`validate_inputs` and :meth:`execute` abstract
    methods inherited from :class:`IComponent`.

    Automatically emits `pre_execute`, `on_success`, `on_failure`, and
    `post_execute` events to the `IContext` event bus during `execute`.

    Parameters
    ----------
    component_id:
        A unique identifier for this component within a flow.
    metadata:
        A :class:`ComponentMeta` instance describing the component's role.
    """

    component_id: str
    metadata: ComponentMeta = field(default_factory=ComponentMeta)

    # Use generic Any because pydantic doesn't know IContext unless
    # arbitrary types are allowed
    _context: Any = Field(default=None, init=False, repr=False, exclude=True)

    @property
    def context(self) -> IContext | None:
        return self._context

    @context.setter
    def context(self, value: IContext | None) -> None:
        self._context = value

    def __init_subclass__(cls, **kwargs: Any) -> None:
        """Wrap subclass execute implementations with telemetry spans."""
        super().__init_subclass__(**kwargs)
        execute_fn = cls.__dict__.get("execute")
        if execute_fn is None or getattr(execute_fn, "_aptdata_instrumented", False):
            return

        @wraps(execute_fn)
        def _instrumented_execute(
            self: BaseComponent, inputs: list[IDataset]
        ) -> list[IDataset]:
            span_name = self.component_id or cls.__name__
            kind = self.meta.kind
            kind_value = (
                kind.value if isinstance(kind, ComponentKind) else str(kind or "")
            )
            tags = sorted(self.meta.tags) if self.meta.tags else []

            event_bus = self.context.event_bus if self.context else None
            if event_bus:
                event_bus.dispatch(
                    ComponentExecutionEvent(
                        event_type="pre_execute",
                        component_id=self.component_id,
                        status="pending"
                    )
                )

            start_time = time.time()
            try:
                with get_tracer().start_as_current_span(span_name) as span:
                    span.set_attribute("aptdata.component_id", self.component_id)
                    span.set_attribute("aptdata.kind", kind_value)
                    span.set_attribute("aptdata.tags", tags)
                    span.set_attribute(
                        "aptdata.branch_on",
                        mask_telemetry_value(self.meta.branch_on, key="branch_on"),
                    )
                    span.set_attribute(
                        "aptdata.description",
                        mask_telemetry_value(self.meta.description, key="description"),
                    )
                    outputs = execute_fn(self, inputs)

                exec_time = time.time() - start_time
                if event_bus:
                    io_uris = [ds.uri for ds in outputs if hasattr(ds, "uri")]
                    event_bus.dispatch(
                        ComponentExecutionEvent(
                            event_type="on_success",
                            component_id=self.component_id,
                            status="success",
                            execution_time=exec_time,
                            io_uris=io_uris
                        )
                    )
                return outputs

            except Exception as e:
                exec_time = time.time() - start_time
                if event_bus:
                    event_bus.dispatch(
                        ComponentExecutionEvent(
                            event_type="on_failure",
                            component_id=self.component_id,
                            status="failed",
                            execution_time=exec_time,
                            error_message=str(e)
                        )
                    )
                raise
            finally:
                if event_bus:
                    event_bus.dispatch(
                        ComponentExecutionEvent(
                            event_type="post_execute",
                            component_id=self.component_id,
                            status="completed"
                        )
                    )

        _instrumented_execute.__isabstractmethod__ = getattr(
            execute_fn, "__isabstractmethod__", False
        )
        _instrumented_execute._aptdata_instrumented = True  # type: ignore[attr-defined]
        cls.execute = _instrumented_execute  # type: ignore[method-assign]

    @property
    def meta(self) -> ComponentMeta:
        return self.metadata
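
The wrapping technique used by __init_subclass__ can be reduced to the following sketch. It keeps the event ordering from the listing above but leaves out tracing, Pydantic, and the ComponentExecutionEvent payload; RecordingBus, InstrumentedBase, Doubler, and Broken are hypothetical names introduced only for this example.

```python
from functools import wraps


class RecordingBus:
    """Hypothetical event bus that records event types in dispatch order."""

    def __init__(self) -> None:
        self.events: list[str] = []

    def dispatch(self, event_type: str) -> None:
        self.events.append(event_type)


class InstrumentedBase:
    """Simplified stand-in for BaseComponent (no tracing, no validation)."""

    def __init__(self, bus: RecordingBus) -> None:
        self.bus = bus

    def __init_subclass__(cls, **kwargs) -> None:
        super().__init_subclass__(**kwargs)
        # Only wrap an execute defined directly on this subclass, and only once.
        execute_fn = cls.__dict__.get("execute")
        if execute_fn is None or getattr(execute_fn, "_instrumented", False):
            return

        @wraps(execute_fn)
        def _instrumented(self, inputs):
            self.bus.dispatch("pre_execute")
            try:
                outputs = execute_fn(self, inputs)
                self.bus.dispatch("on_success")
                return outputs
            except Exception:
                self.bus.dispatch("on_failure")
                raise  # the original exception still propagates
            finally:
                # Runs on both paths, after on_success or on_failure.
                self.bus.dispatch("post_execute")

        _instrumented._instrumented = True
        cls.execute = _instrumented


class Doubler(InstrumentedBase):
    def execute(self, inputs):
        return [x * 2 for x in inputs]


class Broken(InstrumentedBase):
    def execute(self, inputs):
        raise RuntimeError("boom")


ok_bus, bad_bus = RecordingBus(), RecordingBus()
result = Doubler(ok_bus).execute([1, 2])
try:
    Broken(bad_bus).execute([1])
except RuntimeError:
    pass
```

Subclass authors write a plain execute method; the instrumentation is applied when the class is created, so no decorator or explicit super() call is needed.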

Functions

__init_subclass__(**kwargs)

Wrap subclass execute implementations with telemetry spans.


Flow

FlowEdge

aptdata.core.system.FlowEdge

A directed edge in a BaseFlow execution graph.

Optionally carries a condition callable; when present, the edge is traversed only when condition(outputs) evaluates to True, enabling conditional / branching flows.

Parameters:

source_id (str): The component_id of the upstream component. Required.
target_id (str): The component_id of the downstream component. Required.
condition (Any): Optional predicate evaluated against the source component's outputs. Default: None.

Source code in aptdata/core/system.py
@pydantic_dataclass
class FlowEdge:
    """A directed edge in a :class:`BaseFlow` execution graph.

    Optionally carries a *condition* callable; when present the edge is only
    traversed when ``condition(outputs)`` evaluates to ``True``, enabling
    conditional / branching flows.

    Parameters
    ----------
    source_id:
        The :attr:`~BaseComponent.component_id` of the upstream component.
    target_id:
        The :attr:`~BaseComponent.component_id` of the downstream component.
    condition:
        Optional predicate evaluated against the source component's outputs.
    """

    source_id: str
    target_id: str
    condition: Any = Field(default=None, exclude=True)
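
A conditional edge might be evaluated as in the sketch below. FlowEdge here is a stdlib stand-in for the Pydantic dataclass, and should_traverse is a hypothetical helper; the listing on this page does not show how a runner consumes condition.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class FlowEdge:
    """Stdlib stand-in for the Pydantic dataclass listed above."""

    source_id: str
    target_id: str
    condition: Optional[Callable[[list], bool]] = None


def should_traverse(edge: FlowEdge, outputs: list) -> bool:
    """Hypothetical helper: edges without a condition are always traversed."""
    return edge.condition is None or bool(edge.condition(outputs))


# Unconditional edge: always followed.
always = FlowEdge("extract", "load")

# Conditional edge: followed only when the upstream component produced nothing.
on_empty = FlowEdge(
    "extract", "alert", condition=lambda outputs: len(outputs) == 0
)
```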

FlowNode

aptdata.core.system.FlowNode dataclass

A node wrapping an IComponent inside an IFlow graph.

Parameters:

component (IComponent): The component held by this node. Required.
flow (IFlow | None): Back-reference to the owning flow (set by the flow on insertion). Default: None.

Source code in aptdata/core/system.py
@dataclass
class FlowNode:
    """A node wrapping a :class:`IComponent` inside a :class:`IFlow` graph.

    Parameters
    ----------
    component:
        The component held by this node.
    flow:
        Back-reference to the owning flow (set by the flow on insertion).
    """

    component: IComponent
    flow: IFlow | None = field(default=None, repr=False)

IFlow

aptdata.core.system.IFlow dataclass

Bases: ABC

Interface for a directed execution graph of IComponent nodes.

A flow owns a set of components and the directed edges that connect them. It is responsible for validating the graph structure (compile()) and driving execution (run()).

Source code in aptdata/core/system.py
@dataclass
class IFlow(ABC):
    """Interface for a directed execution graph of :class:`IComponent` nodes.

    A flow owns a set of components and the directed edges that connect them.
    It is responsible for validating the graph structure (:meth:`compile`)
    and driving execution (:meth:`run`).
    """

    @property
    @abstractmethod
    def context(self) -> IContext | None:
        """The execution context injected by the orchestrator."""
        pass

    @context.setter
    @abstractmethod
    def context(self, value: IContext | None) -> None:
        """Set the execution context."""
        pass

    @abstractmethod
    def build(self) -> None:
        """Declarative hook to define components and edges before compilation."""
        pass

    @abstractmethod
    def add_component(
        self,
        component: type[IComponent] | IComponent,
        output_contract: type[BaseModel] | None = None,
    ) -> None:
        """Add *component* as a node in this flow. Can be a class or instance.
        If a class is provided, the flow handles instantiation and validates outputs
        against *output_contract*."""

    @abstractmethod
    def connect(
        self,
        source_id: str,
        target_id: str,
        condition: Callable[[list[IDataset]], bool] | None = None,
    ) -> None:
        """Create a directed edge from *source_id* to *target_id*.

        Parameters
        ----------
        source_id:
            The :attr:`~BaseComponent.component_id` of the upstream component.
        target_id:
            The :attr:`~BaseComponent.component_id` of the downstream component.
        condition:
            Optional predicate that gates traversal of the edge.
        """

    @abstractmethod
    def compile(self) -> None:
        """Validate the graph structure before execution.

        Implementations should raise :exc:`ValueError` when the graph is
        invalid (e.g. unknown node references, cycles in a DAG-only flow).
        """

    @abstractmethod
    def run(self, initial_inputs: list[IDataset]) -> list[IDataset]:
        """Execute the flow starting with *initial_inputs*.

        Returns the outputs produced by the terminal component(s).
        """

Attributes

context: IContext | None abstractmethod property writable

The execution context injected by the orchestrator.

Functions

build() abstractmethod

Declarative hook to define components and edges before compilation.

Source code in aptdata/core/system.py
@abstractmethod
def build(self) -> None:
    """Declarative hook to define components and edges before compilation."""
    pass

add_component(component, output_contract=None) abstractmethod

Add component as a node in this flow. Can be a class or instance. If a class is provided, the flow handles instantiation and validates outputs against output_contract.

Source code in aptdata/core/system.py
@abstractmethod
def add_component(
    self,
    component: type[IComponent] | IComponent,
    output_contract: type[BaseModel] | None = None,
) -> None:
    """Add *component* as a node in this flow. Can be a class or instance.
    If a class is provided, the flow handles instantiation and validates outputs
    against *output_contract*."""

connect(source_id, target_id, condition=None) abstractmethod

Create a directed edge from source_id to target_id.

Parameters:

source_id (str): The component_id of the upstream component. Required.
target_id (str): The component_id of the downstream component. Required.
condition (Callable[[list[IDataset]], bool] | None): Optional predicate that gates traversal of the edge. Default: None.

Source code in aptdata/core/system.py
@abstractmethod
def connect(
    self,
    source_id: str,
    target_id: str,
    condition: Callable[[list[IDataset]], bool] | None = None,
) -> None:
    """Create a directed edge from *source_id* to *target_id*.

    Parameters
    ----------
    source_id:
        The :attr:`~BaseComponent.component_id` of the upstream component.
    target_id:
        The :attr:`~BaseComponent.component_id` of the downstream component.
    condition:
        Optional predicate that gates traversal of the edge.
    """

compile() abstractmethod

Validate the graph structure before execution.

Implementations should raise ValueError when the graph is invalid (e.g. unknown node references, cycles in a DAG-only flow).
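
One way to implement both checks named above is sketched here with the standard library's graphlib. validate_graph is a hypothetical helper, not an aptdata function, and it takes bare identifiers and (source, target) tuples rather than FlowNode and FlowEdge objects.

```python
from graphlib import CycleError, TopologicalSorter


def validate_graph(node_ids: set[str], edges: list[tuple[str, str]]) -> list[str]:
    """Return a valid execution order, or raise ValueError if the graph is invalid."""
    # Map each node to the set of nodes that must run before it.
    predecessors: dict[str, set[str]] = {node_id: set() for node_id in node_ids}
    for source_id, target_id in edges:
        if source_id not in predecessors or target_id not in predecessors:
            raise ValueError(f"Invalid edge: {source_id} -> {target_id}")
        predecessors[target_id].add(source_id)

    try:
        return list(TopologicalSorter(predecessors).static_order())
    except CycleError as exc:
        # CycleError already subclasses ValueError; re-raise with a clearer
        # message. The detected cycle is the second element of exc.args.
        raise ValueError(f"Cycle detected: {exc.args[1]}") from exc


order = validate_graph(
    {"extract", "clean", "load"},
    [("extract", "clean"), ("clean", "load")],
)
```

The returned order is also what a DAG runner would iterate over, so validation and scheduling can share one pass.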

Source code in aptdata/core/system.py
@abstractmethod
def compile(self) -> None:
    """Validate the graph structure before execution.

    Implementations should raise :exc:`ValueError` when the graph is
    invalid (e.g. unknown node references, cycles in a DAG-only flow).
    """

run(initial_inputs) abstractmethod

Execute the flow starting with initial_inputs.

Returns the outputs produced by the terminal component(s).

Source code in aptdata/core/system.py
@abstractmethod
def run(self, initial_inputs: list[IDataset]) -> list[IDataset]:
    """Execute the flow starting with *initial_inputs*.

    Returns the outputs produced by the terminal component(s).
    """

BaseFlow

aptdata.core.system.BaseFlow dataclass

Bases: IFlow

Base flow with Pydantic-validated identity and a managed graph.

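Concrete flow implementations inherit from this class. As the listing below shows, it already provides default implementations of build, add_component, connect, compile, and run (the abstract methods declared by IFlow), so subclasses override only what they need, most commonly build().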

Parameters:

flow_id (str): A unique identifier for this flow within a system. Required.

Source code in aptdata/core/system.py
@pydantic_dataclass
class BaseFlow(IFlow):
    """Base flow with Pydantic-validated identity and a managed graph.

    Concrete flow implementations must inherit from this class and implement
    the :meth:`add_component`, :meth:`connect`, :meth:`compile` and
    :meth:`run` abstract methods inherited from :class:`IFlow`.

    Parameters
    ----------
    flow_id:
        A unique identifier for this flow within a system.
    """

    flow_id: str

    _nodes: dict[str, FlowNode] = field(default_factory=dict, init=False, repr=False)
    _edges: list[FlowEdge] = field(default_factory=list, init=False, repr=False)

    # Use generic Any because pydantic doesn't know IContext unless
    # arbitrary types are allowed
    _context: Any = Field(default=None, init=False, repr=False, exclude=True)

    @property
    def context(self) -> IContext | None:
        return self._context

    @context.setter
    def context(self, value: IContext | None) -> None:
        self._context = value

    def build(self) -> None:
        pass

    def add_component(
        self,
        component: type[IComponent] | IComponent,
        output_contract: type[BaseModel] | None = None,
    ) -> None:
        if isinstance(component, type):
            # Instantiate component with DI. (For this scaffold, default init)
            # A real DI container could be used here.
            comp_instance = component(component_id=component.__name__)  # type: ignore
        else:
            comp_instance = component

        if self._context:
            comp_instance.context = self._context

        # Optional: Wrap execution to enforce output_contract if needed,
        # though usually components themselves should use PydanticDataset directly
        # if they want.
        # However, the prompt says "apply an interception wrapper to validate
        # the contract on the output."
        if output_contract is not None:
            original_execute = comp_instance.execute

            def _wrapped_execute(inputs: list[IDataset]) -> list[IDataset]:
                results = original_execute(inputs)
                validated_results = []
                for res in results:
                    # Enforce the contract by wrapping the result in a PydanticDataset
                    data = res.read()
                    ds = PydanticDataset(
                        uri=res.uri if hasattr(res, "uri") else "memory://contract",
                        contract=output_contract,
                    )
                    ds.write(data)
                    validated_results.append(ds)
                return validated_results

            comp_instance.execute = _wrapped_execute  # type: ignore[method-assign]

        self._nodes[comp_instance.component_id] = FlowNode(
            component=comp_instance, flow=self
        )

    def connect(
        self,
        source_id: str,
        target_id: str,
        condition: Callable[[list[IDataset]], bool] | None = None,
    ) -> None:
        self._edges.append(
            FlowEdge(source_id=source_id, target_id=target_id, condition=condition)
        )

    def compile(self) -> None:
        self.build()
        # Basic validation
        for edge in self._edges:
            if edge.source_id not in self._nodes or edge.target_id not in self._nodes:
                raise ValueError(f"Invalid edge: {edge}")

    def run(self, initial_inputs: list[IDataset]) -> list[IDataset]:
        self.compile()
        # A simple linear execution for now if no edges are defined
        # This is a naive runner for the scaffold. In a real system,
        # a DAG runner is used.
        if not self._edges:
            current_inputs = initial_inputs
            for node in self._nodes.values():
                if node.component.validate_inputs(current_inputs):
                    current_inputs = node.component.execute(current_inputs)
            return current_inputs

        # Proper topological sort / DAG execution would go here.
        # Since this scaffold mainly cares about linear Flows, we return inputs.
        return initial_inputs
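
In the listing above, run() executes nodes in insertion order only when no edges are defined, skips any component whose validate_inputs returns False, and returns the initial inputs unchanged when edges exist. The sketch below mirrors that control flow with hypothetical stand-ins (run_linear, AddOne, Upper) and plain lists instead of IDataset objects.

```python
class AddOne:
    """Hypothetical component that accepts integers only."""

    def validate_inputs(self, inputs: list) -> bool:
        return all(isinstance(x, int) for x in inputs)

    def execute(self, inputs: list) -> list:
        return [x + 1 for x in inputs]


class Upper:
    """Hypothetical component that accepts strings only."""

    def validate_inputs(self, inputs: list) -> bool:
        return all(isinstance(x, str) for x in inputs)

    def execute(self, inputs: list) -> list:
        return [x.upper() for x in inputs]


def run_linear(components: list, edges: list, initial_inputs: list) -> list:
    """Mirror of the scaffold runner's control flow."""
    if edges:
        # The scaffold does not execute graphs that define edges.
        return initial_inputs
    current = initial_inputs
    for component in components:
        # A component that rejects its inputs is skipped, not treated as an
        # error; the previous outputs flow on to the next component.
        if component.validate_inputs(current):
            current = component.execute(current)
    return current


chained = run_linear([AddOne(), Upper(), AddOne()], [], [1, 2])
with_edges = run_linear([AddOne()], [("a", "b")], [1, 2])
```

Upper is skipped because its inputs are integers, so the two AddOne steps are the only ones applied.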

System

ISystem

aptdata.core.system.ISystem dataclass

Bases: ABC

Interface for a system that orchestrates one or more IFlow instances.

Source code in aptdata/core/system.py
@dataclass
class ISystem(ABC):
    """Interface for a system that orchestrates one or more :class:`IFlow` instances."""

    @abstractmethod
    def setup(self) -> None:
        """Lifecycle hook called before flow registration and execution."""
        pass

    @abstractmethod
    def register_flow(self, flow: IFlow) -> None:
        """Register *flow* in this system."""

    @abstractmethod
    def run(self) -> None:
        """Execute all registered flows."""

    def on_complete(self, context: IContext) -> None:
        """Lifecycle hook called after all flows complete."""
        pass

Functions

setup() abstractmethod

Lifecycle hook called before flow registration and execution.

Source code in aptdata/core/system.py
@abstractmethod
def setup(self) -> None:
    """Lifecycle hook called before flow registration and execution."""
    pass

register_flow(flow) abstractmethod

Register flow in this system.

Source code in aptdata/core/system.py
@abstractmethod
def register_flow(self, flow: IFlow) -> None:
    """Register *flow* in this system."""

run() abstractmethod

Execute all registered flows.

Source code in aptdata/core/system.py
@abstractmethod
def run(self) -> None:
    """Execute all registered flows."""

on_complete(context)

Lifecycle hook called after all flows complete.

Source code in aptdata/core/system.py
def on_complete(self, context: IContext) -> None:
    """Lifecycle hook called after all flows complete."""
    pass

BaseSystem

aptdata.core.system.BaseSystem dataclass

Bases: ISystem

Base system with Pydantic-validated identity and Event Bus manager.

Concrete system implementations inherit from this class. As the listing below shows, it already provides default register_flow and run implementations, plus no-op setup and on_complete hooks that subclasses can override.

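Creates an EventBus in __post_init__ and attaches it to the system's execution context; run() then shares that context with every registered flow and its components, which keeps observability and governance logic decoupled from the components themselves.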

Parameters:

system_id (str): A unique identifier for this system. Required.

Source code in aptdata/core/system.py
@pydantic_dataclass(config=ConfigDict(arbitrary_types_allowed=True))
class BaseSystem(ISystem):
    """Base system with Pydantic-validated identity and Event Bus manager.

    Concrete system implementations must inherit from this class and implement
    the :meth:`register_flow` and :meth:`run` abstract methods inherited from
    :class:`ISystem`.

    Instantiates a global `EventBus` on `__post_init__` and manages the lifecycle
    for decoupled observability and governance logic.

    Parameters
    ----------
    system_id:
        A unique identifier for this system.
    """

    system_id: str

    _flows: list[IFlow] = field(default_factory=list, init=False, repr=False)
    # We use Field(..., exclude=True) so it is not exported in Pydantic schema
    # generation
    _context: Any = Field(
        default_factory=ExecutionContext, init=False, repr=False, exclude=True
    )

    def __post_init__(self) -> None:
        """Initialize the event bus in the context for this system."""
        if not hasattr(self, "_context") or self._context is None:
            self._context = ExecutionContext()
        self._context.event_bus = EventBus()

    def setup(self) -> None:
        pass

    def register_flow(self, flow: IFlow) -> None:
        self._flows.append(flow)

    def run(self) -> None:
        self.setup()

        # A simple system execution
        current_inputs: list[IDataset] = []
        for flow in self._flows:
            flow.context = self._context
            if isinstance(flow, BaseFlow):
                for node in flow._nodes.values():
                    node.component.context = self._context
            current_inputs = flow.run(current_inputs)

        self.on_complete(self._context)

        # Ensure all async events are fully flushed to avoid zombie threads
        # or missing logs on CLI exit.
        if hasattr(self._context.event_bus, "shutdown"):
            self._context.event_bus.shutdown()

    def on_complete(self, context: IContext) -> None:
        pass
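
The order of operations in run() can be summarised with the sketch below: setup first, then each flow in registration order with the previous flow's outputs as its inputs, then on_complete. MiniSystem and RecordingFlow are hypothetical stand-ins; the event bus and its shutdown step are omitted, and a plain dict stands in for the execution context.

```python
class RecordingFlow:
    """Hypothetical flow that logs its run and appends its name to the inputs."""

    def __init__(self, name: str, log: list) -> None:
        self.name = name
        self.log = log
        self.context = None
        self.received: list = []

    def run(self, inputs: list) -> list:
        self.received = list(inputs)
        self.log.append(f"run:{self.name}")
        return inputs + [self.name]


class MiniSystem:
    """Simplified stand-in for BaseSystem (no Pydantic, no event bus)."""

    def __init__(self) -> None:
        self.log: list = []
        self.flows: list = []
        self.context: dict = {}  # stand-in for ExecutionContext

    def setup(self) -> None:
        self.log.append("setup")

    def register_flow(self, flow: RecordingFlow) -> None:
        self.flows.append(flow)

    def on_complete(self, context: dict) -> None:
        self.log.append("on_complete")

    def run(self) -> None:
        self.setup()
        current: list = []
        for flow in self.flows:
            flow.context = self.context  # every flow shares one context
            current = flow.run(current)  # outputs chain into the next flow
        self.on_complete(self.context)


system = MiniSystem()
ingest = RecordingFlow("ingest", system.log)
publish = RecordingFlow("publish", system.log)
system.register_flow(ingest)
system.register_flow(publish)
system.run()
```

Note that the first flow always starts from an empty input list, so it needs to source its own data.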

Functions

__post_init__()

Initialize the event bus in the context for this system.

Source code in aptdata/core/system.py
def __post_init__(self) -> None:
    """Initialize the event bus in the context for this system."""
    if not hasattr(self, "_context") or self._context is None:
        self._context = ExecutionContext()
    self._context.event_bus = EventBus()

Quick-import

All names are re-exported from the top-level aptdata.core package:

from aptdata.core import (
    IDataset, BaseDataset,
    ComponentKind, ComponentMeta,
    IComponent,   BaseComponent,
    FlowEdge,     FlowNode,
    IFlow,        BaseFlow,
    ISystem,      BaseSystem,
)