(WebFlux)004、WebFilter踩坑記錄

2022-09-29 06:02:25

一、背景

使用SpringWebFlux的WebFilter時,由於不熟悉或一些思考疏忽,容易出現未知的異常。記錄一下排查與解決方案,給大家分享一下。

二、問題

2.1 問題描述

在測試介面方法時,出現的錯誤資訊如下(對一些專案路徑做了修改):

java.lang.IllegalStateException: COMPLETED
	at org.springframework.http.server.reactive.AbstractListenerReadPublisher$State.subscribe(AbstractListenerReadPublisher.java:451)
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
	*__checkpoint ⇢ springfox.boot.starter.autoconfigure.SwaggerUiWebFluxConfiguration$CustomWebFilter [DefaultWebFilterChain]
	*__checkpoint ⇢ com.xxx.config.LoginWebFilter$$EnhancerBySpringCGLIB$$f3da6bdf [DefaultWebFilterChain]
	*__checkpoint ⇢ com.xxx.config.TraceIdFilter [DefaultWebFilterChain]
	*__checkpoint ⇢ HTTP POST "/abc/test/testMethod" [ExceptionHandlingWebHandler]
Original Stack Trace:
		at org.springframework.http.server.reactive.AbstractListenerReadPublisher$State.subscribe(AbstractListenerReadPublisher.java:451)
		at org.springframework.http.server.reactive.AbstractListenerReadPublisher.subscribe(AbstractListenerReadPublisher.java:105)

2.2 解決問題

通過檢視錯誤資訊描述,checkpoint點都在webfilter中,由於對webflux也不是特別熟,所以就只有一個個測試。

通過一系列操作, 把swagger移除,細讀TraceIdFilter(內容不多),主要歸功於原方案是正確的,修改後錯誤,最後才定位問題出現在LoginWebFilter。

說說插曲,原實現方式(有阻塞邏輯,沒出現上述異常),程式碼如下:

@Configuration
@Slf4j
@Order(-10)
public class LoginWebFilter implements WebFilter {
    // 略...

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        ServerHttpRequest request = exchange.getRequest();

        if (!enableGateway) {
            String token = Optional.ofNullable(request.getHeaders().getFirst(Constants.TOKEN))
                    .orElse("");
            // 獲取使用者資訊
            User user = getUser(token);
            if (user != null) {
                ServerHttpRequest mutateRequest = exchange.getRequest().mutate()
                        .build();
                exchange = exchange.mutate().request(mutateRequest).build();
            }
        }
        return chain.filter(exchange);
    }

    private User getUser(String token) {
        if (StringUtils.isNotBlank(token)) {
            return redisTemplate.opsForValue().get("xxx:tk:" + token)
                    .flatMap(str -> Mono.justOrEmpty(JsonUtils.toObj(str, User.class))).block();
        }
        return null;
    }
}

這樣寫,沒有複雜的業務邏輯,從上到下,完全OJBK,但是調整後,就出現了上述異常。

改完後的問題程式碼如下:

// 錯誤
public class LoginWebFilter implements WebFilter {
	/...略
    @Autowired
    private ReactiveStringRedisTemplate redisTemplate;

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {

        if (!enableGateway) {
            ServerHttpRequest request = exchange.getRequest();
            String token = Optional.ofNullable(request.getHeaders().getFirst(Constants.TOKEN))
                    .orElse("");

            return getUser(token).flatMap(user -> {
                ServerHttpRequest mutateRequest = exchange.getRequest().mutate()
                        .header(UserUtils.MEMBER_ID, user.getMemId())
                        .header(UserUtils.MOBILE, user.getMobile())
                        .build();

                ServerWebExchange newexchange = exchange.mutate().request(mutateRequest).build();
                return chain.filter(newexchange);
               // 問題點 
            }).switchIfEmpty(chain.filter(exchange));
        }
        return chain.filter(exchange);
    }
	// 不在用block
    private Mono<User> getUser(String token) {
        if (StringUtils.isNotBlank(token)) {
            return redisTemplate.opsForValue().get("xxx:tk:" + token)
                    .flatMap(str -> Mono.justOrEmpty(JsonUtils.toObj(str, User.class)));
        }
        return Mono.empty();
    }
}

2.3 如何解決

對比改造前和改造後的程式碼,其實差異不大,那問題出現在哪呢?

由於對webflux也不是特別熟,那就只能一點點試(太蠢了)。 最後發現問題出現在了switchIfEmpty(chain.filter(exchange)),在去掉了switchIfEmpty(chain.filter(exchange)),就不會在出現上述異常。

