/**
 * Pipeline for search requests.
 *
 * @module data/pipeline/search
 * @category DataStreams
 * @subcategory Pipelines
 */
import ohash from "object-hash";
import xs from "xstream";
import dropRepeats from "xstream/extra/dropRepeats";
import deq from "deep-equal";
import cache from "cache";
import { makeSearchModel } from "model/search";
import { source } from "data/stream/source";
import {
  asyncStream,
  collect,
  collectAll,
  mapSpread,
  streamLog,
  takeKey,
  tee,
} from "data/stream/compose";
import { gate, dropChoke } from "data/stream/buffer";
import hashmap from "util/hash-map";
import { hasAnyPlaceholders, makeCachedModelFactory } from "data/util/tags";

const CHOKE_TIME = 240; // 20 frames

const returnZero = () => Promise.resolve(0);

export default function search({
  modelFn = (partial) => partial,
  apiFn,
  countFn = null,
  cacheSearchKey,
  cacheItemsKey,
  getCacheKeyFromItem = (item) => item.id,
  accumulator = hashmap(),
  itemsAccumulator = hashmap(),
}) {
  const source$ = source();
  const activity$ = source();

  if (!cacheSearchKey) throw new Error("search stream: cacheSearchKey is required");
  if (!cacheItemsKey) throw new Error("search stream: cacheItemKey is required");

  const cachedModelFn = makeCachedModelFactory(modelFn);

  const getCachedItems = (itemKeys) => [...cache.getExpiredMapEntries(
    cacheItemsKey, itemKeys,
  ).values()].map(cachedModelFn);

  const getCachedSearch = (streamParams) => cache.getExpiredMapEntry(
    cacheSearchKey,
    ohash(streamParams),
  )?.items;

  const cacheExclude = (streamParams) => {
    const res = cache.getExpiredMapEntry(
      cacheSearchKey,
      ohash(streamParams),
    );
    return !res;
  };

  const addResultsToCache = (searchResultKey, searchResults) => {
    const pairs = searchResults.items.map((i) => [getCacheKeyFromItem(i), i]);

    cache.storeExpiringMapEntry(cacheSearchKey, ohash(searchResultKey), {
      ...searchResults,
      items: pairs.map((p) => p[0]),
    });

    cache.storeExpiringMapEntries(cacheItemsKey, pairs);
  };

  const signal$ = xs.create();

  const cache$ = source$
    .compose(streamLog("SEARCH_CACHE_IN"))
    .compose(dropRepeats(deq))
    .compose(streamLog("SEARCH_CACHE_IN_DEDUPE"))
    .map((streamParams) => {
      const [params, pageNumber] = streamParams;
      const cachedIds = getCachedSearch(streamParams);
      if (cachedIds) {
        const items = getCachedItems(cachedIds);
        if (hasAnyPlaceholders(items)) return false;
        const model = makeSearchModel({
          params,
          pageNumber,
          items,
        });
        return [streamParams, model];
      }
      return false;
    })
    .compose(streamLog("SEARCH_CACHE_OUT_PREFILTER"))
    .filter((res) => !!res)
    .compose(collect(accumulator))
    .compose(streamLog("SEARCH_CACHE_OUT"));

  const api$ = source$
    .compose(streamLog("SEARCH_API_IN"))
    .filter(cacheExclude)
    .compose(streamLog("SEARCH_API_IN_POSTFILTER"))
    .compose(tee(() => activity$.post(1)))
    .compose(gate(signal$, 300))
    .map(async ([params, pageNumber = 0]) => {
      const [items, count] = await Promise.all([
        apiFn(params),
        countFn
          ? countFn(params)
          : returnZero(),
      ]);
      const model = makeSearchModel({
        params,
        items,
        pageNumber,
        count,
      });
      const key = [params, pageNumber];
      addResultsToCache(key, model);
      return [key, model];
    })
    .compose(asyncStream)
    .compose(tee(() => activity$.post(-1)))
    .compose(collect(accumulator))
    .compose(streamLog("SEARCH_OUT"));

  signal$.imitate(api$);

  const sink$ = xs.merge(cache$, api$);
  sink$.compose(streamLog("SEARCH_FINAL"));

  const items$ = sink$
    .compose(streamLog("SEARCH_ITEMS_IN"))
    .compose(mapSpread)
    .map((searchResult) => {
      const items = hashmap(searchResult.items.map((i) => [getCacheKeyFromItem(i), i]));
      return items;
    })
    .compose(streamLog("SEARCH_ITEMS_OUT"))
    .compose(collectAll(itemsAccumulator))
    .compose(dropChoke(CHOKE_TIME))
    .compose(streamLog("SEARCH_ITEMS_FINAL"));

  return {
    post: (args) => {
      setTimeout(() => source$.post(args), 1);
      return sink$
        .compose(streamLog("SEARCH_SINK_OUT"))
        .compose(takeKey(args))
        .compose(streamLog("SEARCH_SINK_TAKEN"));
    },
    sink$,
    source$,
    items$,
    pending$: activity$.fold((acc, cur) => acc + cur, 0),
  };
}
