⭐⭐⭐ Spring Boot 项目实战 ⭐⭐⭐ Spring Cloud 项目实战
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 http://www.iocoder.cn/Hystrix/command-execute-first-run/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 Hystrix 1.5.X 版本


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

1. 概述

本文主要分享 Hystrix 命令执行(一)之正常执行逻辑

建议 :对 RxJava 已经有一定的了解的基础上阅读本文。

Hystrix 执行命令整体流程如下图:

FROM 《【翻译】Hystrix文档-实现原理》「流程图」

  • 框 :Hystrix 命令执行的过程。
  • 圈 :本文分享的部分 —— 正常执行逻辑。

推荐 Spring Cloud 书籍

2. #applyHystrixSemantics(...)

《Hystrix 源码解析 —— 执行结果缓存》 里,我们看到 #toObservable() 方法里的第 11 至 19 行,当缓存特性未开启,或者缓存未命中时,使用 applyHystrixSemantics 传入 Observable#defer(...) 方法,声明执行命令的 Observable。

创建 applyHystrixSemantics 变量,代码如下 :

// `AbstractCommand#toObservable()` 方法
1: final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
2: @Override
3: public Observable<R> call() {
4: // commandState 处于 UNSUBSCRIBED 时,不执行命令
5: if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
6: return Observable.never();
7: }
8: // 获得 执行Observable
9: return applyHystrixSemantics(_cmd);
10: }
11: };

  • 第 5 至 7 行 :当 commandState 处于 UNSUBSCRIBED 时,不执行命令。
  • 第 9 行 :调用 #applyHystrixSemantics(...) 方法,获得执行 Observable 。

#applyHystrixSemantics(...) 方法,代码如下 :

 1: private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
2: // TODO 【2003】【HOOK】
3: // mark that we're starting execution on the ExecutionHook
4: // if this hook throws an exception, then a fast-fail occurs with no fallback. No state is left inconsistent
5: executionHook.onStart(_cmd);
6:
7: /* determine if we're allowed to execute */
8: if (circuitBreaker.attemptExecution()) {
9: // 获得 信号量
10: final TryableSemaphore executionSemaphore = getExecutionSemaphore();
11:
12: // 信号量释放Action
13: final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
14: final Action0 singleSemaphoreRelease = new Action0() {
15: @Override
16: public void call() {
17: if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
18: executionSemaphore.release();
19: }
20: }
21: };
22:
23: // TODO 【2011】【Hystrix 事件机制】
24: final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
25: @Override
26: public void call(Throwable t) {
27: eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
28: }
29: };
30:
31: // 信号量 获得
32: if (executionSemaphore.tryAcquire()) {
33: try {
34: // 标记 executionResult 调用开始时间
35: /* used to track userThreadExecutionTime */
36: executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
37:
38: // 获得 执行Observable
39: return executeCommandAndObserve(_cmd)
40: .doOnError(markExceptionThrown)
41: .doOnTerminate(singleSemaphoreRelease)
42: .doOnUnsubscribe(singleSemaphoreRelease);
43: } catch (RuntimeException e) {
44: return Observable.error(e);
45: }
46: } else {
47: return handleSemaphoreRejectionViaFallback();
48: }
49: } else {
50: return handleShortCircuitViaFallback();
51: }
52: }

  • 第 5 行 :TODO 【2003】【HOOK】
  • 第 8 行 :TODO 【2012】【链路健康度】
  • 第 10 行 :调用 #getExecutionSemaphore() 方法,获得信号量( TryableSemaphore )对象,在 「3. TryableSemaphore」 详细解析。
  • 第 13 至 21 行 :信号量释放 Action ,用于下面【执行命令 Observable】的 #doOnTerminate(Action)#doOnUnsubscribe(Action) 方法( 见第 41 至 42 行 )。
  • 第 24 至 29 行 :TODO 【2011】【Hystrix 事件机制】
  • 第 32 行 :调用 TryableSemaphore#tryAcquire() 方法,信号量( TryableSemaphore )使用成功,在 「3. TryableSemaphore」 详细解析。
  • 第 36 行 :标记 executionResult调用开始时间。
  • 第 39 行 :调用 #executeCommandAndObserve() 方法,获得【执行命令 Observable】。在 「4. #executeCommandAndObserve(...)」 详细解析。
  • 第 43 至 45 行 :若发生异常,调用 Observable#error(Exception) 方法返回 Observable 。
  • 第 46 至 48 行 :信号量( TryableSemaphore )使用失败,调用 #handleSemaphoreRejectionViaFallback() 方法,处理信号量拒绝的失败回退逻辑,在 《Hystrix 源码解析 —— 命令执行(四)之失败回退逻辑》 详细解析。
  • 第 49 至 51 行 :链路处于熔断状态,调用 #handleShortCircuitViaFallback() 方法,处理链路熔断的失败回退逻辑,在 《Hystrix 源码解析 —— 命令执行(四)之失败回退逻辑》 详细解析。

3. TryableSemaphore

com.netflix.hystrix.AbstractCommand.TryableSemaphore ,Hystrix 定义的信号量接口。代码如下 :

interface TryableSemaphore {

boolean tryAcquire();

void release();

int getNumberOfPermitsUsed();
}

  • 从 API 上,Java 自带的 java.util.concurrent.Semaphore 都能满足,为什么不使用它呢?继续一起往下看。

TryableSemaphore 共有两个子类实现 :

  • TryableSemaphoreNoOp
  • TryableSemaphoreActual

3.1 TryableSemaphoreNoOp

com.netflix.hystrix.AbstractCommand.TryableSemaphoreNoOp无操作的信号量。代码如下 :

/* package */static class TryableSemaphoreNoOp implements TryableSemaphore {

public static final TryableSemaphore DEFAULT = new TryableSemaphoreNoOp();

@Override
public boolean tryAcquire() {
return true;
}

@Override
public void release() {

}

@Override
public int getNumberOfPermitsUsed() {
return 0;
}

}

  • 从实现上看,#tryAcquire() 方法,每次都返回的是 true#release() 方法,无任何操作。这个是为什么?在 Hystrix 里提供了两种执行隔离策略

3.2 TryableSemaphoreActual

com.netflix.hystrix.AbstractCommand.TryableSemaphoreActual真正的的信号量实现。不过实际上,TryableSemaphoreActual 更加像一个计数器。代码如下 :

/* package */static class TryableSemaphoreActual implements TryableSemaphore {
protected final HystrixProperty<Integer> numberOfPermits;
private final AtomicInteger count = new AtomicInteger(0);

public TryableSemaphoreActual(HystrixProperty<Integer> numberOfPermits) {
this.numberOfPermits = numberOfPermits;
}

@Override
public boolean tryAcquire() {
int currentCount = count.incrementAndGet();
if (currentCount > numberOfPermits.get()) {
count.decrementAndGet();
return false;
} else {
return true;
}
}

@Override
public void release() {
count.decrementAndGet();
}

@Override
public int getNumberOfPermitsUsed() {
return count.get();
}

}

  • numberOfPermits 属性,信号量上限com.netflix.hystrix.strategy.properties.HystrixProperty 是一个接口,当其使用类似 com.netflix.hystrix.strategy.properties.archaius.IntegerDynamicProperty 动态属性的实现时,可以实现动态调整信号量的上限,这就是上文提到的为什么不使用 java.util.concurrent.Semaphore 的原因之一。
  • count 属性,信号量使用数量。🙂,这是为什么说 TryableSemaphoreActual 更加像一个计数器 的原因。
  • 另一个不使用 java.util.concurrent.Semaphore 的原因,TryableSemaphoreActual 无阻塞获取信号量的需求,使用 AtomicInteger 可以达到更轻量级的实现。

3.3 #getExecutionSemaphore()

调用 #getExecutionSemaphore() 方法,获得信号量对象,代码如下 :

/**
* 执行命令(正常执行)信号量映射
* KEY :命令名 {@link #commandKey}
*/
/* each circuit has a semaphore to restrict concurrent fallback execution */
protected static final ConcurrentHashMap<String, TryableSemaphore> executionSemaphorePerCircuit = new ConcurrentHashMap<String, TryableSemaphore>();

protected TryableSemaphore getExecutionSemaphore() {
if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.SEMAPHORE) {
if (executionSemaphoreOverride == null) {
TryableSemaphore _s = executionSemaphorePerCircuit.get(commandKey.name());
if (_s == null) { // 不存在时,创建 TryableSemaphoreActual
// we didn't find one cache so setup
executionSemaphorePerCircuit.putIfAbsent(commandKey.name(), new TryableSemaphoreActual(properties.executionIsolationSemaphoreMaxConcurrentRequests()));
// assign whatever got set (this or another thread)
return executionSemaphorePerCircuit.get(commandKey.name());
} else {
return _s;
}
} else {
return executionSemaphoreOverride;
}
} else {
// return NoOp implementation since we're not using SEMAPHORE isolation
return TryableSemaphoreNoOp.DEFAULT;
}
}

  • 根据执行隔离策略不同获取不同的信号量实现 :
    • Thread ,该方式不使用信号量,因此使用 TryableSemaphoreNoOp 。
    • Semaphore ,该方式使用信号量,因此使用 TryableSemaphoreActual 。
      • 相同的 commandKey ,使用相同的 TryableSemaphoreActual 。

