Reactive Batch Processing with Server-Sent Events (SSE)
/ 5 min read
Updated:Table of Contents
Problem statement
Consider a workflow where users upload a CSV containing device identifiers. The backend processes each record through an external API and users need the results before moving to the next step.
A few constraints were already known:
- Maximum batch size: 100 records
- External API response times are unpredictable
- Results are independent
- Users remain on the same screen waiting for completion
The main question was not how to process the batch, but how to stream results back to users as they become available.
Approaches considered
- Multiple request from frontend to backend for each record - Discarded due to more network trips and repetitive processing logic
- Single request with waiting for entire batch to complete - Discarded due to unpredictable response times and poor user experience
- Queue based approach with polling - Discarded due to added complexity for small batch sizes and need for real-time updates
- Server-Sent Events (SSE) - Chosen for its simplicity, real-time streaming capabilities within single HTTP connection, and good browser support
Why SSE fits this workflow
- Single upload request from frontend to backend
- Server pushes update progressively
- Fewer client round trips
- No polling
- No additional infrastructure
- Communication remains one-directional
Request Flow
sequenceDiagram
autonumber
participant User
participant Frontend as Flutter Frontend
participant Backend as Quarkus Backend
participant API as External API
User->>Frontend: Upload CSV file
Frontend->>Backend: POST /validate-customers\n(records)
activate Backend
Note over Backend: Create reactive stream (Multi)
loop For each record
Backend->>API: Validate record
API-->>Backend: Validation result
Backend-->>Frontend: SSE event (record result)
Frontend->>Frontend: Update progress UI
end
Backend-->>Frontend: SSE stream completed
deactivate Backend
Frontend-->>User: Show completed results
Reactive SSE Implementation
Backend
On backend we used Quarkus reactive programming model to implement. The main components were:
- REST endpoint - A POST endpoint accepts the uploaded records as JSON and starts the validation process.
- Reactive processing pipeline - Each record is processed independently using reactive streams. Instead of waiting for the entire batch to complete, every record is validated asynchronously and emitted immediately after processing.
- Server-Sent Events (SSE) - The endpoint produces a continuous event stream using Server-Sent Events (SSE). As each record finishes processing, the backend pushes the result directly to the frontend in real time.
@POST @Path("/validate-customers") @Consumes(MediaType.APPLICATION_JSON) @Produces(MediaType.SERVER_SENT_EVENTS) @RestStreamElementType(MediaType.APPLICATION_JSON) public Multi<GenericValidationResponse> uploadCustomers(List<JsonNode> incomingData) { // Using JsonNode to handle dynamic JSON structure. System.out.println("Total Records Received: " + incomingData.size()); return Multi.createFrom().iterable(incomingData) .onItem().transformToUniAndMerge(jsonNode -> { int id = jsonNode.has("id") ? jsonNode.get("id").asInt() : 0; // Mimicking processing time with a random delay long delay = ThreadLocalRandom.current().nextLong(1000, 5000); GenericValidationResponse response = new GenericValidationResponse(id, true); return Uni.createFrom().item(response).onItem().delayIt().by(Duration.ofMillis(delay)); }); }
public record GenericValidationResponse(int id, boolean validated) { }Frontend
On the frontend we used flutter to consume the SSE stream. The main steps were:
- Initiate SSE connection - When the user uploads the CSV, we send a POST request to the backend and establish an SSE connection to receive updates. We have used flutter_client_sse package to handle SSE connections in flutter.
- Listen for incoming events - We listen for incoming SSE events and update the UI in real time as results arrive.
String url = "http://127.0.0.1:8080/api/validate-customers"; SSEClient.subscribeToSSE( method: SSERequestType.POST, url: url, header: { "Accept": "text/event-stream", "Content-Type": "application/json" }, body: {'data': payload}, ).listen((event) { // Process and update UI with the received event data }, onError: (error) { print("SSE Error: $error"); }, onDone: () { print("SSE stream closed"); }, );This is how UI looks like when processing is in progress:
Production considerations
Limiting Parallel API Calls
Although processing records in parallel can speed up the workflow, it can also overwhelm the external API. To mitigate this, we can use a concurrency limiter to control the number of simultaneous API calls.
return Multi.createFrom() .iterable(incomingData) .onItem() .transformToUni(DataValidateController::process) .merge(20); // Adjust the concurrency level as needed (e.g., 20 concurrent processing)Reverse Proxy and Buffering Issues
When deploying SSE behind reverse proxies such as Nginx or CloudFront, buffering can prevent events from reaching the client immediately.
For SSE to work correctly, buffering must be disabled so events are flushed progressively instead of being accumulated.
Handling Client Disconnects
One important edge case is when the user closes the browser tab or navigates away while processing is still running.
Since SSE connections are long-lived HTTP streams, the backend should detect connection termination and stop unnecessary processing to avoid wasting resources.
.onCancellation().invoke(() -> { cancelled.set(true); System.out.println("Client disconnected. Stopping batch processing."); });This stops scheduling new work once the SSE connection closes. However, cancellation of already running API calls depends on whether the underlying HTTP client supports reactive cancellation propagation.
The implementation itself was fairly small. The more interesting part was to evaluate different approaches and choose the one that best fits the workflow. SSE provided a simple and effective way to stream results back to users in real time without adding unnecessary complexity.
References
- github repo with sample code: