/**
 * Course module data stream.
 *
 * Provides `getBatch` interface for fetching module by
 * `[courseId, moduleIds]`.
 *
 * @module data/course/module
 * @category Data Streams
 * @subcategory Course
 */
import xs from "xstream";
import { byIds } from "api/v2/course/module";
import { CACHE_COURSE_MODULE_KEY_V2 as CACHE_KEY } from "cache/constants";
import { makeCourseModule } from "model/course";
import batch from "data/pipeline/batch";
import lectureStream from "data/course/lecture";
import assessmentStream from "data/course/assessment";
import { mapSpread, streamLog, sumStream, tee } from "data/stream/compose";
import hashmap from "util/hash-map";

const itemMap = hashmap();

const all$ = xs.create();

const byIdsFlat = (args) => byIds(...args);

const defaultFn = ([courseId]) => (id) => makeCourseModule({ id, courseId });

const {
  post: getBatch,
  sink$: getBatchSink$,
  pending$: batchPending$,
} = batch({
  apiFn: byIdsFlat,
  defaultFn,
  cacheKey: CACHE_KEY,
  accumulator: itemMap,
  itemSink$: all$,
});

const pending$ = sumStream(batchPending$);

all$.imitate(getBatchSink$);
all$.compose(streamLog("MODULE_ALL$"));

/**
 * Pushes all module lecture ids from any fetched courses into the lecture stream.
 *
 * Returns the lecture `all$`.
 *
 * Useful for ensuring availability of any lectures attached to any modules fetched by
 * getModules().
 *
 * Note this stream will be silent unless `getModules()` is also called, but the call
 * order doesn't matter.
 *
 * @example
 * const courseA$ = courseStream.get(courseIdA);
 * const courseB$ = courseStream.get(courseIdB);
 * const module$ = courseStream.getModules();
 * // map of all modules in courseA and courseB
 * const lecture$ = courseStream.getLectures();
 * // map of all lectures in courseA and courseB
 *
 * @function getLecture()
 * @returns {Stream} `Stream.<Map.<UUID, CourseLecture>>`
 */
const getLectures = () => {
  const seen = new Set();
  all$
    .compose(streamLog("COURSE_MODULES_LECTURES_ALL"))
    .compose(mapSpread)
    .filter((module) => (
      module?.id
        && module?.courseId
        && module.lectureIds?.length
        && !seen.has(module.id)
    ))
    .compose(streamLog("COURSE_MODULES_LECTURES_ITEM_IN"))
    .compose(tee((module) => seen.add(module.id)))
    .map((module) => [module.courseId, module.id, module.lectureIds])
    .compose(streamLog("COURSE_MODULES_LECTURES_IN"))
    .map(lectureStream.getBatch)
    .flatten()
    .compose(streamLog("COURSE_MODULES_LECTURES_OUT"));
  return lectureStream.all$;
};

/**
 * Pushes all module assessment ids from any fetched courses into the assessment stream.
 *
 * Returns the assessment `all$`.
 *
 * Useful for ensuring availability of any assessments attached to any modules fetched by
 * getModules().
 *
 * Note this stream will be silent unless `getModules()` is also called, but the call
 * order doesn't matter.
 *
 * @example
 * const courseA$ = courseStream.get(courseIdA);
 * const courseB$ = courseStream.get(courseIdB);
 * const module$ = courseStream.getModules();
 * // map of all modules in courseA and courseB
 * const assessment$ = courseStream.getAssessments();
 * // map of all assessments in courseA and courseB
 *
 * @function getAssessments()
 * @returns {Stream} `Stream.<Map.<UUID, Assessment>>`
 */
const getAssessments = () => {
  const seen = new Set();
  all$
    .compose(streamLog("COURSE_ASSESSMENTS_ALL"))
    .compose(mapSpread)
    .filter((module) => (
      module?.id
        && module?.courseId
        && module.assessmentIds?.length
        && !seen.has(module.id)
    ))
    .compose(tee((module) => seen.add(module.id)))
    .map((module) => [module.courseId, module.id, module.assessmentIds])
    .compose(streamLog("COURSE_ASSESSMENTS_IN"))
    .map(assessmentStream.getBatch)
    .flatten()
    .compose(streamLog("COURSE_ASSESSMENTS_OUT"));
  return assessmentStream.all$;
};

export default {
  get: (...args) => getBatch([args[0], [args[1]]])
    .map((modules) => modules.get(args[1])),
  getBatch,
  getLectures,
  getAssessments,
  all$,
  pending$,
};
