154 lines
4.0 KiB
Dart
154 lines
4.0 KiB
Dart
import 'dart:async';
|
|
|
|
/// A broadcast stream controller that remembers the last value it was given.
///
/// It is essentially a stream, but:
///
/// 1. the latest value added to it is cached, and
/// 2. that cached value is re-emitted whenever [stream] is listened to.
class StreamControllerReemit<T> {
  /// The most recent value passed to [add], or the constructor's
  /// `initialValue` if nothing has been added yet.
  T? _latestValue;

  /// Underlying broadcast controller that carries the live events.
  final StreamController<T> _controller = StreamController<T>.broadcast();

  /// Creates a controller, optionally seeding the cache with [initialValue].
  StreamControllerReemit({T? initialValue}) : _latestValue = initialValue;

  /// A broadcast stream of the values passed to [add].
  ///
  /// Every listener first receives the cached value (when it is non-null) and
  /// then the live events. The cache is read at the moment of listening, not
  /// when this getter is evaluated, so a stream obtained before a call to
  /// [add] still replays the up-to-date value once it is listened to.
  Stream<T> get stream {
    return Stream<T>.multi(
      (listener) {
        // Snapshot into a local so the null check promotes `T?` to `T`.
        final latest = _latestValue;
        final source = latest != null
            ? _controller.stream.newStreamWithInitialValue(latest)
            : _controller.stream;

        // Pipe everything through; finish the listener's stream once the
        // source is done (or once the listener has cancelled).
        listener.addStream(source).whenComplete(listener.close);
      },
      isBroadcast: true,
    );
  }

  /// The cached latest value, or `null` if there is none.
  T? get value => _latestValue;

  /// Caches [newValue] and emits it to the current listeners.
  void add(T newValue) {
    _latestValue = newValue;
    _controller.add(newValue);
  }

  /// Closes the underlying controller.
  Future<void> close() {
    return _controller.close();
  }
}
|
|
|
|
/// Adds [newStreamWithInitialValue] to every [Stream].
extension _StreamNewStreamWithInitialValue<T> on Stream<T> {
  /// Returns a new stream that emits [initialValue] when it is listened to
  /// and then forwards the events of this stream.
  Stream<T> newStreamWithInitialValue(T initialValue) =>
      _NewStreamWithInitialValueTransformer<T>(initialValue).bind(this);
}
|
|
|
|
/// Helper for the 'newStreamWithInitialValue' method for streams.
///
/// Transforms a stream into a new stream that emits [initialValue] when it is
/// listened to and then forwards the data, errors and done event of the
/// original stream. The new stream is a broadcast stream exactly when the
/// original stream is one.
///
/// All per-stream state lives inside each `bind` call, so one transformer
/// instance can safely be bound to several streams.
class _NewStreamWithInitialValueTransformer<T>
    extends StreamTransformerBase<T, T> {
  /// The initial value to push to the new stream.
  final T initialValue;

  _NewStreamWithInitialValueTransformer(this.initialValue);

  @override
  Stream<T> bind(Stream<T> stream) =>
      _bind(stream, broadcast: stream.isBroadcast);

  /// Builds the new stream on top of [stream].
  ///
  /// When [broadcast] is true the new stream is backed by a broadcast
  /// controller; otherwise by a single-subscription controller that also
  /// forwards pause and resume to the original stream.
  Stream<T> _bind(Stream<T> stream, {bool broadcast = false}) {
    // Controller for the new stream. Declared `late` because the callbacks
    // below must refer to it before it is constructed.
    late final StreamController<T> controller;

    // Subscription to the original stream; null while not subscribed.
    StreamSubscription<T>? subscription;

    // When a client listens to the new stream, emit the initial value and
    // subscribe to the original stream if that has not happened yet.
    void onListen() {
      controller.add(initialValue);
      subscription ??= stream.listen(
        controller.add,
        // The tear-off accepts the optional stack trace, so it is forwarded
        // together with the error.
        onError: controller.addError,
        onDone: controller.close,
      );
    }

    // (Single subscription only) Pausing the new stream pauses the original.
    void onPause() {
      subscription?.pause();
    }

    // (Single subscription only) Resuming the new stream resumes the original.
    void onResume() {
      subscription?.resume();
    }

    // When the new stream has no listeners left, cancel the subscription to
    // the original stream and close the new stream's controller. The cancel
    // future is handed back to the controller instead of being dropped.
    Future<void>? onCancel() {
      final cancelled = subscription?.cancel();
      subscription = null;
      controller.close();
      return cancelled;
    }

    controller = broadcast
        ? StreamController<T>.broadcast(
            onListen: onListen,
            onCancel: onCancel,
          )
        : StreamController<T>(
            onListen: onListen,
            onPause: onPause,
            onResume: onResume,
            onCancel: onCancel,
          );

    return controller.stream;
  }
}
|