SDK Reference

Module Contents

class After(value: datetime)[source]

Bases: object

class AgentInput(*, messages: list[AgentMessage])[source]

Bases: BaseModel

as_chat_messages() list[Message][source]
class AgentMessage(*, role: Literal['user', 'agent'], content: str)[source]

Bases: BaseModel

as_chat_message() Message[source]
class AtOrAfter(value: datetime)[source]

Bases: object

class AtOrBefore(value: datetime)[source]

Bases: object

class Before(value: datetime)[source]

Bases: object

class ChatParams(max_tokens: int | None = None, max_completion_tokens: int | None = None, temperature: float | None = None, top_p: float | None = None, frequency_penalty: float | None = None, presence_penalty: float | None = None, logprobs: TopLogprobs | Literal['no'] | Literal['sampled'] = 'no', tools: list[Tool] | None = None, tool_choice: Literal['none', 'auto', 'required'] | NamedToolChoice | None = None, parallel_tool_calls: bool | None = None, response_format: Literal['text', 'json_object'] | JsonSchema | None = None, reasoning_effort: ReasoningEffort | None = None)[source]

Bases: object

Chat request parameters.

max-tokens

The maximum number of tokens that can be generated in the chat completion. This value can be used to control costs for text generated via API. This value is now deprecated by OpenAI in favor of max_completion_tokens, and is not compatible with OpenAI o-series models. Whether to set max-tokens or max-completion-tokens is an inference provider specific decision. While some inference providers like GitHub models and the Aleph Alpha inference expect the user to set max-tokens, OpenAI deprecated it in favor of max-completion-tokens. For OpenAI reasoning models, settings max-tokens raises an error.

Type:

int, optional, default None

max-completion-tokens

An upper bound for the number of tokens that can be generated for a completion, including visible output tokens and reasoning tokens. Only supported by distinct inference providers (e.g. OpenAI). Note that either max-tokens or max-completion-tokens can be set, but not both.

Type:

int, optional, default None

temperature

The randomness with which the next token is selected.

Type:

float, optional, default None

top-p

The probability total of next tokens the model will choose from.

Type:

float, optional, default None

frequency-penalty

The presence penalty reduces the probability of generating tokens that are already present in the generated text respectively prompt. Presence penalty is independent of the number of occurrences. Increase the value to reduce the probability of repeating text.

Type:

float, optional, default None

presence-penalty

The presence penalty reduces the probability of generating tokens that are already present in the generated text respectively prompt. Presence penalty is independent of the number of occurrences. Increase the value to reduce the probability of repeating text.

Type:

float, optional, default None

logprobs

Use this to control the logarithmic probabilities you want to have returned. This is useful to figure out how likely it had been that this specific token had been sampled.

Type:

Logprobs, optional, default NoLogprobs()

tools

A list of tools the model may call. Use this to provide a list of functions the model may generate JSON inputs for. A max of 128 functions are supported.

Type:

list[Function], optional, default None

parallel-tool-calls

Whether to allow the model to call multiple tools in parallel.

Type:

bool, optional, default None

reasoning-effort

Constrains effort on reasoning for reasoning models.

Type:

ReasoningEffort, optional, default None

as_gen_ai_otel_attributes() dict[str, AttributeValue][source]
class ChatRequest(model: str, messages: list[~pharia_skill.csi.inference.types.Message], params: ~pharia_skill.csi.inference.inference.ChatParams = <factory>)[source]

Bases: object

A request for a model to generate a response from a conversation.

Only one optional “system” message is allowed at the beginning of the conversation. The remaining conversation must alternate between “user” and “assistant” messages, and must begin with a “user” message.

model

Name of model to use.

Type:

str, required

messages

A list of messages comprising the conversation so far.

Type:

list[Message], required

params

Parameters for the requested chat.

Type:

ChatParams, optional, Default ChatParams()

as_gen_ai_otel_attributes() dict[str, AttributeValue][source]

The attributes specified by the GenAI Otel Semantic convention.

See <https://opentelemetry.io/docs/specs/semconv/registry/attributes/gen-ai/#genai-attributes> for more details.

Note that the list of attributes specified here is currently not complete, as we are still in exploring the conventions.

class ChatResponse(message: Message, finish_reason: FinishReason, logprobs: list[Distribution], usage: TokenUsage)[source]

Bases: object

The result of a chat request.

message

The generated message.

Type:

Message

finish_reason

Why the model finished completing.

Type:

FinishReason

logprobs

Contains the logprobs for the sampled and top n tokens, given that chat-request.params.logprobs has been set to sampled or top.

Type:

list[Distribution]

usage

Usage statistics for the chat request.

Type:

TokenUsage

as_gen_ai_otel_attributes() dict[str, AttributeValue][source]

The attributes specified by the GenAI Otel Semantic convention.

See <https://opentelemetry.io/docs/specs/semconv/registry/attributes/gen-ai/#genai-attributes> for more details.

static from_dict(body: dict[str, Any]) ChatResponse[source]
class ChatStreamResponse[source]

Bases: ABC

Abstract base class for streaming chat responses.

This class provides the core functionality for streaming chat from a model. Concrete implementations only need to implement the next() method to provide the next event in the stream, and optionally override __enter__ and __exit__ methods for proper resource management.

The __enter__ and __exit__ methods are particularly important for implementations that need to manage external resources. For example, in the WitCsi implementation, these methods ensure that resources are properly released when the stream is no longer needed.

The content of the message can be streamed by calling stream(). If finish_reason() or usage() has been called, the stream is consumed.

role

The role of the message.

Type:

str, required

consume_message() Message[source]

A helper method that extracts the contained message from a chat stream.

This method consumes the stream and only returns the entire messages as long as the stream has not been consumed. It can be useful for testing purposes, where you are interested in the content of the entire message and not in the individual events. In case the stream has already been consumed, an empty message is returned.

Example:

def test_my_prompt():
    user = Message.user("What is the meaning of life?")
    with csi.chat_stream("llama-3.1-8b-instruct", [user]) as response:
        message = response.consume_message()

    assert message.content == "42"
Returns:

The message of the chat request.

finish_reason() FinishReason[source]

The reason the model finished generating.

next() MessageBegin | Reasoning | MessageAppend | FinishReason | TokenUsage | ToolCallEvent | None[source]

Get the next chat event.

If there are events stored in the internal buffer, use them as event source. Otherwise, get the next event from the stream. Keeping track of events in the buffer allows others to peek at the next stream event without altering the stream. An example where this is necessary is when checking for a tool call.

stream() Generator[MessageAppend | Reasoning, None, None][source]

Stream the content of the message.

This does not include the role, any tool calls, or the finish reason and usage. If you are using the tool calling abilties, you should check via tool_calls() to see if the model is calling a tool.

tool_calls() list[ToolCall] | None[source]

Inspect the stream to find out if the model is calling a tool.

This method must be called before the stream is consumed. A typical usage pattern would be to check for the tool call, and, if there is none, stream the rest of the message. In case the response is not a tool call, normally only one element of the stream needs to be inspected, so the impact is minimal. However, in edge scenarios, the full stream might need to be inspected.

Returns:

The tool call if there is one in the request, otherwise None.

Example:

response = csi.chat_stream("llama-3.1-8b-instruct", [system, user], params)
tool_call = response.tool_call()
if tool_call:
    # Handle the tool call
else:
    writer.forward_response(response)
usage() TokenUsage[source]

Usage statistics for the chat request.

class Chunk(text: str, character_offset: int)[source]

Bases: object

Chunk object with offset information.

text

The text that was chunked

Type:

str, required

character_offset

The character offset relative to the start of the original text

Type:

int, required

class ChunkParams(model: str, max_tokens: int, overlap: int = 0)[source]

Bases: object

Chunking parameters.

model

The name of the model the chunk is intended to be used for. This must be a known model.

Type:

str, required

max_tokens

The maximum number of tokens that should be returned per chunk.

Type:

int, required

overlap

The amount of allowed overlap between chunks. Must be less than max_tokens. By default, there is no overlap between chunks.

Type:

int, optional, default 0

class ChunkRequest(text: str, params: ChunkParams)[source]

Bases: object

Chunking request parameters.

text

The text to be chunked.

Type:

str, required

params

Parameter used for chunking.

Type:

ChunkParams, required

serialize() dict[str, Any][source]
class Completion(text: str, finish_reason: FinishReason, logprobs: list[Distribution], usage: TokenUsage)[source]

Bases: object

The result of a completion, including the text generated as well as why the model finished completing.

text

The text generated by the model.

Type:

str, required

finish-reason

The reason the model finished generating.

Type:

FinishReason, required

logprobs

Contains the logprobs for the sampled and top n tokens, given that completion-request.params.logprobs has been set to sampled or top.

Type:

list[Distribution], required

usage

Usage statistics for the completion request.

Type:

TokenUsage, required

as_gen_ai_otel_attributes() dict[str, AttributeValue][source]

The attributes specified by the GenAI Otel Semantic convention.

See <https://opentelemetry.io/docs/specs/semconv/registry/attributes/gen-ai/#genai-attributes> for more details.

classmethod from_dict(body: dict[str, Any]) Completion[source]
class CompletionParams(max_tokens: int | None = None, temperature: float | None = None, top_k: int | None = None, top_p: float | None = None, stop: list[str] = <factory>, return_special_tokens: bool = True, frequency_penalty: float | None = None, presence_penalty: float | None = None, logprobs: ~pharia_skill.csi.inference.inference.TopLogprobs | ~typing.Literal['no'] | ~typing.Literal['sampled'] = 'no', echo: bool = False)[source]

Bases: object

Completion request parameters.

max-tokens

The maximum tokens that should be inferred. Note, the backing implementation may return less tokens due to other stop reasons.

Type:

int, optional, default None

temperature

The randomness with which the next token is selected.

Type:

float, optional, default None

top-k

The number of possible next tokens the model will choose from.

Type:

int, optional, default None

top-p

The probability total of next tokens the model will choose from.

Type:

float, optional, default None

stop

A list of sequences that, if encountered, the API will stop generating further tokens.

Type:

list(str), optional, default []

return_special_tokens

Whether to include special tokens (e.g. <|endoftext|>, <|python_tag|>) in the completion response.

Type:

bool, optional, default True

frequency-penalty

The presence penalty reduces the probability of generating tokens that are already present in the generated text respectively prompt. Presence penalty is independent of the number of occurrences. Increase the value to reduce the probability of repeating text.

Type:

float, optional, default None

presence-penalty

The presence penalty reduces the probability of generating tokens that are already present in the generated text respectively prompt. Presence penalty is independent of the number of occurrences. Increase the value to reduce the probability of repeating text.

Type:

float, optional, default None

logprobs

Use this to control the logarithmic probabilities you want to have returned. This is useful to figure out how likely it had been that this specific token had been sampled.

Type:

Logprobs, optional, default NoLogprobs()

echo

Whether to include the prompt in the completion response. This parameter is not supported for streaming requests.

Type:

bool, optional, default False

as_gen_ai_otel_attributes() dict[str, AttributeValue][source]

The attributes specified by the GenAI Otel Semantic convention.

See <https://opentelemetry.io/docs/specs/semconv/registry/attributes/gen-ai/#genai-attributes> for more details.

class CompletionRequest(model: str, prompt: str, params: ~pharia_skill.csi.inference.inference.CompletionParams = <factory>)[source]

Bases: object

Request a completion from the model

model

Name of model to use.

Type:

str, required

prompt

The text to be completed.

Type:

str, required

params

Parameters for the requested completion.

Type:

CompletionParams, optional, Default CompletionParams()

as_gen_ai_otel_attributes() dict[str, AttributeValue][source]

The attributes specified by the GenAI Otel Semantic convention.

See <https://opentelemetry.io/docs/specs/semconv/registry/attributes/gen-ai/#genai-attributes> for more details.

class CompletionStreamResponse[source]

Bases: ABC

Abstract base class for streaming completion responses.

This class provides the core functionality for streaming completion from a model. Concrete implementations only need to implement the next() method to provide the next event in the stream, and optionally override __enter__ and __exit__ methods for proper resource management.

The __enter__ and __exit__ methods are particularly important for implementations that need to manage external resources. For example, in the WitCsi implementation, these methods ensure that resources are properly released when the stream is no longer needed.

finish_reason() FinishReason[source]

The reason the model finished generating.

abstractmethod next() CompletionAppend | FinishReason | TokenUsage | None[source]

Get the next completion event.

stream() Generator[CompletionAppend, None, None][source]

