ES 使用者端 RestHighLevelClient Connection reset by peer 親測有效 2022-11-05

2022-11-06 06:02:25

導讀

  最新公司ES叢集老出現連線關閉,進而導致查詢|寫入ES時報錯,報錯紀錄檔顯示如下

[2m2022-10-23 14:13:10.088 - ERROR - [NONE][NONE][NONE][0][1584065372948234240] - [XNIO-1 task-3] - com.mall.search.service.EsRestService:235 : 新增檔案失敗:{}

java.io.IOException: Connection reset by peer
    at org.elasticsearch.client.RestClient$SyncResponseListener.get(RestClient.java:964)
    at org.elasticsearch.client.RestClient.performRequest(RestClient.java:233)
    at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1448)
    at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1418)
    at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1388)
    at org.elasticsearch.client.RestHighLevelClient.index(RestHighLevelClient.java:836)
    at sun.reflect.GeneratedMethodAccessor117.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:190)
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:138)
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:105)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:878)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:792)
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1040)
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:943)
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006)
    at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:909)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:523)
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:590)
    at io.undertow.servlet.handlers.ServletHandler.handleRequest(ServletHandler.java:74)
    at io.undertow.servlet.handlers.FilterHandler$FilterChainImpl.doFilter(FilterHandler.java:129)
    at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
    at io.undertow.servlet.core.ManagedFilter.doFilter(ManagedFilter.java:61)
    at io.undertow.servlet.handlers.FilterHandler$FilterChainImpl.doFilter(FilterHandler.java:131)
    at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
    at io.undertow.servlet.core.ManagedFilter.doFilter(ManagedFilter.java:61)
    at io.undertow.servlet.handlers.FilterHandler$FilterChainImpl.doFilter(FilterHandler.java:131)
    at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal(WebMvcMetricsFilter.java:108)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
    at io.undertow.servlet.core.ManagedFilter.doFilter(ManagedFilter.java:61)
    at io.undertow.servlet.handlers.FilterHandler$FilterChainImpl.doFilter(FilterHandler.java:131)
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
    at io.undertow.servlet.core.ManagedFilter.doFilter(ManagedFilter.java:61)
    at io.undertow.servlet.handlers.FilterHandler$FilterChainImpl.doFilter(FilterHandler.java:131)
    at io.undertow.servlet.handlers.FilterHandler.handleRequest(FilterHandler.java:84)
    at io.undertow.servlet.handlers.security.ServletSecurityRoleHandler.handleRequest(ServletSecurityRoleHandler.java:62)
    at io.undertow.servlet.handlers.ServletChain$1.handleRequest(ServletChain.java:68)
    at io.undertow.servlet.handlers.ServletDispatchingHandler.handleRequest(ServletDispatchingHandler.java:36)
    at io.undertow.servlet.handlers.RedirectDirHandler.handleRequest(RedirectDirHandler.java:68)
    at io.undertow.servlet.handlers.security.SSLInformationAssociationHandler.handleRequest(SSLInformationAssociationHandler.java:132)
    at io.undertow.servlet.handlers.security.ServletAuthenticationCallHandler.handleRequest(ServletAuthenticationCallHandler.java:57)
    at io.undertow.server.handlers.PredicateHandler.handleRequest(PredicateHandler.java:43)
    at io.undertow.security.handlers.AbstractConfidentialityHandler.handleRequest(AbstractConfidentialityHandler.java:46)
    at io.undertow.servlet.handlers.security.ServletConfidentialityConstraintHandler.handleRequest(ServletConfidentialityConstraintHandler.java:64)
    at io.undertow.security.handlers.AuthenticationMechanismsHandler.handleRequest(AuthenticationMechanismsHandler.java:60)
    at io.undertow.servlet.handlers.security.CachedAuthenticatedSessionHandler.handleRequest(CachedAuthenticatedSessionHandler.java:77)
    at io.undertow.security.handlers.AbstractSecurityContextAssociationHandler.handleRequest(AbstractSecurityContextAssociationHandler.java:43)
    at io.undertow.server.handlers.PredicateHandler.handleRequest(PredicateHandler.java:43)
    at io.undertow.server.handlers.PredicateHandler.handleRequest(PredicateHandler.java:43)
    at io.undertow.servlet.handlers.ServletInitialHandler.handleFirstRequest(ServletInitialHandler.java:269)
    at io.undertow.servlet.handlers.ServletInitialHandler.access$100(ServletInitialHandler.java:78)
    at io.undertow.servlet.handlers.ServletInitialHandler$2.call(ServletInitialHandler.java:133)
    at io.undertow.servlet.handlers.ServletInitialHandler$2.call(ServletInitialHandler.java:130)
    at io.undertow.servlet.core.ServletRequestContextThreadSetupAction$1.call(ServletRequestContextThreadSetupAction.java:48)
    at io.undertow.servlet.core.ContextClassLoaderSetupAction$1.call(ContextClassLoaderSetupAction.java:43)
    at io.undertow.servlet.handlers.ServletInitialHandler.dispatchRequest(ServletInitialHandler.java:249)
    at io.undertow.servlet.handlers.ServletInitialHandler.access$000(ServletInitialHandler.java:78)
    at io.undertow.servlet.handlers.ServletInitialHandler$1.handleRequest(ServletInitialHandler.java:99)
    at io.undertow.server.Connectors.executeRootHandler(Connectors.java:376)
    at io.undertow.server.HttpServerExchange$1.run(HttpServerExchange.java:830)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:197)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
    at org.apache.http.impl.nio.conn.LoggingIOSession$LoggingByteChannel.read(LoggingIOSession.java:204)
    at org.apache.http.impl.nio.reactor.SessionInputBufferImpl.fill(SessionInputBufferImpl.java:231)
    at org.apache.http.impl.nio.codecs.AbstractMessageParser.fillBuffer(AbstractMessageParser.java:136)
    at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:241)
    at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
    at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
    at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
    at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
    ... 1 common frames omitted

