import 'dart:async'; // It is essentially a stream but: // 1. we cache the latestValue of the stream // 2. the "latestValue" is re-emitted whenever the stream is listened to class StreamControllerReemit { T? _latestValue; final StreamController _controller = StreamController.broadcast(); StreamControllerReemit({T? initialValue}) : _latestValue = initialValue; Stream get stream { return _latestValue != null ? _controller.stream.newStreamWithInitialValue(_latestValue as T) : _controller.stream; } T? get value => _latestValue; void add(T newValue) { _latestValue = newValue; _controller.add(newValue); } Future close() { return _controller.close(); } } // return a new stream that immediately emits an initial value extension _StreamNewStreamWithInitialValue on Stream { Stream newStreamWithInitialValue(T initialValue) { return transform(_NewStreamWithInitialValueTransformer(initialValue)); } } // Helper for 'newStreamWithInitialValue' method for streams. class _NewStreamWithInitialValueTransformer extends StreamTransformerBase { /// the initial value to push to the new stream final T initialValue; /// controller for the new stream late StreamController controller; /// subscription to the original stream late StreamSubscription subscription; /// new stream listener count var listenerCount = 0; _NewStreamWithInitialValueTransformer(this.initialValue); @override Stream bind(Stream stream) { if (stream.isBroadcast) { return _bind(stream, broadcast: true); } else { return _bind(stream); } } Stream _bind(Stream stream, {bool broadcast = false}) { ///////////////////////////////////////// /// Original Stream Subscription Callbacks /// /// When the original stream emits data, forward it to our new stream void onData(T data) { controller.add(data); } /// When the original stream is done, close our new stream void onDone() { controller.close(); } /// When the original stream has an error, forward it to our new stream void onError(Object error) { controller.addError(error); } /// When a client listens to our new stream, emit the /// initial value and subscribe to original stream if needed void onListen() { // Emit the initial value to our new stream controller.add(initialValue); // listen to the original stream, if needed if (listenerCount == 0) { subscription = stream.listen( onData, onError: onError, onDone: onDone, ); } // count listeners of the new stream listenerCount++; } ////////////////////////////////////// /// New Stream Controller Callbacks /// /// (Single Subscription Only) When a client pauses /// the new stream, pause the original stream void onPause() { subscription.pause(); } /// (Single Subscription Only) When a client resumes /// the new stream, resume the original stream void onResume() { subscription.resume(); } /// Called when a client cancels their /// subscription to the new stream, void onCancel() { // count listeners of the new stream listenerCount--; // when there are no more listeners of the new stream, // cancel the subscription to the original stream, // and close the new stream controller if (listenerCount == 0) { subscription.cancel(); controller.close(); } } ////////////////////////////////////// /// Return New Stream /// // create a new stream controller if (broadcast) { controller = StreamController.broadcast( onListen: onListen, onCancel: onCancel, ); } else { controller = StreamController( onListen: onListen, onPause: onPause, onResume: onResume, onCancel: onCancel, ); } return controller.stream; } }