Posts RxJava源码分析
Post
Cancel

RxJava源码分析

RxJava创建过程

RxJava创建流程

create()

调用create()创建Observable

1
2
3
4
5
6
7
8
9
10
11
// Usage example: build an Observable from an ObservableOnSubscribe callback,
// then subscribe to it with a Consumer.
Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
        // Push a single value (2) through the emitter.
        emitter.onNext(2);
    }
}).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Throwable {
        // Print the value handed to this consumer.
        System.out.println("integer = "+integer);
    }
});

Observable 的 create() 方法会创建一个 ObservableCreate 对象。

1
2
3
4
/**
 * Wraps {@code source} in a new {@code ObservableCreate} and returns whatever
 * {@code RxJavaPlugins.onAssembly} yields for it.
 *
 * @param source the callback that will receive the emitter on subscription
 * @return the assembled Observable
 */
public static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
    // Create the ObservableCreate object.
    return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
}

subscribe()

1
2
3
// Single-callback overload: delegates to the three-argument subscribe, passing
// Functions.ON_ERROR_MISSING as the error consumer and Functions.EMPTY_ACTION
// as the completion action.
public final Disposable subscribe(@NonNull Consumer<? super T> onNext) {
    return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION);
}
1
2
3
4
5
6
// Three-callback overload: bundles the callbacks into a LambdaObserver, subscribes it,
// and returns that same LambdaObserver as the Disposable.
public final Disposable subscribe(@NonNull Consumer<? super T> onNext, @NonNull Consumer<? super Throwable> onError,@NonNull Action onComplete) {
  	// Create the LambdaObserver that wraps the three callbacks
  	// (Functions.emptyConsumer() is passed as the fourth argument).
    LambdaObserver<T> ls = new LambdaObserver<>(onNext, onError, onComplete, Functions.emptyConsumer());
    // Hand it to the Observer-based subscribe overload.
    subscribe(ls);
    return ls;
}
1
2
3
4
5
6
7
8
9
10
11
12
// Observer-based subscribe: validates the observer, lets RxJavaPlugins.onSubscribe
// substitute it, then dispatches to subscribeActual.
public final void subscribe(@NonNull Observer<? super T> observer) {
    // Reject a null observer up front.
    Objects.requireNonNull(observer, "observer is null");
    try {
        // Use whatever observer the plugin hook returns from here on.
        observer = RxJavaPlugins.onSubscribe(this, observer);
      	// Call subscribeActual, which the concrete Observable subclass implements.
        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        // NullPointerExceptions are rethrown unchanged.
        throw e;
    } catch (Throwable e) {
      // ... (handling of other Throwables is omitted in this excerpt)
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// ObservableCreate's subscribeActual: wires the downstream observer to the
// user-supplied source through a CreateEmitter.
@Override
protected void subscribeActual(Observer<? super T> observer) {
  // Create the emitter, wrapping the downstream observer.
    CreateEmitter<T> parent = new CreateEmitter<>(observer);
    // Call onSubscribe on the observer, passing the emitter.
    observer.onSubscribe(parent);
    try {
      // Call the source's subscribe method.
      // Inside subscribe, the user code calls the emitter's onNext();
      // the emitter's onNext() calls the observer's onNext();
      // the observer's onNext() calls the Consumer's accept().
        source.subscribe(parent);
    } catch (Throwable ex) {
        // Let Exceptions.throwIfFatal inspect the throwable first; if it returns,
        // report the error through the emitter.
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

线程切换

线程控制的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
// Usage example: the same create()/subscribe() chain, with subscribeOn() and
// observeOn() inserted in between to control threading.
Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
        // Push a single value (2) through the emitter.
        emitter.onNext(2);
    }
}).subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Throwable {
        // Print the value handed to this consumer.
        System.out.println("integer = " + integer);
    }
});

subscribeOn()

subscribeOn流程

