88from datetime import datetime
99from typing import List , Optional , Tuple , Union
1010
11- import snap7
12- from snap7 . common import check_error , ipv4 , load_library
13- from snap7 .types import S7SZL , Areas , BlocksList , S7CpInfo , S7CpuInfo , S7DataItem
14- from snap7 .types import S7OrderCode , S7Protection , S7SZLList , TS7BlockInfo , WordLen
15- from snap7 .types import S7Object , buffer_size , buffer_type , cpu_statuses , param_types
11+ from . common import check_error , ipv4 , load_library
12+ from . types import S7SZL , Areas , BlocksList , S7CpInfo , S7CpuInfo , S7DataItem
13+ from .types import S7OrderCode , S7Protection , S7SZLList , TS7BlockInfo , WordLen
14+ from .types import S7Object , buffer_size , buffer_type , cpu_statuses , param_types
15+ from .types import S7CpuInfo , RemotePort , wordlen_to_ctypes , block_types
1616logger = logging .getLogger (__name__ )
1717
1818
@@ -154,7 +154,7 @@ def get_cpu_info(self) -> S7CpuInfo:
154154 ASName: b'SNAP7-SERVER' Copyright: b'Original Siemens Equipment'
155155 ModuleName: b'CPU 315-2 PN/DP'>
156156 """
157- info = snap7 . types . S7CpuInfo ()
157+ info = S7CpuInfo ()
158158 result = self ._library .Cli_GetCpuInfo (self ._pointer , byref (info ))
159159 check_error (result , context = "client" )
160160 return info
@@ -189,7 +189,7 @@ def connect(self, address: str, rack: int, slot: int, tcpport: int = 102) -> int
189189 """
190190 logger .info (f"connecting to { address } :{ tcpport } rack { rack } slot { slot } " )
191191
192- self .set_param (snap7 . types . RemotePort , tcpport )
192+ self .set_param (RemotePort , tcpport )
193193 return self ._library .Cli_ConnectTo (
194194 self ._pointer , c_char_p (address .encode ()),
195195 c_int (rack ), c_int (slot ))
@@ -218,7 +218,7 @@ def db_read(self, db_number: int, start: int, size: int) -> bytearray:
218218 """
219219 logger .debug (f"db_read, db_number:{ db_number } , start:{ start } , size:{ size } " )
220220
221- type_ = snap7 . types . wordlen_to_ctypes [WordLen .Byte .value ]
221+ type_ = wordlen_to_ctypes [WordLen .Byte .value ]
222222 data = (type_ * size )()
223223 result = (self ._library .Cli_DBRead (
224224 self ._pointer , db_number , start , size ,
@@ -246,7 +246,7 @@ def db_write(self, db_number: int, start: int, data: bytearray) -> int:
246246 >>> client.db_write(1, 10, buffer) # writes the bit number 0 from the byte 10 to TRUE.
247247 """
248248 wordlen = WordLen .Byte
249- type_ = snap7 . types . wordlen_to_ctypes [wordlen .value ]
249+ type_ = wordlen_to_ctypes [wordlen .value ]
250250 size = len (data )
251251 cdata = (type_ * size ).from_buffer_copy (data )
252252 logger .debug (f"db_write db_number:{ db_number } start:{ start } size:{ size } data:{ data } " )
@@ -264,7 +264,7 @@ def delete(self, block_type: str, block_num: int) -> int:
264264 Error code from snap7 library.
265265 """
266266 logger .info ("deleting block" )
267- blocktype = snap7 . types . block_types [block_type ]
267+ blocktype = block_types [block_type ]
268268 result = self ._library .Cli_Delete (self ._pointer , blocktype , block_num )
269269 return result
270270
@@ -282,7 +282,7 @@ def full_upload(self, _type: str, block_num: int) -> Tuple[bytearray, int]:
282282 """
283283 _buffer = buffer_type ()
284284 size = c_int (sizeof (_buffer ))
285- block_type = snap7 . types . block_types [_type ]
285+ block_type = block_types [_type ]
286286 result = self ._library .Cli_FullUpload (self ._pointer , block_type ,
287287 block_num , byref (_buffer ),
288288 byref (size ))
@@ -302,7 +302,7 @@ def upload(self, block_num: int) -> bytearray:
302302 Buffer with the uploaded block.
303303 """
304304 logger .debug (f"db_upload block_num: { block_num } " )
305- block_type = snap7 . types . block_types ['DB' ]
305+ block_type = block_types ['DB' ]
306306 _buffer = buffer_type ()
307307 size = c_int (sizeof (_buffer ))
308308
@@ -383,19 +383,19 @@ def read_area(self, area: Areas, dbnumber: int, start: int, size: int) -> bytear
383383 >>> import snap7
384384 >>> client = snap7.client.Client()
385385 >>> client.connect("192.168.0.1", 0, 0)
386- >>> buffer = client.read_area(snap7.types. Areas.DB, 1, 10, 4) # Reads the DB number 1 from the byte 10 to the byte 14.
386+ >>> buffer = client.read_area(Areas.DB, 1, 10, 4) # Reads the DB number 1 from the byte 10 to the byte 14.
387387 >>> buffer
388388 bytearray(b'\\ x00\\ x00')
389389 """
390390 if area not in Areas :
391- raise ValueError (f"{ area } is not implemented in snap7. types" )
391+ raise ValueError (f"{ area } is not implemented in types" )
392392 elif area == Areas .TM :
393393 wordlen = WordLen .Timer
394394 elif area == Areas .CT :
395395 wordlen = WordLen .Counter
396396 else :
397397 wordlen = WordLen .Byte
398- type_ = snap7 . types . wordlen_to_ctypes [wordlen .value ]
398+ type_ = wordlen_to_ctypes [wordlen .value ]
399399 logger .debug (f"reading area: { area .name } dbnumber: { dbnumber } start: { start } : amount { size } : wordlen: { wordlen .name } ={ wordlen .value } " )
400400 data = (type_ * size )()
401401 result = self ._library .Cli_ReadArea (self ._pointer , area .value , dbnumber , start ,
@@ -421,15 +421,15 @@ def write_area(self, area: Areas, dbnumber: int, start: int, data: bytearray) ->
421421 >>> client = snap7.client.Client()
422422 >>> client.connect("192.168.0.1", 0, 0)
423423 >>> buffer = bytearray([0b00000001])
424- >>> client.write_area(snap7.types. Areas.DB, 1, 10, buffer) # Writes the bit 0 of the byte 10 from the DB number 1 to TRUE.
424+ >>> client.write_area(Areas.DB, 1, 10, buffer) # Writes the bit 0 of the byte 10 from the DB number 1 to TRUE.
425425 """
426426 if area == Areas .TM :
427427 wordlen = WordLen .Timer
428428 elif area == Areas .CT :
429429 wordlen = WordLen .Counter
430430 else :
431431 wordlen = WordLen .Byte
432- type_ = snap7 . types . wordlen_to_ctypes [WordLen .Byte .value ]
432+ type_ = wordlen_to_ctypes [WordLen .Byte .value ]
433433 size = len (data )
434434 logger .debug (f"writing area: { area .name } dbnumber: { dbnumber } start: { start } : size { size } : "
435435 f"wordlen { wordlen .name } ={ wordlen .value } type: { type_ } " )
@@ -483,7 +483,7 @@ def list_blocks_of_type(self, blocktype: str, size: int) -> Union[int, Array]:
483483 :obj:`ValueError`: if the `blocktype` is not valid.
484484 """
485485
486- _blocktype = snap7 . types . block_types .get (blocktype )
486+ _blocktype = block_types .get (blocktype )
487487 if not _blocktype :
488488 raise ValueError ("The blocktype parameter was invalid" )
489489
@@ -536,7 +536,7 @@ def get_block_info(self, blocktype: str, db_number: int) -> TS7BlockInfo:
536536 Family: b''
537537 Header: b''
538538 """
539- blocktype_ = snap7 . types . block_types .get (blocktype )
539+ blocktype_ = block_types .get (blocktype )
540540
541541 if not blocktype_ :
542542 raise ValueError ("The blocktype parameter was invalid" )
@@ -639,7 +639,7 @@ def ab_read(self, start: int, size: int) -> bytearray:
639639 Buffer with the data read.
640640 """
641641 wordlen = WordLen .Byte
642- type_ = snap7 . types . wordlen_to_ctypes [wordlen .value ]
642+ type_ = wordlen_to_ctypes [wordlen .value ]
643643 data = (type_ * size )()
644644 logger .debug (f"ab_read: start: { start } : size { size } : " )
645645 result = self ._library .Cli_ABRead (self ._pointer , start , size ,
@@ -658,7 +658,7 @@ def ab_write(self, start: int, data: bytearray) -> int:
658658 Snap7 code.
659659 """
660660 wordlen = WordLen .Byte
661- type_ = snap7 . types . wordlen_to_ctypes [wordlen .value ]
661+ type_ = wordlen_to_ctypes [wordlen .value ]
662662 size = len (data )
663663 cdata = (type_ * size ).from_buffer_copy (data )
664664 logger .debug (f"ab write: start: { start } : size: { size } : " )
@@ -693,7 +693,7 @@ def as_ab_write(self, start: int, data: bytearray) -> int:
693693 Snap7 code.
694694 """
695695 wordlen = WordLen .Byte
696- type_ = snap7 . types . wordlen_to_ctypes [wordlen .value ]
696+ type_ = wordlen_to_ctypes [wordlen .value ]
697697 size = len (data )
698698 cdata = (type_ * size ).from_buffer_copy (data )
699699 logger .debug (f"ab write: start: { start } : size: { size } : " )
@@ -754,7 +754,7 @@ def as_ct_write(self, start: int, amount: int, data: bytearray) -> int:
754754 Returns:
755755 Snap7 code.
756756 """
757- type_ = snap7 . types . wordlen_to_ctypes [WordLen .Counter .value ]
757+ type_ = wordlen_to_ctypes [WordLen .Counter .value ]
758758 cdata = (type_ * amount ).from_buffer_copy (data )
759759 result = self ._library .Cli_AsCTWrite (self ._pointer , start , amount , byref (cdata ))
760760 check_error (result , context = "client" )
@@ -991,14 +991,14 @@ def wait_as_completion(self, timeout: int) -> int:
991991
992992 def _prepare_as_read_area (self , area : Areas , size : int ) -> Tuple [WordLen , Array ]:
993993 if area not in Areas :
994- raise ValueError (f"{ area } is not implemented in snap7. types" )
994+ raise ValueError (f"{ area } is not implemented in types" )
995995 elif area == Areas .TM :
996996 wordlen = WordLen .Timer
997997 elif area == Areas .CT :
998998 wordlen = WordLen .Counter
999999 else :
10001000 wordlen = WordLen .Byte
1001- type_ = snap7 . types . wordlen_to_ctypes [wordlen .value ]
1001+ type_ = wordlen_to_ctypes [wordlen .value ]
10021002 usrdata = (type_ * size )()
10031003 return wordlen , usrdata
10041004
@@ -1024,14 +1024,14 @@ def as_read_area(self, area: Areas, dbnumber: int, start: int, size: int, wordle
10241024
10251025 def _prepare_as_write_area (self , area : Areas , data : bytearray ) -> Tuple [WordLen , Array ]:
10261026 if area not in Areas :
1027- raise ValueError (f"{ area } is not implemented in snap7. types" )
1027+ raise ValueError (f"{ area } is not implemented in types" )
10281028 elif area == Areas .TM :
10291029 wordlen = WordLen .Timer
10301030 elif area == Areas .CT :
10311031 wordlen = WordLen .Counter
10321032 else :
10331033 wordlen = WordLen .Byte
1034- type_ = snap7 . types . wordlen_to_ctypes [WordLen .Byte .value ]
1034+ type_ = wordlen_to_ctypes [WordLen .Byte .value ]
10351035 cdata = (type_ * len (data )).from_buffer_copy (data )
10361036 return wordlen , cdata
10371037
@@ -1049,7 +1049,7 @@ def as_write_area(self, area: Areas, dbnumber: int, start: int, size: int, wordl
10491049 Returns:
10501050 Snap7 code.
10511051 """
1052- type_ = snap7 . types . wordlen_to_ctypes [WordLen .Byte .value ]
1052+ type_ = wordlen_to_ctypes [WordLen .Byte .value ]
10531053 logger .debug (f"writing area: { area .name } dbnumber: { dbnumber } start: { start } : size { size } : "
10541054 f"wordlen { wordlen } type: { type_ } " )
10551055 cdata = (type_ * len (pusrdata )).from_buffer_copy (pusrdata )
@@ -1083,7 +1083,7 @@ def as_eb_write(self, start: int, size: int, data: bytearray) -> int:
10831083 Returns:
10841084 Snap7 code.
10851085 """
1086- type_ = snap7 . types . wordlen_to_ctypes [WordLen .Byte .value ]
1086+ type_ = wordlen_to_ctypes [WordLen .Byte .value ]
10871087 cdata = (type_ * size ).from_buffer_copy (data )
10881088 result = self ._library .Cli_AsEBWrite (self ._pointer , start , size , byref (cdata ))
10891089 check_error (result , context = "client" )
@@ -1104,7 +1104,7 @@ def as_full_upload(self, _type: str, block_num: int) -> int:
11041104 """
11051105 _buffer = buffer_type ()
11061106 size = c_int (sizeof (_buffer ))
1107- block_type = snap7 . types . block_types [_type ]
1107+ block_type = block_types [_type ]
11081108 result = self ._library .Cli_AsFullUpload (self ._pointer , block_type , block_num , byref (_buffer ), byref (size ))
11091109 check_error (result , context = "client" )
11101110 return result
@@ -1123,7 +1123,7 @@ def as_list_blocks_of_type(self, blocktype: str, data, count) -> int:
11231123 Raises:
11241124 :obj:`ValueError`: if the `blocktype` is invalid
11251125 """
1126- _blocktype = snap7 . types . block_types .get (blocktype )
1126+ _blocktype = block_types .get (blocktype )
11271127 if not _blocktype :
11281128 raise ValueError ("The blocktype parameter was invalid" )
11291129 result = self ._library .Cli_AsListBlocksOfType (self ._pointer , _blocktype , byref (data ), byref (count ))
@@ -1156,7 +1156,7 @@ def as_mb_write(self, start: int, size: int, data: bytearray) -> int:
11561156 Returns:
11571157 Snap7 code.
11581158 """
1159- type_ = snap7 . types . wordlen_to_ctypes [WordLen .Byte .value ]
1159+ type_ = wordlen_to_ctypes [WordLen .Byte .value ]
11601160 cdata = (type_ * size ).from_buffer_copy (data )
11611161 result = self ._library .Cli_AsMBWrite (self ._pointer , start , size , byref (cdata ))
11621162 check_error (result , context = "client" )
@@ -1218,7 +1218,7 @@ def as_tm_write(self, start: int, amount: int, data: bytearray) -> int:
12181218 Returns:
12191219 Snap7 code.
12201220 """
1221- type_ = snap7 . types . wordlen_to_ctypes [WordLen .Timer .value ]
1221+ type_ = wordlen_to_ctypes [WordLen .Timer .value ]
12221222 cdata = (type_ * amount ).from_buffer_copy (data )
12231223 result = self ._library .Cli_AsTMWrite (self ._pointer , start , amount , byref (cdata ))
12241224 check_error (result )
@@ -1238,7 +1238,7 @@ def as_upload(self, block_num: int, _buffer, size) -> int:
12381238 Returns:
12391239 Snap7 code.
12401240 """
1241- block_type = snap7 . types . block_types ['DB' ]
1241+ block_type = block_types ['DB' ]
12421242 result = self ._library .Cli_AsUpload (self ._pointer , block_type , block_num , byref (_buffer ), byref (size ))
12431243 check_error (result , context = "client" )
12441244 return result
@@ -1266,7 +1266,7 @@ def ct_read(self, start: int, amount: int) -> bytearray:
12661266 Returns:
12671267 Buffer read.
12681268 """
1269- type_ = snap7 . types . wordlen_to_ctypes [WordLen .Counter .value ]
1269+ type_ = wordlen_to_ctypes [WordLen .Counter .value ]
12701270 data = (type_ * amount )()
12711271 result = self ._library .Cli_CTRead (self ._pointer , start , amount , byref (data ))
12721272 check_error (result , context = "client" )
@@ -1283,7 +1283,7 @@ def ct_write(self, start: int, amount: int, data: bytearray) -> int:
12831283 Returns:
12841284 Snap7 code.
12851285 """
1286- type_ = snap7 . types . wordlen_to_ctypes [WordLen .Counter .value ]
1286+ type_ = wordlen_to_ctypes [WordLen .Counter .value ]
12871287 cdata = (type_ * amount ).from_buffer_copy (data )
12881288 result = self ._library .Cli_CTWrite (self ._pointer , start , amount , byref (cdata ))
12891289 check_error (result )
@@ -1313,7 +1313,7 @@ def eb_read(self, start: int, size: int) -> bytearray:
13131313 Returns:
13141314 Data read.
13151315 """
1316- type_ = snap7 . types . wordlen_to_ctypes [WordLen .Byte .value ]
1316+ type_ = wordlen_to_ctypes [WordLen .Byte .value ]
13171317 data = (type_ * size )()
13181318 result = self ._library .Cli_EBRead (self ._pointer , start , size , byref (data ))
13191319 check_error (result , context = "client" )
@@ -1330,7 +1330,7 @@ def eb_write(self, start: int, size: int, data: bytearray) -> int:
13301330 Returns:
13311331 Snap7 code.
13321332 """
1333- type_ = snap7 . types . wordlen_to_ctypes [WordLen .Byte .value ]
1333+ type_ = wordlen_to_ctypes [WordLen .Byte .value ]
13341334 cdata = (type_ * size ).from_buffer_copy (data )
13351335 result = self ._library .Cli_EBWrite (self ._pointer , start , size , byref (cdata ))
13361336 check_error (result )
@@ -1450,7 +1450,7 @@ def mb_read(self, start: int, size: int) -> bytearray:
14501450 Returns:
14511451 Buffer with the data read.
14521452 """
1453- type_ = snap7 . types . wordlen_to_ctypes [WordLen .Byte .value ]
1453+ type_ = wordlen_to_ctypes [WordLen .Byte .value ]
14541454 data = (type_ * size )()
14551455 result = self ._library .Cli_MBRead (self ._pointer , start , size , byref (data ))
14561456 check_error (result , context = "client" )
@@ -1467,7 +1467,7 @@ def mb_write(self, start: int, size: int, data: bytearray) -> int:
14671467 Returns:
14681468 Snap7 code.
14691469 """
1470- type_ = snap7 . types . wordlen_to_ctypes [WordLen .Byte .value ]
1470+ type_ = wordlen_to_ctypes [WordLen .Byte .value ]
14711471 cdata = (type_ * size ).from_buffer_copy (data )
14721472 result = self ._library .Cli_MBWrite (self ._pointer , start , size , byref (cdata ))
14731473 check_error (result )
@@ -1523,7 +1523,7 @@ def tm_read(self, start: int, amount: int) -> bytearray:
15231523 Buffer read.
15241524 """
15251525 wordlen = WordLen .Timer
1526- type_ = snap7 . types . wordlen_to_ctypes [wordlen .value ]
1526+ type_ = wordlen_to_ctypes [wordlen .value ]
15271527 data = (type_ * amount )()
15281528 result = self ._library .Cli_TMRead (self ._pointer , start , amount , byref (data ))
15291529 check_error (result , context = "client" )
@@ -1541,7 +1541,7 @@ def tm_write(self, start: int, amount: int, data: bytearray) -> int:
15411541 Snap7 code.
15421542 """
15431543 wordlen = WordLen .Timer
1544- type_ = snap7 . types . wordlen_to_ctypes [wordlen .value ]
1544+ type_ = wordlen_to_ctypes [wordlen .value ]
15451545 cdata = (type_ * amount ).from_buffer_copy (data )
15461546 result = self ._library .Cli_TMWrite (self ._pointer , start , amount , byref (cdata ))
15471547 check_error (result )
0 commit comments