/**
 * General purpose composers for data streams.
 *
 * @module data/stream/compose
 */
import xs from "xstream";
import log from "log";
import { DEBUG } from "data/stream/constants";
import hashmap from "util/hash-map";

// just log the error by default if error stream isn't subscribed to
const logError = (err) => log.error(err);

// shorthand for creating a composer
export const composer = (next, errCb = logError) => (in$) => xs.create({
  start(listener) {
    this.composerObj = {
      next: next(listener),
      error(e) {
        errCb(e);
        listener.error(e);
      },
    };
    in$.addListener(this.composerObj);
  },
  stop() {
    in$.removeListener(this.composerObj);
  },
});

/**
 * In: `--[a,b,c]---[d,e,f]---[g,h,i]--`
 * Out: `--a-b-c---d-e-f---g-h-i--`
 *
 * @function spread
 */
export const spread = (in$) => xs.create({
  start(listener) {
    this.listen = {
      next: (arr) => {
        arr.forEach((v) => listener.next(v));
      },
    };
    in$.addListener(this.listen);
  },
  stop() {
    in$.removeListener(this.listen);
  },
});

export const chunk = (size) => (in$) => xs.create({
  start(listener) {
    this.listen = {
      next: (arr) => {
        for (let i = 0; i < arr.length; i += size) {
          const ch = arr.slice(i, i + size);
          listener.next(ch);
        }
      },
    };
    in$.addListener(this.listen);
  },
  stop() {
    in$.removeListener(this.listen);
  },
});

export const mapSpread = composer(
  (listener) => (map) => [...map.values()].forEach((v) => listener.next(v)),
);

export const asyncStream = (in$) => xs.create({
  start: (listener) => {
    in$.addListener({
      next: async (promise) => {
        try {
          listener.next((await promise));
        } catch (err) {
          listener.error(err);
        }
      },
    });
  },
  stop: () => {},
});

const defaultTeeErrorHandler = (e) => log.error(e);

export const tee = (next, error = defaultTeeErrorHandler) => (in$) => {
  in$.addListener({ next, error });
  return in$;
};

/**
 * Flattens nested arrays
 */
export const flatten = (depth) => (in$) => in$
  .map((marble) => marble.flat(depth));

const formatMarble = (marble) => {
  if (marble?.doNotMergeOrClone) { // this means a hash map for now... crappy way to do this
    const obj = {};
    marble.forEach(([k, v]) => { obj[marble.getHash(k)] = v; });
    return ["HashMap", marble.iteration, obj];
  }
  return [marble];
};

/**
 * Logs whatever is happening in the stream, with optional label, if DEBUG (above) is true.
 */
export const streamLog = (label) => tee((marble) => DEBUG && log.debug(new Date().toISOString(), `DATA STREAM : ${label} ::`, ...formatMarble(marble)));

/**
 * Adds new `pair: [key, value]` into a collection.
 */
export const collect = (collection) => composer((listener) => (pair) => {
  if (!(pair instanceof Array) || pair.length !== 2) throw new Error("collect expected a key/value pair");
  collection.set(...pair);
  listener.next(hashmap(collection));
});

/**
 * Collects all entries in `mapLike` to `collection`.
 */
export const collectAll = (collection) => composer((listener) => (mapLike) => {
  if (mapLike.keys && mapLike.entries) collection.update(mapLike);
  else throw new Error("collectAll expected a value resembling a map");

  listener.next(hashmap(collection));
});

/**
 * Take items from a collection by complex key.
 */
export const takeKeys = (keys) => composer((listener) => (collection) => {
  const newCol = hashmap();
  keys.forEach((key) => {
    if (collection.has(key)) newCol.set(key, collection.get(key));
  });
  listener.next(newCol);
});

/**
 * Take items from a collection by complex key.
 */
export const takeKey = (key) => composer((listener) => (collection) => {
  const newCol = hashmap();
  if (collection.has(key)) newCol.set(key, collection.get(key));
  listener.next(newCol);
});

/**
 * Takes several streams of ints of ints and returns stream of sums `--3---`
 */
export const sumStream = (...in$s) => xs.combine(...in$s)
  .map((ins) => ins.reduce((acc, cur) => acc + cur, 0));
