Unverified commit 95162648 authored by Reshmi V Nair, committed by GitHub

Merge branch 'release-5.0.1' into LR-269

Showing with 30 additions and 300 deletions
@@ -225,17 +225,12 @@ localhost:8081
### Setting up Cloud storage connection:
Set up the cloud-storage-specific variables as environment variables.
```shell
export cloud_storage_type= #values can be 'aws' or 'azure'
export cloud_storage_type= #values can be any cloud service provider
For AWS Cloud Storage connectivity:
export aws_storage_key=
export aws_storage_secret=
export aws_storage_container=
For Azure Cloud Storage connectivity:
export azure_storage_key=
export azure_storage_secret=
export azure_storage_container=
For Cloud Storage connectivity:
export cloud_storage_key=
export cloud_storage_secret=
export cloud_storage_container=
export content_youtube_apikey= #key to fetch metadata of youtube videos
......
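For illustration only (not part of this change): a minimal sketch of how a job could pick up the provider-agnostic exports above through Typesafe Config, assuming the variables are present in the process environment. The `CloudStorageSettings` object and its fallback defaults are assumptions made for the example.

```scala
import com.typesafe.config.{Config, ConfigFactory}

object CloudStorageSettings {
  // Assumption for the sketch: the exported variables are visible to the JVM via System.getenv.
  private val env: Config = ConfigFactory.systemEnvironment()

  private def envOrElse(key: String, default: String): String =
    if (env.hasPath(key)) env.getString(key) else default

  val cloudStorageType: String = envOrElse("cloud_storage_type", "azure")
  val storageKey: String = envOrElse("cloud_storage_key", "")
  val storageSecret: String = envOrElse("cloud_storage_secret", "")
  val container: String = envOrElse("cloud_storage_container", "")
}
```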
@@ -85,8 +85,8 @@ search_es7_host: "{{ groups['es7']|join(':9200,')}}:9200"
mlworkbench: "{{ groups['mlworkbench'][0]}}"
azure_account: "{{ sunbird_public_storage_account_name }}"
azure_secret: "{{ sunbird_public_storage_account_key }}"
cloud_account: "{{ sunbird_public_storage_account_name }}"
cloud_secret: "{{ sunbird_public_storage_account_key }}"
dedup_redis_host: "{{ dp_redis_host }}"
kp_redis_host: "{{ groups['redisall'][0] }}"
neo4j_route_path: "bolt://{{ groups['learning-neo4j-node1'][0] }}:7687"
@@ -109,9 +109,9 @@ cert_rc_entity: "TrainingCertificate"
certificate_base_path: "https://{{domain_name}}/certs"
cert_domain_url: "{{proto}}://{{domain_name}}"
cert_container_name: "{{cert_env_prefix}}-e-credentials"
cert_cloud_storage_type: "azure"
cert_azure_storage_secret: "{{sunbird_private_storage_account_key}}"
cert_azure_storage_key: "{{sunbird_private_storage_account_name}}"
cert_cloud_storage_type: "{{cloud_service_provider}}"
cert_cloud_storage_secret: "{{sunbird_private_storage_account_key}}"
cert_cloud_storage_key: "{{sunbird_private_storage_account_name}}"
cloud_store_base_path: "{{cloud_store_base_path}}"
default_channel: "org.sunbird"
@@ -123,7 +123,7 @@ enable_rc_certificate: true
### Kafka and Zookeeper IP vars, used in the kafka topiccreation role.
processing_zookeeper_ip: "{{ groups['processing-cluster-zookeepers'][0] }}"
## MultiDC flag for LERN data pipeline jobs
cassandra_isMultiDCEnabled: false
cassandra_isMultiDCEnabled: "{{is_multidc_enabled}}"
# LERN BB specific kafka topics, have to be overridden in private devops repo to include BB Name for integrators.
kafka_topic_course_batch_job_request: "{{env_name}}.coursebatch.job.request"
......
@@ -81,7 +81,7 @@
    <dependency>
      <groupId>org.sunbird</groupId>
      <artifactId>cloud-store-sdk_2.12</artifactId>
      <version>1.4.0</version>
      <version>1.4.3</version>
      <exclusions>
        <exclusion>
          <artifactId>log4j</artifactId>
......
package org.sunbird.job.util

import org.apache.commons.lang3.StringUtils
import org.sunbird.cloud.storage.BaseStorageService
import org.sunbird.cloud.storage.factory.{StorageConfig, StorageServiceFactory}
import org.sunbird.job.BaseJobConfig

import java.io.File

class CloudStorageUtil(config: BaseJobConfig) extends Serializable {

  val cloudStorageType: String = config.getString("cloud_storage_type", "azure")
  val supportedCloudStorageType: List[String] = List("azure", "aws", "gcloud")
  var storageService: BaseStorageService = null
  val container: String = getContainerName

  @throws[Exception]
  def getService: BaseStorageService = {
    if (null == storageService) {
      if (supportedCloudStorageType.contains(cloudStorageType)) {
        val storageKey = config.getString(s"${cloudStorageType}_storage_key", "")
        val storageSecret = config.getString(s"${cloudStorageType}_storage_secret", "")
        storageService = StorageServiceFactory.getStorageService(StorageConfig(cloudStorageType, storageKey, storageSecret))
      } else {
        throw new Exception("Error while initialising cloud storage: " + cloudStorageType)
      }
    }
    storageService
  }

  def getContainerName: String = {
    cloudStorageType match {
      case "azure" => config.getString("azure_storage_container", "")
      case "aws" => config.getString("aws_storage_container", "")
      case _ => throw new Exception("Container name not configured.")
    }
  }

  def uploadFile(folderName: String, file: File, slug: Option[Boolean] = Option(true), container: String = container): Array[String] = {
    val slugFile = if (slug.getOrElse(true)) Slug.createSlugFile(file) else file
    val objectKey = folderName + "/" + slugFile.getName
    val url = getService.upload(container, slugFile.getAbsolutePath, objectKey, Option.apply(false), Option.apply(1), Option.apply(5), Option.empty)
    Array[String](objectKey, url)
  }

  def copyObjectsByPrefix(sourcePrefix: String, destinationPrefix: String, isFolder: Boolean): Unit = {
    getService.copyObjects(container, sourcePrefix, container, destinationPrefix, Option.apply(isFolder))
  }

  def getURI(prefix: String, isDirectory: Option[Boolean]): String = {
    getService.getUri(getContainerName, prefix, isDirectory)
  }

  def uploadDirectory(folderName: String, directory: File, slug: Option[Boolean] = Option(true)): Array[String] = {
    val slugFile = if (slug.getOrElse(true)) Slug.createSlugFile(directory) else directory
    val objectKey = folderName + File.separator
    val url = getService.upload(getContainerName, slugFile.getAbsolutePath, objectKey, Option.apply(true), Option.apply(1), Option.apply(5), Option.empty)
    Array[String](objectKey, url)
  }

  def deleteFile(key: String, isDirectory: Option[Boolean] = Option(false)): Unit = {
    getService.deleteObject(getContainerName, key, isDirectory)
  }

  def downloadFile(downloadPath: String, file: String, slug: Option[Boolean] = Option(false)): Unit = {
    getService.download(getContainerName, file, downloadPath, slug)
  }
}
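For illustration (not part of this change): a short usage sketch of `CloudStorageUtil` above. The `jobConfig` parameter, the folder prefix and the file path are placeholders; the job's `BaseJobConfig` is assumed to already carry `cloud_storage_type` plus the matching `<type>_storage_key`, `<type>_storage_secret` and `<type>_storage_container` keys.

```scala
import org.sunbird.job.BaseJobConfig
import org.sunbird.job.util.CloudStorageUtil
import java.io.File

object CloudStorageUtilExample {
  // jobConfig: the job's already-initialised BaseJobConfig (construction omitted here;
  // it also needs the job's Kafka/checkpointing keys, which are out of scope for this sketch).
  def uploadArtifact(jobConfig: BaseJobConfig): Unit = {
    val cloudStorageUtil = new CloudStorageUtil(jobConfig)

    // uploadFile returns Array(objectKey, url); "content/do_123" and the file path are placeholders.
    val result = cloudStorageUtil.uploadFile("content/do_123", new File("/tmp/sample.pdf"))
    println(s"uploaded ${result(0)} -> ${result(1)}")
  }
}
```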
package org.sunbird.job.util

import org.apache.commons.io.IOUtils
import org.apache.commons.lang3.StringUtils
import org.slf4j.LoggerFactory

import java.io.{File, FileInputStream, FileOutputStream, IOException}
import java.net.{HttpURLConnection, URL}
import java.nio.file.{Files, Path, Paths}
import java.util.zip.{ZipEntry, ZipFile, ZipOutputStream}
import scala.collection.JavaConverters._

object FileUtils {

  private[this] val logger = LoggerFactory.getLogger(classOf[FileUtils])

  def copyURLToFile(objectId: String, fileUrl: String, suffix: String): Option[File] = try {
    val fileName = getBasePath(objectId) + "/" + suffix
    val file = new File(fileName)
    org.apache.commons.io.FileUtils.copyURLToFile(new URL(fileUrl), file)
    Some(file)
  } catch {
    case e: IOException => logger.error(s"Please Provide Valid File Url. Url: $fileUrl and objectId: $objectId!", e)
      None
  }

  def getBasePath(objectId: String): String = {
    if (!StringUtils.isBlank(objectId))
      s"/tmp/$objectId/${System.currentTimeMillis}_temp"
    else s"/tmp/${System.currentTimeMillis}_temp"
  }

  def uploadFile(fileOption: Option[File], identifier: String, objType: String)(implicit cloudStorageUtil: CloudStorageUtil): Option[String] = {
    fileOption match {
      case Some(file: File) =>
        logger.info("FileUtils :: uploadFile :: file path :: " + file.getAbsolutePath)
        val folder = objType.toLowerCase + File.separator + identifier
        val urlArray: Array[String] = cloudStorageUtil.uploadFile(folder, file, Some(false))
        logger.info(s"FileUtils ::: uploadFile ::: url for $identifier is : ${urlArray(1)}")
        Some(urlArray(1))
      case _ => None
    }
  }

  @throws[Exception]
  def downloadFile(fileUrl: String, basePath: String): File = {
    val url = new URL(fileUrl)
    val httpConn = url.openConnection().asInstanceOf[HttpURLConnection]
    val disposition = httpConn.getHeaderField("Content-Disposition")
    httpConn.getContentType
    httpConn.getContentLength
    val fileName = if (StringUtils.isNotBlank(disposition)) {
      val index = disposition.indexOf("filename=")
      if (index > 0)
        disposition.substring(index + 10, disposition.indexOf("\"", index + 10))
      else
        fileUrl.substring(fileUrl.lastIndexOf("/") + 1, fileUrl.length)
    } else fileUrl.substring(fileUrl.lastIndexOf("/") + 1, fileUrl.length)
    val saveFile = new File(basePath)
    if (!saveFile.exists) saveFile.mkdirs
    val saveFilePath = basePath + File.separator + fileName
    val inputStream = httpConn.getInputStream
    val outputStream = new FileOutputStream(saveFilePath)
    IOUtils.copy(inputStream, outputStream)
    val file = new File(saveFilePath)
    logger.info("FileUtils :: downloadFile :: " + System.currentTimeMillis() + " ::: Downloaded file: " + file.getAbsolutePath)
    file
  }

  def extractPackage(file: File, basePath: String): Unit = {
    val zipFile = new ZipFile(file)
    for (entry <- zipFile.entries().asScala) {
      val path = Paths.get(basePath + File.separator + entry.getName)
      if (entry.isDirectory) Files.createDirectories(path)
      else {
        Files.createDirectories(path.getParent)
        Files.copy(zipFile.getInputStream(entry), path)
      }
    }
  }

  def createFile(fileName: String): File = {
    val file = new File(fileName)
    org.apache.commons.io.FileUtils.touch(file)
    file
  }

  def writeStringToFile(file: File, data: String, encoding: String = "UTF-8"): Unit = {
    org.apache.commons.io.FileUtils.writeStringToFile(file, data, encoding)
  }

  def copyFile(file: File, newFile: File): Unit = {
    org.apache.commons.io.FileUtils.copyFile(file, newFile)
  }

  def readJsonFile(filePath: String, fileName: String): Map[String, AnyRef] = {
    val source = scala.io.Source.fromFile(filePath + File.separator + fileName)
    val lines = try source.mkString finally source.close()
    JSONUtil.deserialize[Map[String, AnyRef]](lines)
  }

  def deleteDirectory(file: File): Unit = {
    org.apache.commons.io.FileUtils.deleteDirectory(file)
  }

  def deleteQuietly(fileName: String): Unit = {
    org.apache.commons.io.FileUtils.deleteQuietly(org.apache.commons.io.FileUtils.getFile(fileName).getParentFile)
  }

  def createZipPackage(basePath: String, zipFileName: String): Unit =
    if (!StringUtils.isBlank(zipFileName)) {
      logger.info("Creating Zip File: " + zipFileName)
      val fileList: List[String] = generateFileList(basePath)
      zipIt(zipFileName, fileList, basePath)
    }

  private def generateFileList(sourceFolder: String): List[String] =
    Files.walk(Paths.get(new File(sourceFolder).getPath)).toArray()
      .map(path => path.asInstanceOf[Path])
      .filter(path => Files.isRegularFile(path))
      .map(path => generateZipEntry(path.toString, sourceFolder)).toList

  private def generateZipEntry(file: String, sourceFolder: String): String = file.substring(sourceFolder.length, file.length)

  def zipIt(zipFile: String, fileList: List[String], sourceFolder: String): Unit = {
    val buffer = new Array[Byte](1024)
    var zos: ZipOutputStream = null
    try {
      zos = new ZipOutputStream(new FileOutputStream(zipFile))
      logger.info("Creating Zip File: " + zipFile)
      fileList.foreach(file => {
        val ze = new ZipEntry(file)
        zos.putNextEntry(ze)
        val in = new FileInputStream(sourceFolder + File.separator + file)
        try {
          var len = in.read(buffer)
          while (len > 0) {
            zos.write(buffer, 0, len)
            len = in.read(buffer)
          }
        } finally if (in != null) in.close()
        zos.closeEntry()
      })
    } catch {
      case e: IOException =>
        logger.error("Error! Something Went Wrong While Creating the ZIP File: " + e.getMessage, e)
        throw new Exception("Something Went Wrong While Creating the ZIP File", e)
    } finally if (zos != null) zos.close()
  }
}

class FileUtils {}
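For illustration (not part of this change): a small sketch chaining the `FileUtils` helpers above to download, extract and repackage an artifact. The URL and paths are placeholders.

```scala
import org.sunbird.job.util.FileUtils
import java.io.File

object FileUtilsExample {
  def main(args: Array[String]): Unit = {
    // Work under a throwaway base path derived from an object identifier.
    val basePath: String = FileUtils.getBasePath("do_123")

    // Download a (placeholder) zip package and extract it next to the download.
    val pkg: File = FileUtils.downloadFile("https://example.com/artifact/sample.zip", basePath)
    FileUtils.extractPackage(pkg, basePath + File.separator + "extracted")

    // Re-zip the extracted contents, then clean up the working directory.
    FileUtils.createZipPackage(basePath + File.separator + "extracted", basePath + File.separator + "repacked.zip")
    FileUtils.deleteDirectory(new File(basePath))
  }
}
```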
package org.sunbird.spec

import org.scalatest.{FlatSpec, Matchers}
import org.sunbird.job.util.FileUtils

import java.io.File

class FileUtilsSpec extends FlatSpec with Matchers {

  "getBasePath with empty identifier" should "return the path" in {
    val result = FileUtils.getBasePath("")
    result.nonEmpty shouldBe (true)
  }

  "downloadFile " should " download the media source file starting with http or https " in {
    val fileUrl: String = "https://preprodall.blob.core.windows.net/ntp-content-preprod/content/do_21273718766395392014320/artifact/book-image_1554832478631.jpg"
    val downloadedFile: File = FileUtils.downloadFile(fileUrl, "/tmp/contentBundle")
    assert(downloadedFile.exists())
  }
}
@@ -110,7 +110,7 @@ spec:
command: ["/opt/flink/bin/standalone-job.sh"]
args: ["start-foreground",
"--job-classname={{ .Values.job_classname }}",
"-Dfs.azure.account.key.{{ .Values.azure_account }}.blob.core.windows.net={{ .Values.azure_secret }}",
"-Dfs.cloud.account.key.{{ .Values.cloud_account }}.blob.core.windows.net={{ .Values.cloud_secret }}",
"-Dweb.submit.enable=false",
"-Dmetrics.reporter.prom.class=org.apache.flink.metrics.prometheus.PrometheusReporter",
"-Dmetrics.reporter.prom.port={{ .Values.jobmanager.prom_port }}",
@@ -183,7 +183,7 @@ spec:
workingDir: {{ .Values.taskmanager.flink_work_dir }}
command: ["/opt/flink/bin/taskmanager.sh"]
args: ["start-foreground",
"-Dfs.azure.account.key.{{ .Values.azure_account }}.blob.core.windows.net={{ .Values.azure_secret }}",
"-Dfs.cloud.account.key.{{ .Values.cloud_account }}.blob.core.windows.net={{ .Values.cloud_secret }}",
"-Dweb.submit.enable=false",
"-Dmetrics.reporter.prom.class=org.apache.flink.metrics.prometheus.PrometheusReporter",
"-Dmetrics.reporter.prom.host={{ .Release.Name }}-taskmanager",
......
@@ -3,8 +3,8 @@ imagepullsecrets: {{ imagepullsecrets }}
dockerhub: {{ dockerhub }}
repository: {{flink_repository|default('data-pipeline')}}
image_tag: {{ image_tag }}
azure_account: {{ azure_account }}
azure_secret: {{ azure_secret }}
cloud_account: {{ cloud_account }}
cloud_secret: {{ cloud_secret }}
serviceMonitor:
enabled: {{ service_monitor_enabled | lower}}
@@ -88,7 +88,7 @@ base_config: |
    statebackend {
      blob {
        storage {
          account = "{{ azure_account }}.blob.core.windows.net"
          account = "{{ cloud_account }}.blob.core.windows.net"
          container = "{{ flink_container_name }}"
          checkpointing.dir = "checkpoint"
        }
@@ -350,8 +350,8 @@ collection-certificate-generator:
    cert_domain_url = "{{ cert_domain_url }}"
    cert_container_name = "{{ cert_container_name }}"
    cert_cloud_storage_type = "{{ cert_cloud_storage_type }}"
    cert_azure_storage_secret = "{{ cert_azure_storage_secret }}"
    cert_azure_storage_key = "{{ cert_azure_storage_key }}"
    cert_cloud_storage_secret = "{{ cert_cloud_storage_secret }}"
    cert_cloud_storage_key = "{{ cert_cloud_storage_key }}"
    cloud_store_base_path = "{{ cloud_store_base_path }}"
    service {
      certreg.basePath = "{{ cert_reg_service_base_url }}"
......
@@ -20,7 +20,7 @@
    <dependency>
      <groupId>org.sunbird</groupId>
      <artifactId>cloud-store-sdk_${scala.version}</artifactId>
      <version>1.4.0</version>
      <version>1.4.3</version>
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
......
@@ -20,17 +20,6 @@ object JsonKeys {
  val TAG: String = "tag"
  val ID: String = "id"
  val ENTITY: String = "entity"
  val CLOUD_STORAGE_TYPE: String = "CLOUD_STORAGE_TYPE"
  val CLOUD_UPLOAD_RETRY_COUNT: String = "CLOUD_UPLOAD_RETRY_COUNT"
  val AZURE_STORAGE_SECRET: String = "AZURE_STORAGE_SECRET"
  val AZURE_STORAGE_KEY: String = "AZURE_STORAGE_KEY"
  val AZURE: String = "azure"
  val AWS: String = "aws"
  val AWS_STORAGE_KEY: String = "AWS_STORAGE_KEY"
  val AWS_STORAGE_SECRET: String = "AWS_STORAGE_SECRET"
  val GCLOUD: String = "gcloud"
  val GCLOUD_STORAGE_KEY: String = "GCLOUD_STORAGE_KEY"
  val GCLOUD_STORAGE_SECRET: String = "GCLOUD_STORAGE_SECRET"
  val SLUG: String = "sunbird_cert_slug"
  val ACCESS_CODE_LENGTH: String = "ACCESS_CODE_LENGTH"
  val PATH: String = "path"
......
package org.sunbird.incredible.processor.store

import java.io.File
import org.apache.commons.lang3.StringUtils
import org.sunbird.cloud.storage.BaseStorageService
import org.sunbird.cloud.storage.factory.StorageConfig
import org.sunbird.cloud.storage.factory.StorageServiceFactory
import org.sunbird.cloud.storage.factory.{StorageConfig, StorageServiceFactory}
import org.sunbird.incredible.pojos.exceptions.ServerException
import org.sunbird.incredible.{JsonKeys, StorageParams, UrlManager}
import org.sunbird.incredible.{StorageParams, UrlManager}
import java.io.File

class StorageService(storageParams: StorageParams) extends Serializable {

  var storageService: BaseStorageService = _
  val storageType: String = storageParams.cloudStorageType
  val supportedCloudStorageType: List[String] = List(JsonKeys.AZURE, JsonKeys.AWS, JsonKeys.GCLOUD)

  @throws[Exception]
  def getService: BaseStorageService = {
    if (null == storageService) {
      if (supportedCloudStorageType.contains(storageType)) {
        storageService = StorageServiceFactory.getStorageService(StorageConfig(storageType, storageParams.storageKey, storageParams.storageSecret))
        storageService = StorageServiceFactory.getStorageService(StorageConfig(storageParams.cloudStorageType, storageParams.storageKey, storageParams.storageSecret))
      } else {
        throw new ServerException("ERR_INVALID_CLOUD_STORAGE", "Error while initialising cloud storage")
      }
    }
    storageService
  }

  def getContainerName: String = {
    if (supportedCloudStorageType.contains(storageType))
      storageParams.containerName
    else
      throw new ServerException("ERR_INVALID_CLOUD_STORAGE", "Container name not configured.")
  }

  def uploadFile(path: String, file: File): String = {
    val objectKey = path + file.getName
    val containerName = getContainerName
    val containerName = storageParams.containerName
    val url = getService.upload(containerName, file.getAbsolutePath, objectKey, Option.apply(false), Option.apply(1), Option.apply(5), Option.empty)
    UrlManager.getSharableUrl(url, containerName)
  }
......
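For illustration (not part of this change): a hedged usage sketch of `StorageService` above. `StorageParams` is assumed to expose cloudStorageType, storageKey, storageSecret and containerName, matching how it is read in this file; its construction and the file path are placeholders.

```scala
import org.sunbird.incredible.StorageParams
import org.sunbird.incredible.processor.store.StorageService
import java.io.File

object StorageServiceExample {
  // storageParams: assumed to carry the cloud provider type, credentials and container name
  // (construction omitted; see how StorageService reads it above).
  def uploadCertificate(storageParams: StorageParams): String = {
    val storageService = new StorageService(storageParams)
    // Uploads under the "certificates/" prefix and returns a shareable URL via UrlManager.
    storageService.uploadFile("certificates/", new File("/tmp/certificate.svg"))
  }
}
```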
@@ -70,8 +70,8 @@ class CertificateGeneratorConfig(override val config: Config) extends BaseJobCon
  // env vars
  val storageType: String = config.getString("cert_cloud_storage_type")
  val containerName: String = config.getString("cert_container_name")
  val storageSecret: String = config.getString(s"cert_${storageType}_storage_secret")
  val storageKey: String = config.getString(s"cert_${storageType}_storage_key")
  val storageSecret: String = config.getString("cert_cloud_storage_secret")
  val storageKey: String = config.getString("cert_cloud_storage_key")
  val domainUrl: String = config.getString("cert_domain_url")
  val encServiceUrl: String = config.getString("service.enc.basePath")
  val certRegistryBaseUrl: String = config.getString("service.certreg.basePath")
......
@@ -21,9 +21,9 @@ service {
cert_domain_url="https://dev.sunbirded.org"
cert_cloud_storage_type="azure"
cert_azure_storage_secret="secret"
cert_cloud_storage_secret="secret"
cert_container_name="credential"
cert_azure_storage_key="key"
cert_cloud_storage_key="key"
lms-cassandra {
keyspace = "sunbird_courses"
......