Source code for pharia_skill.studio

import os
from typing import Optional, cast
from urllib.parse import urljoin

import requests
from dotenv import load_dotenv
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from pydantic import BaseModel
from requests.exceptions import ConnectionError, HTTPError, MissingSchema


class OutdatedPhariaAI(Exception):
    """Signals that the PhariaAI instance is too old for this SDK version.

    The exception carries no arguments; its string form is a fixed, user-facing
    message explaining the minimum required feature set and the SDK version to
    downgrade to when upgrading PhariaAI is not possible.
    """

    def __str__(self) -> str:
        """Return the fixed upgrade/downgrade hint shown to the user."""
        return (
            "This version of the SDK requires your PhariaAI instance to be at "
            "least on feature set 251000. Please ask your operator to upgrade. If this "
            "is not a possibility, please downgrade to version `0.19.x` of the SDK."
        )
class StudioProject(BaseModel):
    """Name and optional description of a Studio project."""

    # User provided name of the project.
    name: str
    # Free-text description of the project; may be None. No default is declared.
    description: str | None
class StudioClient:
    """Client for communicating with Pharia Studio.

    The Studio instance is determined by the environment variable
    `PHARIA_STUDIO_ADDRESS`; the bearer token is read from `PHARIA_AI_TOKEN`.

    Attributes:
        url (str): Base address of the Studio instance.
        project_id (str): The unique identifier of the project currently in use.
            It is looked up by project name on first access and then cached.
    """

    def __init__(self, project_name: str) -> None:
        """Initializes the client.

        Runs a health check to check for a valid url of the Studio connection.
        It does not check for a valid authentication token, which happens later.

        Args:
            project_name (str, required): The human readable identifier provided by the user.

        Raises:
            KeyError: If `PHARIA_AI_TOKEN` or `PHARIA_STUDIO_ADDRESS` is not set.
            ValueError: If the health check against the configured url fails.
        """
        load_dotenv()
        self._token = os.environ["PHARIA_AI_TOKEN"]
        self.url = os.environ["PHARIA_STUDIO_ADDRESS"]
        self._auth_headers = {"Authorization": f"Bearer {self._token}"}
        self._headers = {
            "Accept": "application/json",
            **self._auth_headers,
        }
        self._check_connection()
        self._project_name = project_name
        # Resolved lazily by the `project_id` property (or eagerly by `with_project`).
        self._project_id: str | None = None

    def exporter(self) -> OTLPSpanExporter:
        """Create an OTLP exporter for Studio.

        This exporter uses OpenTelemetry's OTLP HTTP/PROTOBUF exporter to send traces
        directly to Studio's traces_v2 endpoint.

        Raises:
            OutdatedPhariaAI: If the traces_v2 endpoint answers with 404.
        """
        self.assert_new_trace_endpoint_is_available()
        return OTLPSpanExporter(
            endpoint=self.trace_endpoint, headers=self._auth_headers
        )

    @property
    def trace_endpoint(self) -> str:
        """The OTEL compatible HTTP endpoint Studio offers to export traces to."""
        return urljoin(self.url, f"/api/projects/{self.project_id}/traces_v2")

    @classmethod
    def with_project(cls, project_name: str) -> "StudioClient":
        """Set up a client for a project.

        Will create the project if it does not exist.
        """
        # Instantiate via `cls` so subclasses get an instance of their own type.
        studio_client = cls(project_name=project_name)
        project_id = studio_client._get_project(project_name)
        if project_id is None:
            project_id = studio_client.create_project(project_name)
        studio_client._project_id = project_id
        return studio_client

    def _check_connection(self) -> None:
        """Probe the `/health` route and translate failures into `ValueError`."""
        url = urljoin(self.url, "/health")
        # NOTE(review): no timeout is passed to `requests` here or in the other
        # methods of this class, so an unresponsive server blocks indefinitely.
        # Consider adding one (and handling `requests.exceptions.Timeout`).
        try:
            response = requests.get(url, headers=self._headers)
        except MissingSchema:
            raise ValueError(
                "The given url of the studio client is invalid. Make sure to include http:// in your url."
            ) from None
        except ConnectionError:
            raise ValueError(
                "The given url of the studio client does not point to a server."
            ) from None
        try:
            response.raise_for_status()
        except HTTPError:
            raise ValueError(
                f"The given url of the studio client does not point to a healthy studio: {response.status_code}: {response.text}"
            ) from None

    @property
    def project_id(self) -> str:
        """The unique project_id for the project_name as assigned by Pharia Studio.

        Raises:
            ValueError: If no project with the configured name exists.
        """
        if self._project_id is None:
            project_id = self._get_project(self._project_name)
            if project_id is None:
                raise ValueError(
                    f"Project {self._project_name} was not available. Consider creating it with `StudioClient.create_project`."
                )
            self._project_id = project_id
        return self._project_id

    def _get_project(self, project: str) -> str | None:
        """Return the id of the project named `project`, or None if there is none."""
        url = urljoin(self.url, "/api/projects")
        response = requests.get(url, headers=self._headers)
        response.raise_for_status()
        all_projects = response.json()
        project_of_interest = next(
            (proj for proj in all_projects if proj["name"] == project), None
        )
        if project_of_interest is None:
            return None
        return str(project_of_interest["project_id"])

    def create_project(self, project: str, description: Optional[str] = None) -> str:
        """Creates a project in Studio.

        Projects are uniquely identified by the user provided name.

        Args:
            project (str, required): User provided name of the project.
            description (str, optional, default None): Description explaining the usage of the project.

        Returns:
            The ID of the newly created project.

        Raises:
            ValueError: If Studio answers with 409 (project already exists).
            requests.HTTPError: For any other non-success status code.
        """
        url = urljoin(self.url, "/api/projects")
        data = StudioProject(name=project, description=description)
        response = requests.post(
            url,
            data=data.model_dump_json(),
            headers=self._headers,
        )
        if response.status_code == 409:
            raise ValueError("Project already exists")
        response.raise_for_status()
        # Normalise to `str` like `_get_project` does; `cast` alone would let a
        # non-string JSON value leak through despite the annotation.
        return str(response.json())

    def assert_new_trace_endpoint_is_available(self) -> None:
        """Assert that the trace v2 endpoint accepting traces as protobuf is available.

        Historically, Studio supported trace ingestion via a custom format and endpoint.
        Starting with feature set 251000, Studio offers an OTEL compatible endpoint, that
        we are now using in the SDK. Since the SDK is shipped independently of PhariaAI,
        we need to check if the new endpoint is available.

        Raises:
            OutdatedPhariaAI: If listing traces fails with a 404.
        """
        try:
            self.list_traces()
        except HTTPError as e:
            if e.response is not None and e.response.status_code == 404:
                raise OutdatedPhariaAI from None
            raise

    def list_traces(self) -> list[str]:
        """Helper method for tests to assert on the traces that have been ingested."""
        response = requests.get(self.trace_endpoint, headers=self._headers)
        response.raise_for_status()
        return cast(list[str], response.json()["traces"])

    def delete_project(self) -> None:
        """Helper method for tests to delete a project."""
        url = urljoin(self.url, f"/api/projects/{self.project_id}")
        response = requests.delete(url, headers=self._headers)
        response.raise_for_status()