RxJava创建过程

RxJava创建流程

create()

调用create()创建Observable

1
2
3
4
5
6
7
8
9
10
11
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
emitter.onNext(2);
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Throwable {
System.out.println("integer = "+integer);
}
});

Observablecreate方法会创建一个ObservableCreate对象。

1
2
3
4
public static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
//创建ObservableCreate对象
return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
}

subscribe()

1
2
3
public final Disposable subscribe(@NonNull Consumer<? super T> onNext) {
return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION);
}
1
2
3
4
5
6
public final Disposable subscribe(@NonNull Consumer<? super T> onNext, @NonNull Consumer<? super Throwable> onError,@NonNull Action onComplete) {
//创建LambdaObserver
LambdaObserver<T> ls = new LambdaObserver<>(onNext, onError, onComplete, Functions.emptyConsumer());
subscribe(ls);
return ls;
}
1
2
3
4
5
6
7
8
9
10
11
12
public final void subscribe(@NonNull Observer<? super T> observer) {
Objects.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
//调用subscribeActual
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
//...
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Override
protected void subscribeActual(Observer<? super T> observer) {
//创建发射器
CreateEmitter<T> parent = new CreateEmitter<>(observer);
//调用onSubscribe方法
observer.onSubscribe(parent);
try {
//调用subscribe方法
//在subscribe方法中会调用emitter的onNext()
//emitter的onNext会调用observer的onNext()
//observer的onNext会调用Consumer的accept()
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}

线程切换

线程控制的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
emitter.onNext(2);
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Throwable {
System.out.println("integer = " + integer);
}
});

subscribeOn()

subscribeOn流程

1
2
3
4
public final Observable<T> subscribeOn(@NonNull Scheduler scheduler) {
//创建ObservableSubscribeOn
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<>(this, scheduler));
}
1
2
3
4
5
6
7
//ObservableSubscribeOn.java
public void subscribeActual(final Observer<? super T> observer) {
//创建观察者
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
1
2
3
4
5
6
7
8
9
10
11
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
//调用subscribe方法
source.subscribe(parent);
}
}

scheduleDirect()

1
2
3
4
//Scheduler.java
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
1
2
3
4
5
6
7
8
9
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker(); //创建Worker
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
//调用woker的schedule方法
w.schedule(task, delay, unit);
return task;
}
1
2
3
4
5
6
7
8
//EventLoopWorker.java
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}

observeOn()

observeOn流程

1
2
3
4
5
6
public final Observable<T> observeOn(@NonNull Scheduler scheduler, boolean delayError, int bufferSize) {
Objects.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
//创建ObservableObserveOn
return RxJavaPlugins.onAssembly(new ObservableObserveOn<>(this, scheduler, delayError, bufferSize));
}
1
2
3
4
5
6
7
8
9
10
//ObservableObserveOn
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker(); //获取worker
//创建ObserveOnObserver 并调用subscribe
source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//ObserveOnObserver
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}