first commit
This commit is contained in:
288
node_modules/rxjs/src/internal/operators/groupBy.ts
generated
vendored
Normal file
288
node_modules/rxjs/src/internal/operators/groupBy.ts
generated
vendored
Normal file
@@ -0,0 +1,288 @@
|
||||
import { Observable } from '../Observable';
|
||||
import { innerFrom } from '../observable/innerFrom';
|
||||
import { Subject } from '../Subject';
|
||||
import { ObservableInput, Observer, OperatorFunction, SubjectLike } from '../types';
|
||||
import { operate } from '../util/lift';
|
||||
import { createOperatorSubscriber, OperatorSubscriber } from './OperatorSubscriber';
|
||||
|
||||
export interface BasicGroupByOptions<K, T> {
|
||||
element?: undefined;
|
||||
duration?: (grouped: GroupedObservable<K, T>) => ObservableInput<any>;
|
||||
connector?: () => SubjectLike<T>;
|
||||
}
|
||||
|
||||
export interface GroupByOptionsWithElement<K, E, T> {
|
||||
element: (value: T) => E;
|
||||
duration?: (grouped: GroupedObservable<K, E>) => ObservableInput<any>;
|
||||
connector?: () => SubjectLike<E>;
|
||||
}
|
||||
|
||||
export function groupBy<T, K>(key: (value: T) => K, options: BasicGroupByOptions<K, T>): OperatorFunction<T, GroupedObservable<K, T>>;
|
||||
|
||||
export function groupBy<T, K, E>(
|
||||
key: (value: T) => K,
|
||||
options: GroupByOptionsWithElement<K, E, T>
|
||||
): OperatorFunction<T, GroupedObservable<K, E>>;
|
||||
|
||||
export function groupBy<T, K extends T>(
|
||||
key: (value: T) => value is K
|
||||
): OperatorFunction<T, GroupedObservable<true, K> | GroupedObservable<false, Exclude<T, K>>>;
|
||||
|
||||
export function groupBy<T, K>(key: (value: T) => K): OperatorFunction<T, GroupedObservable<K, T>>;
|
||||
|
||||
/**
|
||||
* @deprecated use the options parameter instead.
|
||||
*/
|
||||
export function groupBy<T, K>(
|
||||
key: (value: T) => K,
|
||||
element: void,
|
||||
duration: (grouped: GroupedObservable<K, T>) => Observable<any>
|
||||
): OperatorFunction<T, GroupedObservable<K, T>>;
|
||||
|
||||
/**
|
||||
* @deprecated use the options parameter instead.
|
||||
*/
|
||||
export function groupBy<T, K, R>(
|
||||
key: (value: T) => K,
|
||||
element?: (value: T) => R,
|
||||
duration?: (grouped: GroupedObservable<K, R>) => Observable<any>
|
||||
): OperatorFunction<T, GroupedObservable<K, R>>;
|
||||
|
||||
/**
|
||||
* Groups the items emitted by an Observable according to a specified criterion,
|
||||
* and emits these grouped items as `GroupedObservables`, one
|
||||
* {@link GroupedObservable} per group.
|
||||
*
|
||||
* 
|
||||
*
|
||||
* When the Observable emits an item, a key is computed for this item with the key function.
|
||||
*
|
||||
* If a {@link GroupedObservable} for this key exists, this {@link GroupedObservable} emits. Otherwise, a new
|
||||
* {@link GroupedObservable} for this key is created and emits.
|
||||
*
|
||||
* A {@link GroupedObservable} represents values belonging to the same group represented by a common key. The common
|
||||
* key is available as the `key` field of a {@link GroupedObservable} instance.
|
||||
*
|
||||
* The elements emitted by {@link GroupedObservable}s are by default the items emitted by the Observable, or elements
|
||||
* returned by the element function.
|
||||
*
|
||||
* ## Examples
|
||||
*
|
||||
* Group objects by `id` and return as array
|
||||
*
|
||||
* ```ts
|
||||
* import { of, groupBy, mergeMap, reduce } from 'rxjs';
|
||||
*
|
||||
* of(
|
||||
* { id: 1, name: 'JavaScript' },
|
||||
* { id: 2, name: 'Parcel' },
|
||||
* { id: 2, name: 'webpack' },
|
||||
* { id: 1, name: 'TypeScript' },
|
||||
* { id: 3, name: 'TSLint' }
|
||||
* ).pipe(
|
||||
* groupBy(p => p.id),
|
||||
* mergeMap(group$ => group$.pipe(reduce((acc, cur) => [...acc, cur], [])))
|
||||
* )
|
||||
* .subscribe(p => console.log(p));
|
||||
*
|
||||
* // displays:
|
||||
* // [{ id: 1, name: 'JavaScript' }, { id: 1, name: 'TypeScript'}]
|
||||
* // [{ id: 2, name: 'Parcel' }, { id: 2, name: 'webpack'}]
|
||||
* // [{ id: 3, name: 'TSLint' }]
|
||||
* ```
|
||||
*
|
||||
* Pivot data on the `id` field
|
||||
*
|
||||
* ```ts
|
||||
* import { of, groupBy, mergeMap, reduce, map } from 'rxjs';
|
||||
*
|
||||
* of(
|
||||
* { id: 1, name: 'JavaScript' },
|
||||
* { id: 2, name: 'Parcel' },
|
||||
* { id: 2, name: 'webpack' },
|
||||
* { id: 1, name: 'TypeScript' },
|
||||
* { id: 3, name: 'TSLint' }
|
||||
* ).pipe(
|
||||
* groupBy(p => p.id, { element: p => p.name }),
|
||||
* mergeMap(group$ => group$.pipe(reduce((acc, cur) => [...acc, cur], [`${ group$.key }`]))),
|
||||
* map(arr => ({ id: parseInt(arr[0], 10), values: arr.slice(1) }))
|
||||
* )
|
||||
* .subscribe(p => console.log(p));
|
||||
*
|
||||
* // displays:
|
||||
* // { id: 1, values: [ 'JavaScript', 'TypeScript' ] }
|
||||
* // { id: 2, values: [ 'Parcel', 'webpack' ] }
|
||||
* // { id: 3, values: [ 'TSLint' ] }
|
||||
* ```
|
||||
*
|
||||
* @param key A function that extracts the key
|
||||
* for each item.
|
||||
* @param element A function that extracts the
|
||||
* return element for each item.
|
||||
* @param duration
|
||||
* A function that returns an Observable to determine how long each group should
|
||||
* exist.
|
||||
* @param connector Factory function to create an
|
||||
* intermediate Subject through which grouped elements are emitted.
|
||||
* @return A function that returns an Observable that emits GroupedObservables,
|
||||
* each of which corresponds to a unique key value and each of which emits
|
||||
* those items from the source Observable that share that key value.
|
||||
*
|
||||
* @deprecated Use the options parameter instead.
|
||||
*/
|
||||
export function groupBy<T, K, R>(
|
||||
key: (value: T) => K,
|
||||
element?: (value: T) => R,
|
||||
duration?: (grouped: GroupedObservable<K, R>) => Observable<any>,
|
||||
connector?: () => Subject<R>
|
||||
): OperatorFunction<T, GroupedObservable<K, R>>;
|
||||
|
||||
// Impl
|
||||
export function groupBy<T, K, R>(
|
||||
keySelector: (value: T) => K,
|
||||
elementOrOptions?: ((value: any) => any) | void | BasicGroupByOptions<K, T> | GroupByOptionsWithElement<K, R, T>,
|
||||
duration?: (grouped: GroupedObservable<any, any>) => ObservableInput<any>,
|
||||
connector?: () => SubjectLike<any>
|
||||
): OperatorFunction<T, GroupedObservable<K, R>> {
|
||||
return operate((source, subscriber) => {
|
||||
let element: ((value: any) => any) | void;
|
||||
if (!elementOrOptions || typeof elementOrOptions === 'function') {
|
||||
element = elementOrOptions as ((value: any) => any);
|
||||
} else {
|
||||
({ duration, element, connector } = elementOrOptions);
|
||||
}
|
||||
|
||||
// A lookup for the groups that we have so far.
|
||||
const groups = new Map<K, SubjectLike<any>>();
|
||||
|
||||
// Used for notifying all groups and the subscriber in the same way.
|
||||
const notify = (cb: (group: Observer<any>) => void) => {
|
||||
groups.forEach(cb);
|
||||
cb(subscriber);
|
||||
};
|
||||
|
||||
// Used to handle errors from the source, AND errors that occur during the
|
||||
// next call from the source.
|
||||
const handleError = (err: any) => notify((consumer) => consumer.error(err));
|
||||
|
||||
// The number of actively subscribed groups
|
||||
let activeGroups = 0;
|
||||
|
||||
// Whether or not teardown was attempted on this subscription.
|
||||
let teardownAttempted = false;
|
||||
|
||||
// Capturing a reference to this, because we need a handle to it
|
||||
// in `createGroupedObservable` below. This is what we use to
|
||||
// subscribe to our source observable. This sometimes needs to be unsubscribed
|
||||
// out-of-band with our `subscriber` which is the downstream subscriber, or destination,
|
||||
// in cases where a user unsubscribes from the main resulting subscription, but
|
||||
// still has groups from this subscription subscribed and would expect values from it
|
||||
// Consider: `source.pipe(groupBy(fn), take(2))`.
|
||||
const groupBySourceSubscriber = new OperatorSubscriber(
|
||||
subscriber,
|
||||
(value: T) => {
|
||||
// Because we have to notify all groups of any errors that occur in here,
|
||||
// we have to add our own try/catch to ensure that those errors are propagated.
|
||||
// OperatorSubscriber will only send the error to the main subscriber.
|
||||
try {
|
||||
const key = keySelector(value);
|
||||
|
||||
let group = groups.get(key);
|
||||
if (!group) {
|
||||
// Create our group subject
|
||||
groups.set(key, (group = connector ? connector() : new Subject<any>()));
|
||||
|
||||
// Emit the grouped observable. Note that we can't do a simple `asObservable()` here,
|
||||
// because the grouped observable has special semantics around reference counting
|
||||
// to ensure we don't sever our connection to the source prematurely.
|
||||
const grouped = createGroupedObservable(key, group);
|
||||
subscriber.next(grouped);
|
||||
|
||||
if (duration) {
|
||||
const durationSubscriber = createOperatorSubscriber(
|
||||
// Providing the group here ensures that it is disposed of -- via `unsubscribe` --
|
||||
// when the duration subscription is torn down. That is important, because then
|
||||
// if someone holds a handle to the grouped observable and tries to subscribe to it
|
||||
// after the connection to the source has been severed, they will get an
|
||||
// `ObjectUnsubscribedError` and know they can't possibly get any notifications.
|
||||
group as any,
|
||||
() => {
|
||||
// Our duration notified! We can complete the group.
|
||||
// The group will be removed from the map in the finalization phase.
|
||||
group!.complete();
|
||||
durationSubscriber?.unsubscribe();
|
||||
},
|
||||
// Completions are also sent to the group, but just the group.
|
||||
undefined,
|
||||
// Errors on the duration subscriber are sent to the group
|
||||
// but only the group. They are not sent to the main subscription.
|
||||
undefined,
|
||||
// Finalization: Remove this group from our map.
|
||||
() => groups.delete(key)
|
||||
);
|
||||
|
||||
// Start our duration notifier.
|
||||
groupBySourceSubscriber.add(innerFrom(duration(grouped)).subscribe(durationSubscriber));
|
||||
}
|
||||
}
|
||||
|
||||
// Send the value to our group.
|
||||
group.next(element ? element(value) : value);
|
||||
} catch (err) {
|
||||
handleError(err);
|
||||
}
|
||||
},
|
||||
// Source completes.
|
||||
() => notify((consumer) => consumer.complete()),
|
||||
// Error from the source.
|
||||
handleError,
|
||||
// Free up memory.
|
||||
// When the source subscription is _finally_ torn down, release the subjects and keys
|
||||
// in our groups Map, they may be quite large and we don't want to keep them around if we
|
||||
// don't have to.
|
||||
() => groups.clear(),
|
||||
() => {
|
||||
teardownAttempted = true;
|
||||
// We only kill our subscription to the source if we have
|
||||
// no active groups. As stated above, consider this scenario:
|
||||
// source$.pipe(groupBy(fn), take(2)).
|
||||
return activeGroups === 0;
|
||||
}
|
||||
);
|
||||
|
||||
// Subscribe to the source
|
||||
source.subscribe(groupBySourceSubscriber);
|
||||
|
||||
/**
|
||||
* Creates the actual grouped observable returned.
|
||||
* @param key The key of the group
|
||||
* @param groupSubject The subject that fuels the group
|
||||
*/
|
||||
function createGroupedObservable(key: K, groupSubject: SubjectLike<any>) {
|
||||
const result: any = new Observable<T>((groupSubscriber) => {
|
||||
activeGroups++;
|
||||
const innerSub = groupSubject.subscribe(groupSubscriber);
|
||||
return () => {
|
||||
innerSub.unsubscribe();
|
||||
// We can kill the subscription to our source if we now have no more
|
||||
// active groups subscribed, and a finalization was already attempted on
|
||||
// the source.
|
||||
--activeGroups === 0 && teardownAttempted && groupBySourceSubscriber.unsubscribe();
|
||||
};
|
||||
});
|
||||
result.key = key;
|
||||
return result;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* An observable of values that is the emitted by the result of a {@link groupBy} operator,
|
||||
* contains a `key` property for the grouping.
|
||||
*/
|
||||
export interface GroupedObservable<K, T> extends Observable<T> {
|
||||
/**
|
||||
* The key value for the grouped notifications.
|
||||
*/
|
||||
readonly key: K;
|
||||
}
|
||||
Reference in New Issue
Block a user