import { pipe, repeat, skip, takeUntil, type MonoTypeOperatorFunction, type Observable } from 'rxjs';

/**
 * This pipe subscribes to the source observable, and takes in a secondary observable to use as a notifier for when to resubscribe
 * to the source. On the !second! emission of the changeObs, this pipe will unsubscribe and resubscribe to the source.
 * The first emission of the changeObs is seen as receiving the "initial value", thus we only treat the second emissions as a meaningful change.
 * For any downstream consumers of this pipe, they will observe it as if they are always subscribed to a persistent hot source observable.
 * @param changeObs The observable which, when emitting a changed value, triggers this pipe to unsubscribe and resubscribe to the source.
 */
export function resubscribeOnChange<T>(changeObs: Observable<unknown>): MonoTypeOperatorFunction<T> {
  return pipe(
    // skip the first changeObs emission to conform to the behavior described above. This will emit a "complete" event on second changeObs emission.
    takeUntil(changeObs.pipe(skip(1))),
    // repeat catches "complete" events sent by takeUntil, and when one is received, "remounts" this pipe by resubscribing to the source.
    repeat()
  );
}