4. #executeCommandAndObserve(...)

调用 #executeCommandAndObserve(...) 方法,获得【执行命令 Observable】。代码如下 :

 1: private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
2: // TODO 【】
3: final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
4:
5: // TODO 【2007】【executionResult】用途
6: final Action1<R> markEmits = new Action1<R>() {
7: @Override
8: public void call(R r) {
9: if (shouldOutputOnNextEvents()) {
10: executionResult = executionResult.addEvent(HystrixEventType.EMIT);
11: eventNotifier.markEvent(HystrixEventType.EMIT, commandKey);
12: }
13: if (commandIsScalar()) {
14: long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
15: eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
16: executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
17: eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
18: circuitBreaker.markSuccess();
19: }
20: }
21: };
22:
23: // TODO 【2007】【executionResult】用途
24: final Action0 markOnCompleted = new Action0() {
25: @Override
26: public void call() {
27: if (!commandIsScalar()) {
28: long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
29: eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
30: executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
31: eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
32: circuitBreaker.markSuccess();
33: }
34: }
35: };
36:
37: // 失败回退逻辑 Func1
38: final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
39: @Override
40: public Observable<R> call(Throwable t) {
41: circuitBreaker.markNonSuccess();
42: Exception e = getExceptionFromThrowable(t);
43: executionResult = executionResult.setExecutionException(e);
44: if (e instanceof RejectedExecutionException) {
45: return handleThreadPoolRejectionViaFallback(e);
46: } else if (t instanceof HystrixTimeoutException) {
47: return handleTimeoutViaFallback();
48: } else if (t instanceof HystrixBadRequestException) {
49: return handleBadRequestByEmittingError(e);
50: } else {
51: /*
52: * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
53: */
54: if (e instanceof HystrixBadRequestException) {
55: eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
56: return Observable.error(e);
57: }
58:
59: return handleFailureViaFallback(e);
60: }
61: }
62: };
63:
64: // TODO 【2008】【请求缓存】
65: final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {
66: @Override
67: public void call(Notification<? super R> rNotification) {
68: setRequestContextIfNeeded(currentRequestContext);
69: }
70: };
71:
72: Observable<R> execution;
73: if (properties.executionTimeoutEnabled().get()) {
74: execution = executeCommandWithSpecifiedIsolation(_cmd)
75: .lift(new HystrixObservableTimeoutOperator<R>(_cmd)); // 超时
76: } else {
77: execution = executeCommandWithSpecifiedIsolation(_cmd);
78: }
79:
80: return execution.doOnNext(markEmits)
81: .doOnCompleted(markOnCompleted)
82: .onErrorResumeNext(handleFallback)
83: .doOnEach(setRequestContext);
84: }

5. #executeCommandWithSpecifiedIsolation(...)

调用 #executeCommandWithSpecifiedIsolation(...) 方法,获得【执行命令 Observable】。代码如下 :

  1: private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
