/**
 * Dynamic page data stream.
 *
 * Provides `getMany` interface for fetching pages by `[pageId]`.
 *
 * Provides `get` interface for fetching page by `pageId`.
 *
 * Provides `getSlug` interface for fetching page by `pageSlug`.
 *
 * @module data/dynamic-page
 * @category Data Streams
 * @subcategory Page
 */
import xs from "xstream";
import { bySlug, allById } from "api/page";
import { count as countPages } from "api/v2/dynamic-page";
import { makePage } from "model/dynamic-page";
import {
  pageTypesRequiringMetadata,
  pageTypesRequiringCourseData,
  pageTypesRequiringSearchData,
  pageTypes,
} from "model/dynamic-page/constants";
import { CACHE_PAGE_KEY_V2 as CACHE_KEY } from "cache/constants";
import courseStream from "data/course";
import playlistStream from "data/metadata/playlist";
import abstractStream from "data/metadata/abstract";
import documentStream from "data/metadata/document";
import videoStream from "data/metadata/video";
import countPipe from "data/pipeline/count";
import many from "data/pipeline/many";
import single from "data/pipeline/single";
import { mapSpread, streamLog, tee } from "data/stream/compose";
import { merge } from "util/object";
import hashmap from "util/hash-map";
import { metadataTypes } from "model/metadata/constants";

// Accumulator shared by the three page pipelines below (`getSlug`, `get`, `getMany`).
const itemMap = hashmap();
// Separate accumulator, used only by the `count` pipeline.
const countMap = hashmap();

/**
 * Builds a placeholder page for the given id: a model created by `makePage`
 * from `{ id }` alone, merged with `PLACEHOLDER: true`.
 *
 * Passed as the `defaultFn` option of the id-based pipelines.
 *
 * @param id - page id to seed the model with
 * @returns the result of merging the model with `{ PLACEHOLDER: true }`
 */
const defaultFn = (id) => {
  const emptyPage = makePage({ id });
  return merge(emptyPage, { PLACEHOLDER: true });
};

/**
 * Builds a placeholder page for the given slug: a model created by `makePage`
 * from `{ slug }` alone, merged with `PLACEHOLDER: true`.
 *
 * Passed as the `defaultFn` option of the slug-based pipeline.
 *
 * @param slug - page slug to seed the model with
 * @returns the result of merging the model with `{ PLACEHOLDER: true }`
 */
const defaultSlugFn = (slug) => {
  const emptyPage = makePage({ slug });
  return merge(emptyPage, { PLACEHOLDER: true });
};

/**
 * Turns a partial page object into a page model by delegating to `makePage`.
 *
 * Passed as the `modelFn` option of every page pipeline in this module.
 *
 * @param partial - partial page data
 * @returns whatever `makePage` returns for `partial`
 */
const modelFn = (partial) => {
  const page = makePage(partial);
  return page;
};

/**
 * Batch adapter around the single-slug `bySlug` API call: issues one request
 * per slug and resolves with the results in the same order as `slugs`.
 *
 * Every request passes `true` as the second argument of `bySlug`.
 * NOTE(review): the meaning of that flag is defined in `api/page` — confirm.
 *
 * @param slugs - array of page slugs
 * @returns promise of an array with one `bySlug` result per slug; rejects if
 *   any single request rejects
 */
const bySlugs = async (slugs) => {
  const requests = slugs.map((slug) => bySlug(slug, true));
  return Promise.all(requests);
};

// Stream of page collections. It is created empty here because the `getMany`
// pipeline needs it as `itemSink$` before `out$` (which includes that
// pipeline's sink) can be built; it is wired to `out$` later via `imitate`.
const all$ = xs.create();

// Receives a fresh copy of `pagesReady` each time this module marks pages ready.
const ready$ = xs.create();

// Map of item key -> Set of page ids waiting on that item. Keys are page
// `itemId`s (courses, playlists) or search-data hashes (searches).
const pagesListening = new Map();
// Ids of the pages that have been marked ready so far.
const pagesReady = new Set();

