Skip to content

Commit 7988270

Browse files
author
suyh
committed
读写消息
1 parent 04de350 commit 7988270

File tree

3 files changed

+253
-0
lines changed

3 files changed

+253
-0
lines changed
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import requests
2+
3+
base_url = "http://192.168.8.56:8102"
4+
5+
def get_jobs_overview():
6+
try:
7+
url = base_url + "/jobs/overview"
8+
response = requests.get(url)
9+
if response.status_code == 200:
10+
return response.json()
11+
else:
12+
print(f"请求失败,状态码: {response.status_code}")
13+
return None
14+
except requests.RequestException as e:
15+
print(f"请求发生异常: {e}")
16+
return None
17+
18+
if __name__ == "__main__":
19+
jobs_info = get_jobs_overview()
20+
if jobs_info:
21+
flink_cluster_idle = True
22+
if 'jobs' in jobs_info:
23+
for job in jobs_info['jobs']:
24+
# INITIALIZING, CREATED, RUNNING, FAILING, CANCELLING, RESTARTING, SUSPENDED, RECONCILING
25+
# FAILED, CANCELED, FINISHED
26+
job_status = job.get('state')
27+
match job_status:
28+
case "INITIALIZING" | "CREATED" | "RUNNING" | "FAILING" | "CANCELLING" | "RESTARTING" | "SUSPENDED" | "RECONCILING":
29+
flink_cluster_idle = False
30+
print(f"集群中的作业,ID: {job.get('jid')},名称:{job.get('name')},状态:{job_status}")
31+
# case "FAILED" | "CANCELED" | "FINISHED":
32+
# case _:
33+
34+
if flink_cluster_idle:
35+
print("Flink cluster is idle")
36+
else:
37+
print("返回的数据中未包含 'jobs' 字段,无法解析作业信息。")
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
#!/bin/bash
2+
3+
#./ipcmqs -w --pns hy --date 20241102
4+
5+
# 1. 查询flink 集群是否空闲
6+
FLINK_CLUSTER_IDLE=false
7+
while read line; do
8+
if [[ $line =~ "Flink cluster is idle" ]]; then
9+
echo "Flink cluster is idle"
10+
FLINK_CLUSTER_IDLE=true
11+
fi
12+
done < <(python3 ./flink_cluster_job_info.py)
13+
14+
if [[ ! "${FLINK_CLUSTER_IDLE}x" = "true"x ]]; then
15+
echo "Flink cluster is busy"
16+
exit 0
17+
fi
18+
19+
# flink 集群空闲中
20+
# 从消息队列中取出 flink 作业启动的参数,并提交作业
21+
DATES=""
22+
PNS=""
23+
CHANNEL_LIST=""
24+
while read line; do
25+
if [[ $line =~ ^dates:\ (.*)$ ]]; then
26+
DATES=$(echo ${BASH_REMATCH[1]})
27+
elif [[ $line =~ ^pns:\ (.*)$ ]]; then
28+
PNS=$(echo ${BASH_REMATCH[1]})
29+
elif [[ $line =~ ^channelList:\ (.*)$ ]]; then
30+
CHANNEL_LIST=$(echo ${BASH_REMATCH[1]})
31+
fi
32+
done < <(./ipcmqs -r)
33+
34+
# 消息队列中没有数据
35+
if [[ "${DATES}"x = ""x ]]; then
36+
echo "ipcs mq is empty or dates is null"
37+
exit 0
38+
fi
39+
40+
echo "DATES: ${DATES}, PNS: ${PNS}, CHANNEL_LIST: ${CHANNEL_LIST}"
41+
42+
cd /home/suyunhong/flink/flink-repetition/flink-1.18.0
43+
./bin/flink run -p 8 -d job-jar/cdap-repetition-job-1.8.0.jar
44+
cd -
45+
46+
47+
#TZ='Asia/Shanghai' date +%Y%m%d
48+
#TZ='Asia/Tokyo' date +%Y%m%d
49+
#TZ='Asia/Kolkata' date +%Y%m%d

