Skip to content

Ejemplos con Python#

Requisitos previos#

  1. Instalar Kafka: Aseg煤rate de tener Kafka instalado y en funcionamiento en tu entorno.
  2. Instalar la librer铆a confluent_kafka: Puedes instalarla mediante pip:
pip install confluent_kafka

Kafka producer#

El productor env铆a mensajes a un topic espec铆fico en Kafka. A continuaci贸n, se muestra un ejemplo simple de c贸mo crear un productor en Python:

producer.py
from confluent_kafka import Producer

# Configuraci贸n del productor
conf = {
    'bootstrap.servers': 'localhost:9092'  # Direcci贸n del broker de Kafka
}

# Crear el productor
producer = Producer(conf)

# Funci贸n para confirmar la entrega de los mensajes
def delivery_report(err, msg):
    if err is not None:
        print(f'Error en la entrega: {err}')
    else:
        print(f'Mensaje entregado a {msg.topic()} [{msg.partition()}]')

# Enviar un mensaje
topic = 'mi_topic'
producer.produce(topic, key='mi_clave', value='Hola, Kafka!', callback=delivery_report)

# Esperar a que se env铆en todos los mensajes
producer.flush()

Explicaci贸n:

  • bootstrap.servers: Especifica la direcci贸n del broker de Kafka.
  • producer.produce: Envia un mensaje al topic especificado. La funci贸n delivery_report es una callback que maneja la confirmaci贸n de la entrega.
  • producer.flush: Asegura que todos los mensajes se hayan enviado antes de cerrar el productor.

Kafka consumer#

El consumidor se suscribe a un topic y recibe mensajes. A continuaci贸n se muestra c贸mo crear un consumidor en Python:

consumer.py
from confluent_kafka import Consumer, KafkaException

# Configuraci贸n del consumidor
conf = {
    'bootstrap.servers': 'localhost:9092',  # Direcci贸n del broker de Kafka
    'group.id': 'mi_grupo',                 # ID del grupo de consumidores
    'auto.offset.reset': 'earliest'         # Leer mensajes desde el principio si no hay offset guardado
}

# Crear el consumidor
consumer = Consumer(conf)

# Suscribirse al topic
topic = 'mi_topic'
consumer.subscribe([topic])

# Leer mensajes
try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            raise KafkaException(msg.error())
        print(f'Recibido: {msg.value().decode("utf-8")} de {msg.topic()} [{msg.partition()}] en offset {msg.offset()}')

except KeyboardInterrupt:
    pass

finally:
    # Cerrar el consumidor
    consumer.close()

Explicaci贸n:

  • group.id: Define el ID del grupo de consumidores, lo cual permite que m煤ltiples consumidores compartan la carga.
  • auto.offset.reset: Configura desde d贸nde comenzar a leer mensajes si no se ha guardado un offset previo.
  • consumer.poll: Espera mensajes del topic y los procesa. Se usa un bucle para leer mensajes de manera continua.