⭐⭐⭐ Spring Boot 项目实战 ⭐⭐⭐ Spring Cloud 项目实战
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 juejin.im/post/5b1d1c5f6fb9a01e6a0c8e06 「go4it」欢迎转载,保留摘要,谢谢!


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

本文主要研究一下spring cloud gateway如何集成hystrix

maven

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>

添加spring-cloud-starter-netflix-hystrix依赖,开启hystrix

配置实例

hystrix.command.fallbackcmd.execution.isolation.thread.timeoutInMilliseconds: 5000
spring:
cloud:
gateway:
discovery:
locator:
enabled: true
routes:
- id: employee-service
uri: lb://employee-service
predicates:
- Path=/employee/**
filters:
- RewritePath=/employee/(?<path>.*), /$\{path}
- name: Hystrix
args:
name: fallbackcmd
fallbackUri: forward:/fallback

  • 首先filter里头配置了name为Hystrix的filter,实际是对应HystrixGatewayFilterFactory
  • 然后指定了hystrix command的名称,及fallbackUri,注意fallbackUri要以forward开头
  • 最后通过hystrix.command.fallbackcmd.execution.isolation.thread.timeoutInMilliseconds指定该command的超时时间

fallback实例

@RestController
@RequestMapping("/fallback")
public class FallbackController {

@RequestMapping("")
public String fallback(){
return "error";
}
}

源码解析

GatewayAutoConfiguration

spring-cloud-gateway-core-2.0.0.RC2-sources.jar!/org/springframework/cloud/gateway/config/GatewayAutoConfiguration.java

@Configuration
@ConditionalOnProperty(name = "spring.cloud.gateway.enabled", matchIfMissing = true)
@EnableConfigurationProperties
@AutoConfigureBefore(HttpHandlerAutoConfiguration.class)
@AutoConfigureAfter({GatewayLoadBalancerClientAutoConfiguration.class, GatewayClassPathWarningAutoConfiguration.class})
@ConditionalOnClass(DispatcherHandler.class)
public class GatewayAutoConfiguration {
//......
@Configuration
@ConditionalOnClass({HystrixObservableCommand.class, RxReactiveStreams.class})
protected static class HystrixConfiguration {
@Bean
public HystrixGatewayFilterFactory hystrixGatewayFilterFactory(DispatcherHandler dispatcherHandler) {
return new HystrixGatewayFilterFactory(dispatcherHandler);
}
}
//......
}

引入spring-cloud-starter-netflix-hystrix类库,就有HystrixObservableCommand.class, RxReactiveStreams.class,便开启HystrixConfiguration

HystrixGatewayFilterFactory

spring-cloud-gateway-core-2.0.0.RC2-sources.jar!/org/springframework/cloud/gateway/filter/factory/HystrixGatewayFilterFactory.java

/**
* Depends on `spring-cloud-starter-netflix-hystrix`, {@see http://cloud.spring.io/spring-cloud-netflix/}
* @author Spencer Gibb
*/
public class HystrixGatewayFilterFactory extends AbstractGatewayFilterFactory<HystrixGatewayFilterFactory.Config> {

public static final String FALLBACK_URI = "fallbackUri";

private final DispatcherHandler dispatcherHandler;

public HystrixGatewayFilterFactory(DispatcherHandler dispatcherHandler) {
super(Config.class);
this.dispatcherHandler = dispatcherHandler;
}

@Override
public List<String> shortcutFieldOrder() {
return Arrays.asList(NAME_KEY);
}

public GatewayFilter apply(String routeId, Consumer<Config> consumer) {
Config config = newConfig();
consumer.accept(config);

if (StringUtils.isEmpty(config.getName()) && !StringUtils.isEmpty(routeId)) {
config.setName(routeId);
}

return apply(config);
}

@Override
public GatewayFilter apply(Config config) {
//TODO: if no name is supplied, generate one from command id (useful for default filter)
if (config.setter == null) {
Assert.notNull(config.name, "A name must be supplied for the Hystrix Command Key");
HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey(getClass().getSimpleName());
HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey(config.name);

config.setter = Setter.withGroupKey(groupKey)
.andCommandKey(commandKey);
}

return (exchange, chain) -> {
RouteHystrixCommand command = new RouteHystrixCommand(config.setter, config.fallbackUri, exchange, chain);

return Mono.create(s -> {
Subscription sub = command.toObservable().subscribe(s::success, s::error, s::success);
s.onCancel(sub::unsubscribe);
}).onErrorResume((Function<Throwable, Mono<Void>>) throwable -> {
if (throwable instanceof HystrixRuntimeException) {
HystrixRuntimeException e = (HystrixRuntimeException) throwable;
if (e.getFailureType() == TIMEOUT) { //TODO: optionally set status
setResponseStatus(exchange, HttpStatus.GATEWAY_TIMEOUT);
return exchange.getResponse().setComplete();
}
}
return Mono.error(throwable);
}).then();
};
}
//......
}