Stream completion chunks.

usage() TokenUsage[source]

Usage statistics for the completion request.

class Csi(*args, **kwargs)[source]

Bases: Protocol

The Cognitive System Interface (CSI) is a protocol that allows skills to interact with the PhariaEngine.

Most functionality in the CSI is offered in two forms: As a single request, and as multiple concurrent requests. For all concurrent requests, it is guaranteed that the responses are returned in the same order as the requests. Therefore, our interface requires the user to provide Sequences, as we want the input to be ordered.

chat(model: str, messages: list[Message], params: ChatParams | None = None) ChatResponse[source]

Generate a model response from a list of messages comprising a conversation.

Compared to completions, chat requests introduces the messages concept, abstracting away the details of model-specific prompt formats. A message represents a single natural language turn in a conversation.

For more details, see <https://docs.aleph-alpha.com/products/apis/pharia-inference/chat-completions/>.

Parameters:
  • model (str, required) – Name of model to use.

  • messages (list[Message], required) – List of messages, alternating between messages from user and assistant.

  • params (ChatParams, optional, Default None) – Parameters used for the chat.

Examples:

system = Message.system("You are a helpful assistant.")
msg = Message.user("What is the capital of France?")
model = "llama-3.1-8b-instruct"
chat_response = csi.chat(model, [system, msg], ChatParams(max_tokens=64))
chat_concurrent(requests: Sequence[ChatRequest]) list[ChatResponse][source]

Generate model responses for a list of chat requests concurrently.

This represents the concurrent version of chat()

Parameters:

requests (list[ChatRequest], required) – List of chat requests.

Returns:

List of chat responses in the same order as the requests.

Return type:

list[ChatResponse]

chat_stream(model: str, messages: list[Message], params: ChatParams | None = None, tools: list[str] | None = None) ChatStreamResponse[source]

Chat with a model with automatic tool invocation.

While chat_stream_step allows to pass in tools that are then available to the model, it leaves the responsibility of executing the tool call to the caller. This method goes one step further and automatically executes the tool call. If the tool call fails, the model is informed about the failure and can try to recover with a different approach. Once the model returns a non-tool message, it is returned to the caller.

Parameters:
  • model (str, required) – Name of model to use.

  • messages (list[Message], required) – List of messages, alternating between messages from user and assistant.

  • params (ChatParams, optional, Default None) – Parameters used for the chat.

  • tools (list[str], optional, Default None) – List of tool names that are available to the model.

chat_stream_step(model: str, messages: list[Message], params: ChatParams | None = None) ChatStreamResponse[source]

Generate a model response from a list of messages comprising a conversation.

This method represents the streaming version of chat(). Instead of returning a single message, this method returns a ChatStreamResponse, allowing to receive the response in small chunks.

Parameters:
  • model (str, required) – Name of model to use.

  • messages (list[Message], required) – List of messages, alternating between messages from user and assistant.

  • params (ChatParams, optional, Default None) – Parameters used for the chat.

chunk(text: str, params: ChunkParams) list[Chunk][source]

Chunks a text into chunks according to params.

Parameters:
  • text (str, required) – Text to be chunked.

  • params (ChunkParams, required) – Parameter used for chunking, model and maximal number of tokens.

Examples:

text = "A very very very long text that can be chunked."
params = ChunkParams("llama-3.1-8b-instruct", max_tokens=5)
result = csi.chunk(text, params)
assert len(result) == 3
chunk_concurrent(requests: Sequence[ChunkRequest]) list[list[Chunk]][source]

Chunk a text into chunks concurrently.

Parameters:

requests (list[ChunkRequest], required) – List of chunk requests.

complete(model: str, prompt: str, params: CompletionParams | None = None) Completion[source]

Complete a prompt using a specific model.

Parameters:
  • model (str, required) – Name of model to use.

  • prompt (str, required) – The text to be completed. Prompts need to adhere to the format expected by the specified model.

  • params (CompletionParams, optional, Default None) – Parameters for the requested completion.

Examples:

prompt = f"""<|begin_of_text|><|start_header_id|>system<|end_header_id|>

You are a poet who strictly speaks in haikus.<|eot_id|><|start_header_id|>user<|end_header_id|>

{input.root}<|eot_id|><|start_header_id|>assistant<|end_header_id|>"""
params = CompletionParams(max_tokens=64)
completion = csi.complete("llama-3.1-8b-instruct", prompt, params)
complete_concurrent(requests: list[CompletionRequest]) list[Completion][source]

Complete multiple prompts concurrently.

This represents the concurrent version of complete().

Parameters:

requests (list[CompletionRequest], required) – List of completion requests.

Returns:

List of completions in the same order as the requests.

Return type:

list[Completion]

completion_stream(model: str, prompt: str, params: CompletionParams | None = None) CompletionStreamResponse[source]

Complete a prompt using a specific model.

This method represents the streaming version of complete(). Instead of returning a single completion, this method returns a CompletionStreamResponse, allowing to receive the response in small chunks.

Parameters:
  • model (str, required) – Name of model to use.

  • prompt (str, required) – The text to be completed.

  • params (CompletionParams, optional, Default None) – Parameters for the requested completion.

document(document_path: DocumentPath) Document[source]

Fetch a document from the Document Index.

Parameters:

document_path (DocumentPath, required) – The document path to get the document from.

Examples:

document_path = DocumentPath("f13", "wikipedia-de", "Heidelberg")
document = csi.document(document_path)
assert document.path == document_path
document_metadata(document_path: DocumentPath) dict[str, Any] | list[Any] | str | int | float | bool | None[source]

Return the metadata of a document in the Document Index.

Parameters:

document_path (DocumentPath, required) – The document path to get metadata from.

documents(document_paths: Sequence[DocumentPath]) list[Document][source]

Fetch multiple documents from the Document Index.

The documents are guaranteed to be returned in the same order as the document paths.

Parameters:

document_paths (list[DocumentPath], required) – The document paths to get the documents from.

Returns:

List of documents in the same order as the provided document paths.

Return type:

list[Document]

documents_metadata(document_paths: Sequence[DocumentPath]) list[dict[str, Any] | list[Any] | str | int | float | bool | None][source]

Return the metadata of multiple documents in the Document Index.

The metadata is guaranteed to be returned in the same order as the document paths.

Parameters:

document_paths (list[DocumentPath], required) – The document paths to get metadata from.

