-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathopcua_client.py
125 lines (97 loc) · 4.79 KB
/
opcua_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# inspired in: https://github.com/nallamuthu/ModBus/blob/master/register_scanner.py
import asyncio
import logging
from pathlib import Path
from asyncua import Client, ua
from asyncua.crypto.security_policies import SecurityPolicyBasic256Sha256
from asyncua.crypto.validator import CertificateValidator, CertificateValidatorOptions
from asyncua.crypto.truststore import TrustStore
from cryptography.hazmat.primitives.serialization import pkcs12
from cryptography.hazmat.primitives import serialization
logging.basicConfig(level=logging.INFO)
_logger = logging.getLogger(__name__)
class OPCUAClient:
def __init__(self):
self.client = None
def extract_node_info(self, node, object_name):
"""Extracts relevant node information from the OPC UA server."""
result = {}
if object_name in node:
node = node[object_name]['children']
for key, value in node.items():
result[key] = {
'NodeId': value['NodeId'],
'QualifiedName': value['QualifiedName'],
}
return result
async def connect_and_discover(self, server_address, p12_path, p12_pass, server_cert_path, user, password, use_cert, use_login, USE_TRUST_STORE=False):
client = Client(url=server_address)
if use_login:
client.set_user(user)
client.set_password(password)
if use_cert:
client_certificate, client_private_key = self.load_p12_certificate(p12_path, p12_pass)
cert_file = Path("client_cert.pem")
key_file = Path("client_key.pem")
with cert_file.open("wb") as cert_out, key_file.open("wb") as key_out:
cert_out.write(client_certificate.public_bytes(
encoding=serialization.Encoding.PEM
))
key_out.write(client_private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
))
client.application_uri = "urn:client.local:OPCUA:client.local"
await client.set_security(
SecurityPolicyBasic256Sha256,
certificate=str(cert_file),
private_key=str(key_file),
server_certificate=str(server_cert_path),
mode=ua.MessageSecurityMode.SignAndEncrypt,
)
if USE_TRUST_STORE:
trust_store = TrustStore([Path('examples') / 'certificates' / 'trusted' / 'certs'], [])
await trust_store.load()
validator = CertificateValidator(
CertificateValidatorOptions.TRUSTED_VALIDATION | CertificateValidatorOptions.PEER_SERVER,
trust_store
)
else:
validator = CertificateValidator(
CertificateValidatorOptions.EXT_VALIDATION | CertificateValidatorOptions.PEER_SERVER
)
client.certificate_validator = validator
try:
async with client:
root = client.get_root_node()
print("Connected to server. Root node:", root)
nodes_dict = {}
async def browse_nodes(node, parent_dict):
children = await node.get_children()
for child in children:
browse_name = await child.read_browse_name()
namespace_index = browse_name.NamespaceIndex
node_id = child.nodeid.to_string()
parent_dict[browse_name.Name] = {
'NamespaceIndex': namespace_index,
'NodeId': node_id,
'QualifiedName': browse_name.Name,
'children': {}
}
await browse_nodes(child, parent_dict[browse_name.Name]['children'])
objects = client.get_objects_node()
await browse_nodes(objects, nodes_dict)
return nodes_dict
except ua.UaError as e:
_logger.error(f"Error: {e}")
def load_p12_certificate(self, p12_path, password):
"""
Load the .p12 certificate and extract the certificate and private key, converting password to bytes if needed.
"""
if isinstance(password, str):
password = password.encode('utf-8')
with open(p12_path, "rb") as p12_file:
p12_data = p12_file.read()
private_key, certificate, additional_certs = pkcs12.load_key_and_certificates(p12_data, password)
return certificate, private_key