/* eslint-disable @typescript-eslint/no-explicit-any */
import { Injectable, Injector, OnDestroy } from '@angular/core';
import { cleanUpOnEarlyUnsubscribe, findByMatch, findBySearch, switchMapByKey } from '@mri-platform/shared/core';
import {
  ChangeSet,
  DefaultDataServiceConfig,
  EntityCacheDispatcher,
  EntityCollectionServiceBase,
  EntityCollectionServiceElementsFactory,
  EntityDefinitionService,
  EntityDispatcherDefaultOptions,
  EntityMetadata,
  EntityOp,
  EntitySelectors$,
  HttpUrlGenerator,
  OP_ERROR,
  OP_SUCCESS,
  QueryParams,
  getGuid
} from '@ngrx/data';
import { EntityState } from '@ngrx/entity';
import { createSelector } from '@ngrx/store';
import { ConnectableObservable, Observable, Subject, Subscription, Unsubscribable, throwError } from 'rxjs';
import {
  distinctUntilChanged,
  filter,
  first,
  map,
  mapTo,
  mergeMap,
  mergeMapTo,
  publishReplay,
  switchMap,
  take
} from 'rxjs/operators';
import {
  EntityActionErrorLogPolicy,
  EntityActionOptions,
  NonQueryEntityActionOptions,
  QueryEntityActionOptions
} from './entity-action-options';
import { EntityIdType, isEntityNew } from './entity-functions';
import { QueryEntry, selectQueriesForHash } from './query-cache';
import { QueryDef } from './query-def';
import { SaveEntitiesChanges, changeSetFactory, isChangeSet } from './save-entities-changes';
import {
  SaveEntitiesCommand,
  SaveEntitiesCommandResult,
  toSaveEntitiesResult,
  unwrapSaveEntitiesResult
} from './save-entities-command';
import { swallowCancellationError } from './swallow-cancellation-error';
import { UpdateCommand, UpdateCommandResult, toUpdateResult, unwrapUpdateResult } from './update-command';

interface CancelationResource extends Unsubscribable {
  entityActionOptions: EntityActionOptions;
}

type CancellationCallback = (options: EntityActionOptions) => void;

type CrudWriteActionOp = 'Create' | 'Update' | 'Delete' | 'SaveMany';
type CrudActionOp = 'Create' | 'Read' | 'Update' | 'Delete' | 'SaveMany';

const ngrxDataDefaultDispatcherOptions = new EntityDispatcherDefaultOptions();

