v1.2.1 "Bunri" - Streaming Architecture Refinement
Release Date: July 23, 2025
Code Name: "Bunri" (分離 - Separation/Division)
Type: Patch Release
🎯 Overview
Version 1.2.1 refines the streaming architecture introduced in v1.2.0 by separating the streaming request and response patterns. This release addresses architectural concerns with the single-method streaming approach and implements a cleaner, more maintainable two-phase pattern for streaming operations.
🔧 Breaking Changes
Streaming Method Architecture Refactoring
The streaming methods have been fundamentally restructured to use a two-phase approach:
Before (v1.2.0)
// Single method returning IAsyncEnumerable
IAsyncEnumerable<LogEntry> RequestLogsStreamAsync(GetLogsRequest request);
IAsyncEnumerable<ChatMessage> RequestSessionMessagesStreamAsync(GetSingleSessionRequest request);
After (v1.2.1)
// Phase 1: Request streaming (server notifies client)
Task RequestLogsStreamAsync(GetLogsRequest request);
Task RequestSessionMessagesStreamAsync(GetSingleSessionRequest request);
// Phase 2: Receive streaming (client sends stream back)
Task ReceiveLogsStreamAsync(IAsyncEnumerable<LogEntry> stream, CancellationToken cancellationToken = default);
Task ReceiveSessionMessagesStreamAsync(IAsyncEnumerable<ChatMessage> stream, CancellationToken cancellationToken = default);
Event Handler Changes
Stream request event handlers now return Task instead of IAsyncEnumerable:
// Updated event signatures
public event Func<GetLogsRequest, Task>? LogsStreamRequested;
public event Func<GetSingleSessionRequest, Task>? SessionMessagesStreamRequested;
New Event Constants
Added new event constants for the receive phase:
- Events.ReceiveLogsStream: Event fired when the client sends a logs stream to the server
- Events.ReceiveSessionMessagesStream: Event fired when the client sends a session messages stream to the server
🛠️ Technical Details
Implementation Changes
- IJiroHub Interface: Updated streaming methods to return Task and added new receive methods
- IJiroClient Interface: Updated event signatures and added receive method declarations
- JiroClientBase Class: Modified event wiring from OnStream to OnNotification for requests
- Events Class: Added new constants for receive stream events
Migration Guide
If you were using the previous streaming methods:
Server-side (Hub):
- Replace IAsyncEnumerable<T> returns with Task
- Implement separate Receive*StreamAsync methods for handling incoming streams
Client-side:
- Update event handlers to return Task instead of IAsyncEnumerable<T>
- Use the new Receive*StreamAsync methods to send stream data back to the server
Event Handling:
- Stream requests are now fire-and-forget notifications
- Stream data is sent separately using the receive methods
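The migration steps above can be sketched without any hub connection at all. The following is a minimal in-process illustration, not the library's implementation: DemoLogsRequest, DemoLogEntry, DemoServer, DemoClient and TwoPhaseDemo are hypothetical stand-ins invented for this example, and the real GetLogsRequest and LogEntry types may have a different shape. It only shows the shape of the pattern: the request handler returns Task, and the stream travels through a separate receive method.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-ins for the library's request and entry types.
public sealed record DemoLogsRequest(string SessionId, int Count);
public sealed record DemoLogEntry(int Index, string Message);

public sealed class DemoServer
{
    public List<DemoLogEntry> Received { get; } = new();

    // Phase 2: consume the stream that the client sends back.
    public async Task ReceiveLogsStreamAsync(
        IAsyncEnumerable<DemoLogEntry> stream,
        CancellationToken cancellationToken = default)
    {
        await foreach (var entry in stream.WithCancellation(cancellationToken))
        {
            Received.Add(entry);
        }
    }
}

public sealed class DemoClient
{
    // Phase 1: the request arrives as a notification; the handler returns Task.
    public event Func<DemoLogsRequest, Task>? LogsStreamRequested;

    public Task RaiseLogsStreamRequestedAsync(DemoLogsRequest request) =>
        LogsStreamRequested?.Invoke(request) ?? Task.CompletedTask;
}

public static class TwoPhaseDemo
{
    // Produces the entries the client streams back to the server.
    private static async IAsyncEnumerable<DemoLogEntry> ProduceAsync(DemoLogsRequest request)
    {
        for (var i = 0; i < request.Count; i++)
        {
            await Task.Yield();
            yield return new DemoLogEntry(i, $"{request.SessionId}: entry {i}");
        }
    }

    // Wires both phases together and returns how many entries the server received.
    public static async Task<int> RunAsync()
    {
        var server = new DemoServer();
        var client = new DemoClient();

        // The handler answers the request by calling the separate receive method.
        client.LogsStreamRequested += request =>
            server.ReceiveLogsStreamAsync(ProduceAsync(request));

        await client.RaiseLogsStreamRequestedAsync(new DemoLogsRequest("session-123", 3));
        return server.Received.Count;
    }

    public static async Task Main()
    {
        Console.WriteLine(await RunAsync()); // prints 3
    }
}
```

In a real deployment the call to the receive method would cross the hub connection rather than invoke the server object directly; the direct call here only keeps the sketch self-contained.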
Architecture Benefits
- Cleaner separation of concerns: Request and response phases are clearly distinct
- Better error handling: Each phase can handle errors independently
- Improved maintainability: Easier to debug and extend streaming functionality
- IAsyncEnumerable streaming: Uses IAsyncEnumerable<T> with cancellation token support for robust stream handling
- Cancellation support: Built-in cancellation token support for proper resource cleanup
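To illustrate how the receive phase can own its cancellation and error handling, here is a small sketch under stated assumptions: CancellationDemo, ProduceAsync and ReceiveAsync are hypothetical names for this example only, and the stream carries plain integers instead of log entries. It relies on standard .NET behavior: WithCancellation passes the token to the async iterator, which observes it through a parameter marked with [EnumeratorCancellation].

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class CancellationDemo
{
    // Hypothetical producer: yields integers until cancellation is requested.
    private static async IAsyncEnumerable<int> ProduceAsync(
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        for (var i = 0; ; i++)
        {
            cancellationToken.ThrowIfCancellationRequested();
            yield return i;
            await Task.Yield();
        }
    }

    // Receive phase: handles cancellation locally and reports how many items it processed.
    private static async Task<int> ReceiveAsync(
        IAsyncEnumerable<int> stream, CancellationTokenSource cts)
    {
        var processed = 0;
        try
        {
            await foreach (var item in stream.WithCancellation(cts.Token))
            {
                processed++;
                if (processed == 5)
                {
                    cts.Cancel(); // stop after five items
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation ends the stream; nothing propagates to the request phase.
        }
        return processed;
    }

    public static async Task<int> RunAsync()
    {
        using var cts = new CancellationTokenSource();
        return await ReceiveAsync(ProduceAsync(), cts);
    }

    public static async Task Main()
    {
        Console.WriteLine(await RunAsync()); // prints 5
    }
}
```

Because the try/catch lives entirely inside the receive method, a cancelled or failed stream does not need any handling in the code that issued the original request.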
🔄 Changes
Streaming Infrastructure Improvements
- Two-Phase Streaming: Clear separation between requesting streams and receiving stream data
- IAsyncEnumerable Transport: Leverages IAsyncEnumerable<T> for reliable stream data transport
- Cancellation Token Support: Built-in cancellation support for proper resource cleanup
- Event Architecture: Simplified event handling with distinct phases
- Error Isolation: Better error handling between request and response phases
Code Quality Enhancements
- Method Naming Clarity: Clear distinction between Request* and Receive* methods
- Type Safety: Strong typing with IAsyncEnumerable<T> and proper cancellation token support
- Event Consistency: Consistent event pattern across all streaming operations
💻 Usage Examples
Server-Side Implementation
// Phase 1: Server requests stream from client
await hubConnection.InvokeAsync("RequestLogsStreamAsync", new GetLogsRequest
{
    SessionId = "session-123",
    Count = 100
});
// Phase 2: Server receives stream from client (handled automatically)
public async Task ReceiveLogsStreamAsync(IAsyncEnumerable<LogEntry> stream, CancellationToken cancellationToken = default)
{
    await foreach (var logEntry in stream.WithCancellation(cancellationToken))
    {
        // Process each log entry as it arrives
        await ProcessLogEntry(logEntry);
    }
}
Client-Side Implementation
// Phase 1: Handle stream request from server
LogsStreamRequested += async (request) =>
{
    // Phase 2: Send stream back to server using async enumerable
    await ReceiveLogsStreamAsync(GetLogStreamAsync(request));
};
// Helper method to create async enumerable stream
// [EnumeratorCancellation] (namespace System.Runtime.CompilerServices) lets a token
// supplied via WithCancellation(...) reach this iterator
private async IAsyncEnumerable<LogEntry> GetLogStreamAsync(GetLogsRequest request, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    // Generate log entries based on request
    for (int i = 0; i < request.Count; i++)
    {
        cancellationToken.ThrowIfCancellationRequested();
        var logEntry = await GetLogEntryAsync(request.SessionId, i);
        yield return logEntry;
        // Simulate streaming delay
        await Task.Delay(100, cancellationToken);
    }
}
🚀 Deployment
- Version 1.2.1 maintains full backward compatibility for non-streaming operations
- Streaming operations require code updates as outlined in the migration guide
- All tests updated and validated with the new streaming architecture
- Package documentation updated with new streaming patterns
🔮 Future Considerations
- The two-phase streaming architecture provides a solid foundation for advanced streaming scenarios
- IAsyncEnumerable-based transport enables natural integration with .NET's async streaming patterns
- Built-in cancellation token support allows for proper resource cleanup and operation cancellation
- Clear separation allows for independent evolution of request and response phases
Note: This release represents a significant architectural improvement to the streaming infrastructure. While it introduces breaking changes to streaming operations, the new pattern provides better maintainability, error handling, and extensibility for future streaming enhancements.