feat: add resilient SSE session with reconnection/backoff#43
feat: add resilient SSE session with reconnection/backoff#43dogo wants to merge 4 commits intoRecouse:mainfrom
Conversation
• Implement exponential backoff reconnection with configurable limits (maxReconnectAttempts, reconnectInitialDelay, reconnectBackoffFactor) • Track reconnect attempts and readyState using thread-safe Mutex; reset attempts on successful .open
|
Hey @Recouse, I’ve resolved the conflicts and the CI has passed. Could you please review the code when you have a moment? |
|
Hi @Recouse 👋 Before moving forward with this PR, I just wanted to check if there’s still interest in the feature it introduces. If not, no worries at all, I’m happy to close it. Just let me know 🙂 |
|
Hi, |
Recouse
left a comment
There was a problem hiding this comment.
Have you tested the implementation? At first glance, it looks fine, but there may be some caveats with SessionDelegate and URLSession instances from the initial request.
| /// Helper method for reconnection | ||
| private func attemptReconnect(stream continuation: AsyncStream<EventType>.Continuation) { | ||
| let delay = reconnectInitialDelay * pow(reconnectBackoffFactor, Double(reconnectAttempts - 1)) | ||
| DispatchQueue.global().asyncAfter(deadline: .now() + delay) { [weak self] in |
There was a problem hiding this comment.
Would it possible to implement this with Task.sleep()?
| /// it can be started by iterating event stream returned by ``DataTask/events()``. | ||
| final class DataTask: Sendable { | ||
| /// Initializes or reinitializes the SSE session | ||
| private func startSession(stream continuation: AsyncStream<EventType>.Continuation) { |
There was a problem hiding this comment.
It's more of a stylistic request to declare methods after init()
Yes, I’ve tested the reconnection flow with:
The session is properly invalidated before creating a new one, and the delegate is not reused across sessions to avoid unexpected behavior. |
Recouse
left a comment
There was a problem hiding this comment.
In my tests I've noticed that the current logic relies on the URLSession:task:didCompleteWithError: delegate method to complete the request. I've added an error != nil check to properly close the connection when reconnection is not needed.
There's another issue: reconnection happens indefinitely, because on reconnection readyState becomes .connecting, and after the first response it's set to .open and resets reconnectAttempts. As a result, the request never finishes and re-runs indefinitely.
| // Attempts to reconnect if the limit has not been exceeded | ||
| if reconnectAttempts < maxReconnectAttempts { |
There was a problem hiding this comment.
| // Attempts to reconnect if the limit has not been exceeded | |
| if reconnectAttempts < maxReconnectAttempts { | |
| // If an error occurred, an error object will indicate how the request failed, otherwise nil. | |
| // Attempts to reconnect on error if the limit has not been exceeded. | |
| if error != nil, reconnectAttempts < maxReconnectAttempts { |
|
Turns out it requires an overhaul of the DataTask implementation. I also wanted to reduce the Mutex count, and either switch to a single state value or make DataTask an actor. I think it will be better to put this PR on hold and return to it later. |
That makes sense. I agree the current approach introduces complexity in state management and reconnection flow. Switching DataTask to an actor sounds like a much cleaner solution and would eliminate the need for multiple mutexes. I'm happy to put this PR on hold and revisit it with a redesigned implementation. |
• Implement exponential backoff reconnection with configurable limits (maxReconnectAttempts, reconnectInitialDelay, reconnectBackoffFactor)
• Track reconnect attempts and readyState using thread-safe Mutex; reset attempts on successful .open