Commit f4125ba3 authored by aj7tesh's avatar aj7tesh
Browse files

multiple kafka consumer handling for dev and production environment

Showing with 36 additions and 24 deletions
+36 -24
......@@ -8,4 +8,23 @@ producer_topics ={
"TEST_TOPIC":"listener",
"TO_DOCUMENT":"listener",
"new_topic":"nmt_translate_processed"
}
\ No newline at end of file
}
kafka_topic = [
{
"consumer":"to-nmt",
"producer":"listener",
"description":"Document translation,also used in Suvas"
},
{
"consumer":"nmt_translate",
"producer":"nmt_translate_processed",
"description":"Pdf dev environment translation"
},
{
"consumer":"nmt_translate_production",
"producer":"nmt_translate_processed_production",
"description":"Pdf production translation"
}
]
......@@ -9,7 +9,7 @@ bootstrap_server = os.environ.get(KAFKA_IP_HOST, default_value)
group_id = 'anuvaad'
def get_consumer(topic):
def get_consumer(topics):
try:
# consumer = KafkaConsumer(
# topic,
......@@ -19,16 +19,13 @@ def get_consumer(topic):
# group_id=group_id,
# value_deserializer=lambda x: json.loads(x.decode('utf-8')))
consumer = KafkaConsumer(
topic,
bootstrap_servers=[bootstrap_server],
value_deserializer=lambda x: json.loads(x.decode('utf-8')))
logger.info('get_consumer : consumer returned for topic = ' + topic)
consumer.subscribe(topics)
logger.info('get_consumer : consumer returned for topics:{}'.format(topics))
return consumer
except Exception as e:
logger.error(
'get_consumer : ERROR OCCURRED for getting consumer with topic = ' + topic)
logger.error('ERROR OCCURRED for getting consumer with topics:{}'.format(topics))
logger.error('get_consumer : ERROR = ' + str(e))
print('error')
return None
\ No newline at end of file
......@@ -6,7 +6,7 @@ import ancillary_functions_anuvaad.ancillary_functions as ancillary_functions
import ancillary_functions_anuvaad.sc_preface_handler as sc_preface_handler
import ancillary_functions_anuvaad.handle_date_url as date_url_util
from config.config import statusCode,benchmark_types, language_supported, file_location
from config.kafka_topics import consumer_topics,producer_topics
from config.kafka_topics import consumer_topics,producer_topics,kafka_topic
from onmt.utils.logging import init_logger,logger
import os
import datetime
......@@ -16,7 +16,7 @@ import sys
import translation_util.translate_util as translate_util
def doc_translator(translation_server,c_topic,p_topic):
def doc_translator(translation_server,c_topic):
logger.info('doc_translator')
iq =0
out = {}
......@@ -26,6 +26,8 @@ def doc_translator(translation_server,c_topic,p_topic):
p = get_producer()
try:
for msg in c:
producer_topic = [ topic["producer"] for topic in kafka_topic if topic["consumer"] == msg.topic][0]
logger.info("Producer for current consumer:{} is-{}".format(msg.topic,producer_topic))
msg_count +=1
logger.info("*******************msg receive count*********:{}".format(msg_count))
iq = iq +1
......@@ -52,15 +54,14 @@ def doc_translator(translation_server,c_topic,p_topic):
out['response_body'] = []
p.send(p_topic, value={'out':out})
p.send(producer_topic, value={'out':out})
p.flush()
msg_sent += 1
logger.info("*******************msg sent count*********:{}".format(msg_sent))
except ValueError: # includes simplejson.decoder.JSONDecodeError
logger.error("Decoding JSON has failed in document_translator: %s"% sys.exc_info()[0])
doc_translator(translation_server,c_topic,p_topic)
doc_translator(translation_server,c_topic)
except Exception as e:
logger.error("Unexpected error: %s"% sys.exc_info()[0])
logger.error("Unexpected error in kafak doc_translator: %s"% sys.exc_info()[0])
logger.error("error in doc_translator: {}".format(e))
doc_translator(translation_server,c_topic,p_topic)
doc_translator(translation_server,c_topic)
......@@ -27,7 +27,7 @@ from kafka_utils.document_translator import doc_translator
import threading
import translation_util.translate_util as translate_util
import translation_util.interactive_translate as interactive_translation
from config.kafka_topics import consumer_topics,producer_topics
from config.kafka_topics import consumer_topics,producer_topics,kafka_topic
STATUS_OK = "ok"
STATUS_ERROR = "error"
......@@ -62,17 +62,12 @@ def start(config_file,
translation_server.start(config_file)
def kafka_function():
logger.info('starting kafka from nmt-server')
doc_translator(translation_server,consumer_topics['DOCUMENT_REQ'],producer_topics['TO_DOCUMENT'])
def kafka_function_1():
logger.info('starting kafka on new topic from nmt-server')
doc_translator(translation_server,consumer_topics['new_topic'],producer_topics['new_topic'])
logger.info('starting kafka from nmt-server on thread-1')
doc_translator(translation_server,[kafka_topic[0]['consumer'],kafka_topic[1]['consumer'],kafka_topic[2]['consumer']])
if bootstrap_server_boolean:
t1 = threading.Thread(target=kafka_function)
t2 = threading.Thread(target=kafka_function_1)
t1.start()
t2.start()
@app.route('/models', methods=['GET'])
def get_models():
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment