Ejemplos con Python#
Requisitos previos#
- Instalar Kafka: Aseg煤rate de tener Kafka instalado y en funcionamiento en tu entorno.
- Instalar la librer铆a confluent_kafka: Puedes instalarla mediante pip:
pip install confluent_kafka
Kafka producer#
El productor env铆a mensajes a un topic espec铆fico en Kafka. A continuaci贸n, se muestra un ejemplo simple de c贸mo crear un productor en Python:
producer.py
from confluent_kafka import Producer
# Configuraci贸n del productor
conf = {
'bootstrap.servers': 'localhost:9092' # Direcci贸n del broker de Kafka
}
# Crear el productor
producer = Producer(conf)
# Funci贸n para confirmar la entrega de los mensajes
def delivery_report(err, msg):
if err is not None:
print(f'Error en la entrega: {err}')
else:
print(f'Mensaje entregado a {msg.topic()} [{msg.partition()}]')
# Enviar un mensaje
topic = 'mi_topic'
producer.produce(topic, key='mi_clave', value='Hola, Kafka!', callback=delivery_report)
# Esperar a que se env铆en todos los mensajes
producer.flush()
Explicaci贸n:
- bootstrap.servers: Especifica la direcci贸n del broker de Kafka.
- producer.produce: Envia un mensaje al topic especificado. La funci贸n delivery_report es una callback que maneja la confirmaci贸n de la entrega.
- producer.flush: Asegura que todos los mensajes se hayan enviado antes de cerrar el productor.
Kafka consumer#
El consumidor se suscribe a un topic y recibe mensajes. A continuaci贸n se muestra c贸mo crear un consumidor en Python:
consumer.py
from confluent_kafka import Consumer, KafkaException
# Configuraci贸n del consumidor
conf = {
'bootstrap.servers': 'localhost:9092', # Direcci贸n del broker de Kafka
'group.id': 'mi_grupo', # ID del grupo de consumidores
'auto.offset.reset': 'earliest' # Leer mensajes desde el principio si no hay offset guardado
}
# Crear el consumidor
consumer = Consumer(conf)
# Suscribirse al topic
topic = 'mi_topic'
consumer.subscribe([topic])
# Leer mensajes
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
raise KafkaException(msg.error())
print(f'Recibido: {msg.value().decode("utf-8")} de {msg.topic()} [{msg.partition()}] en offset {msg.offset()}')
except KeyboardInterrupt:
pass
finally:
# Cerrar el consumidor
consumer.close()
Explicaci贸n:
- group.id: Define el ID del grupo de consumidores, lo cual permite que m煤ltiples consumidores compartan la carga.
- auto.offset.reset: Configura desde d贸nde comenzar a leer mensajes si no se ha guardado un offset previo.
- consumer.poll: Espera mensajes del topic y los procesa. Se usa un bucle para leer mensajes de manera continua.