⭐⭐⭐ Spring Boot 项目实战 ⭐⭐⭐ Spring Cloud 项目实战
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 http://www.iocoder.cn/RxJava/observable-defer/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 RxJava 1.2.X 版本

本系列写作目的,为了辅助 Hystrix 的理解,因此会较为零散与琐碎,望见谅见谅。


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

在一些业务场景下,我们需要 Observable 是动态的,例如说,《Hystrix 源码解析 —— 执行结果缓存》 分享的缓存 Observable ,无法在创建 Observable 阶段就知道是否有缓存,通过 Observable#defer(...) 方法,声明动态的 Observable 。示例代码如下:

public static void main(String[] args) {
Observable.defer(new Func0<Observable<String>>() { // #defer(...)
@Override
public Observable<String> call() {
String name = Math.random() > 0.5 ? "小明" : "小贾"; // 随机名字
return Observable.just(name);
}
}).subscribe(new Action1<String>() { // #subscribe(...)
@Override
public void call(String s) {
System.out.println(s);
}
});
}


Observable#defer(...) 方法,代码如下:

// Observable.java
public static <T> Observable<T> defer(Func0<Observable<T>> observableFactory) {
return create(new OnSubscribeDefer<T>(observableFactory));
}

public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(RxJavaHooks.onCreate(f));
}

  • 使用传入 observableFactory 参数,生成动态的 Observable 。

OnSubscribeDefer 类,代码如下:

public final class OnSubscribeDefer<T> implements OnSubscribe<T> {
final Func0<? extends Observable<? extends T>> observableFactory;

public OnSubscribeDefer(Func0<? extends Observable<? extends T>> observableFactory) {
this.observableFactory = observableFactory;
}

@Override
public void call(final Subscriber<? super T> s) {
Observable<? extends T> o;
try {
o = observableFactory.call();
} catch (Throwable t) {
Exceptions.throwOrReport(t, s);
return;
}
o.unsafeSubscribe(Subscribers.wrap(s));
}

}

  • Observable#subscribe(...) 方法调用时,调用 OnSubscribeDefer#call(...) 方法 :
    • 调用 Func0#call() 方法,创建动态的 Observable 。
    • 调用 Observable#unsafeSubscribe(...) 方法,继续订阅逻辑

666. 彩蛋

知识星球

文章目录
  1. 1. 666. 彩蛋