From 87fe461ce01a4a81290d93fd098b6ea3daa5e98a Mon Sep 17 00:00:00 2001
From: PrasadMoka <prasad@sanketika.in>
Date: Fri, 16 Sep 2022 11:48:48 +0530
Subject: [PATCH] fix:LR-105 using kafka topic variables

---
 .../helm_charts/datapipeline_jobs/values.j2   | 60 +++++++++----------
 1 file changed, 30 insertions(+), 30 deletions(-)

diff --git a/kubernetes/helm_charts/datapipeline_jobs/values.j2 b/kubernetes/helm_charts/datapipeline_jobs/values.j2
index 371db55d..f7748493 100644
--- a/kubernetes/helm_charts/datapipeline_jobs/values.j2
+++ b/kubernetes/helm_charts/datapipeline_jobs/values.j2
@@ -83,7 +83,7 @@ base_config: |
       }
     }
     job {
-      env = "{{ env_name_bb }}"
+      env = "{{ env_name }}"
       enable.distributed.checkpointing = true
       statebackend {
         blob {
@@ -141,11 +141,11 @@ activity-aggregate-updater:
   activity-aggregate-updater: |+
     include file("/data/flink/conf/base-config.conf")
     kafka {
-      input.topic = {{ env_name_bb }}.coursebatch.job.request
-      output.audit.topic = {{ env_name_bb }}.telemetry.raw
-      output.failed.topic = {{ env_name_bb }}.activity.agg.failed
-      output.certissue.topic = {{ env_name_bb }}.issue.certificate.request
-      groupId = {{ env_name_bb }}-activity-aggregate-group
+      input.topic = {{ kafka_topic_course_batch_job_request }}
+      output.audit.topic = {{ kafka_topic_telemetry_raw }}
+      output.failed.topic = {{ kafka_topic_activity_agg_failed }}
+      output.certissue.topic = {{ kafka_topic_certificate_request }}
+      groupId = {{ kafka_group_activity_agg }}
     }
     task {
       window.shards = {{ activity_agg_window_shards }}
@@ -201,8 +201,8 @@ relation-cache-updater:
   relation-cache-updater: |+
     include file("/data/flink/conf/base-config.conf")
     kafka {
-      input.topic = {{ env_name_bb }}.content.postpublish.request
-      groupId = {{ env_name_bb }}-relation-cache-updater-group
+      input.topic = {{ kafka_topic_content_publish_request }}
+      groupId = {{ kafka_group_relation_cache_updater }}
     }
     task {
       consumer.parallelism = {{ relation_cache_updater_consumer_parallelism }}
@@ -233,11 +233,11 @@ enrolment-reconciliation:
   enrolment-reconciliation: |+
     include file("/data/flink/conf/base-config.conf")
     kafka {
-      input.topic = {{ env_name_bb }}.batch.enrolment.sync.request
-      output.audit.topic = {{ env_name_bb }}.telemetry.raw
-      output.failed.topic = {{ env_name_bb }}.activity.agg.failed
-      output.certissue.topic = {{ env_name_bb }}.issue.certificate.request
-      groupId = {{ env_name_bb }}-enrolment-reconciliation-group
+      input.topic = {{ kafka_topic_enrolment_sync_request }}
+      output.audit.topic = {{ kafka_topic_telemetry_raw }}
+      output.failed.topic = {{ kafka_topic_activity_agg_failed }}
+      output.certissue.topic = {{ kafka_topic_certificate_request }}
+      groupId = {{ kafka_group_enrolment_reconciliation }}
     }
     task {
       restart-strategy.attempts = {{ restart_attempts }} # max 3 restart attempts
@@ -279,10 +279,10 @@ collection-cert-pre-processor:
   collection-cert-pre-processor: |+
     include file("/data/flink/conf/base-config.conf")
     kafka {
-      input.topic = {{ env_name_bb }}.issue.certificate.request
-      output.topic = {{ env_name_bb }}.generate.certificate.request
-      output.failed.topic = {{ env_name_bb }}.issue.certificate.failed
-      groupId = {{ env_name_bb }}-collection-cert-pre-processor-group
+      input.topic = {{ kafka_topic_certificate_request }}
+      output.topic = {{ kafka_topic_generate_certificate_request }}
+      output.failed.topic = {{ kafka_topic_certificate_failed }}
+      groupId = {{ kafka_group_collection_pre_processor }}
     }
     task {
       restart-strategy.attempts = {{ restart_attempts }} # max 3 restart attempts
@@ -329,9 +329,9 @@ collection-certificate-generator:
   collection-certificate-generator: |+
     include file("/data/flink/conf/base-config.conf")
     kafka {
-      input.topic = {{ env_name_bb }}.generate.certificate.request
-      output.audit.topic = {{ env_name_bb }}.telemetry.raw
-      groupId = {{ env_name_bb }}-certificate-generator-group
+      input.topic = {{ kafka_topic_generate_certificate_request }}
+      output.audit.topic = {{ kafka_topic_telemetry_raw }}
+      groupId = {{ kafka_group_certificate_generator }}
     }
     task {
       restart-strategy.attempts = {{ restart_attempts }} # max 3 restart attempts
@@ -375,10 +375,10 @@ merge-user-courses:
   merge-user-courses: |+
     include file("/data/flink/conf/base-config.conf")
     kafka {
-      input.topic = {{ env_name_bb }}.lms.user.account.merge
-      output.failed.topic = {{ env_name_bb }}.learning.events.failed
-      groupId = {{ env_name_bb }}-merge-courses-group
-      output.course.batch.updater.topic = {{ env_name_bb }}.coursebatch.job.request
+      input.topic = {{ kafka_topic_lms_user_account }}
+      output.failed.topic = {{ kafka_topic_learning_failed }}
+      groupId = {{ kafka_group_merge_courses }}
+      output.course.batch.updater.topic = {{ kafka_topic_course_batch_job_request }}
     }
     task {
       consumer.parallelism = {{ merge_user_courses_consumer_parallelism }}
@@ -408,10 +408,10 @@ assessment-aggregator:
       producer.broker-servers = "{{ kafka_brokers }}"
       consumer.broker-servers = "{{ kafka_brokers }}"
       zookeeper = "{{ zookeepers }}"
-      input.topic = {{ env_name_bb }}.telemetry.assess
-      failed.topic= {{ env_name_bb }}.telemetry.assess.failed
-      groupId = {{ env_name_bb }}-assessment-aggregator-group
-      output.certissue.topic = {{ env_name_bb }}.issue.certificate.request
+      input.topic = {{ kafka_topic_assessment }}
+      failed.topic= {{ kafka_topic_assessment_failed }}
+      groupId = {{ kafka_group_assessment_aggregator }}
+      output.certissue.topic = {{ kafka_topic_certificate_request }}
     }
     task {
       consumer.parallelism = {{ assessaggregator_consumer_parallelism }}
@@ -450,8 +450,8 @@ notification-job:
   notification-job: |+
     include file("/data/flink/conf/base-config.conf")
     kafka {
-      input.topic = {{ env_name_bb }}.lms.notification
-      groupId = {{ env_name_bb }}-lms.notification
+      input.topic = {{ kafka_topic_lms_notification }}
+      groupId = {{ kafka_group_lms_notification }}
     }
     task {
       restart-strategy.attempts = {{ restart_attempts }} # max 3 restart attempts
-- 
GitLab