/**
 * Pipeline for single-item get requests. Gathers the requests into batches
 * and dispatches them in groups while permitting consumers to treat the input
 * and output as a single item.
 *
 * @module data/pipeline/item
 * @category DataStreams
 * @subcategory Pipelines
 */
import dedupe from "xstream/extra/dropRepeats";
import xs from "xstream";
import { source } from "data/stream/source";
import { fromCacheMap, toCacheMany } from "data/stream/cache";
import { byIds } from "data/stream/api";
import {
  collect,
  collectAll,
  streamLog,
  tee,
} from "data/stream/compose";
import { batch } from "data/stream/buffer";
import { idMapFilterSink } from "data/stream/sink";
import hashmap from "util/hash-map";
import { tagPlaceholder, preserveTagsSingle } from "./common";

/**
 * Builds a pipeline for single-item get requests. Requests are gathered into
 * batches and dispatched as groups, while each consumer still posts one
 * request and listens for one item.
 *
 * Two branches listen to the same request source and are merged into `sink$`:
 *  - the cache branch answers with a cached version or a placeholder built
 *    from `defaultFn` while the api request is still outstanding;
 *  - the api branch runs the requests through `batch(12)`, hands them to
 *    `apiFn` via `byIds`, places the fresh result in the cache and emits it.
 * Both branches record what they emit in `accumulator`.
 *
 * If supplied with an external `accumulator` the map can be shared between
 * different pipelines, so e.g. a getMany pipeline can share its results with
 * the getItem pipeline, reducing duplication of calls (though the two pipes
 * may still dispatch requests).
 *
 * Requests with > 1 arity should `post` an array of params. `makeCacheKey`
 * receives exactly what was posted, and `getCacheKeyFromItem` must produce a
 * "key" holding the same params in the same order so that cached and fetched
 * items can be matched to the request. These are stored with hash keys by the
 * cache stream utils.
 *
 * Example:
 * item$ = getItem(config).post(myItemId);
 * // item$: -{ id: myItemId, CACHED: true, ... }----{ id: myItemId, ... }|
 *
 * // high arity requests:
 * item$ = getItem({
 *   getCacheKeyFromItem: (item) => ([item.parentId, item.id]),
 * }).post([parentId, id]);
 * // NOTE(review): relies on the default identity `makeCacheKey` and on array
 * // keys being compared by hash downstream - confirm against the cache and
 * // sink utils.
 *
 * @function getItem
 * @param {object} config
 * @param {APIItemsCallback.<T>} config.apiFn api call like `getFn(ids: UUID[], skipCache: boolean)`
 * @param {?DefaultCallback.<T>} config.defaultFn provides a default/placeholder model;
 *   defaults to `(id) => ({ id })`
 * @param {?function(T): T} config.modelFn applied to cached and fetched items before
 *   they are emitted; defaults to the identity function
 * @param {?function(*): *} config.makeCacheKey derives the key from the request params
 *   given to `post`; defaults to the identity function
 * @param {?IdMapCallback.<T>} config.getCacheKeyFromItem derives the key from an item;
 *   defaults to `item?.id`
 * @param {string} config.cacheKey cache key for an expiring map cache to store items
 * @param {?Map.<UUID, T>} config.accumulator map of all items retrieved this session
 * @returns {Sink.<T>} object exposing `post(args)` (per-request stream), the merged
 *   `sink$`, the raw request `source$`, and `pending$`, a running sum of the `+1`
 *   posted ahead of `byIds` and the `-1` posted behind it
 */
export default function getItem({
  apiFn,
  defaultFn = (id) => ({ id }),
  modelFn = (obj) => obj,
  // derives the lookup key from the params handed to `post`
  makeCacheKey = (id) => id,
  // derives the lookup key from an item (cached, placeholder or fetched)
  getCacheKeyFromItem = (item) => item?.id,
  cacheKey,
  accumulator = hashmap(),
}) {
  const requests$ = source();
  const inFlight$ = source();

  // Small closures shared by the stages below.
  const keyedItem = (item) => [getCacheKeyFromItem(item), item];
  const keyedModel = (raw) => [getCacheKeyFromItem(raw), modelFn(raw)];
  const keyedModels = (items) => items.map((raw) => keyedModel(raw));
  const markDispatched = () => inFlight$.post(1);
  const markResolved = () => inFlight$.post(-1);

  // --- cache branch: immediate answer (cached item or tagged placeholder) ---
  const cacheRequests$ = requests$
    .compose(dedupe())
    .compose(streamLog("CACHE_SINGLE_IN"));
  const cachedModels$ = cacheRequests$
    .map(fromCacheMap(cacheKey, tagPlaceholder(defaultFn), makeCacheKey))
    .map(preserveTagsSingle(modelFn));
  const cache$ = cachedModels$
    .map(keyedItem)
    .compose(collect(accumulator))
    .compose(streamLog("CACHE_SINGLE_OUT"));

  // --- api branch: batched fetch, cache write, then emit fresh models ---
  const apiRequests$ = requests$
    .compose(dedupe())
    .compose(streamLog("GET_SINGLE_DEDUPED"))
    .compose(batch(12));
  const apiResults$ = apiRequests$
    .compose(tee(markDispatched))
    .compose(byIds(apiFn))
    .compose(tee(markResolved))
    .compose(streamLog("GET_SINGLE_RESULT"));
  const api$ = apiResults$
    // the raw api items are what gets written to the cache, before modelFn
    .compose(tee(toCacheMany(cacheKey, getCacheKeyFromItem)))
    .map(keyedModels)
    .compose(collectAll(accumulator))
    .compose(streamLog("GET_SINGLE_OUT"));

  const merged$ = xs.merge(cache$, api$);
  const sink$ = merged$.compose(streamLog("GET_SINGLE_SINK"));

  // Per-request view: only items whose key matches this request's key.
  const post = (args) => {
    const requestKey = makeCacheKey(args);
    const item$ = idMapFilterSink(requests$, sink$, requestKey, getCacheKeyFromItem);
    return item$.compose(streamLog("GET_SINGLE_FINAL"));
  };

  // Running total of the +1 / -1 activity markers, seeded with 0.
  const pending$ = inFlight$.fold((total, delta) => total + delta, 0);

  return {
    post,
    sink$,
    source$: requests$,
    pending$,
  };
}