1
2
3
4
// Wraps this Observable and the given scheduler in an ObservableSubscribeOn.
public final Observable<T> subscribeOn(@NonNull Scheduler scheduler) {
    // Create the ObservableSubscribeOn and pass it through the assembly hook.
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<>(this, scheduler));
}
1
2
3
4
5
6
7
// ObservableSubscribeOn.java
// Instead of subscribing to the source directly, this hands a SubscribeTask to the scheduler.
public void subscribeActual(final Observer<? super T> observer) {
  // Create the observer that wraps the downstream observer.
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);
    // Notify the downstream observer, passing the wrapper to onSubscribe.
    observer.onSubscribe(parent);
    // Schedule the SubscribeTask on the scheduler and store the returned Disposable in the wrapper.
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
1
2
3
4
5
6
7
8
9
10
11
// Runnable that subscribes the stored SubscribeOnObserver to `source` when run.
// NOTE(review): `source` is not declared in this excerpt; presumably it is a field of the
// enclosing ObservableSubscribeOn.
final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver<T> parent;
    SubscribeTask(SubscribeOnObserver<T> parent) {
        this.parent = parent;
    }
    @Override
    public void run() {
      	// Call the source's subscribe method on whichever thread executes this task.
        source.subscribe(parent);
    }
}

scheduleDirect()

1
2
3
4
// Scheduler.java
// No-delay overload: delegates to the three-argument scheduleDirect with a delay of 0 nanoseconds.
public Disposable scheduleDirect(@NonNull Runnable run) {
    return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
1
2
3
4
5
6
7
8
9
// Schedules `run` on a freshly created Worker and returns the DisposeTask that wraps it.
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker w = createWorker(); // Create the Worker
    // Let the plugin hook decorate the runnable.
    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    // Pair the decorated runnable with its worker in a DisposeTask.
    DisposeTask task = new DisposeTask(decoratedRun, w);
    // Call the worker's schedule method.
    w.schedule(task, delay, unit);
    return task;
}
1
2
3
4
5
6
7
8
// EventLoopWorker.java
// Forwards the action to threadWorker.scheduleActual unless `tasks` is already disposed.
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
    if (tasks.isDisposed()) {
        // don't schedule, we are unsubscribed
        return EmptyDisposable.INSTANCE;
    }
    // Pass `tasks` along as the container for the scheduled runnable.
    return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// Wraps `run` in a ScheduledRunnable, registers it with `parent` (if any), and submits it
// to the executor. The ScheduledRunnable is returned on every path.
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
    // Let the plugin hook decorate the runnable.
    Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
    if (parent != null) {
        // If the container refuses the task, return it without submitting it.
        if (!parent.add(sr)) {
            return sr;
        }
    }
    Future<?> f;
    try {
        if (delayTime <= 0) {
            // No delay: submit for execution directly.
            f = executor.submit((Callable<Object>)sr);
        } else {
            // Positive delay: schedule for later execution.
            f = executor.schedule((Callable<Object>)sr, delayTime, unit);
        }
        // Give the task its Future.
        sr.setFuture(f);
    } catch (RejectedExecutionException ex) {
        // The executor rejected the task: undo the registration and report the error
        // through RxJavaPlugins.onError instead of throwing.
        if (parent != null) {
            parent.remove(sr);
        }
        RxJavaPlugins.onError(ex);
    }
    return sr;
}

observeOn()

observeOn流程

1
2
3
4
5
6
// Validates the arguments, then wraps this Observable in an ObservableObserveOn.
public final Observable<T> observeOn(@NonNull Scheduler scheduler, boolean delayError, int bufferSize) {
    Objects.requireNonNull(scheduler, "scheduler is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    // Create the ObservableObserveOn and pass it through the assembly hook.
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<>(this, scheduler, delayError, bufferSize));
}
1
2
3
4
5
6
7
8
9
10
// ObservableObserveOn
// Subscribes to the source either directly (TrampolineScheduler) or through an ObserveOnObserver.
protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        // With a TrampolineScheduler the downstream observer is subscribed as-is.
        source.subscribe(observer);
    } else {
        Scheduler.Worker w = scheduler.createWorker(); // Obtain a worker
        // Create the ObserveOnObserver around the downstream observer and subscribe it to the source.
        source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// ObserveOnObserver
// Receives an item from upstream, enqueues it when required, then calls schedule().
public void onNext(T t) {
    // Ignore items once `done` is set.
    if (done) {
        return;
    }
    // The item is offered to the queue unless sourceMode is QueueDisposable.ASYNC.
    if (sourceMode != QueueDisposable.ASYNC) {
        queue.offer(t);
    }
    schedule();
}
// Hands this observer to the worker only when getAndIncrement() returns 0.
// NOTE(review): presumably the counter prevents scheduling again while a run is
// already pending; confirm against the full ObserveOnObserver class.
void schedule() {
    if (getAndIncrement() == 0) {
        worker.schedule(this);
    }
}
This post is licensed under CC BY 4.0 by the author.