import { filter, map, pipe, scan, type Observable, type UnaryFunction } from 'rxjs';
import type { MinimalSubscriptionResponse } from '../types/SubscriptionResponse';
import { jsonDefined, mapToInitialSubscriptionResponse } from './utils';
import { wsSubscriptionMemory, type WSSubscriptionMemoryOutput } from './wsSubscriptionMemory';

/**
 * An RxJS pipe you can attach to any SubscriptionResponse<T> emitting observable in order to cache its current state for any incoming
 * late subscribers. For a late subscriber, it behaves to them just as if they are performing a fresh subscription to the backend.
 * After receiving a first initial response, they will be forwarded all SubscriptionResponses as usual.
 * @param getUniqueKey a function to get a unique key for each entry in the data set
 * @param subscriptionResponseType an optional parameter to define what the type should be on the emitted SubscriptionResponses
 */
export function wsSubscriptionCache<T>(getUniqueKey: (item: T) => string, subscriptionResponseType?: string) {
  return pipe(
    wsSubscriptionMemory(getUniqueKey),
    // Below here each subscriber has its own fork of the pipeline, and above is shared
    wsSubscriptionMemoryConsumption(subscriptionResponseType)
  );
}

/**
 * An RxJS pipe meant to be attached directly following a `wsSubscriptionMemory` pipe, consuming the memory in such a way that
 * downstream subscribers perceive cache-like behavior. On initial subscription, this pipe will construct a "synthetic" initial json message using the received memory,
 * and following that will simply forward any SubscriptionResponse messages.
 * @param subscriptionResponseType an optional type to attach to the initial synthetic message
 */
export function wsSubscriptionMemoryConsumption<T>(
  subscriptionResponseType?: string
): UnaryFunction<Observable<WSSubscriptionMemoryOutput<T>>, Observable<MinimalSubscriptionResponse<T>>> {
  return pipe(
    scan(
      ({ instantiated, json }, { memory, latestMessage }) => {
        if (!instantiated) {
          instantiated = true;
          json = mapToInitialSubscriptionResponse(memory, subscriptionResponseType, latestMessage);
        } else {
          json = latestMessage;
        }

        return { instantiated, json };
      },
      {
        instantiated: false,
        json: undefined as undefined | MinimalSubscriptionResponse<T>,
      }
    ),
    // Only forward the json
    map(({ json }) => json),
    // filter out undefined that get here, and only propagate changes which either have any data updates in them or are json.initial
    filter(jsonDefined)
  );
}

export interface WSSubscriptionMemoryConsumptionWithMetadataOutput<T> {
  json: MinimalSubscriptionResponse<T>;
  memory: WSSubscriptionMemoryOutput<T>['memory'];
  diff: WSSubscriptionMemoryOutput<T>['diff'];
}

/**
 *
 * An RxJS pipe meant to be attached directly following a `wsSubscriptionMemory` pipe, consuming the memory in such a way that
 * downstream subscribers perceive cache-like behavior. On initial subscription, this pipe will construct a "synthetic" initial json message using the received memory,
 * and following that will simply forward any SubscriptionResponse messages.
 * This is just like the `wsSubscriptionMemoryConsumption` pipe, but also passes along the `memory` and `diff` metadata from the memory pipe.
 * @param subscriptionResponseType an optional type to attach to the initial synthetic message
 */
export function wsSubscriptionMemoryConsumptionWithMetadata<T>(subscriptionResponseType?: string) {
  return pipe(
    scan<
      WSSubscriptionMemoryOutput<T>,
      {
        instantiated: boolean;
        json: MinimalSubscriptionResponse<T> | undefined;
        memory: WSSubscriptionMemoryOutput<T>['memory'];
        diff: WSSubscriptionMemoryOutput<T>['diff'];
      }
    >(
      ({ instantiated, json }, memoryOutput) => {
        if (!instantiated) {
          instantiated = true;
          json = mapToInitialSubscriptionResponse(memoryOutput.memory, subscriptionResponseType);
        } else {
          json = memoryOutput.latestMessage;
        }

        return { instantiated, json, memory: memoryOutput.memory, diff: memoryOutput.diff };
      },
      {
        instantiated: false,
        json: undefined,
        memory: new Map(),
        diff: undefined,
      }
    ),
    // Don't forward the instantiated property
    map(({ json, memory, diff }) => ({ json, memory, diff })),
    // filter out any messages that get here where json is undefined, and also helps with typing
    filter(memoryConsumptionWithMetadataReturnDefined)
  );
}

function memoryConsumptionWithMetadataReturnDefined<T>(
  input: Partial<WSSubscriptionMemoryConsumptionWithMetadataOutput<T>>
): input is WSSubscriptionMemoryConsumptionWithMetadataOutput<T> {
  return input.json != null;
}