/**
 * Slug-based page pipeline: `getSlug` is the pipeline's `post` function and
 * `getSlugSink$` its `sink$` stream.
 *
 * It shares `itemMap` and `CACHE_KEY` with the id-based pipelines but derives
 * its cache key from `obj.slug`.
 * NOTE(review): whether slug-keyed and id-keyed entries can coexist for the
 * same page in `itemMap` depends on the `single` helper — confirm.
 */
const {
  post: getSlug,
  sink$: getSlugSink$,
} = single({
  // `bySlugs` adapts the single-slug API call to take an array of slugs
  apiFn: bySlugs,
  defaultFn: defaultSlugFn,
  modelFn,
  getCacheKeyFromItem: (obj) => obj.slug,
  cacheKey: CACHE_KEY,
  accumulator: itemMap,
});

/**
 * Id-based single-page pipeline: `get` is the pipeline's `post` function and
 * `getSink$` its `sink$` stream.
 *
 * Uses the same API function (`allById`), placeholder factory, cache key and
 * accumulator as the `getMany` pipeline.
 */
const {
  post: get,
  sink$: getSink$,
} = single({
  apiFn: allById,
  defaultFn,
  modelFn,
  cacheKey: CACHE_KEY,
  accumulator: itemMap,
});

/**
 * Id-based multi-page pipeline: `getMany` is the pipeline's `post` function
 * and `getManySink$` its `sink$` stream.
 *
 * Unlike the `single` pipelines it is also given `all$` as `itemSink$`.
 * NOTE(review): how `many` uses `itemSink$` is defined in
 * `data/pipeline/many` — confirm.
 */
const {
  post: getMany,
  sink$: getManySink$,
} = many({
  apiFn: allById,
  defaultFn,
  modelFn,
  cacheKey: CACHE_KEY,
  accumulator: itemMap,
  itemSink$: all$,
});

/**
 * Page-count pipeline: `count` is the pipeline's `post` function and
 * `countSink$` its `sink$` stream.
 *
 * The default value is `0` and API results are used as-is (identity `modelFn`).
 * Results accumulate in `countMap`, separate from the page accumulator.
 * NOTE(review): this pipeline uses the same `CACHE_KEY` as the page pipelines —
 * confirm that counts and pages cannot collide in the cache.
 */
const {
  post: count,
  sink$: countSink$,
} = countPipe({
  apiFn: countPages,
  defaultFn: () => 0,
  modelFn: (r) => r,
  cacheKey: CACHE_KEY,
  accumulator: countMap,
});

// Single stream carrying the output of all three page pipelines
// (`get`, `getSlug`, `getMany`). The count pipeline's sink is not included.
const out$ = xs.merge(
  getSink$,
  getSlugSink$,
  getManySink$,
);

// Whenever the page pipelines emit, mark every page that is no longer a
// placeholder as ready. `ready$` gets a fresh copy of the ready set once per
// newly added page, so one emission of `out$` can produce several `ready$` events.
// NOTE(review): the stream returned by `compose` is discarded, so this only
// runs if `tee` subscribes on its own — confirm in `data/stream/compose`.
out$.compose(tee((pages) => {
  const announceIfNewlyLoaded = (page) => {
    if (page.PLACEHOLDER || pagesReady.has(page.id)) return;
    pagesReady.add(page.id);
    ready$.shamefullySendNext(new Set(pagesReady));
  };
  // iterate over a snapshot of the emitted collection
  [...pages.values()].forEach(announceIfNewlyLoaded);
}));

// Close the loop: `all$` was created empty so it could be passed to the
// `getMany` pipeline before `out$` existed; from here on it mirrors `out$`.
all$.imitate(out$);
// NOTE(review): the stream returned by `compose` is discarded, so logging only
// happens if `streamLog` subscribes on its own — confirm in `data/stream/compose`.
all$.compose(streamLog("DYNAMIC_PAGE_ALL$"));

/**
 * Marks every page waiting on `itemId` as ready.
 *
 * When pages are registered under `itemId` in `pagesListening`, their ids are
 * added to `pagesReady`, the registration is removed, and a copy of the ready
 * set is pushed onto `ready$`. When nothing is registered this is a no-op and
 * nothing is emitted.
 *
 * @param itemId - key the pages registered under via `pageIsListening`
 */