@Injectable()
export class DefaultEntityService<T, S$ extends EntitySelectors$<T> = EntitySelectors$<T>>
  extends EntityCollectionServiceBase<T, S$>
  implements OnDestroy
{
  toChangeSet = changeSetFactory(this.entityName, this.selectId, this.toUpdate);

  protected entityDispatcher: EntityCacheDispatcher;
  protected entityDefinitionService: EntityDefinitionService;

  private queryCache$: Observable<EntityState<QueryEntry>> = this.store.select(
    (this.selectors as any).selectQueryCache
  );

  /**
   * use the *same* array to allow observable chain to de-dup multiple emissions
   * that result in zero matches
   */
  private readonly noMatch: T[] = [];
  private httpUrlGenerator: HttpUrlGenerator;
  private config: DefaultDataServiceConfig;
  private metadata: EntityMetadata<T>;
  private optimisticSaveEntitiesCmd = new Subject<SaveEntitiesCommand>();
  private optimisticSaveEntitiesResult$: Observable<SaveEntitiesCommandResult>;
  private optimisticUpdateCmd = new Subject<UpdateCommand<T>>();
  private optimisticUpdateResult$: Observable<UpdateCommandResult<T>>;
  private subscriptions = new Subscription();

  constructor(
    /** Name of the entity type of this collection service */
    entityName: string,
    injector: Injector
  ) {
    super(entityName, injector.get(EntityCollectionServiceElementsFactory));

    this.config = injector.get(DefaultDataServiceConfig);
    this.entityDefinitionService = injector.get(EntityDefinitionService);
    this.entityDispatcher = injector.get(EntityCacheDispatcher);
    this.httpUrlGenerator = injector.get(HttpUrlGenerator);

    this.metadata = this.entityDefinitionService.getDefinition<T>(this.entityName, true).metadata;

    const updateResult$ = this.createOptimisticUpdateResult$(this.optimisticUpdateCmd);
    this.optimisticUpdateResult$ = updateResult$;
    // start the observable to listen to optimisticUpdateCmd notifications and process them as
    // they arrive to produce results
    this.subscriptions.add(updateResult$.connect());

    const saveEntitiesResult$ = this.createOptimisticSaveEntitiesResult$(this.optimisticSaveEntitiesCmd);
    this.optimisticSaveEntitiesResult$ = saveEntitiesResult$;
    // start the observable to listen to optimisticSaveEntitiesCmd notifications and process them as
    // they arrive to produce results
    this.subscriptions.add(saveEntitiesResult$.connect());
  }

  ngOnDestroy(): void {
    if (this.subscriptions) {
      this.subscriptions.unsubscribe();
    }
  }

  add(entity: T, options?: NonQueryEntityActionOptions): Observable<T> {
    const { requestOptions, ...finalOptions } = this.getWriteEntityActionOptions('Create', options);

    if (requestOptions) {
      entity = Object.setPrototypeOf({ ...entity }, { requestOptions });
    }

    const action$ = this.executeWithAutoCancellation(
      o => super.add(this.prepareForInsert(entity), o),
      o => this.cancel(o.correlationId),
      finalOptions
    );
    return action$.pipe(swallowCancellationError(!finalOptions.throwOnCancel));
  }

  /**
   * Dispatch action to delete entity from remote storage by key.
   *
   * By default this will issue a server request:
   *
   * `DELETE {baseUrl}/{pluralizedentityname}/{id}`
   *
   * This behaviour can be customized by creating a custom data service
   * (see guide: https://ngrx.io/guide/data/entity-dataservice)
   *
   * @param key The entity to delete
   * @param [options] options that influence save and merge behavior
   * @returns Observable of the deleted key
   * after server reports successful save or the save error.
   */
  delete(entity: T, options?: NonQueryEntityActionOptions): Observable<number | string>;
  /**
   * Dispatch action to delete entity from remote storage by key.
   *
   * By default this will issue a server request:
   *
   * `DELETE {baseUrl}/{pluralizedentityname}/{id}`
   *
   * This behaviour can be customized by creating a custom data service
   * (see guide: https://ngrx.io/guide/data/entity-dataservice)
   *
   * @param key The primary key of the entity to remove
   * @param [options] options that influence save and merge behavior
   * @returns Observable of the deleted key
   * after server reports successful save or the save error.
   */
  delete(key: number | string, options?: NonQueryEntityActionOptions): Observable<number | string>;
  delete(arg: number | string | T, options?: NonQueryEntityActionOptions): Observable<number | string> {
    const finalOptions = this.getWriteEntityActionOptions('Delete', options);
    const action$ = this.executeWithAutoCancellation(
      o => super.delete(arg as any, o),
      o => this.cancel(o.correlationId),
      finalOptions
    );
    return action$.pipe(swallowCancellationError(!finalOptions.throwOnCancel));
  }

  /**
   * Like `delete` but returns an Observable of the same `entity` supplied
   * @see delete
   */
  deleteEntity(entity: T, options?: EntityActionOptions): Observable<T> {
    return this.delete(entity, options).pipe(mapTo(entity));
  }

  /**
   * Return an observable that will emit every time a change occurs
   * to the entity in the collection that matches `id` or whenever
   * the the `id` does not match
   * @returns An Observable of either the entity matching the `id` or undefined
   */
  entityById$(id: EntityIdType) {
    return this.store.select(this.selectors.selectEntityMap).pipe(
      map(entitiesMap => entitiesMap[id]),
      distinctUntilChanged()
    );
  }

  /**
   * Return an observable that will emit every time entity collection
   * changes and will return those entities matching the `query` criteria
   * supplied
   *
   * * Where the `query` is an object or a `QueryDef` built from an query params object,
   *   perform a partial deep case-insensitive comparison between each entity and the
   *   `query` object to determine if that entity contains equivalent property values
   *   (@see `findByMatch`)
   * * where the `query` is a `QueryDef` built from a searchTerm string,
   *   use the `filterFn` registered as model metadata for the entity to find matches
   * * where the `query` is a `QueryDef` built from a searchTerm string,
   *   and where a `filterFn` has not been registered then perform
   *   a case insensitive "contains" search on every string field looking for
   *   matches (@see `findBySearch`)
   * * where the `query` is a `QueryDef` built from a key value, use the `selectId`
   *   function registered for the entity to determine a match
   *
   * @param query query criteria to match against collection entities
   */
  entitiesByQuery$(query: QueryDef<T> | Partial<T>): Observable<T[]> {
    const queryDef = QueryDef.from(query);

    if (queryDef.isGetByKey) {
      return this.entityById$(queryDef.query as EntityIdType).pipe(
        map(found => (found ? [found] : this.noMatch)),
        distinctUntilChanged()
      );
    }

    if (queryDef.isGetAll || queryDef.isLoadAll) {
      return this.entities$;
    }

    if (queryDef.isGetWithQuery) {
      const predicate = this.getFilterFunc(queryDef.query as any);
      const selectByQuery = createSelector(this.selectors.selectEntities, entities => {
        const matches = predicate(entities);
        return matches.length > 0 ? matches : this.noMatch;
      });
      return this.store.select(selectByQuery);
    }

    throw new Error('unsupported QueryDef');
  }

  /**
   * @description
   * Execute an ngrx-data persistence action that should be cancelled whenever the
   * the Observable returned is unsubscribed before it has completed or errored
   *
   * @param action the ngrx-data persistence action to execute
   * @param options any options that should be supplied to the `action`
   */
  protected executeWithAutoCancellation<R = T>(
    action: (options: EntityActionOptions) => Observable<R>,
    cancellationCallback: CancellationCallback,
    options: EntityActionOptions = {}
  ) {
    const {
      // remove all options from ngrx-data action payload that doesn't need to be sent to the store
      cancelOnEarlyUnsubscribe,
      throwOnCancel: _throwOnCancel,
      // keep only those options that ngrx-data will actually use to process the store
      ...opts
    } = options;

    if (cancelOnEarlyUnsubscribe === false) {
      return action(opts);
    }

    const action$ = cleanUpOnEarlyUnsubscribe(
      () => this.createCancelation(cancellationCallback, opts),
      ({ entityActionOptions }: any) => action(entityActionOptions)
    ).pipe(publishReplay(1)) as ConnectableObservable<R>;

    // make sure the entity action is dispatched immediately, even if someone does NOT subscribe
    // in this way we are keeping to the original behavior of ngrx-data
    const connection = action$.connect();

    return new Observable<R>(subscriber => {
      const consumerSub = action$.subscribe(subscriber);
      return () => {
        // "link" the life-time of our consumers' subscription with the life of
        // action$; whenever our consumer unsubscribes, then cause the action$
        // `connection` subscription to also be unsubscribed.
        // in this way we allow for an early unsubscribe by our consumer to trigger the
        // cancelation of the ngrx-data persistence action
        if (consumerSub) {
          consumerSub.unsubscribe();
        }
        connection.unsubscribe();
      };
    });
  }

  /**
   * Dispatch action to query remote storage for all entities and merge the queried entities
   * into the cached collection.
   *
   * By default this will issue a server request:
   *
   * `GET {baseUrl}/{pluralizedentityname}/`
   *
   * This behaviour can be customized by creating a custom data service
   * (see guide: https://ngrx.io/guide/data/entity-dataservice)
   *
   * @param [options] options that influence merge behavior
   * @returns Observable of the collection
   * after server reports successful query or the query error.
   * @see load()
   */
  getAll(options?: QueryEntityActionOptions): Observable<T[]> {
    const finalOptions = this.getReadEntityActionOptions(options);
    const action$ = this.executeWithAutoCancellation(
      o => super.getAll(o),
      o => this.cancel(o.correlationId),
      finalOptions
    );
    return action$.pipe(swallowCancellationError(!finalOptions.throwOnCancel));
  }

  /**
   * Dispatch an action to query remote storage for the entity with this primary key.
   * If the server returns an entity, merge it into the cached collection.
   *
   * By default this will issue a server request:
   *
   * `GET {baseUrl}/{pluralizedentityname}/{id}`
   *
   * This behaviour can be customized by creating a custom data service
   * (see guide: https://ngrx.io/guide/data/entity-dataservice)
   *
   * @param key The primary key of the entity to get.
   * @param [options] options that influence merge behavior
   * @returns Observable of the queried entity that is in the collection
   * after server reports success or the query error.
   */
  getByKey(key: any, options?: QueryEntityActionOptions): Observable<T> {
    const finalOptions = this.getReadEntityActionOptions(options);
    const action$ = this.executeWithAutoCancellation(
      o => super.getByKey(key, o),
      o => this.cancel(o.correlationId),
      finalOptions
    );
    return action$.pipe(swallowCancellationError(!finalOptions.throwOnCancel));
  }

  protected getAutoUndoOnErrorOption(
    crudActionOp: CrudWriteActionOp,
    options: NonQueryEntityActionOptions | undefined
  ) {
    if (options?.autoUndoOnError != null) {
      return options.autoUndoOnError;
    }

    return this.isOptimisticDispatch(crudActionOp, options);
  }

  protected getCancelOnEarlyUnsubscribeOption(crudActionOp: CrudActionOp, options: EntityActionOptions | undefined) {
    if (options?.cancelOnEarlyUnsubscribe != null) {
      return options.cancelOnEarlyUnsubscribe;
    }

    // for optimistic UI update pattern we do NOT want to cancel the HTTP
    // request that is persisting the entity changes if our consumer happens to
    // unsubscribe from the observable returned by save/delete
    return crudActionOp === 'Read' ? true : !this.isOptimisticDispatch(crudActionOp, options);
  }

  protected getErrorLogPolicyOption(
    crudActionOp: CrudWriteActionOp,
    options: NonQueryEntityActionOptions | undefined
  ): EntityActionErrorLogPolicy {
    if (options?.errorLogPolicy != null) {
      return options.errorLogPolicy;
    }

    // by default for optimistic UI update pattern no one is likely to
    // be listening to errors returned by the server as by its nature
    // the write to the server is happening even when the page that triggered
    // that write has closed
    return this.isOptimisticDispatch(crudActionOp, options) ? 'log' : 'none';
  }

  protected getReadEntityActionOptions(options: QueryEntityActionOptions | undefined): QueryEntityActionOptions {
    return {
      ...options,
      cancelOnEarlyUnsubscribe: this.getCancelOnEarlyUnsubscribeOption('Read', options)
    };
  }

  protected getWriteEntityActionOptions(
    crudActionOp: CrudWriteActionOp,
    options: NonQueryEntityActionOptions | undefined
  ): NonQueryEntityActionOptions {
    const cancelOnEarlyUnsubscribe = this.getCancelOnEarlyUnsubscribeOption(crudActionOp, options);
    if (cancelOnEarlyUnsubscribe && this.isOptimisticDispatch(crudActionOp, options)) {
      throw new Error('`cancelOnEarlyUnsubscribe` enabled is not supported with optimistic action dispatch');
    }

    return {
      ...options,
      autoUndoOnError: this.getAutoUndoOnErrorOption(crudActionOp, options),
      cancelOnEarlyUnsubscribe,
      errorLogPolicy: this.getErrorLogPolicyOption(crudActionOp, options)
    };
  }

  protected getFilterFunc(query: string | Partial<T>) {
    if (typeof query === 'string') {
      const filterFn = this.metadata.filterFn;
      if (filterFn) {
        return (entities: T[]) => filterFn(entities, query);
      } else {
        return (entities: T[]) => findBySearch<T>()(entities, query);
      }
    } else {
      return findByMatch<T>(query);
    }
  }

  /**
   * Dispatch action to query remote storage for the entities that satisfy a query expressed
   * with either a query parameter map or an HTTP URL query string,
   * and merge the results into the cached collection.
   *
   * @example
   * ```
   * getWithQuery('foo=bar')
   * ```
   * The above will be sent to the server as follows:
   *
   * `GET {baseUrl}/{pluralizedentityname}/?foo=bar`
   *
   * @example
   * ```
   * getWithQuery({ firstName: 'Christian', age: '46' })
   * ```
   * The above will be sent to the server as follows:
   *
   * `GET {baseUrl}/{pluralizedentityname}/?firstName=Christian&age=46`
   *
   * This behaviour can be customized by creating a custom data service
   * (see guide: https://ngrx.io/guide/data/entity-dataservice)
   *
   * @param queryParams the query in a form understood by the server
   * @param [options] options that influence merge behavior
   * @returns Observable of the queried entities
   * after server reports successful query or the query error.
   */
  getWithQuery(queryParams: QueryParams | string, options?: QueryEntityActionOptions): Observable<T[]> {
    const finalOptions = this.getReadEntityActionOptions(options);
    const action$ = this.executeWithAutoCancellation(
      o => super.getWithQuery(queryParams, o),
      o => this.cancel(o.correlationId),
      finalOptions
    );
    return action$.pipe(swallowCancellationError(!finalOptions.throwOnCancel));
  }

  /**
   * Call one of the `getXxx` or `load` method on this service based on the
   * `queryDef` supplied
   * @param queryDef definition of the query method to call
   * @param [options] options that influence merge behavior
   * @returns Observable of the queried entities
   * after server reports successful query or the query error.
   * This will always be an array of entities even a search by key
   */
  getWithQueryDef(queryDef: QueryDef<T>, options?: QueryEntityActionOptions): Observable<T[]> {
    if (queryDef.isGetAll) {
      return this.getAll(options);
    } else if (queryDef.isLoadAll) {
      return this.load(options);
    } else if (queryDef.isGetWithQuery) {
      if (queryDef.queryParams == null) {
        // this shouldn't be possible but make typescript happy anyway
        throw new Error('QueryDef is missing queryParams');
      }
      return this.getWithQuery(queryDef.queryParams, options);
    } else if (queryDef.isGetByKey) {
      return this.getByKey(queryDef.query, options).pipe(map(found => (found ? [found] : [])));
    } else {
      throw new Error('unsupported QueryDef');
    }
  }

  isEntityNew(entity: T) {
    return isEntityNew(entity, this.selectId);
  }

  /**
   * Dispatch action to query remote storage for all entities and completely replace the
   * cached collection with the queried entities.
   *
   * By default this will issue a server request:
   *
   * `GET {baseUrl}/{pluralizedentityname}/`
   *
   * This behaviour can be customized by creating a custom data service
   * (see guide: https://ngrx.io/guide/data/entity-dataservice)
   *
   * @param [options] options that influence load behavior
   * @returns Observable of the collection
   * after server reports successful query or the query error.
   * @see getAll
   */
  load(options?: QueryEntityActionOptions): Observable<T[]> {
    const finalOptions = this.getReadEntityActionOptions(options);
    const action$ = this.executeWithAutoCancellation(
      o => super.load(o),
      o => this.cancel(o.correlationId),
      finalOptions
    );
    return action$.pipe(swallowCancellationError(!finalOptions.throwOnCancel));
  }

  /**
   * Override in the subclass where you need to modify the fields of the entity
   * before it is POST'ed to the server. For example, removing the primary key field
   *
   * @param entity a new entity to be inserted
   */
  protected prepareForInsert(entity: T): T {
    return entity;
  }

  /**
   * Return an observable that will emit changes to the loading status of the `query`
   * supplied
   */
  queryIsLoading$(query: QueryDef<T> | Partial<T>) {
    const queryDef = QueryDef.from(query);
    return this.queryEntriesByHash$(queryDef.queryHash).pipe(
      map(queries => queries.some(q => q.loading)),
      distinctUntilChanged()
    );
  }

  /**
   * Return an observable that will emit changes to the loaded status of the `query`
   * supplied
   */
  queryIsLoaded$(query: QueryDef<T> | Partial<T>) {
    const queryDef = QueryDef.from(query);
    return this.queryEntriesByHash$(queryDef.queryHash).pipe(
      map(queries => queries.some(q => q.loaded)),
      distinctUntilChanged()
    );
  }

  /**
   * Dispatch action to insert or update `entity` in remote storage
   *
   * By default an insert will issue a server request:
   *
   * `POST {baseUrl}/{pluralizedentityname}/`
   *
   * By default an update will issue a server request:
   *
   * `PUT {baseUrl}/{pluralizedentityname}/`
   *
   * This behaviour can be customized by creating a custom data service
   * (see guide: https://ngrx.io/guide/data/entity-dataservice)
   *
   * @param entity The entity to persist
   * @param [originalEntity] The last know server state of `entity`
   *        ie before the updates to it that are now being persisted
   * @param [options] options that influence save and merge behavior
   * @returns Observable of the inserted or updated entity
   * after server reports successful save or the save error.
   */
  save(entity: T, _originalEntity?: T, options?: NonQueryEntityActionOptions): Observable<T> {
    const crudOp: CrudActionOp = this.isEntityNew(entity) ? 'Create' : 'Update';
    if (crudOp === 'Create') {
      return this.add(entity, options);
    } else {
      return this.update(entity, options);
    }
  }

  /**
   * Dispatch action to save multiple entity changes to remote storage.
   * Important: only call if your server supports the SaveEntities protocol
   * through your EntityDataService.saveEntities method.
   * @param changes Either the entities to save, as a {SaveEntitiesChanges}, or
   * a ChangeSet that holds such changes.
   * @param [options] options such as requestOptions, tag, correlationId, isOptimistic, etc.
   * These values are defaulted if not supplied.
   * @returns A terminating Observable<ChangeSet> with data returned from the server
   * after server reports successful save OR the save error.
   * TODO: should return the matching entities from cache rather than the raw server data.
   */
  saveEntities(
    changes: SaveEntitiesChanges<T> | ChangeSet,
    options?: NonQueryEntityActionOptions
  ): Observable<ChangeSet> {
    const changeSet = isChangeSet(changes) ? changes : this.toChangeSet(changes);
    const { requestOptions, ...finalOptions } = this.getWriteEntityActionOptions('SaveMany', options);
    const url =
      requestOptions?.url || this.httpUrlGenerator.collectionResource(this.entityName, this.config.root || 'api');

    if (this.isOptimisticDispatch('SaveMany', options)) {
      // trigger the server update cancelling any inflight saveEntities command
      const cmd = {
        changeSet,
        url,
        // note: although we do not support cancelOnEarlyUnsubscribe for the result$ observable returned
        // below, we DO want that behaviour for concurrent commands to arrive to cancel ones that are inflight
        options: { ...finalOptions, cancelOnEarlyUnsubscribe: true }
      };
      this.optimisticSaveEntitiesCmd.next(cmd);

      // return the response from the server
      const result$ = this.optimisticSaveEntitiesResult$.pipe(
        filter(result => result.originalChangeSet === changeSet),
        take(1),
        unwrapSaveEntitiesResult()
      );
      return result$;
    } else {
      return this.doSaveEntities(changeSet, url, finalOptions);
    }
  }

  /**
   * Ensure the entity cache is populated with entities that match
   * the `query` supplied and return a terminating observable of
   * the matching entities
   *
   * The `query` will first be checked against an internal cache of
   * previously executed queries. On a cache miss, the query will
   * be executed against the server to populate the entity cache.
   * The query is then ALWAYS fullfilled locally against the entity
   * cache by returning the first emission from `entitiesByQuery$(query)`.
   *
   * Note: you want to make sure that the client-side
   * query will return the _same or subset_ of results that the
   * server returns. For example, you might want to allow the server
   * to fullfil part of the filtering required, getting as close to
   * the final filter result as possible. For the client-side query
   * to then apply _additional_ restrictions.
   *
   * If the client-side query is not a strict superset of the
   * predicate run on the server, then you need to implement one of
   * the following techniques instead:
   *
   * * Call this method, but ignore the results of the array returned;
   *   `switchMap` to `entities$` observable `filter`'ed by your
   *   own client-side predicate that matches the server
   * * Don't use this method, but instead call `getWithQuery` with
   *   the option of `replaceCache: true` and then `switchMap` to the
   *   `entities$` observable which is now going to match exactly the
   *   results of the server
   *
   * Both alternative techniquest have pros and cons. For example,
   * using `getWithQuery` with `replaceCache: true` you lose any benefit
   * of the local client entity cache, and you will need a real api for
   * when developing the client-side.
   *
   * @param query the query in a form understood by BOTH the server
   * and client-side
   * @param [options] options that influence merge behavior of entities
   * returned by the server
   * @returns Observable of the matching entities or the query error from the server
   *
   * @see `entitiesByQuery$`
   * @see `getWithQueryDef`
   * @see `queryIsLoaded$`
   */
  setWithQuery(query: QueryDef<T> | Partial<T>, options?: QueryEntityActionOptions) {
    const queryDef = QueryDef.from<T>(query);
    const localQueryResult$ = this.entitiesByQuery$(queryDef).pipe(first());

    return this.queryEntriesByHash$(queryDef.queryHash).pipe(
      first(),
      mergeMap(queries => {
        if (queries.some(q => q.loaded)) {
          return localQueryResult$;
        } else if (queries.some(q => q.loading)) {
          const inflightQueries = queries.filter(q => q.loading).map(q => q.id);
          const queryResponse$ = this.getQueryResponse(...inflightQueries);

          return queryResponse$.pipe(
            mergeMap(resp => {
              if (resp.payload.entityOp === EntityOp.CANCELED_PERSIST) {
                // query we've been waiting on was cancelled - run our own instead
                const opts = { ...options, correlationId: getGuid() };
                return this.getWithQueryDef(queryDef, opts).pipe(mergeMapTo(localQueryResult$));
              }
              if (resp.payload.entityOp.endsWith(OP_SUCCESS)) {
                return localQueryResult$;
              } else {
                return throwError(resp.payload.data.error);
              }
            })
          );
        } else {
          return this.getWithQueryDef(queryDef, options).pipe(mergeMapTo(localQueryResult$));
        }
      })
    );
  }

  update(entity: T, options?: NonQueryEntityActionOptions): Observable<T> {
    const { requestOptions, ...finalOptions } = this.getWriteEntityActionOptions('Update', options);

    if (requestOptions) {
      entity = Object.setPrototypeOf({ ...entity }, { requestOptions });
    }

    if (this.isOptimisticDispatch('Update', options)) {
      // trigger the server update cancelling any inflight update for the same entity
      const cmd = {
        entity,
        // note: although we do not support cancelOnEarlyUnsubscribe for the result$ observable returned
        // below, we DO want that behaviour for concurrent commands to arrive to cancel ones that are inflight
        options: { ...finalOptions, cancelOnEarlyUnsubscribe: true }
      };
      this.optimisticUpdateCmd.next(cmd);

      // return the response from the server
      const result$ = this.optimisticUpdateResult$.pipe(
        filter(result => result.originalEntity === entity),
        take(1),
        unwrapUpdateResult()
      );
      return result$;
    } else {
      return this.updateEntity(entity, finalOptions);
    }
  }

  /**
   * Alias of `save`
   * @see `save`
   */
  upsert(entity: T, options?: EntityActionOptions): Observable<T> {
    return this.save(entity, undefined, options);
  }

  queryEntriesByHash$(hash: string) {
    return this.queryCache$.pipe(map(selectQueriesForHash(hash)));
  }

  private createCancelation(
    cancellationCallback: CancellationCallback,
    options?: EntityActionOptions
  ): CancelationResource {
    const o = { correlationId: getGuid(), ...options };
    return {
      entityActionOptions: o,
      unsubscribe: () => cancellationCallback(o)
    };
  }

  private createOptimisticSaveEntitiesResult$(saveEntitiesCmd: Subject<SaveEntitiesCommand>) {
    return saveEntitiesCmd.pipe(
      switchMap(({ changeSet, url, options }) =>
        this.doSaveEntities(changeSet, url, options).pipe(toSaveEntitiesResult(changeSet))
      ),
      publishReplay(1)
    ) as ConnectableObservable<SaveEntitiesCommandResult>;
  }

  private createOptimisticUpdateResult$(updateCmd: Subject<UpdateCommand<T>>) {
    // note: this timeout is an upper ceiling to let `switchMapByKey` know when it can stop
    // tracking concurrent inflight requests
    const httpConnectionTimeoutMs = 120000;

    const updateResult$ = updateCmd.pipe(
      // switchMapByKey will cancel any inflight update for the same entity matched by id;
      // this leaves only one update for an entity to continue process at any given moment
      switchMapByKey(
        ({ entity }) => this.selectId(entity),
        ({ entity, options }) => this.updateEntity(entity, options).pipe(toUpdateResult(entity)),
        httpConnectionTimeoutMs
      ),
      publishReplay(1)
    ) as ConnectableObservable<UpdateCommandResult<T>>;
    return updateResult$;
  }

  private doSaveEntities(
    changeSet: ChangeSet,
    url: string,
    options: NonQueryEntityActionOptions
  ): Observable<ChangeSet> {
    const entityNames = Array.from(new Set<string>(changeSet.changes.map(c => c.entityName)));

    const action$ = this.executeWithAutoCancellation(
      o => this.entityDispatcher.saveEntities(changeSet, url, o),
      o => this.entityDispatcher.cancelSaveEntities(o.correlationId, undefined, entityNames),
      options
    );
    return action$.pipe(swallowCancellationError(!options.throwOnCancel));
  }

  private getQueryResponse(...queryIds: string[]) {
    return this.entityActions$.pipe(
      filter(act => {
        const { correlationId, entityName, entityOp } = act.payload;
        return (
          entityName === this.entityName &&
          queryIds.includes(correlationId) &&
          (entityOp.endsWith(OP_SUCCESS) || entityOp.endsWith(OP_ERROR) || entityOp === EntityOp.CANCELED_PERSIST)
        );
      }),
      take(1)
    );
  }

  private isOptimisticDispatch(crudActionOp: CrudWriteActionOp, options: EntityActionOptions | undefined) {
    if (options?.isOptimistic != null) {
      return options.isOptimistic;
    }

    const { entityDispatcherOptions: dispatcherOptions } = this.metadata;

    // make typescript happy!
    if (!dispatcherOptions) {
      throw new Error('Expected entityDispatcherOptions to be set at this point');
    }

    switch (crudActionOp) {
      case 'Create':
        return dispatcherOptions.optimisticAdd ?? ngrxDataDefaultDispatcherOptions.optimisticAdd;
      case 'Update':
        return dispatcherOptions.optimisticUpdate ?? ngrxDataDefaultDispatcherOptions.optimisticUpdate;
      case 'Delete':
        return dispatcherOptions.optimisticDelete ?? ngrxDataDefaultDispatcherOptions.optimisticDelete;
      case 'SaveMany':
        return dispatcherOptions.optimisticSaveEntities ?? ngrxDataDefaultDispatcherOptions.optimisticSaveEntities;
    }
  }

  private updateEntity(entity: T, options: NonQueryEntityActionOptions): Observable<T> {
    const action$ = this.executeWithAutoCancellation(
      o => super.update(entity, o),
      o => this.cancel(o.correlationId),
      options
    );
    return action$.pipe(swallowCancellationError(!options.throwOnCancel));
  }
}
