From f4125ba373afd100cccb9a4b2e064783c2e9c632 Mon Sep 17 00:00:00 2001
From: aj7tesh <ajitesh.sharma@tarento.com>
Date: Thu, 28 May 2020 17:33:24 +0530
Subject: [PATCH] multiple kafka consumer handling for dev and production
 environment

---
 config/kafka_topics.py             | 21 ++++++++++++++++++++-
 kafka_utils/consumer.py            | 13 +++++--------
 kafka_utils/document_translator.py | 15 ++++++++-------
 server.py                          | 11 +++--------
 4 files changed, 36 insertions(+), 24 deletions(-)

diff --git a/config/kafka_topics.py b/config/kafka_topics.py
index 2ed38dac..e7a701c3 100644
--- a/config/kafka_topics.py
+++ b/config/kafka_topics.py
@@ -8,4 +8,23 @@ producer_topics ={
   "TEST_TOPIC":"listener",
   "TO_DOCUMENT":"listener",
   "new_topic":"nmt_translate_processed"
-}
\ No newline at end of file
+}
+
+kafka_topic = [
+  {
+    "consumer":"to-nmt",
+    "producer":"listener",
+    "description":"Document translation,also used in Suvas"
+  },
+  {
+    "consumer":"nmt_translate",
+    "producer":"nmt_translate_processed",
+    "description":"Pdf dev environment translation"
+  },
+  {
+    "consumer":"nmt_translate_production",
+    "producer":"nmt_translate_processed_production",
+    "description":"Pdf production translation"
+  }
+
+  ]
diff --git a/kafka_utils/consumer.py b/kafka_utils/consumer.py
index 34b2641b..9f25ae08 100644
--- a/kafka_utils/consumer.py
+++ b/kafka_utils/consumer.py
@@ -9,7 +9,7 @@ bootstrap_server = os.environ.get(KAFKA_IP_HOST, default_value)
 group_id = 'anuvaad'
 
 
-def get_consumer(topic):
+def get_consumer(topics):
     try:
         # consumer = KafkaConsumer(
         #            topic,
@@ -19,16 +19,13 @@ def get_consumer(topic):
         #            group_id=group_id,
         #            value_deserializer=lambda x: json.loads(x.decode('utf-8')))
         consumer = KafkaConsumer(
-            topic,
             bootstrap_servers=[bootstrap_server],
             value_deserializer=lambda x: json.loads(x.decode('utf-8')))
-        
-
-        logger.info('get_consumer : consumer returned for topic = ' + topic)
+    
+        consumer.subscribe(topics)    
+        logger.info('get_consumer : consumer returned for topics:{}'.format(topics))
         return consumer
     except Exception as e:
-        logger.error(
-            'get_consumer : ERROR OCCURRED for getting consumer with topic = ' + topic)
+        logger.error('ERROR OCCURRED for getting consumer with topics:{}'.format(topics))
         logger.error('get_consumer : ERROR = ' + str(e))
-        print('error')
         return None
\ No newline at end of file
diff --git a/kafka_utils/document_translator.py b/kafka_utils/document_translator.py
index ce8d9ee0..b7a562e9 100644
--- a/kafka_utils/document_translator.py
+++ b/kafka_utils/document_translator.py
@@ -6,7 +6,7 @@ import ancillary_functions_anuvaad.ancillary_functions as ancillary_functions
 import ancillary_functions_anuvaad.sc_preface_handler as sc_preface_handler
 import ancillary_functions_anuvaad.handle_date_url as date_url_util
 from config.config import statusCode,benchmark_types, language_supported, file_location
-from config.kafka_topics import consumer_topics,producer_topics
+from config.kafka_topics import consumer_topics,producer_topics,kafka_topic
 from onmt.utils.logging import init_logger,logger
 import os
 import datetime
@@ -16,7 +16,7 @@ import sys
 import translation_util.translate_util as translate_util
 
 
-def doc_translator(translation_server,c_topic,p_topic):
+def doc_translator(translation_server,c_topic):
     logger.info('doc_translator')  
     iq =0
     out = {}
@@ -26,6 +26,8 @@ def doc_translator(translation_server,c_topic,p_topic):
     p = get_producer()
     try:
         for msg in c:
+            producer_topic = [ topic["producer"] for topic in kafka_topic if topic["consumer"] == msg.topic][0]
+            logger.info("Producer for current consumer:{} is-{}".format(msg.topic,producer_topic))
             msg_count +=1
             logger.info("*******************msg receive count*********:{}".format(msg_count))
             iq = iq +1
@@ -52,15 +54,14 @@ def doc_translator(translation_server,c_topic,p_topic):
                     out['response_body'] = []
 
                 
-            p.send(p_topic, value={'out':out})
+            p.send(producer_topic, value={'out':out})
             p.flush()
             msg_sent += 1
             logger.info("*******************msg sent count*********:{}".format(msg_sent))
-            
     except ValueError:  # includes simplejson.decoder.JSONDecodeError
         logger.error("Decoding JSON has failed in document_translator: %s"% sys.exc_info()[0])
-        doc_translator(translation_server,c_topic,p_topic)  
+        doc_translator(translation_server,c_topic)  
     except Exception  as e:
-        logger.error("Unexpected error: %s"% sys.exc_info()[0])
+        logger.error("Unexpected error in kafak doc_translator: %s"% sys.exc_info()[0])
         logger.error("error in doc_translator: {}".format(e))
-        doc_translator(translation_server,c_topic,p_topic)
+        doc_translator(translation_server,c_topic)
diff --git a/server.py b/server.py
index e62ee75a..261277bb 100755
--- a/server.py
+++ b/server.py
@@ -27,7 +27,7 @@ from kafka_utils.document_translator import doc_translator
 import threading
 import translation_util.translate_util as translate_util
 import translation_util.interactive_translate as interactive_translation
-from config.kafka_topics import consumer_topics,producer_topics
+from config.kafka_topics import consumer_topics,producer_topics,kafka_topic
 
 STATUS_OK = "ok"
 STATUS_ERROR = "error"
@@ -62,17 +62,12 @@ def start(config_file,
     translation_server.start(config_file)
 
     def kafka_function():
-        logger.info('starting kafka from nmt-server')
-        doc_translator(translation_server,consumer_topics['DOCUMENT_REQ'],producer_topics['TO_DOCUMENT'])
-    def kafka_function_1():
-        logger.info('starting kafka on new topic from nmt-server')
-        doc_translator(translation_server,consumer_topics['new_topic'],producer_topics['new_topic'])     
+        logger.info('starting kafka from nmt-server on thread-1')
+        doc_translator(translation_server,[kafka_topic[0]['consumer'],kafka_topic[1]['consumer'],kafka_topic[2]['consumer']])     
 
     if bootstrap_server_boolean:
         t1 = threading.Thread(target=kafka_function)
-        t2 = threading.Thread(target=kafka_function_1)
         t1.start()
-        t2.start()
 
     @app.route('/models', methods=['GET'])
     def get_models():
-- 
GitLab