const itemReady = (itemId) => {
  const waitingPageIds = pagesListening.get(itemId);
  if (!waitingPageIds) return;

  waitingPageIds.forEach((pageId) => {
    pagesReady.add(pageId);
  });
  pagesListening.delete(itemId);
  // emit a copy so listeners never hold the module's own mutable set
  ready$.shamefullySendNext(new Set(pagesReady));
};

/**
 * Records that page `pageId` is waiting on item `itemId`.
 *
 * Page ids are collected per item in a Set inside `pagesListening`, so
 * registering the same page twice for the same item has no further effect.
 *
 * @param pageId - id of the waiting page
 * @param itemId - key of the item the page waits for
 */
const pageIsListening = (pageId, itemId) => {
  if (!pagesListening.has(itemId)) {
    pagesListening.set(itemId, new Set());
  }
  pagesListening.get(itemId).add(pageId);
};

/**
 * Builds a pipeline over `all$` that, for every page whose type is in
 * `pageTypesRequiringCourseData`, requests the course referenced by the page's
 * `itemId` from `courseStream` and releases the waiting pages (via
 * `itemReady`) once a non-placeholder course with an id arrives.
 *
 * Pages are registered as waiting only when they actually carry an `itemId`.
 * Registering pages without one would park them under an `undefined` key in
 * `pagesListening` that no course could ever release.
 *
 * Every call builds a new pipeline.
 * NOTE(review): the composed stream is neither returned nor stored, so it only
 * does work if `tee`/`streamLog` subscribe on their own — confirm in
 * `data/stream/compose`.
 *
 * @returns `courseStream.all$`
 */
const getCourses = () => {
  xs.merge(all$)
    .compose(mapSpread)
    .filter((page) => page && pageTypesRequiringCourseData.has(page.type))
    .compose(streamLog("COURSE_SECTIONS_ITEMS_IN"))
    .compose(tee((page) => {
      // skip pages without a course reference; they are filtered out below anyway
      if (page.itemId) {
        pageIsListening(page.id, page.itemId);
      }
    }))
    .map((page) => page.itemId)
    .filter((id) => !!id)
    .compose(streamLog("COURSE_SECTIONS_ITEMS_ID"))
    .map((id) => courseStream.get(id))
    .flatten()
    .compose(tee((c) => {
      // notify when pages that want courses can have them
      if (c?.id && !c?.PLACEHOLDER) {
        itemReady(c.id);
      }
    }))
    .compose(streamLog("COURSE_SECTIONS_ITEMS_OUT"));
  return courseStream.all$;
};

/**
 * Subscribes to `all$` so that each emitted page collection triggers a
 * `playlistStream.getMany` request for the playlists its pages refer to.
 *
 * A page qualifies when it has a truthy `itemId` and its `type` is in
 * `pageTypesRequiringMetadata`. Each qualifying page is also registered as
 * waiting on its item via `pageIsListening`. Ids are de-duplicated per
 * emission, and no request is made when no page qualifies.
 *
 * Every call adds another listener to `all$`.
 *
 * @returns `playlistStream.all$`
 */
const getPlaylists = () => {
  const requestPlaylists = (pages) => {
    const playlistIds = new Set();

    [...pages.values()].forEach((page) => {
      if (!page.itemId || !pageTypesRequiringMetadata.has(page.type)) return;
      playlistIds.add(page.itemId);
      pageIsListening(page.id, page.itemId);
    });

    if (playlistIds.size > 0) {
      playlistStream.getMany([...playlistIds]);
    }
  };

  all$.addListener({ next: requestPlaylists });

  return playlistStream.all$;
};

// Each time the playlist stream emits, release the pages that registered
// interest in any of the emitted playlists.
// NOTE(review): unlike the course pipeline, there is no PLACEHOLDER check here —
// confirm placeholder playlists cannot release pages early.
// NOTE(review): the stream returned by `compose` is discarded, so this only
// runs if `tee` subscribes on its own — confirm in `data/stream/compose`.
playlistStream.all$.compose(tee((lists) => {
  const releaseWaitingPages = (list) => {
    itemReady(list.id);
  };
  [...lists.values()].forEach(releaseWaitingPages);
}));

