/**
 * Compose functions that deal with buffering streams (batching, fixed-size chunking, slowing, etc).
 *
 * @module data/stream/buffer
 */
import log from "log";
import xs, { Stream } from "xstream";
import deq from "deep-equal";
import { DEBUG } from "data/stream/constants";
import { uuidv4 } from "util/generator";

/**
 * Writes a debug line tagged with the module name and `label`.
 *
 * Does nothing unless the stream DEBUG flag is set.
 *
 * @param {string} label short tag identifying the composer that is logging
 * @param {...mixed} stuff values passed through to the logger unchanged
 */
const dbg = (label, ...stuff) => {
  if (!DEBUG) return;
  const prefix = `BUFFER : ${label} ::`;
  log.debug(prefix, ...stuff);
};

/**
 * Collects items into a buffer until the buffer size or time limit is reached.
 *
 * The time limit restarts every time an item is buffered, so a batch is
 * emitted once it holds `maxItems` items or once `maxMs` milliseconds pass
 * without a new item being buffered.
 *
 * An item that is deep-equal to the one received immediately before it is not
 * buffered: a timer beat re-delivers the latest item through the combined
 * stream, and deep equality is how that replay is told apart from new input.
 *
 * Upstream errors are logged, not forwarded. When the returned stream stops,
 * the pending timer is cancelled, the upstream subscription is released and
 * any items that were still buffered are discarded.
 *
 * @param {int} maxItems maximum number of items collected before emitting
 * @param {int} maxMs max time in milliseconds before emitting
 * @emits {mixed[]} items collected in buffer
 */
export const batch = (maxItems = 10, maxMs = 1) => (in$) => {
  let beat = 0;
  let lastBeat = beat;
  let lastItem = null;
  let timeout = null;
  let buffer = [];
  // kept so `stop` can unsubscribe exactly what `start` subscribed
  let zippedListener = null;

  const beat$ = xs.createWithMemory().startWith(lastBeat);

  const cancelTimeout = () => {
    clearTimeout(timeout);
    timeout = null;
  };

  const startTimeout = () => {
    clearTimeout(timeout);
    timeout = setTimeout(() => {
      // a new beat value tells the listener below that time ran out
      beat++;
      beat$.shamefullySendNext(beat);
    }, maxMs);
  };

  const zipped$ = xs.combine(beat$, in$);

  return xs.create({
    start: (listener) => {
      zippedListener = {
        next: ([b, i]) => {
          if (!deq(lastItem, i)) {
            buffer.push(i);
            startTimeout();
            lastItem = i;
          }
          const timedOut = lastBeat !== b;
          // never hand an empty batch downstream
          if (buffer.length && (timedOut || buffer.length >= maxItems)) {
            listener.next(buffer);
            cancelTimeout();
            buffer = [];
          }
          lastBeat = b;
        },
        error: (err) => log.error("batch stream error", err),
      };
      zipped$.addListener(zippedListener);
    },
    stop: () => {
      // xstream streams have no `stop()` method; tearing down means removing
      // our listener and cancelling the timer we own.
      cancelTimeout();
      if (zippedListener !== null) {
        zipped$.removeListener(zippedListener);
        zippedListener = null;
      }
      // start from a clean slate if the stream is subscribed to again
      buffer = [];
      lastItem = null;
      lastBeat = 0;
    },
  });
};

/**
 * Collects items from a stream of arrays into fixed sized chunks.
 *
 * Emits when a chunk is full or time limit is reached. Items left over after
 * the last full chunk stay in the buffer until more items fill it or the
 * time limit passes; the time limit is measured from the moment the first
 * item enters an empty buffer.
 *
 * `timeLimit` can be set to `Infinity` to prevent chunk from ever emitting before full.
 *
 * Empty arrays are ignored, and so is an array that is deep-equal to the
 * previous non-empty array received. Upstream errors are logged, not
 * forwarded. When the returned stream stops, the timer is cancelled, the
 * upstream subscription is released and buffered items are discarded.
 *
 * @param {int} [chunkSize=10] maximum number of items collected before emitting
 * @param {int} [timeLimit=10] max time in milliseconds before emitting
 * @emits {mixed[]} items collected in buffer
 * @throws {RangeError} if `chunkSize` is not a number of at least 1
 */