Returns:

List of metadata in the same order as the provided document paths.

Return type:

list[JsonSerializable]

invoke_tool(name: str, **kwargs: JsonValue) ToolOutput[source]

Invoke a tool that is configured with the Engine.

Tools can be configured for each namespace by listing MCP servers in the namespace config. The Engine then exposes the tools of these MCP servers to Skills. The list of available tools per namespace can be queried from the Engine API.

Parameters:
  • name (str, required) – Name of the tool to invoke.

  • **kwargs (JsonValue, required) – Arguments to pass to the tool.

Raises:

ToolError – If the tool invocation fails.

invoke_tool_concurrent(requests: Sequence[InvokeRequest]) list[ToolOutput | ToolError][source]

Invoke multiple tools concurrently.

This function does not raise an error if a tool invocation fails, but rather returns a ToolResult, which can either be a ToolOutput or a ToolError. The reason for this is that for concurrent tool invocations, raising an error would prevent the caller from accessing the results of other tool calls.

Parameters:

requests (list[InvokeRequest], required) – List of invoke requests.

Returns:

List of tool results in the same order as the requests.

Return type:

list[ToolResult]

list_tools() list[Tool][source]

List all tools that are available to the skill.

Returns:

List of tools.

Return type:

list[Tool]

search(index_path: IndexPath, query: str, max_results: int = 1, min_score: float | None = None, filters: list[Without | With | WithOneOf] | None = None) list[SearchResult][source]

Search an existing Index in the Document Index.

Parameters:
  • index_path (IndexPath, required) – Index path in the Document Index to access.

  • query (str, required) – Text to be search for.

  • max_results (int, optional, Default 1) – Maximal number of results.

  • min_score (float, optional, Default None) – Minimal score for result to be included.

  • filters (list[SearchFilter], optional, Default None) – Filters to be applied to the search.

Examples:

index_path = IndexPath("f13", "wikipedia-de", "luminous-base-asymmetric-64")
query = "What is the population of Heidelberg?"
result = csi.search(index_path, query)
r0 = result[0]
"Heidelberg" in r0.content, "Heidelberg" in r0.document_path.name # True, True
search_concurrent(requests: Sequence[SearchRequest]) list[list[SearchResult]][source]

Execute multiple search requests against the Document Index.

Parameters:

requests (list[SearchRequest], required) – List of search requests.

Returns:

List of search results in the same order as the requests.

Return type:

list[list[SearchResult]]

select_language(text: str, languages: list[Language]) Language | None[source]

Select the detected language for the provided input based on the list of possible languages.

If no language matches, None is returned.

Parameters:
  • text (str, required) – Text input.

  • languages (list[Language], required) – All languages that should be considered during detection.

Examples:

text = "Ich spreche Deutsch nur ein bisschen."
languages = [Language.English, Language.German]
result = csi.select_language(text, languages)
select_language_concurrent(requests: Sequence[SelectLanguageRequest]) list[Language | None][source]

Detect the language for multiple texts concurrently.

Parameters:

requests (list[SelectLanguageRequest], required) – List of select language requests.

Returns:

List of detected languages in the same order as the requests.

Return type:

list[Language | None]

class Cursor(item: int, position: int)[source]

Bases: object

A position within a document.

The cursor is always inclusive of the current position, in both start and end positions.

item

Index of the item in the document. A document is an array of text and image elements. These elements are referred to as items.

Type:

int

position

The character position the cursor can be found at within the string.

Type:

int

class Distribution(sampled: Logprob, top: list[Logprob])[source]

Bases: object

classmethod from_dict(body: dict[str, Any]) Distribution[source]
class Document(path: DocumentPath, contents: list[Text | Image], metadata: dict[str, Any] | list[Any] | str | int | float | bool | None)[source]

Bases: object

A document in the Document Index.

path

The path that identifies the document.

Type:

DocumentPath

contents

The contents of the document. Split into sections of different modalities.

Type:

list[Modality]

metadata

The (custom) metadata of the document.

Type:

JsonSerializable

class DocumentPath(namespace: str, collection: str, name: str)[source]

Bases: object

Path identifying a document.

A DocumentPath consists of a namespace, within the namespace a collection and within the collection a document has a name.

A user might want to filter for unique documents. By making DocumentPath a frozen dataclass, we ensure that it is hashable and a user can use a set to filter for unique ones before requesting the documents.

namespace

The namespace.

Type:

str

collection

The collection within the namespace.

Type:

str

name

The name identifying the document in the collection.

Type:

str

class EqualTo(value: str | int | bool)[source]

Bases: object

class FinishReason(value)[source]

Bases: str, Enum

The reason the model finished generating.

STOP

The model hit a natural stopping point or a provided stop sequence.

LENGTH

The maximum number of tokens specified in the request was reached.

CONTENT_FILTER

Content was omitted due to a flag from content filters.

TOOL_CALLS

The model called a tool.

as_gen_ai_otel_attributes() dict[str, AttributeValue][source]

How to format the finish reason as a GenAI attribute.

The OTel spec specifies two possibilities: Either including it in the message (gen_ai.output.messages.0.finish_reason) or as a separate field on the response (gen_ai.response.finish_reason). Langfuse get’s a bit confused by the first option, so we use the second.

class GreaterThan(value: float)[source]

Bases: object

class GreaterThanOrEqualTo(value: float)[source]

Bases: object

class Image(modality: Literal['image'] = 'image')[source]

Bases: object

An image that is part of a document.

At the moment, we do not expose the image contents, as none of the models support multi-modal inputs. We still inform the developer that the document contains an image.

class IndexPath(namespace: str, collection: str, index: str)[source]

Bases: object

Which documents you want to search in, and which type of index should be used.

namespace

The namespace the collection belongs to.

Type:

string

collection

The collection you want to search in.

Type:

string

index

The search index you want to use for the collection.

Type:

str

class InvokeRequest(name: str, arguments: dict[str, JsonValue])[source]

Bases: object

class IsNull(value: Literal[True] = True)[source]

Bases: object

class Language(value)[source]

Bases: str, Enum

ISO 639-3 language.

class LessThan(value: float)[source]

Bases: object

class LessThanOrEqualTo(value: float)[source]

Bases: object

class Logprob(token: bytes, logprob: Annotated[float, BeforeValidator(func=none_to_nan, json_schema_input_type=PydanticUndefined)])[source]

