From 3d1f275c57228746cd490cdb93b0a762ff533aab Mon Sep 17 00:00:00 2001 From: Javi Pacheco Date: Mon, 4 Mar 2024 14:52:09 +0100 Subject: [PATCH 1/4] First approach for assistant metrics --- .../functional/xef/llm/MetricManagement.kt | 7 + .../xef/llm/assistants/Assistant.kt | 12 +- .../xef/llm/assistants/AssistantThread.kt | 124 ++++++++------ .../functional/xef/metrics/LogsMetric.kt | 46 +++++ .../xebia/functional/xef/metrics/Metric.kt | 23 +++ .../xebia/functional/xef/assistants/DSL.kt | 9 +- .../OpenTelemetryAssistantState.kt | 158 ++++++++++++++++++ .../xef/opentelemetry/OpenTelemetryMetric.kt | 16 ++ 8 files changed, 336 insertions(+), 59 deletions(-) create mode 100644 integrations/opentelemetry/src/main/kotlin/com/xebia/functional/xef/opentelemetry/OpenTelemetryAssistantState.kt diff --git a/core/src/commonMain/kotlin/com/xebia/functional/xef/llm/MetricManagement.kt b/core/src/commonMain/kotlin/com/xebia/functional/xef/llm/MetricManagement.kt index 6cd4e1657..58e94f7a2 100644 --- a/core/src/commonMain/kotlin/com/xebia/functional/xef/llm/MetricManagement.kt +++ b/core/src/commonMain/kotlin/com/xebia/functional/xef/llm/MetricManagement.kt @@ -1,7 +1,9 @@ package com.xebia.functional.xef.llm import com.xebia.functional.openai.models.CreateChatCompletionResponse +import com.xebia.functional.openai.models.RunObject import com.xebia.functional.xef.conversation.Conversation +import com.xebia.functional.xef.metrics.Metric import com.xebia.functional.xef.prompt.Prompt suspend fun CreateChatCompletionResponse.addMetrics( @@ -41,3 +43,8 @@ suspend fun Prompt.addMetrics(conversation: Conversation) { if (functions.isNotEmpty()) conversation.metric.parameter("openai.chat_completion.functions", functions.map { it.name }) } + +suspend fun RunObject.addMetrics(metric: Metric): RunObject { + metric.assistantCreateRun(this) + return this +} diff --git a/core/src/commonMain/kotlin/com/xebia/functional/xef/llm/assistants/Assistant.kt b/core/src/commonMain/kotlin/com/xebia/functional/xef/llm/assistants/Assistant.kt index d670da7cc..a508d3a87 100644 --- a/core/src/commonMain/kotlin/com/xebia/functional/xef/llm/assistants/Assistant.kt +++ b/core/src/commonMain/kotlin/com/xebia/functional/xef/llm/assistants/Assistant.kt @@ -8,7 +8,7 @@ import com.xebia.functional.openai.models.CreateAssistantRequest import com.xebia.functional.openai.models.ModifyAssistantRequest import com.xebia.functional.openai.models.ext.assistant.AssistantTools import com.xebia.functional.xef.llm.fromEnvironment -import io.ktor.client.statement.* +import com.xebia.functional.xef.metrics.Metric import io.ktor.util.logging.* import kotlinx.serialization.KSerializer import kotlinx.serialization.json.JsonElement @@ -18,6 +18,7 @@ import kotlinx.serialization.json.JsonPrimitive class Assistant( val assistantId: String, val toolsConfig: List> = emptyList(), + val metric: Metric = Metric.EMPTY, private val assistantsApi: AssistantsApi = fromEnvironment(::AssistantsApi), private val api: AssistantApi = fromEnvironment(::AssistantApi) ) { @@ -25,9 +26,10 @@ class Assistant( constructor( assistantObject: AssistantObject, toolsConfig: List> = emptyList(), + metric: Metric = Metric.EMPTY, assistantsApi: AssistantsApi = fromEnvironment(::AssistantsApi), api: AssistantApi = fromEnvironment(::AssistantApi) - ) : this(assistantObject.id, toolsConfig, assistantsApi, api) + ) : this(assistantObject.id, toolsConfig, metric, assistantsApi, api) suspend fun get(): AssistantObject = assistantsApi.getAssistant(assistantId).body() @@ -35,6 +37,7 @@ class Assistant( Assistant( api.modifyAssistant(assistantId, modifyAssistantRequest).body(), toolsConfig, + metric, assistantsApi, api ) @@ -71,6 +74,7 @@ class Assistant( fileIds: List = arrayListOf(), metadata: JsonObject? = null, toolsConfig: List> = emptyList(), + metric: Metric = Metric.EMPTY, assistantsApi: AssistantsApi = fromEnvironment(::AssistantsApi), api: AssistantApi = fromEnvironment(::AssistantApi) ): Assistant = @@ -85,6 +89,7 @@ class Assistant( metadata = metadata ), toolsConfig, + metric, assistantsApi, api ) @@ -92,11 +97,12 @@ class Assistant( suspend operator fun invoke( request: CreateAssistantRequest, toolsConfig: List> = emptyList(), + metric: Metric, assistantsApi: AssistantsApi = fromEnvironment(::AssistantsApi), api: AssistantApi = fromEnvironment(::AssistantApi) ): Assistant { val response = assistantsApi.createAssistant(request) - return Assistant(response.body(), toolsConfig, assistantsApi, api) + return Assistant(response.body(), toolsConfig, metric, assistantsApi, api) } } } diff --git a/core/src/commonMain/kotlin/com/xebia/functional/xef/llm/assistants/AssistantThread.kt b/core/src/commonMain/kotlin/com/xebia/functional/xef/llm/assistants/AssistantThread.kt index bed765abc..80c7f52f1 100644 --- a/core/src/commonMain/kotlin/com/xebia/functional/xef/llm/assistants/AssistantThread.kt +++ b/core/src/commonMain/kotlin/com/xebia/functional/xef/llm/assistants/AssistantThread.kt @@ -3,22 +3,27 @@ package com.xebia.functional.xef.llm.assistants import arrow.fx.coroutines.parMap import com.xebia.functional.openai.apis.AssistantsApi import com.xebia.functional.openai.infrastructure.ApiClient +import com.xebia.functional.openai.infrastructure.HttpResponse import com.xebia.functional.openai.models.* import com.xebia.functional.openai.models.ext.assistant.RunStepDetailsMessageCreationObject import com.xebia.functional.openai.models.ext.assistant.RunStepDetailsToolCallsObject import com.xebia.functional.openai.models.ext.assistant.RunStepObjectStepDetails +import com.xebia.functional.xef.llm.addMetrics import com.xebia.functional.xef.llm.fromEnvironment +import com.xebia.functional.xef.metrics.Metric import kotlin.jvm.JvmName import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.FlowCollector import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.runBlocking import kotlinx.serialization.encodeToString import kotlinx.serialization.json.JsonElement import kotlinx.serialization.json.JsonObject class AssistantThread( val threadId: String, + val metric: Metric = Metric.EMPTY, private val api: AssistantsApi = fromEnvironment(::AssistantsApi) ) { @@ -28,24 +33,16 @@ class AssistantThread( AssistantThread(api.modifyThread(threadId, request).body().id) suspend fun createMessage(message: MessageWithFiles): MessageObject = - api - .createMessage( - threadId, - CreateMessageRequest( - role = CreateMessageRequest.Role.user, - content = message.content, - fileIds = message.fileIds - ) + createMessage( + CreateMessageRequest( + role = CreateMessageRequest.Role.user, + content = message.content, + fileIds = message.fileIds ) - .body() + ) suspend fun createMessage(content: String): MessageObject = - api - .createMessage( - threadId, - CreateMessageRequest(role = CreateMessageRequest.Role.user, content = content) - ) - .body() + createMessage(CreateMessageRequest(role = CreateMessageRequest.Role.user, content = content)) suspend fun createMessage(request: CreateMessageRequest): MessageObject = api.createMessage(threadId, request).body() @@ -56,12 +53,12 @@ class AssistantThread( suspend fun listMessages(): List = api.listMessages(threadId).body().data suspend fun createRun(request: CreateRunRequest): RunObject = - api.createRun(threadId, request).body() + api.createRun(threadId, request).body().addMetrics(metric) suspend fun getRun(runId: String): RunObject = api.getRun(threadId, runId).body() suspend fun createRun(assistant: Assistant): RunObject = - api.createRun(threadId, CreateRunRequest(assistantId = assistant.assistantId)).body() + createRun(CreateRunRequest(assistantId = assistant.assistantId)) suspend fun run(assistant: Assistant): Flow { val run = createRun(assistant) @@ -98,27 +95,28 @@ class AssistantThread( emit( RunDelta.Run( RunObject( - id = runId, - `object` = RunObject.Object.thread_run, - createdAt = 0, - threadId = threadId, - assistantId = assistant.assistantId, - status = RunObject.Status.failed, - lastError = - RunObjectLastError( - code = RunObjectLastError.Code.server_error, - message = e.message ?: "Unknown error" - ), - startedAt = null, - cancelledAt = null, - failedAt = null, - completedAt = null, - model = "", - instructions = "", - tools = emptyList(), - fileIds = emptyList(), - metadata = null - ) + id = runId, + `object` = RunObject.Object.thread_run, + createdAt = 0, + threadId = threadId, + assistantId = assistant.assistantId, + status = RunObject.Status.failed, + lastError = + RunObjectLastError( + code = RunObjectLastError.Code.server_error, + message = e.message ?: "Unknown error" + ), + startedAt = null, + cancelledAt = null, + failedAt = null, + completedAt = null, + model = "", + instructions = "", + tools = emptyList(), + fileIds = emptyList(), + metadata = null + ) + .addMetrics(metric) ) ) } finally { @@ -130,7 +128,7 @@ class AssistantThread( runId: String, cache: MutableSet ): RunObject { - val run = getRun(runId) + val run = metric.assistantCreateRun(runId) { runBlocking { getRun(runId) } } if (run !in cache) { cache.add(run) emit(RunDelta.Run(run)) @@ -162,7 +160,11 @@ class AssistantThread( runId: String, cache: MutableSet ) { - val steps = runSteps(runId) + + val steps = runSteps(runId).map { + metric.assistantCreateRunStep(runId) { it } + } + steps.forEach { step -> val calls = step.stepDetails.toolCalls() // .filter { @@ -200,20 +202,23 @@ class AssistantThread( toolCall.id to result } .toMap() - api.submitToolOuputsToRun( - threadId = threadId, - runId = runId, - submitToolOutputsRunRequest = + + metric.assistantToolOutputsRun(runId) { + api.submitToolOuputsToRun( + threadId = threadId, + runId = runId, + submitToolOutputsRunRequest = SubmitToolOutputsRunRequest( toolOutputs = - results.map { (toolCallId, result) -> - SubmitToolOutputsRunRequestToolOutputsInner( - toolCallId = toolCallId, - output = ApiClient.JSON_DEFAULT.encodeToString(result) - ) - } + results.map { (toolCallId, result) -> + SubmitToolOutputsRunRequestToolOutputsInner( + toolCallId = toolCallId, + output = ApiClient.JSON_DEFAULT.encodeToString(result) + ) + } ) - ) + ).body() + } } } } @@ -224,6 +229,7 @@ class AssistantThread( suspend operator fun invoke( messages: List, metadata: JsonObject? = null, + metric: Metric = Metric.EMPTY, api: AssistantsApi = fromEnvironment(::AssistantsApi) ): AssistantThread = AssistantThread( @@ -242,6 +248,7 @@ class AssistantThread( ) .body() .id, + metric, api ) @@ -249,6 +256,7 @@ class AssistantThread( suspend operator fun invoke( messages: List, metadata: JsonObject? = null, + metric: Metric = Metric.EMPTY, api: AssistantsApi = fromEnvironment(::AssistantsApi) ): AssistantThread = AssistantThread( @@ -263,6 +271,7 @@ class AssistantThread( ) .body() .id, + metric, api ) @@ -270,18 +279,25 @@ class AssistantThread( suspend operator fun invoke( messages: List = emptyList(), metadata: JsonObject? = null, + metric: Metric = Metric.EMPTY, api: AssistantsApi = fromEnvironment(::AssistantsApi) ): AssistantThread = - AssistantThread(api.createThread(CreateThreadRequest(messages, metadata)).body().id, api) + AssistantThread( + api.createThread(CreateThreadRequest(messages, metadata)).body().id, + metric, + api + ) suspend operator fun invoke( request: CreateThreadRequest, + metric: Metric = Metric.EMPTY, api: AssistantsApi = fromEnvironment(::AssistantsApi) - ): AssistantThread = AssistantThread(api.createThread(request).body().id, api) + ): AssistantThread = AssistantThread(api.createThread(request).body().id, metric, api) suspend operator fun invoke( request: CreateThreadAndRunRequest, + metric: Metric = Metric.EMPTY, api: AssistantsApi = fromEnvironment(::AssistantsApi) - ): AssistantThread = AssistantThread(api.createThreadAndRun(request).body().id, api) + ): AssistantThread = AssistantThread(api.createThreadAndRun(request).body().id, metric, api) } } diff --git a/core/src/commonMain/kotlin/com/xebia/functional/xef/metrics/LogsMetric.kt b/core/src/commonMain/kotlin/com/xebia/functional/xef/metrics/LogsMetric.kt index 3830161b1..e188033f7 100644 --- a/core/src/commonMain/kotlin/com/xebia/functional/xef/metrics/LogsMetric.kt +++ b/core/src/commonMain/kotlin/com/xebia/functional/xef/metrics/LogsMetric.kt @@ -1,6 +1,8 @@ package com.xebia.functional.xef.metrics import arrow.atomic.AtomicInt +import com.xebia.functional.openai.models.RunObject +import com.xebia.functional.openai.models.RunStepObject import com.xebia.functional.xef.prompt.Prompt import io.github.oshai.kotlinlogging.KotlinLogging import io.github.oshai.kotlinlogging.Level @@ -39,6 +41,50 @@ class LogsMetric(private val level: Level = Level.INFO) : Metric { return output } + override suspend fun assistantCreateRun(runObject: RunObject) { + logger.at(level) { + this.message = "${writeIndent(numberOfBlocks.get())}|-- AssistantId: ${runObject.assistantId}" + } + logger.at(level) { + this.message = "${writeIndent(numberOfBlocks.get())}|-- ThreadId: ${runObject.threadId}" + } + logger.at(level) { + this.message = "${writeIndent(numberOfBlocks.get())}|-- RunId: ${runObject.id}" + } + logger.at(level) { + this.message = "${writeIndent(numberOfBlocks.get())}|-- Status: ${runObject.status.value}" + } + } + + override suspend fun assistantCreateRun(runId: String, block: Metric.() -> RunObject): RunObject { + val output = block() + assistantCreateRun(output) + return output + } + + override suspend fun assistantCreateRunStep(runId: String, block: Metric.() -> RunStepObject): RunStepObject { + val output = block() + logger.at(level) { + this.message = "${writeIndent(numberOfBlocks.get())}|-- AssistantId: ${output.assistantId}" + } + logger.at(level) { + this.message = "${writeIndent(numberOfBlocks.get())}|-- ThreadId: ${output.threadId}" + } + logger.at(level) { + this.message = "${writeIndent(numberOfBlocks.get())}|-- RunId: ${output.runId}" + } + logger.at(level) { + this.message = "${writeIndent(numberOfBlocks.get())}|-- Status: ${output.status.value}" + } + return output + } + + override suspend fun assistantToolOutputsRun(runId: String, block: suspend Metric.() -> RunObject): RunObject { + val output = block() + assistantCreateRun(output) + return output + } + override suspend fun event(message: String) { logger.at(level) { this.message = "${writeIndent(numberOfBlocks.get())}|-- $message" } } diff --git a/core/src/commonMain/kotlin/com/xebia/functional/xef/metrics/Metric.kt b/core/src/commonMain/kotlin/com/xebia/functional/xef/metrics/Metric.kt index 06e63f5de..6a1844002 100644 --- a/core/src/commonMain/kotlin/com/xebia/functional/xef/metrics/Metric.kt +++ b/core/src/commonMain/kotlin/com/xebia/functional/xef/metrics/Metric.kt @@ -1,5 +1,7 @@ package com.xebia.functional.xef.metrics +import com.xebia.functional.openai.models.RunObject +import com.xebia.functional.openai.models.RunStepObject import com.xebia.functional.xef.prompt.Prompt interface Metric { @@ -13,6 +15,14 @@ interface Metric { suspend fun parameter(key: String, values: List) + suspend fun assistantCreateRun(runObject: RunObject) + + suspend fun assistantCreateRun(runId: String, block: Metric.() -> RunObject): RunObject + + suspend fun assistantCreateRunStep(runId: String, block: Metric.() -> RunStepObject): RunStepObject + + suspend fun assistantToolOutputsRun(runId: String, block: suspend Metric.() -> RunObject): RunObject + companion object { val EMPTY: Metric = object : Metric { @@ -24,6 +34,19 @@ interface Metric { block: suspend Metric.() -> A ): A = block() + override suspend fun assistantCreateRun(runObject: RunObject) {} + + override suspend fun assistantCreateRun( + runId: String, + block: Metric.() -> RunObject + ): RunObject = block() + + override suspend fun assistantCreateRunStep(runId: String, block: Metric.() -> RunStepObject): RunStepObject = + block() + + override suspend fun assistantToolOutputsRun(runId: String, block: suspend Metric.() -> RunObject): RunObject = + block() + override suspend fun event(message: String) {} override suspend fun parameter(key: String, value: String) {} diff --git a/examples/src/main/kotlin/com/xebia/functional/xef/assistants/DSL.kt b/examples/src/main/kotlin/com/xebia/functional/xef/assistants/DSL.kt index fb70c1c80..ddb05d2aa 100644 --- a/examples/src/main/kotlin/com/xebia/functional/xef/assistants/DSL.kt +++ b/examples/src/main/kotlin/com/xebia/functional/xef/assistants/DSL.kt @@ -6,6 +6,7 @@ import com.xebia.functional.openai.models.ext.assistant.RunStepDetailsToolCallsO import com.xebia.functional.xef.llm.assistants.Assistant import com.xebia.functional.xef.llm.assistants.AssistantThread import com.xebia.functional.xef.llm.assistants.Tool +import com.xebia.functional.xef.opentelemetry.OpenTelemetryMetric import io.ktor.client.* import kotlinx.serialization.Serializable @@ -37,12 +38,16 @@ suspend fun main() { // model = "gpt-4-1106-preview" // ) // println("generated assistant: ${assistant2.assistantId}") + + val metric = OpenTelemetryMetric() + val assistant = Assistant( assistantId = "asst_UxczzpJkysC0l424ood87DAk", toolsConfig = listOf(Tool.toolOf(SumTool())), + metric = metric ) - val thread = AssistantThread() + val thread = AssistantThread(metric = metric) println("Welcome to the Math tutor, ask me anything about math:") while (true) { println() @@ -127,5 +132,5 @@ private fun stepStatusEmoji(status: RunStepObject.Status) = } private fun displayRunStatus(run: AssistantThread.RunDelta.Run) { - println("Assistant: ${runStatusEmoji(run)}") + println("Assistant: ${runStatusEmoji(run)} - ${run.message.status} - ${run.message}") } diff --git a/integrations/opentelemetry/src/main/kotlin/com/xebia/functional/xef/opentelemetry/OpenTelemetryAssistantState.kt b/integrations/opentelemetry/src/main/kotlin/com/xebia/functional/xef/opentelemetry/OpenTelemetryAssistantState.kt new file mode 100644 index 000000000..2ba5640a9 --- /dev/null +++ b/integrations/opentelemetry/src/main/kotlin/com/xebia/functional/xef/opentelemetry/OpenTelemetryAssistantState.kt @@ -0,0 +1,158 @@ +package com.xebia.functional.xef.opentelemetry + +import com.xebia.functional.openai.models.RunObject +import com.xebia.functional.openai.models.RunStepDetailsToolCallsObjectToolCallsInner +import com.xebia.functional.openai.models.RunStepObject +import com.xebia.functional.openai.models.ext.assistant.RunStepDetailsMessageCreationObject +import com.xebia.functional.openai.models.ext.assistant.RunStepDetailsToolCallsObject +import io.opentelemetry.api.trace.Span +import io.opentelemetry.api.trace.SpanKind +import io.opentelemetry.api.trace.Tracer +import io.opentelemetry.context.Context + +class OpenTelemetryAssistantState(private val tracer: Tracer) { + + private val runIds: MutableMap = mutableMapOf() + + fun runSpan(runObject: RunObject) { + + val parentOrRoot: Context = runObject.id.getOrCreateContext() + + val currentSpan = + tracer + .spanBuilder(runObject.status.value) + .setParent(parentOrRoot) + .setSpanKind(SpanKind.CLIENT) + .startSpan() + + try { + currentSpan.makeCurrent().use { runObject.setParameters(currentSpan) } + } finally { + currentSpan.end() + } + } + + fun runSpan(runId: String, block: () -> RunObject): RunObject { + + val parentOrRoot: Context = runId.getOrCreateContext() + + val currentSpan = + tracer + .spanBuilder("New Run: $runId") + .setParent(parentOrRoot) + .setSpanKind(SpanKind.CLIENT) + .startSpan() + + return try { + val output = block() + currentSpan.makeCurrent().use { + currentSpan.updateName(output.status.value) + output.setParameters(currentSpan) + } + output + } finally { + currentSpan.end() + } + } + + suspend fun toolOutputRunSpan(runId: String, block: suspend () -> RunObject): RunObject { + + val parentOrRoot: Context = runId.getOrCreateContext() + + val currentSpan = + tracer + .spanBuilder("New ToolOutput: $runId") + .setParent(parentOrRoot) + .setSpanKind(SpanKind.CLIENT) + .startSpan() + + return try { + val output = block() + currentSpan.makeCurrent().use { + currentSpan.updateName("ToolOutput: ${output.status.value}") + output.setParameters(currentSpan) + } + output + } finally { + currentSpan.end() + } + } + + fun runStepSpan(runId: String, block: () -> RunStepObject): RunStepObject { + + val parentOrRoot: Context = runId.getOrCreateContext() + + val currentSpan = + tracer + .spanBuilder("New RunStep: $runId") + .setParent(parentOrRoot) + .setSpanKind(SpanKind.CLIENT) + .startSpan() + + return try { + val output = block() + currentSpan.makeCurrent().use { + when (val detail = output.stepDetails) { + is RunStepDetailsMessageCreationObject -> { + currentSpan.updateName("Creating message: ${output.status.value}") + } + is RunStepDetailsToolCallsObject -> { + currentSpan.updateName("Tools: ${detail.toolCalls.joinToString { + when (it.type) { + RunStepDetailsToolCallsObjectToolCallsInner.Type.code_interpreter -> it.type.value + RunStepDetailsToolCallsObjectToolCallsInner.Type.retrieval -> it.type.value + RunStepDetailsToolCallsObjectToolCallsInner.Type.function -> it.function?.name ?: it.type.value + } + }}: ${output.status.value}") + } + } + output.setParameters(currentSpan) + } + output + } finally { + currentSpan.end() + } + } + + private fun String.getOrCreateContext(): Context { + val parent = runIds.get(this) + return if (parent == null) { + val newParent = tracer.spanBuilder("Run: $this").startSpan() + newParent.end() + val newContext = Context.current().with(newParent) + runIds[this] = newContext + newContext + } else parent + } + + private fun RunObject.setParameters(span: Span) { + span.setAttribute("openai.assistant.model", model) + span.setAttribute("openai.assistant.fileIds", fileIds.joinToString()) + span.setAttribute("openai.assistant.tools.count", tools.count().toString()) + span.setAttribute("openai.assistant.thread.id", threadId) + span.setAttribute("openai.assistant.assistant.id", assistantId) + span.setAttribute("openai.assistant.run.id", id) + span.setAttribute("openai.assistant.status", status.value) + } + + private fun RunStepObject.setParameters(span: Span) { + span.setAttribute("openai.assistant.type", type.value) + span.setAttribute("openai.assistant.thread.id", threadId) + span.setAttribute("openai.assistant.assistant.id", assistantId) + span.setAttribute("openai.assistant.run.id", runId) + span.setAttribute("openai.assistant.runStep.id", id) + span.setAttribute("openai.assistant.status", status.value) + when (val detail = stepDetails) { + is RunStepDetailsMessageCreationObject -> { + span.setAttribute("openai.assistant.messageCreation.id", detail.messageCreation.messageId) + } + is RunStepDetailsToolCallsObject -> { + detail.toolCalls.forEachIndexed { index, toolCall -> + span.setAttribute("openai.assistant.toolCalls.$index.type", toolCall.type.value) + span.setAttribute("openai.assistant.toolCalls.$index.function.name", toolCall.function?.name ?: "") + span.setAttribute("openai.assistant.toolCalls.$index.function.arguments", toolCall.function?.arguments ?: "") + } + } + } + } +} diff --git a/integrations/opentelemetry/src/main/kotlin/com/xebia/functional/xef/opentelemetry/OpenTelemetryMetric.kt b/integrations/opentelemetry/src/main/kotlin/com/xebia/functional/xef/opentelemetry/OpenTelemetryMetric.kt index 0f2f646ae..bdac15ca7 100644 --- a/integrations/opentelemetry/src/main/kotlin/com/xebia/functional/xef/opentelemetry/OpenTelemetryMetric.kt +++ b/integrations/opentelemetry/src/main/kotlin/com/xebia/functional/xef/opentelemetry/OpenTelemetryMetric.kt @@ -1,5 +1,7 @@ package com.xebia.functional.xef.opentelemetry +import com.xebia.functional.openai.models.RunObject +import com.xebia.functional.openai.models.RunStepObject import com.xebia.functional.xef.metrics.Metric import com.xebia.functional.xef.prompt.Prompt import io.opentelemetry.api.trace.* @@ -12,6 +14,8 @@ class OpenTelemetryMetric( private val state = OpenTelemetryState(getTracer()) + private val assistantState = OpenTelemetryAssistantState(getTracer()) + override suspend fun customSpan(name: String, block: suspend Metric.() -> A): A = state.span(name) { block() } @@ -32,4 +36,16 @@ class OpenTelemetryMetric( private fun getTracer(scopeName: String? = null): Tracer = openTelemetry.getTracer(scopeName ?: config.defaultScopeName) + + override suspend fun assistantCreateRun(runObject: RunObject) = assistantState.runSpan(runObject) + + override suspend fun assistantCreateRun(runId: String, block: Metric.() -> RunObject): RunObject = + assistantState.runSpan(runId) { block() } + + override suspend fun assistantCreateRunStep(runId: String, block: Metric.() -> RunStepObject): RunStepObject = + assistantState.runStepSpan(runId) { block() } + + override suspend fun assistantToolOutputsRun(runId: String, block: suspend Metric.() -> RunObject): RunObject = + assistantState.toolOutputRunSpan(runId) { block() } + } From ae119fdf6c11865d07b641a420b07ed53286c62e Mon Sep 17 00:00:00 2001 From: Javi Pacheco Date: Mon, 4 Mar 2024 15:29:59 +0100 Subject: [PATCH 2/4] Support for messages --- .../xef/llm/assistants/AssistantThread.kt | 61 ++++++++++--------- .../functional/xef/metrics/LogsMetric.kt | 27 +++++++- .../xebia/functional/xef/metrics/Metric.kt | 37 ++++++++--- .../OpenTelemetryAssistantState.kt | 58 ++++++++++++++++-- .../xef/opentelemetry/OpenTelemetryMetric.kt | 29 ++++++--- 5 files changed, 158 insertions(+), 54 deletions(-) diff --git a/core/src/commonMain/kotlin/com/xebia/functional/xef/llm/assistants/AssistantThread.kt b/core/src/commonMain/kotlin/com/xebia/functional/xef/llm/assistants/AssistantThread.kt index 80c7f52f1..0f78ccf42 100644 --- a/core/src/commonMain/kotlin/com/xebia/functional/xef/llm/assistants/AssistantThread.kt +++ b/core/src/commonMain/kotlin/com/xebia/functional/xef/llm/assistants/AssistantThread.kt @@ -3,7 +3,6 @@ package com.xebia.functional.xef.llm.assistants import arrow.fx.coroutines.parMap import com.xebia.functional.openai.apis.AssistantsApi import com.xebia.functional.openai.infrastructure.ApiClient -import com.xebia.functional.openai.infrastructure.HttpResponse import com.xebia.functional.openai.models.* import com.xebia.functional.openai.models.ext.assistant.RunStepDetailsMessageCreationObject import com.xebia.functional.openai.models.ext.assistant.RunStepDetailsToolCallsObject @@ -16,7 +15,6 @@ import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.FlowCollector import kotlinx.coroutines.flow.flow -import kotlinx.coroutines.runBlocking import kotlinx.serialization.encodeToString import kotlinx.serialization.json.JsonElement import kotlinx.serialization.json.JsonObject @@ -87,7 +85,7 @@ class AssistantThread( while (run.status != RunObject.Status.completed) { checkSteps(assistant = assistant, runId = runId, cache = stepCache) delay(500) // To avoid excessive calls to OpenAI - checkMessages(cache = messagesCache) + checkMessages(runId, cache = messagesCache) delay(500) // To avoid excessive calls to OpenAI run = checkRun(runId = runId, cache = runCache) } @@ -120,7 +118,7 @@ class AssistantThread( ) ) } finally { - checkMessages(cache = messagesCache) + checkMessages(runId, cache = messagesCache) } } @@ -128,7 +126,7 @@ class AssistantThread( runId: String, cache: MutableSet ): RunObject { - val run = metric.assistantCreateRun(runId) { runBlocking { getRun(runId) } } + val run = metric.assistantCreateRun(runId) { getRun(runId) } if (run !in cache) { cache.add(run) emit(RunDelta.Run(run)) @@ -136,15 +134,22 @@ class AssistantThread( return run } - private suspend fun FlowCollector.checkMessages(cache: MutableSet) { - val messages = listMessages() - val updatedAndNewMessages = messages.filterNot { it in cache } - updatedAndNewMessages.forEach { message -> - val content = message.content.filterNot { it.text?.value?.isBlank() ?: true } - if (content.isNotEmpty() && message !in cache) { - cache.add(message) - emit(RunDelta.ReceivedMessage(message)) + private suspend fun FlowCollector.checkMessages( + runId: String, + cache: MutableSet + ) { + metric.assistantCreatedMessage(runId) { + val messages = mutableListOf() + val updatedAndNewMessages = listMessages().filterNot { it in cache } + updatedAndNewMessages.forEach { message -> + val content = message.content.filterNot { it.text?.value?.isBlank() ?: true } + if (content.isNotEmpty() && message !in cache) { + cache.add(message) + messages.add(message) + emit(RunDelta.ReceivedMessage(message)) + } } + messages } } @@ -161,9 +166,7 @@ class AssistantThread( cache: MutableSet ) { - val steps = runSteps(runId).map { - metric.assistantCreateRunStep(runId) { it } - } + val steps = runSteps(runId).map { metric.assistantCreateRunStep(runId) { it } } steps.forEach { step -> val calls = step.stepDetails.toolCalls() @@ -204,20 +207,22 @@ class AssistantThread( .toMap() metric.assistantToolOutputsRun(runId) { - api.submitToolOuputsToRun( - threadId = threadId, - runId = runId, - submitToolOutputsRunRequest = - SubmitToolOutputsRunRequest( - toolOutputs = - results.map { (toolCallId, result) -> - SubmitToolOutputsRunRequestToolOutputsInner( - toolCallId = toolCallId, - output = ApiClient.JSON_DEFAULT.encodeToString(result) + api + .submitToolOuputsToRun( + threadId = threadId, + runId = runId, + submitToolOutputsRunRequest = + SubmitToolOutputsRunRequest( + toolOutputs = + results.map { (toolCallId, result) -> + SubmitToolOutputsRunRequestToolOutputsInner( + toolCallId = toolCallId, + output = ApiClient.JSON_DEFAULT.encodeToString(result) + ) + } ) - } ) - ).body() + .body() } } } diff --git a/core/src/commonMain/kotlin/com/xebia/functional/xef/metrics/LogsMetric.kt b/core/src/commonMain/kotlin/com/xebia/functional/xef/metrics/LogsMetric.kt index e188033f7..f982f003d 100644 --- a/core/src/commonMain/kotlin/com/xebia/functional/xef/metrics/LogsMetric.kt +++ b/core/src/commonMain/kotlin/com/xebia/functional/xef/metrics/LogsMetric.kt @@ -1,6 +1,7 @@ package com.xebia.functional.xef.metrics import arrow.atomic.AtomicInt +import com.xebia.functional.openai.models.MessageObject import com.xebia.functional.openai.models.RunObject import com.xebia.functional.openai.models.RunStepObject import com.xebia.functional.xef.prompt.Prompt @@ -56,13 +57,30 @@ class LogsMetric(private val level: Level = Level.INFO) : Metric { } } - override suspend fun assistantCreateRun(runId: String, block: Metric.() -> RunObject): RunObject { + override suspend fun assistantCreateRun( + runId: String, + block: suspend Metric.() -> RunObject + ): RunObject { val output = block() assistantCreateRun(output) return output } - override suspend fun assistantCreateRunStep(runId: String, block: Metric.() -> RunStepObject): RunStepObject { + override suspend fun assistantCreatedMessage( + runId: String, + block: suspend Metric.() -> List + ): List { + val output = block() + logger.at(level) { + this.message = "${writeIndent(numberOfBlocks.get())}|-- Size: ${output.size}" + } + return output + } + + override suspend fun assistantCreateRunStep( + runId: String, + block: suspend Metric.() -> RunStepObject + ): RunStepObject { val output = block() logger.at(level) { this.message = "${writeIndent(numberOfBlocks.get())}|-- AssistantId: ${output.assistantId}" @@ -79,7 +97,10 @@ class LogsMetric(private val level: Level = Level.INFO) : Metric { return output } - override suspend fun assistantToolOutputsRun(runId: String, block: suspend Metric.() -> RunObject): RunObject { + override suspend fun assistantToolOutputsRun( + runId: String, + block: suspend Metric.() -> RunObject + ): RunObject { val output = block() assistantCreateRun(output) return output diff --git a/core/src/commonMain/kotlin/com/xebia/functional/xef/metrics/Metric.kt b/core/src/commonMain/kotlin/com/xebia/functional/xef/metrics/Metric.kt index 6a1844002..2d3e3ccc3 100644 --- a/core/src/commonMain/kotlin/com/xebia/functional/xef/metrics/Metric.kt +++ b/core/src/commonMain/kotlin/com/xebia/functional/xef/metrics/Metric.kt @@ -1,5 +1,6 @@ package com.xebia.functional.xef.metrics +import com.xebia.functional.openai.models.MessageObject import com.xebia.functional.openai.models.RunObject import com.xebia.functional.openai.models.RunStepObject import com.xebia.functional.xef.prompt.Prompt @@ -17,11 +18,22 @@ interface Metric { suspend fun assistantCreateRun(runObject: RunObject) - suspend fun assistantCreateRun(runId: String, block: Metric.() -> RunObject): RunObject + suspend fun assistantCreateRun(runId: String, block: suspend Metric.() -> RunObject): RunObject - suspend fun assistantCreateRunStep(runId: String, block: Metric.() -> RunStepObject): RunStepObject + suspend fun assistantCreatedMessage( + runId: String, + block: suspend Metric.() -> List + ): List - suspend fun assistantToolOutputsRun(runId: String, block: suspend Metric.() -> RunObject): RunObject + suspend fun assistantCreateRunStep( + runId: String, + block: suspend Metric.() -> RunStepObject + ): RunStepObject + + suspend fun assistantToolOutputsRun( + runId: String, + block: suspend Metric.() -> RunObject + ): RunObject companion object { val EMPTY: Metric = @@ -38,14 +50,23 @@ interface Metric { override suspend fun assistantCreateRun( runId: String, - block: Metric.() -> RunObject + block: suspend Metric.() -> RunObject ): RunObject = block() - override suspend fun assistantCreateRunStep(runId: String, block: Metric.() -> RunStepObject): RunStepObject = - block() + override suspend fun assistantCreatedMessage( + runId: String, + block: suspend Metric.() -> List + ): List = block() - override suspend fun assistantToolOutputsRun(runId: String, block: suspend Metric.() -> RunObject): RunObject = - block() + override suspend fun assistantCreateRunStep( + runId: String, + block: suspend Metric.() -> RunStepObject + ): RunStepObject = block() + + override suspend fun assistantToolOutputsRun( + runId: String, + block: suspend Metric.() -> RunObject + ): RunObject = block() override suspend fun event(message: String) {} diff --git a/integrations/opentelemetry/src/main/kotlin/com/xebia/functional/xef/opentelemetry/OpenTelemetryAssistantState.kt b/integrations/opentelemetry/src/main/kotlin/com/xebia/functional/xef/opentelemetry/OpenTelemetryAssistantState.kt index 2ba5640a9..523e7f0e9 100644 --- a/integrations/opentelemetry/src/main/kotlin/com/xebia/functional/xef/opentelemetry/OpenTelemetryAssistantState.kt +++ b/integrations/opentelemetry/src/main/kotlin/com/xebia/functional/xef/opentelemetry/OpenTelemetryAssistantState.kt @@ -1,5 +1,6 @@ package com.xebia.functional.xef.opentelemetry +import com.xebia.functional.openai.models.MessageObject import com.xebia.functional.openai.models.RunObject import com.xebia.functional.openai.models.RunStepDetailsToolCallsObjectToolCallsInner import com.xebia.functional.openai.models.RunStepObject @@ -32,7 +33,7 @@ class OpenTelemetryAssistantState(private val tracer: Tracer) { } } - fun runSpan(runId: String, block: () -> RunObject): RunObject { + suspend fun runSpan(runId: String, block: suspend () -> RunObject): RunObject { val parentOrRoot: Context = runId.getOrCreateContext() @@ -78,7 +79,7 @@ class OpenTelemetryAssistantState(private val tracer: Tracer) { } } - fun runStepSpan(runId: String, block: () -> RunStepObject): RunStepObject { + suspend fun runStepSpan(runId: String, block: suspend () -> RunStepObject): RunStepObject { val parentOrRoot: Context = runId.getOrCreateContext() @@ -97,13 +98,15 @@ class OpenTelemetryAssistantState(private val tracer: Tracer) { currentSpan.updateName("Creating message: ${output.status.value}") } is RunStepDetailsToolCallsObject -> { - currentSpan.updateName("Tools: ${detail.toolCalls.joinToString { + currentSpan.updateName( + "Tools: ${detail.toolCalls.joinToString { when (it.type) { RunStepDetailsToolCallsObjectToolCallsInner.Type.code_interpreter -> it.type.value RunStepDetailsToolCallsObjectToolCallsInner.Type.retrieval -> it.type.value RunStepDetailsToolCallsObjectToolCallsInner.Type.function -> it.function?.name ?: it.type.value } - }}: ${output.status.value}") + }}: ${output.status.value}" + ) } } output.setParameters(currentSpan) @@ -114,6 +117,32 @@ class OpenTelemetryAssistantState(private val tracer: Tracer) { } } + suspend fun createdMessagesSpan( + runId: String, + block: suspend () -> List + ): List { + + val parentOrRoot: Context = runId.getOrCreateContext() + + val currentSpan = + tracer + .spanBuilder("New Run: $runId") + .setParent(parentOrRoot) + .setSpanKind(SpanKind.CLIENT) + .startSpan() + + return try { + val output = block() + currentSpan.makeCurrent().use { + currentSpan.updateName("Messages: ${output.size}") + output.setParameters(currentSpan) + } + output + } finally { + currentSpan.end() + } + } + private fun String.getOrCreateContext(): Context { val parent = runIds.get(this) return if (parent == null) { @@ -149,10 +178,27 @@ class OpenTelemetryAssistantState(private val tracer: Tracer) { is RunStepDetailsToolCallsObject -> { detail.toolCalls.forEachIndexed { index, toolCall -> span.setAttribute("openai.assistant.toolCalls.$index.type", toolCall.type.value) - span.setAttribute("openai.assistant.toolCalls.$index.function.name", toolCall.function?.name ?: "") - span.setAttribute("openai.assistant.toolCalls.$index.function.arguments", toolCall.function?.arguments ?: "") + span.setAttribute( + "openai.assistant.toolCalls.$index.function.name", + toolCall.function?.name ?: "" + ) + span.setAttribute( + "openai.assistant.toolCalls.$index.function.arguments", + toolCall.function?.arguments ?: "" + ) } } } } + + private fun List.setParameters(span: Span) { + span.setAttribute("openai.assistant.messages.count", size.toString()) + forEach { + span.setAttribute("openai.assistant.messages.${indexOf(it)}.role", it.role.value) + span.setAttribute( + "openai.assistant.messages.${indexOf(it)}.content", + it.content.firstOrNull()?.text?.value ?: "" + ) + } + } } diff --git a/integrations/opentelemetry/src/main/kotlin/com/xebia/functional/xef/opentelemetry/OpenTelemetryMetric.kt b/integrations/opentelemetry/src/main/kotlin/com/xebia/functional/xef/opentelemetry/OpenTelemetryMetric.kt index bdac15ca7..79dee313d 100644 --- a/integrations/opentelemetry/src/main/kotlin/com/xebia/functional/xef/opentelemetry/OpenTelemetryMetric.kt +++ b/integrations/opentelemetry/src/main/kotlin/com/xebia/functional/xef/opentelemetry/OpenTelemetryMetric.kt @@ -1,5 +1,6 @@ package com.xebia.functional.xef.opentelemetry +import com.xebia.functional.openai.models.MessageObject import com.xebia.functional.openai.models.RunObject import com.xebia.functional.openai.models.RunStepObject import com.xebia.functional.xef.metrics.Metric @@ -34,18 +35,28 @@ class OpenTelemetryMetric( state.setAttribute(key, values) } - private fun getTracer(scopeName: String? = null): Tracer = - openTelemetry.getTracer(scopeName ?: config.defaultScopeName) - override suspend fun assistantCreateRun(runObject: RunObject) = assistantState.runSpan(runObject) - override suspend fun assistantCreateRun(runId: String, block: Metric.() -> RunObject): RunObject = - assistantState.runSpan(runId) { block() } + override suspend fun assistantCreateRun( + runId: String, + block: suspend Metric.() -> RunObject + ): RunObject = assistantState.runSpan(runId) { block() } - override suspend fun assistantCreateRunStep(runId: String, block: Metric.() -> RunStepObject): RunStepObject = - assistantState.runStepSpan(runId) { block() } + override suspend fun assistantCreatedMessage( + runId: String, + block: suspend Metric.() -> List + ): List = assistantState.createdMessagesSpan(runId) { block() } - override suspend fun assistantToolOutputsRun(runId: String, block: suspend Metric.() -> RunObject): RunObject = - assistantState.toolOutputRunSpan(runId) { block() } + override suspend fun assistantCreateRunStep( + runId: String, + block: suspend Metric.() -> RunStepObject + ): RunStepObject = assistantState.runStepSpan(runId) { block() } + override suspend fun assistantToolOutputsRun( + runId: String, + block: suspend Metric.() -> RunObject + ): RunObject = assistantState.toolOutputRunSpan(runId) { block() } + + private fun getTracer(scopeName: String? = null): Tracer = + openTelemetry.getTracer(scopeName ?: config.defaultScopeName) } From b5a44f314927e292b1e60389b42e1859bdb8fa56 Mon Sep 17 00:00:00 2001 From: Javi Pacheco Date: Tue, 5 Mar 2024 11:37:30 +0100 Subject: [PATCH 3/4] Adding usage on metrics --- .../xebia/functional/xef/llm/assistants/Assistant.kt | 11 ++--------- .../kotlin/com/xebia/functional/xef/assistants/DSL.kt | 11 +++++------ .../xef/opentelemetry/OpenTelemetryAssistantState.kt | 7 ++++++- 3 files changed, 13 insertions(+), 16 deletions(-) diff --git a/core/src/commonMain/kotlin/com/xebia/functional/xef/llm/assistants/Assistant.kt b/core/src/commonMain/kotlin/com/xebia/functional/xef/llm/assistants/Assistant.kt index a508d3a87..de2721713 100644 --- a/core/src/commonMain/kotlin/com/xebia/functional/xef/llm/assistants/Assistant.kt +++ b/core/src/commonMain/kotlin/com/xebia/functional/xef/llm/assistants/Assistant.kt @@ -8,7 +8,6 @@ import com.xebia.functional.openai.models.CreateAssistantRequest import com.xebia.functional.openai.models.ModifyAssistantRequest import com.xebia.functional.openai.models.ext.assistant.AssistantTools import com.xebia.functional.xef.llm.fromEnvironment -import com.xebia.functional.xef.metrics.Metric import io.ktor.util.logging.* import kotlinx.serialization.KSerializer import kotlinx.serialization.json.JsonElement @@ -18,7 +17,6 @@ import kotlinx.serialization.json.JsonPrimitive class Assistant( val assistantId: String, val toolsConfig: List> = emptyList(), - val metric: Metric = Metric.EMPTY, private val assistantsApi: AssistantsApi = fromEnvironment(::AssistantsApi), private val api: AssistantApi = fromEnvironment(::AssistantApi) ) { @@ -26,10 +24,9 @@ class Assistant( constructor( assistantObject: AssistantObject, toolsConfig: List> = emptyList(), - metric: Metric = Metric.EMPTY, assistantsApi: AssistantsApi = fromEnvironment(::AssistantsApi), api: AssistantApi = fromEnvironment(::AssistantApi) - ) : this(assistantObject.id, toolsConfig, metric, assistantsApi, api) + ) : this(assistantObject.id, toolsConfig, assistantsApi, api) suspend fun get(): AssistantObject = assistantsApi.getAssistant(assistantId).body() @@ -37,7 +34,6 @@ class Assistant( Assistant( api.modifyAssistant(assistantId, modifyAssistantRequest).body(), toolsConfig, - metric, assistantsApi, api ) @@ -74,7 +70,6 @@ class Assistant( fileIds: List = arrayListOf(), metadata: JsonObject? = null, toolsConfig: List> = emptyList(), - metric: Metric = Metric.EMPTY, assistantsApi: AssistantsApi = fromEnvironment(::AssistantsApi), api: AssistantApi = fromEnvironment(::AssistantApi) ): Assistant = @@ -89,7 +84,6 @@ class Assistant( metadata = metadata ), toolsConfig, - metric, assistantsApi, api ) @@ -97,12 +91,11 @@ class Assistant( suspend operator fun invoke( request: CreateAssistantRequest, toolsConfig: List> = emptyList(), - metric: Metric, assistantsApi: AssistantsApi = fromEnvironment(::AssistantsApi), api: AssistantApi = fromEnvironment(::AssistantApi) ): Assistant { val response = assistantsApi.createAssistant(request) - return Assistant(response.body(), toolsConfig, metric, assistantsApi, api) + return Assistant(response.body(), toolsConfig, assistantsApi, api) } } } diff --git a/examples/src/main/kotlin/com/xebia/functional/xef/assistants/DSL.kt b/examples/src/main/kotlin/com/xebia/functional/xef/assistants/DSL.kt index ddb05d2aa..ca47eb40e 100644 --- a/examples/src/main/kotlin/com/xebia/functional/xef/assistants/DSL.kt +++ b/examples/src/main/kotlin/com/xebia/functional/xef/assistants/DSL.kt @@ -6,8 +6,7 @@ import com.xebia.functional.openai.models.ext.assistant.RunStepDetailsToolCallsO import com.xebia.functional.xef.llm.assistants.Assistant import com.xebia.functional.xef.llm.assistants.AssistantThread import com.xebia.functional.xef.llm.assistants.Tool -import com.xebia.functional.xef.opentelemetry.OpenTelemetryMetric -import io.ktor.client.* +import com.xebia.functional.xef.metrics.Metric import kotlinx.serialization.Serializable @Serializable data class SumInput(val left: Int, val right: Int) @@ -39,13 +38,13 @@ suspend fun main() { // ) // println("generated assistant: ${assistant2.assistantId}") - val metric = OpenTelemetryMetric() + val metric = Metric.EMPTY + // val metric = com.xebia.functional.xef.opentelemetry.OpenTelemetryMetric() val assistant = Assistant( assistantId = "asst_UxczzpJkysC0l424ood87DAk", - toolsConfig = listOf(Tool.toolOf(SumTool())), - metric = metric + toolsConfig = listOf(Tool.toolOf(SumTool())) ) val thread = AssistantThread(metric = metric) println("Welcome to the Math tutor, ask me anything about math:") @@ -132,5 +131,5 @@ private fun stepStatusEmoji(status: RunStepObject.Status) = } private fun displayRunStatus(run: AssistantThread.RunDelta.Run) { - println("Assistant: ${runStatusEmoji(run)} - ${run.message.status} - ${run.message}") + println("Assistant: ${runStatusEmoji(run)} - ${run.message.status}") } diff --git a/integrations/opentelemetry/src/main/kotlin/com/xebia/functional/xef/opentelemetry/OpenTelemetryAssistantState.kt b/integrations/opentelemetry/src/main/kotlin/com/xebia/functional/xef/opentelemetry/OpenTelemetryAssistantState.kt index 523e7f0e9..bbc346677 100644 --- a/integrations/opentelemetry/src/main/kotlin/com/xebia/functional/xef/opentelemetry/OpenTelemetryAssistantState.kt +++ b/integrations/opentelemetry/src/main/kotlin/com/xebia/functional/xef/opentelemetry/OpenTelemetryAssistantState.kt @@ -156,12 +156,17 @@ class OpenTelemetryAssistantState(private val tracer: Tracer) { private fun RunObject.setParameters(span: Span) { span.setAttribute("openai.assistant.model", model) - span.setAttribute("openai.assistant.fileIds", fileIds.joinToString()) + if (fileIds.isNotEmpty()) span.setAttribute("openai.assistant.fileIds", fileIds.joinToString()) span.setAttribute("openai.assistant.tools.count", tools.count().toString()) span.setAttribute("openai.assistant.thread.id", threadId) span.setAttribute("openai.assistant.assistant.id", assistantId) span.setAttribute("openai.assistant.run.id", id) span.setAttribute("openai.assistant.status", status.value) + usage?.let { + span.setAttribute("openai.assistant.usage.totalTokens", it.totalTokens.toString()) + span.setAttribute("openai.assistant.usage.completionTokens", it.completionTokens.toString()) + span.setAttribute("openai.assistant.usage.promptTokens", it.promptTokens.toString()) + } } private fun RunStepObject.setParameters(span: Span) { From 4e52e9f696fd1543c020622536ba7b7b8cd9e4a4 Mon Sep 17 00:00:00 2001 From: Javi Pacheco Date: Tue, 5 Mar 2024 13:36:12 +0100 Subject: [PATCH 4/4] OpenTelemetric comment --- .../main/kotlin/com/xebia/functional/xef/assistants/DSL.kt | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/examples/src/main/kotlin/com/xebia/functional/xef/assistants/DSL.kt b/examples/src/main/kotlin/com/xebia/functional/xef/assistants/DSL.kt index ca47eb40e..2159794d2 100644 --- a/examples/src/main/kotlin/com/xebia/functional/xef/assistants/DSL.kt +++ b/examples/src/main/kotlin/com/xebia/functional/xef/assistants/DSL.kt @@ -38,6 +38,11 @@ suspend fun main() { // ) // println("generated assistant: ${assistant2.assistantId}") + // This example contemplate the case of using OpenTelemetry for metrics + // To run the example with OpenTelemetry, you can execute the following commands: + // - # cd server/docker/opentelemetry + // - # docker-compose up + val metric = Metric.EMPTY // val metric = com.xebia.functional.xef.opentelemetry.OpenTelemetryMetric()