"""Python Kafka producer: publishes a few JSON test messages to a Kafka topic."""
import json
import time
from kafka import KafkaProducer
from kafka.errors import KafkaError
# Broker address list passed to KafkaProducer as `bootstrap_servers`.
# NOTE(review): the address is hard-coded; consider moving it to configuration.
BOOTSTRAP_SERVERS = ['161.97.112.186:9094']
# Name of the topic the test messages are sent to.
TOPIC_NAME = 'coolify-test-topic'
def json_serializer(data) -> bytes:
    """Serialize *data* to UTF-8 encoded JSON bytes.

    Used as the producer's ``value_serializer``, so every message value is
    converted by this function before it is sent.

    Args:
        data: Any object accepted by ``json.dumps`` (dict, list, str, ...).

    Returns:
        The JSON text of *data*, encoded as UTF-8.

    Raises:
        TypeError: If *data* contains a value ``json.dumps`` cannot encode.
    """
    return json.dumps(data).encode('utf-8')
def on_send_success(record_metadata):
    """Print where a successfully delivered message ended up.

    Registered as the success callback on the future returned by
    ``producer.send``.

    Args:
        record_metadata: Object exposing ``topic``, ``partition`` and
            ``offset`` attributes for the delivered record.
    """
    # One print call keeps the three report lines together in the output;
    # the emitted text is the same as three separate prints.
    print(
        f"Topic: {record_metadata.topic}\n"
        f"Partition: {record_metadata.partition}\n"
        f"Offset: {record_metadata.offset}"
    )
def on_send_error(excp):
    """Print the error reported for a message that could not be delivered.

    Registered as the error callback on the future returned by
    ``producer.send``.

    Args:
        excp: The exception (or other error object) describing the failure;
            it is rendered with ``str()`` formatting.
    """
    print(f"Failed: {excp}")
# Module-level producer used by the send loop below. Values are converted
# to bytes by json_serializer before being handed to the client.
producer = KafkaProducer(
    bootstrap_servers=BOOTSTRAP_SERVERS,
    value_serializer=json_serializer,
    # 'all' asks the broker to confirm a write only after the full set of
    # in-sync replicas has it (per kafka-python's documented `acks` option).
    acks='all',
    # Let the client retry a failed send up to three times.
    retries=3,
)
# Publish five JSON test messages (ids 16 through 20), wait for them to be
# delivered, and always shut the producer down afterwards.
try:
    for i in range(1, 6):
        message_id = i + 15
        payload = {
            'id': message_id,
            'message': f'Hello from Python to Coolify Kafka! Test message {message_id}',
            'timestamp': time.time(),
        }
        # send() only queues the message and returns a future; the delivery
        # result is reported later through the callbacks attached here.
        future = producer.send(TOPIC_NAME, value=payload)
        future.add_callback(on_send_success)
        future.add_errback(on_send_error)

    # Block until every queued message has been sent (or has failed), so
    # the callbacks have run before the producer is closed.
    producer.flush()
finally:
    # Close the producer even if send() or flush() raised, so its
    # connections and background resources are not leaked.
    producer.close()