Bases: object

Logarithmic probability of the token returned in the completion.

classmethod convert_token_to_bytes(v: Any) Any[source]
try_as_utf8() str | None[source]

Try to decode the token as utf-8.

A token may also represent just a part of an utf-8 character, in which case it does not have a valid utf-8 encoding on its own.

class Message(role: Role, content: str | None, reasoning_content: str | None = None, tool_calls: list[ToolCall] | None = None, tool_call_id: str | None = None)[source]

Bases: object

A single turn in a conversation.

Parameters:
  • role (Role, required) – The role of the message.

  • content (str, required) – The content of the message.

  • reasoning_content (str, optional) – Reasoning trace the model emitted. This is only returned for reasoning-capable models. Even if provided as part of the message history for consecutive requests, the reasoning trace of previous responses is not rendered as part of the prompt for multi-turn conversations.

as_gen_ai_otel_attributes() dict[str, Any][source]

Format as specified by OpenTelemetry GenAI semantic conventions.

See <https://opentelemetry.io/docs/specs/semconv/registry/attributes/gen-ai/#genai-attributes> for more details.

classmethod assistant(content: str | None, tool_calls: list[ToolCall] | None = None, reasoning_content: str | None = None) Self[source]
classmethod developer(content: str) Self[source]
classmethod from_dict(body: dict[str, Any]) Self[source]
classmethod system(content: str) Self[source]
classmethod tool(content: str, tool_call_id: str) Self[source]
classmethod user(content: str) Self[source]
class MessageAppend(text: str)[source]

Bases: object

class MessageBegin(role: str | None)[source]

Bases: object

class MessageEnd(payload: Payload | None)[source]

Bases: Generic[Payload]

class MessageWriter(*args, **kwargs)[source]

Bases: Protocol, Generic[Payload]

Write messages to the output stream.

append_to_message(text: str) None[source]

Write some text chunks to the output stream.

append_to_reasoning(text: str) None[source]

Write some reasoning chunks to the output stream.

begin_message(role: str | None = None) None[source]
end_message(payload: Payload | None = None) None[source]
forward_response(response: CompletionStreamResponse | ChatStreamResponse, payload: Callable[[...], Payload] | Payload | None = None) None[source]

Forward the response of a chat completion to the output stream.

