import { Observable, ReplaySubject, Subscription } from 'rxjs';

/**
 * Observable that calls the factory function upon subscription and shares the results between all observers.
 * The most recent result is replayed to observers that subscribe later.
 * When update() is called, observableFactory is called again, and its result is pushed to all observers.
 *
 * If the observable returned by the factory fails, the error is delivered to the current observers and the cache
 * is reset, so that the next subscription calls the factory again instead of replaying the old error forever.
 *
 * Note: in order to implement an app-wide http cache, each endpoint parameter change should result in creating
 * a new DataProvider. Otherwise, sharing the same DataProvider between different observers would result in all
 * subscribers receiving data relevant to the new parameter value.
 */
export class DataProvider<T> extends Observable<T> {
  /** Replays the latest factory result to every observer; swapped for a fresh subject after an error. */
  private serverResponseCache = new ReplaySubject<T>(1);

  /** Subscription to the factory observable that is currently in flight, if any. */
  private serverResponseSubscription?: Subscription;

  /**
   * @param observableFactory Creates the source observable. It is invoked whenever an observer subscribes while
   * there are no other observers, and on every effective update() call.
   */
  constructor(private observableFactory: () => Observable<T>) {
    super(subscriber => {
      // Must be evaluated before subscribing: the subscription below changes the observer count.
      const isFirstObserver = this.serverResponseCache.observers.length === 0;
      const subscription = this.serverResponseCache.subscribe(subscriber);

      if (isFirstObserver) {
        this.update();
      }

      // Unsubscribe function
      return () => {
        subscription.unsubscribe();
        // Nobody is listening anymore, so a request that is still in flight is pointless.
        if (this.serverResponseCache.observers.length === 0) {
          this.cancelPendingRequest();
        }
      };
    });
  }

  /**
   * Force the data to update.
   * ObservableFactory is called and its results pushed to all observers.
   * Does nothing when there are no observers. A request that is still in flight is cancelled first,
   * so only the most recent request can deliver a response.
   */
  update(): void {
    if (this.serverResponseCache.observers.length === 0) {
      return;
    }

    // Drop the previous request so it is neither leaked nor able to push a stale response later.
    this.cancelPendingRequest();

    // Register the holder before subscribing: a synchronous source may re-enter update()
    // (e.g. an observer resubscribing from its error handler), and that nested call must be
    // able to cancel this request. Adding to an already closed holder unsubscribes immediately.
    const request = new Subscription();
    this.serverResponseSubscription = request;
    request.add(
      this.observableFactory().subscribe({
        next: response => this.serverResponseCache.next(response),
        error: error => {
          // An errored subject is stopped for good. Swap in a fresh one *before* notifying,
          // so observers that resubscribe from their error handler reach a working cache.
          const failedCache = this.serverResponseCache;
          this.serverResponseCache = new ReplaySubject<T>(1);
          failedCache.error(error);
        }
      })
    );
  }

  /** Unsubscribes from the in-flight factory observable, if there is one. */
  private cancelPendingRequest(): void {
    if (this.serverResponseSubscription) {
      this.serverResponseSubscription.unsubscribe();
      this.serverResponseSubscription = undefined;
    }
  }
}
