Вопрос: Как подписаться ровно один раз на элемент из AsyncSubject (шаблон потребителя)


В rxjs5, У меня есть AsyncSubject и вы хотите подписаться на него несколько раз, но только один пользователь должен когда-либо получать next() мероприятие. Все остальные (если они еще не отписаны) должны немедленно получить complete() событие без next(),

Пример:

let fired = false;
let as = new AsyncSubject();

const setFired = () => {
    if (fired == true) throw new Error("Multiple subscriptions executed");
    fired = true;
}

let subscription1 = as.subscribe(setFired);
let subscription2 = as.subscribe(setFired);

// note that subscription1/2 could be unsubscribed from in the future
// and still only a single subscriber should be triggered

setTimeout(() => {
    as.next(undefined);
    as.complete();
}, 500);

9


источник


Ответы:


Простым способом является перенос вашего AsyncSubject в другой объект, который обрабатывает логику вызова только одного абонента. Предполагая, что вы хотите вызывать только 1-го абонента, код, приведенный ниже, должен быть хорошей отправной точкой

let as = new AsyncSubject();

const createSingleSubscriberAsyncSubject = as => {
    // define empty array for subscribers
    let subscribers = [];

    const subscribe = callback => {
        if (typeof callback !== 'function') throw new Error('callback provided is not a function');

        subscribers.push(callback);

        // return a function to unsubscribe
        const unsubscribe = () => { subscribers = subscribers.filter(cb => cb !== callback); };
        return unsubscribe;
    };

    // the main subscriber that will listen to the original AS
    const mainSubscriber = (...args) => {
        // you can change this logic to invoke the last subscriber
        if (subscribers[0]) {
            subscribers[0](...args);
        }
    };

    as.subscribe(mainSubscriber);

    return {
        subscribe,
        // expose original AS methods as needed
        next: as.next.bind(as),
        complete: as.complete.bind(as),
    };
};

// use

const singleSub = createSingleSubscriberAsyncSubject(as);

// add 3 subscribers
const unsub1 = singleSub.subscribe(() => console.log('subscriber 1'));
const unsub2 = singleSub.subscribe(() => console.log('subscriber 2'));
const unsub3 = singleSub.subscribe(() => console.log('subscriber 3'));

// remove first subscriber
unsub1();

setTimeout(() => {
    as.next(undefined);
    as.complete();
    // only 'subscriber 2' is printed
}, 500);

0



Вы можете легко реализовать это, написав небольшой класс, который обертывает начальную AsyncSubject

import {AsyncSubject, Subject, Observable, Subscription} from 'rxjs/RX'

class SingleSubscriberObservable<T> {
    private newSubscriberSubscribed = new Subject();

    constructor(private sourceObservable: Observable<T>) {}

    subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Subscription {
        this.newSubscriberSubscribed.next();
        return this.sourceObservable.takeUntil(this.newSubscriberSubscribed).subscribe(next, error, complete);
    }
}

Затем вы можете попробовать его в своем примере:

const as = new AsyncSubject();
const single = new SingleSubscriberObservable(as)

let fired = false;

function setFired(label:string){
    return ()=>{
        if(fired == true) throw new Error("Multiple subscriptions executed");
        console.log("FIRED", label);
        fired = true;
    }
}

function logDone(label: string){
    return ()=>{
       console.log(`${label} Will stop subscribing to source observable`);
    }
}

const subscription1 = single.subscribe(setFired('First'), ()=>{}, logDone('First'));
const subscription2 = single.subscribe(setFired('Second'), ()=>{}, logDone('Second'));
const subscription3 = single.subscribe(setFired('Third'), ()=>{}, logDone('Third'));

setTimeout(()=>{
    as.next(undefined);
    as.complete();
}, 500)

Ключевым моментом здесь является эта часть:

subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Subscription {
    this.newSubscriberSusbscribed.next();
    return this.sourceObservable.takeUntil(this.newSubscriberSubscribed).subscribe(next, error, complete);
}

Каждый раз, когда кто-то звонит, подписывается, мы будем сигнализировать newSubscriberSubscribed предмет.

Когда мы подписываемся на лежащий в основе Observable, мы используем

takeUntil(this.newSubscriberSubscribed)

Это означает, что когда следующий абонент вызывает:

this.newSubscriberSubscribed.next()

Возвращаемое ранее будет завершено.

Таким образом, это приведет к тому, что вы спрашиваете, что означает, что предыдущая подписка завершается всякий раз, когда приходит новая подписка.

Выходом приложения будет:

First Will stop subscribing to source observable
Second Will stop subscribing to source observable
FIRED Third
Third Will stop subscribing to source observable

РЕДАКТИРОВАТЬ:

Если вы хотите сделать это, когда первая подписка останется подпиской и все будущие подписки будут немедленно завершены (так что, хотя самый ранний абонент по-прежнему подписывается, никто не может подписаться). Вы можете сделать это следующим образом:

class SingleSubscriberObservable<T> {
    private isSubscribed: boolean = false;

    constructor(private sourceObservable: Observable<T>) {}

    subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Subscription {
        if(this.isSubscribed){
            return Observable.empty().subscribe(next, error, complete);    
        }
        this.isSubscribed = true;
        var unsubscribe = this.sourceObservable.subscribe(next, error, complete);
        return new Subscription(()=>{
            unsubscribe.unsubscribe();
            this.isSubscribed = false;
        });
    }
}

Мы держим флаг this.isSusbscribed чтобы отслеживать, есть ли кто-то в настоящее время подписан. Мы также возвращаем пользовательскую подписку, которую мы можем использовать, чтобы вернуть этот флаг в false, когда все отписывается.

Всякий раз, когда кто-то пытается подписаться, если мы вместо этого обвиняем их в пустом Observable который завершится немедленно. Результат будет выглядеть так:

Second Will stop subscribing to source observable
Third Will stop subscribing to source observable
FIRED First
First Will stop subscribing to source observable

1