Get Even More Visitors To Your Blog, Upgrade To A Business Listing >>

SOLVED: Unable to get key in Kafka Python Consumer

hasherBaba:

I have written a code that would produce messages in key-value fashion in a topic:


from kafka import KafkaProducer
from kafka.errors import KafkaError
import json

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

# produce json messages
producer = KafkaProducer(value_serializer=lambda m: json.dumps(m).encode('ascii'))
response = '{ "_id": "g0G5AhSuZr", "timeZone": { "name": "India Standard Time", "offset": "GMT+05:30" }, "appVersion": { "createdIn": "2.2.1", "submittedIn": "2.2.1", "lastModifiedIn": "2.2.1" }, "formRevision": { "createdIn": "MfFUSui33VBqemj3RDej", "submittedIn": "MfFUSui33VBqemj3RDej", "lastModifiedIn": "MfFUSui33VBqemj3RDej", "lastRecomputedIn": "MfFUSui33VBqemj3RDej" }, "_p_createdBy": "_User$bU4azzUNQC", "lastModifiedOnDeviceAt": "2017-11-21T12:01:13.122+0000", "submittedOnDeviceAt": "2017-11-21T12:01:13.122+0000", "createdOnDeviceAt": "2017-11-21T11:58:16.523+0000", "syncedFromDeviceAt": "2017-11-29T05:44:34.784+0000", "client": "android", "ip": "100.120.136.139", "deviceId": "4bc03533ccc94065", "formId": "DwTp9nwx2m", "responseId": "c03c4851-701f-4265-aafd-eb133c09c08e", "responseFamilyId": "c03c4851-701f-4265-aafd-eb133c09c08e", "isActive": true, "_wperm": [ "bU4azzUNQC", "role:analysts_DwTp9nwx2m" ], "_rperm": [ "bU4azzUNQC", "role:analysts_DwTp9nwx2m" ], "_acl": { "bU4azzUNQC": { "w": true, "r": true }, "role:analysts_DwTp9nwx2m": { "w": true, "r": true } }, "_created_at": "2017-11-29T05:45:24.076000", "_updated_at": "2017-11-29T05:45:24.076000"}'
responeJson = json.loads(response)
deviceId = responeJson["deviceId"]
responseId = responeJson["responseId"]

print deviceId
print responseId

producer.send('collect-response-devices', {'deviceId': deviceId})
producer.send('collect-response-responses', {'responseId' : responseId})

def on_send_success(record_metadata):
print(record_metadata.topic)
print(record_metadata.partition)
print(record_metadata.offset)

def on_send_error(excp):
log.error('I am an errback', exc_info=excp)
print excp
# handle exception

# block until all async messages are sent
producer.flush()

# configure multiple retries
producer = KafkaProducer(retries=5)

However, my consumer would consume messages, assign nothing(None in the key) and everything in the value part.


from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(bootstrap_servers='localhost:9092', auto_offset_reset='earliest', value_deserializer=lambda m: json.loads(m.decode('utf-8')))
consumer.subscribe(['collect-response-devices'])
for message in consumer:
print (message.key, message.value)

This is the output of the consumer:

(None, {u'deviceId': u'4bc03533ccc94065'})



Posted in S.E.F
via StackOverflow & StackExchange Atomic Web Robots


This post first appeared on Stack Solved, please read the originial post: here

Share the post

SOLVED: Unable to get key in Kafka Python Consumer

×

Subscribe to Stack Solved

Get updates delivered right to your inbox!

Thank you for your subscription

×