Skip to content

Commit 6c18e79

Browse files
committed
Merge branch 'dev-0.2.5-jobmanager' of github.com:WeDataSphere/Streamis into dev-0.2.5-jobmanager
2 parents 9c8e9b3 + a7d0200 commit 6c18e79

File tree

18 files changed

+137
-47
lines changed

18 files changed

+137
-47
lines changed

streamis-jobmanager/streamis-job-launcher/streamis-job-launcher-service/src/main/java/com/webank/wedatasphere/streamis/jobmanager/launcher/dao/impl/StreamJobConfMapper.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@
6060
</delete>
6161

6262
<delete id="deleteTemporaryConfValue">
63-
DELETE c FROM `linkis_stream_job_config` c INNER JOIN `linkis_stream_job_config_def` d ON c.job_id = 0 AND d.id = c.ref_def_id AND d.is_temp = 1;
63+
DELETE c FROM `linkis_stream_job_config` c INNER JOIN `linkis_stream_job_config_def` d ON c.job_id = #{jobId} AND d.id = c.ref_def_id AND d.is_temp = 1;
6464
</delete>
6565
<insert id="batchInsertValues">
6666
INSERT INTO `linkis_stream_job_config`(`job_id`, `job_name`, `key`, `value`, `ref_def_id`) VALUES

