bearking58 commited on
Commit
2bf48c4
·
1 Parent(s): d9764fe

fix: add try catch to deserializer

Browse files
public-prediction/kafka_consumer.py CHANGED
@@ -31,13 +31,19 @@ def consume_messages():
31
  auto_offset_reset='earliest',
32
  client_id="ai-detector-1",
33
  group_id=None,
34
- value_deserializer=lambda x: json.loads(x.decode('utf-8'))
35
  )
36
 
 
 
37
  BATCH_SIZE = 5
38
 
39
  for message in consumer:
40
- full_batch = message.value
 
 
 
 
 
41
 
42
  for i in range(0, len(full_batch), BATCH_SIZE):
43
  batch = full_batch[i:i+BATCH_SIZE]
 
31
  auto_offset_reset='earliest',
32
  client_id="ai-detector-1",
33
  group_id=None,
 
34
  )
35
 
36
+ print("Successfully connected to Kafka at", os.environ.get("KAFKA_IP"))
37
+
38
  BATCH_SIZE = 5
39
 
40
  for message in consumer:
41
+ try:
42
+ full_batch = json.loads(message.value.decode("utf-8"))
43
+ except json.JSONDecodeError:
44
+ print("Failed to decode JSON from message:", message.value)
45
+ print("Continuing...")
46
+ continue
47
 
48
  for i in range(0, len(full_batch), BATCH_SIZE):
49
  batch = full_batch[i:i+BATCH_SIZE]