export const chunk = (chunkSize = 10, timeLimit = 10) => {
  // A size below 1 (or NaN) can never be filled and would make the chunking
  // loop below spin forever, so reject it up front.
  if (!(chunkSize >= 1)) {
    throw new RangeError(`chunk size must be at least 1, got ${chunkSize}`);
  }

  return (in$) => {
    let lastItem = null;
    let buffer = [];
    let timeout = null;
    // kept so `stop` can unsubscribe exactly what `start` subscribed
    let inListener = null;

    const cancelTimeout = () => {
      clearTimeout(timeout);
      timeout = null;
    };

    return xs.create({
      start: (listener) => {
        const emit = (items) => {
          dbg("CHUNK", "sent chunk of size", items.length);
          listener.next(items);
        };

        const startTimeout = () => {
          if (timeout !== null || timeLimit === Infinity) return;
          timeout = setTimeout(() => {
            timeout = null;
            // time is up: flush whatever has been collected so far
            const partial = buffer;
            buffer = [];
            if (partial.length) emit(partial);
          }, timeLimit);
        };

        inListener = {
          next: (i) => {
            if (!i.length) return;
            const isRepeat = deq(lastItem, i);
            lastItem = i;
            if (isRepeat) return;

            buffer = buffer.concat(i);
            while (buffer.length >= chunkSize) {
              // detach the full chunk before emitting so the buffer is
              // consistent even if the listener feeds in$ synchronously
              const full = buffer.slice(0, chunkSize);
              buffer = buffer.slice(chunkSize);
              cancelTimeout();
              emit(full);
            }
            if (buffer.length) startTimeout();
          },
          error: (err) => log.error("chunk stream error", err),
        };
        in$.addListener(inListener);
      },

      stop: () => {
        cancelTimeout();
        if (inListener !== null) {
          in$.removeListener(inListener);
          inListener = null;
        }
        // start from a clean slate if the stream is subscribed to again
        buffer = [];
        lastItem = null;
      },
    });
  };
};

/**
 * Chokes the stream down so events are emitted at most once per (ms),
 * queueing input values and releasing them one per beat in FIFO order.
 *
 * A value that is deep-equal to the one received immediately before it is
 * treated as a replay of that value and is not queued again.
 *
 * Goes to sleep (stops its interval) after SLEEP_TIME events in a row that
 * released nothing, and awakens again when it next receives a new value.
 *
 * The beat only runs while the returned stream is subscribed. When the
 * stream stops, the interval is cleared, the upstream subscription is
 * released and any values still queued are discarded.
 *
 * @function choke
 * @extends Composer
 * @param {int} ms millisecond delay time
 */
export const choke = (ms) => (in$) => {
  const SLEEP_TIME = 5;
  let count = 0;
  let lastBeat = count;
  let lastItem = null;
  const beat$ = xs.create();
  const buffer = [];

  let sleepCountdown = SLEEP_TIME;
  let interval = null;
  // kept so `stop` can unsubscribe exactly what `start` subscribed
  let zippedListener = null;

  const startBeat = () => {
    sleepCountdown = SLEEP_TIME;
    if (interval !== null) return;
    interval = setInterval(() => beat$.shamefullySendNext(count++), ms);
  };
  const stopBeat = () => {
    sleepCountdown = SLEEP_TIME;
    if (interval !== null) {
      clearInterval(interval);
      interval = null;
    }
  };

  const zipped$ = xs.combine(beat$, in$);

  return xs.create({
    start: (listener) => {
      zippedListener = {
        next: ([b, i]) => {
          if (!deq(lastItem, i)) {
            buffer.push(i);
            startBeat();
            lastItem = i;
          }
          if (lastBeat !== b && buffer.length) {
            // a new beat arrived and something is waiting: release one value
            listener.next(buffer.shift());
          } else {
            sleepCountdown--;
          }
          if (sleepCountdown <= 0) {
            stopBeat();
          }
          lastBeat = b;
        },
      };
      zipped$.addListener(zippedListener);
      // Start beating only once someone is listening, so an unused composer
      // does not keep an interval alive.
      startBeat();
    },
    stop: () => {
      // xstream streams have no `stop()` method; tearing down means clearing
      // our interval and removing our listener.
      stopBeat();
      if (zippedListener !== null) {
        zipped$.removeListener(zippedListener);
        zippedListener = null;
      }
      // start from a clean slate if the stream is subscribed to again
      buffer.length = 0;
      lastItem = null;
    },
  });
};

