Unverified Commit 23a686d1 authored by AMIT KUMAR's avatar AMIT KUMAR Committed by GitHub
Browse files

Issue #SC-2161 feat:removed quartz scheduler and shadow user related code (#908)

Showing with 1 addition and 1791 deletions
+1 -1791
package controllers.scheduler;
import controllers.BaseController;
import java.util.concurrent.CompletionStage;
import org.sunbird.operations.ActorOperations;
import play.mvc.Http;
import play.mvc.Result;
public class SchedulerController extends BaseController {
public CompletionStage<Result> startScheduler(Http.Request httpRequest) {
return handleRequest(
ActorOperations.ONDEMAND_START_SCHEDULER.getValue(),
httpRequest.body().asJson(),
null,
null,
null,
true,
httpRequest);
}
}
......@@ -27,9 +27,7 @@ public class ApplicationStart {
// initialize HttpClientUtil class
HttpClientUtil.getInstance();
applicationLifecycle.addStopHook(
() -> {
return CompletableFuture.completedFuture(null);
});
() -> CompletableFuture.completedFuture(null));
KeyManager.init();
}
......@@ -46,8 +44,5 @@ public class ApplicationStart {
private static void checkCassandraConnections() {
Util.checkCassandraDbConnections();
SchedulerManager.schedule();
// Run quartz scheduler in a separate thread as it waits for 4 minutes
// before scheduling various jobs.
new Thread(() -> org.sunbird.quartzscheduler.SchedulerManager.getInstance()).start();
}
}
package controllers.scheduler;
import static org.junit.Assert.*;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import controllers.BaseApplicationTest;
import controllers.DummyActor;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import modules.OnRequestHandler;
import org.apache.commons.lang3.StringUtils;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.sunbird.exception.ResponseCode;
import org.sunbird.keys.JsonKey;
import org.sunbird.request.HeaderParam;
import play.libs.Json;
import play.mvc.Http;
import play.mvc.Result;
import play.test.Helpers;
@Ignore
@PrepareForTest(OnRequestHandler.class)
@PowerMockIgnore({"javax.management.*", "jdk.internal.reflect.*", "javax.crypto.*"})
public class SchedulerControllerTest extends BaseApplicationTest {
public static Map<String, List<String>> headerMap;
@Before
public void before() {
setup(DummyActor.class);
headerMap = new HashMap<>();
headerMap.put(HeaderParam.X_Consumer_ID.getName(), Arrays.asList("Some consumer ID"));
headerMap.put(HeaderParam.X_Device_ID.getName(), Arrays.asList("Some device ID"));
headerMap.put(
HeaderParam.X_Authenticated_Userid.getName(), Arrays.asList("Some authenticated user ID"));
headerMap.put(JsonKey.MESSAGE_ID, Arrays.asList("Some message ID"));
headerMap.put(HeaderParam.X_APP_ID.getName(), Arrays.asList("Some app Id"));
}
@Test
public void testStartScheduler() {
System.out.println("applied");
Result result =
performTest("/private/user/v1/scheduler", "POST", (Map) createOnDemandSchedulerRequest());
assertEquals(getResponseCode(result), ResponseCode.success.getErrorCode().toLowerCase());
}
public Result performTest(String url, String method, Map map) {
String data = mapToJson(map);
Http.RequestBuilder req;
if (StringUtils.isNotBlank(data)) {
JsonNode json = Json.parse(data);
req = new Http.RequestBuilder().bodyJson(json).uri(url).method(method);
} else {
req = new Http.RequestBuilder().uri(url).method(method);
}
Result result = Helpers.route(application, req);
return result;
}
public String mapToJson(Map map) {
ObjectMapper mapperObj = new ObjectMapper();
String jsonResp = "";
if (map != null) {
try {
jsonResp = mapperObj.writeValueAsString(map);
} catch (IOException e) {
e.printStackTrace();
}
}
return jsonResp;
}
private Object createOnDemandSchedulerRequest() {
Map<String, Object> requestMap = new HashMap<>();
Map<String, Object> innerMap = new HashMap<>();
innerMap.put(JsonKey.TYPE, "shadowuser");
requestMap.put(JsonKey.REQUEST, innerMap);
return requestMap;
}
}
......@@ -35,7 +35,6 @@ public enum ActorOperations {
UPDATE_USER_ROLES_ES("updateUserRoles"),
SYNC("sync"),
BACKGROUND_SYNC("backgroundSync"),
SCHEDULE_BULK_UPLOAD("scheduleBulkUpload"),
EMAIL_SERVICE("emailService"),
FILE_STORAGE_SERVICE("fileStorageService"),
HEALTH_CHECK("healthCheck"),
......@@ -84,7 +83,6 @@ public enum ActorOperations {
CREATE_SSU_USER("createSSUUser"),
CREATE_USER_V4("createUserV4"),
CREATE_MANAGED_USER("createManagedUser"),
ONDEMAND_START_SCHEDULER("onDemandStartScheduler"),
V2_NOTIFICATION("v2Notification"),
GET_MANAGED_USERS("getManagedUsers"),
CHECK_USER_EXISTENCEV2("checkUserExistenceV2"),
......
......@@ -39,7 +39,6 @@ public class ActorOperationTest {
Assert.assertEquals("updateUserOrgES", ActorOperations.UPDATE_USER_ORG_ES.getValue());
Assert.assertEquals("updateUserRoles", ActorOperations.UPDATE_USER_ROLES_ES.getValue());
Assert.assertEquals("sync", ActorOperations.SYNC.getValue());
Assert.assertEquals("scheduleBulkUpload", ActorOperations.SCHEDULE_BULK_UPLOAD.getValue());
Assert.assertEquals("fileStorageService", ActorOperations.FILE_STORAGE_SERVICE.getValue());
Assert.assertEquals("healthCheck", ActorOperations.HEALTH_CHECK.getValue());
Assert.assertEquals("sendMail", ActorOperations.SEND_MAIL.getValue());
......
......@@ -73,18 +73,6 @@
<version>4.1</version>
</dependency>
<!-- adding dependency for quartz scheduler job -->
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.2.1</version>
</dependency>
<!-- this postgres driver is used for making connection to postgres by quartz scheduler-->
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.1.4</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
......
......@@ -2,6 +2,5 @@ package org.sunbird.actor;
/** @author Mahesh Kumar Gangula */
public enum BackgroundOperations {
registerChannel,
emailService;
}
package org.sunbird.actor;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.json.JSONArray;
import org.json.JSONObject;
import org.sunbird.actor.core.BaseActor;
import org.sunbird.actor.router.ActorConfig;
import org.sunbird.cassandra.CassandraOperation;
import org.sunbird.common.ElasticSearchHelper;
import org.sunbird.common.factory.EsClientFactory;
import org.sunbird.common.inf.ElasticSearchService;
import org.sunbird.dto.SearchDTO;
import org.sunbird.helper.ServiceFactory;
import org.sunbird.http.HttpClientUtil;
import org.sunbird.keys.JsonKey;
import org.sunbird.util.Util;
import org.sunbird.request.Request;
import org.sunbird.request.RequestContext;
import org.sunbird.response.Response;
import org.sunbird.util.ProjectUtil;
import org.sunbird.util.PropertiesCache;
import scala.concurrent.Future;
/** @author Amit Kumar */
@ActorConfig(
tasks = {},
asyncTasks = {"registerChannel"}
)
public class ChannelRegistrationActor extends BaseActor {
private CassandraOperation cassandraOperation = ServiceFactory.getInstance();
private ElasticSearchService esService = EsClientFactory.getInstance(JsonKey.REST);
@Override
public void onReceive(Request request) throws Throwable {
if (request.getOperation().equalsIgnoreCase(BackgroundOperations.registerChannel.name())) {
registerChannel(request);
} else {
onReceiveUnsupportedOperation(request.getOperation());
}
}
private void registerChannel(Request request) {
List<String> ekstepChannelList = getEkstepChannelList(request.getRequestContext());
List<Map<String, Object>> sunbirdChannelList = null;
if (null != ekstepChannelList) {
logger.info(
request.getRequestContext(),
"channel list size from ekstep : " + ekstepChannelList.size());
sunbirdChannelList = getSunbirdChannelList(request.getRequestContext());
logger.info(
request.getRequestContext(),
"channel list size from sunbird : " + sunbirdChannelList.size());
if (!ekstepChannelList.isEmpty()) {
processChannelReg(ekstepChannelList, sunbirdChannelList, request.getRequestContext());
}
}
}
private void processChannelReg(
List<String> ekstepChannelList,
List<Map<String, Object>> sunbirdChannelList,
RequestContext context) {
Boolean bool = true;
for (Map<String, Object> map : sunbirdChannelList) {
logger.info(context, "processing start for hashTagId " + map.get(JsonKey.ID));
if (!StringUtils.isBlank((String) map.get(JsonKey.ID))
&& (!ekstepChannelList.contains(map.get(JsonKey.ID)))
&& (!Util.registerChannel(map, context))) {
bool = false;
}
}
if (bool) {
updateSystemSettingTable(bool, context);
}
}
private void updateSystemSettingTable(Boolean bool, RequestContext context) {
Map<String, Object> map = new HashMap<>();
map.put(JsonKey.ID, JsonKey.CHANNEL_REG_STATUS_ID);
map.put(JsonKey.FIELD, JsonKey.CHANNEL_REG_STATUS);
map.put(JsonKey.VALUE, String.valueOf(bool));
Response response =
cassandraOperation.upsertRecord("sunbird", JsonKey.SYSTEM_SETTINGS_DB, map, context);
logger.info(
context,
"Upsert operation result for channel reg status = "
+ response.getResult().get(JsonKey.RESPONSE));
}
private List<Map<String, Object>> getSunbirdChannelList(RequestContext context) {
logger.info(context, "start call for getting List of channel from sunbird ES");
SearchDTO searchDto = new SearchDTO();
List<String> list = new ArrayList<>();
list.add(JsonKey.ID);
list.add(JsonKey.DESCRIPTION);
list.add(JsonKey.CHANNEL);
searchDto.setFields(list);
Map<String, Object> filter = new HashMap<>();
filter.put(JsonKey.IS_TENANT, true);
searchDto.getAdditionalProperties().put(JsonKey.FILTERS, filter);
Future<Map<String, Object>> esResponseF =
esService.search(searchDto, ProjectUtil.EsType.organisation.getTypeName(), context);
Map<String, Object> esResponse =
(Map<String, Object>) ElasticSearchHelper.getResponseFromFuture(esResponseF);
List<Map<String, Object>> orgList = (List<Map<String, Object>>) esResponse.get(JsonKey.CONTENT);
logger.info(context, "End call for getting List of channel from sunbird ES");
return orgList;
}
private List<String> getEkstepChannelList(RequestContext context) {
List<String> channelList = new ArrayList<>();
Map<String, String> headerMap = new HashMap<>();
String header = System.getenv(JsonKey.EKSTEP_AUTHORIZATION);
if (StringUtils.isBlank(header)) {
header = PropertiesCache.getInstance().getProperty(JsonKey.EKSTEP_AUTHORIZATION);
} else {
header = JsonKey.BEARER + header;
}
headerMap.put(JsonKey.AUTHORIZATION, header);
headerMap.put("Content-Type", "application/json");
headerMap.put("user-id", "");
String reqString = "";
String response = "";
JSONObject data;
JSONObject jObject;
Object[] result = null;
try {
logger.info(context, "start call for getting List of channel from Ekstep");
String ekStepBaseUrl = System.getenv(JsonKey.EKSTEP_BASE_URL);
if (StringUtils.isBlank(ekStepBaseUrl)) {
ekStepBaseUrl = PropertiesCache.getInstance().getProperty(JsonKey.EKSTEP_BASE_URL);
}
Map<String, Object> map = new HashMap<>();
Map<String, Object> reqMap = new HashMap<>();
map.put(JsonKey.REQUEST, reqMap);
ObjectMapper mapper = new ObjectMapper();
reqString = mapper.writeValueAsString(map);
response =
HttpClientUtil.post(
(ekStepBaseUrl
+ PropertiesCache.getInstance().getProperty(JsonKey.EKSTEP_GET_CHANNEL_LIST)),
reqString,
headerMap);
jObject = new JSONObject(response);
data = jObject.getJSONObject(JsonKey.RESULT);
logger.info(
context,
"Total number of content fetched from Ekstep while getting List of channel : "
+ data.get("count"));
JSONArray contentArray = data.getJSONArray(JsonKey.CHANNELS);
result = mapper.readValue(contentArray.toString(), Object[].class);
for (Object object : result) {
Map<String, Object> tempMap = (Map<String, Object>) object;
channelList.add((String) tempMap.get(JsonKey.CODE));
}
logger.info(context, "end call for getting List of channel from Ekstep");
} catch (Exception e) {
logger.error(context, e.getMessage(), e);
channelList = null;
}
return channelList;
}
}
package org.sunbird.actor.bulkupload;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.sunbird.actor.core.BaseActor;
import org.sunbird.actor.router.ActorConfig;
import org.sunbird.client.org.OrganisationClient;
import org.sunbird.client.org.impl.OrganisationClientImpl;
import org.sunbird.exception.ResponseCode;
import org.sunbird.keys.JsonKey;
import org.sunbird.model.SelfDeclaredUser;
import org.sunbird.model.bulkupload.BulkMigrationUser;
import org.sunbird.model.bulkupload.SelfDeclaredErrorTypeEnum;
import org.sunbird.model.bulkupload.SelfDeclaredStatusEnum;
import org.sunbird.model.user.UserDeclareEntity;
import org.sunbird.models.organisation.Organisation;
import org.sunbird.operations.ActorOperations;
import org.sunbird.operations.BulkUploadActorOperation;
import org.sunbird.request.Request;
import org.sunbird.request.RequestContext;
import org.sunbird.response.Response;
import org.sunbird.util.ProjectUtil;
import org.sunbird.util.bulkupload.UserUploadUtil;
@ActorConfig(
tasks = {},
asyncTasks = {"processUserBulkSelfDeclared"}
)
public class DeclaredExternalIdActor extends BaseActor {
@Override
public void onReceive(Request request) throws Throwable {
String operation = request.getOperation();
if (operation.equalsIgnoreCase(
BulkUploadActorOperation.PROCESS_USER_BULK_SELF_DECLARED.getValue())) {
processSelfDeclaredExternalId(request);
} else {
onReceiveUnsupportedOperation("userBulkMigration");
}
}
private void processSelfDeclaredExternalId(Request request) {
Response response = new Response();
response.setResponseCode(ResponseCode.OK);
Map requestMap = request.getRequest();
String processId = (String) requestMap.get(JsonKey.PROCESS_ID);
String rootOrgId = (String) requestMap.get(JsonKey.ROOT_ORG_ID);
Map<String, Object> row =
UserUploadUtil.getFullRecordFromProcessId(processId, request.getRequestContext());
BulkMigrationUser bulkMigrationUser =
UserUploadUtil.convertRowToObject(row, request.getRequestContext());
List<SelfDeclaredUser> userList =
UserUploadUtil.getMigrationUserAsList(bulkMigrationUser, request.getRequestContext());
userList
.parallelStream()
.forEach(
migrateUser -> {
// add entry in usr_external_id
// modify status to validated to user_declarations
// call to migrate api
migrateUser.setOrgId(rootOrgId);
if (migrateUser.getPersona().equals(JsonKey.TEACHER_PERSONA)) {
switch (migrateUser.getInputStatus()) {
case JsonKey.VALIDATED:
migrateDeclaredUser(request, migrateUser);
break;
case JsonKey.REJECTED:
rejectDeclaredDetail(request, migrateUser);
break;
case JsonKey.SELF_DECLARED_ERROR:
updateErrorDetail(request, migrateUser);
break;
default:
}
}
});
UserUploadUtil.updateStatusInUserBulkTable(
bulkMigrationUser.getId(),
ProjectUtil.BulkProcessStatus.COMPLETED.getValue(),
request.getRequestContext());
logger.info(
request.getRequestContext(),
"DeclaredExternalIdActor:processSelfDeclaredExternalId: processing the DeclaredUser of processId: "
+ bulkMigrationUser.getId()
+ "is completed");
sender().tell(response, self());
}
private void updateErrorDetail(Request request, SelfDeclaredUser declaredUser) {
Request req = new Request();
try {
req.setRequestContext(request.getRequestContext());
req.setOperation("updateUserSelfDeclarationsErrorType");
Map<String, Object> requestMap = new HashMap();
UserDeclareEntity userDeclareEntity = new UserDeclareEntity();
userDeclareEntity.setOrgId(declaredUser.getOrgId());
userDeclareEntity.setPersona(declaredUser.getPersona());
userDeclareEntity.setUserId(declaredUser.getUserId());
userDeclareEntity.setErrorType(declaredUser.getErrorType());
userDeclareEntity.setStatus(declaredUser.getInputStatus());
requestMap.put(JsonKey.DECLARATIONS, userDeclareEntity);
req.setRequest(requestMap);
tellToAnother(req);
} catch (Exception e) {
logger.error(
req.getRequestContext(),
"DeclaredExternalIdActor:updateErrorDetail:Exception in processing the DeclaredUser: "
+ e.getCause()
+ declaredUser.getUserId(),
e);
}
}
private void rejectDeclaredDetail(Request request, SelfDeclaredUser declaredUser) {
Request req = new Request();
try {
req.setRequestContext(request.getRequestContext());
req.setOperation("upsertUserSelfDeclarations");
Map<String, Object> requestMap = new HashMap();
UserDeclareEntity userDeclareEntity = new UserDeclareEntity();
userDeclareEntity.setOrgId(declaredUser.getOrgId());
userDeclareEntity.setPersona(declaredUser.getPersona());
userDeclareEntity.setUserId(declaredUser.getUserId());
userDeclareEntity.setOperation(JsonKey.REMOVE);
List userDeclareEntityLst = new ArrayList<UserDeclareEntity>();
userDeclareEntityLst.add(userDeclareEntity);
requestMap.put(JsonKey.DECLARATIONS, userDeclareEntityLst);
req.setRequest(requestMap);
tellToAnother(req);
} catch (Exception e) {
logger.error(
req.getRequestContext(),
"DeclaredExternalIdActor:rejectDeclaredDetail:Exception in processing the DeclaredUser: "
+ e.getCause()
+ declaredUser.getUserId(),
e);
}
}
private void migrateDeclaredUser(Request request, SelfDeclaredUser declaredUser) {
Request req = new Request();
try {
req.setRequestContext(request.getRequestContext());
req.setOperation(ActorOperations.USER_SELF_DECLARED_TENANT_MIGRATE.getValue());
if (StringUtils.isNotEmpty(declaredUser.getSubOrgExternalId())) {
Organisation org =
getOrgDetails(
declaredUser.getSubOrgExternalId(),
declaredUser.getChannel(),
req.getRequestContext());
if (org == null || (org != null && !org.getRootOrgId().equals(declaredUser.getOrgId()))) {
declaredUser.setErrorType(
SelfDeclaredErrorTypeEnum.ERROR_STATE.getErrorType().replace("_", "-"));
declaredUser.setInputStatus(SelfDeclaredStatusEnum.ERROR.name());
updateErrorDetail(req, declaredUser);
return;
}
}
Map<String, Object> requestMap = new HashMap();
Map<String, String> externalIdMap = new HashMap();
List<Map<String, String>> externalIdLst = new ArrayList();
requestMap.put(JsonKey.USER_ID, declaredUser.getUserId());
requestMap.put(JsonKey.CHANNEL, declaredUser.getChannel());
requestMap.put(JsonKey.ORG_EXTERNAL_ID, declaredUser.getSubOrgExternalId());
externalIdMap.put(JsonKey.ID, declaredUser.getUserExternalId());
externalIdMap.put(JsonKey.ID_TYPE, declaredUser.getChannel());
externalIdMap.put(JsonKey.PROVIDER, declaredUser.getChannel());
externalIdLst.add(externalIdMap);
requestMap.put(JsonKey.EXTERNAL_IDS, externalIdLst);
req.setRequest(requestMap);
tellToAnother(req);
} catch (Exception e) {
logger.error(
req.getRequestContext(),
"DeclaredExternalIdActor:migrateDeclaredUser:Exception in processing the DeclaredUser: "
+ e.getCause()
+ declaredUser.getUserId(),
e);
}
}
private Organisation getOrgDetails(
String orgExternalId, String provider, RequestContext context) {
OrganisationClient organisationClient = OrganisationClientImpl.getInstance();
return organisationClient.esGetOrgByExternalId(orgExternalId, provider, context);
}
}
package org.sunbird.actor.scheduler;
import static org.sunbird.validator.orgvalidator.BaseOrgRequestValidator.ERROR_CODE;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.sunbird.actor.core.BaseActor;
import org.sunbird.actor.router.ActorConfig;
import org.sunbird.exception.ProjectCommonException;
import org.sunbird.exception.ResponseCode;
import org.sunbird.keys.JsonKey;
import org.sunbird.operations.ActorOperations;
import org.sunbird.quartzscheduler.OnDemandSchedulerManager;
import org.sunbird.request.Request;
import org.sunbird.response.Response;
/** @author Amit Kumar */
@ActorConfig(
tasks = {"onDemandStartScheduler"},
asyncTasks = {}
)
public class OnDemandSchedulerActor extends BaseActor {
private static final String TYPE = "type";
@Override
public void onReceive(Request actorMessage) throws Throwable {
if (actorMessage
.getOperation()
.equalsIgnoreCase(ActorOperations.ONDEMAND_START_SCHEDULER.getValue())) {
startSchedular(actorMessage);
} else {
onReceiveUnsupportedOperation(actorMessage.getOperation());
}
}
private void startSchedular(Request actorMessage) {
Map<String, Object> req = actorMessage.getRequest();
ArrayList<String> jobTypes = (ArrayList<String>) req.get(TYPE);
if (jobTypes.size() > 0) {
String[] jobs = jobTypes.stream().toArray(String[]::new);
validateJobs(jobs);
scheduleJob(jobs);
} else {
throw new ProjectCommonException(
ResponseCode.mandatoryParamsMissing.getErrorCode(),
ResponseCode.mandatoryParamsMissing.getErrorMessage(),
ERROR_CODE,
TYPE);
}
}
private void validateJobs(String[] jobs) {
List<String> jobsAllowed = new ArrayList<String>();
jobsAllowed.add("bulkupload");
jobsAllowed.add("shadowuser");
for (String job : jobs) {
if (!jobsAllowed.contains(job)) {
throw new ProjectCommonException(
ResponseCode.invalidParameter.getErrorCode(),
ResponseCode.invalidParameter.getErrorMessage(),
ERROR_CODE,
TYPE);
}
}
}
private void scheduleJob(String[] jobs) {
Response response = new Response();
OnDemandSchedulerManager onDemandSchedulerManager = OnDemandSchedulerManager.getInstance();
new Thread(() -> onDemandSchedulerManager.triggerScheduler(jobs)).start();
Map result = new HashMap<String, Object>();
result.put(JsonKey.STATUS, JsonKey.SUCCESS);
response.put(JsonKey.RESULT, result);
sender().tell(response, self());
}
}
package org.sunbird.actor.scheduler;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.sunbird.actor.core.BaseActor;
import org.sunbird.actor.router.ActorConfig;
import org.sunbird.cassandra.CassandraOperation;
import org.sunbird.helper.ServiceFactory;
import org.sunbird.keys.JsonKey;
import org.sunbird.operations.ActorOperations;
import org.sunbird.operations.BulkUploadActorOperation;
import org.sunbird.request.Request;
import org.sunbird.util.ProjectUtil;
import org.sunbird.util.UserUtility;
import org.sunbird.util.Util;
/** @author Amit Kumar */
@ActorConfig(
tasks = {},
asyncTasks = {"scheduleBulkUpload"}
)
public class SchedularActor extends BaseActor {
@Override
public void onReceive(Request actorMessage) throws Throwable {
if (actorMessage
.getOperation()
.equalsIgnoreCase(ActorOperations.SCHEDULE_BULK_UPLOAD.getValue())) {
schedule(actorMessage);
} else {
onReceiveUnsupportedOperation("SchedularActor");
}
}
@SuppressWarnings("unchecked")
private void schedule(Request request) {
List<Map<String, Object>> result = (List<Map<String, Object>>) request.get(JsonKey.DATA);
Util.DbInfo bulkDb = Util.dbInfoMap.get(JsonKey.BULK_OP_DB);
CassandraOperation cassandraOperation = ServiceFactory.getInstance();
for (Map<String, Object> map : result) {
int retryCount = 0;
if (null != map.get(JsonKey.RETRY_COUNT)) {
retryCount = (int) map.get(JsonKey.RETRY_COUNT);
}
if (retryCount > 2) {
String data = (String) map.get(JsonKey.DATA);
try {
Map<String, Object> bulkMap = new HashMap<>();
bulkMap.put(JsonKey.DATA, UserUtility.encryptData(data));
bulkMap.put(JsonKey.PROCESS_ID, map.get(JsonKey.ID));
bulkMap.put(JsonKey.STATUS, ProjectUtil.BulkProcessStatus.FAILED.getValue());
cassandraOperation.updateRecord(
bulkDb.getKeySpace(), bulkDb.getTableName(), bulkMap, request.getRequestContext());
} catch (Exception e) {
logger.error(
request.getRequestContext(),
"Exception occurred while encrypting data while running scheduler for bulk upload process : ",
e);
}
} else {
Map<String, Object> bulkMap = new HashMap<>();
bulkMap.put(JsonKey.RETRY_COUNT, retryCount + 1);
bulkMap.put(JsonKey.ID, map.get(JsonKey.ID));
bulkMap.put(JsonKey.STATUS, ProjectUtil.BulkProcessStatus.IN_PROGRESS.getValue());
cassandraOperation.updateRecord(
bulkDb.getKeySpace(), bulkDb.getTableName(), bulkMap, request.getRequestContext());
Request req = new Request();
req.put(JsonKey.PROCESS_ID, map.get(JsonKey.ID));
req.setRequestContext(request.getRequestContext());
logger.info(
request.getRequestContext(),
"SchedularActor: scheduleBulkUpload called with processId "
+ map.get(JsonKey.ID)
+ " and type "
+ map.get(JsonKey.OBJECT_TYPE));
if (JsonKey.LOCATION.equalsIgnoreCase((String) map.get(JsonKey.OBJECT_TYPE))) {
req.setOperation(BulkUploadActorOperation.LOCATION_BULK_UPLOAD_BACKGROUND_JOB.getValue());
} else if (JsonKey.ORGANISATION.equalsIgnoreCase((String) map.get(JsonKey.OBJECT_TYPE))) {
req.setOperation(BulkUploadActorOperation.ORG_BULK_UPLOAD_BACKGROUND_JOB.getValue());
} else if (JsonKey.USER.equals((String) map.get(JsonKey.OBJECT_TYPE))) {
req.setOperation(BulkUploadActorOperation.USER_BULK_UPLOAD_BACKGROUND_JOB.getValue());
} else {
req.setOperation(ActorOperations.PROCESS_BULK_UPLOAD.getValue());
}
tellToAnother(req);
}
}
}
}
package org.sunbird.actor.user.validator;
import java.util.HashSet;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.sunbird.model.MigrationUser;
import org.sunbird.model.SelfDeclaredUser;
import org.sunbird.model.ShadowUserUpload;
import org.sunbird.error.CsvError;
import org.sunbird.error.CsvRowErrorDetails;
import org.sunbird.error.ErrorEnum;
import org.sunbird.error.IErrorDispatcher;
import org.sunbird.error.factory.ErrorDispatcherFactory;
import org.sunbird.exception.ProjectCommonException;
import org.sunbird.exception.ResponseCode;
import org.sunbird.keys.JsonKey;
import org.sunbird.model.bulkupload.SelfDeclaredErrorTypeEnum;
import org.sunbird.model.bulkupload.SelfDeclaredStatusEnum;
import org.sunbird.util.ProjectUtil;
/**
* this class will validate the csv file for shadow db
*
* @author anmolgupta
*/
public class UserBulkMigrationRequestValidator {
private ShadowUserUpload shadowUserMigration;
private HashSet<String> userExternalIdsSet = new HashSet<>();
private CsvError csvRowsErrors = new CsvError();
private static final int MAX_ROW_SUPPORTED = 20000;
private static final String NAME_REG_MATCHER = "^[^.][^^;\\-()<>|!=’%_#$]+";
private UserBulkMigrationRequestValidator(ShadowUserUpload migration) {
this.shadowUserMigration = migration;
}
public static UserBulkMigrationRequestValidator getInstance(ShadowUserUpload migration) {
return new UserBulkMigrationRequestValidator(migration);
}
public void validate() {
checkCsvRows();
}
public void validateDeclaredUsers() {
checkDeclaredCsvRows();
}
private void checkCsvRows() {
validateRowsCount();
shadowUserMigration
.getValues()
.stream()
.forEach(
onCsvRow -> {
int index = shadowUserMigration.getValues().indexOf(onCsvRow);
validateMigrationUser(onCsvRow, index);
});
if (csvRowsErrors.getErrorsList().size() > 0) {
IErrorDispatcher errorDispatcher = ErrorDispatcherFactory.getErrorDispatcher(csvRowsErrors);
errorDispatcher.dispatchError();
}
}
private void checkDeclaredCsvRows() {
validateRowsCount();
shadowUserMigration
.getUser()
.stream()
.forEach(
onCsvRow -> {
int index = shadowUserMigration.getUser().indexOf(onCsvRow);
validateSelfDeclaredUser(onCsvRow, index);
});
if (CollectionUtils.isNotEmpty(csvRowsErrors.getErrorsList())) {
IErrorDispatcher errorDispatcher = ErrorDispatcherFactory.getErrorDispatcher(csvRowsErrors);
errorDispatcher.dispatchError();
}
}
private void validateRowsCount() {
int ROW_BEGINNING_INDEX = 1;
if ((shadowUserMigration.getValues() != null
&& shadowUserMigration.getValues().size() >= MAX_ROW_SUPPORTED)
|| (shadowUserMigration.getUser() != null
&& shadowUserMigration.getUser().size() >= MAX_ROW_SUPPORTED)) {
throw new ProjectCommonException(
ResponseCode.csvRowsExceeds.getErrorCode(),
ResponseCode.csvRowsExceeds.getErrorMessage().concat("supported:" + MAX_ROW_SUPPORTED),
ResponseCode.CLIENT_ERROR.getResponseCode());
} else if ((shadowUserMigration.getValues() != null
&& shadowUserMigration.getValues().size() < ROW_BEGINNING_INDEX)
|| (shadowUserMigration.getUser() != null
&& shadowUserMigration.getUser().size() < ROW_BEGINNING_INDEX)) {
throw new ProjectCommonException(
ResponseCode.noDataForConsumption.getErrorCode(),
ResponseCode.noDataForConsumption.getErrorMessage(),
ResponseCode.CLIENT_ERROR.getResponseCode());
}
}
private void validateMigrationUser(MigrationUser migrationUser, int index) {
checkEmailAndPhone(migrationUser.getEmail(), migrationUser.getPhone(), index);
checkUserExternalId(migrationUser.getUserExternalId(), index);
checkName(migrationUser.getName(), index);
checkInputStatus(migrationUser.getInputStatus(), index);
}
private void validateSelfDeclaredUser(SelfDeclaredUser migrationUser, int index) {
checkUserDeclaredExternalId(migrationUser.getUserExternalId(), index);
checkSelfDeclaredInputStatus(migrationUser.getInputStatus(), index);
if ((migrationUser.getInputStatus().equals(JsonKey.SELF_DECLARED_ERROR))) {
checkSelfDeclaredErrorTypeIfPresent(migrationUser.getErrorType(), index);
}
checkValue(migrationUser.getUserId(), index, JsonKey.DIKSHA_UUID);
checkValue(migrationUser.getChannel(), index, JsonKey.CHANNEL);
checkValue(migrationUser.getPersona(), index, JsonKey.PERSONA);
}
private void checkUserDeclaredExternalId(String userExternalId, int index) {
CsvRowErrorDetails errorDetails = new CsvRowErrorDetails();
errorDetails.setRowId(index);
errorDetails.setHeader(JsonKey.STATE_PROVIDED_EXT_ID);
if (StringUtils.isBlank(userExternalId)) {
errorDetails.setErrorEnum(ErrorEnum.missing);
}
if (!userExternalIdsSet.add(userExternalId)) {
errorDetails.setErrorEnum(ErrorEnum.duplicate);
}
if (errorDetails.getErrorEnum() != null) {
addErrorToList(errorDetails);
}
}
private void checkSelfDeclaredErrorTypeIfPresent(String errorType, int index) {
CsvRowErrorDetails errorDetails = new CsvRowErrorDetails();
errorDetails.setRowId(index);
if (StringUtils.isBlank(errorType)) {
errorDetails.setHeader("Error Type");
errorDetails.setErrorEnum(ErrorEnum.missing);
addErrorToList(errorDetails);
} else if (!Stream.of(SelfDeclaredErrorTypeEnum.values())
.map(Enum::name)
.collect(Collectors.toList())
.contains((errorType.replace("-", "_")).replace(" ", "_"))) {
errorDetails.setHeader("Error Type");
errorDetails.setErrorEnum(ErrorEnum.invalid);
addErrorToList(errorDetails);
}
}
private void addErrorToList(CsvRowErrorDetails errorDetails) {
csvRowsErrors.setError(errorDetails);
}
public void checkEmailAndPhone(String email, String phone, int index) {
CsvRowErrorDetails errorDetails = new CsvRowErrorDetails();
errorDetails.setRowId(index);
boolean isEmailBlank = StringUtils.isBlank(email);
boolean isPhoneBlank = StringUtils.isBlank(phone);
if (isEmailBlank && isPhoneBlank) {
errorDetails.setErrorEnum(ErrorEnum.missing);
errorDetails.setHeader(JsonKey.EMAIL);
}
if (!isEmailBlank) {
checkEmail(email, index);
}
if (!isPhoneBlank) {
checkPhone(phone, index);
}
if (errorDetails.getErrorEnum() != null) {
addErrorToList(errorDetails);
}
}
private void checkEmail(String email, int index) {
CsvRowErrorDetails errorDetails = new CsvRowErrorDetails();
errorDetails.setRowId(index);
errorDetails.setHeader(JsonKey.EMAIL);
boolean isEmailValid = ProjectUtil.isEmailvalid(email);
if (!isEmailValid) {
errorDetails.setErrorEnum(ErrorEnum.invalid);
}
if (errorDetails.getErrorEnum() != null) {
addErrorToList(errorDetails);
}
}
private void checkPhone(String phone, int index) {
CsvRowErrorDetails errorDetails = new CsvRowErrorDetails();
errorDetails.setRowId(index);
errorDetails.setHeader(JsonKey.PHONE);
boolean isPhoneValid = ProjectUtil.validatePhoneNumber(phone);
if (!isPhoneValid) {
errorDetails.setErrorEnum(ErrorEnum.invalid);
}
if (errorDetails.getErrorEnum() != null) {
addErrorToList(errorDetails);
}
}
private void checkUserExternalId(String userExternalId, int index) {
CsvRowErrorDetails errorDetails = new CsvRowErrorDetails();
errorDetails.setRowId(index);
errorDetails.setHeader(JsonKey.USER_EXTERNAL_ID);
if (StringUtils.isBlank(userExternalId)) {
errorDetails.setErrorEnum(ErrorEnum.missing);
}
if (!userExternalIdsSet.add(userExternalId)) {
errorDetails.setErrorEnum(ErrorEnum.duplicate);
}
if (errorDetails.getErrorEnum() != null) {
addErrorToList(errorDetails);
}
}
private void checkName(String name, int index) {
if (StringUtils.isNotBlank(name) && !(Pattern.matches(NAME_REG_MATCHER, name))) {
CsvRowErrorDetails errorDetails = new CsvRowErrorDetails();
errorDetails.setRowId(index);
errorDetails.setHeader(JsonKey.NAME);
errorDetails.setErrorEnum(ErrorEnum.invalid);
addErrorToList(errorDetails);
}
checkValue(name, index, JsonKey.NAME);
}
private void checkInputStatus(String inputStatus, int index) {
CsvRowErrorDetails errorDetails = new CsvRowErrorDetails();
errorDetails.setRowId(index);
if (StringUtils.isBlank(inputStatus)) {
errorDetails.setHeader(JsonKey.INPUT_STATUS);
errorDetails.setErrorEnum(ErrorEnum.missing);
addErrorToList(errorDetails);
} else if (!(inputStatus.equalsIgnoreCase(JsonKey.ACTIVE)
|| inputStatus.equalsIgnoreCase(JsonKey.INACTIVE))) {
errorDetails.setHeader(JsonKey.INPUT_STATUS);
errorDetails.setErrorEnum(ErrorEnum.invalid);
addErrorToList(errorDetails);
}
}
private void checkSelfDeclaredInputStatus(String inputStatus, int index) {
CsvRowErrorDetails errorDetails = new CsvRowErrorDetails();
errorDetails.setRowId(index);
if (StringUtils.isBlank(inputStatus)) {
errorDetails.setHeader(JsonKey.STATUS);
errorDetails.setErrorEnum(ErrorEnum.missing);
addErrorToList(errorDetails);
} else if (!Stream.of(SelfDeclaredStatusEnum.values())
.map(Enum::name)
.collect(Collectors.toList())
.contains(inputStatus)) {
errorDetails.setHeader(JsonKey.STATUS);
errorDetails.setErrorEnum(ErrorEnum.invalid);
addErrorToList(errorDetails);
}
}
private void checkValue(String column, int rowIndex, String header) {
if (StringUtils.isBlank(column)) {
CsvRowErrorDetails errorDetails = new CsvRowErrorDetails();
errorDetails.setRowId(rowIndex);
errorDetails.setHeader(header);
errorDetails.setErrorEnum(ErrorEnum.missing);
addErrorToList(errorDetails);
}
}
}
package org.sunbird.error;
import java.util.ArrayList;
import java.util.List;
public class CsvError {
private List<CsvRowErrorDetails> errorsList = new ArrayList<>();
public CsvError() {}
public List<CsvRowErrorDetails> getErrorsList() {
return errorsList;
}
public void setErrorsList(List<CsvRowErrorDetails> errorsList) {
this.errorsList = errorsList;
}
public void setError(CsvRowErrorDetails errorDetails) {
errorsList.add(errorDetails);
}
}
package org.sunbird.error;
import org.sunbird.exception.ProjectCommonException;
import org.sunbird.exception.ResponseCode;
/**
* this class will dispatch the errors in the csv format
*
* @author anmolgupta
*/
public class CsvErrorDispatcher implements IErrorDispatcher {
private CsvError error;
private CsvErrorDispatcher(CsvError error) {
this.error = error;
}
public static CsvErrorDispatcher getInstance(CsvError error) {
return new CsvErrorDispatcher(error);
}
@Override
public void dispatchError() {
throw new ProjectCommonException(
ResponseCode.invalidRequestData.getErrorCode(),
error.getErrorsList().toString(),
ResponseCode.CLIENT_ERROR.getResponseCode());
}
}
package org.sunbird.error;
public class CsvRowErrorDetails {
int rowId;
private String header;
private ErrorEnum errorEnum;
public CsvRowErrorDetails(int rowId, String header, ErrorEnum errorEnum) {
this.rowId = rowId;
this.header = header;
this.errorEnum = errorEnum;
}
public CsvRowErrorDetails() {}
public int getRowId() {
return rowId;
}
public void setRowId(int rowId) {
this.rowId = rowId;
}
public String getHeader() {
return header;
}
public void setHeader(String header) {
this.header = header;
}
public ErrorEnum getErrorEnum() {
return errorEnum;
}
public void setErrorEnum(ErrorEnum errorEnum) {
this.errorEnum = errorEnum;
}
@Override
public String toString() {
return "ErrorDetails{"
+ "rowId="
+ rowId
+ ", header='"
+ header
+ '\''
+ ", errorEnum="
+ errorEnum
+ '}';
}
}
package org.sunbird.error;
public enum ErrorEnum {
invalid("invalid"),
duplicate("duplicate"),
missing("missing");
private String value;
ErrorEnum(String value) {
this.value = value;
}
public String getValue() {
return value;
}
}
package org.sunbird.error;
/**
* this is an interface class for the error dispatcher
*
* @author anmolgupta
*/
public interface IErrorDispatcher {
/** this method will prepare the error and will throw ProjectCommonException. */
void dispatchError();
}
package org.sunbird.error;
import com.mchange.v1.util.ArrayUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.sunbird.exception.ProjectCommonException;
import org.sunbird.exception.ResponseCode;
/**
* this class will dispatch error in list format
*
* @author anmolgupta
*/
public class ListErrorDispatcher implements IErrorDispatcher {
private CsvError error;
private ListErrorDispatcher(CsvError error) {
this.error = error;
}
public static ListErrorDispatcher getInstance(CsvError error) {
return new ListErrorDispatcher(error);
}
@Override
public void dispatchError() {
Collections.sort(error.getErrorsList(), new RowComparator());
List<String> errors = new ArrayList<>();
error
.getErrorsList()
.stream()
.forEach(
errorDetails -> {
errors.add(
String.format(
"In Row %s:the Column %s:is %s",
errorDetails.getRowId() + 1,
errorDetails.getHeader(),
errorDetails.getErrorEnum().getValue()));
});
throw new ProjectCommonException(
ResponseCode.invalidRequestData.getErrorCode(),
ArrayUtils.stringifyContents(errors.toArray()),
ResponseCode.CLIENT_ERROR.getResponseCode());
}
}
package org.sunbird.error;
import java.util.Comparator;
public class RowComparator implements Comparator<CsvRowErrorDetails> {
@Override
public int compare(CsvRowErrorDetails o1, CsvRowErrorDetails o2) {
return o1.rowId - o2.rowId;
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment