Skip to content

Commit 3cf17f4

Browse files
Merge pull request #40 from WeDataSphere/dev-0.2.0-version-compatible
Version compatible before launching task to Linkis server
2 parents ef659d5 + 651f80b commit 3cf17f4

File tree

5 files changed

+160
-65
lines changed

5 files changed

+160
-65
lines changed

streamis-jobmanager/streamis-job-launcher/streamis-job-launcher-linkis/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/launcher/linkis/conf/JobLauncherConfiguration.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ object JobLauncherConfiguration {
3535
*/
3636
val FLINK_CHECKPOINT_PATH: CommonVars[String] = CommonVars("wds.streamis.launch.flink.checkpoint.dir", "/flink/flink-checkpoints")
3737

38+
/**
39+
* Linkis release version
40+
*/
41+
val FLINK_LINKIS_RELEASE_VERSION: CommonVars[String] = CommonVars("wds.streamis.launch.flink.linkis.release.version", "")
3842
/**
3943
* Variable: savepoint path
4044
*/
@@ -44,4 +48,5 @@ object JobLauncherConfiguration {
4448
* Variable: flink app
4549
*/
4650
val VAR_FLINK_APP_NAME: CommonVars[String] = CommonVars("wds.streamis.launch.variable.flink.app.name", "flink.app.name")
51+
4752
}

streamis-jobmanager/streamis-job-launcher/streamis-job-launcher-linkis/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/launcher/linkis/job/manager/FlinkJobLaunchManager.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ trait FlinkJobLaunchManager extends LinkisJobLaunchManager with Logging {
4848
* @param jobState job state used to launch
4949
* @return the job id.
5050
*/
51-
override def launch(job: LaunchJob, jobState: JobState): JobClient[LinkisJobInfo] = {
51+
override def innerLaunch(job: LaunchJob, jobState: JobState): JobClient[LinkisJobInfo] = {
5252
// Transform the JobState into the params in LaunchJob
5353
Option(jobState).foreach(state => {
5454
val startUpParams = TaskUtils.getStartupMap(job.getParams)
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,97 @@
11
package com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.job.manager
22

33
import com.webank.wedatasphere.streamis.jobmanager.launcher.job.manager.JobLaunchManager
4+
import com.webank.wedatasphere.streamis.jobmanager.launcher.job.state.JobState
5+
import com.webank.wedatasphere.streamis.jobmanager.launcher.job.{JobClient, LaunchJob}
6+
import com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.conf.JobLauncherConfiguration
47
import com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.job.LinkisJobInfo
8+
import com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.job.manager.LinkisJobLaunchManager.LINKIS_JAR_VERSION_PATTERN
9+
import org.apache.commons.io.IOUtils
10+
import org.apache.commons.lang3.StringUtils
11+
import org.apache.linkis.common.utils.{Logging, Utils}
12+
import org.apache.linkis.computation.client.LinkisJob
13+
import org.apache.linkis.protocol.utils.TaskUtils
514

6-
trait LinkisJobLaunchManager extends JobLaunchManager[LinkisJobInfo] {
15+
import java.util
16+
import scala.collection.JavaConverters._
17+
import scala.util.matching.Regex
718

19+
trait LinkisJobLaunchManager extends JobLaunchManager[LinkisJobInfo] with Logging{
20+
/**
21+
* This method is used to launch a new job.
22+
*
23+
* @param job a StreamisJob wanted to be launched.
24+
* @param jobState job state used to launch
25+
* @return the job id.
26+
*/
27+
override def launch(job: LaunchJob, jobState: JobState): JobClient[LinkisJobInfo] = {
28+
// Support different version of Linkis
29+
var linkisVersion = JobLauncherConfiguration.FLINK_LINKIS_RELEASE_VERSION.getValue
30+
if (StringUtils.isBlank(linkisVersion)) {
31+
val linkisJarPath = classOf[LinkisJob].getProtectionDomain.getCodeSource.getLocation.getPath;
32+
val lastSplit = linkisJarPath.lastIndexOf(IOUtils.DIR_SEPARATOR);
33+
if (lastSplit >= 0) {
34+
linkisVersion = linkisJarPath.substring(lastSplit + 1)
35+
}
36+
}
37+
if (StringUtils.isNotBlank(linkisVersion)) {
38+
Utils.tryAndWarn {
39+
val LINKIS_JAR_VERSION_PATTERN(version) = linkisVersion
40+
linkisVersion = version
41+
}
42+
}
43+
if (StringUtils.isNotBlank(linkisVersion)){
44+
val versionSplitter: Array[String] = linkisVersion.split("\\.")
45+
val major = Integer.valueOf(versionSplitter(0))
46+
val sub = Integer.valueOf(versionSplitter(1))
47+
val fix = Integer.valueOf(versionSplitter(2))
48+
val versionNum = major * 10000 + sub * 100 + fix
49+
info(s"Recognized the linkis release version: [${linkisVersion}, version number: [${versionNum}]")
50+
if (versionNum <= 10101){
51+
warn("Linkis version number is less than [10101], should compatible the startup params in launcher.")
52+
val startupParams = TaskUtils.getStartupMap(job.getParams)
53+
// Change the unit of memory params for linkis older version
54+
changeUnitOfMemoryToG(startupParams, "flink.taskmanager.memory")
55+
changeUnitOfMemoryToG(startupParams, "flink.jobmanager.memory")
56+
// Avoid the _FLINK_CONFIG_. prefix for linkis older version
57+
val newParams = avoidParamsPrefix(startupParams, "_FLINK_CONFIG_.")
58+
startupParams.clear();
59+
startupParams.putAll(newParams)
60+
}
61+
}
62+
innerLaunch(job, jobState)
63+
}
864

65+
private def changeUnitOfMemoryToG(params: util.Map[String, Any], name: String): Unit = {
66+
params.get(name) match {
67+
case memory: String =>
68+
var actualMem = Integer.valueOf(memory) / 1024
69+
actualMem = if (actualMem <= 0) 1 else actualMem
70+
info(s"Change the unit of startup param: [${name}], value [${memory}] => [${actualMem}]")
71+
params.put(name, actualMem)
72+
case _ => // Ignores
73+
}
74+
}
75+
76+
/**
77+
* Avoid params prefix
78+
* @param params params
79+
* @param prefix prefix
80+
*/
81+
private def avoidParamsPrefix(params: util.Map[String, Any], prefix: String): util.Map[String, Any] = {
82+
params.asScala.map{
83+
case (key, value) =>
84+
if (key.startsWith(prefix)){
85+
info(s"Avoid the prefix of startup param: [${key}] => [${key.substring(prefix.length)}]")
86+
(key.substring(prefix.length), value)
87+
} else {
88+
(key, value)
89+
}
90+
}.toMap.asJava
91+
}
92+
def innerLaunch(job: LaunchJob, jobState: JobState): JobClient[LinkisJobInfo]
93+
}
94+
95+
object LinkisJobLaunchManager{
96+
val LINKIS_JAR_VERSION_PATTERN: Regex = "^[\\s\\S]*([\\d]+\\.[\\d]+\\.[\\d]+)[\\s\\S]*$".r
997
}

streamis-jobmanager/streamis-job-launcher/streamis-job-launcher-linkis/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/launcher/linkis/job/manager/SimpleFlinkJobLaunchManager.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,5 +130,4 @@ class SimpleFlinkJobLaunchManager extends FlinkJobLaunchManager {
130130
object SimpleFlinkJobLaunchManager{
131131

132132
val INSTANCE_NAME = "simpleFlink";
133-
134133
}
Lines changed: 65 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1,62 +1,65 @@
1-
#
2-
# Copyright 2021 WeBank
3-
# Licensed under the Apache License, Version 2.0 (the "License");
4-
# you may not use this file except in compliance with the License.
5-
# You may obtain a copy of the License at
6-
#
7-
# http://www.apache.org/licenses/LICENSE-2.0
8-
#
9-
# Unless required by applicable law or agreed to in writing, software
10-
# distributed under the License is distributed on an "AS IS" BASIS,
11-
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12-
# See the License for the specific language governing permissions and
13-
# limitations under the License.
14-
#
15-
#wds.linkis.test.mode=true
16-
wds.linkis.server.mybatis.datasource.url=jdbc:mysql://localhost:3306/streamis?characterEncoding=UTF-8
17-
wds.linkis.server.mybatis.datasource.username=user1
18-
19-
wds.linkis.server.mybatis.datasource.password=pwd1
20-
wds.linkis.gateway.ip=
21-
wds.linkis.gateway.port=
22-
wds.linkis.gateway.url=http://localhost:9001
23-
24-
wds.linkis.mysql.is.encrypt=false
25-
##restful
26-
wds.linkis.log.clear=true
27-
wds.linkis.server.version=v1
28-
#wds.linkis.test.user=user1
29-
30-
31-
32-
##restful
33-
wds.linkis.server.restful.scan.packages=com.webank.wedatasphere.streamis.datasource.server.restful.api,\
34-
com.webank.wedatasphere.streamis.project.server.restful,\
35-
com.webank.wedatasphere.streamis.jobmanager.restful.api,\
36-
com.webank.wedatasphere.streamis.datasource.execute.rest,\
37-
com.webank.wedatasphere.streamis.projectmanager.restful.api
38-
##mybatis
39-
wds.linkis.server.mybatis.mapperLocations=\
40-
classpath*:com/webank/wedatasphere/streamis/datasource/manager/dao/impl/*.xml,\
41-
classpath*:com/webank/wedatasphere/streamis/project/server/dao/impl/*.xml,\
42-
classpath*:com/webank/wedatasphere/streamis/jobmanager/launcher/dao/impl/*.xml,\
43-
classpath*:com/webank/wedatasphere/streamis/jobmanager/manager/dao/impl/*.xml,\
44-
classpath*:com/webank/wedatasphere/streamis/projectmanager/dao/impl/*.xml
45-
46-
wds.linkis.server.mybatis.typeAliasesPackage=com.webank.wedatasphere.streamis.datasource.manager.domain,\
47-
com.webank.wedatasphere.streamis.jobmanager.launcher.entity,\
48-
com.webank.wedatasphere.streamis.jobmanager.manager.entity,\
49-
com.webank.wedatasphere.streamis.jobmanager.manager.entity.vo,\
50-
com.webank.wedatasphere.streamis.jobmanager.launcher.entity.vo,\
51-
com.webank.wedatasphere.streamis.projectmanager.entity
52-
53-
54-
wds.linkis.server.mybatis.BasePackage=com.webank.wedatasphere.streamis.datasource.manager.dao,\
55-
org.apache.linkis.bml.dao,\
56-
com.webank.wedatasphere.streamis.project.server.dao,\
57-
com.webank.wedatasphere.streamis.jobmanager.launcher.dao,\
58-
com.webank.wedatasphere.streamis.jobmanager.manager.dao,\
59-
com.webank.wedatasphere.streamis.projectmanager.dao
60-
61-
# Make sure that can fetch the application info finally
62-
wds.streamis.application.info.fetch.max=20
1+
#
2+
# Copyright 2021 WeBank
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
#
15+
#wds.linkis.test.mode=true
16+
wds.linkis.server.mybatis.datasource.url=jdbc:mysql://localhost:3306/streamis?characterEncoding=UTF-8
17+
wds.linkis.server.mybatis.datasource.username=user1
18+
19+
wds.linkis.server.mybatis.datasource.password=pwd1
20+
wds.linkis.gateway.ip=
21+
wds.linkis.gateway.port=
22+
wds.linkis.gateway.url=http://localhost:9001
23+
24+
wds.linkis.mysql.is.encrypt=false
25+
##restful
26+
wds.linkis.log.clear=true
27+
wds.linkis.server.version=v1
28+
#wds.linkis.test.user=user1
29+
30+
31+
32+
##restful
33+
wds.linkis.server.restful.scan.packages=com.webank.wedatasphere.streamis.datasource.server.restful.api,\
34+
com.webank.wedatasphere.streamis.project.server.restful,\
35+
com.webank.wedatasphere.streamis.jobmanager.restful.api,\
36+
com.webank.wedatasphere.streamis.datasource.execute.rest,\
37+
com.webank.wedatasphere.streamis.projectmanager.restful.api
38+
##mybatis
39+
wds.linkis.server.mybatis.mapperLocations=\
40+
classpath*:com/webank/wedatasphere/streamis/datasource/manager/dao/impl/*.xml,\
41+
classpath*:com/webank/wedatasphere/streamis/project/server/dao/impl/*.xml,\
42+
classpath*:com/webank/wedatasphere/streamis/jobmanager/launcher/dao/impl/*.xml,\
43+
classpath*:com/webank/wedatasphere/streamis/jobmanager/manager/dao/impl/*.xml,\
44+
classpath*:com/webank/wedatasphere/streamis/projectmanager/dao/impl/*.xml
45+
46+
wds.linkis.server.mybatis.typeAliasesPackage=com.webank.wedatasphere.streamis.datasource.manager.domain,\
47+
com.webank.wedatasphere.streamis.jobmanager.launcher.entity,\
48+
com.webank.wedatasphere.streamis.jobmanager.manager.entity,\
49+
com.webank.wedatasphere.streamis.jobmanager.manager.entity.vo,\
50+
com.webank.wedatasphere.streamis.jobmanager.launcher.entity.vo,\
51+
com.webank.wedatasphere.streamis.projectmanager.entity
52+
53+
54+
wds.linkis.server.mybatis.BasePackage=com.webank.wedatasphere.streamis.datasource.manager.dao,\
55+
org.apache.linkis.bml.dao,\
56+
com.webank.wedatasphere.streamis.project.server.dao,\
57+
com.webank.wedatasphere.streamis.jobmanager.launcher.dao,\
58+
com.webank.wedatasphere.streamis.jobmanager.manager.dao,\
59+
com.webank.wedatasphere.streamis.projectmanager.dao
60+
61+
# Make sure that can fetch the application info finally
62+
wds.streamis.application.info.fetch.max=20
63+
64+
# To use the complete features of streamis in linkis 1.1.2
65+
#wds.streamis.launch.flink.linkis.release.version=1.1.2

0 commit comments

Comments
 (0)