import xs from "xstream";
import { source } from "data/stream/source";
import {
  markInProgress,
  excludeInProgress,
  markComplete,
} from "data/stream/api";
import {
  asyncStream,
  collectAll,
  streamLog,
  takeKeys,
  tee,
} from "data/stream/compose";
import { choke, gate, dropChoke } from "data/stream/buffer";
import { toCacheMany, fromCacheMany, excludeCachedEntries } from "data/stream/cache";
import hashmap from "util/hash-map";

// Value handed to `dropChoke` on the API output stream of the batch pipeline below.
// NOTE(review): presumably milliseconds; the "20 frames" note would then imply
// 12 per frame — confirm the unit against `data/stream/buffer`.
const CHOKE_TIME = 240; // 20 frames

/**
 * Separates a request tuple into its leading parent ids and its trailing element.
 *
 * The input is copied first, so the caller's array is never modified.
 *
 * @param {Iterable.<*>} args request tuple such as `[parentId, parent2Id, childIds]`
 * @returns {Array} pair of `[leadingElements, lastElement]`
 */
const splitParents = (args) => {
  const all = [...args];
  const lastIndex = all.length - 1;
  const leading = all.slice(0, -1);
  const trailing = all[lastIndex];
  return [leading, trailing];
};

/**
 * Pipeline for batch requests to fetch child objects.
 *
 * Works with input on the source stream in the form of
 * `[parentId, parent2Id, childIds]`: any leading parent ids followed by an
 * array of child ids as the LAST element.
 *
 * Every posted request feeds two branches that are merged into `sink$`:
 *  - a cache branch, which resolves the ids through `cacheFn`, and
 *  - an API branch, which drops ids rejected by `cacheExcludeFn` or already
 *    tracked as in progress, and passes what is left to `apiFn`.
 * Both branches key their items with `getCacheKeyFromItem` and fold them into
 * `accumulator` via `collectAll`.
 *
 * @example
 * const thing = { parentId: "a", id: "b", childIds: ["c", "e", "f"] };
 * const apiFn = ([parentId, thingId, childIds]) => api.get(parentId, thingId, childIds);
 * const child$ = getBatch({ apiFn, cacheKey, itemSink$ })
 *   .post([thing.parentId, thing.id, thing.childIds]);
 * // child$ -> --Map([[c, cObj], [e, eObj], [f, fObj]])-|
 *
 * @function getBatch
 * @param {object} opts configuration options
 * @param {function(Array): *} opts.apiFn called with `[...parentIds, idsToFetch]`;
 *   its result is passed through `asyncStream` and is then treated as an array of items
 * @param {string} opts.cacheKey cache key passed to the cache read/write/exclude helpers
 * @param {function} [opts.cacheFn=fromCacheMany] cache reader, called as
 *   `cacheFn(cacheKey, defaultFn(parentIds))(ids)`; must return an array of items
 * @param {function} [opts.cacheExcludeFn=excludeCachedEntries] called as
 *   `cacheExcludeFn(cacheKey)(ids)`; its result is narrowed further by `excludeInProgress`
 * @param {function(T): *} [opts.getCacheKeyFromItem] maps an item to its key (default: `item.id`)
 * @param {?Map.<UUID, T>} [opts.accumulator=hashmap()] map handed to `collectAll` for both branches
 * @param {function(Array): function(UUID): T} [opts.defaultFn] given the parent ids, returns
 *   a factory for default/placeholder models (default: `(id) => ({ id })`)
 * @param {Stream.<Map.<UUID, T>>} opts.itemSink$ shared stream containing all `T`s (required)
 * @returns {{post: function(Array): Stream, sink$: Stream, source$: Stream, pending$: Stream}}
 *   `post` queues a request and returns `itemSink$` narrowed to the requested ids;
 *   `sink$` merges the cache and API branches; `source$` is the raw request stream;
 *   `pending$` is a running sum of +1 per API request sent and -1 per API response received.
 * @throws {Error} when `itemSink$` is not supplied
 */
export default function getBatch({
  apiFn,
  cacheKey,
  cacheFn = fromCacheMany,
  cacheExcludeFn = excludeCachedEntries,
  getCacheKeyFromItem = (item) => item.id,
  accumulator = hashmap(),
  defaultFn = () => (id) => ({ id }),
  itemSink$ = null,
}) {
  if (itemSink$ === null) throw new Error("batch pipeline: itemSink$ is required");
  const source$ = source();
  const activity$ = source();

  // Fed by api$ (see `imitate` below) and used to gate outgoing API requests.
  const signal$ = xs.create().compose(choke(1));

  // Tracks requests that have been sent but not yet answered.
  const inProgress = new Set();

  // Shared by both branches: turn a list of items into `[key, item]` pairs.
  const toEntries = (items) => items.map((item) => [getCacheKeyFromItem(item), item]);

  const cache$ = source$
    .compose(streamLog("BATCH_CACHE_IN"))
    .map((args) => {
      const [parents, ids] = splitParents(args);
      if (!Array.isArray(ids)) {
        throw new Error("BATCH_FINAL_CACHE_SPLIT NOT AN ID ARRAY");
      }
      return cacheFn(cacheKey, defaultFn(parents))(ids);
    })
    .map(toEntries)
    .compose(collectAll(accumulator));

  // NOTE(review): no step below handles a failed `apiFn` call. `inProgress` and the
  // pending counter are only updated when items arrive, and only for the keys of the
  // items actually returned — confirm how `asyncStream` surfaces errors.
  const api$ = source$
    .map((args) => {
      const [parents, ids] = splitParents(args);
      // Ids that remain after removing cached and already-in-progress ones.
      const idsToFetch = excludeInProgress(inProgress, cacheExcludeFn(cacheKey)(ids));
      if (idsToFetch.length) return [...parents, idsToFetch];
      return null;
    })
    // Requests with nothing left to fetch never reach the API.
    .filter((request) => request !== null)
    .compose(gate(signal$, 150))
    .compose(streamLog("BATCH_API_REQ"))
    .compose(tee((args) => {
      markInProgress(inProgress, args);
      activity$.post(1);
    }))
    .map(apiFn)
    .compose(asyncStream)
    .compose(tee((items) => {
      markComplete(inProgress, items.map(getCacheKeyFromItem));
      activity$.post(-1);
    }))
    .compose(tee(toCacheMany(cacheKey)))
    .map(toEntries)
    .compose(collectAll(accumulator))
    .compose(dropChoke(CHOKE_TIME))
    .compose(streamLog("BATCH_API_OUT"));

  signal$.imitate(api$);

  const sink$ = xs.merge(cache$, api$);

  return {
    post: (args) => {
      const ids = splitParents(args)[1];
      // Deferred so the request is posted only after this call has returned its stream.
      setTimeout(() => source$.post(args), 1);
      const out$ = itemSink$
        .compose(takeKeys(ids))
        .filter((map) => map.size)
        .compose(streamLog("GET_BATCH_FINAL"));
      return out$;
    },
    sink$,
    source$,
    pending$: activity$.fold((acc, cur) => acc + cur, 0),
  };
}
