-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_downlink_protocol.py
164 lines (130 loc) · 5.34 KB
/
test_downlink_protocol.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
from CCSDS_Packet import CCSDS_Chunk_Packet, CCSDS_Control_Packet
from Mission_Parameters import *
import subprocess
import serial
import pprint
import time
import os
# Given a mission folder path, return the list of image file paths
def obtain_downlink_images_filepaths(mission_folder_path):
    """Return one path per entry found in ``mission_folder_path``.

    Every name reported by ``os.listdir`` is joined to the folder path with a
    ``/`` separator. The result follows the listing order of ``os.listdir``;
    no filtering by file type is performed.
    """
    return [
        mission_folder_path + '/' + entry
        for entry in os.listdir(mission_folder_path)
    ]
# Given filepath to image, compress and encode the image bytes and return the bytes
def extract_enc_img_bytes(img_filepath):
    """Prepare an image with the helper scripts and return the resulting bytes.

    Runs ``./prep_test.sh <img_filepath>`` from the current working directory
    (the script is expected to leave its output in ``base_enc.gz``; the
    original comment describes it as base64 + gzip), reads that file, then
    runs ``./cleanup.sh base_enc.gz`` to remove it again.

    Args:
        img_filepath: Path of the image to prepare, as a ``str``.

    Returns:
        The full contents of ``base_enc.gz`` as ``bytes``.

    Raises:
        OSError: If a helper script cannot be chmod-ed or ``base_enc.gz``
            cannot be opened after the prep script has run.
    """
    import shlex  # Local import: only this function needs shell quoting.

    # Grant permission to the scripts if not already done so.
    # NOTE(review): 0o777 also makes the scripts world-writable; 0o755 would
    # be enough to execute them - confirm before tightening.
    for script in ("prep_test.sh", "cleanup.sh"):
        os.chmod(script, 0o777)

    # Quote the path so that spaces or shell metacharacters in it reach the
    # script as a single, literal argument instead of being interpreted by
    # the shell.
    prep_command = './prep_test.sh ' + shlex.quote(img_filepath)
    subprocess.call(prep_command, stdout=subprocess.DEVNULL, shell=True)

    # Read the prepared image; the ``with`` block closes the file itself.
    with open('base_enc.gz', 'rb') as file:
        compressed_enc = file.read()

    # Remove the intermediate file created by the prep script.
    subprocess.call('./cleanup.sh base_enc.gz',
                    stdout=subprocess.DEVNULL, shell=True)
    return compressed_enc
def prepare_tx_batch(enc_img_bytes):
    """Split encoded image bytes into batches of CCSDS packets.

    The bytes are cut into chunks of ``PRE_ENC_CHUNK_SIZE``, the chunks are
    grouped into batches of ``BATCH_SIZE``, and every chunk is wrapped in a
    ``CCSDS_Chunk_Packet``. Every batch except the last one is terminated by
    a ``CCSDS_Control_Packet`` of type ``TELEMETRY_PACKET_TYPE_DOWNLINK_STOP``.

    Packet sequence numbers start at 1 and increase across all packets (stop
    packets included). Batch numbers and chunk numbers are 1-based; chunk
    numbers restart in every batch.

    Args:
        enc_img_bytes: The encoded image as a bytes-like, sliceable object.

    Returns:
        A list of batches, each batch being a list of packet objects.
    """

    # Returns a list of chunks of bytes, given chunk size and bytearray
    def chop_bytes(bytes_arr, chunk_size):
        chunks = [
            bytes_arr[start:start + chunk_size]
            for start in range(0, len(bytes_arr), chunk_size)
        ]
        # Empty input still yields one (empty) chunk, as it always has.
        return chunks or [bytes_arr[:0]]

    # Given a list of chunks, split them into a list of batches given a batch
    # size. A chunk count that is an exact multiple of the batch size must
    # not produce an empty trailing batch: that would inflate the batch count
    # and put a stop packet on what is really the final batch.
    def split_batch(chunks_arr, batch_size):
        return [
            chunks_arr[start:start + batch_size]
            for start in range(0, len(chunks_arr), batch_size)
        ]

    # Chop up bytes into chunks and prepare CCSDS packet
    chunk_list = chop_bytes(enc_img_bytes, PRE_ENC_CHUNK_SIZE)

    # Split chunks into batches
    batch_list = split_batch(chunk_list, BATCH_SIZE)

    # Create CCSDS Packets
    packet_batch_list = []
    packet_seq_num = 1
    last_batch_idx = len(batch_list) - 1
    for batch_idx, batch in enumerate(batch_list):
        new_batch = []
        for chunk_num, chunk in enumerate(batch, start=1):
            # Create CCSDS Packet for each chunk
            packet = CCSDS_Chunk_Packet(
                packet_seq_num, TELEMETRY_PACKET_TYPE_DOWNLINK_PACKET,
                batch_idx + 1, chunk_num, chunk)
            new_batch.append(packet)
            packet_seq_num += 1

        # Put stop packet at all batches but not the last batch
        if batch_idx != last_batch_idx:
            stop_packet = CCSDS_Control_Packet(
                packet_seq_num, TELEMETRY_PACKET_TYPE_DOWNLINK_STOP, 0, 0)
            new_batch.append(stop_packet)
            packet_seq_num += 1

        packet_batch_list.append(new_batch)
    return packet_batch_list
