import { filter, map, pipe, withLatestFrom, type Observable } from 'rxjs';
import { resubscribeOnChange } from './resubscribeOnChange';
import { jsonDefinedAndMeaningful, mapToInitialSubscriptionResponse } from './utils';
import { wsSubscriptionMemoryConsumptionWithMetadata } from './wsSubscriptionCache';

/**
 * A predicate deciding whether an item of type T is included by a filter, going from item -> boolean.
 * @param item The item to check for inclusion
 * @returns `true` when the item is included by the filter, `false` when it is excluded
 */
export type FilterFunc<T> = (item: T) => boolean;

/**
 * Builds a pipe that applies a (changeable) filter function to SubscriptionResponse<T> messages.
 *
 * As the typing shows, the returned pipe has to be attached directly to a `wsSubscriptionMemory` pipe.
 * Every time `filterFuncObs` emits a new filter function, the pipe unsubscribes from the source observable
 * and subscribes to it again. The source then treats us as a brand new subscriber and sends its replays,
 * which hands us the complete unfiltered (initial) data set that is required to re-apply the filter.
 *
 * @param filterFuncObs An observable of the current FilterFunc<T>; should be the observable of a BehaviorSubject specifically
 * @param subscriptionResponseType Optional value handed on unchanged to `wsSubscriptionMemoryConsumptionWithMetadata`
 * and `mapToInitialSubscriptionResponse`
 * @returns The composed pipe, emitting the filtered messages that pass `jsonDefinedAndMeaningful`
 */
export function wsSubscriptionFilter<T>(filterFuncObs: Observable<FilterFunc<T>>, subscriptionResponseType?: string) {
  return pipe(
    // Unwrap the memory input, holding on to the metadata we depend on below ("memory" and "diff")
    wsSubscriptionMemoryConsumptionWithMetadata<T>(subscriptionResponseType),
    // Drop and re-create the source subscription whenever filterFuncObs changes (see the doc comment above)
    resubscribeOnChange(filterFuncObs),

    // Pair every source message with the most recent filter function. Unlike combineLatestWith, only the
    // source observable (json) makes this step fire; a new filter function reaches us through the
    // resubscription above instead of re-running this pipe on its own.
    withLatestFrom(filterFuncObs),
    map(([memoryOutput, filterFunc]) => {
      const { memory, json, diff } = memoryOutput;

      // Initial messages (fresh data, or a replay caused by a filter change) need no extra logic: filter and forward
      if (json.initial) {
        return {
          ...json,
          data: json.data.filter(filterFunc),
        };
      }

      // From here on we are handling a delta update. A delta cannot tell downstream consumers that an entity
      // has been excluded by the filter, so as soon as one entity goes from included -> excluded we answer
      // with a filtered initial response built from memory instead.
      for (const { prev, next } of diff?.values() ?? []) {
        // Only entries carrying both a previous and a next state can make that transition
        if (prev && next && filterFunc(prev) && !filterFunc(next)) {
          const fullSnapshot = mapToInitialSubscriptionResponse(memory, subscriptionResponseType);
          return {
            ...fullSnapshot,
            data: fullSnapshot.data.filter(filterFunc),
          };
        }
      }

      // Basic case: nothing left the filter, so the delta message is simply forwarded filtered
      return {
        ...json,
        data: json.data.filter(filterFunc),
      };
    }),
    // Only let messages through that are relevant to a consumer
    filter(jsonDefinedAndMeaningful)
  );
}
