Apache Kafka

Como ler dados de Kafka com Python

Como ler dados de Kafka com Python
Kafka é um sistema de mensagens distribuídas de código aberto para enviar a mensagem em tópicos particionados e diferentes. O streaming de dados em tempo real pode ser implementado usando o Kafka para receber dados entre os aplicativos. Tem três partes principais. Estes são produtor, consumidor e tópicos. O produtor é usado para enviar uma mensagem para um determinado tópico e cada mensagem é anexada com uma chave. O consumidor é usado para ler uma mensagem sobre um determinado tópico do conjunto de partições. Os dados recebidos do produtor e armazenados nas partições com base em um tópico específico. Existem muitas bibliotecas em python para criar produtor e consumidor para construir um sistema de mensagens usando Kafka. Como os dados do Kafka podem ser lidos usando python é mostrado neste tutorial.

Pré-requisito

Você deve instalar a biblioteca python necessária para ler os dados do Kafka. Python3 é usado neste tutorial para escrever o script de consumidor e produtor. Se o pacote pip não foi instalado antes em seu sistema operacional Linux, você deve instalar o pip antes de instalar a biblioteca Kafka para python. python3-kafka é usado neste tutorial para ler dados de Kafka. Execute o seguinte comando para instalar a biblioteca.

$ pip install python3-kafka

Lendo dados de texto simples de Kafka

Diferentes tipos de dados podem ser enviados do produtor sobre um determinado tópico que pode ser lido pelo consumidor. Como dados de texto simples podem ser enviados e recebidos de Kafka usando produtor e consumidor é mostrado nesta parte deste tutorial.

Crie um arquivo chamado produtor1.py com o seguinte script python. KafkaProducer módulo é importado da biblioteca Kafka. A lista de corretores precisa ser definida no momento da inicialização do objeto produtor para se conectar ao servidor Kafka. A porta padrão do Kafka é '9092'. O argumento bootstrap_servers é usado para definir o nome do host com a porta. 'First_Topic'é definido como um nome de tópico pelo qual a mensagem de texto será enviada do produtor. A seguir, uma mensagem de texto simples, 'Olá de Kafka'é enviado usando enviar() método de KafkaProducer para o tópico, 'First_Topic'.

produtor1.py:

# Importar KafkaProducer da biblioteca Kafka
de kafka import KafkaProducer
# Definir servidor com porta
bootstrap_servers = ['localhost: 9092']
# Defina o nome do tópico onde a mensagem será publicada
topicName = 'First_Topic'
# Inicializar a variável do produtor
produtor = KafkaProducer (bootstrap_servers = bootstrap_servers)
# Publicar texto em tópico definido
produtor.enviar (topicName, b'Hello from kafka… ')
# Imprimir mensagem
imprimir ("Mensagem enviada")

Crie um arquivo chamado consumidor1.py com o seguinte script python. KafkaConsumer módulo é importado da biblioteca Kafka para ler dados do Kafka. sys módulo é usado aqui para encerrar o script. O mesmo nome de host e número de porta do produtor são usados ​​no script do consumidor para ler dados de Kafka. O nome do tópico do consumidor e do produtor deve ser o mesmo que é 'First_topic'.  Em seguida, o objeto consumidor é inicializado com os três argumentos. Nome do tópico, id do grupo e informações do servidor. para loop é usado aqui para ler o texto enviado pelo produtor Kafka.

consumidor1.py:

# Importar KafkaConsumer da biblioteca Kafka
de kafka import KafkaConsumer
# Módulo de importação sys
import sys
# Definir servidor com porta
bootstrap_servers = ['localhost: 9092']
# Defina o nome do tópico de onde a mensagem será recebida
topicName = 'First_Topic'
# Inicializar a variável do consumidor
consumidor = KafkaConsumer (topicName, group_id = 'group1', bootstrap_servers =
bootstrap_servers)
# Leia e imprima a mensagem do consumidor
para msg no consumidor:
print ("Nome do Tópico =% s, Mensagem =% s"% (msg.tópico, msg.valor))
# Terminar o script
sys.saída()

Resultado:

Execute o seguinte comando em um terminal para executar o script do produtor.

$ python3 producer1.py

