Enviar/produzir mensagem json através de kafka

0

Pergunta

Esta é a minha primeira vez usando Kafka e estou planejando usar kafka com .net

Eu queria saber se eu posso enviar JSON como uma mensagem, quando eu produzir um evento

Eu estou seguindo o tutorial: https://developer.confluent.io/get-started/dotnet/#build-producer

Além disso, há um modo para que o valor a ser mapeada para um modelo de forma que o valor de/estrutura json é sempre ligado ao modelo de

Assim, por exemplo: se eu quiser que meus json valor a ser

{
  "customerName":"anything",
  "eventType":"one-of-three-enums",
  "columnsChanged": "string value or something"
}

A maioria dos exemplos que eu possa encontrar são assim:

using Confluent.Kafka;
using System;
using Microsoft.Extensions.Configuration;

class Producer {
    static void Main(string[] args)
    {
        if (args.Length != 1) {
            Console.WriteLine("Please provide the configuration file path as a command line argument");
        }

        IConfiguration configuration = new ConfigurationBuilder()
            .AddIniFile(args[0])
            .Build();

        const string topic = "purchases";

        string[] users = { "eabara", "jsmith", "sgarcia", "jbernard", "htanaka", "awalther" };
        string[] items = { "book", "alarm clock", "t-shirts", "gift card", "batteries" };

        using (var producer = new ProducerBuilder<string, string>(
            configuration.AsEnumerable()).Build())
        {
            var numProduced = 0;
            const int numMessages = 10;
            for (int i = 0; i < numMessages; ++i)
            {
                Random rnd = new Random();
                var user = users[rnd.Next(users.Length)];
                var item = items[rnd.Next(items.Length)];

                producer.Produce(topic, new Message<string, string> { Key = user, Value = item },
                    (deliveryReport) =>
                    {
                        if (deliveryReport.Error.Code != ErrorCode.NoError) {
                            Console.WriteLine($"Failed to deliver message: {deliveryReport.Error.Reason}");
                        }
                        else {
                            Console.WriteLine($"Produced event to topic {topic}: key = {user,-10} value = {item}");
                            numProduced += 1;
                        }
                    });
            }

            producer.Flush(TimeSpan.FromSeconds(10));
            Console.WriteLine($"{numProduced} messages were produced to topic {topic}");
        }
    }
}

Eu gostaria que o item a ser uma classe na estrutura json.

.net apache-kafka asp.net-core
2021-11-23 21:53:21
1

Melhor resposta

0

queria saber se eu posso enviar JSON como uma mensagem, quando eu produzir um evento

Sim. Kafka armazena bytes e converte os bytes usando Serializadores. Quando a construção de um Produtor, você tem a opção de chamada SetValueSerializer.

Alguns dos alto-serializadores pode ser encontrado em https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/src/Confluent.Kafka/Serializers.cs

Você precisa escrever a sua própria genericamente lidar com qualquer JSON tipos de modelo.

Quando utilizar o Utf8Serializer para cadeias de caracteres, você vai precisar para pré-serializar o objeto de sua classe de modelo e, em seguida, enviá-lo como o valor. No seu exemplo, você poderia substituir var item com algum objeto serializado.

Como faço para transformar um objeto de C# em uma seqüência de caracteres JSON em .NET?

Quando utilizar o modelo de classes, os seus dados irão ser tipicamente fortemente tipados até que você o inicie manualmente a escrita JSON ou usar o Dicionário de tipos. Se você queria externo mensagem de validação, Confluente de Registro de Esquema é um exemplo que suporta JSONSchema e o JsonSerializer a partir de confluent-dotnet-kafka projeto oferece suporte a isso.

2021-11-23 22:27:28

Apenas um acompanhamento pergunta. Você sabe se eu posso limitar o tamanho da mensagem e existe uma maneira de produtor para verificar qual é o tamanho do evento antes do envio e não permitirá o envio da mensagem, se o tamanho é mais do que o limite?
Learn AspNet

Kafka tem um limite padrão de 1MB lotes de mensagem. Se você obter o tamanho do serializado matriz de bytes, que deve ser uma estreita aproximação de um indivíduo tamanho do registro (há uma sobrecarga adicional, tais como cabeçalhos de registro e carimbos de data / hora, embora)
OneCricketeer

Obrigado. Pode você por favor, responda: stackoverflow.com/questions/70097676/...
Learn AspNet

Em outros idiomas

Esta página está em outros idiomas

Русский
..................................................................................................................
Italiano
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Türk
..................................................................................................................
Česk
..................................................................................................................
ไทย
..................................................................................................................
中文
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................