2: if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
3: // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
4: return Observable.defer(new Func0<Observable<R>>() {
5: @Override
6: public Observable<R> call() {
7:
8: // 标记 executionResult 执行已发生
9: executionResult = executionResult.setExecutionOccurred();
10:
11: // 设置 commandState 为 USER_CODE_EXECUTED
12: if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
13: return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
14: }
15:
16: // TODO 【2002】【metrics】
17: metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);
18:
19: // TODO 【2009】【执行超时】
20: if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
21: // the command timed out in the wrapping thread so we will return immediately
22: // and not increment any of the counters below or other such logic
23: return Observable.error(new RuntimeException("timed out before executing run()"));
24: }
25:
26: // 设置 线程状态 为 ThreadState.STARTED
27: if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
28: // TODO 【2002】【metrics】
29: //we have not been unsubscribed, so should proceed
30: HystrixCounters.incrementGlobalConcurrentThreads();
31: threadPool.markThreadExecution();
32:
33: // TODO 【2010】【endCurrentThreadExecutingCommand】
34: // store the command that is being run
35: endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
36:
37: // 标记 executionResult 使用线程执行
38: executionResult = executionResult.setExecutedInThread();
39: /**
40: * If any of these hooks throw an exception, then it appears as if the actual execution threw an error
41: */
42: try {
43: // TODO 【2003】【HOOK】
44: executionHook.onThreadStart(_cmd);
45: executionHook.onRunStart(_cmd);
46: executionHook.onExecutionStart(_cmd);
47:
48: // 获得 执行Observable
49: return getUserExecutionObservable(_cmd);
50: } catch (Throwable ex) {
51: return Observable.error(ex);
52: }
53: } else {
54: //command has already been unsubscribed, so return immediately
55: return Observable.empty();
56: }
57: }
58: }).doOnTerminate(new Action0() {
59: @Override
60: public void call() {
61: if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) {
62: handleThreadEnd(_cmd);
63: }
64: if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) {
65: //if it was never started and received terminal, then no need to clean up (I don't think this is possible)
66: }
67: //if it was unsubscribed, then other cleanup handled it
68: }
69: }).doOnUnsubscribe(new Action0() {
70: @Override
71: public void call() {
72: if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) {
73: handleThreadEnd(_cmd);
74: }
75: if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) {
76: //if it was never started and was cancelled, then no need to clean up
77: }
78: //if it was terminal, then other cleanup handled it
79: }
80: }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() { // TODO 芋艿:Scheduler
81: @Override
82: public Boolean call() {
83: return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
84: }
85: }));
86: } else {
87: return Observable.defer(new Func0<Observable<R>>() {
88: @Override
89: public Observable<R> call() {
90: // 标记 executionResult 执行已发生
91: executionResult = executionResult.setExecutionOccurred();
92:
93: // 设置 commandState 为 USER_CODE_EXECUTED
94: if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
95: return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
96: }
97:
98: // TODO 【2002】【metrics】
99: metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
100:
101: // TODO 【2010】【endCurrentThreadExecutingCommand】
102: // semaphore isolated
103: // store the command that is being run
104: endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
105: try {
106: // TODO 【2003】【HOOK】
107: executionHook.onRunStart(_cmd);
108: executionHook.onExecutionStart(_cmd);
109:
110: // 获得 执行Observable
111: return getUserExecutionObservable(_cmd); //the getUserExecutionObservable method already wraps sync exceptions, so this shouldn't throw
112: } catch (Throwable ex) {
113: //If the above hooks throw, then use that as the result of the run method
114: return Observable.error(ex);
115: }
116: }
117: });
118: }
119: }

  • 根据执行隔离策略不同,创建不同的【执行命令 Observable】。仔细对比下,大体逻辑都是相同的,差别在于执行隔离策略Thread 时,使用 RxJava Scheduler 以及对线程的处理。

  • 第 2 至 85 行 :执行隔离策略Thread

    • 第 9 行 :标记 executionResult 执行已发生。
    • 第 12 至 14 行 :设置 commandStateUSER_CODE_EXECUTED 。若设置失败,调用 Observable#error(Exception) 方法返回 Observable 。
    • 第 17 行 :TODO 【2002】【metrics】
    • 第 20 至 24 行 :TODO 【2009】【执行超时】
    • 第 27 行 :设置 threadStateThreadState.STARTED 成功。
      • 第 30 至 31 行 :TODO 【2002】【metrics】
      • 第 35 行 :TODO 【2010】【endCurrentThreadExecutingCommand】
      • 第 38 行 :标记 executionResult 使用线程执行。
      • 第 44 至 46 行 :TODO 【2003】【HOOK】
      • 第 49 行 :调用 #getUserExecutionObservable(...) 方法,创建【执行命令 Observable】。
      • 第 50 至 52 行 :若发生异常,调用 Observable#error(Exception) 方法返回 Observable 。
    • 第 53 至 56 行 :设置 threadStateThreadState.STARTED 失败,执行命令此时已经被取消,调用 Observable#empty() 方法返回 Observable 。
    • 第 58 至 68 行 :调用 Observable#doOnTerminate(...) 方法,添加 Action0 。#handleThreadEnd(...) 方法,点击 链接 查看。
    • 第 69 至 79 行 :调用 Observable#doOnUnsubscribe(...) 方法,添加 Action0 。
    • 第 80 至 85 行 :调用 Observable#subscribeOn(Scheduler) 方法,指定 Observable 自身在哪个调度器上执行。
  • 第 86 至 118 行 :执行隔离策略SEMAPHORE

    • 第 91 行 :[ 与第 9 行相同 ]。
    • 第 94 至 96 行 :[ 与第 12 至 14行相同 ]。
    • 第 99 行 :[ 与第 17 行类似 ]。
    • 第 104 行 :[ 与第 35 行相同 ]。
    • 第 107 至 108 行 :[ 与第 45 至 46 行相同 ]。
    • 第 111 行 :[ 与第 49 行相同 ]。
    • 第 112 至 115 行 :[ 与第 50 至 52 行相同 ]。

