Skip to content

Commit 46aca1e

Browse files
authored
Initial swift EventSource implementation (#1)
1 parent f7d845e commit 46aca1e

File tree

10 files changed

+851
-8
lines changed

10 files changed

+851
-8
lines changed

LICENSE.txt

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
Copyright 2020 Catamorphic, Co.
2+
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
7+
http://www.apache.org/licenses/LICENSE-2.0
8+
9+
Unless required by applicable law or agreed to in writing, software
10+
distributed under the License is distributed on an "AS IS" BASIS,
11+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
See the License for the specific language governing permissions and
13+
limitations under the License.

Package.swift

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,12 @@ import PackageDescription
55

66
let package = Package(
77
name: "LDSwiftEventSource",
8+
platforms: [
9+
.iOS(.v10),
10+
.macOS(.v10_12),
11+
.watchOS(.v3),
12+
.tvOS(.v10)
13+
],
814
products: [
915
// Products define the executables and libraries produced by a package, and make them visible to other packages.
1016
.library(
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
import Foundation
2+
3+
class EventParser {
4+
private struct Constants {
5+
static let dataLabel: Substring = "data"
6+
static let idLabel: Substring = "id"
7+
static let eventLabel: Substring = "event"
8+
static let retryLabel: Substring = "retry"
9+
static let defaultEventName = "message"
10+
}
11+
12+
private let handler: EventHandler
13+
private let connectionHandler: ConnectionHandler
14+
15+
private var data: String = ""
16+
private var lastEventId: String?
17+
private var eventName: String = Constants.defaultEventName
18+
19+
init(handler: EventHandler, connectionHandler: ConnectionHandler) {
20+
self.handler = handler
21+
self.connectionHandler = connectionHandler
22+
}
23+
24+
func parse(line: String) {
25+
let splitByColon = line.split(separator: ":", maxSplits: 1, omittingEmptySubsequences: false)
26+
27+
switch (splitByColon[0], splitByColon[safe: 1]) {
28+
case ("", nil): // Empty line
29+
dispatchEvent()
30+
case let ("", .some(comment)): // Line starting with ':' is a comment
31+
handler.onComment(comment: String(comment))
32+
case let (field, data):
33+
processField(field: field, value: dropLeadingSpace(str: data ?? ""))
34+
}
35+
}
36+
37+
private func dropLeadingSpace(str: Substring) -> Substring {
38+
if str.first == " " {
39+
return str[str.index(after: str.startIndex)...]
40+
}
41+
return str
42+
}
43+
44+
private func processField(field: Substring, value: Substring) {
45+
switch field {
46+
case Constants.dataLabel:
47+
if !data.isEmpty {
48+
data.append(contentsOf: "\n")
49+
}
50+
data.append(contentsOf: value)
51+
case Constants.idLabel:
52+
lastEventId = String(value)
53+
case Constants.eventLabel:
54+
eventName = String(value)
55+
case Constants.retryLabel:
56+
if value.allSatisfy(("0"..."9").contains), let reconnectionTime = Int64(value) {
57+
connectionHandler.setReconnectionTime(Double(reconnectionTime) * 0.001)
58+
}
59+
default: break
60+
}
61+
}
62+
63+
private func dispatchEvent() {
64+
guard !data.isEmpty
65+
else { return }
66+
let messageEvent = MessageEvent(data: data, lastEventId: lastEventId)
67+
if let lastEventId = lastEventId {
68+
connectionHandler.setLastEventId(lastEventId)
69+
}
70+
handler.onMessage(event: eventName, messageEvent: messageEvent)
71+
data = ""
72+
eventName = Constants.defaultEventName
73+
}
74+
}
75+
76+
private extension Array {
77+
/// Returns the element at the specified index if it is within bounds, otherwise nil.
78+
subscript (safe index: Index) -> Element? {
79+
return index >= startIndex && index < endIndex ? self[index] : nil
80+
}
81+
}
Lines changed: 253 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,254 @@
1-
struct LDSwiftEventSource {
2-
var text = "Hello, World!"
1+
import Foundation
2+
import os.log
3+
4+
/// Error class that means the remote server returned an HTTP error.
5+
public class UnsuccessfulResponseError: Error {
6+
/// The HTTP response code received
7+
public let responseCode: Int
8+
9+
init(responseCode: Int) {
10+
self.responseCode = responseCode
11+
}
12+
}
13+
14+
/// Protocol for an object that will receive SSE events.
15+
public protocol EventHandler {
16+
/// EventSource calls this method when the stream connection has been opened.
17+
func onOpened()
18+
/// EventSource calls this method when the stream connection has been closed.
19+
func onClosed()
20+
/// EventSource calls this method when it has received a new event from the stream.
21+
func onMessage(event: String, messageEvent: MessageEvent)
22+
/// EventSource calls this method when it has received a comment line from the stream
23+
/// (any line starting with a colon).
24+
func onComment(comment: String)
25+
/// This method will be called for all exceptions that occur on the socket connection
26+
/// (including an {@link UnsuccessfulResponseError} if the server returns an unexpected HTTP status),
27+
/// but only after the ConnectionErrorHandler (if any) has processed it. If you need to
28+
/// do anything that affects the state of the connection, use ConnectionErrorHandler.
29+
func onError(error: Error)
30+
}
31+
32+
typealias ConnectionHandler = (setReconnectionTime: (TimeInterval) -> (), setLastEventId: (String) -> ())
33+
/// Type for a function that will be notified when EventSource encounters a connection failure.
34+
/// This is different from onError in that it will not be called for other kinds of errors; also,
35+
/// it has the ability to tell EventSource to stop reconnecting.
36+
public typealias ConnectionErrorHandler = (Error) -> ConnectionErrorAction
37+
38+
/// Potential actions a ConnectionErrorHandler can return
39+
public enum ConnectionErrorAction {
40+
/// Specifies that the error should be logged normally and dispatched to the EventHandler.
41+
/// Connection retrying will proceed normally if appropriate.
42+
case proceed
43+
/// Specifies that the connection should be immediately shut down and not retried. The error
44+
/// will not be dispatched to the EventHandler
45+
case shutdown
46+
}
47+
48+
/// Enum values representing the states of an EventSource
49+
public enum ReadyState: String, Equatable {
50+
/// The EventSource has not been started yet.
51+
case raw
52+
/// The EventSource is attempting to make a connection.
53+
case connecting
54+
/// The EventSource is active and the EventSource is listening for events.
55+
case open
56+
/// The connection has been closed or has failed, and the EventSource will attempt to reconnect.
57+
case closed
58+
/// The connection has been permanently closed and will not reconnect.
59+
case shutdown
60+
}
61+
62+
/// Struct representing received event from the stream.
63+
public struct MessageEvent: Equatable, Hashable {
64+
/// Returns the event data.
65+
public let data: String
66+
/// The last seen event id, or the event id set in the Config if none have been received.
67+
public let lastEventId: String?
68+
69+
init(data: String, lastEventId: String? = nil) {
70+
self.data = data
71+
self.lastEventId = lastEventId
72+
}
73+
}
74+
75+
public class EventSource: NSObject, URLSessionDataDelegate {
76+
77+
private let config: Config
78+
79+
private let delegateQueue: DispatchQueue = DispatchQueue(label: "ESDelegateQueue")
80+
private let logger: OSLog
81+
private var readyState: ReadyState = .raw
82+
83+
private var lastEventId: String?
84+
private var reconnectTime: TimeInterval
85+
private var connectedTime: Date?
86+
87+
private var reconnectionAttempts: Int = 0
88+
private var errorHandlerAction: ConnectionErrorAction? = nil
89+
private let utf8LineParser: UTF8LineParser = UTF8LineParser()
90+
private var eventParser: EventParser!
91+
92+
public init(config: Config) {
93+
self.config = config
94+
self.lastEventId = config.lastEventId
95+
self.reconnectTime = config.reconnectTime
96+
self.logger = OSLog(subsystem: "com.launchdarkly.swift-event-source", category: "LDEventSource")
97+
}
98+
99+
public func start() {
100+
delegateQueue.async {
101+
guard self.readyState == .raw
102+
else {
103+
os_log("Start method called on this already-started EventSource object. Doing nothing", log: self.logger, type: .info)
104+
return
105+
}
106+
self.connect()
107+
}
108+
}
109+
110+
public func getLastEventId() -> String? { lastEventId }
111+
112+
private func connect() {
113+
os_log("Starting EventSource client", log: logger, type: .info)
114+
let connectionHandler: ConnectionHandler = (
115+
setReconnectionTime: { reconnectionTime in self.reconnectTime = reconnectionTime },
116+
setLastEventId: { eventId in self.lastEventId = eventId }
117+
)
118+
self.eventParser = EventParser(handler: self.config.handler, connectionHandler: connectionHandler)
119+
let sessionConfig = URLSessionConfiguration.default
120+
sessionConfig.httpAdditionalHeaders = ["Accept": "text/event-stream", "Cache-Control": "no-cache"]
121+
sessionConfig.timeoutIntervalForRequest = self.config.idleTimeout
122+
let session = URLSession.init(configuration: sessionConfig, delegate: self, delegateQueue: nil)
123+
var urlRequest = URLRequest(url: self.config.url,
124+
cachePolicy: URLRequest.CachePolicy.reloadIgnoringLocalAndRemoteCacheData,
125+
timeoutInterval: self.config.idleTimeout)
126+
urlRequest.httpMethod = self.config.method
127+
urlRequest.httpBody = self.config.body
128+
urlRequest.setValue(self.lastEventId, forHTTPHeaderField: "Last-Event-ID")
129+
urlRequest.allHTTPHeaderFields?.merge(self.config.headers, uniquingKeysWith: { $1 })
130+
session.dataTask(with: urlRequest).resume()
131+
}
132+
133+
private func dispatchError(error: Error) -> ConnectionErrorAction {
134+
let action: ConnectionErrorAction = config.connectionErrorHandler(error)
135+
if action != .shutdown {
136+
config.handler.onError(error: error)
137+
}
138+
return action
139+
}
140+
141+
private func afterComplete() {
142+
var nextState: ReadyState = .closed
143+
let currentState: ReadyState = readyState
144+
if errorHandlerAction == .shutdown {
145+
os_log("Connection has been explicitly shut down by error handler", log: logger, type: .info)
146+
nextState = .shutdown
147+
}
148+
readyState = nextState
149+
os_log("State: %@ -> %@", log: logger, type: .debug, currentState.rawValue, nextState.rawValue)
150+
151+
if currentState == .open {
152+
config.handler.onClosed()
153+
}
154+
155+
if nextState != .shutdown {
156+
reconnect()
157+
}
158+
}
159+
160+
private func reconnect() {
161+
reconnectionAttempts += 1
162+
163+
if let connectedTime = connectedTime, Date().timeIntervalSince(connectedTime) >= config.backoffResetThreshold {
164+
reconnectionAttempts = 0
165+
}
166+
167+
let maxSleep = min(config.maxReconnectTime, reconnectTime * pow(2.0, Double(reconnectionAttempts)))
168+
let sleep = maxSleep / 2 + Double.random(in: 0...(maxSleep/2))
169+
170+
os_log("Waiting %.3f seconds before reconnecting...", log: logger, type: .info, sleep)
171+
delegateQueue.asyncAfter(deadline: .now() + sleep) {
172+
self.connect()
173+
}
174+
}
175+
176+
// Tells the delegate that the task finished transferring data.
177+
public func urlSession(_ session: URLSession,
178+
task: URLSessionTask,
179+
didCompleteWithError error: Error?) {
180+
utf8LineParser.closeAndReset().forEach(eventParser.parse)
181+
// Send additional empty line to force a last dispatch
182+
eventParser.parse(line: "")
183+
184+
if let error = error {
185+
if readyState != .shutdown {
186+
os_log("Connection error: %@", log: logger, type: .info, error.localizedDescription)
187+
errorHandlerAction = dispatchError(error: error)
188+
} else {
189+
errorHandlerAction = .shutdown
190+
}
191+
} else {
192+
os_log("Connection unexpectedly closed.", log: logger, type: .info)
193+
}
194+
195+
afterComplete()
196+
}
197+
198+
// Tells the delegate that the data task received the initial reply (headers) from the server.
199+
public func urlSession(_ session: URLSession,
200+
dataTask: URLSessionDataTask,
201+
didReceive response: URLResponse,
202+
completionHandler: @escaping (URLSession.ResponseDisposition) -> Void) {
203+
os_log("initial reply received", log: logger, type: .debug)
204+
205+
let httpResponse = response as! HTTPURLResponse
206+
if (200..<300).contains(httpResponse.statusCode) {
207+
connectedTime = Date()
208+
readyState = .open
209+
config.handler.onOpened()
210+
completionHandler(.allow)
211+
} else {
212+
os_log("Unsuccessful response: %d", log: logger, type: .info, httpResponse.statusCode)
213+
errorHandlerAction = dispatchError(error: UnsuccessfulResponseError(responseCode: httpResponse.statusCode))
214+
completionHandler(.cancel)
215+
}
216+
}
217+
218+
public func urlSession(_ session: URLSession, dataTask: URLSessionDataTask, didReceive data: Data) {
219+
utf8LineParser.append(data).forEach(eventParser.parse)
220+
}
221+
222+
/// Struct describing the configuration of the EventSource
223+
public struct Config {
224+
public let handler: EventHandler
225+
public let url: URL
226+
227+
/// The method to use for the EventSource connection
228+
public var method: String = "GET"
229+
/// Optional body to be sent with the initial request
230+
public var body: Data? = nil
231+
/// Error handler that can determine whether to proceed or shutdown.
232+
public var connectionErrorHandler: ConnectionErrorHandler = { _ in .proceed }
233+
/// An initial value for the last-event-id to be set on the initial request
234+
public var lastEventId: String? = nil
235+
/// Additional headers to be set on the request
236+
public var headers: [String: String] = [:]
237+
/// The minimum amount of time to wait before reconnecting after a failure
238+
public var reconnectTime: TimeInterval = 1.0
239+
/// The maximum amount of time to wait before reconnecting after a failure
240+
public var maxReconnectTime: TimeInterval = 30.0
241+
/// The minimum amount of time for an EventSource connection to remain open before allowing connection
242+
/// backoff to reset.
243+
public var backoffResetThreshold: TimeInterval = 60.0
244+
/// The maximum amount of time between receiving any data before considering the connection to have
245+
/// timed out.
246+
public var idleTimeout: TimeInterval = 300.0
247+
248+
/// Create a new configuration with an EventHandler and a URL
249+
public init(handler: EventHandler, url: URL) {
250+
self.handler = handler
251+
self.url = url
252+
}
253+
}
3254
}

0 commit comments

Comments
 (0)