Skip to content
GitLab
Explore
Projects
Groups
Topics
Snippets
Projects
Groups
Topics
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Register
Sign in
Toggle navigation
Menu
UPSMF
competency-passbook
Commits
029c0442
Commit
029c0442
authored
1 year ago
by
Radheshhathwar
Browse files
Options
Download
Patches
Plain Diff
Passbook update changes with Kafka call
parent
100fc5ea
main
1 merge request
!4
Passbook update changes with Kafka call
Changes
7
Hide whitespace changes
Inline
Side-by-side
Showing
7 changed files
pom.xml
+10
-2
pom.xml
src/main/java/com/sphere/compentency/CompentencyApplication.java
+4
-1
...n/java/com/sphere/compentency/CompentencyApplication.java
src/main/java/com/sphere/compentency/common/utils/propertiesCache.java
+5
-1
.../com/sphere/compentency/common/utils/propertiesCache.java
src/main/java/com/sphere/compentency/externalservice/controller/ServiceController.java
+20
-22
...entency/externalservice/controller/ServiceController.java
src/main/java/com/sphere/compentency/kafka/consumer/api/Api_services.java
+0
-214
...m/sphere/compentency/kafka/consumer/api/Api_services.java
src/main/java/com/sphere/compentency/kafka/consumer/api/kafkaConsumer.java
+59
-42
.../sphere/compentency/kafka/consumer/api/kafkaConsumer.java
src/main/resources/application.properties
+10
-9
src/main/resources/application.properties
with
108 additions
and
291 deletions
+108
-291
pom.xml
+
10
−
2
View file @
029c0442
...
...
@@ -14,7 +14,7 @@
<name>
compentency-passbook
</name>
<description>
compentency-passbook project for Spring Boot
</description>
<properties>
<java.version>
1
.8
</java.version>
<java.version>
1
1
</java.version>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
...
...
@@ -84,7 +84,15 @@
<mainClass>
com.sphere.compentency.kafka.consumer.api.kafkaConsumer
</mainClass>
</configuration>
</plugin>
</plugins>
<plugin>
<groupId>
org.apache.maven.plugins
</groupId>
<artifactId>
maven-compiler-plugin
</artifactId>
<configuration>
<source>
11
</source>
<target>
11
</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
This diff is collapsed.
Click to expand it.
src/main/java/com/sphere/compentency/CompentencyApplication.java
+
4
−
1
View file @
029c0442
package
com.sphere.compentency
;
import
com.sphere.compentency.kafka.consumer.api.kafkaConsumer
;
import
org.springframework.boot.SpringApplication
;
import
org.springframework.boot.autoconfigure.SpringBootApplication
;
import
org.springframework.context.annotation.ComponentScan
;
@SpringBootApplication
@ComponentScan
(
basePackages
=
"com.sphere.compentency.kafka.consumer.api"
)
public
class
CompentencyApplication
{
public
static
void
main
(
String
[]
args
)
{
SpringApplication
.
run
(
CompentencyApplication
.
class
,
args
);
kafkaConsumer
.
startKafkaConsumer
();
}
}
\ No newline at end of file
This diff is collapsed.
Click to expand it.
src/main/java/com/sphere/compentency/common/utils/propertiesCache.java
+
5
−
1
View file @
029c0442
package
com.sphere.compentency.common.utils
;
import
org.springframework.stereotype.Component
;
@Component
public
class
propertiesCache
{
public
String
getProperty
(
String
key
)
{
String
value
=
System
.
getenv
(
key
);
// if (StringUtils.isNotBlank(value));
if
(
value
==
null
)
{
throw
new
IllegalStateException
(
"Required property not set: "
+
key
);
}
return
value
;
}
}
\ No newline at end of file
This diff is collapsed.
Click to expand it.
src/main/java/com/sphere/compentency/externalservice/controller/ServiceController.java
+
20
−
22
View file @
029c0442
package
com.sphere.compentency.externalservice.controller
;
import
java.util.Map
;
import
com.sphere.compentency.external.service.ExternalService
;
import
com.sphere.compentency.kafka.consumer.api.ApiService
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.http.MediaType
;
import
org.springframework.http.ResponseEntity
;
import
org.springframework.util.LinkedMultiValueMap
;
import
org.springframework.util.MultiValueMap
;
import
org.springframework.web.bind.annotation.GetMapping
;
import
org.springframework.web.bind.annotation.PathVariable
;
import
org.springframework.web.bind.annotation.PostMapping
;
import
org.springframework.web.bind.annotation.RequestHeader
;
import
org.springframework.web.bind.annotation.RequestMapping
;
import
org.springframework.web.bind.annotation.RestController
;
import
com.sphere.compentency.external.service.ExternalService
;
@RestController
@RequestMapping
(
"/compentencytool/v1"
)
public
class
ServiceController
{
@Autowired
ExternalService
externalService
;
@GetMapping
(
"/user/read/{user_Id}"
)
public
ResponseEntity
<
String
>
UserRead
(
@RequestHeader
Map
<
String
,
String
>
headers
,
@PathVariable
(
value
=
"user_Id"
)
String
UserID
){
return
externalService
.
read_user
(
headers
,
UserID
);
}
@GetMapping
(
"/content/read/{Content_Id}"
)
public
ResponseEntity
<
String
>
ContentRead
(
@PathVariable
(
value
=
"Content_Id"
)
String
ContentID
){
return
externalService
.
read_content
(
ContentID
);
}
@Autowired
ApiService
services
;
// @GetMapping("/user/read/{user_Id}")
// public ResponseEntity<String> UserRead(@RequestHeader Map<String,String> headers,@PathVariable (value = "user_Id") String UserID){
// return externalService.read_user(headers,UserID);
// }
//
// @GetMapping("/content/read/{Content_Id}")
// public ResponseEntity<String> ContentRead(@PathVariable(value = "Content_Id") String ContentID){
// return externalService.read_content(ContentID);
// }
// @PatchMapping("/learner/course/v1/passbook/update")
// public ResponseEntity<String> UpdatePassbook(@RequestBody JSONObject request) throws JsonProcessingException {
// return services.passbookUpdate(request);
// }
// @PostMapping(value = "/generate-token",consumes = {MediaType.APPLICATION_FORM_URLENCODED_VALUE})
// public ResponseEntity<String> GenerateToken(String username,String password,String client_id,String grant_type){
// System.out.println("generate token : "+username+password+client_id+grant_type);
...
...
This diff is collapsed.
Click to expand it.
src/main/java/com/sphere/compentency/kafka/consumer/api/Api_services.java
deleted
100644 → 0
+
0
−
214
View file @
100fc5ea
package
com.sphere.compentency.kafka.consumer.api
;
import
com.fasterxml.jackson.core.JsonProcessingException
;
import
com.fasterxml.jackson.databind.ObjectMapper
;
import
com.sphere.compentency.common.utils.propertiesCache
;
import
org.apache.http.client.HttpClient
;
import
org.apache.http.impl.client.HttpClientBuilder
;
import
org.json.JSONArray
;
import
org.json.JSONObject
;
import
org.springframework.http.*
;
import
org.springframework.http.client.HttpComponentsClientHttpRequestFactory
;
import
org.springframework.web.client.RestTemplate
;
import
java.util.Arrays
;
import
java.util.HashMap
;
import
java.util.Map
;
public
class
Api_services
{
RestTemplate
restTemplate
=
new
RestTemplate
();
// ResourceBundle props = ResourceBundle.getBundle("application");
propertiesCache
env
=
new
propertiesCache
();
public
Object
get_hierarchy
(
String
courseId
,
JSONArray
UserId
)
{
// String url = "https://sphere.aastrika.org/api/private/content/v3/hierarchy/"+courseId+"?hierarchyType=detail";
String
url
=
env
.
getProperty
(
"get.hierarchy"
)+
courseId
+
"?hierarchyType=detail"
;
System
.
out
.
println
(
url
);
String
Api_key
=
env
.
getProperty
(
"Api.key"
);
System
.
out
.
println
(
Api_key
);
HttpHeaders
header
=
new
HttpHeaders
();
header
.
setAccept
(
Arrays
.
asList
(
MediaType
.
APPLICATION_JSON
));
header
.
add
(
"Authorization"
,
Api_key
);
ResponseEntity
<
String
>
response
=
null
;
try
{
HttpEntity
<
String
>
entity
=
new
HttpEntity
<
String
>(
"parameters"
,
header
);
response
=
restTemplate
.
exchange
(
url
,
HttpMethod
.
GET
,
entity
,
String
.
class
);
String
responseStr
=
response
.
getBody
();
int
begin
=
responseStr
.
indexOf
(
"{"
);
int
end
=
responseStr
.
lastIndexOf
(
"}"
)
+
1
;
responseStr
=
responseStr
.
substring
(
begin
,
end
);
System
.
out
.
println
(
"=========******** hierachy API Called return response ************============="
);
System
.
out
.
println
(
responseStr
);
Object
course_data
=
getCourseInfo_parse
(
responseStr
,
UserId
);
}
catch
(
Exception
e
)
{
System
.
out
.
println
(
e
.
fillInStackTrace
());
}
return
response
;
}
private
Object
getCourseInfo_parse
(
String
response
,
JSONArray
userId
)
throws
JsonProcessingException
{
JSONObject
Competency_object
=
new
JSONObject
(
response
);
JSONObject
result_object
=
Competency_object
.
getJSONObject
(
"result"
);
JSONObject
content_data
=
(
JSONObject
)
result_object
.
get
(
"content"
);
// Course info
String
Course_name
=
(
String
)
content_data
.
get
(
"name"
);
String
course_Id
=
(
String
)
content_data
.
get
(
"identifier"
);
// parse competencies_v1 string json
System
.
out
.
println
(
content_data
.
get
(
"competencies_v1"
));
String
competencyData
=
(
String
)
content_data
.
get
(
"competencies_v1"
);
JSONArray
competencyData1
=
new
JSONArray
(
competencyData
);
for
(
int
id
=
0
;
id
<
userId
.
length
();
id
++)
{
System
.
out
.
println
(
userId
.
get
(
id
));
String
Userid
=
(
String
)
userId
.
get
(
id
);
for
(
int
i
=
0
;
i
<
competencyData1
.
length
();
i
++)
{
System
.
out
.
println
(
"competency loop inside "
);
JSONObject
compentency_parse
=
(
JSONObject
)
competencyData1
.
get
(
i
);
// competencies_v1 Info
String
cmp_id
=
compentency_parse
.
get
(
"competencyId"
).
toString
();
String
competencyName
=
(
String
)
compentency_parse
.
get
(
"competencyName"
);
String
competency_level
=
String
.
valueOf
(
compentency_parse
.
get
(
"level"
))
;
System
.
out
.
println
(
"cmp_id = "
+
cmp_id
+
"competencyName = "
+
competencyName
+
"competency_level ="
+
competency_level
);
// Call Get_entityById (CompetenceId) Api Service
JSONObject
get_entity_response
=
get_entityById
(
cmp_id
);
System
.
out
.
println
(
"get_entity_response :"
+
get_entity_response
);
System
.
out
.
println
(
get_entity_response
.
get
(
"name"
));
String
entity_name
=
(
String
)
get_entity_response
.
get
(
"name"
);
Integer
levelId
=
(
Integer
)
get_entity_response
.
get
(
"levelId"
);
// Prepare passbook data
JSONObject
mainobj
=
new
JSONObject
();
JSONObject
main1
=
new
JSONObject
();
main1
.
put
(
"userId"
,
Userid
);
main1
.
put
(
"typeName"
,
"competency"
);
JSONArray
competencyDetails
=
new
JSONArray
();
JSONObject
competencyDetails_main_obj
=
new
JSONObject
();
competencyDetails_main_obj
.
put
(
"competencyId"
,
cmp_id
);
JSONObject
additionalParams
=
new
JSONObject
();
additionalParams
.
put
(
"competencyName"
,
competencyName
);
// additionalParams.put("competencyType", (Collection<?>) null);
// additionalParams.put("competencyArea", (Collection<?>) null);
competencyDetails_main_obj
.
put
(
"additionalParams"
,
additionalParams
);
JSONObject
acquiredDetails
=
new
JSONObject
();
acquiredDetails
.
put
(
"acquiredChannel"
,
"course"
);
acquiredDetails
.
put
(
"competencyLevelId"
,
competency_level
);
// acquiredDetails.put("secondaryPassbookId", (Collection<?>) null);
JSONObject
acquiredDetails_additionalParams
=
new
JSONObject
();
acquiredDetails_additionalParams
.
put
(
"courseId"
,
course_Id
);
acquiredDetails_additionalParams
.
put
(
"courseName"
,
Course_name
);
acquiredDetails
.
put
(
"additionalParams"
,
acquiredDetails_additionalParams
);
competencyDetails_main_obj
.
put
(
"acquiredDetails"
,
acquiredDetails
);
competencyDetails
.
put
(
competencyDetails_main_obj
);
main1
.
put
(
"competencyDetails"
,
competencyDetails
);
mainobj
.
put
(
"request"
,
main1
);
System
.
out
.
println
(
mainobj
.
toString
(
4
));
Passbook_update
(
Userid
,
mainobj
);
}
}
return
null
;
}
private
void
Passbook_update
(
String
userId
,
JSONObject
request
)
throws
JsonProcessingException
{
RestTemplate
restTemplate
=
new
RestTemplate
();
HttpClient
httpClient
=
HttpClientBuilder
.
create
().
build
();
HttpComponentsClientHttpRequestFactory
requestFactory
=
new
HttpComponentsClientHttpRequestFactory
(
httpClient
);
restTemplate
.
setRequestFactory
(
requestFactory
);
String
request_body
=
request
.
toString
();
String
url
=
env
.
getProperty
(
"passbook.update.url"
);
String
Api_key
=
env
.
getProperty
(
"Api.key"
);
HttpHeaders
header
=
new
HttpHeaders
();
header
.
setAccept
(
Arrays
.
asList
(
MediaType
.
APPLICATION_JSON
));
header
.
add
(
"Authorization"
,
Api_key
);
header
.
add
(
"x-authenticated-userid"
,
userId
);
Map
<
String
,
Object
>
mapping
=
new
ObjectMapper
().
readValue
(
request_body
,
HashMap
.
class
);
System
.
out
.
println
(
"mapping : "
+
mapping
);
HttpEntity
<
Map
<
String
,
Object
>>
entity
=
new
HttpEntity
<>(
mapping
,
header
);
ResponseEntity
<
String
>
passbookResponse
=
restTemplate
.
exchange
(
url
,
HttpMethod
.
PATCH
,
entity
,
String
.
class
);
System
.
out
.
println
(
passbookResponse
);
String
responseStr
=
passbookResponse
.
getBody
();
int
begin
=
responseStr
.
indexOf
(
"{"
);
int
end
=
responseStr
.
lastIndexOf
(
"}"
)
+
1
;
responseStr
=
responseStr
.
substring
(
begin
,
end
);
System
.
out
.
println
(
responseStr
);
JSONObject
passbook_payload
=
new
JSONObject
(
responseStr
);
System
.
out
.
println
(
passbook_payload
.
toString
(
4
));
}
private
JSONObject
get_entityById
(
String
competency_id
)
throws
JsonProcessingException
{
String
url
=
env
.
getProperty
(
"get.entityById"
)
+
competency_id
;
System
.
out
.
println
(
"get_entityById fun "
);
HttpHeaders
header
=
new
HttpHeaders
();
header
.
setAccept
(
Arrays
.
asList
(
MediaType
.
APPLICATION_JSON
));
// header.add("x-authenticated-user-token", token);
String
req
=
"{\"filter\":{\"isDetail\":true}}"
;
Map
<
String
,
Object
>
mapping
=
new
ObjectMapper
().
readValue
(
req
,
HashMap
.
class
);
HttpEntity
<
Map
<
String
,
Object
>>
entity
=
new
HttpEntity
<>(
mapping
,
header
);
ResponseEntity
<
String
>
entityResponse
=
restTemplate
.
exchange
(
url
,
HttpMethod
.
POST
,
entity
,
String
.
class
);
// System.out.println(entityResponse);
String
responseStr
=
entityResponse
.
getBody
();
int
begin
=
responseStr
.
indexOf
(
"{"
);
int
end
=
responseStr
.
lastIndexOf
(
"}"
)
+
1
;
responseStr
=
responseStr
.
substring
(
begin
,
end
);
// System.out.println(responseStr);
JSONObject
passbook_payload
=
new
JSONObject
(
responseStr
);
JSONObject
passbook_result
=
passbook_payload
.
getJSONObject
(
"result"
);
return
(
JSONObject
)
passbook_result
.
get
(
"response"
);
}
// private void GetPassbook(JSONArray userId) throws JsonProcessingException {
//
// String url = "https://sphere.aastrika.org/api/user/v1/passbook";
// String Api_key = "bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJTNHNNVFdjZUZqYkxUWGxiczkzUzk4dmFtODBhdkRPUiJ9.nPOCY0-bVX28iNcxxnYbGpihY3ZzfNwx0-SFCnJwjas";
// for (int i=0; i<userId.length();i++){
// System.out.println("Passbook Update loop");
// System.out.println(userId.get(i));
// HttpHeaders header = new HttpHeaders();
// header.setAccept(Arrays.asList(MediaType.APPLICATION_JSON));
// header.add("Authorization", Api_key);
// header.add("x-authenticated-userid", (String) userId.get(i));
// String req = "{\"request\":{\"typeName\":\"competency\"}}";
// Map<String, Object> mapping = new ObjectMapper().readValue(req, HashMap.class);
//
// System.out.println(mapping);
//
// HttpEntity<Map<String, Object>> entity = new HttpEntity<>(mapping, header);
// ResponseEntity<String> entityResponse = restTemplate.exchange(url , HttpMethod.POST, entity, String.class);
// System.out.println(entityResponse);
// String responseStr = entityResponse.getBody();
// int begin = responseStr.indexOf("{");
// int end = responseStr.lastIndexOf("}") + 1;
// responseStr = responseStr.substring(begin, end);
// System.out.println(responseStr);
// JSONObject passbook_payload = new JSONObject(responseStr);
// JSONObject passbook_result=passbook_payload.getJSONObject("result");
// System.out.println(passbook_result);
//
// }
}
This diff is collapsed.
Click to expand it.
src/main/java/com/sphere/compentency/kafka/consumer/api/kafkaConsumer.java
+
59
−
42
View file @
029c0442
package
com.sphere.compentency.kafka.consumer.api
;
import
com.fasterxml.jackson.core.JsonProcessingException
;
import
com.sphere.compentency.common.utils.propertiesCache
;
import
org.apache.kafka.clients.consumer.ConsumerConfig
;
import
org.apache.kafka.clients.consumer.ConsumerRecord
;
import
org.apache.kafka.clients.consumer.ConsumerRecords
;
import
org.apache.kafka.clients.consumer.KafkaConsumer
;
import
org.apache.kafka.clients.producer.KafkaProducer
;
import
org.apache.kafka.clients.producer.Producer
;
import
org.apache.kafka.clients.producer.ProducerConfig
;
import
org.apache.kafka.clients.producer.ProducerRecord
;
import
org.apache.kafka.common.serialization.StringDeserializer
;
import
org.apache.kafka.common.serialization.StringSerializer
;
import
org.json.JSONArray
;
import
org.json.JSONObject
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.boot.SpringApplication
;
import
org.springframework.boot.autoconfigure.SpringBootApplication
;
import
org.springframework.context.annotation.ComponentScan
;
import
java.io.IOException
;
import
java.time.Duration
;
import
java.util.Arrays
;
import
java.util.Properties
;
@SpringBootApplication
public
class
kafkaConsumer
{
public
static
void
main
(
String
[]
args
)
{
SpringApplication
.
run
(
kafkaConsumer
.
class
,
args
);
private
static
final
Logger
logger
=
LoggerFactory
.
getLogger
(
kafkaConsumer
.
class
);
private
static
ApiService
api_services
=
new
ApiService
();
public
static
void
startKafkaConsumer
()
{
propertiesCache
env
=
new
propertiesCache
();
propertiesCache
env
=
new
propertiesCache
();
Api_services
api_services
=
new
Api_services
();
Logger
logger
=
LoggerFactory
.
getLogger
(
kafkaConsumer
.
class
.
getName
());
String
bootstrapServers
=
"localhost:9092"
;
String
groupId
=
"dev-activity-aggregate-updater-group"
;
String
topic
=
"dev.issue.certificate.request"
;
System
.
out
.
println
(
"Bootstrap Servers: "
+
bootstrapServers
);
String
bootstrapServers
=
env
.
getProperty
(
"kafka.bootstrapServers"
);
String
grp_id
=
env
.
getProperty
(
"kafka.groupID"
);
String
topic
=
env
.
getProperty
(
"kafka.topic"
);;
System
.
out
.
println
(
"topic --- > "
+
topic
);
System
.
out
.
println
(
"bootstrapServers --- > "
+
bootstrapServers
);
System
.
out
.
println
(
"grp_id --- > "
+
grp_id
);
// Creating consumer properties
Properties
properties
=
new
Properties
();
properties
.
setProperty
(
ConsumerConfig
.
BOOTSTRAP_SERVERS_CONFIG
,
bootstrapServers
);
properties
.
setProperty
(
ConsumerConfig
.
KEY_DESERIALIZER_CLASS_CONFIG
,
StringDeserializer
.
class
.
getName
());
properties
.
setProperty
(
ConsumerConfig
.
VALUE_DESERIALIZER_CLASS_CONFIG
,
StringDeserializer
.
class
.
getName
());
properties
.
setProperty
(
ConsumerConfig
.
GROUP_ID_CONFIG
,
groupId
);
properties
.
setProperty
(
ConsumerConfig
.
AUTO_OFFSET_RESET_CONFIG
,
"latest"
);
//Creating consumer properties
Properties
properties
=
new
Properties
();
properties
.
setProperty
(
ConsumerConfig
.
BOOTSTRAP_SERVERS_CONFIG
,
bootstrapServers
);
properties
.
setProperty
(
ConsumerConfig
.
KEY_DESERIALIZER_CLASS_CONFIG
,
StringDeserializer
.
class
.
getName
());
properties
.
setProperty
(
ConsumerConfig
.
VALUE_DESERIALIZER_CLASS_CONFIG
,
StringDeserializer
.
class
.
getName
());
properties
.
setProperty
(
ConsumerConfig
.
GROUP_ID_CONFIG
,
grp_id
);
properties
.
setProperty
(
ConsumerConfig
.
AUTO_OFFSET_RESET_CONFIG
,
"latest"
);
//creating consumer
KafkaConsumer
<
String
,
String
>
consumer
=
new
KafkaConsumer
<
String
,
String
>(
properties
);
//Subscribing
// Creating consumer
KafkaConsumer
<
String
,
String
>
consumer
=
new
KafkaConsumer
<>(
properties
);
// Subscribing to the topic
consumer
.
subscribe
(
Arrays
.
asList
(
topic
));
System
.
out
.
println
(
"kafka consumer subscribe & running"
);
//polling
while
(
true
){
ConsumerRecords
<
String
,
String
>
records
=
consumer
.
poll
(
Duration
.
ofMillis
(
100
));
for
(
ConsumerRecord
<
String
,
String
>
record:
records
)
{
// logger.info("Key: " + record.key() + ", Value:" + record.value());
String
msg
=
record
.
value
();
if
(
msg
.
length
()
!=
0
&
msg
!=
null
&&
!
msg
.
isEmpty
()
&&
!
msg
.
trim
().
isEmpty
())
{
JSONObject
json
=
new
JSONObject
(
record
.
value
());
JSONObject
edata
=
json
.
getJSONObject
(
"edata"
);
JSONArray
userId
=
(
JSONArray
)
edata
.
get
(
"userIds"
);
String
courseId
=
(
String
)
edata
.
get
(
"courseId"
);
System
.
out
.
println
(
edata
.
get
(
"userIds"
));
System
.
out
.
println
(
edata
.
get
(
"courseId"
));
api_services
.
get_hierarchy
(
courseId
,
userId
);
}
else
{
System
.
out
.
println
(
"null check condition : "
+
record
.
value
());
// logger.info("Partition:" + record.partition()+",Offset:"+record.offset());
}
logger
.
info
(
"Kafka consumer subscribed and running"
);
// Polling for messages
try
{
while
(
true
)
{
ConsumerRecords
<
String
,
String
>
records
=
consumer
.
poll
(
Duration
.
ofMillis
(
100
));
for
(
ConsumerRecord
<
String
,
String
>
record
:
records
)
{
processKafkaMessage
(
record
);
}
}
}
catch
(
Exception
e
)
{
logger
.
error
(
"Error in Kafka consumer: {}"
,
e
.
getMessage
(),
e
);
}
finally
{
// Close the consumer when done
consumer
.
close
();
}
}
private
static
void
processKafkaMessage
(
ConsumerRecord
<
String
,
String
>
record
)
throws
IOException
,
InterruptedException
{
logger
.
info
(
"Inside processKafkaMessage with record "
+
record
);
String
msg
=
record
.
value
();
if
(
msg
!=
null
&&
!
msg
.
isEmpty
()
&&
!
msg
.
trim
().
isEmpty
())
{
JSONObject
json
=
new
JSONObject
(
record
.
value
());
JSONObject
edata
=
json
.
getJSONObject
(
"edata"
);
String
userIds
=
edata
.
getString
(
"userIds"
);
// Now, you can pass userIds to your method
JSONObject
relatedObject
=
json
.
getJSONObject
(
"edata"
).
getJSONObject
(
"related"
);
String
courseId
=
relatedObject
.
getString
(
"courseId"
);
//do_1139628834519941121286,do_11394806141846323211
logger
.
info
(
"Processing Kafka message - userId: {}, courseId: {}"
,
userIds
,
courseId
);
api_services
.
get_hierarchy
(
courseId
,
userIds
);
}
else
{
logger
.
warn
(
"Received empty or null message from Kafka"
);
}
}
}
This diff is collapsed.
Click to expand it.
src/main/resources/application.properties
+
10
−
9
View file @
029c0442
...
...
@@ -3,7 +3,7 @@ spring.datasource.username= postgres
spring.datasource.password
=
postgres
spring.jpa.properties.hibernate.jdbc.lob.non_contextual_creation
=
true
spring.jpa.properties.hibernate.dialect
=
org.hibernate.dialect.PostgreSQLDialect
spring.jpa.properties.hibernate.dialect
=
org.hibernate.dialect.PostgreSQLDialect
# Hibernate ddl auto (create, create-drop, validate, update)
spring.jpa.hibernate.ddl-auto
=
update
...
...
@@ -14,19 +14,20 @@ spring.datasource.initialization-mode=always
user.read.api
=
https://uphrh.in/learner/user/v5/read
content.read.api
=
https://uphrh.in/content/v2/read
kafka.bootstrapServers
=
localhost:9092
kafka.topic
=
dev.issue.certificate.request
kafka.groupID
=
dev-activity-aggregate-updater-group
kafka.bootstrapServers
=
localhost:9092
kafka.topic
=
dev.issue.certificate.request
kafka.groupID
=
dev-activity-aggregate-updater-group
get.hierarchy
=
https://uphrh.in/course/v1/hierarchy
get.hierarchy
=
https://uphrh.in/api/course/v1/hierarchy/
Api.key
=
bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJSR3RkMkZzeG1EMnJER3I4dkJHZ0N6MVhyalhZUzBSSyJ9.kMLn6177rvY53i0RAN3SPD5m3ctwaLb32pMYQ65nBdA
get.entityById
=
http://localhost:8087/getEntityById/
passbook.update.url
=
https://uphrh.in/api/user/v1/passbook
get.entityById
=
http://localhost:8087/getEntityById/
passbook.update.url
=
http://lms-service:9000/private/v1/passbook/update
host.url
=
https://uphrh.in/
framework.read
=
https://uphrh.in/api/framework/v1/read/nirayama_frccl_fw?categories=difficultyLevel,subject
server.port
=
8087
#
logging.level.org.springframework=
OFF
logging.level.org.springframework
=
DEBUG
#logging.level.root=OFF
#
logging.level.org.springframework.web
=
ERROR
\ No newline at end of file
This diff is collapsed.
Click to expand it.
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment
Menu
Explore
Projects
Groups
Topics
Snippets