def main():
    """Downlink every file found under ``TEST_FILEPATH`` over the serial port.

    For each file the encoded bytes are split into packet batches, a start
    control packet carrying the byte count and batch count is written, and
    the batches are then sent in order. After every batch except the last,
    one line is read from the port: ``b"nack\\r\\n"`` or an empty read
    (timeout) causes the same batch to be sent again, any other reply
    advances to the next batch. The last batch is sent without waiting for
    a reply.

    The serial port is opened on ``/dev/ttyUSB0`` at 115200 baud and is
    closed when the function exits, including when an exception is raised.
    """
    with serial.Serial(
            "/dev/ttyUSB0", baudrate=115200,
            timeout=TIMEOUT_TX) as ser_downlink:
        # Get list of files to downlink
        filepath_list = obtain_downlink_images_filepaths(TEST_FILEPATH)
        pprint.pprint(filepath_list)  # Print list of files for debug

        # Handle downlink of each image
        for filepath in filepath_list:
            enc_img_bytes = extract_enc_img_bytes(filepath)
            batches = prepare_tx_batch(enc_img_bytes)
            print(f"Number of batches: {len(batches)}")

            start_packet = CCSDS_Control_Packet(
                0, TELEMETRY_PACKET_TYPE_DOWNLINK_START,
                len(enc_img_bytes), len(batches))
            ser_downlink.write(start_packet.get_tx_packet())
            time.sleep(TIME_SLEEP_AFTER_START)

            batch_num = 0
            while batch_num < len(batches):
                # Do batch send - 5 packets then a stop packet
                for packet_count, packet in enumerate(
                        batches[batch_num], start=1):
                    if isinstance(packet, CCSDS_Chunk_Packet):
                        print(
                            f"Sending: packet {packet_count} from batch {batch_num+1}")
                    elif isinstance(packet, CCSDS_Control_Packet):
                        print(f"Sending: Stop packet from batch {batch_num+1}")
                    print(packet)
                    ser_downlink.write(packet.get_tx_packet())

                # The final batch is not acknowledged, so the image is
                # finished once it has been written.
                if batch_num >= len(batches) - 1:
                    break

                print("Wait for ack/nack")
                ack = ser_downlink.readline()
                print(ack)

                # An empty read means the read timed out; treat it as a nack.
                is_resend = ack == b"nack\r\n" or ack == b""
                if is_resend:
                    print("Nack or timeout")
                    # NOTE(review): pySerial's flush() waits for pending
                    # output to be written; if the intent was to discard
                    # stale input, reset_input_buffer() is the call - confirm.
                    ser_downlink.flush()
                else:
                    print(f"Received {ack}")
                    batch_num += 1

                print()
                time.sleep(TIME_BETWEEN_PACKETS * 5)

                # NOTE(review): retries are unbounded; a receiver that never
                # answers keeps this loop resending the same batch.
                if is_resend:
                    print(f"Resending = {batch_num+1}")

    print("done!")
# Run the downlink test only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()