1
1
from functools import partial
2
2
from secrets import token_urlsafe
3
- from typing import List , Dict , Any , Optional , Iterable , Union
3
+ from typing import List , Dict , Optional , Iterable , Union
4
+ from typing_extensions import Annotated
4
5
from enum import Enum
5
6
from pathlib import Path
6
7
from ipaddress import IPv4Network , IPv6Network , IPv4Interface , IPv4Address
7
- from pydantic .v1 import BaseModel , BaseSettings , Field , validator , root_validator , constr , conint
8
+ from pydantic import field_validator , field_serializer , model_validator , Field , ConfigDict , BaseModel , ValidationInfo
9
+ from pydantic_settings import SettingsConfigDict , BaseSettings
8
10
from passlib .hash import sha512_crypt
9
11
from sshpubkeys import SSHKey , InvalidKeyError
10
12
from .validators import (formatted_string , unique_system_ip , constrained_cidr , cidr_subnet , subnet_interface ,
@@ -58,20 +60,23 @@ class GlobalConfigModel(BaseSettings):
58
60
"""
59
61
GlobalConfigModel is a special config block as field values can use environment variables as their default value
60
62
"""
61
- home_dir : str = Field (..., env = 'HOME' )
62
- project_root : str = Field (..., env = 'PROJ_ROOT' )
63
+ model_config = SettingsConfigDict (case_sensitive = True )
64
+
65
+ home_dir : Annotated [str , Field (validation_alias = 'HOME' )]
66
+ project_root : Annotated [str , Field (validation_alias = 'PROJ_ROOT' )]
63
67
common_tags : Dict [str , str ] = None
64
68
ubuntu_image : str
65
- ssh_public_key_file : str = Field (None , description = 'Can use python format string syntax to reference other '
66
- ' previous fields in this model' )
67
- ssh_public_key : Optional [str ] = None
68
- ssh_public_key_fp : Optional [str ] = None
69
+ ssh_public_key_file : Annotated [ Optional [ str ], Field (description = 'Can use python format string syntax to reference '
70
+ 'other previous fields in this model' )] = None
71
+ ssh_public_key : Annotated [ Optional [str ], Field ( validate_default = True ) ] = None
72
+ ssh_public_key_fp : Annotated [ Optional [str ], Field ( validate_default = True ) ] = None
69
73
70
- _validate_formatted_strings = validator ('ssh_public_key_file' , allow_reuse = True )(formatted_string )
74
+ _validate_formatted_strings = field_validator ('ssh_public_key_file' )(formatted_string )
71
75
72
- @validator ('ssh_public_key' , always = True )
73
- def resolve_ssh_public_key (cls , v , values : Dict [str , Any ]):
74
- pub_key = resolve_ssh_public_key (v , values .get ('ssh_public_key_file' ))
76
+ @field_validator ('ssh_public_key' )
77
+ @classmethod
78
+ def resolve_ssh_public_key (cls , v , info : ValidationInfo ):
79
+ pub_key = resolve_ssh_public_key (v , info .data .get ('ssh_public_key_file' ))
75
80
try :
76
81
ssh_key = SSHKey (pub_key , strict = True )
77
82
ssh_key .parse ()
@@ -82,10 +87,11 @@ def resolve_ssh_public_key(cls, v, values: Dict[str, Any]):
82
87
83
88
return pub_key
84
89
85
- @validator ('ssh_public_key_fp' , always = True )
86
- def resolve_ssh_public_key_fp (cls , v : Union [str , None ], values : Dict [str , Any ]) -> str :
90
+ @field_validator ('ssh_public_key_fp' )
91
+ @classmethod
92
+ def resolve_ssh_public_key_fp (cls , v : Union [str , None ], info : ValidationInfo ) -> str :
87
93
if v is None :
88
- pub_key = values .get ('ssh_public_key' )
94
+ pub_key = info . data .get ('ssh_public_key' )
89
95
if pub_key is None :
90
96
raise ValueError ("Field 'ssh_public_key' or 'ssh_public_key_file' not present" )
91
97
ssh_key = SSHKey (pub_key , strict = True )
@@ -96,19 +102,17 @@ def resolve_ssh_public_key_fp(cls, v: Union[str, None], values: Dict[str, Any])
96
102
97
103
return fp
98
104
99
- @validator ('project_root' )
105
+ @field_validator ('project_root' )
106
+ @classmethod
100
107
def resolve_project_root (cls , v : str ) -> str :
101
108
return str (Path (v ).resolve ())
102
109
103
- class Config :
104
- case_sensitive = True
105
-
106
110
107
111
#
108
112
# infra providers block
109
113
#
110
114
class InfraProviderConfigModel (BaseModel ):
111
- ntp_server : constr ( regex = r'^[a-zA-Z0-9.-]+$' )
115
+ ntp_server : Annotated [ str , Field ( pattern = r'^[a-zA-Z0-9.-]+$' )]
112
116
113
117
114
118
class GCPConfigModel (InfraProviderConfigModel ):
@@ -133,57 +137,57 @@ class InfraProvidersModel(BaseModel):
133
137
#
134
138
135
139
class ControllerCommonInfraModel (BaseModel ):
140
+ model_config = ConfigDict (use_enum_values = True )
141
+
136
142
provider : InfraProviderControllerOptionsEnum
137
143
region : str
138
- dns_domain : constr ( regex = r'^[a-zA-Z0-9.-]+$' ) = Field (
139
- '' , description = "If set, add A records for control plane element external addresses in AWS Route 53")
140
- sw_version : constr ( regex = r'^\d+(?:\.\d+)+$' )
144
+ dns_domain : Annotated [ str , Field (description = "If set, add A records for control plane element external "
145
+ " addresses in AWS Route 53", pattern = r'^[a-zA-Z0-9.-]+$' )] = ''
146
+ sw_version : Annotated [ str , Field ( pattern = r'^\d+(?:\.\d+)+$' )]
141
147
cloud_init_format : CloudInitEnum = CloudInitEnum .v1
142
148
143
- class Config :
144
- use_enum_values = True
145
-
146
149
147
150
class ControllerCommonConfigModel (BaseModel ):
148
151
organization_name : str
149
- site_id : conint (ge = 0 , le = 4294967295 )
152
+ site_id : Annotated [ int , Field (ge = 0 , le = 4294967295 )]
150
153
acl_ingress_ipv4 : List [IPv4Network ]
151
154
acl_ingress_ipv6 : List [IPv6Network ]
152
155
cidr : IPv4Network
153
156
vpn0_gateway : IPv4Address
154
157
155
- @validator ('acl_ingress_ipv4' , 'acl_ingress_ipv6' )
156
- def acl_str (cls , v : Iterable [IPv4Network ]) -> str :
158
+ @field_serializer ('acl_ingress_ipv4' , 'acl_ingress_ipv6' )
159
+ def acl_str (self , v : Iterable [IPv4Network ]) -> str :
157
160
return ', ' .join (f'"{ entry } "' for entry in v )
158
161
159
- _validate_cidr = validator ('cidr' , allow_reuse = True )(constrained_cidr (max_length = 23 ))
162
+ _validate_cidr = field_validator ('cidr' )(constrained_cidr (max_length = 23 ))
160
163
161
164
162
165
class CertAuthModel (BaseModel ):
163
166
passphrase : str = Field (default_factory = partial (token_urlsafe , 15 ))
164
167
cert_dir : str
165
- ca_cert : str = '{cert_dir}/myCA.pem'
168
+ ca_cert : Annotated [ str , Field ( validate_default = True )] = '{cert_dir}/myCA.pem'
166
169
167
- _validate_formatted_strings = validator ('ca_cert' , always = True , allow_reuse = True )(formatted_string )
170
+ _validate_formatted_strings = field_validator ('ca_cert' )(formatted_string )
168
171
169
172
170
173
class ControllerConfigModel (BaseModel ):
171
174
system_ip : IPv4Address
172
175
vpn0_interface_ipv4 : IPv4Interface
173
176
174
177
# Validators
175
- _validate_system_ip = validator ('system_ip' , allow_reuse = True )(unique_system_ip )
178
+ _validate_system_ip = field_validator ('system_ip' )(unique_system_ip )
176
179
177
180
178
181
class VmanageConfigModel (ControllerConfigModel ):
179
182
username : str = 'admin'
180
183
password : str = Field (default_factory = partial (token_urlsafe , 12 ))
181
- password_hashed : Optional [str ] = None
184
+ password_hashed : Annotated [ Optional [str ], Field ( validate_default = True ) ] = None
182
185
183
- @validator ('password_hashed' , always = True )
184
- def hash_password (cls , v : Union [str , None ], values : Dict [str , Any ]) -> str :
186
+ @field_validator ('password_hashed' )
187
+ @classmethod
188
+ def hash_password (cls , v : Union [str , None ], info : ValidationInfo ) -> str :
185
189
if v is None :
186
- clear_password = values .get ('password' )
190
+ clear_password = info . data .get ('password' )
187
191
if clear_password is None :
188
192
raise ValueError ("Field 'password' is not present" )
189
193
# Using 'openssl passwd -6' recipe
@@ -227,78 +231,74 @@ class InfraVmwareModel(BaseModel):
227
231
228
232
229
233
class EdgeInfraModel (ComputeInstanceModel ):
234
+ model_config = ConfigDict (use_enum_values = True )
235
+
230
236
provider : InfraProviderOptionsEnum
231
- region : Optional [str ] = None
232
- zone : Optional [str ] = None
233
- sw_version : constr ( regex = r'^\d+(?:\.\d+)+' )
237
+ region : Annotated [ Optional [str ], Field ( validate_default = True ) ] = None
238
+ zone : Annotated [ Optional [str ], Field ( validate_default = True ) ] = None
239
+ sw_version : Annotated [ str , Field ( pattern = r'^\d+(?:\.\d+)+' )]
234
240
cloud_init_format : CloudInitEnum = CloudInitEnum .v1
235
241
sdwan_model : str
236
242
sdwan_uuid : str
237
- vmware : Optional [InfraVmwareModel ] = None
238
-
239
- @validator ('region' , always = True )
240
- def region_validate (cls , v , values : Dict [str , Any ]):
241
- if v is None and values ['provider' ] != InfraProviderOptionsEnum .vmware :
242
- raise ValueError (f"{ values ['provider' ]} provider requires 'region' to be defined" )
243
- if v is not None and values ['provider' ] == InfraProviderOptionsEnum .vmware :
243
+ vmware : Annotated [Optional [InfraVmwareModel ], Field (validate_default = True )] = None
244
+
245
+ @field_validator ('region' )
246
+ @classmethod
247
+ def region_validate (cls , v , info : ValidationInfo ):
248
+ if v is None and info .data ['provider' ] != InfraProviderOptionsEnum .vmware :
249
+ raise ValueError (f"{ info .data ['provider' ]} provider requires 'region' to be defined" )
250
+ if v is not None and info .data ['provider' ] == InfraProviderOptionsEnum .vmware :
244
251
raise ValueError (f"'region' is not allowed when provider is { InfraProviderOptionsEnum .vmware } " )
245
252
246
253
return v
247
254
248
- @validator ('zone' , always = True )
249
- def zone_validate (cls , v , values : Dict [str , Any ]):
250
- if v is None and values ['provider' ] == InfraProviderOptionsEnum .gcp :
255
+ @field_validator ('zone' )
256
+ @classmethod
257
+ def zone_validate (cls , v , info : ValidationInfo ):
258
+ if v is None and info .data ['provider' ] == InfraProviderOptionsEnum .gcp :
251
259
raise ValueError ("GCP requires zone to be defined" )
252
260
253
261
return v
254
262
255
- @validator ('vmware' , always = True )
256
- def vmware_section (cls , v , values : Dict [str , Any ]):
257
- if v is None and values ['provider' ] == InfraProviderOptionsEnum .vmware :
263
+ @field_validator ('vmware' )
264
+ @classmethod
265
+ def vmware_section (cls , v , info : ValidationInfo ):
266
+ if v is None and info .data ['provider' ] == InfraProviderOptionsEnum .vmware :
258
267
raise ValueError (f"{ InfraProviderOptionsEnum .vmware } provider requires 'vmware' section to be defined" )
259
- if v is not None and values ['provider' ] != InfraProviderOptionsEnum .vmware :
268
+ if v is not None and info . data ['provider' ] != InfraProviderOptionsEnum .vmware :
260
269
raise ValueError (f"'vmware' section is only allowed when provider is { InfraProviderOptionsEnum .vmware } " )
261
270
262
271
return v
263
272
264
- @root_validator
265
- def instance_type_validate (cls , values : Dict [ str , Any ]) :
266
- if values [ ' instance_type' ] is None and values [ ' provider' ] != InfraProviderOptionsEnum .vmware :
267
- raise ValueError (f"{ values [ ' provider' ] } provider requires 'instance_type' to be defined" )
268
- if values [ ' instance_type' ] is not None and values [ ' provider' ] == InfraProviderOptionsEnum .vmware :
273
+ @model_validator ( mode = 'after' )
274
+ def instance_type_validate (self ) -> 'EdgeInfraModel' :
275
+ if self . instance_type is None and self . provider != InfraProviderOptionsEnum .vmware :
276
+ raise ValueError (f"{ self . provider } provider requires 'instance_type' to be defined" )
277
+ if self . instance_type is not None and self . provider == InfraProviderOptionsEnum .vmware :
269
278
raise ValueError (f"'instance_type' is not allowed when provider is { InfraProviderOptionsEnum .vmware } " )
270
279
271
- return values
272
-
273
- class Config :
274
- use_enum_values = True
280
+ return self
275
281
276
282
277
283
class EdgeConfigModel (BaseModel ):
278
- site_id : conint (ge = 0 , le = 4294967295 )
284
+ site_id : Annotated [ int , Field (ge = 0 , le = 4294967295 )]
279
285
system_ip : IPv4Address
280
286
cidr : Optional [IPv4Network ] = None
281
- vpn0_range : Optional [IPv4Network ] = None
282
- vpn0_interface_ipv4 : Optional [IPv4Interface ] = None
283
- vpn0_gateway : Optional [IPv4Address ] = None
284
- vpn1_range : Optional [IPv4Network ] = None
285
- vpn1_interface_ipv4 : Optional [IPv4Interface ] = None
287
+ vpn0_range : Annotated [ Optional [IPv4Network ], Field ( validate_default = True ) ] = None
288
+ vpn0_interface_ipv4 : Annotated [ Optional [IPv4Interface ], Field ( validate_default = True ) ] = None
289
+ vpn0_gateway : Annotated [ Optional [IPv4Address ], Field ( validate_default = True ) ] = None
290
+ vpn1_range : Annotated [ Optional [IPv4Network ], Field ( validate_default = True ) ] = None
291
+ vpn1_interface_ipv4 : Annotated [ Optional [IPv4Interface ], Field ( validate_default = True ) ] = None
286
292
287
293
# Validators
288
- _validate_system_ip = validator ('system_ip' , allow_reuse = True )(unique_system_ip )
289
- _validate_cidr = validator ('cidr' , allow_reuse = True )(constrained_cidr (max_length = 23 ))
290
- _validate_vpn_range = validator ('vpn0_range' , 'vpn1_range' , always = True , allow_reuse = True )(
291
- cidr_subnet (cidr_field = 'cidr' , prefix_len = 24 )
292
- )
293
- _validate_vpn0_ipv4 = validator ('vpn0_interface_ipv4' , always = True , allow_reuse = True )(
294
- subnet_interface (subnet_field = 'vpn0_range' , host_index = 10 )
295
- )
296
- _validate_vpn1_ipv4 = validator ('vpn1_interface_ipv4' , always = True , allow_reuse = True )(
297
- subnet_interface (subnet_field = 'vpn1_range' , host_index = 10 )
298
- )
299
- _validate_vpn0_gw = validator ('vpn0_gateway' , always = True , allow_reuse = True )(
300
- subnet_address (subnet_field = 'vpn0_range' , host_index = 0 )
301
- )
294
+ _validate_system_ip = field_validator ('system_ip' )(unique_system_ip )
295
+ _validate_cidr = field_validator ('cidr' )(constrained_cidr (max_length = 23 ))
296
+ _validate_vpn_range = field_validator ('vpn0_range' , 'vpn1_range' )(cidr_subnet (cidr_field = 'cidr' , prefix_len = 24 ))
297
+ _validate_vpn0_ipv4 = field_validator ('vpn0_interface_ipv4' )(subnet_interface (subnet_field = 'vpn0_range' ,
298
+ host_index = 10 ))
299
+ _validate_vpn1_ipv4 = field_validator ('vpn1_interface_ipv4' )(subnet_interface (subnet_field = 'vpn1_range' ,
300
+ host_index = 10 ))
301
+ _validate_vpn0_gw = field_validator ('vpn0_gateway' )(subnet_address (subnet_field = 'vpn0_range' , host_index = 0 ))
302
302
303
303
304
304
class EdgeModel (BaseModel ):
0 commit comments