/**
 * Pipeline for multi-item get requests.
 * @module data/pipeline/many
 * @category DataStreams
 * @subcategory Pipelines
 */
import xs from "xstream";
import dropRepeats from "xstream/extra/dropRepeats";
import deq from "deep-equal";
import { source } from "data/stream/source";
import { fromCacheMany, toCacheMany, excludeCachedEntries } from "data/stream/cache";
import {
  byIds,
  markInProgress,
  excludeInProgress,
  markComplete,
} from "data/stream/api";
import {
  asyncStream,
  collectAll,
  streamLog,
  takeKeys,
  tee,
} from "data/stream/compose";
import { choke, chunk, gate, dropChoke } from "data/stream/buffer";
import { unique } from "util/array";
import hashmap from "util/hash-map";
import { tagPlaceholder, preserveTags } from "./common";

const CHOKE_TIME = 240; // 20 frames

const inProgress = new Set();

/**
 * Pipeline for multi-item get requests.
 *
 * Fetches a cached version of each item or provides a placeholder while waiting
 * for the api request to resolve. Places fresh result in cache after
 * retrieval.
 *
 * If supplied with an external `accumulator` the map can be shared between
 * different pipelines, so e.g. a getItem pipeline can share its results with the
 * getMany pipeline, reducing duplication of calls (though the two pipes may still
 * dispatch requests).
 *
 * @function getItems
 * @param {object} opts configuration options
 * @param {APIItemCallback.<T>} opts.apiFn api call like `getFn(ids: UUID[], ?skipCache: boolean)`
 * @param {?DefaultCallback.<T>} opts.defaultFn provides a default/placeholder model
 * @param {?DefaultCallback.<T>} opts.modelFn should map cache to full object
 * @param {string} opts.cacheKey cache key for an expiring map cache to store items
 * @param {?Map.<UUID, T>} opts.accumulator map of all items retrieved this session
 * @param {Stream.<Map.<UUID, T>>} opts.itemSink shared stream containing all `T`s
 * @param {int} [chunkSize=50] size of request chunks
 * @param {int} [timeout=3000] milliseconds to wait until giving up on a chunk
 * @returns {Sink.<Map.<UUID, T>>}
 */
export default function getMany({
  apiFn,
  alwaysUpdate = false, // update even if cached
  cacheKey,
  defaultFn = (id) => ({ id }),
  modelFn = (partial) => partial,
  cacheFn = fromCacheMany,
  cacheExcludeFn = excludeCachedEntries,
  accumulator = hashmap(),
  // takes an item and responds with a cache key
  makeCacheKey = (id) => id,
  // takes an item and responds with its accumulator key
  getCacheKeyFromItem = (item) => item.id,
  itemSink$ = xs.create(),
  chunkSize = 10,
  timeout = 300,
}) {
  if (itemSink$ === null) throw new Error("many pipeline: itemSink$ is required");
  const source$ = source();
  const activity$ = source();

  const cache$ = source$
    .compose(streamLog("MANY_CACHE_IN"))
    .map((items) => unique(items))
    .compose(dropRepeats(deq))
    .compose(streamLog("MANY_CACHE_IN_DEDUPE"))
    .map(cacheFn(cacheKey, tagPlaceholder(defaultFn), makeCacheKey))
    .map(preserveTags(modelFn))
    .compose(streamLog("MANY_CACHE_RES"))
    .map((items) => items.map((item) => [getCacheKeyFromItem(item), item]))
    .compose(collectAll(accumulator))
    .compose(streamLog("MANY_CACHE_OUT"));

  const signal$ = xs.create().compose(choke(1));

  const api$ = source$
    .map(cacheExcludeFn(cacheKey, alwaysUpdate))
    .map((items) => unique(items))
    .compose(dropRepeats(deq))
    .map((items) => excludeInProgress(inProgress, items))
    .filter((ids) => ids.length)
    // chunk gathers and re-slices incoming id arrays into fixed sized chunks
    .compose(chunk(chunkSize))
    .compose(gate(signal$, timeout))
    .compose(streamLog("GET_MANY_API_REQ"))
    .compose(tee((items) => {
      markInProgress(inProgress, items);
      activity$.post(1);
    }))
    .compose(byIds(apiFn))
    .compose(asyncStream)
    .compose(streamLog("GET_MANY_API_RES"))
    .compose(tee((items) => {
      markComplete(inProgress, items.map(getCacheKeyFromItem));
      activity$.post(-1);
    }))
    // after byIds resolves, stick all the items in the cache
    .compose(tee(toCacheMany(cacheKey)))
    .map((items) => items.map((item) => [getCacheKeyFromItem(item), item]))
    .compose(collectAll(accumulator))
    .compose(dropChoke(CHOKE_TIME))
    .compose(streamLog("GET_MANY_OUT"));

  signal$.imitate(api$);

  const sink$ = xs.merge(cache$, api$);

  return {
    post: (args) => {
      const out$ = itemSink$
        .compose(takeKeys(args))
        .compose(streamLog("GET_MANY_FINAL"));
      // make sure we don't start the request until after binding happens
      setTimeout(() => source$.post(args), 1);
      return out$;
    },
    sink$,
    source$,
    pending$: activity$.fold((acc, cur) => acc + cur, 0),
  };
}
