1
- #!/usr/bin/env python3
2
- # -*- coding: utf-8 -*-
3
- """
4
- Created on Fri Sep 9 20:54:46 2016
5
-
6
- @author: chernov
7
- """
1
+ """Dataforge envelope format parser utils."""
8
2
3
+ import json
9
4
import os
10
- import sys
11
5
import re
12
- import json
13
6
import struct
7
+ import sys
14
8
import time
15
9
16
- cur_dir = os .path .dirname (os .path .realpath (__file__ ))
17
- if not cur_dir in sys .path : sys .path .append (cur_dir )
18
- del cur_dir
10
+ CUR_DIR = os .path .dirname (os .path .realpath (__file__ ))
11
+ if CUR_DIR not in sys .path :
12
+ sys .path .append (CUR_DIR )
13
+ del CUR_DIR
14
+
15
+ from df_data .type_codes import header_types
16
+ from df_data .type_codes import meta_types
17
+
19
18
20
- from df_data .type_codes import meta_types , header_types
21
-
22
19
def create_message (json_meta : dict , data : bytearray = b'' ,
23
20
data_type : "binary_types" = 0 ) -> bytearray :
24
- """
25
- Create message, containing meta and data in df-envelope format
26
- @json_meta - metadata
27
- @data - binary data
28
- @data_type - data type code for binary data
29
- @return - message as bytearray
30
-
21
+ """Create message, containing meta and data in df-envelope format.
22
+
23
+ @json_meta - metadata
24
+ @data - binary data
25
+ @data_type - data type code for binary data
26
+ @return - message as bytearray
27
+
31
28
"""
32
29
__check_data (data )
33
-
30
+
34
31
header = __create_machine_header (json_meta , data , data_type )
35
32
meta = __prepare_meta (json_meta )
36
-
33
+
37
34
return header + meta + data
38
-
39
-
35
+
36
+
40
37
def parse_from_file (filename : str , nodata : bool = False ) \
41
- -> [dict , dict , bytearray ]:
42
- """
43
- Parse df message from file
44
- @filename - path to file
45
- @nodata - do not load data
46
- @return - [binary header, metadata, binary data]
47
-
38
+ -> [dict , dict , bytearray ]:
39
+ """Parse df message from file.
40
+
41
+ @filename - path to file
42
+ @nodata - do not load data
43
+ @return - [binary header, metadata, binary data]
44
+
48
45
"""
49
-
50
46
header = None
51
47
with open (filename , "rb" ) as file :
52
48
header = read_machine_header (file .read (30 ))
@@ -55,19 +51,18 @@ def parse_from_file(filename: str, nodata: bool=False) \
55
51
data = b''
56
52
if not nodata :
57
53
data = file .read (header ['data_len' ])
58
- return header , meta , data
59
-
60
-
54
+ return header , meta , data
55
+
56
+
61
57
def parse_message (message : bytearray , nodata : bool = False ) \
62
- -> [dict , dict , bytearray ]:
63
- """
64
- Parse df message from bytearray
65
- @message - message data
66
- @nodata - do not load data
67
- @return - [binary header, metadata, binary data]
68
-
58
+ -> [dict , dict , bytearray ]:
59
+ """Parse df message from bytearray.
60
+
61
+ @message - message data
62
+ @nodata - do not load data
63
+ @return - [binary header, metadata, binary data]
64
+
69
65
"""
70
-
71
66
header = read_machine_header (message )
72
67
meta_raw = message [30 :30 + header ['meta_len' ]]
73
68
meta = __parse_meta (meta_raw , header )
@@ -77,32 +72,32 @@ def parse_message(message: bytearray, nodata: bool=False) \
77
72
data = message [data_start :data_start + header ['data_len' ]]
78
73
return header , meta , data
79
74
80
-
75
+
81
76
def read_machine_header (data : bytearray ) -> dict :
77
+ """Parse binary header.
78
+
79
+ @data - bytearray, contains binary header
80
+ @return - parsed binary header
81
+
82
82
"""
83
- Parse binary header
84
- @data - bytearray, contains binary header
85
- @return - parsed binary header
86
- """
87
-
88
83
header = dict ()
89
84
header ['type' ] = struct .unpack ('>I' , data [2 :6 ])[0 ]
90
85
header ['time' ] = struct .unpack ('>I' , data [6 :10 ])[0 ]
91
86
header ['meta_type' ] = struct .unpack ('>I' , data [10 :14 ])[0 ]
92
87
header ['meta_len' ] = struct .unpack ('>I' , data [14 :18 ])[0 ]
93
88
header ['data_type' ] = struct .unpack ('>I' , data [18 :22 ])[0 ]
94
89
header ['data_len' ] = struct .unpack ('>I' , data [22 :26 ])[0 ]
95
-
90
+
96
91
return header
97
-
98
-
92
+
93
+
99
94
def get_messages_from_stream (data : bytearray ) \
100
- -> [[{dict , dict , bytearray }], bytearray ]:
101
- """
102
- Extract complete messages from stream and cut out them from stream
103
- @data - stream binary data
104
- @return - [list of messages, choped stream data]
105
-
95
+ -> [[{dict , dict , bytearray }], bytearray ]:
96
+ """Extract complete messages from stream and cut out them from stream.
97
+
98
+ @data - stream binary data
99
+ @return - [list of messages, choped stream data]
100
+
106
101
"""
107
102
messages = []
108
103
iterator = get_messages_from_stream .header_re .finditer (data )
@@ -115,63 +110,63 @@ def get_messages_from_stream(data: bytearray) \
115
110
116
111
if cur_last_pos > len (data ):
117
112
break
118
-
113
+
119
114
header , meta , bin_data = parse_message (data [pos :])
120
115
messages .append ({'header' : header , 'meta' : meta , 'data' : bin_data })
121
-
116
+
122
117
last_pos = cur_last_pos
123
-
118
+
124
119
data = data [last_pos :]
125
120
return messages , data
126
-
121
+
127
122
get_messages_from_stream .header_re = re .compile (b"#!.{24}!#" , re .DOTALL )
128
-
123
+
129
124
130
125
def __parse_meta (meta_raw , header ):
131
126
if header ["meta_type" ] == meta_types ["JSON_METATYPE" ]:
132
127
return json .loads (meta_raw .decode ())
133
128
else :
134
- err = "Parsing meta type %s not implemented" % (bin (header ["meta_type" ]))
129
+ err = "Parsing meta type %s not implemented" % \
130
+ (bin (header ["meta_type" ]))
135
131
raise NotImplementedError (err )
136
-
137
-
132
+
133
+
138
134
def __prepare_meta (json_meta ):
139
- if type (json_meta ) is dict :
135
+ if isinstance (json_meta , dict ) :
140
136
json_meta = json .dumps (json_meta , indent = 4 ).encode ()
141
137
json_meta += b'\r \n \r \n '
142
- elif not type (json_meta ) is str :
138
+ elif isinstance (json_meta , str ) :
143
139
raise ValueError ("Input meta should be dict or str" )
144
140
return json_meta
145
141
146
142
147
143
def __check_data (data ):
148
- if not type (data ) is bytes :
144
+ if isinstance (data , bytes ) :
149
145
raise ValueError ("Input data should have bytes type" )
150
-
146
+
151
147
152
148
def __create_machine_header (json_meta : dict , data : bytearray = b'' ,
153
149
data_type : "binary_types" = 0 ) -> bytearray :
154
-
150
+
155
151
json_meta = __prepare_meta (json_meta )
156
152
__check_data (data )
157
-
153
+
158
154
binary_header = b'#!'
159
-
160
- #binary header type
155
+
156
+ # binary header type
161
157
binary_header += struct .pack ('>I' , header_types ["DEFAULT" ])
162
158
millis = int (round (time .time () * 1000 ))
163
- #current time
159
+ # current time
164
160
binary_header += struct .pack ('>Q' , millis )[4 :]
165
- #meta type
161
+ # meta type
166
162
binary_header += struct .pack ('>I' , meta_types ["JSON_METATYPE" ])
167
- #meta length
163
+ # meta length
168
164
binary_header += struct .pack ('>I' , len (json_meta ))
169
- #data type
165
+ # data type
170
166
binary_header += struct .pack ('>I' , data_type )
171
- #data length
167
+ # data length
172
168
binary_header += struct .pack ('>I' , len (data ))
173
-
169
+
174
170
binary_header += b'!#\r \n '
175
-
176
- return binary_header
177
171
172
+ return binary_header
0 commit comments