修改後部分程式碼如下:

// 半正確
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {

    if (!enableGateway) {

        ServerHttpRequest request = exchange.getRequest();
        String token = Optional.ofNullable(request.getHeaders().getFirst(Constants.TOKEN))
            .orElse(「」);

        return getUser(token).flatMap(user -> {
            ServerHttpRequest mutateRequest = exchange.getRequest().mutate()
                .header(UserUtils.MEMBER_ID, user.getMemId())
                .header(UserUtils.MOBILE, user.getMobile())
                .build();

            ServerWebExchange newexchange = exchange.mutate().request(mutateRequest).build();
            return chain.filter(newexchange);
        });
    }
    return chain.filter(exchange);
}

雖然現在不回在出現異常,但是去掉switchIfEmpty後,程式碼邏輯是不完整的,當獲取不到User時,返回Mono.emtpy,那會直接結束流程,不在執行剩下的filter或其他邏輯。真是連環坑,一坑接一坑。所以對程式碼需要調整一番,調整後如下:

// 有點正確 但是不多
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {

    if (!enableGateway) {

        ServerHttpRequest request = exchange.getRequest();
        String token = Optional.ofNullable(request.getHeaders().getFirst(Constants.TOKEN))
            .orElse(「」);

        return getUser(token).switchIfEmpty(Mono.error(() -> new BizException(ErrorCode.USER_IS_NULL_ERROR)))
            .flatMap(user -> {
                ServerHttpRequest mutateRequest = exchange.getRequest().mutate()
                    .header(UserUtils.MEMBER_ID, user.getMemId())
                    .header(UserUtils.MOBILE, user.getMobile())
                    .build();

                ServerWebExchange newexchange = exchange.mutate().request(mutateRequest).build();
                return chain.filter(newexchange);
            }).onErrorResume(e -> chain.filter(exchange));

    }
    return chain.filter(exchange);
}

當獲取使用者為空後,丟擲異常,然後在兜底,當異常的時候執行chain.filter(exchange)(好蠢的方式.. 但是解決問題了)。

2.4 意外之喜

各位看官,就在我寫完上完上面的程式碼修改方案之後,讀了一下修改完後的程式碼,突然發現問題出在哪了,所以連夜修改了程式碼方式。現在我聽我細細道來。

2.4.1 問題點

原因點chain.filter(exchange)重複執行

switchIfEmpty(chain.filter(exchange))這個點本意是想用在當getUser 方法為空時,執行其它WebFilter的邏輯,從而不影響主流程。

忽略了一點是:當chain.filter(newexchange)這個方法執行完後,返回的也是Mono<Void>,也是為空。所以無論如何,程式碼最後的邏輯都會走到switchIfEmpty(chain.filter(exchange))

但是當getUser獲取到使用者後,會重複執行chain.filter(exchange),如下

  • return chain.filter(newexchange)
  • switchIfEmpty(chain.filter(exchange))

由於第一次執行完chain.filter(exchange),request、response都已經關閉,所以出現了xx COMPLETE,那看來的確符合邏輯。

2.4.2 驗證猜想

這個驗證方式還是挺簡單的,那就是分別傳入正常的TOKEN和錯誤的TOKEN。

具體操作:.....(本人已完成)

結論:

當傳入錯誤的token的時候,確實沒有丟擲異常,完美執行。但是當傳入正確的token,出現了熟悉的異常。

2.4.3 程式碼調整

知道問題的原因,那就好調整程式碼了。修改後如下:

public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
    if (!enableGateway) {
        ServerHttpRequest request = exchange.getRequest();
        String token = Optional.ofNullable(request.getHeaders().getFirst(Constants.TOKEN))
            .orElse(request.getHeaders().getFirst("suuid"));

        return getUser(token).map(user -> {
            ServerHttpRequest mutateRequest = exchange.getRequest().mutate()
                .header(UserUtils.MEMBER_ID, user.getMemId())
                .header(UserUtils.MOBILE, user.getMobile())
                .build();
            return exchange.mutate().request(mutateRequest).build();
            // 調整當getUser為空時,返回的內容
        }).switchIfEmpty(Mono.just(exchange)).flatMap(chain::filter);
        
    }
    return chain.filter(exchange);
}

至此,問題就完全解決拉!心裡美滋滋!

三、總結

1、遇到問題,還是要多看看呀,細細思考一下

2、多看程式碼,發現問題,實現完美的解決方案