Kafka Classe não é confiável erro de pacote

0

Pergunta

Eu estou tentando integrar Kafka em o meu projeto. Eu tenho 2 aplicações de mola, um tem de ser configurado como um produtor e a outra, como um consumidor. Eu estou tentando enviar uma mensagem de saudação. Saudação é uma classe, tendo duas campo, ele vai ser visto abaixo. Mas eu continuar a receber este erro, em que o consumidor aplicativo de console:

2021-11-23 13:31:53.412 ERROR 8452 --- [ntainer#1-0-C-1] essageListenerContainer$ListenerConsumer : Consumer exception

java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
    at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:194) ~[spring-kafka-2.7.2.jar:2.7.2]
    at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:112) ~[spring-kafka-2.7.2.jar:2.7.2]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1598) ~[spring-kafka-2.7.2.jar:2.7.2]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1210) ~[spring-kafka-2.7.2.jar:2.7.2]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition greetingTopic-0 at offset 3. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'assignment2.kafka.Greeting' is not in the trusted packages: [java.util, java.lang, ro.tuc.ds2020.kafkaconsumer, ro.tuc.ds2020.kafkaconsumer.*]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).

O código para o Kafka produtor de configuração:

package assignment2.kafka;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {
    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapAddress);
        configProps.put(
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        configProps.put(
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }


    @Bean
    public ProducerFactory<String, Greeting> greetingProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        configProps.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
        configProps.put(JsonSerializer.TYPE_MAPPINGS, "greeting:assignment2.kafka.Greeting");
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Greeting> greetingKafkaTemplate() {
        return new KafkaTemplate<>(greetingProducerFactory());
    }
}

Eu tenho o seguinte código para o Kafka configuração do consumidor:

package ro.tuc.ds2020.kafkaconsumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {
    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapAddress);
        props.put(
                ConsumerConfig.GROUP_ID_CONFIG,
                "sensorGroup");
        props.put(
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        props.put(
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String>  kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    public ConsumerFactory<String, Greeting> greetingConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "greeting");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(JsonSerializer.TYPE_MAPPINGS, "greeting:assignment2.kafka.Greeting");
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "assignment2.kafka.Greeting");
        props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(Greeting.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Greeting> greetingKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Greeting> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(greetingConsumerFactory());
        return factory;
    }
}

E a Saudação de classe, que está presente em ambas as aplicações. Sobre o consumidor em app, no ro.tuc.ds2020.kafkaconsumer pacote e o produtor, sobre o assignment2.kafka pacote.

public class Greeting {

    private String msg;
    private String name;

    public Greeting() {

    }

    public Greeting(String msg, String name) {
        this.msg = msg;
        this.name = name;
    }

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return msg + ", " + name + "!";
    }
}

Eu suspeito que há algo de errado com a minha configuração, eu encontrei um similar resolvido pergunta aqui: Primavera Inicialização Kafka Consumidor jogando erro no loop . Eu adicionei as linhas específicas que resolveu esta pessoa de erro, mas infelizmente eu não posso fazê-lo funcionar. Eu acredito que ele está relacionado com o facto de eu ter duas aplicações distintas, de modo que o objeto do tipo de Saudação que está a enviar a partir do produtor para o consumidor não está no mesmo pacote. Obrigado! Espero que alguém possa ajudar

apache-kafka java spring spring-boot
2021-11-23 13:46:49
2
0

Você está mapeando o tipo errado no lado do consumidor...

props.put(JsonSerializer.TYPE_MAPPINGS, "greeting:assignment2.kafka.Greeting");

Precisa ser a versão para o consumidor.

2021-11-23 15:34:33

Eu modifiquei a configuração para greetingConsumeFactory() , desta forma: props.put(JsonSerializer.TYPE_MAPPINGS, "greeting:ro.tuc.ds2020.kafkaconsumer.Greeting"); props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "ro.tuc.ds2020.kafkaconsumer.Greeting"); mas o erro ainda aparece
Gloria

Que não faz sentido. Oh - você tem de definir ADD_TYPE_INFO_HEADERS para false; ele precisa ser verdadeiro para transmitir cabeçalhos. Mas ele ainda deve ter sucesso por causa do padrão de tipo de configuração. Por favor, postar um MCRE algum lugar e eu vou dar uma olhada para ver o que está errado.
Gary Russell

Muito obrigado por sua ajuda. Eu encontrei aqui: stackoverflow.com/questions/54638846/... que eu possa definir o useHeadersIfPresent de JsonDeserializer para falso e parece estar funcionando para mim.
Gloria
0

Eu resolvi este problema modificando a configuração da seguinte maneira: return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(Greeting.class, false)); Na última linha no greetingConsumerFactory, eu adicionei falso no JsonDeserializer.

2021-11-23 16:19:48

Em outros idiomas

Esta página está em outros idiomas

Русский
..................................................................................................................
Italiano
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Türk
..................................................................................................................
Česk
..................................................................................................................
ไทย
..................................................................................................................
中文
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................