반응형

파이썬으로 카프카에 토픽을 만들어 등록해보자...

 

 

#-*- coding: utf-8 -*-

import sys

import json

from kafka import KafkaProducer

from kafka.errors import KafkaError

 

producer = KafkaProducer(bootstrap_servers=["kafka1:9092","kafka2:9092","kafka3:9092"])

topicName="t5"

msg={"id":"test","tel":"010-1111-2222","regDate":"20190603"}

 

def on_send_success(record_metadata):

print(record_metadata.topic)

print(record_metadata.partition)

print(record_metadata.offset)

 

def on_send_error(excp):

log.error('I am an errback', exc_info=excp)

 

producer = KafkaProducer(value_serializer=lambda m: json.dumps(msg).encode('ascii'))

producer.send(topicName, {'key': 'value'}).add_callback(on_send_success).add_errback(on_send_error)

 

producer.flush()

 

producer = KafkaProducer(retries=5)

 

또는 

 

import sys

import json

from kafka import KafkaProducer

 

bootstrap_servers = ["kafka1:9092","kafka2:9092","kafka3:9092"]

topicName = 't5'

msg={"id":"test2","tel":"010-1111-2222","regDate":"20190603"}

 

 

producer = KafkaProducer(value_serializer=lambda m: json.dumps(msg).encode('ascii'))

 

ack = producer.send(topicName, {'key': 'value'})

metadata = ack.get()

print(metadata.topic)

print(metadata.partition)

print(metadata.offset)

 

위에걸로 에러가 나면 아래처럼 한다..

 

-*- coding: utf-8 -*-
import sys
import json
from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers=["kafka1:9092","kafka2:9092","kafka3:9092"])
topicName="topicKwon"

msg={"id":"test","tel":"010-1111-2222","regDate":"20190603"}
def on_send_success(record_metadata):
    print(record_metadata.topic)
    print(record_metadata.partition)
    print(record_metadata.offset)


def on_send_error(excp):
    log.error('I am an errback', exc_info=excp)

producer.send(topicName, json.dumps(msg).encode('ascii')).add_callback(on_send_success).add_errback(on_send_error)

producer.flush()

 

이렇게 입력하고 파이썬 실행한다.

 

#python producer.py

 

이 토픽을 읽을땐 consumer.py를 만든다...

 

from kafka import KafkaConsumer

import sys

 

bootstrap_servers = ["kafka1:9092","kafka2:9092","kafka3:9092"]

topicName = 't5'

 

consumer = KafkaConsumer (topicName, group_id = 'group1',bootstrap_servers = bootstrap_servers,

auto_offset_reset = 'earliest')

 

try:

for message in consumer:

#print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))

print(message.value)

 

except KeyboardInterrupt:

sys.exit()

 

#python consumer.py

 

이렇게 하면 나온다..

 

여기서 주목해야할것은 group_id이다. group_id가 같으면 다른 서버에서 메세지를 먼저 읽으면 그 메세지를 불러오지 않는다.

 

지금 테스트서버가 3대이니까 1번서버에서 consumer.py를 실행해서 읽으면 2번서버에서는 consumer.py를 실행해도 아무것도 나오지 않는다는거다. 중복으로 메세지를 읽는걸 방지하기 위해서 있는거다.

 

물론 group_id를 다르게 하면 다시 메세지를 읽어올 수 있다...

 

그건 각자 해보도록....

반응형

+ Recent posts