O que se Kafka consumidor lida com uma mensagem muito longa? Vai Kafka nomear esta partição para outro consumidor e a mensagem será duplamente tratada?
Sim, isso é correto. Se Kafka consumidor leva muito tempo para processar uma mensagem e posterior votação() está atrasada, Kafka vai re-nomear esta partição para outro consumidor e a mensagem será processada novamente (e novamente).
Para mais clareza, primeiro temos de decidir e definir Quanto tempo é demasiado longo?'.
Este é definido pela propriedade max.poll.interval.ms
. A partir do docs,
O atraso máximo entre invocações de pesquisa() quando utilizar o grupo de consumidores de gestão. Isto coloca um limite superior sobre a quantidade de tempo que o consumidor pode ficar ociosa antes de buscar mais registros. Se poll() não é chamado antes da expiração do tempo limite e, em seguida, o consumidor é considerado falha e o grupo irá reequilibrar a fim de reatribuir as partições de outro membro.
Grupo de consumidores é rebalanced se existem chamadas para a enquete() dentro deste período de tempo.
Não é mais uma propriedade auto.commit.interval.ms
. A confirmação automática deslocamentos de seleção será chamado apenas durante a pesquisa - verifica se o tempo decorrido for maior do que o configurado automaticamente a consolidação intervalo de tempo e, se o resultado for sim, o deslocamento está comprometida.
Se Kafka consumidor está levando muito tempo para processar os registos e, em seguida, a posterior pesquisa() chamada também está atrasado e os deslocamentos retornou na última pesquisa() não são comprometidos. Se reequilibrar acontece neste momento, o novo consumidor de cliente atribuído a essa partição irá iniciar o processamento de mensagens novamente.
Grupo de consumidores de reequilibrar e partição resultante de mudança pode ser evitada se aumentar este valor. Isto irá aumentar o permitido intervalo entre as sondagens e dar mais tempo para os consumidores para lidar com o registro(s) devolvido a partir de pesquisa(). O consumidor só vai se juntar a reequilibrar o apelo interior para a enquete, de modo a aumentar max intervalo de sondagem também o atraso de grupo reequilibra.
Não é mais um problema crescente de max intervalo de sondagem para um grande valor. Se o consumidor morre por algum outro motivo, leva mais tempo do que o configurado max.poll.interval.ms
intervalo para detectar a falha.
session.timeout.ms
e heartbeat.interval.ms
estão disponíveis, neste caso, para detectar a falha total, mais cedo possível.
Para mais detalhes sobre estes parâmetros:
Por favor, note que os valores configurados para session.timeout.ms
deve ser admissível, no intervalo, tal como configurado na corretora configuração de propriedades
- do grupo.min.sessão.tempo de espera.ms
- do grupo.max.sessão.tempo de espera.ms
Caso contrário, a seguinte exceção será lançada durante a partida de consumo do cliente.
Exception in thread "main" org.apache.kafka.common.errors.InvalidSessionTimeoutException:
The session timeout is not within the range allowed by the broker
(as configured by group.min.session.timeout.ms and group.max.session.timeout.ms)
Atualização: Para evitar a manipulação das mensagens novamente
Existe um outro método na classe KafkaConsumer commitAsync()
para acionar cometer desvios de operação.
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(500));
kafkaConsumer.commitAsync();
Para obter mais detalhes sobre commitSync() e commitAsync(), por favor, verifique esta thread
Cometer um desvio manualmente é uma ação de dizer que o deslocamento tenha sido processada, de modo que o Kafka não enviar os registos comprometidos para a mesma partição de novo. Quando os deslocamentos são cometidos manualmente, é importante notar que, se o consumidor morre antes que o processamento de registros, por qualquer motivo, há uma chance de que esses registros não será processado novamente.