tmp-flink-queue/v002/ipcmqs.c

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
#include <stdio.h>
2+
#include <stdlib.h>
3+
#include <sys/types.h>
4+
#include <sys/ipc.h>
5+
#include <sys/msg.h>
6+
#include <unistd.h>
7+
#include <getopt.h>
8+
#include <string.h>
9+
#include <errno.h>
10+
11+
struct msgbuf {
12+
long mtype;
13+
char mdata[4098];
14+
};
15+
16+
#define MSG_TYPE 1
17+
#define OTHER_OPERATION 'o'
18+
#define READ_OPERATION 'r'
19+
#define WRITE_OPERATION 'w'
20+
21+
// 打印使用帮助信息
22+
void usage() {
23+
printf("Usage:./ipc_msg_queue [-r|-w] [--dates <date>] [--pns <pns>] [--channelList <channels>]\n");
24+
printf("Options:\n");
25+
printf(" -r, --read Read from the message queue\n");
26+
printf(" -w, --write Write to the message queue\n");
27+
printf(" -d, --dates Specify date\n");
28+
printf(" -p, --pns Specify pns\n");
29+
printf(" -c, --channelList Specify channel list\n");
30+
exit(EXIT_FAILURE);
31+
}
32+
33+
// 格式化消息并返回消息长度
34+
int formatMessage(char *buffer, const char *date, const char *pns, const char *channelList) {
35+
int offset = 0;
36+
int dateLen = strlen(date);
37+
*((int*)(buffer + offset)) = dateLen;
38+
offset += sizeof(int);
39+
strcpy(buffer + offset, date);
40+
offset += dateLen;
41+
42+
int pnsLen = strlen(pns);
43+
*((int*)(buffer + offset)) = pnsLen;
44+
offset += sizeof(int);
45+
strcpy(buffer + offset, pns);
46+
offset += pnsLen;
47+
48+
int channelListLen = strlen(channelList);
49+
*((int*)(buffer + offset)) = channelListLen;
50+
offset += sizeof(int);
51+
strcpy(buffer + offset, channelList);
52+
offset += channelListLen;
53+
54+
return offset;
55+
}
56+
57+
// 解析读取到的消息
58+
void parseReceivedMessage(const char *buffer, char *date, char *pns, char *channelList) {
59+
int offset = 0;
60+
int dateLen = *((int*)(buffer + offset));
61+
offset += sizeof(int);
62+
strncpy(date, buffer + offset, dateLen);
63+
date[dateLen] = '\0';
64+
offset += dateLen;
65+
66+
int pnsLen = *((int*)(buffer + offset));
67+
offset += sizeof(int);
68+
strncpy(pns, buffer + offset, pnsLen);
69+
pns[pnsLen] = '\0';
70+
offset += pnsLen;
71+
72+
int channelListLen = *((int*)(buffer + offset));
73+
offset += sizeof(int);
74+
strncpy(channelList, buffer + offset, channelListLen);
75+
channelList[channelListLen] = '\0';
76+
}
77+
78+
// ./ipcmqs -w --pns hy --date 20241102
79+
int main(int argc, char *argv[]) {
80+
int opt;
81+
int operation = OTHER_OPERATION;
82+
char date[9] = "";
83+
char pns[800] = "";
84+
char channelList[2048] = "";
85+
struct option long_options[] = {
86+
{"read", no_argument, 0, 'r'},
87+
{"write", no_argument, 0, 'w'},
88+
{"dates", required_argument, 0, 'd'},
89+
{"pns", required_argument, 0, 'p'},
90+
{"channelList", no_argument, 0, 'c'},
91+
{0, 0, 0, 0}
92+
};
93+
94+
while ((opt = getopt_long(argc, argv, "rw", long_options, NULL))!= -1) {
95+
switch (opt) {
96+
case 'r':
97+
operation = READ_OPERATION;
98+
break;
99+
case 'w':
100+
operation = WRITE_OPERATION;
101+
break;
102+
case 'd':
103+
strcpy(date, optarg);
104+
break;
105+
case 'p':
106+
strcpy(pns, optarg);
107+
break;
108+
case 'c':
109+
strcpy(channelList, optarg);
110+
break;
111+
default:
112+
usage();
113+
}
114+
}
115+
116+
if (operation == OTHER_OPERATION) {
117+
usage();
118+
}
119+
120+
// 使用ftok生成key,这里使用当前目录和一个字符常量作为参数,可根据实际情况调整
121+
key_t key = ftok("/tmp", 'a');
122+
if (key == -1) {
123+
perror("ftok failed");
124+
exit(EXIT_FAILURE);
125+
}
126+
127+
// 获取消息队列标识符,如果不存在则创建
128+
int msgid = msgget(key, IPC_CREAT | 0666);
129+
if (msgid == -1) {
130+
perror("msgget failed");
131+
exit(EXIT_FAILURE);
132+
}
133+
134+
if (operation == READ_OPERATION) {
135+
struct msgbuf buffer;
136+
if (msgrcv(msgid, &buffer, sizeof(buffer.mdata), 0, IPC_NOWAIT) == -1) {
137+
perror("msgrcv");
138+
if (errno!= ENOMSG) {
139+
// 如果errno不是ENOMSG,表示是其他错误,以失败状态退出
140+
exit(EXIT_FAILURE);
141+
}
142+
// 如果errno是ENOMSG,表示消息队列为空,输出提示信息并以成功状态退出
143+
printf("There is no message in the queue, exiting...\n");
144+
exit(EXIT_SUCCESS);
145+
}
146+
147+
parseReceivedMessage(buffer.mdata, date, pns, channelList);
148+
printf("dates: %s\n", date);
149+
printf("pns: %s\n", pns);
150+
printf("channelList: %s\n", channelList);
151+
} else if (operation == WRITE_OPERATION) {
152+
if (strlen(date) == 0 || strlen(pns) == 0) {
153+
fprintf(stderr, "Dates and PNs cannot be empty when writing to the message queue.\n");
154+
exit(EXIT_FAILURE);
155+
}
156+
struct msgbuf buffer;
157+
buffer.mtype = MSG_TYPE;
158+
int msgLength = formatMessage(buffer.mdata, date, pns, channelList);
159+
if (msgsnd(msgid, &buffer, msgLength, 0) == -1) {
160+
perror("msgsnd");
161+
exit(EXIT_FAILURE);
162+
}
163+
printf("Message sent successfully.\n");
164+
}
165+
166+
return 0;
167+
}

0 commit comments

Comments
 (0)