这里创建了RouteHystrixCommand,将其转换为Mono,然后在onErrorResume的时候判断如果HystrixRuntimeException的failureType是FailureType.TIMEOUT类型的话,则返回GATEWAY_TIMEOUT(504, "Gateway Timeout")状态码。

RouteHystrixCommand

//TODO: replace with HystrixMonoCommand that we write
private class RouteHystrixCommand extends HystrixObservableCommand<Void> {

private final URI fallbackUri;
private final ServerWebExchange exchange;
private final GatewayFilterChain chain;

RouteHystrixCommand(Setter setter, URI fallbackUri, ServerWebExchange exchange, GatewayFilterChain chain) {
super(setter);
this.fallbackUri = fallbackUri;
this.exchange = exchange;
this.chain = chain;
}

@Override
protected Observable<Void> construct() {
return RxReactiveStreams.toObservable(this.chain.filter(exchange));
}

@Override
protected Observable<Void> resumeWithFallback() {
if (this.fallbackUri == null) {
return super.resumeWithFallback();
}

//TODO: copied from RouteToRequestUrlFilter
URI uri = exchange.getRequest().getURI();
//TODO: assume always?
boolean encoded = containsEncodedParts(uri);
URI requestUrl = UriComponentsBuilder.fromUri(uri)
.host(null)
.port(null)
.uri(this.fallbackUri)
.build(encoded)
.toUri();
exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);

ServerHttpRequest request = this.exchange.getRequest().mutate().uri(requestUrl).build();
ServerWebExchange mutated = exchange.mutate().request(request).build();
return RxReactiveStreams.toObservable(HystrixGatewayFilterFactory.this.dispatcherHandler.handle(mutated));
}
}

  • 这里重写了construct方法,RxReactiveStreams.toObservable(this.chain.filter(exchange)),将reactor的Mono转换为rxjava的Observable
  • 这里重写了resumeWithFallback方法,针对有fallbackUri的情况,重新路由到fallbackUri的地址

Config

public static class Config {
private String name;
private Setter setter;
private URI fallbackUri;

public String getName() {
return name;
}

public Config setName(String name) {
this.name = name;
return this;
}

public Config setFallbackUri(String fallbackUri) {
if (fallbackUri != null) {
setFallbackUri(URI.create(fallbackUri));
}
return this;
}

public URI getFallbackUri() {
return fallbackUri;
}

public void setFallbackUri(URI fallbackUri) {
if (fallbackUri != null && !"forward".equals(fallbackUri.getScheme())) {
throw new IllegalArgumentException("Hystrix Filter currently only supports 'forward' URIs, found " + fallbackUri);
}
this.fallbackUri = fallbackUri;
}

public Config setSetter(Setter setter) {
this.setter = setter;
return this;
}
}

可以看到Config校验了fallbackUri,如果不为null,则必须以forward开头

小结

spring cloud gateway集成hystrix,分为如下几步:

  • 添加spring-cloud-starter-netflix-hystrix依赖
  • 在对应route的filter添加name为Hystrix的filter,同时指定hystrix command的名称,及其fallbackUri(可选)
  • 指定该hystrix command的超时时间等。

doc

文章目录
  1. 1.
  2. 2. maven
  3. 3. 配置实例
    1. 3.1. fallback实例
  4. 4. 源码解析
    1. 4.1. GatewayAutoConfiguration
    2. 4.2. HystrixGatewayFilterFactory
    3. 4.3. RouteHystrixCommand
    4. 4.4. Config
  5. 5. 小结
  6. 6. doc