/**
 * Pipeline for count requests.
 *
 * @module data/pipeline/count
 * @category DataStreams
 * @subcategory Pipelines
 */
import xs from "xstream";
import { source } from "data/stream/source";
import { makeCountModel } from "model/search";
import {
  asyncStream,
  collect,
  streamLog,
  takeKey,
  tee,
} from "data/stream/compose";
import hashmap from "util/hash-map";

/**
 * Build a count-request pipeline around `apiFn`.
 *
 * Every value posted to `source$` is passed to `apiFn`; the awaited result is
 * wrapped by `makeCountModel` together with the originating params and then
 * handed to `collect` as a `[params, model]` pair, backed by a hash map that
 * is private to this pipeline instance.
 *
 * `activity$` receives `+1` when a request enters the pipeline and `-1` when
 * it leaves (successfully or not); `pending$` is the running sum of those
 * values, starting at 0.
 *
 * @param {Object} options
 * @param {Function} options.apiFn - Called with the posted params; its
 *   awaited result becomes the `count` field given to `makeCountModel`.
 * @returns {{post: Function, sink$: Object, source$: Object, pending$: Object}}
 *   `post(args)` schedules `args` onto `source$` and returns `sink$` composed
 *   with `takeKey(args)`; `sink$` is the merged output stream; `source$` is
 *   the raw input stream; `pending$` is the folded in-flight counter.
 */
export default function count({
  apiFn,
}) {
  const source$ = source();
  const activity$ = source();
  const countResults = hashmap();

  const api$ = source$
    .compose(tee(() => activity$.post(1)))
    .map(async (params) => {
      try {
        return makeCountModel({
          params,
          count: await apiFn(params),
        });
      } catch (err) {
        // A failed request never reaches the decrementing tee below, which
        // would leave `pending$` permanently too high. Balance the earlier +1
        // here and rethrow so error handling downstream is unchanged.
        // NOTE(review): assumes asyncStream emits no value for a rejected
        // promise (otherwise this would double-decrement) — TODO confirm.
        activity$.post(-1);
        throw err;
      }
    })
    .compose(asyncStream)
    .compose(tee(() => activity$.post(-1)))
    .map((item) => [item.params, item])
    // presumably stores each model in countResults keyed by its params —
    // verify against collect's implementation.
    .compose(collect(countResults));

  const sink$ = xs.merge(api$);
  // The composed stream is intentionally not kept; any effect comes from
  // streamLog itself. NOTE(review): confirm streamLog attaches its own
  // listener, otherwise this line has no effect.
  sink$.compose(streamLog("COUNT_FINAL"));

  return {
    post: (args) => {
      // Defer the post by 1 ms; presumably this gives the caller time to
      // subscribe to the returned stream before the request enters the
      // pipeline — TODO confirm.
      setTimeout(() => source$.post(args), 1);
      return sink$.compose(takeKey(args));
    },
    sink$,
    source$,
    pending$: activity$.fold((acc, cur) => acc + cur, 0),
  };
}