優化&解決

  因為這個專案是助農app(抖音+淘寶,雜交版app),app首頁下拉獲取最新推薦視訊,需要通過第三方極光拉取推薦視訊,存入本地資料庫mysql後;一次拉取20條,然後通過執行緒池非同步寫入ES,大概架構如下

問題修復

步驟一

  修改使用者端tcp keepalive,系統預設為7200,修改為300(單位秒,5分鐘)

修改為300

[root@ybchen-1 ~]# cat /proc/sys/net/ipv4/tcp_keepalive_time 
7200
[root@ybchen-1 ~]# 
[root@ybchen-1 ~]# 
[root@ybchen-1 ~]# 
[root@ybchen-1 ~]# cat /proc/sys/net/ipv4/tcp_keepalive_time 
7200
[root@ybchen-1 ~]# echo 300 > /proc/sys/net/ipv4/tcp_keepalive_time 
[root@ybchen-1 ~]# cat /proc/sys/net/ipv4/tcp_keepalive_time 
300
[root@ybchen-1 ~]# 
[root@ybchen-1 ~]# 
[root@ybchen-1 ~]# 
[root@ybchen-1 ~]# 

步驟二

  修改ES使用者端的KeepAlive時間,並新增執行緒池大小等引數

=========ES設定類-開始=============

@Data
@Component
@ConfigurationProperties(prefix = "elastic.search")
public class EsConfiguration {
    //主機
    private String host;
    //
    private int port;
    //叢集名稱
    private String clusterName;
    //存取協定,如:http
    private String schema;
    //使用者名稱
    private String userName;
    //密碼
    private String password;
}

=========ES設定類-結束=============

