O que se Kafka consumidor lida com uma mensagem muito longa? Vai Kafka nomear esta partição para outro consumidor e a mensagem será duplamente tratada?

0

Pergunta

Suponha que Kafka, 1 partition, 2 consumers.(2º consumidor é ocioso)

Suponha que o 1º de uma consumido uma mensagem, vai para lidar com isso com 3 outros serviços e de repente sticks em um deles e perder o Kafka tempo de espera.

Vai Kafka reconduzir a partição para o 2º consumidor e a mensagem será duplamente processados (suponha que o 1º de uma bem-sucedida)?

1

Melhor resposta

1

O que se Kafka consumidor lida com uma mensagem muito longa? Vai Kafka nomear esta partição para outro consumidor e a mensagem será duplamente tratada?

Sim, isso é correto. Se Kafka consumidor leva muito tempo para processar uma mensagem e posterior votação() está atrasada, Kafka vai re-nomear esta partição para outro consumidor e a mensagem será processada novamente (e novamente).

Para mais clareza, primeiro temos de decidir e definir Quanto tempo é demasiado longo?'.

Este é definido pela propriedade max.poll.interval.ms. A partir do docs,

O atraso máximo entre invocações de pesquisa() quando utilizar o grupo de consumidores de gestão. Isto coloca um limite superior sobre a quantidade de tempo que o consumidor pode ficar ociosa antes de buscar mais registros. Se poll() não é chamado antes da expiração do tempo limite e, em seguida, o consumidor é considerado falha e o grupo irá reequilibrar a fim de reatribuir as partições de outro membro.

Grupo de consumidores é rebalanced se existem chamadas para a enquete() dentro deste período de tempo.

Não é mais uma propriedade auto.commit.interval.ms. A confirmação automática deslocamentos de seleção será chamado apenas durante a pesquisa - verifica se o tempo decorrido for maior do que o configurado automaticamente a consolidação intervalo de tempo e, se o resultado for sim, o deslocamento está comprometida.

Se Kafka consumidor está levando muito tempo para processar os registos e, em seguida, a posterior pesquisa() chamada também está atrasado e os deslocamentos retornou na última pesquisa() não são comprometidos. Se reequilibrar acontece neste momento, o novo consumidor de cliente atribuído a essa partição irá iniciar o processamento de mensagens novamente.

Grupo de consumidores de reequilibrar e partição resultante de mudança pode ser evitada se aumentar este valor. Isto irá aumentar o permitido intervalo entre as sondagens e dar mais tempo para os consumidores para lidar com o registro(s) devolvido a partir de pesquisa(). O consumidor só vai se juntar a reequilibrar o apelo interior para a enquete, de modo a aumentar max intervalo de sondagem também o atraso de grupo reequilibra.

Não é mais um problema crescente de max intervalo de sondagem para um grande valor. Se o consumidor morre por algum outro motivo, leva mais tempo do que o configurado max.poll.interval.ms intervalo para detectar a falha.

session.timeout.ms e heartbeat.interval.ms estão disponíveis, neste caso, para detectar a falha total, mais cedo possível.

Para mais detalhes sobre estes parâmetros:

Por favor, note que os valores configurados para session.timeout.ms deve ser admissível, no intervalo, tal como configurado na corretora configuração de propriedades

  • do grupo.min.sessão.tempo de espera.ms
  • do grupo.max.sessão.tempo de espera.ms

Caso contrário, a seguinte exceção será lançada durante a partida de consumo do cliente.

Exception in thread "main" org.apache.kafka.common.errors.InvalidSessionTimeoutException:
The session timeout is not within the range allowed by the broker
(as configured by group.min.session.timeout.ms and group.max.session.timeout.ms)

Atualização: Para evitar a manipulação das mensagens novamente

Existe um outro método na classe KafkaConsumer commitAsync() para acionar cometer desvios de operação.

ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(500));
kafkaConsumer.commitAsync();

Para obter mais detalhes sobre commitSync() e commitAsync(), por favor, verifique esta thread

Cometer um desvio manualmente é uma ação de dizer que o deslocamento tenha sido processada, de modo que o Kafka não enviar os registos comprometidos para a mesma partição de novo. Quando os deslocamentos são cometidos manualmente, é importante notar que, se o consumidor morre antes que o processamento de registros, por qualquer motivo, há uma chance de que esses registros não será processado novamente.

2021-11-25 07:04:25

Obrigado, é claro. Há formas de evitar o segundo tratamento?
J.J. Beam

@J. J. Feixe de resposta atualizado com links e amostra
arunkvelu

Em outros idiomas

Esta página está em outros idiomas

Русский
..................................................................................................................
Italiano
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Türk
..................................................................................................................
Česk
..................................................................................................................
ไทย
..................................................................................................................
中文
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................