Core API

This section documents the core components of AIECS.

Core Interfaces

class aiecs.core.interface.execution_interface.IToolProvider

Bases: ABC

Tool provider interface - Domain layer abstraction

abstract get_tool(tool_name)

Get tool instance

Parameters:

tool_name (str)

Return type:

Any

abstract has_tool(tool_name)

Check if tool exists

Parameters:

tool_name (str)

Return type:

bool
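A minimal sketch of how the two methods above might be implemented. The abstract class is redeclared locally so the example runs without AIECS installed; DictToolProvider and the dict-backed registry are hypothetical and not part of the library.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict


class IToolProvider(ABC):
    """Local stand-in mirroring the documented interface (assumption)."""

    @abstractmethod
    def get_tool(self, tool_name: str) -> Any: ...

    @abstractmethod
    def has_tool(self, tool_name: str) -> bool: ...


class DictToolProvider(IToolProvider):
    """Hypothetical provider backed by a plain dict of tool instances."""

    def __init__(self, tools: Dict[str, Any]) -> None:
        self._tools = dict(tools)

    def get_tool(self, tool_name: str) -> Any:
        # Raising KeyError for an unknown tool is a choice of this sketch.
        return self._tools[tool_name]

    def has_tool(self, tool_name: str) -> bool:
        return tool_name in self._tools


provider = DictToolProvider({"upper": str.upper})
```

Callers would typically check has_tool before get_tool, since the interface does not document what get_tool does for an unknown name.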

class aiecs.core.interface.execution_interface.IToolExecutor

Bases: ABC

Tool executor interface - Domain layer abstraction

abstract execute(tool, operation_name, **params)

Execute tool operation synchronously

Parameters:
  • tool (Any)

  • operation_name (str)

Return type:

Any

abstract async execute_async(tool, operation_name, **params)

Execute tool operation asynchronously

Parameters:
  • tool (Any)

  • operation_name (str)

Return type:

Any
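One possible way to satisfy this interface is to look the operation up as a method on the tool object. The sketch below assumes exactly that; GetattrToolExecutor and Calculator are hypothetical names, and the abstract class is a local stand-in rather than the AIECS import.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any


class IToolExecutor(ABC):
    """Local stand-in mirroring the documented interface (assumption)."""

    @abstractmethod
    def execute(self, tool: Any, operation_name: str, **params: Any) -> Any: ...

    @abstractmethod
    async def execute_async(self, tool: Any, operation_name: str, **params: Any) -> Any: ...


class GetattrToolExecutor(IToolExecutor):
    """Hypothetical executor that treats operations as methods on the tool."""

    def execute(self, tool: Any, operation_name: str, **params: Any) -> Any:
        return getattr(tool, operation_name)(**params)

    async def execute_async(self, tool: Any, operation_name: str, **params: Any) -> Any:
        # Run the synchronous operation in a worker thread so the event loop stays free.
        return await asyncio.to_thread(self.execute, tool, operation_name, **params)


class Calculator:
    """Hypothetical tool used only for this demonstration."""

    def add(self, a: int, b: int) -> int:
        return a + b


executor = GetattrToolExecutor()
sync_result = executor.execute(Calculator(), "add", a=2, b=3)
async_result = asyncio.run(executor.execute_async(Calculator(), "add", a=4, b=5))
```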

class aiecs.core.interface.execution_interface.ICacheProvider

Bases: ABC

Cache provider interface - Domain layer abstraction

abstract generate_cache_key(operation_type, user_id, task_id, args, kwargs)

Generate cache key

Parameters:
  • operation_type

  • user_id

  • task_id

  • args

  • kwargs

Return type:

str

abstract get_from_cache(cache_key)

Get data from cache

Parameters:

cache_key (str)

Return type:

Any | None

abstract add_to_cache(cache_key, value)

Add data to cache

Parameters:
  • cache_key

  • value

Return type:

None
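The following sketch shows one way the three cache methods could fit together. Hashing a JSON dump of the inputs is an assumption made for illustration; the interface does not prescribe a key format, and InMemoryCacheProvider is a hypothetical name.

```python
import hashlib
import json
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional


class ICacheProvider(ABC):
    """Local stand-in mirroring the documented interface (assumption)."""

    @abstractmethod
    def generate_cache_key(self, operation_type, user_id, task_id, args, kwargs) -> str: ...

    @abstractmethod
    def get_from_cache(self, cache_key: str) -> Optional[Any]: ...

    @abstractmethod
    def add_to_cache(self, cache_key: str, value: Any) -> None: ...


class InMemoryCacheProvider(ICacheProvider):
    """Hypothetical dict-backed cache."""

    def __init__(self) -> None:
        self._store: Dict[str, Any] = {}

    def generate_cache_key(self, operation_type, user_id, task_id, args, kwargs) -> str:
        # sort_keys makes the key independent of kwargs ordering.
        payload = json.dumps(
            [operation_type, user_id, task_id, list(args), kwargs],
            sort_keys=True,
            default=str,
        )
        return hashlib.sha256(payload.encode("utf-8")).hexdigest()

    def get_from_cache(self, cache_key: str) -> Optional[Any]:
        return self._store.get(cache_key)

    def add_to_cache(self, cache_key: str, value: Any) -> None:
        self._store[cache_key] = value


cache = InMemoryCacheProvider()
key = cache.generate_cache_key("search", "user-1", "task-1", ("q",), {"limit": 5, "lang": "en"})
miss = cache.get_from_cache(key)
cache.add_to_cache(key, {"hits": 3})
hit = cache.get_from_cache(key)
```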

class aiecs.core.interface.execution_interface.IOperationExecutor

Bases: ABC

Operation executor interface - Domain layer abstraction

abstract async execute_operation(operation_spec, params)

Execute single operation

Parameters:
  • operation_spec

  • params

Return type:

Any

abstract async batch_execute_operations(operations)

Batch execute operations

Parameters:

operations (List[Dict[str, Any]])

Return type:

List[Any]

abstract async execute_operations_sequence(operations, user_id, task_id, stop_on_failure=False, save_callback=None)

Execute a sequence of operations in order

Parameters:
  • operations

  • user_id

  • task_id

  • stop_on_failure

  • save_callback

Return type:

List[TaskStepResult]

abstract async execute_parallel_operations(operations)

Execute operations in parallel

Parameters:

operations (List[Dict[str, Any]])

Return type:

List[TaskStepResult]
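The difference between sequential and parallel execution can be sketched as follows. StepResult is a hypothetical stand-in for TaskStepResult (its real fields are not documented here), run_operation is a toy runner, and the 'operation' and 'params' dict keys are assumed.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class StepResult:
    """Hypothetical stand-in for TaskStepResult."""
    step: str
    completed: bool
    result: Any = None
    error: Optional[str] = None


async def run_operation(operation: Dict[str, Any]) -> Any:
    """Toy runner: fails when params contain 'fail', otherwise echoes params."""
    if operation["params"].get("fail"):
        raise RuntimeError("boom")
    return operation["params"]


async def run_step(operation: Dict[str, Any]) -> StepResult:
    # Convert exceptions into a failed StepResult instead of propagating them.
    try:
        return StepResult(operation["operation"], True, await run_operation(operation))
    except Exception as exc:
        return StepResult(operation["operation"], False, error=str(exc))


async def run_sequence(operations: List[Dict[str, Any]], stop_on_failure: bool = False) -> List[StepResult]:
    results: List[StepResult] = []
    for operation in operations:
        step = await run_step(operation)
        results.append(step)
        if stop_on_failure and not step.completed:
            break  # later operations are skipped
    return results


async def run_parallel(operations: List[Dict[str, Any]]) -> List[StepResult]:
    # gather preserves input order in its result list.
    return list(await asyncio.gather(*(run_step(op) for op in operations)))


ops = [
    {"operation": "a.first", "params": {"x": 1}},
    {"operation": "a.second", "params": {"fail": True}},
    {"operation": "a.third", "params": {"x": 3}},
]
sequence_results = asyncio.run(run_sequence(ops, stop_on_failure=True))
parallel_results = asyncio.run(run_parallel(ops))
```

With stop_on_failure=True the sequence stops after the failing second step, while the parallel run still reports all three steps.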

class aiecs.core.interface.execution_interface.ExecutionInterface

Bases: ABC

Unified execution interface that defines standard methods for service and tool execution. Supports plugin-based execution engines, allowing future introduction of new executors without modifying upper-level code.

abstract async execute_operation(operation_spec, params)

Execute a single operation (e.g., tool operation or service subtask).

Parameters:
  • operation_spec (str) – Operation specification, formatted as ‘tool_name.operation_name’ or another identifier

  • params (Dict[str, Any]) – Operation parameters

Returns:

Operation result

Return type:

Any

abstract async execute_task(task_name, input_data, context)

Execute a single task (e.g., service task).

Parameters:
  • task_name (str) – Task name

  • input_data (Dict[str, Any]) – Input data

  • context (Dict[str, Any]) – Context information

Returns:

Task result

Return type:

Any

abstract async batch_execute_operations(operations)

Batch execute multiple operations.

Parameters:

operations (List[Dict[str, Any]]) – List of operations, each containing ‘operation’ and ‘params’

Returns:

List of operation results

Return type:

List[Any]

abstract async batch_execute_tasks(tasks)

Batch execute multiple tasks.

Parameters:

tasks (List[Dict[str, Any]]) – List of tasks, each containing ‘task_name’, ‘input_data’, ‘context’

Returns:

List of task results

Return type:

List[Any]

register_executor(executor_type, executor_instance)

Register a new executor type to support plugin-based extension.

Parameters:
  • executor_type (str) – Executor type identifier

  • executor_instance (Any) – Executor instance

Return type:

None
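To illustrate the ‘tool_name.operation_name’ format and the ‘operation’/‘params’ batch entries described above, here is a small sketch. SimpleExecutionEngine and TextTool are hypothetical, the tool registry passed to the constructor is an assumption, and the class does not subclass the real ExecutionInterface.

```python
import asyncio
import inspect
from typing import Any, Dict, List


class SimpleExecutionEngine:
    """Hypothetical engine; not the AIECS implementation."""

    def __init__(self, tools: Dict[str, Any]) -> None:
        self._tools = tools

    async def execute_operation(self, operation_spec: str, params: Dict[str, Any]) -> Any:
        # Split 'tool_name.operation_name' at the first dot.
        tool_name, _, operation_name = operation_spec.partition(".")
        if not operation_name:
            raise ValueError(f"expected 'tool_name.operation_name', got {operation_spec!r}")
        result = getattr(self._tools[tool_name], operation_name)(**params)
        # Support both plain and coroutine operations.
        if inspect.isawaitable(result):
            result = await result
        return result

    async def batch_execute_operations(self, operations: List[Dict[str, Any]]) -> List[Any]:
        # Each entry carries 'operation' and 'params'; run them one after another.
        return [await self.execute_operation(op["operation"], op["params"]) for op in operations]


class TextTool:
    """Hypothetical tool with one sync and one async operation."""

    def shout(self, text: str) -> str:
        return text.upper()

    async def count(self, text: str) -> int:
        return len(text)


engine = SimpleExecutionEngine({"text": TextTool()})
results = asyncio.run(engine.batch_execute_operations([
    {"operation": "text.shout", "params": {"text": "hi"}},
    {"operation": "text.count", "params": {"text": "hello"}},
]))
```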

Storage interfaces for the middleware architecture.

This module defines the core storage abstractions following the same pattern as other core interfaces, enabling dependency inversion and clean architecture.

class aiecs.core.interface.storage_interface.ISessionStorage

Bases: ABC

Session storage interface - Domain layer abstraction

abstract async create_session(session_id, user_id, metadata=None)

Create a new session.

Parameters:
  • session_id

  • user_id

  • metadata

Return type:

Dict[str, Any]

abstract async get_session(session_id)

Get session by ID.

Parameters:

session_id (str)

Return type:

Dict[str, Any] | None

abstract async update_session(session_id, updates=None, increment_requests=False, add_processing_time=0.0, mark_error=False)

Update session with activity and metrics.

Parameters:
  • session_id

  • updates

  • increment_requests

  • add_processing_time

  • mark_error

Return type:

bool

abstract async end_session(session_id, status='completed')

End a session and update metrics.

Parameters:
  • session_id (str)

  • status (str)

Return type:

bool
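A rough in-memory sketch of the session lifecycle (create, update, end). The field names request_count, error_count, total_processing_time and status are assumptions chosen for this example; the interface does not document the shape of the session dict.

```python
import asyncio
from typing import Any, Dict, Optional


class InMemorySessionStorage:
    """Hypothetical in-memory store; not the AIECS implementation."""

    def __init__(self) -> None:
        self._sessions: Dict[str, Dict[str, Any]] = {}

    async def create_session(self, session_id: str, user_id: str,
                             metadata: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        session = {
            "session_id": session_id,
            "user_id": user_id,
            "metadata": dict(metadata or {}),
            "status": "active",
            "request_count": 0,
            "error_count": 0,
            "total_processing_time": 0.0,
        }
        self._sessions[session_id] = session
        return session

    async def get_session(self, session_id: str) -> Optional[Dict[str, Any]]:
        return self._sessions.get(session_id)

    async def update_session(self, session_id: str, updates: Optional[Dict[str, Any]] = None,
                             increment_requests: bool = False, add_processing_time: float = 0.0,
                             mark_error: bool = False) -> bool:
        session = self._sessions.get(session_id)
        if session is None:
            return False  # unknown session: nothing updated
        session.update(updates or {})
        if increment_requests:
            session["request_count"] += 1
        session["total_processing_time"] += add_processing_time
        if mark_error:
            session["error_count"] += 1
        return True

    async def end_session(self, session_id: str, status: str = "completed") -> bool:
        return await self.update_session(session_id, {"status": status})


async def demo() -> Optional[Dict[str, Any]]:
    storage = InMemorySessionStorage()
    await storage.create_session("s1", "u1")
    await storage.update_session("s1", increment_requests=True, add_processing_time=0.25)
    await storage.end_session("s1")
    return await storage.get_session("s1")


session = asyncio.run(demo())
```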

class aiecs.core.interface.storage_interface.IConversationStorage

Bases: ABC

Conversation storage interface - Domain layer abstraction

abstract async add_conversation_message(session_id, role, content, metadata=None)

Add message to conversation history.

Parameters:
  • session_id

  • role

  • content

  • metadata

Return type:

bool

abstract async get_conversation_history(session_id, limit=50)

Get conversation history for a session.

Parameters:
  • session_id (str)

  • limit (int)

Return type:

List[Dict[str, Any]]
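A short sketch of how limit might be applied when reading history. It assumes limit keeps the most recent messages and returns them oldest first, which the interface does not state; InMemoryConversationStorage is a hypothetical name.

```python
import asyncio
from typing import Any, Dict, List, Optional


class InMemoryConversationStorage:
    """Hypothetical in-memory store; not the AIECS implementation."""

    def __init__(self) -> None:
        self._messages: Dict[str, List[Dict[str, Any]]] = {}

    async def add_conversation_message(self, session_id: str, role: str, content: str,
                                       metadata: Optional[Dict[str, Any]] = None) -> bool:
        self._messages.setdefault(session_id, []).append(
            {"role": role, "content": content, "metadata": dict(metadata or {})}
        )
        return True

    async def get_conversation_history(self, session_id: str, limit: int = 50) -> List[Dict[str, Any]]:
        history = self._messages.get(session_id, [])
        # Assumption: keep the `limit` most recent messages, oldest first.
        return history[-limit:] if limit > 0 else []


async def demo() -> List[str]:
    storage = InMemoryConversationStorage()
    for index in range(3):
        await storage.add_conversation_message("s1", "user", f"message {index}")
    recent = await storage.get_conversation_history("s1", limit=2)
    return [message["content"] for message in recent]


recent_contents = asyncio.run(demo())
```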

class aiecs.core.interface.storage_interface.ICheckpointStorage

Bases: ABC

Checkpoint storage interface - Domain layer abstraction

abstract async store_checkpoint(thread_id, checkpoint_id, checkpoint_data, metadata=None)

Store checkpoint data.

Parameters:
  • thread_id

  • checkpoint_id

  • checkpoint_data

  • metadata

Return type:

bool

abstract async get_checkpoint(thread_id, checkpoint_id=None)

Get checkpoint data. If checkpoint_id is None, get the latest.

Parameters:
  • thread_id (str)

  • checkpoint_id (str | None)

Return type:

Dict[str, Any] | None

abstract async list_checkpoints(thread_id, limit=10)

List checkpoints for a thread, ordered by creation time.

Parameters:
  • thread_id

  • limit

Return type:

List[Dict[str, Any]]
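The "latest when checkpoint_id is None" rule can be sketched with an insertion-ordered list per thread. Returning newest-first from list_checkpoints is an assumption, since the docs only say "ordered by creation time"; InMemoryCheckpointStorage and its record layout are hypothetical.

```python
import asyncio
from typing import Any, Dict, List, Optional


class InMemoryCheckpointStorage:
    """Hypothetical in-memory store; not the AIECS implementation."""

    def __init__(self) -> None:
        # Each thread keeps its checkpoints in creation order.
        self._threads: Dict[str, List[Dict[str, Any]]] = {}

    async def store_checkpoint(self, thread_id: str, checkpoint_id: str,
                               checkpoint_data: Dict[str, Any],
                               metadata: Optional[Dict[str, Any]] = None) -> bool:
        self._threads.setdefault(thread_id, []).append(
            {"checkpoint_id": checkpoint_id, "data": checkpoint_data, "metadata": dict(metadata or {})}
        )
        return True

    async def get_checkpoint(self, thread_id: str,
                             checkpoint_id: Optional[str] = None) -> Optional[Dict[str, Any]]:
        checkpoints = self._threads.get(thread_id, [])
        if checkpoint_id is None:
            # No ID given: return the most recently stored checkpoint.
            return checkpoints[-1] if checkpoints else None
        for checkpoint in checkpoints:
            if checkpoint["checkpoint_id"] == checkpoint_id:
                return checkpoint
        return None

    async def list_checkpoints(self, thread_id: str, limit: int = 10) -> List[Dict[str, Any]]:
        # Assumption: newest first.
        return list(reversed(self._threads.get(thread_id, [])))[:limit]


async def demo():
    storage = InMemoryCheckpointStorage()
    await storage.store_checkpoint("t1", "c1", {"step": 1})
    await storage.store_checkpoint("t1", "c2", {"step": 2})
    latest = await storage.get_checkpoint("t1")
    first = await storage.get_checkpoint("t1", "c1")
    listed = await storage.list_checkpoints("t1", limit=10)
    missing = await storage.get_checkpoint("unknown")
    return latest, first, [c["checkpoint_id"] for c in listed], missing


latest, first, listed_ids, missing = asyncio.run(demo())
```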

class aiecs.core.interface.storage_interface.ITaskContextStorage

Bases: ABC

Task context storage interface - Domain layer abstraction

abstract async get_task_context(session_id)

Get TaskContext for a session.

Parameters:

session_id (str)

Return type:

Any | None

abstract async store_task_context(session_id, context)

Store TaskContext for a session.

Parameters:
  • session_id (str)

  • context (Any)

Return type:

bool

class aiecs.core.interface.storage_interface.IStorageBackend

Bases: ISessionStorage, IConversationStorage, ICheckpointStorage, ITaskContextStorage

Unified storage backend interface - Domain layer abstraction

This interface combines all storage capabilities and follows the same pattern as other core interfaces in the middleware architecture.

abstract async initialize()

Initialize the storage backend.

Return type:

bool

abstract async close()

Close the storage backend.

abstract async health_check()

Perform health check.

Return type:

Dict[str, Any]

abstract async get_metrics()

Get comprehensive metrics.

Return type:

Dict[str, Any]

abstract async cleanup_expired_sessions(max_idle_hours=24)

Clean up expired sessions and associated data.

Parameters:

max_idle_hours (int)

Return type:

int
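The idle-time cutoff behind cleanup_expired_sessions might look like the synchronous sketch below. It assumes each session dict carries a timezone-aware last_activity timestamp and that the returned int is the number of sessions removed; both are assumptions, and the now parameter exists only to make the example deterministic.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional


def cleanup_expired_sessions(sessions: Dict[str, Dict[str, Any]],
                             max_idle_hours: int = 24,
                             now: Optional[datetime] = None) -> int:
    """Remove sessions idle for longer than max_idle_hours; return how many were removed."""
    now = now or datetime.now(timezone.utc)
    cutoff = now - timedelta(hours=max_idle_hours)
    # Collect IDs first so the dict is not mutated while iterating.
    expired = [sid for sid, session in sessions.items() if session["last_activity"] < cutoff]
    for sid in expired:
        del sessions[sid]
    return len(expired)


now = datetime(2024, 1, 2, 12, 0, tzinfo=timezone.utc)
sessions = {
    "fresh": {"last_activity": now - timedelta(hours=1)},
    "stale": {"last_activity": now - timedelta(hours=30)},
}
removed = cleanup_expired_sessions(sessions, max_idle_hours=24, now=now)
```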

class aiecs.core.interface.storage_interface.IPermanentStorageBackend

Bases: ABC

Permanent storage backend interface for disk-based cold archive.

Used for dual writes alongside Redis (the hot cache). It has append-only semantics and is optimized for analytics and long-term retention. Typical implementations include ClickHouse and PostgreSQL.

All methods are fire-and-forget: failures should not block the primary Redis write path. Implementations should handle errors internally.

abstract async append_session_event(session_id, user_id, event_type, payload, created_at=None)

Append session create/update/end event for audit/analytics.

Parameters:
  • session_id

  • user_id

  • event_type

  • payload

  • created_at

Return type:

bool

abstract async append_conversation_message(session_id, role, content, metadata=None, created_at=None)

Append conversation message (append-only).

Parameters:
  • session_id

  • role

  • content

  • metadata

  • created_at

Return type:

bool

abstract async append_checkpoint(thread_id, checkpoint_id, checkpoint_data, metadata=None, created_at=None)

Append checkpoint data.

Parameters:
  • thread_id

  • checkpoint_id

  • checkpoint_data

  • metadata

  • created_at

Return type:

bool

abstract async append_checkpoint_writes(thread_id, checkpoint_id, task_id, writes_data, created_at=None)

Append checkpoint writes.

Parameters:
  • thread_id

  • checkpoint_id

  • task_id

  • writes_data

  • created_at

Return type:

bool

abstract async append_conversation_session(session_key, session_data, created_at=None)

Append conversation session metadata.

Parameters:
  • session_key

  • session_data

  • created_at

Return type:

bool

abstract async append_task_context_snapshot(session_id, context_data, created_at=None)

Append task context snapshot (versioned).

Parameters:
  • session_id

  • context_data

  • created_at

Return type:

bool

abstract async initialize()

Initialize the permanent storage backend.

Return type:

bool

abstract async close()

Close the permanent storage backend.

Return type:

None
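The fire-and-forget contract described for this interface can be sketched as an append method that catches its own errors and reports failure through its return value. ListArchive and dual_write are hypothetical, and a plain dict stands in for the Redis hot cache.

```python
import asyncio
import logging
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

logger = logging.getLogger("archive_sketch")


class ListArchive:
    """Hypothetical append-only archive that keeps rows in a list."""

    def __init__(self, fail: bool = False) -> None:
        self.rows: List[Dict[str, Any]] = []
        self._fail = fail  # simulate an unreachable archive

    async def append_conversation_message(self, session_id: str, role: str, content: str,
                                          metadata: Optional[Dict[str, Any]] = None,
                                          created_at: Optional[datetime] = None) -> bool:
        try:
            if self._fail:
                raise ConnectionError("archive unavailable")
            self.rows.append({
                "session_id": session_id,
                "role": role,
                "content": content,
                "metadata": dict(metadata or {}),
                "created_at": created_at or datetime.now(timezone.utc),
            })
            return True
        except Exception as exc:
            # Handle the error here so the caller's primary write is never blocked.
            logger.warning("archive write failed: %s", exc)
            return False


async def dual_write(hot_cache: Dict[str, List[Dict[str, str]]], archive: ListArchive,
                     session_id: str, role: str, content: str) -> bool:
    # Primary write first (dict stand-in for Redis), then the best-effort archive write.
    hot_cache.setdefault(session_id, []).append({"role": role, "content": content})
    return await archive.append_conversation_message(session_id, role, content)


async def demo():
    hot_cache: Dict[str, List[Dict[str, str]]] = {}
    archived_ok = await dual_write(hot_cache, ListArchive(), "s1", "user", "hi")
    archived_failed = await dual_write(hot_cache, ListArchive(fail=True), "s1", "user", "again")
    return hot_cache, archived_ok, archived_failed


hot_cache, archived_ok, archived_failed = asyncio.run(demo())
```

Both messages reach the hot cache even though the second archive write fails, which is the behavior the interface description asks for.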

class aiecs.core.interface.storage_interface.ICheckpointerBackend

Bases: ABC

Checkpointer backend interface for LangGraph integration.

This interface defines the minimal contract needed by BaseServiceCheckpointer to work with any storage backend, following the dependency inversion principle.

abstract async put_checkpoint(thread_id, checkpoint_id, checkpoint_data, metadata=None)

Store a checkpoint for LangGraph workflows.

Parameters:
  • thread_id

  • checkpoint_id

  • checkpoint_data

  • metadata

Return type:

bool

abstract async get_checkpoint(thread_id, checkpoint_id=None)

Retrieve a checkpoint for LangGraph workflows.

Parameters:
  • thread_id (str)

  • checkpoint_id (str | None)

Return type:

Dict[str, Any] | None

abstract async list_checkpoints(thread_id, limit=10)

List checkpoints for LangGraph workflows.

Parameters:
  • thread_id

  • limit

Return type:

List[Dict[str, Any]]

abstract async put_writes(thread_id, checkpoint_id, task_id, writes_data)

Store intermediate writes for a checkpoint.

Parameters:
  • thread_id

  • checkpoint_id

  • task_id

  • writes_data

Return type:

bool

abstract async get_writes(thread_id, checkpoint_id)

Get intermediate writes for a checkpoint.

Parameters:
  • thread_id (str)

  • checkpoint_id (str)

Return type:

List[tuple]
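A minimal sketch of how put_writes and get_writes could pair up. It assumes writes_data is a list of (channel, value) pairs and that get_writes returns (task_id, channel, value) tuples; neither shape is specified by the interface, and InMemoryWriteStore is a hypothetical name.

```python
import asyncio
from typing import Any, Dict, List, Tuple


class InMemoryWriteStore:
    """Hypothetical in-memory store for intermediate writes."""

    def __init__(self) -> None:
        # Writes are grouped per (thread_id, checkpoint_id).
        self._writes: Dict[Tuple[str, str], List[tuple]] = {}

    async def put_writes(self, thread_id: str, checkpoint_id: str, task_id: str,
                         writes_data: List[Tuple[str, Any]]) -> bool:
        bucket = self._writes.setdefault((thread_id, checkpoint_id), [])
        # Assumption: writes_data is a list of (channel, value) pairs.
        bucket.extend((task_id, channel, value) for channel, value in writes_data)
        return True

    async def get_writes(self, thread_id: str, checkpoint_id: str) -> List[tuple]:
        # Return a copy so callers cannot mutate the stored list.
        return list(self._writes.get((thread_id, checkpoint_id), []))


async def demo():
    store = InMemoryWriteStore()
    await store.put_writes("t1", "c1", "task-a", [("messages", "hello")])
    await store.put_writes("t1", "c1", "task-b", [("count", 2)])
    return await store.get_writes("t1", "c1"), await store.get_writes("t1", "other")


writes, missing_writes = asyncio.run(demo())
```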