diff --git a/pom.xml b/pom.xml index 69851aa0387d9dc2680833ed44f23536a1113277..7328b3a46406f7a905e981dcbe83326e6b0c961d 100644 --- a/pom.xml +++ b/pom.xml @@ -14,7 +14,7 @@ <name>compentency-passbook</name> <description>compentency-passbook project for Spring Boot</description> <properties> - <java.version>1.8</java.version> + <java.version>11</java.version> </properties> <dependencies> <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --> @@ -84,7 +84,15 @@ <mainClass>com.sphere.compentency.kafka.consumer.api.kafkaConsumer</mainClass> </configuration> </plugin> - </plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>11</source> + <target>11</target> + </configuration> + </plugin> + </plugins> </build> </project> diff --git a/src/main/java/com/sphere/compentency/CompentencyApplication.java b/src/main/java/com/sphere/compentency/CompentencyApplication.java index 3eec13d9c38f18b8bb30feed4f8cfe17995e4264..f1601f04841ac4bc1a267066af61e833d7394f24 100644 --- a/src/main/java/com/sphere/compentency/CompentencyApplication.java +++ b/src/main/java/com/sphere/compentency/CompentencyApplication.java @@ -1,13 +1,16 @@ package com.sphere.compentency; +import com.sphere.compentency.kafka.consumer.api.kafkaConsumer; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.ComponentScan; @SpringBootApplication +@ComponentScan(basePackages = "com.sphere.compentency.kafka.consumer.api") public class CompentencyApplication { public static void main(String[] args) { SpringApplication.run(CompentencyApplication.class, args); + kafkaConsumer.startKafkaConsumer(); } - } \ No newline at end of file diff --git a/src/main/java/com/sphere/compentency/common/utils/propertiesCache.java b/src/main/java/com/sphere/compentency/common/utils/propertiesCache.java index 905261d3a4c005e5ee878d9c3425d444d8194244..5bba457659daded87d806738de62a627eb946e9b 100644 --- a/src/main/java/com/sphere/compentency/common/utils/propertiesCache.java +++ b/src/main/java/com/sphere/compentency/common/utils/propertiesCache.java @@ -1,10 +1,14 @@ package com.sphere.compentency.common.utils; +import org.springframework.stereotype.Component; +@Component public class propertiesCache { public String getProperty(String key) { String value = System.getenv(key); -// if (StringUtils.isNotBlank(value)); + if (value == null) { + throw new IllegalStateException("Required property not set: " + key); + } return value; } } \ No newline at end of file diff --git a/src/main/java/com/sphere/compentency/externalservice/controller/ServiceController.java b/src/main/java/com/sphere/compentency/externalservice/controller/ServiceController.java index ca34b785074b8bb27a286b5ea95c7bc9d231406a..4c34fbca5f97e00d65169422fe4a649769bcd40e 100644 --- a/src/main/java/com/sphere/compentency/externalservice/controller/ServiceController.java +++ b/src/main/java/com/sphere/compentency/externalservice/controller/ServiceController.java @@ -1,37 +1,35 @@ package com.sphere.compentency.externalservice.controller; -import java.util.Map; - +import com.sphere.compentency.external.service.ExternalService; +import com.sphere.compentency.kafka.consumer.api.ApiService; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.http.MediaType; -import org.springframework.http.ResponseEntity; -import org.springframework.util.LinkedMultiValueMap; -import org.springframework.util.MultiValueMap; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.PathVariable; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestHeader; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; -import com.sphere.compentency.external.service.ExternalService; - @RestController @RequestMapping("/compentencytool/v1") public class ServiceController { @Autowired ExternalService externalService; - @GetMapping("/user/read/{user_Id}") - public ResponseEntity<String> UserRead(@RequestHeader Map<String,String> headers,@PathVariable (value = "user_Id") String UserID){ - return externalService.read_user(headers,UserID); - } - - @GetMapping("/content/read/{Content_Id}") - public ResponseEntity<String> ContentRead(@PathVariable(value = "Content_Id") String ContentID){ - return externalService.read_content(ContentID); - } - + @Autowired + ApiService services; + +// @GetMapping("/user/read/{user_Id}") +// public ResponseEntity<String> UserRead(@RequestHeader Map<String,String> headers,@PathVariable (value = "user_Id") String UserID){ +// return externalService.read_user(headers,UserID); +// } +// +// @GetMapping("/content/read/{Content_Id}") +// public ResponseEntity<String> ContentRead(@PathVariable(value = "Content_Id") String ContentID){ +// return externalService.read_content(ContentID); +// } +// @PatchMapping("/learner/course/v1/passbook/update") +// public ResponseEntity<String> UpdatePassbook(@RequestBody JSONObject request) throws JsonProcessingException { +// return services.passbookUpdate(request); +// } + + // @PostMapping(value = "/generate-token",consumes = {MediaType.APPLICATION_FORM_URLENCODED_VALUE}) // public ResponseEntity<String> GenerateToken(String username,String password,String client_id,String grant_type){ // System.out.println("generate token : "+username+password+client_id+grant_type); diff --git a/src/main/java/com/sphere/compentency/kafka/consumer/api/Api_services.java b/src/main/java/com/sphere/compentency/kafka/consumer/api/Api_services.java deleted file mode 100644 index 725ca67ce5bce9f60f7817b2b50df4a741093dcb..0000000000000000000000000000000000000000 --- a/src/main/java/com/sphere/compentency/kafka/consumer/api/Api_services.java +++ /dev/null @@ -1,214 +0,0 @@ -package com.sphere.compentency.kafka.consumer.api; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.sphere.compentency.common.utils.propertiesCache; -import org.apache.http.client.HttpClient; -import org.apache.http.impl.client.HttpClientBuilder; -import org.json.JSONArray; -import org.json.JSONObject; -import org.springframework.http.*; -import org.springframework.http.client.HttpComponentsClientHttpRequestFactory; -import org.springframework.web.client.RestTemplate; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; - - -public class Api_services { - RestTemplate restTemplate = new RestTemplate(); - -// ResourceBundle props = ResourceBundle.getBundle("application"); - propertiesCache env=new propertiesCache(); - public Object get_hierarchy(String courseId, JSONArray UserId) { -// String url = "https://sphere.aastrika.org/api/private/content/v3/hierarchy/"+courseId+"?hierarchyType=detail"; - String url = env.getProperty("get.hierarchy")+ courseId + "?hierarchyType=detail"; - System.out.println(url); - String Api_key = env.getProperty("Api.key"); - System.out.println(Api_key); - HttpHeaders header = new HttpHeaders(); - header.setAccept(Arrays.asList(MediaType.APPLICATION_JSON)); - header.add("Authorization", Api_key); - ResponseEntity<String> response = null; - try { - HttpEntity<String> entity = new HttpEntity<String>("parameters", header); - response = restTemplate.exchange(url, HttpMethod.GET, entity, String.class); - String responseStr = response.getBody(); - int begin = responseStr.indexOf("{"); - int end = responseStr.lastIndexOf("}") + 1; - responseStr = responseStr.substring(begin, end); - - System.out.println("=========******** hierachy API Called return response ************============="); - System.out.println(responseStr); - Object course_data = getCourseInfo_parse(responseStr, UserId); - } catch (Exception e) { - System.out.println(e.fillInStackTrace()); - } - - return response; - - } - - private Object getCourseInfo_parse(String response, JSONArray userId) throws JsonProcessingException { - - JSONObject Competency_object = new JSONObject(response); - JSONObject result_object = Competency_object.getJSONObject("result"); - JSONObject content_data = (JSONObject) result_object.get("content"); - // Course info - String Course_name = (String) content_data.get("name"); - String course_Id = (String) content_data.get("identifier"); - // parse competencies_v1 string json - System.out.println(content_data.get("competencies_v1")); - String competencyData = (String) content_data.get("competencies_v1"); - JSONArray competencyData1 = new JSONArray(competencyData); - - for (int id = 0; id < userId.length(); id++) { - System.out.println(userId.get(id)); - String Userid = (String) userId.get(id); - - for (int i = 0; i < competencyData1.length(); i++) { - System.out.println("competency loop inside "); - JSONObject compentency_parse = (JSONObject) competencyData1.get(i); - - // competencies_v1 Info - String cmp_id = compentency_parse.get("competencyId").toString(); - String competencyName = (String) compentency_parse.get("competencyName"); - String competency_level = String.valueOf(compentency_parse.get("level")) ; - System.out.println("cmp_id = " + cmp_id + "competencyName = " + competencyName + "competency_level =" + competency_level); - - // Call Get_entityById (CompetenceId) Api Service - JSONObject get_entity_response = get_entityById(cmp_id); - System.out.println("get_entity_response :" + get_entity_response); - - - System.out.println(get_entity_response.get("name")); - String entity_name = (String) get_entity_response.get("name"); - Integer levelId = (Integer) get_entity_response.get("levelId"); - -// Prepare passbook data - - JSONObject mainobj = new JSONObject(); - JSONObject main1 = new JSONObject(); - main1.put("userId", Userid); - main1.put("typeName", "competency"); - - JSONArray competencyDetails = new JSONArray(); - JSONObject competencyDetails_main_obj = new JSONObject(); - competencyDetails_main_obj.put("competencyId", cmp_id); - - JSONObject additionalParams = new JSONObject(); - additionalParams.put("competencyName", competencyName); -// additionalParams.put("competencyType", (Collection<?>) null); -// additionalParams.put("competencyArea", (Collection<?>) null); - competencyDetails_main_obj.put("additionalParams", additionalParams); - - JSONObject acquiredDetails = new JSONObject(); - - acquiredDetails.put("acquiredChannel", "course"); - acquiredDetails.put("competencyLevelId", competency_level); -// acquiredDetails.put("secondaryPassbookId", (Collection<?>) null); - JSONObject acquiredDetails_additionalParams = new JSONObject(); - acquiredDetails_additionalParams.put("courseId", course_Id); - acquiredDetails_additionalParams.put("courseName", Course_name); - - acquiredDetails.put("additionalParams", acquiredDetails_additionalParams); - competencyDetails_main_obj.put("acquiredDetails", acquiredDetails); - competencyDetails.put(competencyDetails_main_obj); - main1.put("competencyDetails", competencyDetails); - - mainobj.put("request", main1); - System.out.println(mainobj.toString(4)); - Passbook_update(Userid, mainobj); - } - } - return null; - } - - private void Passbook_update(String userId, JSONObject request) throws JsonProcessingException { - - RestTemplate restTemplate = new RestTemplate(); - HttpClient httpClient = HttpClientBuilder.create().build(); - HttpComponentsClientHttpRequestFactory requestFactory = new HttpComponentsClientHttpRequestFactory(httpClient); - restTemplate.setRequestFactory(requestFactory); - - String request_body = request.toString(); - String url = env.getProperty("passbook.update.url"); - String Api_key =env.getProperty("Api.key"); - HttpHeaders header = new HttpHeaders(); - header.setAccept(Arrays.asList(MediaType.APPLICATION_JSON)); - header.add("Authorization", Api_key); - header.add("x-authenticated-userid", userId); - Map<String, Object> mapping = new ObjectMapper().readValue(request_body, HashMap.class); - - System.out.println("mapping : " + mapping); - - HttpEntity<Map<String, Object>> entity = new HttpEntity<>(mapping, header); - ResponseEntity<String> passbookResponse = restTemplate.exchange(url, HttpMethod.PATCH, entity, String.class); - System.out.println(passbookResponse); - String responseStr = passbookResponse.getBody(); - int begin = responseStr.indexOf("{"); - int end = responseStr.lastIndexOf("}") + 1; - responseStr = responseStr.substring(begin, end); - System.out.println(responseStr); - JSONObject passbook_payload = new JSONObject(responseStr); - System.out.println(passbook_payload.toString(4)); - - } - - private JSONObject get_entityById(String competency_id) throws JsonProcessingException { - String url = env.getProperty("get.entityById") + competency_id; - System.out.println("get_entityById fun "); - HttpHeaders header = new HttpHeaders(); - header.setAccept(Arrays.asList(MediaType.APPLICATION_JSON)); -// header.add("x-authenticated-user-token", token); - - String req = "{\"filter\":{\"isDetail\":true}}"; - Map<String, Object> mapping = new ObjectMapper().readValue(req, HashMap.class); - - HttpEntity<Map<String, Object>> entity = new HttpEntity<>(mapping, header); - ResponseEntity<String> entityResponse = restTemplate.exchange(url, HttpMethod.POST, entity, String.class); -// System.out.println(entityResponse); - String responseStr = entityResponse.getBody(); - int begin = responseStr.indexOf("{"); - int end = responseStr.lastIndexOf("}") + 1; - responseStr = responseStr.substring(begin, end); -// System.out.println(responseStr); - JSONObject passbook_payload = new JSONObject(responseStr); - JSONObject passbook_result = passbook_payload.getJSONObject("result"); - return (JSONObject) passbook_result.get("response"); - - } - -// private void GetPassbook(JSONArray userId) throws JsonProcessingException { -// -// String url = "https://sphere.aastrika.org/api/user/v1/passbook"; -// String Api_key = "bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJTNHNNVFdjZUZqYkxUWGxiczkzUzk4dmFtODBhdkRPUiJ9.nPOCY0-bVX28iNcxxnYbGpihY3ZzfNwx0-SFCnJwjas"; -// for (int i=0; i<userId.length();i++){ -// System.out.println("Passbook Update loop"); -// System.out.println(userId.get(i)); -// HttpHeaders header = new HttpHeaders(); -// header.setAccept(Arrays.asList(MediaType.APPLICATION_JSON)); -// header.add("Authorization", Api_key); -// header.add("x-authenticated-userid", (String) userId.get(i)); -// String req = "{\"request\":{\"typeName\":\"competency\"}}"; -// Map<String, Object> mapping = new ObjectMapper().readValue(req, HashMap.class); -// -// System.out.println(mapping); -// -// HttpEntity<Map<String, Object>> entity = new HttpEntity<>(mapping, header); -// ResponseEntity<String> entityResponse = restTemplate.exchange(url , HttpMethod.POST, entity, String.class); -// System.out.println(entityResponse); -// String responseStr = entityResponse.getBody(); -// int begin = responseStr.indexOf("{"); -// int end = responseStr.lastIndexOf("}") + 1; -// responseStr = responseStr.substring(begin, end); -// System.out.println(responseStr); -// JSONObject passbook_payload = new JSONObject(responseStr); -// JSONObject passbook_result=passbook_payload.getJSONObject("result"); -// System.out.println(passbook_result); -// -// } - - -} diff --git a/src/main/java/com/sphere/compentency/kafka/consumer/api/kafkaConsumer.java b/src/main/java/com/sphere/compentency/kafka/consumer/api/kafkaConsumer.java index 7b6ae7cb91b575e11c02312f4deb30021665855d..e3e3303d94e423f448657d060956ee0b3fe78dd9 100644 --- a/src/main/java/com/sphere/compentency/kafka/consumer/api/kafkaConsumer.java +++ b/src/main/java/com/sphere/compentency/kafka/consumer/api/kafkaConsumer.java @@ -1,74 +1,91 @@ package com.sphere.compentency.kafka.consumer.api; +import com.fasterxml.jackson.core.JsonProcessingException; import com.sphere.compentency.common.utils.propertiesCache; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; import org.json.JSONArray; import org.json.JSONObject; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.ComponentScan; +import java.io.IOException; import java.time.Duration; import java.util.Arrays; import java.util.Properties; -@SpringBootApplication public class kafkaConsumer { - public static void main(String[] args) { - SpringApplication.run(kafkaConsumer.class, args); + private static final Logger logger = LoggerFactory.getLogger(kafkaConsumer.class); + private static ApiService api_services = new ApiService(); + + public static void startKafkaConsumer() { + propertiesCache env = new propertiesCache(); - propertiesCache env=new propertiesCache(); - Api_services api_services= new Api_services(); - Logger logger= LoggerFactory.getLogger(kafkaConsumer.class.getName()); + String bootstrapServers = "localhost:9092"; + String groupId = "dev-activity-aggregate-updater-group"; + String topic = "dev.issue.certificate.request"; + System.out.println("Bootstrap Servers: " + bootstrapServers); - String bootstrapServers=env.getProperty("kafka.bootstrapServers"); - String grp_id=env.getProperty("kafka.groupID"); - String topic= env.getProperty("kafka.topic");; - System.out.println("topic --- > "+topic); - System.out.println("bootstrapServers --- > "+bootstrapServers); - System.out.println("grp_id --- > "+grp_id); + // Creating consumer properties + Properties properties = new Properties(); + properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId); + properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); - //Creating consumer properties - Properties properties=new Properties(); - properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers); - properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName()); - properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,grp_id); - properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest"); - //creating consumer - KafkaConsumer<String,String> consumer= new KafkaConsumer<String,String>(properties); - //Subscribing + // Creating consumer + KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties); + + // Subscribing to the topic consumer.subscribe(Arrays.asList(topic)); - System.out.println("kafka consumer subscribe & running"); - //polling - while(true){ - ConsumerRecords<String,String> records=consumer.poll(Duration.ofMillis(100)); - for(ConsumerRecord<String,String> record: records) { -// logger.info("Key: " + record.key() + ", Value:" + record.value()); - String msg = record.value(); - if (msg.length() != 0 & msg != null && !msg.isEmpty() && !msg.trim().isEmpty()) { - JSONObject json = new JSONObject(record.value()); - JSONObject edata = json.getJSONObject("edata"); - JSONArray userId = (JSONArray) edata.get("userIds"); - String courseId = (String) edata.get("courseId"); - System.out.println(edata.get("userIds")); - System.out.println(edata.get("courseId")); - api_services.get_hierarchy(courseId,userId); - } else { - System.out.println("null check condition : "+record.value()); -// logger.info("Partition:" + record.partition()+",Offset:"+record.offset()); - } + logger.info("Kafka consumer subscribed and running"); + // Polling for messages + try { + while (true) { + ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord<String, String> record : records) { + processKafkaMessage(record); + } } + } catch (Exception e) { + logger.error("Error in Kafka consumer: {}", e.getMessage(), e); + } finally { + // Close the consumer when done + consumer.close(); + } + } + + private static void processKafkaMessage(ConsumerRecord<String, String> record) throws IOException, InterruptedException { + logger.info("Inside processKafkaMessage with record " + record); + String msg = record.value(); + if (msg != null && !msg.isEmpty() && !msg.trim().isEmpty()) { + JSONObject json = new JSONObject(record.value()); + JSONObject edata = json.getJSONObject("edata"); + String userIds = edata.getString("userIds"); + // Now, you can pass userIds to your method + JSONObject relatedObject = json.getJSONObject("edata").getJSONObject("related"); + String courseId = relatedObject.getString("courseId");//do_1139628834519941121286,do_11394806141846323211 + logger.info("Processing Kafka message - userId: {}, courseId: {}", userIds, courseId); + api_services.get_hierarchy(courseId, userIds); + } else { + logger.warn("Received empty or null message from Kafka"); } } } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 6720049c7667c4488631ef688952de85f0b45d06..ca58c2c0618c75e4adf447dac7061dda1fd80973 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -3,7 +3,7 @@ spring.datasource.username= postgres spring.datasource.password= postgres spring.jpa.properties.hibernate.jdbc.lob.non_contextual_creation= true -spring.jpa.properties.hibernate.dialect= org.hibernate.dialect.PostgreSQLDialect +spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.PostgreSQLDialect # Hibernate ddl auto (create, create-drop, validate, update) spring.jpa.hibernate.ddl-auto= update @@ -14,19 +14,20 @@ spring.datasource.initialization-mode=always user.read.api = https://uphrh.in/learner/user/v5/read content.read.api = https://uphrh.in/content/v2/read +kafka.bootstrapServers=localhost:9092 +kafka.topic=dev.issue.certificate.request +kafka.groupID=dev-activity-aggregate-updater-group -kafka.bootstrapServers = localhost:9092 -kafka.topic = dev.issue.certificate.request -kafka.groupID = dev-activity-aggregate-updater-group - -get.hierarchy= https://uphrh.in/course/v1/hierarchy +get.hierarchy=https://uphrh.in/api/course/v1/hierarchy/ Api.key=bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJSR3RkMkZzeG1EMnJER3I4dkJHZ0N6MVhyalhZUzBSSyJ9.kMLn6177rvY53i0RAN3SPD5m3ctwaLb32pMYQ65nBdA -get.entityById = http://localhost:8087/getEntityById/ -passbook.update.url = https://uphrh.in/api/user/v1/passbook +get.entityById=http://localhost:8087/getEntityById/ +passbook.update.url=http://lms-service:9000/private/v1/passbook/update +host.url=https://uphrh.in/ +framework.read=https://uphrh.in/api/framework/v1/read/nirayama_frccl_fw?categories=difficultyLevel,subject server.port = 8087 -#logging.level.org.springframework=OFF +logging.level.org.springframework=DEBUG #logging.level.root=OFF #logging.level.org.springframework.web=ERROR \ No newline at end of file