streamis-jobmanager/streamis-job-launcher/streamis-job-launcher-service/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/launcher/conf/JobConfKeyConstants.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,4 +89,9 @@ object JobConfKeyConstants {
8989
* Alert level
9090
*/
9191
val ALERT_LEVEL: CommonVars[String] = CommonVars("wds.streamis.job.config.key.alert.level", "wds.linkis.flink.alert.level")
92+
93+
/**
94+
* Material model
95+
*/
96+
val MATERIAL_MODEL: CommonVars[String] = CommonVars("wds.streamis.job.config.key.material.model", "wds.streamis.job.material.model")
9297
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package com.webank.wedatasphere.streamis.jobmanager.manager.dao;
2+
3+
/**
4+
* Mapper of stream file(material)
5+
*/
6+
public interface StreamFileMapper {
7+
8+
}

streamis-jobmanager/streamis-job-manager/streamis-job-manager-service/pom.xml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,17 @@
3131
<properties>
3232
<maven.compiler.source>8</maven.compiler.source>
3333
<maven.compiler.target>8</maven.compiler.target>
34+
<junit.version>4.12</junit.version>
3435
</properties>
3536

3637
<dependencies>
38+
<!--Junit-->
39+
<dependency>
40+
<groupId>junit</groupId>
41+
<artifactId>junit</artifactId>
42+
<version>${junit.version}</version>
43+
<scope>test</scope>
44+
</dependency>
3745
<dependency>
3846
<groupId>com.webank.wedatasphere.streamis</groupId>
3947
<artifactId>streamis-job-manager-base</artifactId>

streamis-jobmanager/streamis-job-manager/streamis-job-manager-service/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/manager/conf/JobConf.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,11 @@ object JobConf {
3939
*/
4040
val STREAMIS_JOB_LOG_COLLECT_PATH: CommonVars[String] = CommonVars("wds.streamis.job.log.collect.path", "/api/rest_j/v1/streamis/streamJobManager/log/collect/events")
4141

42+
/**
43+
* Enable to use material container
44+
*/
45+
val STREAMIS_JOB_MATERIAL_CONTAINER_ENABLE: CommonVars[Boolean] = CommonVars("wds.streamis.job.material.container.enable", false)
46+
4247
val FLINK_JOB_STATUS_NOT_STARTED: CommonVars[Int] = CommonVars("wds.streamis.job.status.not-started", 0,"Not Started")
4348

4449
val FLINK_JOB_STATUS_COMPLETED: CommonVars[Int] = CommonVars("wds.streamis.job.status.completed", 1,"Completed")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package com.webank.wedatasphere.streamis.jobmanager.manager.material
2+
3+
import com.webank.wedatasphere.streamis.jobmanager.manager.entity.StreamisFile
4+
import java.util
5+
/**
6+
* Define the stream file container
7+
*/
8+
trait StreamFileContainer {
9+
/**
10+
* Container name
11+
* @return
12+
*/
13+
def getContainerName: String
14+
15+
/**
16+
* Get stream files
17+
* @return
18+
*/
19+
def getStreamFiles: util.List[StreamisFile]
20+
21+
/**
22+
* Get stream files by match function
23+
* @param matchFunc match function
24+
* @return
25+
*/
26+
def getStreamFiles(matchFunc: StreamisFile => Boolean): util.List[StreamisFile]
27+
28+
/**
29+
* Get stream file by basename, model name and suffix
30+
* @param name name
31+
* @param model model
32+
* @param suffix suffix
33+
* @return
34+
*/
35+
def getStreamFile(name: String, model: String, suffix: String): StreamisFile
36+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package com.webank.wedatasphere.streamis.jobmanager.manager.material
2+
3+
import org.apache.linkis.common.conf.CommonVars
4+
5+
trait StreamFileLocalContainer extends StreamFileContainer {
6+
7+
}
8+
9+
object StreamFileLocalContainer{
10+
11+
val STORE_PATH: CommonVars[String] = CommonVars("wds.streamis.job.material.container.local.store-path", "material")
12+
}

streamis-jobmanager/streamis-job-manager/streamis-job-manager-service/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/manager/service/TaskMonitorService.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ class TaskMonitorService extends Logging {
111111
// 连续三次还是出现异常,说明Linkis的Manager已经不能正常提供服务,告警并不再尝试获取状态,等待下次尝试
112112
val users = getAlertUsers(job)
113113
users.add(job.getCreateBy)
114-
alert(jobService.getAlertLevel(job), s"请求LinkisManager失败,Linkis集群出现异常,请关注!影响任务[${job.getName}]", users, streamTask)
114+
alert(jobService.getAlertLevel(job), s"请求LinkisManager失败,Linkis集群出现异常,请关注!影响任务[${job.getName}],请联系用户[${job.getSubmitUser}]进行处理!", users, streamTask)
115115
}
116116
}
117117
streamTaskMapper.updateTask(streamTask)
@@ -163,6 +163,8 @@ class TaskMonitorService extends Logging {
163163
var users = jobService.getAlertUsers(job)
164164
if (users == null) {
165165
users = new util.ArrayList[String]()
166+
} else {
167+
users = new util.ArrayList[String](users)
166168
}
167169
users.addAll(util.Arrays.asList(JobConf.STREAMIS_DEVELOPER.getValue.split(","):_*))
168170
users
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package com.webank.wedatasphere.streamis.jobmanager.manager.material
2+
3+
import org.junit.Test
4+
5+
6+
class StreamFileContainerTest {
7+
@Test
8+
def scanAndLoadTheFile(): Unit = {
9+
print("hello")
10+
}
11+
}

streamis-jobmanager/streamis-jobmanager-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/restful/api/JobBulkRestfulApi.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
import org.apache.commons.lang3.StringUtils;
1515
import org.apache.linkis.scheduler.queue.SchedulerEventState;
1616
import org.apache.linkis.server.Message;
17-
import org.apache.linkis.server.security.SecurityFilter;
17+
import org.apache.linkis.server.utils.ModuleUserUtils;
1818
import org.slf4j.Logger;
1919
import org.slf4j.LoggerFactory;
2020
import org.springframework.web.bind.annotation.RequestBody;
@@ -60,7 +60,7 @@ public Message bulkExecution(@RequestBody JobBulkRequest execBulkRequest, HttpSe
6060
}
6161
Message result = Message.ok("success");
6262
try{
63-
String username = SecurityFilter.getLoginUsername(request);
63+
String username = ModuleUserUtils.getOperationUser(request, "bulk execute job");
6464
LOG.info("Bulk execution[operator: {} sbj_type: {}, subjects: ({})]", username,
6565
execBulkRequest.getBulkSubjectType(), StringUtils.join(execBulkRequest.getBulkSubject(), ","));
6666
// TODO Check the permission of task id
@@ -113,7 +113,7 @@ public Message bulkPause(@RequestBody JobBulkPauseRequest pauseRequest, HttpServ
113113
}
114114
Message result = Message.ok("success");
115115
try{
116-
String username = SecurityFilter.getLoginUsername(request);
116+
String username = ModuleUserUtils.getOperationUser(request, "bulk pause job");
117117
LOG.info("Bulk pause[operator: {}, sbj_type: {}, snapshot: {}, subjects: ({})]",
118118
username, pauseRequest.getBulkSubjectType(), pauseRequest.isSnapshot(),
119119
StringUtils.join(pauseRequest.getBulkSubject(), ","));

0 commit comments

Comments
 (0)