import React, { useEffect } from 'react';
import * as R from 'ramda';

import { Observable, Subject } from 'rxjs';
import * as operators from 'rxjs/operators';

/**
 * A stream handle: an rxjs `Observable` of `T` plus a `push` function.
 *
 * In streams built by `makeStream`, `push` forwards its argument to the
 * subject that is exposed as `observable`.
 */
export type StreamRx<T> = {
  /**
   * Takes no argument when `T` has no keys (`keyof T` is `never`, e.g. `{}`);
   * otherwise takes a single payload of type `T`.
   */
  push: keyof T extends never ? () => void : (value: T) => void;
  /** The observable side of the stream, for subscribing. */
  observable: Observable<T>;
};

/**
 * Payload of a "give me your state" request.
 *
 * `getState` pushes an object of this shape and expects whoever receives it
 * to call `onStateReceive` with the current state.
 */
export type GetStateStreamData<T> = {
  /** Callback through which the state is handed back to the requester. */
  onStateReceive(state: T): void;
};

/**
 * Creates a new stream backed by a fresh rxjs `Subject`.
 *
 * The returned `observable` is the subject itself, and `push` hands its
 * argument to the subject's `next`.
 *
 * @returns A `StreamRx<T>` whose `push` and `observable` share one subject.
 */
export function makeStream<T>(): StreamRx<T> {
  const source = new Subject<T>();
  // `StreamRx.push` has a conditional signature that cannot be satisfied for a
  // still-generic T, so the emitter is loosely typed and cast on return.
  const emit = (value: any) => source.next(value);

  return {
    observable: source,
    push: emit as any,
  };
}

/**
 * React hook that subscribes `handler` to a stream inside an effect.
 *
 * The stream is resolved lazily through `getStream` each time the effect runs,
 * and the subscription is released by the effect's cleanup.
 *
 * @param getStream Returns either a plain `Observable` or a `StreamRx` wrapper.
 * @param handler Called with every value the observable emits.
 * @param deps Dependency list passed straight to `useEffect`; it alone decides
 *   when the effect (and therefore the subscription) is re-created.
 */
export function useStream<T>(getStream: () => Observable<T> | StreamRx<T>, handler: (e: T) => void, deps: React.DependencyList) {
  useEffect(() => {
    const source = getStream();
    // A StreamRx carries its observable on `.observable`; a bare Observable is used as-is.
    const subscription = (source instanceof Observable ? source : source.observable).subscribe(handler);

    return () => {
      subscription.unsubscribe();
    };
    // eslint-disable-next-line react-hooks/exhaustive-deps
  }, deps);
}

/**
 * Synchronously asks a stream for its current state.
 *
 * Pushes a request carrying an `onStateReceive` callback and reads whatever
 * that callback was given by the time `push` returns. A receiver must
 * therefore answer synchronously; a later answer is not observed.
 *
 * @param stream Stream on which state requests are pushed.
 * @returns The last state delivered during the push, or an empty object cast
 *   to `T` when nothing (or `undefined`) was delivered.
 */
export function getState<T>(stream: StreamRx<GetStateStreamData<T>>): T {
  // Typed explicitly: an un-annotated `let` captured by the callback below
  // would be implicitly `any` and leave the return value unchecked.
  let received: T | undefined;

  stream.push({
    onStateReceive: (s: T) => {
      received = s;
    },
  });

  if (received === undefined) {
    // Nobody answered: keep the historical fallback of an empty object.
    return {} as T;
  }

  return received;
}

/**
 * Shape returned by `useStreamsByApiID`.
 *
 * Every property of `T` that is a `StreamRx<Data>` becomes a `StreamRx` whose
 * payload is `Data` without its `apiID` key; any other property maps to `never`.
 */
export type LocalStreams<T> = {
  [K in keyof T]: T[K] extends StreamRx<infer Data> ? StreamRx<{ [DK in keyof Omit<Data, 'apiID'>]: Data[DK] }> : never;
};

/**
 * Derives, from a record of shared streams, a record of streams scoped to one
 * `apiID`.
 *
 * For each stream in `streams`:
 * - `push` forwards to the shared stream with `apiID` merged into the payload;
 * - `observable` only emits shared-stream values whose `apiID` matches.
 *
 * New wrapper objects are built on every call.
 *
 * @param streams Record whose values are `StreamRx` instances.
 * @param apiID Identifier stamped on outgoing payloads and matched on incoming ones.
 * @returns A record with the same keys holding the scoped streams.
 */
export function useStreamsByApiID<T>(streams: T, apiID: string): LocalStreams<T> {
  // The shared stream may emit `null`/`undefined` (e.g. a zero-argument `push`
  // made directly on it). Such values carry no apiID, so they are dropped
  // instead of throwing inside the filter and erroring the derived observable.
  const belongsToApi = (data: any): boolean => data != null && data.apiID === apiID;

  return R.map(
    (stream: StreamRx<any>): StreamRx<any> => ({
      push: (data: any) => stream.push({ ...data, apiID }),
      observable: stream.observable.pipe(operators.filter(belongsToApi)),
    }),
    streams as any,
  ) as any;
}