/**
 * Buffers in$, emitting at most one marble each time signal$ emits, or after a
 * time limit is reached. FIFO order.
 *
 * Some things to note:
 * - The first marble is emitted immediately: the signal counter's initial
 * value counts as a signal, which opens the gate.
 * - If a signal is received while the buffer is empty the next marble will be
 * emitted immediately upon receipt, and then the timeout will restart.
 * - If a signal is received while marbles are waiting, the oldest one is
 * emitted right away.
 * - If you don't set a time limit and the signal stream stops the stream will
 * be held indefinitely.
 * - A marble that is deep-equal to the one received immediately before it is
 * treated as a replay and is not buffered again.
 * - When the returned stream stops, the timeout is cancelled, the upstream
 * subscription is released and marbles still waiting are discarded.
 *
 * Useful for ensuring only one API request of a type is sent at a time among other things.
 *
 * @function gate
 * @extends Composer
 * @param {Stream} signal$ event stream controlling events from buffer$
 * @param {int} timeLimit max millisecond delay between marbles when signal$ hasn't emitted
 * @emits {mixed} values from in$
 */
export const gate = (signal$, timeLimit = Infinity) => (in$) => {
  let lastSignal = -1;
  let lastItem = null;
  const buffer = [];
  let open = false;
  let timeout = null;
  // kept so `stop` can unsubscribe exactly what `start` subscribed
  let zippedListener = null;

  // signal$ is turned into a running count so each signal is a distinct value
  const zipped$ = xs.combine(signal$.fold((acc) => acc + 1, 0), in$);

  const cancelTimeout = () => {
    clearTimeout(timeout);
    timeout = null;
  };

  return xs.create({
    start: (listener) => {
      const startTimeout = () => {
        dbg("GATE", "starting timeout");
        if (timeout !== null || timeLimit === Infinity) return;
        timeout = setTimeout(() => {
          dbg("GATE", "timed out");
          timeout = null;
          open = true; // will be set false again by send()
          /* eslint-disable-next-line no-use-before-define */
          send();
        }, timeLimit);
      };

      const send = () => {
        if (open && buffer.length) {
          const item = buffer.shift();
          dbg("GATE", "sending", item, "remaining", buffer.length);
          listener.next(item);
          cancelTimeout();
          // note the buffer is only closed when something has been sent!
          open = false;
          // if there's another item restart the timeout
          // (otherwise it should get started again when there's a new item)
        } else if (!open) {
          dbg("GATE", "gate was closed, holding remaining", buffer.length);
        }
        if (buffer.length) startTimeout();
        else dbg("GATE", "nothing to do, sleeping");
      };

      zippedListener = {
        next: ([s, i]) => {
          // note we don't count the signal until we have something in the buffer
          // so if the buffer is empty the signal permits the next marble to pass
          // immediately
          const signalled = s !== lastSignal;
          if (signalled) {
            open = true;
            lastSignal = s;
          }
          const received = !deq(i, lastItem);
          if (received) {
            lastItem = i;
            buffer.push(i);
            dbg("GATE", "received new item", i, "remaining", buffer.length);
          }
          // A signal must release a waiting marble by itself; otherwise the
          // marble would sit until another one arrived or the timeout fired.
          if (signalled || received) send();
        },
      };
      zipped$.addListener(zippedListener);
    },
    stop: () => {
      // xstream streams have no `stop()` method; tearing down means
      // cancelling our timeout and removing our listener.
      cancelTimeout();
      if (zippedListener !== null) {
        zipped$.removeListener(zippedListener);
        zippedListener = null;
      }
      // start from a clean slate if the stream is subscribed to again
      buffer.length = 0;
      lastItem = null;
      lastSignal = -1;
      open = false;
    },
  });
};

