diff --git a/CURSOR_MEMORY.md b/CURSOR_MEMORY.md
new file mode 100644
index 00000000..d9ef1f0a
--- /dev/null
+++ b/CURSOR_MEMORY.md
@@ -0,0 +1,98 @@
+# Openlayer Python SDK - Cursor Memory
+
+## Key Lessons Learned
+
+### ✅ FIXED: trace_async Now Supports Async Generators
+
+**STATUS**: **FIXED** - Applied proper async generator support to the SDK.
+
+**What Was Fixed**:
+- Added `inspect.isasyncgenfunction()` detection in the `trace_async` decorator
+- Properly handles async generators by consuming them while maintaining streaming behavior
+- Collects the complete output and logs it after streaming finishes
+- Measures timing correctly for the full streaming duration
+
+**Result**: Users can now use `@trace_async()` on async generator functions without any code modifications. The decorator automatically detects async generators and handles them appropriately while preserving streaming behavior.
+
+### Duplicate Trace Issue with Async Streaming
+
+**Problem**: When using both the `@trace()` decorator and `trace_async_openai()` together, duplicate traces are generated:
+1. One trace from the `@trace()` decorator with the function's input parameters
+2. Another trace from `trace_async_openai()` with the OpenAI chat completion request
+3. **CRITICAL**: This breaks tests, because tests are executed against the two separate requests instead of one unified trace
+
+**Root Cause**:
+- **CRITICAL**: `@trace_async()` is fundamentally broken for async generators
+  - Cannot `await` an async generator function (calling it returns a generator object, not values)
+  - Logs the generator object as output instead of the actual streamed content
+  - Records timing before the generator is consumed (incorrect latency)
+  - Never captures the actual yielded values
+- `trace_async_openai()` creates separate traces for OpenAI calls
+- This creates conflicting/duplicate trace data that confuses test execution
+- Tests expect a single request but get two separate ones to validate
+
+**Key Files**:
+- `src/openlayer/lib/tracing/tracer.py` - Contains the trace() and trace_async() decorators
+- `src/openlayer/lib/integrations/async_openai_tracer.py` - Contains trace_async_openai()
+
+**Solution Strategy**:
+1. **RECOMMENDED**: Remove all decorators and use ONLY `trace_async_openai()` for async streaming
+2. Alternative: Use ONLY the `@trace_async()` decorator (but lose OpenAI-specific metrics)
+3. **NEVER**: Mix decorators with client tracing - this always causes duplicates
+
+**Confirmed Working Solutions**:
+
+**Option 1 - No Function Tracing** (simplest):
+```python
+class say_hi:
+    def __init__(self):
+        self.openai_client = trace_async_openai(AsyncOpenAI())
+
+    # ❌ Remove @trace() decorator
+    async def hi(self, cur_str: str):
+        # trace_async_openai handles all tracing automatically
+        response = await self.openai_client.chat.completions.create(...)
+        # ... rest of streaming logic
+```
+
+**Option 2 - With Function Tracing** (recommended):
+```python
+class say_hi:
+    def __init__(self):
+        self.openai_client = trace_async_openai(AsyncOpenAI())
+
+    @trace_async()  # ✅ Works when the function returns a string
+    async def hi(self, cur_str: str) -> str:  # Return the complete response
+        response = await self.openai_client.chat.completions.create(...)
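+        # The elided create(...) arguments are a streaming call (stream=True);
+        # the streamed chunks are accumulated below instead of being yielded.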
+        complete_answer = ""
+        async for chunk in response:
+            complete_answer += chunk.choices[0].delta.content or ""
+        return complete_answer  # ✅ Return instead of yield
+```
+
+## Project Structure Insights
+
+### Tracing Architecture
+- Context variables are used to maintain trace state across async calls
+- Each trace consists of steps that can be nested
+- Root steps trigger data upload to Openlayer
+- Streaming responses are handled differently from regular responses
+
+### Integration Patterns
+- LLM integrations wrap client methods rather than using decorators
+- Each provider (OpenAI, Anthropic, etc.) has its own tracer module
+- All tracers follow similar patterns but handle provider-specific details
+
+## Best Practices Discovered
+
+1. **Don't double-trace**: Avoid using both decorators and client tracing simultaneously
+2. **Async generators need special handling**: Regular trace decorators don't work well with streaming responses
+3. **Context preservation**: Async tracing requires proper context variable management
+4. **Provider-specific tracing**: Use provider-specific tracers for better integration
+
+## Technology Stack Notes
+
+- Uses `contextvars` for maintaining trace context across async boundaries
+- Integrates with multiple LLM providers through wrapper functions
+- Supports both sync and async operations
+- Uses step-based tracing model with nested structure
\ No newline at end of file
diff --git a/openlayer_duplicate_traces_analysis.md b/openlayer_duplicate_traces_analysis.md
new file mode 100644
index 00000000..86c6c5fc
--- /dev/null
+++ b/openlayer_duplicate_traces_analysis.md
@@ -0,0 +1,219 @@
+# Openlayer Duplicate Traces Analysis & Solutions
+
+## Problem Summary
+
+Your code is generating duplicate traces in Openlayer because you're using **both** `@trace()` decorator **and** `trace_async_openai()` simultaneously. This creates two separate traces:
+
+1. **Function-level trace** from `@trace()`: Captures the async generator object as output (not useful)
+2. **OpenAI-level trace** from `trace_async_openai()`: Captures only the OpenAI response without function context
+
+## Root Cause Analysis
+
+### Issue 1: Async Generator Handling
+The `@trace()` and `trace_async()` decorators don't properly handle async generators. They capture the generator object itself as the output, not the actual streamed content.
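+
+As a standalone illustration (not SDK code, just a throwaway `stream_words` generator): calling an async generator function only returns a generator object, and the yielded values do not exist until something iterates over it:
+
+```python
+import asyncio
+import inspect
+
+async def stream_words():  # throwaway example generator
+    yield "hello "
+    yield "world"
+
+print(inspect.isasyncgenfunction(stream_words))  # True
+
+async def main():
+    gen = stream_words()  # calling the function returns an async generator object
+    print(gen)            # <async_generator object stream_words at 0x...>
+    # "await gen" would raise a TypeError; the values only appear when iterating:
+    print([chunk async for chunk in gen])  # ['hello ', 'world']
+
+asyncio.run(main())
+```
+
+The decorator's current handling, simplified, looks like this: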
+
+```python
+# Current behavior in tracer.py
+output = await func(*func_args, **func_kwargs)  # This returns the async generator object, not the yielded values
+step.log(output=output)  # Logs the generator object, not content
+```
+
+### Issue 2: Double Tracing
+- `@trace()` creates a user-level trace for your `hi()` function
+- `trace_async_openai()` creates an OpenAI-specific trace for the API call
+- Both traces are independent and don't coordinate
+
+## Solutions
+
+### Solution 1: Use Only Client-Level Tracing (Recommended)
+
+Remove the `@trace()` decorator and rely solely on `trace_async_openai()`:
+
+```python
+import asyncio
+from openai import AsyncOpenAI
+from openlayer.lib import trace_async_openai
+
+class say_hi:
+    def __init__(self):
+        self.openai_client = trace_async_openai(AsyncOpenAI())
+
+    # Remove @trace() decorator
+    async def hi(self, cur_str: str):
+        messages = [
+            {"role": "system", "content": "say hi !"},
+            {"role": "user", "content": cur_str}
+        ]
+        response = await self.openai_client.chat.completions.create(
+            model="gpt-3.5-turbo-16k",
+            messages=messages,
+            temperature=0,
+            max_tokens=100,
+            stream=True,
+        )
+        complete_answer = ""
+        async for chunk in response:
+            delta = chunk.choices[0].delta
+            if hasattr(delta, "content") and delta.content:
+                chunk_content = delta.content
+                complete_answer += chunk_content
+                yield chunk_content
+```
+
+### Solution 2: Use Only Function-Level Tracing
+
+Remove `trace_async_openai()` and use only `@trace_async()` with a non-streaming approach:
+
+```python
+import asyncio
+from openai import AsyncOpenAI
+from openlayer.lib.tracing.tracer import trace_async
+
+class say_hi:
+    def __init__(self):
+        self.openai_client = AsyncOpenAI()  # No tracing wrapper
+
+    @trace_async()
+    async def hi(self, cur_str: str) -> str:  # Return a string, not a generator
+        messages = [
+            {"role": "system", "content": "say hi !"},
+            {"role": "user", "content": cur_str}
+        ]
+        response = await self.openai_client.chat.completions.create(
+            model="gpt-3.5-turbo-16k",
+            messages=messages,
+            temperature=0,
+            max_tokens=100,
+            stream=True,  # Still stream internally
+        )
+        complete_answer = ""
+        async for chunk in response:
+            delta = chunk.choices[0].delta
+            if hasattr(delta, "content") and delta.content:
+                complete_answer += delta.content
+        return complete_answer  # Return the complete response
+```
+
+### Solution 3: Custom Async Streaming Decorator (Advanced)
+
+Create a specialized decorator that properly handles async generators:
+
+```python
+import asyncio
+import inspect
+import time
+from functools import wraps
+from typing import AsyncGenerator, Any
+from openlayer.lib.tracing.tracer import create_step
+
+def trace_async_streaming(
+    *step_args,
+    inference_pipeline_id: str = None,
+    **step_kwargs,
+):
+    """Decorator specifically for async streaming functions."""
+
+    def decorator(func):
+        func_signature = inspect.signature(func)
+
+        @wraps(func)
+        async def wrapper(*func_args, **func_kwargs):
+            if step_kwargs.get("name") is None:
+                step_kwargs["name"] = func.__name__
+
+            with create_step(
+                *step_args,
+                inference_pipeline_id=inference_pipeline_id,
+                **step_kwargs
+            ) as step:
+                # Bind arguments
+                bound = func_signature.bind(*func_args, **func_kwargs)
+                bound.apply_defaults()
+                inputs = dict(bound.arguments)
+                inputs.pop("self", None)
+                inputs.pop("cls", None)
+
+                # Execute the async generator
+                async_gen = func(*func_args, **func_kwargs)
+                collected_output = []
+
+                async def traced_generator():
+                    try:
+                        async for chunk in async_gen:
+                            collected_output.append(str(chunk))
+                            yield chunk
+                    except Exception as exc:
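+                        # Log the failure on the step before re-raising so the trace records it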
+                        step.log(metadata={"Exceptions": str(exc)})
+                        raise
+                    finally:
+                        # Log the complete output
+                        end_time = time.time()
+                        latency = (end_time - step.start_time) * 1000
+                        complete_output = "".join(collected_output)
+
+                        step.log(
+                            inputs=inputs,
+                            output=complete_output,
+                            end_time=end_time,
+                            latency=latency,
+                        )
+
+                return traced_generator()
+        return wrapper
+    return decorator
+
+# Usage:
+class say_hi:
+    def __init__(self):
+        self.openai_client = AsyncOpenAI()  # No trace_async_openai
+
+    @trace_async_streaming()
+    async def hi(self, cur_str: str):
+        messages = [
+            {"role": "system", "content": "say hi !"},
+            {"role": "user", "content": cur_str}
+        ]
+        response = await self.openai_client.chat.completions.create(
+            model="gpt-3.5-turbo-16k",
+            messages=messages,
+            temperature=0,
+            max_tokens=100,
+            stream=True,
+        )
+        async for chunk in response:
+            delta = chunk.choices[0].delta
+            if hasattr(delta, "content") and delta.content:
+                yield delta.content
+```
+
+## Recommended Approach
+
+**Use Solution 1** (client-level tracing only) because:
+
+1. **Simplest**: Just remove the `@trace()` decorator
+2. **Most reliable**: `trace_async_openai()` is specifically designed for streaming
+3. **Complete data**: Captures all OpenAI-specific metrics (tokens, cost, etc.)
+4. **Less error-prone**: Avoids the complexity of handling async generators
+
+## Why `trace_async()` Doesn't Work Well
+
+The current `trace_async()` implementation has these limitations:
+
+1. **Generator object capture**: It captures the async generator object itself (e.g. `<async_generator object hi at 0x...>`) as output, not the actual content
+2. **Timing issues**: It completes before the generator is fully consumed
+3. **No streaming awareness**: It doesn't understand that the function yields values over time
+
+## Testing Your Fix
+
+After implementing Solution 1, you should see:
+- **Single trace** per function call
+- **Complete output** showing the full generated response
+- **Proper timing** and token counts
+- **No duplicate entries** in Openlayer
+
+## Future Improvements
+
+Consider contributing to the Openlayer project by:
+1. Improving async generator handling in the decorators
+2. Adding detection for double-tracing scenarios
+3. Creating specialized decorators for streaming functions
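+
+## Quick End-to-End Check
+
+A minimal driver for Solution 1, as a sketch only: `say_hi` is the Solution 1 class above, the prompt is a placeholder, and valid OpenAI and Openlayer credentials are assumed to be configured in the environment.
+
+```python
+import asyncio
+
+async def main():
+    bot = say_hi()  # the Solution 1 class above (client wrapped with trace_async_openai)
+    chunks = [chunk async for chunk in bot.hi("hello there")]  # placeholder prompt
+    print("".join(chunks))
+
+asyncio.run(main())
+```
+
+After this runs, a single trace containing the full streamed answer should appear in Openlayer (see "Testing Your Fix" above).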
\ No newline at end of file
diff --git a/src/openlayer/lib/tracing/tracer.py b/src/openlayer/lib/tracing/tracer.py
index bc02ad88..368d3218 100644
--- a/src/openlayer/lib/tracing/tracer.py
+++ b/src/openlayer/lib/tracing/tracer.py
@@ -5,7 +5,7 @@
 import inspect
 import logging
 import contextvars
-from typing import Any, Dict, List, Tuple, Optional, Awaitable, Generator
+from typing import Any, Dict, List, Tuple, Optional, Awaitable, Generator, AsyncIterator
 from functools import wraps
 from contextlib import contextmanager
@@ -287,6 +287,15 @@ def decorator(func):
         async def wrapper(*func_args, **func_kwargs):
             if step_kwargs.get("name") is None:
                 step_kwargs["name"] = func.__name__
+
+            # Check if the function is an async generator
+            if inspect.isasyncgenfunction(func):
+                return handle_async_generator(
+                    func, func_args, func_kwargs, step_args, step_kwargs,
+                    inference_pipeline_id, context_kwarg, func_signature
+                )
+
+            # Handle regular async functions
             with create_step(
                 *step_args, inference_pipeline_id=inference_pipeline_id, **step_kwargs
             ) as step:
@@ -327,6 +336,69 @@ async def wrapper(*func_args, **func_kwargs):
                 raise exception
             return output

+        async def handle_async_generator(
+            func, func_args, func_kwargs, step_args, step_kwargs,
+            inference_pipeline_id, context_kwarg, func_signature
+        ) -> AsyncIterator[Any]:
+            """Handle async generator functions properly."""
+            with create_step(
+                *step_args, inference_pipeline_id=inference_pipeline_id, **step_kwargs
+            ) as step:
+                collected_output = []
+                exception = None
+
+                # Prepare inputs
+                bound = func_signature.bind(*func_args, **func_kwargs)
+                bound.apply_defaults()
+                inputs = dict(bound.arguments)
+                inputs.pop("self", None)
+                inputs.pop("cls", None)
+
+                if context_kwarg:
+                    if context_kwarg in inputs:
+                        log_context(inputs.get(context_kwarg))
+                    else:
+                        logger.warning(
+                            "Context kwarg `%s` not found in inputs of the "
+                            "current function.",
+                            context_kwarg,
+                        )
+
+                try:
+                    # Get the async generator
+                    async_gen = func(*func_args, **func_kwargs)
+
+                    # Consume and collect all values while yielding them
+                    async for value in async_gen:
+                        collected_output.append(value)
+                        yield value  # Maintain streaming behavior
+
+                except Exception as exc:
+                    step.log(metadata={"Exceptions": str(exc)})
+                    exception = exc
+                    raise
+                finally:
+                    # Log the complete output after streaming finishes
+                    end_time = time.time()
+                    latency = (end_time - step.start_time) * 1000  # in ms
+
+                    # Convert the collected output to a string representation
+                    if collected_output:
+                        # Handle different types of output
+                        if all(isinstance(item, str) for item in collected_output):
+                            complete_output = "".join(collected_output)
+                        else:
+                            complete_output = "".join(str(item) for item in collected_output)
+                    else:
+                        complete_output = ""
+
+                    step.log(
+                        inputs=inputs,
+                        output=complete_output,  # Actual content, not the generator object
+                        end_time=end_time,
+                        latency=latency,  # Correct timing for the full streaming duration
+                    )
+
         return wrapper

     return decorator