============ES使用者端-開始=======================
    @Autowired
    EsConfiguration esConfiguration;

    private static RestHighLevelClient restHighLevelClient;

    public RestHighLevelClient getRestClient() {

        if (null != restHighLevelClient) {
            return restHighLevelClient;
        }
        List<HttpHost> httpHosts = new ArrayList<>();
        //填充資料
        httpHosts.add(new HttpHost(esConfiguration.getHost(), esConfiguration.getPort()));
        //填充host節點
        RestClientBuilder builder = RestClient.builder(httpHosts.toArray(new HttpHost[0]));

        if (StringUtils.isNotBlank(esConfiguration.getUserName()) && StringUtils.isNotBlank(esConfiguration.getPassword())) {
            //填充使用者名稱密碼
            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(esConfiguration.getUserName(), esConfiguration.getPassword()));
            //非同步連結延時設定
            builder.setRequestConfigCallback(requestConfigBuilder ->
                    requestConfigBuilder
                            .setConnectTimeout(5000) //5秒
                            .setSocketTimeout(5000)
                            .setConnectionRequestTimeout(5000)
            );
            //非同步連結數設定
            builder.setHttpClientConfigCallback(httpClientBuilder -> {
                //最大連線數100個
                httpClientBuilder.setMaxConnTotal(100);
                //最大路由連線數
                httpClientBuilder.setMaxConnPerRoute(100);
                httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                // 設定KeepAlive為5分鐘的時間,不設定預設為-1,也就是持續連線,然而這會受到外界的影響比如Firewall,會將TCP連線單方面斷開,從而會導致Connection reset by peer的報錯
                // 參考github解決方案:https://github.com/TFdream/Elasticsearch-learning/issues/30
                httpClientBuilder.setKeepAliveStrategy((response, context) -> TimeUnit.MINUTES.toMillis(3))
                        .setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(1).setSoKeepAlive(true).build());
                return httpClientBuilder;
            });
        }

        restHighLevelClient = new RestHighLevelClient(builder);
        return restHighLevelClient;
    }

==============ES使用者端-結束================

========== ES使用者端使用範例 ===============
@Data
@ApiModel("新增/更新ES通用資料結構")
public class DocSearchVo implements Serializable {

    @ApiModelProperty(value = "檔案ID")
    private String docId;

    @ApiModelProperty(value = "檔案JSON")
    private String docStrJson;

}


    /**
     * 批次新增檔案
     *
     * @param indexName
     * @param docSearchVoList
     * @return
     */
    public boolean batchIndexDoc(String indexName, List<DocSearchVo> docSearchVoList) {
        RestHighLevelClient client = getRestClient();
        BulkRequest request = new BulkRequest();
        for (DocSearchVo docSearchVo : docSearchVoList) {
            IndexRequest indexRequest = new IndexRequest(indexName).id(docSearchVo.getDocId()).source(docSearchVo.getDocStrJson(), XContentType.JSON);
            request.add(indexRequest);
        }
        client.bulkAsync(request, RequestOptions.DEFAULT, new ActionListener<BulkResponse>() {
            @Override
            public void onResponse(BulkResponse bulkItemResponses) {
                log.debug("非同步批次新增檔案成功,indexName:{},docSearchVoList:{}", indexName, docSearchVoList);
            }

            @Override
            public void onFailure(Exception e) {
                log.error("非同步批次新增檔案失敗,indexName:{},docSearchVoList:{},錯誤資訊:{}", indexName, docSearchVoList, e);
            }
        });
        return true;
    }

  注:細心的童鞋已經發現,建立ES使用者端的時候,不是執行緒安全的單例模式(這塊別的同事寫的,我只是負責修改這個bug,然後就沒管這個執行緒安全問題,其實是來背鍋的,嗚嗚嗚~~~~~~)

步驟三

  新增ES使用者端心跳檢查,30秒一次

@Component
@Slf4j
public class EsSchedule {
    @Autowired
    EsRestService esRestService;

    /**
     * 30秒一次檢查es狀態
     */
    @Scheduled(fixedRate = 30 * 1000)
    public void heartbeatToES() {
        try {
            RequestOptions requestOptions = RequestOptions.DEFAULT.toBuilder().build();
            boolean result = esRestService.getRestClient().ping(requestOptions);
            log.info("檢查ES狀態:{}", result);
        } catch (Exception e) {
            log.error("檢查ES狀態發生異常:{}", e);
        }
    }
}

搞定~