A seguinte saída aparecerá após o envio da mensagem.

Execute o seguinte comando de outro terminal para executar o script do consumidor.

$ python3 consumer1.py

A saída mostra o nome do tópico e a mensagem de texto enviada do produtor.

Lendo dados formatados em JSON de Kafka

Os dados formatados em JSON podem ser enviados pelo produtor Kafka e lidos pelo consumidor Kafka usando o json módulo de python. Como os dados JSON podem ser serializados e desserializados antes de enviar e receber os dados usando o módulo python-kafka é mostrado nesta parte deste tutorial.

Crie um script python chamado produtor2.py com o seguinte script. Outro módulo chamado JSON é importado com KafkaProducer módulo aqui. value_serializer argumento é usado com bootstrap_servers argumento aqui para inicializar o objeto do produtor Kafka. Este argumento indica que os dados JSON serão codificados usando 'utf-8'conjunto de caracteres no momento do envio. Em seguida, os dados formatados em JSON são enviados ao tópico denominado JSONtopic.

produtor2.py:

# Importar KafkaProducer da biblioteca Kafka
de kafka import KafkaProducer
# Importe módulo JSON para serializar dados
import json
# Inicializar a variável do produtor e definir o parâmetro para codificação JSON
produtor = KafkaProducer (bootstrap_servers =
['localhost: 9092'], value_serializer = lambda v: json.despejos (v).codificar ('utf-8'))
# Envie dados no formato JSON
produtor.enviar ('JSONtopic', 'nome': 'fahmida', 'email': '[email protegido]')
 
# Imprimir mensagem
print ("Mensagem enviada para JSONtopic")

Crie um script python chamado consumidor2.py com o seguinte script. KafkaConsumer, sys e os módulos JSON são importados neste script. KafkaConsumer módulo é usado para ler dados formatados em JSON do Kafka. O módulo JSON é usado para decodificar os dados JSON codificados enviados do produtor Kafka. Sys módulo é usado para encerrar o script. value_deserializer argumento é usado com bootstrap_servers para definir como os dados JSON serão decodificados. Próximo, para loop é usado para imprimir todos os registros do consumidor e dados JSON recuperados de Kafka.

consumidor2.py:

# Importar KafkaConsumer da biblioteca Kafka
de kafka import KafkaConsumer
# Módulo de importação sys
import sys
# Importe o módulo json para serializar dados
import json
# Inicializar a variável do consumidor e definir a propriedade para decodificação JSON
consumidor = KafkaConsumer ('JSONtopic', bootstrap_servers = ['localhost: 9092'],
value_deserializer = lambda m: json.carrega (m.decodificar ('utf-8')))
# Leia os dados do kafka
para mensagem no consumidor:
imprimir ("Registros do consumidor: \ n")
imprimir (mensagem)
imprimir ("\ nLendo a partir de dados JSON \ n")
imprimir ("Nome:", mensagem [6] ['nome'])
imprimir ("E-mail:", mensagem [6] ['e-mail'])
# Terminar o script
sys.saída()

Resultado:

Execute o seguinte comando em um terminal para executar o script do produtor.

$ python3 producer2.py

O script imprimirá a seguinte mensagem após enviar os dados JSON.

Execute o seguinte comando de outro terminal para executar o script do consumidor.

$ python3 consumer2.py

A seguinte saída aparecerá após a execução do script.

Conclusão:

Os dados podem ser enviados e recebidos em diferentes formatos de Kafka usando python. Os dados também podem ser armazenados no banco de dados e recuperados do banco de dados usando Kafka e python. Estou em casa, este tutorial ajudará o usuário Python a começar a trabalhar com Kafka.

Vulkan para usuários de Linux
A cada nova geração de placas gráficas, vemos os desenvolvedores de jogos ultrapassarem os limites da fidelidade gráfica e se aproximarem mais do foto...
OpenTTD vs Simutrans
Criar sua própria simulação de transporte pode ser divertido, relaxante e extremamente atraente. É por isso que você precisa ter certeza de experiment...
Tutorial OpenTTD
OpenTTD é um dos jogos de simulação de negócios mais populares que existem. Neste jogo, você precisa criar um maravilhoso negócio de transporte. No en...