파이썬으로 카프카 토픽 가져와서 블록체인에 전송하고 txid 받아서 디비에 넣기
from kafka import KafkaConsumer
import sys
import hashlib
import pymysql
import random
import web3
from web3 import Web3, HTTPProvider
conn = pymysql.connect(host='192.168.0.1', user='testman', password='testman', db='openchain', charset='utf8')
curs = conn.cursor()
rpc_url = ["http://192.168.0.1:8545","http://192.168.0.2:8545","http://192.168.0.3:8545"]
#rpc = "http://localhost:8545"
rpc=random.choice(rpc_url)
w3 = Web3(HTTPProvider(rpc))
bootstrap_servers = ["192.168.0.1:39092","192.168.0.2:39092","192.168.0.3:39092"]
topicName = 'topicName'
consumer = KafkaConsumer (topicName, group_id = 'TEST',bootstrap_servers = bootstrap_servers,auto_offset_reset = 'earliest')
addr="0xb847db312283031603d08a97591f2f7d5b15f99a"
checkAddr= w3.toChecksumAddress(addr)
try:
for message in consumer:
data=message.value
encodedData=str(data).encode()
hashData=hexdigest = hashlib.sha256(encodedData).hexdigest()
params={
"to": checkAddr,
"from": w3.eth.coinbase,
"value": w3.toWei("0.5", "ether"),
"data":hashData,
"gas": 700000
}
tx = w3.eth.sendTransaction(params)
txid=tx.hex()
query="insert into table (txid) values ('"+txid+"')"
print(txid)
curs.execute(query)
conn.commit()
except KeyboardInterrupt:
conn.close()
sys.exit()
위 소스대로 하면 된다...
쉽다...