/**
 * Stream sink generics.
 *
 * @module data/stream/sink
 */
import xs from "xstream";
import deq from "deep-equal";
import { mapSpread, streamLog } from "./compose";

/**
 * Callback for `sink$.then(next: fn(next))`.
 *
 * Receives updates from a stream of type `T`.
 *
 * @example
 * sink$.then((myObj) => view.update({ myObj });
 *
 * @callback {SinkNextCallback.<T>}
 * @param {T} next
 */

/**
 * Callback for `sink$.then(_, error: fn(err))`.
 *
 * Receives updates from a stream of type `T`.
 *
 * @example
 * sink$.then((myObj) => view.update({ myObj });
 *
 * @callback {SinkErrorCallback}
 * @param {Error} error
 */

/**
 * @typedef {Sink.<T>}
 * @extends Stream
 * @method then
 * @param {SinkNextCallback.<T>}
 * @param {SinkErrorCallback.<T>}
 */

/**
 * A basic sink.
 *
 * sinks are meant to be the final link in a pipeline. They provide a convenience
 * function `sink.then(next: fn(marble), error: fn(error))` for adding a listener to
 * the output of a pipeline.
 *
 * @function sink
 * @param {Stream} out$
 * @returns {Sink<T>}
 */
export const sink = (source$) => {
  const sink$ = xs.create({
    start(listener) {
      this.listen = {
        next: (marble) => listener.next(marble),
        error: (err) => listener.error(err),
      };
      source$.addListener(this.listen);
    },
    stop() {
      source$.removeListener(this.listen);
    },
  }).remember();
  sink$.then = (next, error = () => {}) => {
    sink$.addListener({ next, error });
    return sink$;
  };
  return sink$;
};

/// a sink that only forwards tagged marbles
export const taggedSink = (source$, target) => source$
  .filter(([tag]) => tag === target)
  .map((bundle) => bundle[1])
  .compose(sink);

const defaultIdMatchFn = (obj) => obj.id;

/// push id to source$ then filter out$ by same id
export const idFilterSink = (source$, sink$, id, idMapFn = defaultIdMatchFn) => {
  source$.post(id);
  return sink$.filter((item) => idMapFn(item) === id).compose(sink);
};

/// push id to source$ then filter out$ by same id
export const idMapFilterSink = (source$, sink$, id, itemMatchFn = defaultIdMatchFn) => {
  source$.post(id);
  return sink$
    .compose(streamLog("ID_MAP_FILTER_IN"))
    .compose(mapSpread)
    .compose(streamLog("ID_MAP_FILTER_SPREAD"))
    .filter((item) => item && deq(itemMatchFn(item), id))
    .compose(streamLog("ID_MAP_FILTER_OUT"))
    .compose(sink);
};

export const monoSink = (source$, sink$) => {
  setTimeout(() => source$.post(true), 1);
  return sink$.compose(sink);
};