/**
 * Chokes the stream down so events are emitted at most once per (ms)
 * dropping all but the last value received.
 *
 * While awake, the latest value is held and emitted on the next beat, unless
 * it is deep-equal to the value emitted before it. The first value after
 * subscribing starts the beat and is emitted on the first beat.
 *
 * Goes to sleep (stops its interval) on the beat after SLEEP_TIME beats in a
 * row with nothing new to emit. A value received while asleep restarts the
 * beat and is emitted immediately.
 *
 * When the returned stream stops, the interval is cleared, the upstream
 * subscription is released and the internal counters are reset.
 *
 * @function dropChoke
 * @extends Composer
 * @param {int} ms millisecond delay time
 * @throws {Error} if in$ is not an xstream Stream
 */
export const dropChoke = (ms) => (in$) => {
  if (!(in$ instanceof Stream)) throw new Error("dropChoke requires a stream");
  const id = uuidv4();
  const SLEEP_TIME = 3;
  // number of the next beat to send; 0 when beat hasn't been started
  let beat = 0;
  // the beat value most recently sent on beat$ (0 while asleep)
  let lastBeat = 0;
  // null when we have no new item to emit
  let nextItem = null;
  // null if we haven't emitted an item yet
  let lastItem = null;
  const beat$ = xs.create().startWith(beat);

  let sleepCountdown = SLEEP_TIME;
  let interval = null;
  // kept so `stop` can unsubscribe exactly what `start` subscribed
  let zippedListener = null;

  const resetCounters = () => {
    sleepCountdown = SLEEP_TIME;
    lastBeat = 0;
    beat = 0;
    nextItem = null;
    lastItem = null;
  };

  const startBeat = () => {
    if (interval !== null) clearInterval(interval);
    resetCounters();
    // This send re-enters the zipped$ listener synchronously with beat 0,
    // which is where the waking item gets stored in `nextItem`.
    beat$.shamefullySendNext(beat++);
    interval = setInterval(() => beat$.shamefullySendNext(beat++), ms);
  };

  const stopBeat = () => {
    resetCounters();
    if (interval !== null) {
      clearInterval(interval);
      interval = null;
    }
  };

  const dbgv = (i) => i?.iteration || i?.values?.() || i;

  const zipped$ = xs.combine(beat$, in$);
  const out$ = xs.create({
    start: (listener) => {
      zippedListener = {
        next: ([b, i]) => {
          dbg("DROP_CHOKE", id, " received beat", b, "lastBeat", lastBeat, "item", dbgv(i));
          // if we aren't currently beating, start beat
          if (interval === null && beat === 0 && i !== undefined) {
            dbg("DROP_CHOKE", id, " WAKING");
            startBeat();
          }
          // if there's no new beat that means we have a new item
          if (lastBeat === b) {
            dbg("DROP_CHOKE", id, " HOLDING", dbgv(i));
            nextItem = i;
          // there's a new beat so if there's a new next item, send it
          } else if (!deq(lastItem, nextItem)) {
            dbg("DROP_CHOKE", id, " EMITTING", dbgv(nextItem));
            listener.next(nextItem);
            lastItem = nextItem;
            // wake up again while we're at it
            sleepCountdown = SLEEP_TIME;
          // new beat, nothing new, and we ran out of sleep time: go to sleep.
          // Sleeping is decided only here so that an item arriving after the
          // countdown hits zero is held and emitted instead of being lost.
          } else if (sleepCountdown === 0) {
            dbg("DROP_CHOKE", id, " SLEEPING");
            stopBeat();
          // new beat and no new item, so count down to sleep
          } else {
            dbg("DROP_CHOKE", id, " counting down to sleep", sleepCountdown);
            sleepCountdown--;
          }
          // Track the beat that was actually sent last rather than `b`: right
          // after waking, `b` is the stale pre-sleep beat, and remembering it
          // would make the next item look like a beat and get skipped.
          // Don't update last beat if we're asleep.
          if (interval !== null) lastBeat = beat - 1;
        },
      };
      zipped$.addListener(zippedListener);
    },
    stop: () => {
      // clear the interval too, otherwise it keeps firing after teardown
      stopBeat();
      if (zippedListener !== null) {
        zipped$.removeListener(zippedListener);
        zippedListener = null;
      }
    },
  });
  return out$;
};
