kafka-记一次线上收不到订阅的Topic的消息

有一个服务,订阅了6个topic,上次上线时间是5.22,但是在6.2号的时候,开始接收不到其中3个topic的消息

其他消费者可以收到,只有这一个服务收不到

1、使用了kafka的一个可视化工具:Kafka Tool

正常情况

kafka-记一次线上收不到订阅的Topic的消息

用该工具可以看到 消费者订阅的topic以及 partition数

发现少了3个topic

2、查看线上日志:

 Container exception org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time
between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either
 by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:721) ~[kafka-clients-1.0.0.jar!/:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:599) ~[kafka-clients-1.0.0.jar!/:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1242) ~[kafka-clients-1.0.0.jar!/:?]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:1217) ~[spring-kafka-2.1.2.RELEASE.jar!/:2.1.2.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:1096) ~[spring-kafka-2.1.2.RELEASE.jar!/:2.1.2.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:651) [spring-kafka-2.1.2.RELEASE.jar!/:2.1.2.RELEASE]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_60]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_60]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]

createContractCore error,orderId->31347013 org.springframework.web.client.HttpServerErrorException: 504 GATEWAY_TIMEOUT
        at org.springframework.web.client.DefaultResponseErrorHandler.handleError(DefaultResponseErrorHandler.java:97) ~[spring-web-5.0.3.RELEASE.jar!/:5.0.3.RELEASE]
        at org.springframework.web.client.DefaultResponseErrorHandler.handleError(DefaultResponseErrorHandler.java:79) ~[spring-web-5.0.3.RELEASE.jar!/:5.0.3.RELEASE]
        at org.springframework.web.client.ResponseErrorHandler.handleError(ResponseErrorHandler.java:63) ~[spring-web-5.0.3.RELEASE.jar!/:5.0.3.RELEASE]
        at org.springframework.web.client.RestTemplate.handleResponse(RestTemplate.java:773) ~[spring-web-5.0.3.RELEASE.jar!/:5.0.3.RELEASE]
        at org.springframework.web.client.RestTemplate.doExecute(RestTemplate.java:726) ~[spring-web-5.0.3.RELEASE.jar!/:5.0.3.RELEASE]
        at org.springframework.web.client.RestTemplate.execute(RestTemplate.java:682) ~[spring-web-5.0.3.RELEASE.jar!/:5.0.3.RELEASE]
        at org.springframework.web.client.RestTemplate.postForObject(RestTemplate.java:433) ~[spring-web-5.0.3.RELEASE.jar!/:5.0.3.RELEASE]
        at org.springframework.web.client.RestTemplate$$FastClassBySpringCGLIB$$aa4e9ed0.invoke(<generated>) ~[spring-web-5.0.3.RELEASE.jar!/:5.0.3.RELEASE]
        at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204) ~[spring-core-5.0.3.RELEASE.jar!/:5.0.3.RELEASE]
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:747) ~[spring-aop-5.0.3.RELEASE.jar!/:5.0.3.RELEASE]
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.0.3.RELEASE.jar!/:5.0.3.RELEASE]
        at org.springframework.aop.aspectj.MethodInvocationProceedingJoinPoint.proceed(MethodInvocationProceedingJoinPoint.java:89) ~[spring-aop-5.0.3.RELEASE.jar!/:5.0.3.RELEASE]
        at org.springframework.cloud.netflix.metrics.RestTemplateUrlTemplateCapturingAspect.captureUrlTemplate(RestTemplateUrlTemplateCapturingAspect.java:33) ~[spring-cloud-netflix-core-1.4.4.RELEASE.jar!/:1.4.4.RELEASE]
        at sun.reflect.GeneratedMethodAccessor528.invoke(Unknown Source) ~[?:?]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_60]
        at java.lang.reflect.Method.invoke(Method.java:497) ~[?:1.8.0_60]
        at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethodWithGivenArgs(AbstractAspectJAdvice.java:643) ~[spring-aop-5.0.3.RELEASE.jar!/:5.0.3.RELEASE]
        at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethod(AbstractAspectJAdvice.java:632) ~[spring-aop-5.0.3.RELEASE.jar!/:5.0.3.RELEASE]
        at org.springframework.aop.aspectj.AspectJAroundAdvice.invoke(AspectJAroundAdvice.java:70) ~[spring-aop-5.0.3.RELEASE.jar!/:5.0.3.RELEASE]
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185) ~[spring-aop-5.0.3.RELEASE.jar!/:5.0.3.RELEASE]
        at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:92) ~[spring-aop-5.0.3.RELEASE.jar!/:5.0.3.RELEASE]
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185) ~[spring-aop-5.0.3.RELEASE.jar!/:5.0.3.RELEASE]
        at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689) ~[spring-aop-5.0.3.RELEASE.jar!/:5.0.3.RELEASE]
        at org.springframework.web.client.RestTemplate$$EnhancerBySpringCGLIB$$d7b56511.postForObject(<generated>) ~[spring-web-5.0.3.RELEASE.jar!/:5.0.3.RELEASE]
        at net.wecash.common.service.impl.ContractServiceImpl.restTemplate(ContractServiceImpl.java:724) ~[classes!/:2.5.7]
        at net.wecash.common.service.impl.ContractServiceImpl.generateSignContract(ContractServiceImpl.java:661) ~[classes!/:2.5.7]
        at net.wecash.common.service.impl.ContractServiceImpl.createSignContract(ContractServiceImpl.java:620) ~[classes!/:2.5.7]
        at net.wecash.common.service.impl.ContractServiceImpl.createContractCore(ContractServiceImpl.java:396) ~[classes!/:2.5.7]
        at net.wecash.common.service.impl.ContractServiceImpl.createContract(ContractServiceImpl.java:168) ~[classes!/:2.5.7]
        at net.wecash.common.kafka.listener.IOUSuccessListener.lambda$kafkaListener$0(IOUSuccessListener.java:46) ~[classes!/:2.5.7]
        at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184) [?:1.8.0_60]
        at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948) [?:1.8.0_60]
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) [?:1.8.0_60]
        at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291) [?:1.8.0_60]
        at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731) [?:1.8.0_60]
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_60]
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_60]
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_60]
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) [?:1.8.0_60] 

由日志得知,请求的第三方接口崩了(504 GATEWAY_TIMEOUT),请求时间太长导致数据处理时间太长,poll()在max.poll.interval.ms规定的间隔时间内没有被再次调用,kafka认为该机器已经挂了,将其踢出消费组并触发rebalance;等到该机器提交offset时,group已经rebalance了,分区已分配给其他成员,所以提交失败。如此循环,整个服务的5台机器全部被踢出。

3、问题分析

kafka的两个属性

max.poll.interval.ms :该属性意思为kafka消费者在每一轮poll()调用之间的最大延迟,消费者在获取更多记录之前可以空闲的时间量的上限。如果此超时时间期满之前poll()没有被再次调用,则消费者被视为失败,并且分组将重新平衡,以便将分区重新分配给别的成员,默认为300s

max.poll.records :单次调用poll()最多返回的消息条数,默认为500

可以增大间隔时间,也可以减少单次拉取的数据量

4、其实,如果遇到这种第三方接口长时间无响应的问题,仅更改上述设置也可能无法解决问题,可以考虑先将数据拉取下来,放到内存或者缓存中再处理,避免耗时的处理阻塞poll()的调用