6. #getUserExecutionObservable(...)

调用 #getUserExecutionObservable(...) 方法,创建【执行命令 Observable】。代码如下 :

 1: private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {
2: Observable<R> userObservable;
3:
4: try {
5: userObservable = getExecutionObservable();
6: } catch (Throwable ex) {
7: // the run() method is a user provided implementation so can throw instead of using Observable.onError
8: // so we catch it here and turn it into Observable.error
9: userObservable = Observable.error(ex);
10: }
11:
12: return userObservable
13: .lift(new ExecutionHookApplication(_cmd)) // TODO 【2003】【HOOK】
14: .lift(new DeprecatedOnRunHookApplication(_cmd)); // 已废弃
15: }

  • 第 5 行 :调用 #getExecutionObservable() 方法,创建【执行命令 Observable】。#getExecutionObservable() 是个抽象方法,代码如下 :

    protected abstract Observable<R> getExecutionObservable();

  • 第 6 至 10 行 :若发生异常,调用 Observable#error(Exception) 方法返回 Observable 。

  • 第 12 至 14 行 :返回【执行命令 Observable】。

    • 第 13 行 :TODO 【2003】【HOOK】

7. #getExecutionObservable()

调用 HystrixCommand#getExecutionObservable() 方法,创建【执行命令 Observable】。代码如下 :

 1: @Override
2: final protected Observable<R> getExecutionObservable() {
3: return Observable.defer(new Func0<Observable<R>>() {
4: @Override
5: public Observable<R> call() {
6: try {
7: return Observable.just(run());
8: } catch (Throwable ex) {
9: return Observable.error(ex);
10: }
11: }
12: }).doOnSubscribe(new Action0() {
13: @Override
14: public void call() {
15: // 记录 执行线程
16: // Save thread on which we get subscribed so that we can interrupt it later if needed
17: executionThread.set(Thread.currentThread());
18: }
19: });
20: }
21:
22: protected abstract R run() throws Exception;

  • 第 3 至 11 行 :调用 Observable#defer(Func0<Observable<R>) 方法,创建【执行命令 Observable】。
    • 第 7 行 :调用 #run() 方法,运行正常执逻辑。通过 Observable#just(...) 方法,返回创建【执行命令 Observable】。
  • 第 12 至 19 行 :调用 #doOnSubscribe(...) 方法,添加 Action 。该操作记录执行线程( executionThread ) 。executionThread 用于 HystrixCommand#queue() 方法,返回的 Future 结果,可以调用 Future#cancel(Boolean) 方法,点击 链接 查看该方法。
  • 第 22 行 :#run() 抽象方法,实现该方法,运行正常执逻辑

8. CommandState

com.netflix.hystrix.AbstractCommand.CommandState ,命令状态,代码如下 :

protected enum CommandState {
NOT_STARTED, OBSERVABLE_CHAIN_CREATED, USER_CODE_EXECUTED, UNSUBSCRIBED, TERMINAL
}

状态变迁如下图 :

9. ThreadState

com.netflix.hystrix.AbstractCommand.ThreadState ,线程状态,代码如下 :

protected enum ThreadState {
NOT_USING_THREAD, STARTED, UNSUBSCRIBED, TERMINAL
}

状态变迁如下图 :

666. 彩蛋

知识星球

对 Hystrix 和 RxJava 慢慢更有感觉了。

柳暗花明又一村。

继续加油!

胖友,分享一波朋友圈可好!

文章目录
  1. 1. 1. 概述
  2. 2. 2. #applyHystrixSemantics(...)
  3. 3. 3. TryableSemaphore
    1. 3.1. 3.1 TryableSemaphoreNoOp
    2. 3.2. 3.2 TryableSemaphoreActual
    3. 3.3. 3.3 #getExecutionSemaphore()
  4. 4. 4. #executeCommandAndObserve(...)
  5. 5. 5. #executeCommandWithSpecifiedIsolation(...)
  6. 6. 6. #getUserExecutionObservable(...)
  7. 7. 7. #getExecutionObservable()
  8. 8. 8. CommandState
  9. 9. 9. ThreadState
  10. 10. 666. 彩蛋