import { map, scan, publishReplay, refCount, mergeMap } from 'rxjs/operators';
import { Injectable } from '@angular/core';
import { Subject, Observable } from 'rxjs';
import { IChatMessage, IChatThread } from '../domain';
import { ChatUserService } from './chat-user.service';

interface IMessageOperation extends Function {
  (messages: IChatMessage[]): IChatMessage[];
}

@Injectable({
  providedIn: 'root',
})
export class ChatMessageService {
  messages: Observable<IChatMessage[]>;

  private updates: Subject<IMessageOperation> = new Subject();

  // add messages action
  public add: Subject<IChatMessage[] | IChatMessage> = new Subject();
  public clear: Subject<void> = new Subject();
  //public addHead: Subject<IChatMessage[] | IChatMessage> = new Subject();
  public markThreadAsRead: Subject<IChatThread> = new Subject();

  constructor() {
    this.messages = this.updates.pipe(
      scan<IMessageOperation, IChatMessage[]>((messages, operation) => {
        return operation(messages);
      }, []),
      publishReplay(1),
      refCount()
    );

    this.add
      .pipe(
        map<IChatMessage[] | IChatMessage, IMessageOperation>((newMessages) => {
          return (messages) => {
            return messages.concat(newMessages);
          };
        })
      )
      .subscribe(this.updates);

    this.clear
      .pipe(
        map(() => {
          return (messages) => [];
        })
      )
      .subscribe(this.updates);

    this.markThreadAsRead
      .pipe(
        map<IChatThread, IMessageOperation>((thread) => {
          return (messages) => {
            return messages.map((msg) => {
              if (msg.threadId == thread.id) {
                msg.seen = true;
              }
              return msg;
            });
          };
        })
      )
      .subscribe(this.updates);
  }

  fetchMessages() {}
}