/**
 * Subscribes to `all$` so that each emitted page collection triggers metadata
 * requests for the items embedded in its pages.
 *
 * Only pages with a truthy `itemId` and type `pageTypes.EMBEDDED_ITEM` are
 * considered. The page's `text` field selects the kind of metadata:
 * `metadataTypes.VIDEO` ids go to `videoStream.getMany` and
 * `metadataTypes.DOCUMENT` ids to `documentStream.getMany`. Ids are
 * de-duplicated per emission, and a stream is only queried when it has ids.
 *
 * Every call adds another listener to `all$`. Unlike the course, playlist and
 * search helpers, pages are not registered through `pageIsListening` here.
 *
 * @returns `{ document$, video$ }` — the `all$` streams of the document and
 *   video metadata streams
 */
const getMetadataForEmbeddedItems = () => {
  const requestEmbeddedMetadata = (pages) => {
    const videoIds = new Set();
    const documentIds = new Set();

    [...pages.values()].forEach((page) => {
      if (!page.itemId || page.type !== pageTypes.EMBEDDED_ITEM) return;
      if (page.text === metadataTypes.VIDEO) {
        videoIds.add(page.itemId);
      }
      if (page.text === metadataTypes.DOCUMENT) {
        documentIds.add(page.itemId);
      }
    });

    if (videoIds.size > 0) {
      videoStream.getMany([...videoIds]);
    }
    if (documentIds.size > 0) {
      documentStream.getMany([...documentIds]);
    }
  };

  all$.addListener({ next: requestEmbeddedMetadata });

  const document$ = documentStream.all$;
  const video$ = videoStream.all$;
  return { document$, video$ };
};

/**
 * Get BE search results.
 *
 * Subscribes to `all$` so that each emitted page collection triggers one
 * `abstractStream.search` call per distinct search query found on its pages.
 *
 * A page qualifies when its `searchData` is truthy and has at least one key,
 * and its `type` is in `pageTypesRequiringSearchData`. Queries are collected
 * in a `hashmap` keyed by the search data itself, and each qualifying page is
 * registered as waiting on that hashmap's hash of its query. Every search is
 * issued as `[query, 0]`.
 * NOTE(review): the `0` is presumably a page/offset index — confirm against
 * `data/metadata/abstract`.
 *
 * Every call adds another listener to `all$`.
 *
 * @returns `abstractStream.search$`
 */
const getSearches = () => {
  const requestSearches = (pages) => {
    const queries = hashmap();

    [...pages.values()].forEach((page) => {
      const hasQuery = page.searchData && Object.keys(page.searchData).length;
      if (!hasQuery || !pageTypesRequiringSearchData.has(page.type)) return;
      queries.set(page.searchData, page.searchData);
      pageIsListening(page.id, queries.getHash(page.searchData));
    });

    if (queries.size) {
      [...queries.values()].forEach((query) => {
        abstractStream.search([query, 0]);
      });
    }
  };

  all$.addListener({ next: requestSearches });

  return abstractStream.search$;
};

// Each time the search stream emits, release the pages that registered
// interest in any of the emitted results.
// NOTE(review): pages register under the hashmap hash of their search data, so
// this assumes `result.id` equals that hash — confirm.
// NOTE(review): the stream returned by `compose` is discarded, so this only
// runs if `tee` subscribes on its own — confirm in `data/stream/compose`.
abstractStream.search$.compose(tee((results) => {
  const releaseWaitingPages = (result) => {
    itemReady(result.id);
  };
  [...results.values()].forEach(releaseWaitingPages);
}));

/**
 * Public interface of the dynamic page data stream.
 *
 * `count`, `get`, `getSlug` and `getMany` are the `post` functions of their
 * pipelines. The `getCourses`, `getPlaylists`, `getSearches` and
 * `getMetadataForEmbeddedItems` helpers attach to `all$` and return the
 * stream(s) of the dependent data.
 */
export default {
  count,
  get,
  getSlug,
  getMany,
  getCourses,
  getPlaylists,
  getSearches,
  getMetadataForEmbeddedItems,
  // combined output of the page pipelines
  all$,
  // sink of the count pipeline, passed through `streamLog("PAGE_COUNT$")`
  count$: countSink$.compose(streamLog("PAGE_COUNT$")),
  // receives a copy of the ready page-id set each time pages are marked ready
  ready$,
};
