Commit 79f0a888 authored by Mahesh Kumar Gangula

Issue #000 fix: corrected the route path typo (hierarch → hierarchy) in the bookmark hierarchy route.

parent ee34dfb4
Showing with 666 additions and 1 deletion
@@ -30,7 +30,7 @@
 PATCH   /content/v3/hierarchy/add                      controllers.v3.ContentController.add
 DELETE  /content/v3/hierarchy/remove                   controllers.v3.ContentController.removeHierarchy
 PATCH   /content/v3/hierarchy/update                   controllers.v3.ContentController.updateHierarchy
 GET     /content/v3/hierarchy/:identifier              controllers.v3.ContentController.getHierarchy(identifier:String, mode:Option[String])
-GET     /content/v3/hierarch/:identifier/:bookmarkId   controllers.v3.ContentController.getBookmarkHierarchy(identifier: String, bookmarkId: String, mode: Option[String])
+GET     /content/v3/hierarchy/:identifier/:bookmarkId  controllers.v3.ContentController.getBookmarkHierarchy(identifier: String, bookmarkId: String, mode: Option[String])
 POST    /content/v3/upload/:identifier                 controllers.v3.ContentController.upload(identifier:String)
 POST    /content/v3/copy/:identifier                   controllers.v3.ContentController.copy(identifier:String, mode:Option[String], type:String ?= "deep")
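The fix is a one-character route correction: the bookmark hierarchy endpoint was registered under /content/v3/hierarch/... instead of /content/v3/hierarchy/..., so requests to the documented path matched no route and fell through to a 404. A minimal smoke test for the corrected route follows; the host, port, and the two identifiers are hypothetical placeholders, not values from this commit.

import scala.io.Source

object BookmarkHierarchySmokeTest {
  def main(args: Array[String]): Unit = {
    // Hypothetical host/port and identifiers; adjust to your deployment.
    // Before this commit the server only knew /content/v3/hierarch/..., so
    // this request would not have matched any route.
    val url = "http://localhost:9000/content/v3/hierarchy/do_1234/do_1234_unit1?mode=edit"
    println(Source.fromURL(url).mkString)
  }
}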
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.sunbird</groupId>
  <artifactId>flink-stream-sample</artifactId>
  <name>Flink Quickstart Job</name>
  <version>1.0-SNAPSHOT</version>
  <build>
    <plugins>
      <plugin>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.1</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <artifactSet>
                <excludes>
                  <exclude>org.apache.flink:force-shading</exclude>
                  <exclude>com.google.code.findbugs:jsr305</exclude>
                  <exclude>org.slf4j:*</exclude>
                  <exclude>log4j:*</exclude>
                </excludes>
              </artifactSet>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <transformers>
                <transformer>
                  <mainClass>org.sunbird.async.job.StreamingJob</mainClass>
                </transformer>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <args>
            <arg>-nobootcp</arg>
          </args>
        </configuration>
      </plugin>
      <plugin>
        <artifactId>maven-eclipse-plugin</artifactId>
        <version>2.8</version>
        <configuration>
          <downloadSources>true</downloadSources>
          <projectnatures>
            <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
            <projectnature>org.eclipse.jdt.core.javanature</projectnature>
          </projectnatures>
          <buildcommands>
            <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
          </buildcommands>
          <classpathContainers>
            <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
          </classpathContainers>
          <excludes>
            <exclude>org.scala-lang:scala-library</exclude>
            <exclude>org.scala-lang:scala-compiler</exclude>
          </excludes>
          <sourceIncludes>
            <sourceInclude>**/*.scala</sourceInclude>
            <sourceInclude>**/*.java</sourceInclude>
          </sourceIncludes>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>build-helper-maven-plugin</artifactId>
        <version>1.7</version>
        <executions>
          <execution>
            <id>add-source</id>
            <phase>generate-sources</phase>
            <goals>
              <goal>add-source</goal>
            </goals>
            <configuration>
              <sources>
                <source>src/main/scala</source>
              </sources>
            </configuration>
          </execution>
          <execution>
            <id>add-test-source</id>
            <phase>generate-test-sources</phase>
            <goals>
              <goal>add-test-source</goal>
            </goals>
            <configuration>
              <sources>
                <source>src/test/scala</source>
              </sources>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_2.12</artifactId>
      <version>1.10.0</version>
      <scope>provided</scope>
      <exclusions>
        <exclusion>
          <artifactId>flink-core</artifactId>
          <groupId>org.apache.flink</groupId>
        </exclusion>
        <exclusion>
          <artifactId>flink-java</artifactId>
          <groupId>org.apache.flink</groupId>
        </exclusion>
        <exclusion>
          <artifactId>flink-shaded-asm-7</artifactId>
          <groupId>org.apache.flink</groupId>
        </exclusion>
        <exclusion>
          <artifactId>scala-reflect</artifactId>
          <groupId>org.scala-lang</groupId>
        </exclusion>
        <exclusion>
          <artifactId>scala-library</artifactId>
          <groupId>org.scala-lang</groupId>
        </exclusion>
        <exclusion>
          <artifactId>scala-compiler</artifactId>
          <groupId>org.scala-lang</groupId>
        </exclusion>
        <exclusion>
          <artifactId>jsr305</artifactId>
          <groupId>com.google.code.findbugs</groupId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_2.12</artifactId>
      <version>1.10.0</version>
      <scope>provided</scope>
      <exclusions>
        <exclusion>
          <artifactId>flink-streaming-java_2.12</artifactId>
          <groupId>org.apache.flink</groupId>
        </exclusion>
        <exclusion>
          <artifactId>scala-reflect</artifactId>
          <groupId>org.scala-lang</groupId>
        </exclusion>
        <exclusion>
          <artifactId>scala-library</artifactId>
          <groupId>org.scala-lang</groupId>
        </exclusion>
        <exclusion>
          <artifactId>scala-compiler</artifactId>
          <groupId>org.scala-lang</groupId>
        </exclusion>
        <exclusion>
          <artifactId>jsr305</artifactId>
          <groupId>com.google.code.findbugs</groupId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.7</version>
      <scope>runtime</scope>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
      <scope>runtime</scope>
    </dependency>
  </dependencies>
  <properties>
    <scala.binary.version>2.12</scala.binary.version>
    <flink.version>1.10.0</flink.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
</project>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.sunbird</groupId>
  <artifactId>flink-stream-sample</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>Flink Quickstart Job</name>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.10.0</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
  </properties>
  <dependencies>
    <!-- Apache Flink dependencies -->
    <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>com.typesafe</groupId>
      <artifactId>config</artifactId>
      <version>1.4.0</version>
    </dependency>
    <dependency>
      <groupId>com.google.code.gson</groupId>
      <artifactId>gson</artifactId>
      <version>2.4</version>
    </dependency>
    <!-- Scala Library, provided by Flink as well. -->
    <!-- Add connector dependencies here. They must be in the default scope (compile). -->
    <!-- Example:
    -->
    <!-- Add logging framework, to produce console output when running in the IDE. -->
    <!-- These dependencies are excluded from the application JAR by default. -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.7</version>
      <scope>runtime</scope>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
      <scope>runtime</scope>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
      <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.1</version>
        <executions>
          <!-- Run shade goal on package phase -->
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <artifactSet>
                <excludes>
                  <exclude>org.apache.flink:force-shading</exclude>
                  <exclude>com.google.code.findbugs:jsr305</exclude>
                  <exclude>org.slf4j:*</exclude>
                  <exclude>log4j:*</exclude>
                </excludes>
              </artifactSet>
              <filters>
                <filter>
                  <!-- Do not copy the signatures in the META-INF folder.
                       Otherwise, this might cause SecurityExceptions when using the JAR. -->
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>org.sunbird.async.job.StreamingJob</mainClass>
                </transformer>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <!-- Java Compiler -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
      <!-- Scala Compiler -->
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <args>
            <arg>-nobootcp</arg>
          </args>
        </configuration>
      </plugin>
      <!-- Eclipse Scala Integration -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-eclipse-plugin</artifactId>
        <version>2.8</version>
        <configuration>
          <downloadSources>true</downloadSources>
          <projectnatures>
            <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
            <projectnature>org.eclipse.jdt.core.javanature</projectnature>
          </projectnatures>
          <buildcommands>
            <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
          </buildcommands>
          <classpathContainers>
            <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
          </classpathContainers>
          <excludes>
            <exclude>org.scala-lang:scala-library</exclude>
            <exclude>org.scala-lang:scala-compiler</exclude>
          </excludes>
          <sourceIncludes>
            <sourceInclude>**/*.scala</sourceInclude>
            <sourceInclude>**/*.java</sourceInclude>
          </sourceIncludes>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>build-helper-maven-plugin</artifactId>
        <version>1.7</version>
        <executions>
          <!-- Add src/main/scala to eclipse build path -->
          <execution>
            <id>add-source</id>
            <phase>generate-sources</phase>
            <goals>
              <goal>add-source</goal>
            </goals>
            <configuration>
              <sources>
                <source>src/main/scala</source>
              </sources>
            </configuration>
          </execution>
          <!-- Add src/test/scala to eclipse build path -->
          <execution>
            <id>add-test-source</id>
            <phase>generate-test-sources</phase>
            <goals>
              <goal>add-test-source</goal>
            </goals>
            <configuration>
              <sources>
                <source>src/test/scala</source>
              </sources>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
kafka {
  input.topic = "k8s.telemetry.valid"
  output.success.topic = "k8s.telemetry.unique.flink"
  output.duplicate.topic = "k8s.telemetry.duplicate.flink"
  output.malformed.topic = "k8s.telemetry.malformed"
  broker-servers = "192.168.1.5:9092"
  zookeeper = "192.168.1.5:2181"
  # dev-environment
  # broker-servers = "11.2.1.15:9092"
  # zookeeper = "11.2.1.15:2181"
  groupId = "telemetry-deduplication-group"
}

task {
  parallelism = 3
  checkpointing.interval = 60000
}
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
log4j.rootLogger=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
package org.sunbird.async.core

import java.util.Properties

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.kafka.clients.producer.ProducerConfig

trait BaseJobConfig extends Serializable {

  val config: Config = ConfigFactory.load()
  val kafkaBrokerServers: String = config.getString("kafka.broker-servers")
  val zookeeper: String = config.getString("kafka.zookeeper")
  val groupId: String = config.getString("kafka.groupId")
  val checkpointingInterval: Int = config.getInt("task.checkpointing.interval")
  val parallelism: Int = config.getInt("task.parallelism")

  def kafkaConsumerProperties: Properties = {
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", kafkaBrokerServers)
    properties.setProperty("group.id", groupId)
    // properties.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, new Integer(524288))
    properties
  }

  def kafkaProducerProperties: Properties = {
    val properties = new Properties()
    properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerServers)
    properties.put(ProducerConfig.LINGER_MS_CONFIG, new Integer(10))
    // properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, new Integer(67108864))
    properties.put(ProducerConfig.BATCH_SIZE_CONFIG, new Integer(16384 * 4))
    properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy")
    properties
  }
}
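Because BaseJobConfig reads everything through ConfigFactory.load(), any key in application.conf can be overridden per environment without editing the file: Typesafe Config gives JVM system properties precedence over application.conf. A small sketch of that behaviour (the broker address reused here is just the commented-out dev-environment value from the config above):

import com.typesafe.config.ConfigFactory

object ConfigOverrideCheck {
  def main(args: Array[String]): Unit = {
    // Equivalent to passing -Dkafka.broker-servers=11.2.1.15:9092 on the JVM command line.
    System.setProperty("kafka.broker-servers", "11.2.1.15:9092")
    ConfigFactory.invalidateCaches() // discard any config already cached by load()
    val config = ConfigFactory.load()
    println(config.getString("kafka.broker-servers")) // prints 11.2.1.15:9092
  }
}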
package org.sunbird.async.function

import java.util

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala.OutputTag
import org.apache.flink.util.Collector
import org.slf4j.LoggerFactory
import org.sunbird.async.job.TestJobConfig

class PrintFunction(config: TestJobConfig, taskName: String)(implicit val eventTypeInfo: TypeInformation[util.Map[String, AnyRef]])
  extends ProcessFunction[util.Map[String, AnyRef], util.Map[String, AnyRef]] {

  private[this] val logger = LoggerFactory.getLogger(this.getClass.getCanonicalName + "." + taskName)
  // lazy val outputTag: OutputTag[util.Map[String, AnyRef]] = new OutputTag[util.Map[String, AnyRef]](id = "kafka-output")

  override def processElement(i: util.Map[String, AnyRef],
                              context: ProcessFunction[util.Map[String, AnyRef], util.Map[String, AnyRef]]#Context,
                              collector: Collector[util.Map[String, AnyRef]]): Unit = {
    logger.info("Event reached PrintFunction: " + taskName)
    Thread.sleep(1000)
    logger.info("Triggered print function for event: " + i.get("mid") + " :via: " + taskName + " :at: " + System.currentTimeMillis)
    collector.collect(i)
  }
}
package org.sunbird.async.job

import java.util

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.sunbird.async.function.PrintFunction
import org.sunbird.async.task.BaseStreamTask

/**
 * Skeleton for a Flink Streaming Job.
 *
 * For a tutorial on how to write a Flink streaming application, check the
 * tutorials and examples on the <a href="https://flink.apache.org/docs/stable/">Flink Website</a>.
 *
 * To package your application into a JAR file for execution, run
 * 'mvn clean package' on the command line.
 *
 * If you change the name of the main class (with the public static void main(String[] args)
 * method), change the respective entry in the POM.xml file (simply search for 'mainClass').
 */
class StreamingJob(config: TestJobConfig) extends BaseStreamTask(config) {

  def process(): Unit = {
    implicit val eventTypeInfo: TypeInformation[util.Map[String, AnyRef]] = TypeExtractor.getForClass(classOf[util.Map[String, AnyRef]])
    implicit val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(config.checkpointingInterval)
    try {
      val kafkaConsumer = createKafkaStreamConsumer(config.kafkaInputTopic)
      val dataStream = env.addSource(kafkaConsumer).process(new PrintFunction(config, "input-task")).setParallelism(1).name("kafka-input")
      val firstTaskStream = dataStream.process(new PrintFunction(config, "first-task")).setParallelism(2).name("first-task")
      val secondTaskStream = firstTaskStream.process(new PrintFunction(config, "second-task")).setParallelism(1).name("second-task")
      secondTaskStream.print()
      env.execute("TestStreamingJob")
    } catch {
      case ex: Exception =>
        ex.printStackTrace()
    }
  }
}

object StreamingJob {
  val config = new TestJobConfig
  def apply(): StreamingJob = new StreamingJob(config)

  def main(args: Array[String]): Unit = {
    StreamingJob.apply().process()
  }
}
package org.sunbird.async.job

import org.sunbird.async.core.BaseJobConfig

class TestJobConfig extends BaseJobConfig {
  val kafkaInputTopic: String = config.getString("kafka.input.topic")
  val kafkaSuccessTopic: String = config.getString("kafka.output.success.topic")
}
package org.sunbird.async.task

import java.nio.charset.StandardCharsets
import java.util

import com.google.gson.Gson
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer, KafkaDeserializationSchema, KafkaSerializationSchema}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer.ProducerRecord
import org.sunbird.async.core.BaseJobConfig

abstract class BaseStreamTask(config: BaseJobConfig) extends Serializable {

  def createKafkaStreamConsumer(kafkaTopic: String): FlinkKafkaConsumer[util.Map[String, AnyRef]] = {
    new FlinkKafkaConsumer[util.Map[String, AnyRef]](kafkaTopic, new ConsumerStringDeserializationSchema, config.kafkaConsumerProperties)
  }

  def createKafkaStreamProducer(kafkaTopic: String): FlinkKafkaProducer[String] = {
    new FlinkKafkaProducer(kafkaTopic,
      new ProducerStringSerializationSchema(kafkaTopic), config.kafkaProducerProperties, Semantic.AT_LEAST_ONCE)
  }
}

class ConsumerStringDeserializationSchema extends KafkaDeserializationSchema[util.Map[String, AnyRef]] {

  override def isEndOfStream(nextElement: util.Map[String, AnyRef]): Boolean = false

  override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): util.Map[String, AnyRef] = {
    val parsedString = new String(record.value(), StandardCharsets.UTF_8)
    val gson: Gson = new Gson()
    gson.fromJson(parsedString, new util.HashMap[String, AnyRef]().getClass)
  }

  override def getProducedType: TypeInformation[util.Map[String, AnyRef]] = TypeExtractor.getForClass(classOf[util.Map[String, AnyRef]])
}

class ProducerStringSerializationSchema(topic: String) extends KafkaSerializationSchema[String] {

  override def serialize(element: String, timestamp: java.lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
    // implicit val formats: DefaultFormats = DefaultFormats
    new ProducerRecord[Array[Byte], Array[Byte]](topic, element.getBytes(StandardCharsets.UTF_8))
  }
}
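StreamingJob above exercises only the consumer side; createKafkaStreamProducer is defined but never attached to a stream. A minimal sketch of how the producer could be wired as a sink (not part of this commit; the job name and the per-event Gson serialization are assumptions):

package org.sunbird.async.job

import java.util

import com.google.gson.Gson
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.sunbird.async.task.BaseStreamTask

// Hypothetical job: relays events from the input topic to the success topic,
// serializing each util.Map event back to JSON for the String-typed producer.
class RelayJob(config: TestJobConfig) extends BaseStreamTask(config) {

  def process(): Unit = {
    implicit val mapTypeInfo: TypeInformation[util.Map[String, AnyRef]] =
      TypeExtractor.getForClass(classOf[util.Map[String, AnyRef]])
    implicit val stringTypeInfo: TypeInformation[String] =
      TypeExtractor.getForClass(classOf[String])
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(config.checkpointingInterval)
    env.addSource(createKafkaStreamConsumer(config.kafkaInputTopic))
      .map(event => new Gson().toJson(event)) // Gson created per record to keep the lambda serializable
      .addSink(createKafkaStreamProducer(config.kafkaSuccessTopic))
      .name("kafka-output")
    env.execute("RelayJob")
  }
}

object RelayJob {
  def main(args: Array[String]): Unit = new RelayJob(new TestJobConfig).process()
}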