diff --git a/Sources/EventSource/EventSource.swift b/Sources/EventSource/EventSource.swift index 41ebac3..48323b6 100644 --- a/Sources/EventSource/EventSource.swift +++ b/Sources/EventSource/EventSource.swift @@ -43,9 +43,12 @@ public struct EventSource: Sendable { private let eventParser: @Sendable () -> EventParser public var timeoutIntervalForRequest: TimeInterval - public var timeoutIntervalForResource: TimeInterval + public var maxReconnectAttempts: Int + public var reconnectInitialDelay: TimeInterval + public var reconnectBackoffFactor: Double + public init( mode: Mode = .default, timeoutIntervalForRequest: TimeInterval = 60, @@ -63,12 +66,18 @@ public struct EventSource: Sendable { mode: Mode = .default, eventParser: @autoclosure @escaping @Sendable () -> EventParser, timeoutIntervalForRequest: TimeInterval = 60, - timeoutIntervalForResource: TimeInterval = 300 + timeoutIntervalForResource: TimeInterval = 300, + maxReconnectAttempts: Int = 5, + reconnectInitialDelay: TimeInterval = 1.0, + reconnectBackoffFactor: Double = 2.0 ) { self.mode = mode self.eventParser = eventParser self.timeoutIntervalForRequest = timeoutIntervalForRequest self.timeoutIntervalForResource = timeoutIntervalForResource + self.maxReconnectAttempts = maxReconnectAttempts + self.reconnectInitialDelay = reconnectInitialDelay + self.reconnectBackoffFactor = reconnectBackoffFactor } public func dataTask(for urlRequest: URLRequest) -> DataTask { @@ -76,7 +85,10 @@ public struct EventSource: Sendable { urlRequest: urlRequest, eventParser: eventParser(), timeoutIntervalForRequest: timeoutIntervalForRequest, - timeoutIntervalForResource: timeoutIntervalForResource + timeoutIntervalForResource: timeoutIntervalForResource, + maxReconnectAttempts: maxReconnectAttempts, + reconnectInitialDelay: reconnectInitialDelay, + reconnectBackoffFactor: reconnectBackoffFactor ) } } @@ -88,8 +100,24 @@ public extension EventSource { /// ``EventSource/EventSource/dataTask(for:)`` method on the EventSource instance. After creating a task, /// it can be started by iterating event stream returned by ``DataTask/events()``. final class DataTask: Sendable { + private let _readyState: Mutex = Mutex(.none) + // Reconnection properties + private let maxReconnectAttempts: Int + private let reconnectInitialDelay: TimeInterval + private let reconnectBackoffFactor: Double + + private let _reconnectAttempts: Mutex = Mutex(0) + private var reconnectAttempts: Int { + get { + _reconnectAttempts.withLock { $0 } + } + set { + _reconnectAttempts.withLock { $0 = newValue } + } + } + /// A value representing the state of the connection. public var readyState: ReadyState { get { @@ -170,12 +198,18 @@ public extension EventSource { urlRequest: URLRequest, eventParser: EventParser, timeoutIntervalForRequest: TimeInterval, - timeoutIntervalForResource: TimeInterval + timeoutIntervalForResource: TimeInterval, + maxReconnectAttempts: Int, + reconnectInitialDelay: TimeInterval, + reconnectBackoffFactor: Double ) { self.urlRequest = urlRequest self._eventParser = Mutex(eventParser) self.timeoutIntervalForRequest = timeoutIntervalForRequest self.timeoutIntervalForResource = timeoutIntervalForResource + self.maxReconnectAttempts = maxReconnectAttempts + self.reconnectInitialDelay = reconnectInitialDelay + self.reconnectBackoffFactor = reconnectBackoffFactor } /// Creates and returns event stream. @@ -188,42 +222,56 @@ public extension EventSource { } return AsyncStream { continuation in - let sessionDelegate = SessionDelegate() - let urlSession = URLSession( - configuration: urlSessionConfiguration, - delegate: sessionDelegate, - delegateQueue: nil - ) - let urlSessionDataTask = urlSession.dataTask(with: urlRequest) - - let sessionDelegateTask = Task { [weak self] in - for await event in sessionDelegate.eventStream { - guard let self else { return } - - switch event { - case let .didCompleteWithError(error): - handleSessionError(error, stream: continuation, urlSession: urlSession) - case let .didReceiveResponse(response, completionHandler): - handleSessionResponse( - response, - stream: continuation, - urlSession: urlSession, - completionHandler: completionHandler - ) - case let .didReceiveData(data): - parseMessages(from: data, stream: continuation, urlSession: urlSession) - } + startSession(stream: continuation) + readyState = .connecting + consumed = true + } + } + + /// Initializes or reinitializes the SSE session + private func startSession(stream continuation: AsyncStream.Continuation) { + let sessionDelegate = SessionDelegate() + let urlSession = URLSession( + configuration: urlSessionConfiguration, + delegate: sessionDelegate, + delegateQueue: nil + ) + let urlSessionDataTask = urlSession.dataTask(with: urlRequest) + let sessionDelegateTask = Task { [weak self] in + for await event in sessionDelegate.eventStream { + guard let self else { return } + switch event { + case let .didCompleteWithError(error): + self.handleSessionError(error, stream: continuation, urlSession: urlSession) + case let .didReceiveResponse(response, completionHandler): + self.handleSessionResponse( + response, + stream: continuation, + urlSession: urlSession, + completionHandler: completionHandler + ) + case let .didReceiveData(data): + self.parseMessages(from: data, stream: continuation, urlSession: urlSession) } } + } - continuation.onTermination = { @Sendable [weak self] _ in - sessionDelegateTask.cancel() - Task { self?.close(stream: continuation, urlSession: urlSession) } - } + continuation.onTermination = { @Sendable [weak self] _ in + sessionDelegateTask.cancel() + Task { self?.close(stream: continuation, urlSession: urlSession) } + } - urlSessionDataTask.resume() - readyState = .connecting - consumed = true + urlSessionDataTask.resume() + } + + /// Helper method for reconnection + private func attemptReconnect(stream continuation: AsyncStream.Continuation) { + let delay = reconnectInitialDelay * pow(reconnectBackoffFactor, Double(reconnectAttempts - 1)) + Task { [weak self] in + try? await Task.sleep(nanoseconds: UInt64(delay * 1_000_000_000)) + self?.startSession(stream: continuation) + self?.readyState = .connecting + self?.consumed = true } } @@ -241,9 +289,15 @@ public extension EventSource { if let error { sendErrorEvent(with: error, stream: continuation) } - - // Close connection - close(stream: continuation, urlSession: urlSession) + + // Attempts to reconnect if the limit has not been exceeded + if reconnectAttempts < maxReconnectAttempts { + reconnectAttempts += 1 + attemptReconnect(stream: continuation) + } else { + // Close connection if attempts exceeded + close(stream: continuation, urlSession: urlSession) + } } private func handleSessionResponse( @@ -321,6 +375,7 @@ public extension EventSource { private func setOpen(stream continuation: AsyncStream.Continuation) { readyState = .open + reconnectAttempts = 0 // reset attempts when opening continuation.yield(.open) }