/**
 * Pipeline for get requests from endpoints that always return the same object.
 *
 * Prevents needless duplication of requests in rapid succession, allowing
 * for multiple views/components to fetch the same object without worrying about
 * coordination.
 *
 * @module data/pipeline/item
 * @category DataStreams
 * @subcategory Pipelines
 */
import dedupe from "xstream/extra/dropRepeats";
import xs from "xstream";
import { fromCache, toCache, notInCache } from "data/stream/cache";
import { asyncStream, streamLog, tee } from "data/stream/compose";
import { source } from "data/stream/source";
import hashmap from "util/hash-map";
import { tagPlaceholder, preserveTagsSingle } from "./common";

/**
 * Pipeline for single-item get requests. Gathers the requests into batches
 * and dispatches them in groups while permitting consumers to treat the input
 * and output as a single item.
 *
 * Fetches a cached version or provides a placeholder while waiting
 * for the api request to resolve. Places fresh result in cache after
 * retrieval.
 *
 * If supplied with an external `accumulator` the map can be shared between
 * different pipelines, so e.g. a getMany pipeline can share its results with the
 * getItem pipeline, reducing duplication of calls (though the two pipes may still
 * dispatch requests).
 *
 * Example:
 * item$ = getMono(config).post();
 * // item$: -{ id: myItemId, CACHED: true, ... }----{ id: myItemId, ... }|
 *
 * @function getItem
 * @param {object} config
 * @param {APIItemCallback.<T>} config.apiFn api call like `getFn(ids: UUID[], skipCache: boolean)`
 * @param {?DefaultCallback.<T>} config.defaultFn provides a default/placeholder model
 * @param {string} config.cacheKey cache key for an expiring map cache to store items
 * @param {?Map.<UUID, T>} config.accumulator map of all items retrieved this session
 * @returns {Sink.<T>}
 */
export default function getMono({
  apiFn,
  cacheExcludeFn = notInCache,
  cacheFn = fromCache,
  cacheKey,
  defaultFn = () => null,
  modelFn = (partial) => partial,
  accumulatorKey = "response",
  accumulator = hashmap(),
}) {
  const source$ = source();
  const activity$ = source();

  const cache$ = source$
    .compose(dedupe())
    .map(cacheFn(cacheKey, tagPlaceholder(defaultFn)))
    .map(preserveTagsSingle(modelFn))
    .compose(streamLog("MONO_CACHE_OUT"));

  const api$ = source$
    .filter(cacheExcludeFn(cacheKey))
    .compose(dedupe())
    .compose(streamLog("MONO_DEDUPED"))
    .compose(tee(() => activity$.post(1)))
    .map(() => apiFn())
    .compose(asyncStream)
    .compose(tee(() => activity$.post(-1)))
    .compose(tee(toCache(cacheKey)))
    .compose(tee((res) => accumulator.set(accumulatorKey, res)))
    .compose(streamLog("MONO_OUT"));

  const sink$ = xs.merge(cache$, api$).remember();
  sink$.compose(streamLog("MONO_FINAL"));

  return {
    post: (args) => {
      setTimeout(() => source$.post(args), 1);
      return sink$.compose(streamLog("MONO_SINK_OUT"));
    },
    // post: () => monoSink(source$, sink$),
    sink$,
    source$,
    pending$: activity$.fold((acc, cur) => acc + cur, 0),
  };
}
