파이썬으로 카프카에 토픽을 만들어 등록해보자...
#-*- coding: utf-8 -*-
import sys
import json
from kafka import KafkaProducer
from kafka.errors import KafkaError
producer = KafkaProducer(bootstrap_servers=["kafka1:9092","kafka2:9092","kafka3:9092"])
topicName="t5"
msg={"id":"test","tel":"010-1111-2222","regDate":"20190603"}
def on_send_success(record_metadata):
print(record_metadata.topic)
print(record_metadata.partition)
print(record_metadata.offset)
def on_send_error(excp):
log.error('I am an errback', exc_info=excp)
producer = KafkaProducer(value_serializer=lambda m: json.dumps(msg).encode('ascii'))
producer.send(topicName, {'key': 'value'}).add_callback(on_send_success).add_errback(on_send_error)
producer.flush()
producer = KafkaProducer(retries=5)
또는
import sys
import json
from kafka import KafkaProducer
bootstrap_servers = ["kafka1:9092","kafka2:9092","kafka3:9092"]
topicName = 't5'
msg={"id":"test2","tel":"010-1111-2222","regDate":"20190603"}
producer = KafkaProducer(value_serializer=lambda m: json.dumps(msg).encode('ascii'))
ack = producer.send(topicName, {'key': 'value'})
metadata = ack.get()
print(metadata.topic)
print(metadata.partition)
print(metadata.offset)
위에걸로 에러가 나면 아래처럼 한다..
-*- coding: utf-8 -*-
import sys
import json
from kafka import KafkaProducer
from kafka.errors import KafkaError
producer = KafkaProducer(bootstrap_servers=["kafka1:9092","kafka2:9092","kafka3:9092"])
topicName="topicKwon"
msg={"id":"test","tel":"010-1111-2222","regDate":"20190603"}
def on_send_success(record_metadata):
print(record_metadata.topic)
print(record_metadata.partition)
print(record_metadata.offset)
def on_send_error(excp):
log.error('I am an errback', exc_info=excp)
producer.send(topicName, json.dumps(msg).encode('ascii')).add_callback(on_send_success).add_errback(on_send_error)
producer.flush()
이렇게 입력하고 파이썬 실행한다.
#python producer.py
이 토픽을 읽을땐 consumer.py를 만든다...
from kafka import KafkaConsumer
import sys
bootstrap_servers = ["kafka1:9092","kafka2:9092","kafka3:9092"]
topicName = 't5'
consumer = KafkaConsumer (topicName, group_id = 'group1',bootstrap_servers = bootstrap_servers,
auto_offset_reset = 'earliest')
try:
for message in consumer:
#print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))
print(message.value)
except KeyboardInterrupt:
sys.exit()
#python consumer.py
이렇게 하면 나온다..
여기서 주목해야할것은 group_id이다. group_id가 같으면 다른 서버에서 메세지를 먼저 읽으면 그 메세지를 불러오지 않는다.
지금 테스트서버가 3대이니까 1번서버에서 consumer.py를 실행해서 읽으면 2번서버에서는 consumer.py를 실행해도 아무것도 나오지 않는다는거다. 중복으로 메세지를 읽는걸 방지하기 위해서 있는거다.
물론 group_id를 다르게 하면 다시 메세지를 읽어올 수 있다...
그건 각자 해보도록....
'파이썬' 카테고리의 다른 글
파이썬으로 스마트컨트랙트 배포하기 feat POA 2/2 (0) | 2019.07.23 |
---|---|
파이썬으로 스마트컨트랙트 배포하기 feat POA 1/2 (0) | 2019.07.23 |
파이썬으로 크롤링하는 서버는 용량체크에 신경써라 (0) | 2019.06.14 |
파이썬으로 코인 전송하고 트랜잭션 확인하기 feat web3 (0) | 2019.06.04 |
파이썬으로 크롤링 할때 크롬이나 파폭 드라이버 쓰면 서버 부하 올라가는 현상 (0) | 2018.07.06 |