Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 19 additions & 10 deletions lib/eventsource.dart
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
library eventsource;

export "src/event.dart";

import "dart:async";
import "dart:convert";

import "package:http/http.dart" as http;
import "package:http/src/utils.dart" show encodingForCharset;
import "package:http_parser/http_parser.dart" show MediaType;

import "src/event.dart";
import "src/decoder.dart";
import "src/event.dart";

export "src/event.dart";

enum EventSourceReadyState {
CONNECTING,
Expand Down Expand Up @@ -55,24 +55,33 @@ class EventSource extends Stream<Event> {
EventSourceDecoder _decoder;
String _body;
String _method;

bool _allowMalformedUtf8;

/// Create a new EventSource by connecting to the specified url.
static Future<EventSource> connect(url,
{http.Client client, String lastEventId, Map headers, String body, String method}) async {
{http.Client client,
String lastEventId,
Map headers,
String body,
String method,
bool allowMalformedUtf8 = false}) async {
// parameter initialization
url = url is Uri ? url : Uri.parse(url);
client = client ?? new http.Client();
lastEventId = lastEventId ?? "";
body = body ?? "";
method = method ?? "GET";
EventSource es = new EventSource._internal(url, client, lastEventId, headers, body, method);
EventSource es = new EventSource._internal(
url, client, lastEventId, headers, body, method, allowMalformedUtf8);
await es._start();
return es;
}

EventSource._internal(this.url, this.client, this._lastEventId, this.headers, this._body, this._method) {
_decoder = new EventSourceDecoder(retryIndicator: _updateRetryDelay);
EventSource._internal(this.url, this.client, this._lastEventId, this.headers,
this._body, this._method, this._allowMalformedUtf8) {
_decoder = new EventSourceDecoder(
retryIndicator: _updateRetryDelay,
allowMalformedUtf8: _allowMalformedUtf8);
}

// proxy the listen call to the controller's listen call
Expand All @@ -92,9 +101,9 @@ class EventSource extends Stream<Event> {
request.headers["Last-Event-ID"] = _lastEventId;
}
if (headers != null) {
headers.forEach((k,v) {
headers.forEach((k, v) {
request.headers[k] = v;
});
});
}
request.body = _body;

Expand Down
8 changes: 5 additions & 3 deletions lib/src/decoder.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ typedef RetryIndicator = void Function(Duration retry);

class EventSourceDecoder implements StreamTransformer<List<int>, Event> {
RetryIndicator retryIndicator;
bool allowMalformedUtf8;

EventSourceDecoder({this.retryIndicator});
EventSourceDecoder({this.retryIndicator, this.allowMalformedUtf8 = false});

Stream<Event> bind(Stream<List<int>> stream) {
StreamController<Event> controller;
Expand All @@ -24,7 +25,7 @@ class EventSourceDecoder implements StreamTransformer<List<int>, Event> {
// single event. So we build events on the fly and broadcast the event as
// soon as we encounter a double newline, then we start a new one.
stream
.transform(new Utf8Decoder())
.transform(new Utf8Decoder(allowMalformed: allowMalformedUtf8))
.transform(new LineSplitter())
.listen((String line) {
if (line.isEmpty) {
Expand Down Expand Up @@ -67,5 +68,6 @@ class EventSourceDecoder implements StreamTransformer<List<int>, Event> {
return controller.stream;
}

StreamTransformer<RS, RT> cast <RS, RT>() => StreamTransformer.castFrom<List<int>, Event, RS, RT>(this);
StreamTransformer<RS, RT> cast<RS, RT>() =>
StreamTransformer.castFrom<List<int>, Event, RS, RT>(this);
}
Loading