For chat requests, this forwards both the reasoning and message chunks. If you need more fine-grained control over what is being exposed, use the append_to_message and `append_to_reasoning methods directly.

write(item: MessageBegin | Reasoning | MessageAppend | MessageEnd[Payload]) None[source]
class MetadataFilter(field: str, condition: GreaterThan | GreaterThanOrEqualTo | LessThan | LessThanOrEqualTo | After | AtOrAfter | Before | AtOrBefore | EqualTo | IsNull)[source]

Bases: object

Matches sections whose metadata fields match the given condition. You must specify the field, and can only specify a single condition.

While the Document Index also offers a Modality filter, we do not expose this to the developer. The reasoning is that we only allow for text modalities in the Engine. So for each search request, we append a Modality filter that only allows for text modalities.

field

The metadata field on which to filter search results. Field names must only contain alphanumeric characters, dashes and underscores. Nested fields can be specified using dot notation (e.g. ‘a.b’). Array-valued fields can either use a wildcard specifier (e.g. ‘a[].b’) or a specific index (e.g. ‘a[1].b’). The maximum length of the field name is 1000 characters.

Type:

str

condition

The condition to filter on.

Type:

FilterCondition

serialize() dict[str, Any][source]

How to serialize a metadata filter to a dictionary.

It would be nice to specify this as a model_serializer and let pydantic handle the serialization. However, as we are already doing custom serialization on the outside, and this is not a Pydantic model we could call .model_dump() on, it seems to be the simplest solution to just implement the serialization manually.

class Role(value)[source]

Bases: str, Enum

A role used for a message in a chat.

class SearchRequest(index_path: ~pharia_skill.csi.document_index.IndexPath, query: str, max_results: int = 1, min_score: float | None = None, filters: list[~pharia_skill.csi.document_index.Without | ~pharia_skill.csi.document_index.With | ~pharia_skill.csi.document_index.WithOneOf] = <factory>)[source]

Bases: object

A request to search the document index.

index_path

The index path to search in.

Type:

IndexPath

query

The query to search for.

Type:

str

max_results

Maximum number of results to return. Defaults to 1.

Type:

int

min_score

Filter out results with a cosine similarity score below this value. Scores range from -1 to 1. For searches on hybrid indexes, the Document Index applies the min_score to the semantic results before fusion of result sets. As fusion re-scores results, returned scores may exceed this value.

Type:

float | None

filters

A filter for search results that restricts the results to those document sections that match the filter criteria. The individual conditions of this array are AND-combined (i.e. all conditions must match). This can for example be used to restrict the returned sections based on their modality (i.e. image or text), or on their metadata.

Type:

list[SearchFilter]

serialize() dict[str, Any][source]
class SearchResult(document_path: DocumentPath, content: str, score: float, start: Cursor, end: Cursor)[source]

Bases: object

The relevant documents as result of a search request.

document_path

The path to a document. A path uniquely identifies a document among all managed documents.

Type:

DocumentPath

content

The text of the found section. As we do not support multi-modal, this is always a string.

Type:

str

score

Search score of the found section, where a higher score indicates a closer match. Will be between -1 and 1. A score closer to -1 indicates the section opposes the query. A score close 0 suggests the section is unrelated to the query. A score close to 1 suggests the section is related to the query.

Type:

float

start

Where the result starts in the document.

Type:

Cursor

end

Where the result ends in the document.

Type:

Cursor

class SelectLanguageRequest(text: str, languages: list[Language])[source]

Bases: object

Select the detected language for the provided input based on the list of possible languages. If no language matches, None is returned.

text

Text input

Type:

str, required

languages

All languages that should be considered during detection.

Type:

list[Language], required

class Text(text: str, modality: Literal['text'] = 'text')[source]

Bases: object

A text section that is part of a document.

If the document only contains text, then the contents of the document is a list of length one, where the only element is a Text.

class TokenUsage(prompt: int, completion: int)[source]

Bases: object

Usage statistics for the completion request.

as_gen_ai_otel_attributes() dict[str, int][source]

The attributes specified by the GenAI Otel Semantic convention.

See <https://opentelemetry.io/docs/specs/semconv/registry/attributes/gen-ai/#genai-attributes> for more details.

class Tool(name: str, description: str | None = None, parameters: dict[str, Any] | None = None, strict: bool | None = None)[source]

Bases: object

A tool that the model can call.

exception ToolError(message: str)[source]

Bases: Exception

The error message in case the tool invocation failed.

A tool error can have different causes. The tool might not have been found, the arguments to the tool might have been in the wrong format, there could have been an error while connecting to the tool, or there could have been an error executing the tool.

message: str
class ToolOutput(contents: list[str])[source]

Bases: object

The output of a tool invocation.

A tool result is a list of modalities. See <https://modelcontextprotocol.io/specification/2025-03-26/server/tools#tool-result>. At the moment, the Engine only supports text modalities.

Most tools will return a content list of size 1.

as_message(tool_call_id: str) Message[source]

Render the tool output to a message.

text() str[source]

Append all text contents to a single string.

While the MCP specification allows for multiple modalities, in most cases MCP tools will return a single text modality. This property allows accessing the text content of the tool output as a single string.

class TopLogprobs(top: int)[source]

Bases: object

Request between 0 and 20 tokens

class With(value: list[MetadataFilter])[source]

Bases: object

Logical conjunction, i.e. forms the predicate “filterCondition1 AND filterCondition2 AND …”

value

The list of filter conditions.

Type:

list[Filter]

serialize() dict[str, list[Any]][source]
class WithOneOf(value: list[MetadataFilter])[source]

Bases: object

Logical disjunction, i.e. forms the predicate “filterCondition1 OR filterCondition2 OR …”

value

The list of filter conditions.

Type:

list[Filter]

serialize() dict[str, list[Any]][source]
class Without(value: list[MetadataFilter])[source]

Bases: object

Logical conjunction of negations, i.e. forms the predicate “(NOT filterCondition1) AND (NOT filterCondition2) AND …”

value

The list of filter conditions.

Type:

list[Filter]

serialize() dict[str, list[Any]][source]
agent(func: Callable[[Csi, MessageWriter[None], AgentInput], None]) Callable[[Csi, MessageWriter[None], AgentInput], None][source]

Define agents that can be deployed on PhariaEngine.

While the message_stream and skill decorator leave the developer some room to define the input and output of the skill, the agent decorator is more opinionated. By being more opinionated, we aim to (later) expose these agents via A2A from the Engine. Before doing this, and propagating the concepts into the WIT world, we can already create value for developers by introducing a CLI based way to interact with these agents. In it’s [core concepts](https://a2a-protocol.org/latest/topics/key-concepts/), A2A defines message and task concepts. While we are not ready to support the task concept in the Engine, Agents can also be valuable without it.

An example can be found [here](https://a2a-protocol.org/latest/specification/#92-basic-execution-synchronous-polling-style), where the agent responds quickly with a message, without creating a task. A2A supports both streaming and non-streaming responses, but we’ll start with only streaming ones.

message_stream(func: Callable[[Csi, MessageWriter[Payload], UserInput], None]) Callable[[Csi, MessageWriter[Payload], UserInput], None][source]

Turn a function with a specific signature into a (streaming) skill that can be deployed on PhariaEngine.

By using the response object, a Skill decorated with @message_stream can return intermediate results that are streamed to the caller.

The decorated function must be typed. It must have exactly three arguments. The first argument must be of type Csi. The second argument must be a Response object. The third argument must be a Pydantic model. The function must not return anything.

Example:

from pharia_skill import Csi, ChatParams, Message, message_stream, MessageWriter
from pydantic import BaseModel
from pharia_skill.csi.inference import FinishReason

class Input(BaseModel):
    topic: str

class SkillOutput(BaseModel):
    finish_reason: FinishReason

@message_stream
def haiku_stream(csi: Csi, writer: MessageWriter[SkillOutput], input: Input) -> None:
    model = "llama-3.1-8b-instruct"
    messages = [
        Message.system("You are a poet who strictly speaks in haikus."),
        Message.user(input.topic),
    ]
    params = ChatParams()
    with csi.chat_stream(model, messages, params) as response:
        writer.begin_message()
        for event in response.stream():
            writer.append_to_message(event.content)
        writer.end_message(SkillOutput(finish_reason=response.finish_reason()))
skill(func: Callable[[Csi, UserInput], UserOutput]) Callable[[Csi, UserInput], UserOutput][source]

Turn a function with a specific signature into a skill that can be deployed on PhariaEngine.

The decorated function must be typed. It must have exactly two input arguments. The first argument must be of type Csi. The second argument must be a Pydantic model. The type of the return value must also be a Pydantic model. Each module is expected to have only one function that is decorated with skill.

Example:

from pharia_skill import ChatParams, Csi, Message, skill
from pydantic import BaseModel

class Input(BaseModel):
    topic: str

class Output(BaseModel):
    haiku: str

@skill
def run(csi: Csi, input: Input) -> Output:
    system = Message.system("You are a poet who strictly speaks in haikus.")
    user = Message.user(input.topic)
    params = ChatParams(max_tokens=64)
    response = csi.chat("llama-3.1-8b-instruct", [system, user], params)
    return Output(haiku=response.message.content.strip())

Submodules

Studio Module

exception OutdatedPhariaAI[source]

Bases: Exception

class StudioClient(project_name: str)[source]

Bases: object

Client for communicating with Pharia Studio.

The Studio instance is determined by the environment variable PHARIA_STUDIO_ADDRESS.

project_id

The unique identifier of the project currently in use.

Type:

int, required

assert_new_trace_endpoint_is_available() None[source]

Assert that the trace v2 endpoint accepting traces as protobuf is available.

Historically, Studio supported trace ingestion via a custom format and endpoint. Starting with feature set 251000, Studio offers an OTEL compatible endpoint, that we are now using in the SDK. Since the SDK is shipped independently of PhariaAI, we need to check if the new endpoint is available.

create_project(project: str, description: str | None = None) str[source]

Creates a project in Studio.

Projects are uniquely identified by the user provided name.

Parameters:
  • project (str, required) – User provided name of the project.

  • description (str, optional, default None) – Description explaining the usage of the project.

Returns:

The ID of the newly created project.

delete_project() None[source]

Helper method for tests to delete a project.

exporter() OTLPSpanExporter[source]

Create an OTLP exporter for Studio.

This exporter uses OpenTelemetry’s OTLP HTTP/PROTOBUF exporter to send traces directly to Studio’s traces_v2 endpoint.

list_traces() list[str][source]

Helper method for tests to assert on the traces that have been ingested.

classmethod with_project(project_name: str) StudioClient[source]

Set up a client for a project.

Will create the project if it does not exist.

class StudioProject(*, name: str, description: str | None)[source]

Bases: BaseModel

Testing Module

Two implementations of the Cognitive System Interface (CSI) that can be used for testing and development.

When Skills are run in the Engine, the CSI is provided via an Application Binary Interface. This interface is defined via the Wasm Interface Type (WIT) language. For development and debugging, Skills can also run in a local Python environment. The CSI which is available to the Skill at runtime can be substituted with a DevCSI which is backed by HTTP requests against a running instance of the Engine. Developers can write tests, step through their Python code and inspect the state of variables.

class DevCsi(namespace: str | None = None, project: str | None = None)[source]

Bases: Csi

The DevCsi can be used for testing Skill code locally against a PhariaEngine.

This implementation of Cognitive System Interface (CSI) is backed by a running instance of PhariaEngine via HTTP. This enables Skill developers to run and test Skills against the same services that are used by the PhariaEngine.

The DevCsi supports trace exports to different collectors. If you want to support traces to PhariaStudio, simply provide a project name on construction. If not set, a default exporter will be loaded from the corresponding environment variables.

Parameters:
  • namespace – The namespace to use for tool invocations.

  • project – The name of the studio project to export traces to. Will be created if it does not exist.

Examples:

# import your skill
from haiku import run

# create a `CSI` instance, optionally with trace export to Studio
csi = DevCsi(project="my-project")

# Run your skill
input = Input(topic="The meaning of life")
result = run(csi, input)

assert "42" in result.haiku

The following environment variables are required:

If you want to export traces to PhariaStudio, set:

If you want to export traces to Langfuse, set:

  • OTEL_EXPORTER_OTLP_ENDPOINT=https://cloud.langfuse.com/api/public/otel/v1/traces

  • OTEL_EXPORTER_OTLP_HEADERS (Langfuse basic auth string; example: “Authorization=Basic ${AUTH_STRING}”)

See <https://langfuse.com/integrations/native/opentelemetry> on how to generate the basic auth string.

chat_concurrent(requests: Sequence[ChatRequest]) list[ChatResponse][source]

Generate model responses for a list of chat requests concurrently.

This method adds GenAI specific tracing attributes to the span. Until we figure out how to do tracing for multiple requests, we can at least provide some GenAI specific attributes for the single request case.

See <https://opentelemetry.io/docs/specs/semconv/registry/attributes/gen-ai/#genai-attributes> for more details.

chunk_concurrent(requests: Sequence[ChunkRequest]) list[list[Chunk]][source]

Chunk a text into chunks concurrently.

Parameters:

requests (list[ChunkRequest], required) – List of chunk requests.

complete_concurrent(requests: Sequence[CompletionRequest]) list[Completion][source]

Generate model responses for a list of completion requests concurrently.

This method adds GenAI specific tracing attributes to the span. Until we figure out how to do tracing for multiple requests, we can at least provide some GenAI specific attributes for the single request case.

documents(document_paths: Sequence[DocumentPath]) list[Document][source]

Fetch multiple documents from the Document Index.

The documents are guaranteed to be returned in the same order as the document paths.

Parameters:

document_paths (list[DocumentPath], required) – The document paths to get the documents from.

Returns:

List of documents in the same order as the provided document paths.

Return type:

list[Document]

documents_metadata(document_paths: Sequence[DocumentPath]) list[dict[str, Any] | list[Any] | str | int | float | bool | None][source]

Return the metadata of multiple documents in the Document Index.

The metadata is guaranteed to be returned in the same order as the document paths.

Parameters:

document_paths (list[DocumentPath], required) – The document paths to get metadata from.

Returns:

List of metadata in the same order as the provided document paths.

Return type:

list[JsonSerializable]

classmethod existing_exporter() SpanExporter | None[source]

Return the first exporter that has been set on the DevCsi.

invoke_tool_concurrent(requests: Sequence[InvokeRequest]) list[ToolOutput | ToolError][source]

Invoke multiple tools concurrently.

This function does not raise an error if a tool invocation fails, but rather returns a ToolResult, which can either be a ToolOutput or a ToolError. The reason for this is that for concurrent tool invocations, raising an error would prevent the caller from accessing the results of other tool calls.

Parameters:

requests (list[InvokeRequest], required) – List of invoke requests.

Returns:

List of tool results in the same order as the requests.

Return type:

list[ToolResult]

list_tools() list[Tool][source]

List all tools that are available to the skill.

Returns:

List of tools.

Return type:

list[Tool]

static provider() TracerProvider[source]

Tracer provider for the current thread.

Check if the tracer provider is already set and if not, set it.

run(function: str, data: dict[str, Any]) Any[source]
search_concurrent(requests: Sequence[SearchRequest]) list[list[SearchResult]][source]

Execute multiple search requests against the Document Index.

Parameters:

requests (list[SearchRequest], required) – List of search requests.

Returns:

List of search results in the same order as the requests.

Return type:

list[list[SearchResult]]

select_language_concurrent(requests: Sequence[SelectLanguageRequest]) list[Language | None][source]

Detect the language for multiple texts concurrently.

Parameters:

requests (list[SelectLanguageRequest], required) – List of select language requests.

Returns:

List of detected languages in the same order as the requests.

Return type:

list[Language | None]

classmethod set_span_exporter(exporter: SpanExporter) None[source]

Set a span exporter for Studio if it has not been set yet.

This method overwrites any existing exporters, thereby ensuring that there are never two exporters to Studio attached at the same time.

stream(function: str, data: dict[str, Any], span: Span) Generator[Event, None, None][source]

Stream events from the client.

While the DevCsi is responsible for tracing, streaming requires a different approach, because the DevCsi may already go out of scope, even if the completion has not been fully streamed. Therefore, the responsibility moves to the DevChatStreamResponse and DevCompletionStreamResponse classes.

However, if an error occurs while constructing each one of these classes, we need to notify the span about the error in here.

class MessageRecorder[source]

Bases: MessageWriter[Payload]

A message writer that can be passed into a message_stream skill at testing time.

It allows to inspect the output that a skill produces, either via the items property that stored individual chunks that have been written or via the messages method that aggregates the items into a list of messages.

The MessageRecorder also validates the stream of items that are written to it.

Example:

from pharia_skill import Csi, message_stream, MessageAppend, MessageBegin, MessageEnd
from pharia_skill.testing import MessageWriter, MessageRecorder, RecordedMessage

@message_stream
def my_skill(csi: Csi, writer: MessageWriter, input: Input) -> None:
    ...

def test_my_skill():
    csi = DevCsi()
    writer = MessageRecorder()
    input = Input(topic="The meaning of life")

    my_skill(csi, writer, input)

    assert writer.messages() == [
        RecordedMessage(role="assistant", content="The meaning of life"),
    ]
messages() list[RecordedMessage][source]

Convenience method to aggregate the streamed items into a list of messages.

Message items are validated when they are written, so we assume that the list is valid.

skill_output() str[source]

Serialized output of the skill.

In constrast to a skill, a message_stream does not define a concrete output schema. It can yield different type of events, and their order is determined at runtime. In some scenarios, e.g. when testing the skill, a user might be interested in an aggregated view of these events. This is provided by the messages method. Studio can also render skill output. This method converts the recorded messages into a representation that can be rendered by Studio.

static validate(existing: list[MessageBegin | Reasoning | MessageAppend | MessageEnd[Payload]], item: MessageBegin | Reasoning | MessageAppend | MessageEnd[Payload]) None[source]

Is it legal to append this item to the previous items?

There are three rules that must be followed:

  1. The first item must be a MessageBegin.

  2. Consecutive MessageBegin`s must be preceded by a `MessageEnd.

  3. A MessageEnd must not be preceded by MessageEnd.

write(item: MessageBegin | Reasoning | MessageAppend | MessageEnd[Payload]) None[source]

Store and validate the streamed items.

Validating the stream here gives the developer early feedback at test time.

class RecordedMessage(*, role: str | None, content: str = '', reasoning_content: str = '', payload: Payload | None = None)[source]

Bases: BaseModel, Generic[Payload]

class StubCsi(*args, **kwargs)[source]

Bases: Csi

The StubCsi can be used to mock out the CSI for testing purposes.

You can use this class directly, or inherit from it and load it up with your own expectations. Suppose you want to test a Skill that uses the chat method, and want to mock out the response from the LLM to run your tests faster:

Example:

from pharia_skill import Csi, skill

@skill
def run(csi: Csi, input: Input) -> Output:
    system = Message.system("You are a poet who strictly speaks in haikus.")
    user = Message.user(input.topic)
    params = ChatParams(max_tokens=64)
    response = csi.chat("llama-3.1-8b-instruct", [system, user], params)
    return Output(haiku=response.message.content.strip())

class CustomMockCsi(StubCsi):
    def chat(self, model: str, messages: list[Message], params: ChatParams) -> ChatResponse:
        message = Message.assistant("Whispers in the dark\nEchoes of a fleeting dream\nMeaning lost in space")
        return ChatResponse(message=message, finish_reason=FinishReason.STOP)

def test_run():
    csi = CustomMockCsi()
    result = run(csi, Input(topic="The meaning of life"))
    assert result.haiku == "Whispers in the dark\nEchoes of a fleeting dream\nMeaning lost in space"
chat_concurrent(requests: Sequence[ChatRequest]) list[ChatResponse][source]

Generate model responses for a list of chat requests concurrently.

This represents the concurrent version of chat()

Parameters:

requests (list[ChatRequest], required) – List of chat requests.

Returns:

List of chat responses in the same order as the requests.

Return type:

list[ChatResponse]

chunk_concurrent(requests: Sequence[ChunkRequest]) list[list[Chunk]][source]

Chunk a text into chunks concurrently.

Parameters:

requests (list[ChunkRequest], required) – List of chunk requests.

complete_concurrent(requests: Sequence[CompletionRequest]) list[Completion][source]

Complete multiple prompts concurrently.

This represents the concurrent version of complete().

Parameters:

requests (list[CompletionRequest], required) – List of completion requests.

Returns:

List of completions in the same order as the requests.

Return type:

list[Completion]

documents(document_paths: Sequence[DocumentPath]) list[Document][source]

Fetch multiple documents from the Document Index.

The documents are guaranteed to be returned in the same order as the document paths.

Parameters:

document_paths (list[DocumentPath], required) – The document paths to get the documents from.

Returns:

List of documents in the same order as the provided document paths.

Return type:

list[Document]

documents_metadata(document_paths: Sequence[DocumentPath]) list[dict[str, Any] | list[Any] | str | int | float | bool | None][source]

Return the metadata of multiple documents in the Document Index.

The metadata is guaranteed to be returned in the same order as the document paths.

Parameters:

document_paths (list[DocumentPath], required) – The document paths to get metadata from.

Returns:

List of metadata in the same order as the provided document paths.

Return type:

list[JsonSerializable]

invoke_tool_concurrent(requests: Sequence[InvokeRequest]) list[ToolOutput | ToolError][source]

Invoke multiple tools concurrently.

This function does not raise an error if a tool invocation fails, but rather returns a ToolResult, which can either be a ToolOutput or a ToolError. The reason for this is that for concurrent tool invocations, raising an error would prevent the caller from accessing the results of other tool calls.

Parameters:

requests (list[InvokeRequest], required) – List of invoke requests.

Returns:

List of tool results in the same order as the requests.

Return type:

list[ToolResult]

list_tools() list[Tool][source]

List all tools that are available to the skill.

Returns:

List of tools.

Return type:

list[Tool]

search_concurrent(requests: Sequence[SearchRequest]) list[list[SearchResult]][source]

Execute multiple search requests against the Document Index.

Parameters:

requests (list[SearchRequest], required) – List of search requests.

Returns:

List of search results in the same order as the requests.

Return type:

list[list[SearchResult]]

select_language_concurrent(requests: Sequence[SelectLanguageRequest]) list[Language | None][source]

Detect the language for multiple texts concurrently.

Parameters:

requests (list[SelectLanguageRequest], required) – List of select language requests.

Returns:

List of detected languages in the same order as the requests.

Return type:

list[Language | None]