Overview #
FlashLearn Core Module is designed to abstract the complexity of interfacing with generative AI models by providing:
- A lightweight client: `FlashLiteLLMClient`, which wraps external LLM APIs (such as those accessed via the `litellm` package).
- A robust orchestration engine: asynchronous parallel task processing with error handling, rate limiting, exponential backoff, and progress tracking.
This module is essential for building scalable, production-grade systems that interact with AI services, as it provides the necessary infrastructure to manage thousands of tasks concurrently while gracefully handling transient errors.
Module and Class Details #
Flash Client #
FlashLiteLLMClient #
File: flashlearn/core/flash_client.py
- Description: A lightweight client that wraps the underlying LLM library (`litellm`) to generate completions. It serves as an abstraction layer that simplifies API access.
- Constructor: `__init__(self)` initializes a new instance of `FlashLiteLLMClient`. Currently, no attributes are set at initialization.
Chat and Completions #
- Inner Class: `Chat`
  - Purpose: Encapsulates methods related to conversational completions.
  - Nested Class: `Completions`
    - Method: `create(**kwargs)`
      - Description: A static method that generates completions by calling the lower-level LLM API.
      - Behavior:
        - Receives a set of keyword arguments.
        - Automatically updates these arguments with `{'no-log': True}` to disable logging.
        - Calls and returns the result of `litellm.completion(**kwargs)`.
- Property: `chat`
  - Description: Exposes an instance of the `Chat` class, which provides access to the `.completions` property, allowing clients to call `FlashLiteLLMClient.chat.completions.create(...)`.
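The nested-class layout described above can be sketched as follows. This is an illustrative reconstruction, not the actual source: `fake_completion` stands in for `litellm.completion` so the example runs without the `litellm` dependency.

```python
def fake_completion(**kwargs):
    # Stand-in for litellm.completion, so this sketch is self-contained.
    return {"kwargs": kwargs}

class FlashLiteLLMClient:
    class Chat:
        class Completions:
            @staticmethod
            def create(**kwargs):
                # Disable provider-side logging before delegating the call.
                kwargs.update({'no-log': True})
                return fake_completion(**kwargs)

        @property
        def completions(self):
            return self.Completions

    @property
    def chat(self):
        # Each access returns a fresh Chat instance exposing .completions.
        return self.Chat()

client = FlashLiteLLMClient()
result = client.chat.completions.create(model="gpt-4o-mini", messages=[])
```

The mirror of the OpenAI-style `client.chat.completions.create(...)` call shape is what lets the rest of FlashLearn treat this client interchangeably with other SDK clients.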
Orchestration and Task Execution #
File: flashlearn/core/orchestration.py
This module encompasses the logic needed to orchestrate API calls with parallel processing, rate limiting, token management, retry strategies, and error handling.
Custom API Error Classes #
- `ApiError(Exception)`
  - Docstring: "Base class for all API-related errors."
- `ApiAuthError(ApiError)`
  - Docstring: "Authentication/authorization error (e.g., 401/403)."
- `ApiRateLimitError(ApiError)`
  - Docstring: "Rate limit exceeded (e.g., 429)."
- `ApiServerError(ApiError)`
  - Docstring: "Server-side error (e.g., 500/503)."
- `ApiUnrecoverableError(ApiError)`
  - Docstring: "Any other error we can't or don't want to retry."
These exception classes enable the orchestration engine to adapt its behavior depending on the error type encountered during an API call.
analyze_response_for_errors #
- Signature: `analyze_response_for_errors(response_json: Dict[str, Any]) -> None`
- Docstring: Checks for the presence of an `"error"` key in the response JSON and raises an appropriate exception if one is found.
- Behavior:
  - If no error is present, the function returns immediately.
  - If an error is found, it extracts the error code and message, then raises:
    - `ApiAuthError` for error codes 401 or 403.
    - `ApiRateLimitError` for error code 429.
    - `ApiServerError` for error codes 500 or 503.
    - `ApiUnrecoverableError` for any other error.
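A sketch of the error-mapping logic described above, including the exception hierarchy. The real implementation lives in `flashlearn/core/orchestration.py`; details such as the exact shape of the `error` object are assumptions here.

```python
from typing import Any, Dict

class ApiError(Exception):
    """Base class for all API-related errors."""

class ApiAuthError(ApiError):
    """Authentication/authorization error (e.g., 401/403)."""

class ApiRateLimitError(ApiError):
    """Rate limit exceeded (e.g., 429)."""

class ApiServerError(ApiError):
    """Server-side error (e.g., 500/503)."""

class ApiUnrecoverableError(ApiError):
    """Any other error we can't or don't want to retry."""

def analyze_response_for_errors(response_json: Dict[str, Any]) -> None:
    error = response_json.get("error")
    if not error:
        return  # no error key: nothing to do
    code = error.get("code")
    message = error.get("message", "")
    if code in (401, 403):
        raise ApiAuthError(message)
    if code == 429:
        raise ApiRateLimitError(message)
    if code in (500, 503):
        raise ApiServerError(message)
    raise ApiUnrecoverableError(message)
```

Because each error class maps to a distinct retry policy, callers can catch `ApiRateLimitError` and `ApiServerError` for backoff-and-retry while treating `ApiAuthError` and `ApiUnrecoverableError` as terminal.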
StatusTracker #
- Class: `StatusTracker`
- Type: Data class
- Docstring: Tracks global statistics and usage for the orchestrated parallel processing run.
- Attributes:
  - `num_tasks_started: int` – Number of tasks started.
  - `num_tasks_in_progress: int` – Tasks currently in progress.
  - `num_tasks_succeeded: int` – Successful task count.
  - `num_tasks_failed: int` – Failed task count.
  - `num_rate_limit_errors: int` – Rate limit error occurrences.
  - `num_api_errors: int` – Occurrences of API errors.
  - `num_other_errors: int` – Any other errors.
  - `time_of_last_rate_limit_error: float` – Timestamp of the last rate limit error.
  - `total_input_tokens: int` – Aggregate input token consumption.
  - `total_output_tokens: int` – Aggregate output token consumption.
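A plausible reconstruction of the data class from the attribute list above; the zero defaults are an assumption, not confirmed from the source.

```python
from dataclasses import dataclass

@dataclass
class StatusTracker:
    """Tracks global statistics for a parallel processing run (sketch)."""
    num_tasks_started: int = 0
    num_tasks_in_progress: int = 0
    num_tasks_succeeded: int = 0
    num_tasks_failed: int = 0
    num_rate_limit_errors: int = 0
    num_api_errors: int = 0
    num_other_errors: int = 0
    time_of_last_rate_limit_error: float = 0.0
    total_input_tokens: int = 0
    total_output_tokens: int = 0

# Counters are mutated in place as tasks start, finish, and fail.
tracker = StatusTracker()
tracker.num_tasks_started += 1
tracker.num_tasks_in_progress += 1
```

A single shared instance is passed through the orchestration functions, so all tasks report into the same set of counters.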
append_to_jsonl #
- Signature: `append_to_jsonl(data: Any, filename: Optional[str]) -> None`
- Docstring: Appends a single JSON-serializable item to a JSON Lines file. Each call writes exactly one line of JSON.
- Parameters:
  - `data`: JSON-serializable object.
  - `filename`: The file path to which the data should be appended. If `None`, the function is a no-op.
- Behavior:
  - Opens the file in append mode with UTF-8 encoding.
  - Writes the JSON-stringified data followed by a newline.
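The behavior above fits in a few lines; this sketch mirrors the documented contract (no-op on `None`, append mode, one JSON object per line):

```python
import json
import os
import tempfile
from typing import Any, Optional

def append_to_jsonl(data: Any, filename: Optional[str]) -> None:
    if filename is None:
        return  # no-op when no file path is given
    with open(filename, "a", encoding="utf-8") as f:
        f.write(json.dumps(data) + "\n")

# Usage: each call appends exactly one line.
path = os.path.join(tempfile.mkdtemp(), "results.jsonl")
append_to_jsonl({"task": 1, "ok": True}, path)
append_to_jsonl({"task": 2, "ok": False}, path)
with open(path, encoding="utf-8") as f:
    lines = f.read().splitlines()
```

Append mode makes the function safe to call incrementally as tasks complete, so partial results survive a crash mid-run.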
ParallelTask and its Methods #
- Class: `ParallelTask`
- Type: Data class
- Docstring: Represents the data and metadata for one parallelizable request (task).
- Key Attributes:
  - `task_id`: Unique numeric identifier.
  - `custom_id`: Custom identifier for the task.
  - `request_json`: The JSON payload for the API request.
  - `token_consumption`: Estimated tokens required for this task.
  - `attempts_left`: Number of retry attempts remaining.
  - `client`: Reference to the LLM client (`FlashLiteLLMClient`).
  - `metadata`: Additional metadata (default: empty dict).
  - `pbar`: Optional progress bar object from `tqdm`.
  - `results_dict`: Dictionary to store results.
  - `result`: List to store either successful responses or error messages.
- Key Methods:
  - `_extract_function_call_arguments`
    - Signature: `def _extract_function_call_arguments(self, completion: Any) -> Any`
    - Docstring: Extracts JSON arguments from the model's function call (if present).
    - Behavior:
      - Attempts to extract a string representation of the function arguments from the API response structure.
      - Uses `ast.literal_eval` to interpret the string.
      - Tries to load a JSON string from the parsed object via `json.loads`.
      - If further parsing fails, returns the raw string, or logs an error and returns a parse-error message.
  - `call_api`
    - Signature: `async def call_api(self, retry_queue: asyncio.Queue, save_filepath: Optional[str], status_tracker: "StatusTracker") -> None`
    - Docstring: Invokes the client API in a separate thread, checks for known error codes, and re-queues the task on retryable failures.
    - Behavior:
      - Uses `asyncio.to_thread` to perform the synchronous API call asynchronously.
      - Calls `analyze_response_for_errors` on the API response.
      - Attempts to extract token usage (prompt tokens and completion tokens).
      - Calls `_save_success` on successful completion, recording results and updating counters.
      - On exceptions (rate limit, server, authorization, or other errors):
        - Increments the appropriate error counters.
        - Implements exponential backoff by calculating a delay (up to 60 seconds) and scheduling a retry if attempts remain.
        - If no attempts remain, calls `_save_failed` to record the failure.
  - `_save_success`
    - Signature: `def _save_success(self, filepath: Optional[str], response_json: dict, status_tracker: "StatusTracker", prompt_tokens: int = 0, completion_tokens: int = 0) -> None`
    - Docstring: Records a successful result both in JSONL (if a filepath is provided) and in memory.
    - Behavior:
      - Prepares an array of the request, response, and metadata.
      - Uses `append_to_jsonl` to persist the result.
      - Updates the shared results dictionary if provided.
      - Adjusts status counters (success count, in-progress tasks, and token tallies).
      - Updates the progress bar if configured.
  - `_save_failed`
    - Signature: `def _save_failed(self, filepath: Optional[str], status_tracker: "StatusTracker") -> None`
    - Docstring: Records a permanently failed result in JSONL (if a filepath is provided) and updates counters.
    - Behavior:
      - Writes the failure details (request, error messages, and metadata) to a JSONL file.
      - Marks the task as an error in the shared results dictionary.
      - Updates failure counters and decrements the tasks-in-progress count.
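The exponential backoff that `call_api` applies between retries can be sketched as below. The exact base and formula are assumptions; only the 60-second cap is stated in the documentation above.

```python
def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0) -> float:
    """Delay grows as base * 2**attempt, capped at `cap` seconds (sketch)."""
    return min(cap, base * (2 ** attempt))

# Delays for successive retry attempts: 1, 2, 4, 8, 16, 32, then capped at 60.
delays = [backoff_delay(a) for a in range(8)]
```

Production implementations often add random jitter to these delays so that many rate-limited tasks do not all retry at the same instant.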
token_count_for_task #
- Signature: `def token_count_for_task(task_data: dict, token_encoding_name: str = "cl100k_base") -> int`
- Docstring: Estimates how many tokens this request might consume. Adjust the logic to match your actual request structure and token-counting strategy.
- Parameters:
  - `task_data`: The task dictionary (expected to have a "messages" key).
  - `token_encoding_name`: The token encoding to use (default: "cl100k_base").
- Behavior:
  - Uses the `tiktoken` library to encode each message.
  - Sums the lengths of the encoded messages.
  - Returns at least 1 if no tokens were counted.
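An illustrative version of this function. The real implementation encodes each message with `tiktoken`; here a whitespace split stands in for the encoder so the example has no external dependency. The counts are therefore approximations, not real token counts.

```python
def token_count_for_task(task_data: dict) -> int:
    """Rough token estimate: word count over all message contents (sketch)."""
    total = 0
    for message in task_data.get("messages", []):
        # The real function runs tiktoken's encoder over each message instead.
        total += len(str(message.get("content", "")).split())
    return max(total, 1)  # always return at least 1, as documented above

task = {"messages": [{"role": "user", "content": "Classify this sentence"}]}
estimate = token_count_for_task(task)
```

The orchestrator only needs these estimates for capacity planning against the tokens-per-minute budget, so a slight overestimate is safer than an underestimate.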
task_id_generator #
- Signature: `def task_id_generator()`
- Docstring: A simple infinite generator for task IDs: 0, 1, 2, 3, …
- Behavior:
  - Yields an ever-incrementing integer each time it is advanced.
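This generator is small enough to show in full; the sketch below matches the documented behavior:

```python
def task_id_generator():
    """Yield 0, 1, 2, 3, ... forever."""
    task_id = 0
    while True:
        yield task_id
        task_id += 1

# Usage: pull IDs with next() as tasks are created.
gen = task_id_generator()
first_ids = [next(gen) for _ in range(4)]  # [0, 1, 2, 3]
```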
run_task_with_timeout #
- Signature: `async def run_task_with_timeout(task: ParallelTask, retry_queue: asyncio.Queue, save_filepath: Optional[str], status_tracker: StatusTracker, request_timeout: float) -> None`
- Docstring: Invokes `task.call_api(...)` with an overall timeout.
- Parameters:
  - `task`: The `ParallelTask` instance.
  - `retry_queue`: Queue to which the task is re-queued on retry if needed.
  - `save_filepath`: Optional file path for saving task results.
  - `status_tracker`: The `StatusTracker` instance maintaining global stats.
  - `request_timeout`: Timeout (in seconds) for the API request.
- Behavior:
  - Uses `asyncio.wait_for` to enforce the timeout.
  - On timeout, appends a timeout error message to the task results.
  - If attempts remain, schedules a retry with an updated backoff.
  - On final timeout failure, calls `_save_failed`.
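The timeout pattern is standard `asyncio.wait_for` usage. This sketch substitutes a stand-in coroutine for the real `task.call_api`, and simply returns a marker string where the real code would record the error and schedule a retry:

```python
import asyncio

async def slow_call() -> str:
    # Stand-in for task.call_api(...): deliberately slower than the timeout.
    await asyncio.sleep(10)
    return "done"

async def run_with_timeout(timeout: float) -> str:
    try:
        return await asyncio.wait_for(slow_call(), timeout=timeout)
    except asyncio.TimeoutError:
        # The real code appends a timeout error to task.result and either
        # re-queues the task (if attempts remain) or calls _save_failed.
        return "timeout"

outcome = asyncio.run(run_with_timeout(0.01))
```

`asyncio.wait_for` cancels the inner coroutine on timeout, so a hung API call cannot pin a worker slot indefinitely.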
process_tasks_in_parallel #
- Signature: `async def process_tasks_in_parallel(tasks_data: List[dict], client: Any, max_requests_per_minute: float = 1000, max_tokens_per_minute: float = 1000000, max_attempts: int = 3, token_encoding_name: str = "cl100k_base", logging_level: int = logging.INFO, save_filepath: Optional[str] = None, show_progress: bool = True, return_results: bool = True, request_timeout: float = 5.0) -> Tuple[Optional[Dict[str, Any]], StatusTracker]`
- Docstring: Main orchestrator for concurrent tasks with rate limiting, retries, concurrency control, and optional result capture.
- Key Concepts and Behavior:
  - Rate Limiting: Tracks available capacity for both requests and tokens per minute; the capacity is refilled over time.
  - Concurrency Control: Loads tasks into an async queue and monitors progress with a progress bar (`tqdm`) if enabled.
  - Retry Logic: Tasks that encounter errors (rate limits, server errors, unknown exceptions) are re-queued with exponential backoff.
  - Task Scheduling: Continuously checks whether tasks (new or retried) are eligible for execution given the current capacity.
  - Progress and Result Reporting: Updates a shared status tracker and returns the final aggregated results (if enabled) along with usage statistics.
  - Error Notifications: Logs warnings when rate-limit errors occur frequently and notes enterprise-version requirements when thresholds are exceeded.
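The capacity-refill rate limiting described above is essentially a token-bucket scheme kept for two resources at once: requests per minute and tokens per minute. This sketch shows the idea; class and method names are illustrative, not from the library.

```python
import time

class CapacityBucket:
    """Token bucket: capacity refills continuously, capped at per_minute (sketch)."""

    def __init__(self, per_minute: float):
        self.per_minute = per_minute
        self.capacity = per_minute  # start full
        self.last_update = time.monotonic()

    def refill(self) -> None:
        now = time.monotonic()
        elapsed = now - self.last_update
        self.capacity = min(self.per_minute,
                            self.capacity + self.per_minute * elapsed / 60.0)
        self.last_update = now

    def try_consume(self, amount: float) -> bool:
        self.refill()
        if self.capacity >= amount:
            self.capacity -= amount
            return True
        return False  # caller should wait and retry later

# A task runs only when BOTH buckets can cover it.
requests = CapacityBucket(per_minute=1000)
tokens = CapacityBucket(per_minute=1_000_000)
ok = requests.try_consume(1) and tokens.try_consume(500)
```

In the real orchestrator the scheduling loop performs this check for each queued task, leaving ineligible tasks in the queue until enough capacity has refilled.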
EnterpriseVersionRequiredError #
- Class: `EnterpriseVersionRequiredError(Exception)`
- Docstring: Raised when the maximum limits (e.g., more than 1,000 requests per minute or over 1,000,000 tokens per minute) are exceeded.
- Behavior:
  - Provides a message prompting users to request an enterprise demo if higher throughput is required.
Package Initialization #
File: flashlearn/core/__init__.py
- Docstring: Initializes the `core` subpackage of FlashLearn.
- Purpose and Behavior:
  - Marks the directory as a subpackage.
  - Exposes key classes and functions from the core module.
  - Exports:
    - `FlashLiteLLMClient`
    - `StatusTracker`
    - `ParallelTask`
    - `append_to_jsonl`
    - `token_count_for_task`
    - `run_task_with_timeout`
    - `process_tasks_in_parallel`
This allows users to import these components directly:
`from flashlearn.core import FlashLiteLLMClient`
Testing and Quality Assurance #
The module includes a robust test suite to ensure every component functions as expected. The tests are organized under the `flashlearn/core/tests/` directory and cover:
- Flash Client Tests:
  - Instantiation of `FlashLiteLLMClient`.
  - Validation of the presence of the `chat` property.
  - Ensuring that `chat.completions.create` sets the `no-log` flag in its kwargs.
- Orchestration Tests:
  - `append_to_jsonl`: Verifies proper JSONL writing behavior.
  - `token_count_for_task`: Confirms accurate token counting.
  - `ParallelTask` methods:
    - Tests for `_extract_function_call_arguments` correctness under multiple scenarios:
      - Successful parsing with a function definition.
      - Fallback parsing when the function definition is missing.
      - Handling of parse errors.
    - Tests for `_save_success` and `_save_failed` to ensure proper file writing and status updates.
  - `run_task_with_timeout`:
    - Checks that tasks complete under the timeout.
    - Validates that timeout errors re-queue tasks when applicable.
  - `process_tasks_in_parallel`:
    - Verifies end-to-end processing of tasks, including success, retry after an API error, and rate-limit or timeout errors.
    - Uses mocking to simulate API responses and time progression for stable testing without real delays.
These comprehensive tests ensure that the orchestration engine operates reliably under various operational conditions and error scenarios.