Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow .clear()ing the stored value or error in a BehaviorSubject #774

Open
timcreatedit opened this issue Jan 21, 2025 · 1 comment
Open
Assignees

Comments

@timcreatedit
Copy link

When building with BehaviorSubject, the need arose to be able to clear the subject to reset it to the same state it has after having been instantiated without a seed.

This can be helpful if the subject holds an error and we want to retry the operation that caused it. During the retry, any new listeners still immediately receive the error value, while we want them to have to wait for the first value after the retry.

Related: #233

A simple fix for this is a re-implementation of BehaviorSubject that supports .clear():

  /// Drops the stored value and the stored error, leaving the subject with
  /// nothing to hand to listeners that subscribe afterwards.
  void clear() {
    _wrapper.value = EMPTY;
    _wrapper.isValue = false;
    _wrapper.errorAndStackTrace = null;
  }
<details>
<summary>Full Code</summary>

```dart
// ignore_for_file: implementation_imports,
// ignore_for_file: avoid_equals_and_hash_code_on_mutable_classes

import 'dart:async';

import 'package:rxdart/src/rx.dart';
import 'package:rxdart/src/streams/value_stream.dart';
import 'package:rxdart/src/transformers/start_with.dart';
import 'package:rxdart/src/transformers/start_with_error.dart';
import 'package:rxdart/src/utils/empty.dart';
import 'package:rxdart/src/utils/error_and_stacktrace.dart';
import 'package:rxdart/src/utils/notification.dart';
import 'package:rxdart/subjects.dart';

/// A variant of [BehaviorSubject] that can be cleared.
///
/// Calling [clear] removes the last value or error and resets the subject to
/// the state it would have if it had just been created without a seed value.
///
/// Other than that, this works exactly like [BehaviorSubject].
class ClearableBehaviorSubject<T> extends Subject<T>
    implements ValueStream<T> {
  /// Constructs a [ClearableBehaviorSubject], optionally pass handlers for
  /// [onListen], [onCancel] and a flag to handle events [sync].
  ///
  /// See also [StreamController.broadcast]
  factory ClearableBehaviorSubject({
    void Function()? onListen,
    void Function()? onCancel,
    bool sync = false,
  }) {
    // ignore: close_sinks
    final controller = StreamController<T>.broadcast(
      onListen: onListen,
      onCancel: onCancel,
      sync: sync,
    );

    final wrapper = _Wrapper<T>();

    return ClearableBehaviorSubject<T>._(
      controller,
      Rx.defer<T>(_deferStream(wrapper, controller, sync), reusable: true),
      wrapper,
    );
  }

  /// Constructs a [ClearableBehaviorSubject], optionally pass handlers for
  /// [onListen], [onCancel] and a flag to handle events [sync].
  ///
  /// [seedValue] becomes the current [value] and is emitted immediately.
  ///
  /// See also [StreamController.broadcast]
  factory ClearableBehaviorSubject.seeded(
    T seedValue, {
    void Function()? onListen,
    void Function()? onCancel,
    bool sync = false,
  }) {
    // ignore: close_sinks
    final controller = StreamController<T>.broadcast(
      onListen: onListen,
      onCancel: onCancel,
      sync: sync,
    );

    final wrapper = _Wrapper<T>.seeded(seedValue);

    return ClearableBehaviorSubject<T>._(
      controller,
      Rx.defer<T>(_deferStream(wrapper, controller, sync), reusable: true),
      wrapper,
    );
  }

  ClearableBehaviorSubject._(
    super.controller,
    super.stream,
    this._wrapper,
  );

  /// Holds the latest value or error; shared with the deferred stream
  /// factory built by [_deferStream].
  final _Wrapper<T> _wrapper;

  /// Builds the stream factory that is passed to [Rx.defer].
  ///
  /// Each time the returned closure runs it inspects [wrapper] at that moment:
  /// * if the latest event is an error, the controller stream is prefixed
  ///   with that error;
  /// * if the latest event is a value, the controller stream is prefixed
  ///   with that value;
  /// * otherwise (nothing stored, e.g. after a clear) the plain controller
  ///   stream is returned.
  static Stream<T> Function() _deferStream<T>(
    _Wrapper<T> wrapper,
    StreamController<T> controller,
    bool sync,
  ) =>
      () {
        final errorAndStackTrace = wrapper.errorAndStackTrace;
        if (errorAndStackTrace != null && !wrapper.isValue) {
          return controller.stream.transform(
            StartWithErrorStreamTransformer(
              errorAndStackTrace.error,
              errorAndStackTrace.stackTrace,
            ),
          );
        }

        final value = wrapper.value;
        if (isNotEmpty(value) && wrapper.isValue) {
          return controller.stream
              .transform(StartWithStreamTransformer(value as T));
        }

        return controller.stream;
      };

  @override
  void onAdd(T event) => _wrapper.setValue(event);

  @override
  void onAddError(Object error, [StackTrace? stackTrace]) =>
      _wrapper.setError(error, stackTrace);

  /// Clears the subject and removes the last value or error.
  ///
  /// This only resets the stored state; no event is added to the subject.
  void clear() => _wrapper.clear();

  @override
  ValueStream<T> get stream => _Stream<T>(this);

  @override
  bool get hasValue => isNotEmpty(_wrapper.value);

  /// The stored value.
  ///
  /// Throws a [ValueStreamError] when no value is stored.
  @override
  T get value {
    final value = _wrapper.value;
    if (isNotEmpty(value)) {
      return value as T;
    }
    throw ValueStreamError.hasNoValue();
  }

  @override
  T? get valueOrNull => unbox(_wrapper.value);

  /// Set and emit the new value.
  set value(T newValue) => add(newValue);

  @override
  bool get hasError => _wrapper.errorAndStackTrace != null;

  @override
  Object? get errorOrNull => _wrapper.errorAndStackTrace?.error;

  /// The stored error.
  ///
  /// Throws a [ValueStreamError] when no error is stored.
  @override
  Object get error {
    final errorAndSt = _wrapper.errorAndStackTrace;
    if (errorAndSt != null) {
      return errorAndSt.error;
    }
    throw ValueStreamError.hasNoError();
  }

  @override
  StackTrace? get stackTrace => _wrapper.errorAndStackTrace?.stackTrace;

  /// The most recent event as a notification, or `null` when nothing is
  /// stored.
  @override
  StreamNotification<T>? get lastEventOrNull {
    // data event
    if (_wrapper.isValue) {
      return StreamNotification.data(_wrapper.value as T);
    }

    // error event
    final errorAndSt = _wrapper.errorAndStackTrace;
    if (errorAndSt != null) {
      return ErrorNotification(errorAndSt);
    }

    // no event
    return null;
  }
}

/// Mutable holder for the latest value and error of a
/// [ClearableBehaviorSubject].
class _Wrapper<T> {
  /// Non-seeded constructor: starts with no value and no error.
  _Wrapper();

  /// Seeded constructor: starts with [v] stored as the latest value.
  _Wrapper.seeded(T v) {
    setValue(v);
  }

  /// Whether the most recently stored event is a value (as opposed to an
  /// error, or nothing at all).
  bool isValue = false;

  /// The last stored value, or the [EMPTY] sentinel when none is stored.
  Object? value = EMPTY;

  /// The last stored error with its stack trace, or `null` when none is
  /// stored.
  ErrorAndStackTrace? errorAndStackTrace;

  /// Stores [event] as the latest value and marks the latest event as a value.
  void setValue(T event) {
    value = event;
    isValue = true;
  }

  /// Stores [error] and [stackTrace] and marks the latest event as an error.
  ///
  /// The previously stored [value] is left untouched.
  void setError(Object error, StackTrace? stackTrace) {
    errorAndStackTrace = ErrorAndStackTrace(error, stackTrace);
    isValue = false;
  }

  /// Resets every field to the state produced by the non-seeded constructor.
  void clear() {
    value = EMPTY;
    isValue = false;
    errorAndStackTrace = null;
  }
}

/// A [ValueStream] view of a [ClearableBehaviorSubject].
///
/// Listening and every [ValueStream] getter are forwarded to the subject.
class _Stream<T> extends Stream<T> implements ValueStream<T> {
  _Stream(this._subject);

  /// The subject every call is delegated to.
  final ClearableBehaviorSubject<T> _subject;

  @override
  bool get isBroadcast => true;

  // Override == and hashCode so that new streams returned by the same
  // subject are considered equal.
  // The subject returns a new stream each time it's queried,
  // but doesn't have to cache the result.

  @override
  int get hashCode => _subject.hashCode ^ 0x35323532;

  @override
  bool operator ==(Object other) {
    if (identical(this, other)) {
      return true;
    }

    return other is _Stream && identical(other._subject, _subject);
  }

  @override
  StreamSubscription<T> listen(
    void Function(T event)? onData, {
    Function? onError,
    void Function()? onDone,
    bool? cancelOnError,
  }) =>
      _subject.listen(
        onData,
        onError: onError,
        onDone: onDone,
        cancelOnError: cancelOnError,
      );

  @override
  Object get error => _subject.error;

  @override
  Object? get errorOrNull => _subject.errorOrNull;

  @override
  bool get hasError => _subject.hasError;

  @override
  bool get hasValue => _subject.hasValue;

  @override
  StackTrace? get stackTrace => _subject.stackTrace;

  @override
  T get value => _subject.value;

  @override
  T? get valueOrNull => _subject.valueOrNull;

  @override
  StreamNotification<T>? get lastEventOrNull => _subject.lastEventOrNull;
}

```

</details>
@hoc081098
Copy link
Collaborator

While it is useful in Flutter development, I don't see any ReactiveX implementation whose BehaviorSubject has this method.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants