diff --git a/ansible/core_kafka_setup.yml b/ansible/core_kafka_setup.yml
new file mode 100644
index 0000000000000000000000000000000000000000..c48a5c464502acc3cc802f7c3c36fa1e228de0d2
--- /dev/null
+++ b/ansible/core_kafka_setup.yml
@@ -0,0 +1,12 @@
+---
+- hosts: processing-cluster-kafka
+  become: yes
+  pre_tasks:
+    - name: Registering kafka_id
+      set_fact:
+        kafka_id: "{% for servername in play_hosts %}{% if inventory_hostname==servername %}{{ loop.index }}{% endif %}{% endfor %}"
+    - name: Print kafka_id
+      debug:
+        var: kafka_id
+  roles:
+    - setup-kafka
diff --git a/ansible/inventory/env/group_vars/all.yml b/ansible/inventory/env/group_vars/all.yml
index f1dcb8fbe098a5820fe6acd1cf333405aca4b1df..839e4e702b9eb7519d0efc7979e82d0b12a546e5 100644
--- a/ansible/inventory/env/group_vars/all.yml
+++ b/ansible/inventory/env/group_vars/all.yml
@@ -658,3 +658,13 @@ kafka_urls: "{{groups['processing-cluster-kafka']|join(':9092,')}}:9092"
 kafka_topics_certificate_instruction: "{{env_name}}.coursebatch.certificate.request"
 cert_service_container_name: "{{env}}-e-credentials"
 cert_service_cloud_storage_type: "{{cert_service_cloud_storage_type}}"
+
+### Release-2.4.0 ###
+samza_tar_files_localpath: roles/samza-jobs/defaults
+job_names:
+  lms.user-account-merger_1:
+    job_file_name: 'user-account-merge'
+  lms.sso-account-updater_1:
+    job_file_name: 'sso-account-updater'
+  lms.indexer_1:
+    job_file_name: 'indexer'
diff --git a/ansible/roles/samza-jobs/defaults/main.yml b/ansible/roles/samza-jobs/defaults/main.yml
index bde0b20f953d4fa440a9e8661091e87e4bb5255d..0adcc6cd9f8ce380db7604bd7f5f151b4bf23260 100644
--- a/ansible/roles/samza-jobs/defaults/main.yml
+++ b/ansible/roles/samza-jobs/defaults/main.yml
@@ -6,5 +6,11 @@ hadoop_version: 2.7.2
 __yarn_port__: 8000
 cassandra_port: 9042
 es_port: 9200
-
-#telemetry_extractor_container_memory_mb: 1024
+samza_tar_files_localpath: roles/samza-jobs/defaults
+job_names:
+  lms.user-account-merger_1:
+    job_file_name: 'user-account-merge'
+  lms.sso-account-updater_1:
+    job_file_name: 'sso-account-updater'
+  lms.indexer_1:
+    job_file_name: 'indexer'
diff --git a/ansible/roles/samza-jobs/files/find_job_name.sh b/ansible/roles/samza-jobs/files/find_job_name.sh
deleted file mode 100644
index 05f06052239d66020f7ea30ee92ff3201805aa2c..0000000000000000000000000000000000000000
--- a/ansible/roles/samza-jobs/files/find_job_name.sh
+++ /dev/null
@@ -1 +0,0 @@
-sed -n "/job\.name.*$/ p" $1 | sed -n "s/=/\\t/g p" | cut -f 2
\ No newline at end of file
diff --git a/ansible/roles/samza-jobs/files/get_all_job_name.sh b/ansible/roles/samza-jobs/files/get_all_job_name.sh
deleted file mode 100644
index 7975c8a34a8c5d3c7a72df8be218e945ad4be47d..0000000000000000000000000000000000000000
--- a/ansible/roles/samza-jobs/files/get_all_job_name.sh
+++ /dev/null
@@ -1,7 +0,0 @@
-#!/usr/bin/env bash
-find . -name "*.properties" | while read fname; do
-  job_name=`sed -n "/^job\.name.*$/ p" $fname | sed -n "s/=/\\t/g p" | cut -f 2`
-  folder_path=$(dirname `dirname "$fname"`)
-  folder_name=`basename $folder_path`
-  echo "$folder_name:$job_name:---:stopped"
-done > $1
diff --git a/ansible/roles/samza-jobs/files/get_all_running_app_id.sh b/ansible/roles/samza-jobs/files/get_all_running_app_id.sh
deleted file mode 100644
index 74aa7c049123dc11ecb7572a22aef2c30f5db088..0000000000000000000000000000000000000000
--- a/ansible/roles/samza-jobs/files/get_all_running_app_id.sh
+++ /dev/null
@@ -1,2 +0,0 @@
-#!/usr/bin/env bash
-./yarn application -list | cut -f 2 | sed 1,'/Application-Name/'d
\ No newline at end of file
diff --git a/ansible/roles/samza-jobs/files/get_all_running_app_name.sh b/ansible/roles/samza-jobs/files/get_all_running_app_name.sh
deleted file mode 100644
index b3b1b9dff2010a1073ca96482db8505e6ededf5a..0000000000000000000000000000000000000000
--- a/ansible/roles/samza-jobs/files/get_all_running_app_name.sh
+++ /dev/null
@@ -1,11 +0,0 @@
-#!/usr/bin/env bash
-job_names=(`./yarn application -list | cut -f 2 | sed 1,'/Application-Name/'d | sed 's/_1$//'`)
-job_ids=(`./yarn application -list | cut -f 1 | sed 1,'/Application-Id/'d`)
-count=${#job_names[@]}
-for (( i=0; i<${count}; i++ ));
-do
-	job_name=${job_names[i]}
-	job_id=${job_ids[i]}
-	`sed -i /$job_name/s/stopped/started/g $1`
-	`sed -i /$job_name/s/---/$job_id/g $1`
-done
diff --git a/ansible/roles/samza-jobs/files/kill_all_app.sh b/ansible/roles/samza-jobs/files/kill_all_app.sh
deleted file mode 100644
index 55f7341e25ea3350113c398574182c4ad351a0cb..0000000000000000000000000000000000000000
--- a/ansible/roles/samza-jobs/files/kill_all_app.sh
+++ /dev/null
@@ -1,9 +0,0 @@
-#!/usr/bin/env bash
-./yarn application -list > applist.txt
-sed -n "/$1.*$/ p" applist.txt | cut -f 1 > temp.txt
-while read in;
-do
-./yarn application -kill  "$in";
-done < temp.txt
-rm temp.txt
-rm applist.txt
\ No newline at end of file
diff --git a/ansible/roles/samza-jobs/files/kill_jobs.sh b/ansible/roles/samza-jobs/files/kill_jobs.sh
deleted file mode 100644
index 267515cdea2a7761b542db764418868145d59a71..0000000000000000000000000000000000000000
--- a/ansible/roles/samza-jobs/files/kill_jobs.sh
+++ /dev/null
@@ -1,11 +0,0 @@
-#!/usr/bin/env bash
-cat $1 | while read LINE
-do
- application_id=`echo $LINE | awk -F':' '{print $3}'`;
- status=`echo $LINE | awk -F':' '{print $4}'`;
- 
- if [ "$status" == "restart" ]
- then
-  ./yarn application -kill $application_id
- fi
-done
\ No newline at end of file
diff --git a/ansible/roles/samza-jobs/files/remove_old_tar.sh b/ansible/roles/samza-jobs/files/remove_old_tar.sh
deleted file mode 100644
index 13d0547b89be9c75b3f076139ea522137489fc8b..0000000000000000000000000000000000000000
--- a/ansible/roles/samza-jobs/files/remove_old_tar.sh
+++ /dev/null
@@ -1,12 +0,0 @@
-#!/usr/bin/env bash
-cat $1 | awk -F':' '{print $1}' > tmp.txt
-DIRS=`ls -l $2/extract/ | egrep '^d'| awk '{print $9}'`
-for dir in $DIRS
-do
-  if ! grep -Fxq $dir tmp.txt
-  then
-     rm -rf $dir
-     rm $2/$dir
-  fi
-done
-rm tmp.txt
\ No newline at end of file
diff --git a/ansible/roles/samza-jobs/files/start_jobs.sh b/ansible/roles/samza-jobs/files/start_jobs.sh
deleted file mode 100644
index 4d048a58a8000b62b27b89202898f6d2f6e15525..0000000000000000000000000000000000000000
--- a/ansible/roles/samza-jobs/files/start_jobs.sh
+++ /dev/null
@@ -1,15 +0,0 @@
-#!/usr/bin/env bash
-folder_path=$2
-cat $1 | while read LINE
-do
- dir_name=`echo $LINE | awk -F':' '{print $1}'`;
- job_name=`echo $LINE | awk -F':' '{print $2}'`;
- application_id=`echo $LINE | awk -F':' '{print $3}'`;
- status=`echo $LINE | awk -F':' '{print $4}'`;
- properties_path="$folder_path/$dir_name/config/*.properties"
- config_file_path=`ls -d $properties_path`
- if [ "$status" == "stopped" ] || [ "$status" == "restart" ]
- then
-   ./$dir_name/bin/run-job.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file:///$config_file_path
- fi
-done
\ No newline at end of file
diff --git a/ansible/roles/samza-jobs/files/update_new_job_name.sh b/ansible/roles/samza-jobs/files/update_new_job_name.sh
deleted file mode 100644
index 24e174ce540866aafd3ef3e88c66b3bc50d983a8..0000000000000000000000000000000000000000
--- a/ansible/roles/samza-jobs/files/update_new_job_name.sh
+++ /dev/null
@@ -1,14 +0,0 @@
-#!/usr/bin/env bash
-find $2 -name "*.properties" | while read fname; do
-  job_name=`sed -n "/^job\.name.*$/ p" $fname | sed -n "s/=/\\t/g p" | cut -f 2`
-  folder_path=$(dirname `dirname "$fname"`)
-  folder_name=`basename $folder_path`
-  if grep -Fwq $job_name $1
-  	then
-      `sed -i /$job_name/s/^.*\.gz/$folder_name/ $1`;
-      `sed -i /$job_name/s/started/restart/ $1`;
-  	else
-      echo "adding"
-    	echo "$folder_name:$job_name:---:stopped" >> $1
-  fi
-done
\ No newline at end of file
diff --git a/ansible/roles/samza-jobs/tasks/deploy.yml b/ansible/roles/samza-jobs/tasks/deploy.yml
deleted file mode 100644
index 67c51a8b42b5eb28578edece68993d830f57433f..0000000000000000000000000000000000000000
--- a/ansible/roles/samza-jobs/tasks/deploy.yml
+++ /dev/null
@@ -1,101 +0,0 @@
----
-- name: Create Directory for Jobs
-  file: path={{item}} owner=hduser group=hadoop recurse=yes state=directory
-  with_items:
-    - "{{samza_jobs_dir}}"
-    - "{{samza_jobs_dir}}/extract"
-
-- name: Copy script to get all running jobs
-  copy: src=get_all_running_app_name.sh dest=/usr/local/hadoop/bin owner=hduser group=hadoop mode="u=rwx,g=rx,o=r"
-
-- name: Copy script to get all job names
-  copy: src=get_all_job_name.sh dest="{{samza_jobs_dir}}/extract" owner=hduser group=hadoop mode="u=rwx,g=rx,o=r"
-
-- name: Copy script to get updated job names from extracted tar
-  copy: src=update_new_job_name.sh dest="{{samza_jobs_dir}}/extract" owner=hduser group=hadoop mode="u=rwx,g=rx,o=r"
-
-- name: Copy script to start jobs based on the status
-  copy: src=start_jobs.sh dest="{{samza_jobs_dir}}/extract" owner=hduser group=hadoop mode="u=rwx,g=rx,o=r"
-
-- name: Copy script to remove old job tar
-  copy: src=remove_old_tar.sh dest="{{samza_jobs_dir}}/extract" owner=hduser group=hadoop mode="u=rwx,g=rx,o=r"
-
-- name: Copy script to kill jobs based on the status
-  copy: src=kill_jobs.sh dest=/usr/local/hadoop/bin owner=hduser group=hadoop mode="u=rwx,g=rx,o=r"
-
-- name: Remove file of job status
-  file: path="{{job_status_file}}" state=absent
-
-- name: Get job names from folder
-  command: bash -lc "./get_all_job_name.sh {{job_status_file}}"
-  args:
-    chdir: "{{samza_jobs_dir}}/extract"
-
-- name: Ensure yarn resource manager is running
-  command: bash -lc "(ps aux | grep yarn-hduser-resourcemanager | grep -v grep) || /usr/local/hadoop/sbin/yarn-daemon.sh --config /usr/local/hadoop-{{hadoop_version}}/conf/ start resourcemanager"
-  become: yes
-  become_user: hduser
-
-- name: Update status of running job in file
-  command: bash -lc "./get_all_running_app_name.sh {{job_status_file}}"
-  args:
-    chdir: /usr/local/hadoop/bin
-
-- name: copy new jobs tar ball
-  copy: src={{ item }} dest={{samza_jobs_dir}}/ force=no owner=hduser group=hadoop
-  with_fileglob:
-    - ./jobs/*
-  register: new_jobs
-
-- name: Create Directory to extract new jobs
-  file: path={{samza_jobs_dir}}/extract/{{item.item | basename }} owner=hduser group=hadoop recurse=yes state=directory
-  register: extract_dir
-  when: "{{item|changed}}"
-  with_items: "{{ (new_jobs|default({})).results|default([]) }}"
-
-- name: extract new jobs
-  command: tar -xvf "{{samza_jobs_dir}}/{{item.item | basename}}" -C "{{samza_jobs_dir}}/extract/{{item.item | basename }}"
-  when: "{{item|changed}}"
-  with_items: "{{ (new_jobs|default({})).results|default([]) }}"
-
-- name: Create Directory to extract new jobs
-  file: path={{samza_jobs_dir}}/extract/ owner=hduser group=hadoop recurse=yes
-
-- name: Get all new job configs
-  shell: "ls -d -1 {{item.path}}/config/*.properties"
-  register: config_files
-  when: "{{item|changed}}"
-  with_items: "{{ (extract_dir|default({})).results|default([]) }}"
-
-
-- name: update environment specific details in new job configs
-  replace: dest="{{item[1].stdout}}" regexp="{{item[0].key}}" replace="{{item[0].value}}"
-  when: "{{item[1]|changed}}"
-  with_nested:
-    - [{key: "__yarn_host__", value: "{{__yarn_host__}}"}, {key: "__yarn_port__", value: "{{__yarn_port__}}"}, {key: "__env__", value: "{{env_name}}" }, {key: "__zookeepers__", value: "{{zookeepers}}"}, {key: "__kafka_brokers__", value: "{{kafka_brokers}}"}, {key: "__lms_host__", value: "{{__lms_host__}}"}, {key: "__lms_es_port__", value: "{{sunbird_es_port}}"}, {key: "__lms_es_host__", value: "{{sunbird_es_host}}"}]
-    - "{{ (config_files|default({})).results|default([]) }}"
-
-- name: Update status of new jobs in file
-  command: bash -lc "./update_new_job_name.sh {{job_status_file}} {{samza_jobs_dir}}/extract/{{item.item | basename}}"
-  args:
-    chdir: "{{samza_jobs_dir}}/extract/"
-  when: "{{item|changed}}"
-  with_items: "{{ (new_jobs|default({})).results|default([]) }}"
-
-- name: Kill jobs
-  command: bash -lc "./kill_jobs.sh {{job_status_file}}"
-  args:
-    chdir: /usr/local/hadoop/bin
-
-- name: Start jobs
-  command: bash -lc "./start_jobs.sh {{job_status_file}} {{samza_jobs_dir}}/extract"
-  args:
-    chdir: "{{samza_jobs_dir}}/extract/"
-  become_user: hduser
-
-- name: Remove all old tar
-  command: bash -lc "./remove_old_tar.sh {{job_status_file}} {{samza_jobs_dir}}"
-  args:
-    chdir: "{{samza_jobs_dir}}/extract/"
-
-- file: path={{samza_jobs_dir}} owner=hduser group=hadoop state=directory recurse=yes
diff --git a/ansible/roles/samza-jobs/tasks/main.yml b/ansible/roles/samza-jobs/tasks/main.yml
index 0feb5dcd994a6828f46059024d5a3642a7ccf1a5..6352fe84f6875b6b437817af3147dc07bfe89b12 100644
--- a/ansible/roles/samza-jobs/tasks/main.yml
+++ b/ansible/roles/samza-jobs/tasks/main.yml
@@ -1,9 +1,74 @@
 ---
-- include: deploy.yml
-  when: deploy_jobs | default(false)
+- name: Create Directory for Jobs
+  file: path={{ item }} owner=hduser group=hadoop recurse=yes state=directory
+  with_items:
+    - "{{ samza_jobs_dir }}"
+    - "{{ samza_jobs_dir }}/extract"
 
-- include: stop_jobs.yml
-  when: stop_jobs | default(false)
+- name: Get the application id to kill the app
+  shell: "{{ yarn_path }}/yarn application --list | grep -i {{ item }} | awk '{print $1}'"
+  with_items: "{{ job_names_to_kill.split(',')|list }}"
+  register: appid
 
-- include: start_jobs.yml
-  when: start_jobs | default(false)
+- name: Kill the mentioned applications
+  shell: "{{ yarn_path }}/yarn application -kill {{ item.stdout }}"
+  with_items:
+    - "{{ appid['results'] }}"
+  when: item.stdout | length > 0
+
+- name: find the existing file names to remove
+  find:
+    paths: "{{ samza_jobs_dir }}"
+    patterns: "{{ job_names['%s'|format(item)].job_file_name }}*"
+    recurse: true
+  with_items: "{{ job_names_to_kill.split(',') }}"
+  register: existing_files
+
+- name: remove the files under "{{ samza_jobs_dir }}" directory
+  command: rm -rf "{{ item.path | basename }}"
+  with_items: "{{ existing_files | json_query('results[].files[]') }}"
+  args:
+    chdir: "{{ samza_jobs_dir }}"
+
+- name: remove the files under "{{ samza_jobs_dir }}/extract" directory
+  command: rm -rf "{{ item.path | basename }}"
+  with_items: "{{ existing_files | json_query('results[].files[]') }}"
+  args:
+    chdir: "{{ samza_jobs_dir }}/extract"
+
+- name: copy new jobs tar ball
+  copy: src={{ item }} dest={{ samza_jobs_dir }}/ force=no owner=hduser group=hadoop
+  with_fileglob:
+    - ../defaults/jobs/*
+  register: new_jobs
+
+- name: Create directory to extract new jobs
+  file: path="{{ samza_jobs_dir }}/extract/{{ item }}" owner=hduser group=hadoop recurse=yes state=directory
+  with_items:
+    - "{{ new_jobs | json_query('results[].invocation.module_args.original_basename') }}"
+
+- name: extract new jobs
+  unarchive:
+    src: "{{ samza_jobs_dir }}/{{ item }}"
+    dest: "{{ samza_jobs_dir }}/extract/{{ item }}"
+    remote_src: true
+  with_items:
+    - "{{ new_jobs | json_query('results[].invocation.module_args.original_basename') }}"
+
+- name: Get all new jobs config
+  shell: "ls -d -1 {{ samza_jobs_dir }}/extract/{{ item }}/config/*.properties"
+  register: config_files
+  with_items:
+    - "{{ new_jobs | json_query('results[].invocation.module_args.original_basename') }}"
+
+- name: update environment specific details in new job configs
+  replace: dest="{{ item[1].stdout }}" regexp="{{ item[0].key }}" replace="{{ item[0].value }}"
+  with_nested:
+    - [{key: "__yarn_host__", value: "{{__yarn_host__}}"}, {key: "__yarn_port__", value: "{{__yarn_port__}}"}, {key: "__env__", value: "{{env_name}}" }, {key: "__zookeepers__", value: "{{zookeepers}}"}, {key: "__kafka_brokers__", value: "{{kafka_brokers}}"}, {key: "__lms_host__", value: "{{__lms_host__}}"}, {key: "__lms_es_port__", value: "{{sunbird_es_port}}"}, {key: "__lms_es_host__", value: "{{sunbird_es_host}}"}]
+    - "{{ config_files | json_query('results[]') }}"
+
+- name: Start the jobs
+  shell: "{{ samza_jobs_dir }}/extract/{{ item.0 }}/bin/run-job.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path={{ item.1.stdout }}"
+  with_together:
+    - "{{ new_jobs | json_query('results[].invocation.module_args.original_basename') }}"
+    - "{{ config_files | json_query('results[]') }}"
diff --git a/ansible/roles/samza-jobs/tasks/start_jobs.yml b/ansible/roles/samza-jobs/tasks/start_jobs.yml
deleted file mode 100644
index 4bb0c65c9c58bd0667eaa3cb593fe692076b522a..0000000000000000000000000000000000000000
--- a/ansible/roles/samza-jobs/tasks/start_jobs.yml
+++ /dev/null
@@ -1,21 +0,0 @@
----
-- name: Remove file of job status
-  file: path="{{job_status_file}}" state=absent
-  become: yes
-
-- name: Get job names from folder
-  command: bash -lc "./get_all_job_name.sh {{job_status_file}}"
-  args:
-    chdir: "{{samza_jobs_dir}}/extract"
-  become: yes
-
-- name: Ensure yarn resource manager is running
-  command: bash -lc "(ps aux | grep yarn-hduser-resourcemanager | grep -v grep) || /usr/local/hadoop/sbin/yarn-daemon.sh --config /usr/local/hadoop-{{hadoop_version}}/conf/ start resourcemanager"
-  become: yes
-  become_user: hduser
-
-- name: Start jobs
-  command: bash -lc "./start_jobs.sh {{job_status_file}} {{samza_jobs_dir}}/extract"
-  args:
-    chdir: "{{samza_jobs_dir}}/extract/"
-  become: yes
diff --git a/ansible/roles/samza-jobs/tasks/stop_jobs.yml b/ansible/roles/samza-jobs/tasks/stop_jobs.yml
deleted file mode 100644
index 1ef2f7b748366687005502f192bdc0092b75d08a..0000000000000000000000000000000000000000
--- a/ansible/roles/samza-jobs/tasks/stop_jobs.yml
+++ /dev/null
@@ -1,16 +0,0 @@
----
-- name: Remove file of job status
-  file: path="{{job_status_file}}" state=absent
-  become: yes
-
-- name: Get job names from folder
-  command: bash -lc "./get_all_job_name.sh {{job_status_file}}"
-  args:
-    chdir: "{{samza_jobs_dir}}/extract"
-  become: yes
-
-- name: Kill jobs
-  command: bash -lc "./kill_jobs.sh {{job_status_file}}"
-  args:
-    chdir: /usr/local/hadoop/bin
-  become: yes
diff --git a/ansible/roles/setup-kafka/defaults/main.yml b/ansible/roles/setup-kafka/defaults/main.yml
new file mode 100644
index 0000000000000000000000000000000000000000..1a2fb50cd595353f02d2d98accdda7a313648988
--- /dev/null
+++ b/ansible/roles/setup-kafka/defaults/main.yml
@@ -0,0 +1,24 @@
+---
+env: dev
+
+processing_kafka_overriden_topics:
+  - name: lms.audit.events
+    retention_time: 172800000
+    replication_factor: 1
+  - name: lms.sso.events
+    retention_time: 172800000
+    replication_factor: 1
+  - name: lms.user.account.merge
+    retention_time: 172800000
+    replication_factor: 1
+
+processing_kafka_topics:
+  - name: lms.audit.events
+    num_of_partitions: 1
+    replication_factor: 1
+  - name: lms.sso.events
+    num_of_partitions: 4
+    replication_factor: 1
+  - name: lms.user.account.merge
+    num_of_partitions: 1
+    replication_factor: 1
diff --git a/ansible/roles/setup-kafka/tasks/main.yml b/ansible/roles/setup-kafka/tasks/main.yml
new file mode 100644
index 0000000000000000000000000000000000000000..b5811b969b2bfc4a33bda7409c22e3240ea1b985
--- /dev/null
+++ b/ansible/roles/setup-kafka/tasks/main.yml
@@ -0,0 +1,15 @@
+---
+- name: create topics
+  command: /opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic {{env_name}}.{{item.name}} --partitions {{ item.num_of_partitions }} --replication-factor {{ item.replication_factor }}
+  with_items: "{{processing_kafka_topics}}"
+  ignore_errors: true
+  when: kafka_id=="1"
+  tags:
+    - processing-kafka
+
+- name: override retention time
+  command: /opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic {{env_name}}.{{item.name}} --config retention.ms={{ item.retention_time }}
+  with_items: "{{processing_kafka_overriden_topics}}"
+  when: kafka_id=="1" and item.retention_time is defined
+  tags:
+    - processing-kafka
diff --git a/ansible/samza_deploy.yml b/ansible/samza_deploy.yml
index de923114dcd39d7fd32e23f1b6c36d082c3fdf10..9ac34de840b0111376dbb318ee330694a702eb15 100644
--- a/ansible/samza_deploy.yml
+++ b/ansible/samza_deploy.yml
@@ -1,14 +1,34 @@
 ---
-- name: "Start Nodemanager on Slaves"
+- name: Move the selected samza app tar files to another dir
+  hosts: localhost
+  tasks:
+    - name: find the selected samza app tar files path
+      find:
+        paths: "{{job_workspace}}/{{ samza_tar_files_localpath }}/allfiles"
+        patterns: "{{ job_names['%s'|format(item)].job_file_name }}*"
+        recurse: yes
+      with_items: "{{ job_names_to_kill.split(',') }}"
+      register: existing_files
+
+    - name: Copy the selected samza app tar files to jobs folder
+      copy:
+       src: "{{ item }}"
+       dest: "{{job_workspace}}/{{ samza_tar_files_localpath }}/jobs"
+      with_items:
+         - "{{ existing_files | json_query('results[].files[].path') }}"
+
+- name: "Start Nodemanager on Slaves if stopped"
   hosts: "yarn-slave"
   vars:
     hadoop_version: 2.7.2
   become: yes
-  tasks:
+  pre_tasks:
     - name: Ensure yarn nodemanager is running
       become_user: hduser
       shell: |
-         (ps aux | grep yarn-hduser-nodemanager | grep -v grep) || /usr/local/hadoop/sbin/yarn-daemon.sh --config /usr/local/hadoop-{{hadoop_version}}/conf/ start nodemanager || sleep 10
+         (ps aux | grep yarn-hduser-nodemanager | grep -v grep) \
+         || /usr/local/hadoop/sbin/yarn-daemon.sh --config /usr/local/hadoop-{{ hadoop_version }}/conf/ start nodemanager \
+         || sleep 10
 
     - name: install imagemagick
       apt: name=imagemagick state=present update_cache=yes
@@ -16,9 +36,13 @@
 - name: "Deploy Samza jobs"
   hosts: "yarn-master"
   become: yes
+  become_user: hduser
   vars_files:
-    - "{{inventory_dir}}/secrets.yml"
-  vars:
-    deploy_jobs: true
+    - "{{ inventory_dir }}/secrets.yml"
+  pre_tasks:
+    - name: Ensure yarn resource manager is running
+      shell: |
+        (ps aux | grep yarn-hduser-resourcemanager | grep -v grep) \
+        || /usr/local/hadoop/sbin/yarn-daemon.sh --config /usr/local/hadoop-{{ hadoop_version }}/conf/ start resourcemanager
   roles:
     - samza-jobs
diff --git a/pipelines/deploy/kafka/Jenkinsfile b/pipelines/deploy/kafka/Jenkinsfile
new file mode 100644
index 0000000000000000000000000000000000000000..7a99d974533e888e5d6fa847b4ff79fabd69e947
--- /dev/null
+++ b/pipelines/deploy/kafka/Jenkinsfile
@@ -0,0 +1,38 @@
+@Library('deploy-conf') _
+node() {
+    try {
+        String ANSI_GREEN = "\u001B[32m"
+        String ANSI_NORMAL = "\u001B[0m"
+        String ANSI_BOLD = "\u001B[1m"
+        String ANSI_RED = "\u001B[31m"
+        String ANSI_YELLOW = "\u001B[33m"
+
+        stage('checkout public repo') {
+            cleanWs()
+            checkout scm
+        }
+
+        ansiColor('xterm') {
+            stage('deploy'){
+                values = [:]
+                envDir = sh(returnStdout: true, script: "echo $JOB_NAME").split('/')[-3].trim()
+                module = sh(returnStdout: true, script: "echo $JOB_NAME").split('/')[-2].trim()
+                jobName = sh(returnStdout: true, script: "echo $JOB_NAME").split('/')[-1].trim()
+                currentWs = sh(returnStdout: true, script: 'pwd').trim()
+                ansiblePlaybook = "${currentWs}/ansible/core_kafka_setup.yml"
+                ansibleExtraArgs = "--vault-password-file /var/lib/jenkins/secrets/vault-pass"
+                values.put('currentWs', currentWs)
+                values.put('env', envDir)
+                values.put('module', module)
+                values.put('jobName', jobName)
+                values.put('ansiblePlaybook', ansiblePlaybook)
+                values.put('ansibleExtraArgs', ansibleExtraArgs)
+                println values
+                ansible_playbook_run(values)
+            }
+        }
+    }
+    catch (err) {
+        throw err
+    }
+}
diff --git a/pipelines/deploy/yarn/Jenkinsfile b/pipelines/deploy/yarn/Jenkinsfile
index c9d7f32e5dd30b3fce30c6eb3f9cd8169885c68d..a18a7317cc6cf1a9328eb965c6f675f3a825410e 100644
--- a/pipelines/deploy/yarn/Jenkinsfile
+++ b/pipelines/deploy/yarn/Jenkinsfile
@@ -41,14 +41,15 @@ node() {
                 stage('deploy artifact'){
                     sh """
                        unzip ${artifact}
-                       mv distribution-* ansible
-                       rm -rf ansible/roles/samza-jobs/files/jobs
-                       mkdir  ansible/roles/samza-jobs/files/jobs
-                       tar -xvf ansible/distribution-* -C ansible/roles/samza-jobs/files/jobs/
+                       mv distribution-*.tar.gz ansible
+                       rm -rf ansible/roles/samza-jobs/defaults/jobs
+                       mkdir  ansible/roles/samza-jobs/defaults/jobs ansible/roles/samza-jobs/defaults/allfiles
+                       chmod 777 ansible/roles/samza-jobs/defaults/jobs
+                       tar -xvf ansible/distribution-*.tar.gz -C ansible/roles/samza-jobs/defaults/allfiles/
 
                     """
                     ansiblePlaybook = "${currentWs}/ansible/samza_deploy.yml"
-                    ansibleExtraArgs = "--vault-password-file /var/lib/jenkins/secrets/vault-pass"
+                    ansibleExtraArgs = "--extra-vars \"job_names_to_kill=${params.job_names_to_deploy} job_workspace=${WORKSPACE}/ansible\" --vault-password-file /var/lib/jenkins/secrets/vault-pass"
                     values.put('ansiblePlaybook', ansiblePlaybook)
                     values.put('ansibleExtraArgs', ansibleExtraArgs)
                     println values
@@ -65,3 +66,4 @@ node() {
         throw err
     }
 }
+