11# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
22# SPDX-License-Identifier: Apache-2.0
33#
4- from typing import TYPE_CHECKING , List , Optional
4+ import re
5+ from copy import deepcopy
6+ from typing import TYPE_CHECKING , List , Optional , Tuple
57
8+ from aft_common .aft_utils import get_logger
69from boto3 .session import Session
710
811if TYPE_CHECKING :
2124 OrganizationalUnitTypeDef = object
2225 AccountTypeDef = object
2326
27+ logger = get_logger ()
28+
2429
2530class OrganizationsAgent :
2631 ROOT_OU = "Root"
32+ # https://docs.aws.amazon.com/organizations/latest/APIReference/API_OrganizationalUnit.html
33+ # Ex: Sandbox (ou-1234-zxcv)
34+ OU_ID_PATTERN = r"\(ou-.*\)"
35+ OU_NAME_PATTERN = r".{1,128}"
36+ NESTED_OU_NAME_PATTERN = (
37+ rf"{ OU_NAME_PATTERN } \s{ OU_ID_PATTERN } " # <Name> space (<Id>)
38+ )
2739
2840 def __init__ (self , ct_management_session : Session ):
2941 self .orgs_client : OrganizationsClient = ct_management_session .client (
3042 "organizations"
3143 )
3244
45+ # Memoize expensive all-org traversal
46+ self .org_ous : Optional [List [OrganizationalUnitTypeDef ]] = None
47+
48+ @staticmethod
49+ def ou_name_is_nested_format (ou_name : str ) -> bool :
50+ pattern = re .compile (OrganizationsAgent .NESTED_OU_NAME_PATTERN )
51+ if pattern .match (ou_name ) is not None :
52+ return True
53+ return False
54+
55+ @staticmethod
56+ def get_name_and_id_from_nested_ou (
57+ nested_ou_name : str ,
58+ ) -> Optional [Tuple [str , str ]]:
59+ if not OrganizationsAgent .ou_name_is_nested_format (ou_name = nested_ou_name ):
60+ return None
61+
62+ pattern = re .compile (OrganizationsAgent .OU_ID_PATTERN )
63+ match = pattern .search (nested_ou_name )
64+ if match is None :
65+ return None
66+ first_id_idx , last_id_idx = match .span ()
67+
68+ # Grab the matched ID from the nested-ou-string using the span,
69+ id = nested_ou_name [first_id_idx :last_id_idx ]
70+ id = id .strip ("()" )
71+
72+ # The name is what remains of the nested OU without the ID, minus
73+ # the whitespace between the name and ID
74+ name = nested_ou_name [: first_id_idx - 1 ]
75+ return (name , id )
76+
3377 def get_root_ou_id (self ) -> str :
3478 return self .orgs_client .list_roots ()["Roots" ][0 ]["Id" ]
3579
3680 def get_ous_for_root (self ) -> List [OrganizationalUnitTypeDef ]:
81+ return self .get_children_ous_from_parent_id (parent_id = self .get_root_ou_id ())
82+
83+ def get_all_org_ous (self ) -> List [OrganizationalUnitTypeDef ]:
84+ # Memoize calls / cache previous results
85+ # Cache is not shared between invocations so staleness due to org updates is unlikely
86+ if self .org_ous is not None :
87+ return self .org_ous
88+
89+ # Including the root OU
90+ list_root_response = self .orgs_client .list_roots ()
91+ root_ou : OrganizationalUnitTypeDef = {
92+ "Id" : list_root_response ["Roots" ][0 ]["Id" ],
93+ "Arn" : list_root_response ["Roots" ][0 ]["Arn" ],
94+ "Name" : list_root_response ["Roots" ][0 ]["Name" ],
95+ }
96+
97+ org_ous = [root_ou ]
98+
99+ # Get the children OUs of the root as the first pass
100+ root_children = self .get_ous_for_root ()
101+ org_ous .extend (root_children )
102+
103+ # Exclude root to avoid double counting children
104+ ous_to_query = deepcopy (root_children )
105+
106+ # Recursively search all children OUs for further children
107+ while len (ous_to_query ) > 0 :
108+ parent_id : str = ous_to_query .pop ()["Id" ]
109+ children_ous = self .get_children_ous_from_parent_id (parent_id = parent_id )
110+ org_ous .extend (children_ous )
111+ ous_to_query .extend (children_ous )
112+
113+ self .org_ous = org_ous
114+
115+ return self .org_ous
116+
117+ def get_children_ous_from_parent_id (
118+ self , parent_id : str
119+ ) -> List [OrganizationalUnitTypeDef ]:
120+
37121 paginator = self .orgs_client .get_paginator (
38122 "list_organizational_units_for_parent"
39123 )
40- pages = paginator .paginate (ParentId = self . get_root_ou_id () )
41- ous_under_root = []
124+ pages = paginator .paginate (ParentId = parent_id )
125+ children_ous = []
42126 for page in pages :
43- ous_under_root .extend (page ["OrganizationalUnits" ])
44- return ous_under_root
127+ children_ous .extend (page ["OrganizationalUnits" ])
128+ return children_ous
45129
46- def list_tags_for_resource (self , resource : str ) -> List [TagTypeDef ]:
47- return self .orgs_client .list_tags_for_resource (ResourceId = resource )["Tags" ]
130+ def get_ou_ids_from_ou_names (self , target_ou_names : List [str ]) -> List [str ]:
131+ ous = self .get_all_org_ous ()
132+ ou_map = {}
133+
134+ # Convert list of OUs to name->id map for constant time lookups
135+ for ou in ous :
136+ ou_map [ou ["Name" ]] = ou ["Id" ]
48137
49- def get_ou_id_for_account_id (
138+ # Search the map for every target exactly once
139+ matched_ou_ids = []
140+ for target_name in target_ou_names :
141+ # Only match nested OU targets if both name and ID are the same
142+ nested_parsed = OrganizationsAgent .get_name_and_id_from_nested_ou (
143+ nested_ou_name = target_name
144+ )
145+ if nested_parsed is not None : # Nested OU pattern matched!
146+ target_name , target_id = nested_parsed
147+ if ou_map [target_name ] == target_id :
148+ matched_ou_ids .append (ou_map [target_name ])
149+ else :
150+ if target_name in ou_map :
151+ matched_ou_ids .append (ou_map [target_name ])
152+
153+ return matched_ou_ids
154+
155+ def get_ou_from_account_id (
50156 self , account_id : str
51- ) -> Optional [DescribeOrganizationalUnitResponseTypeDef ]:
157+ ) -> Optional [OrganizationalUnitTypeDef ]:
52158 if self .account_id_is_member_of_root (account_id = account_id ):
53- return self .orgs_client .describe_organizational_unit (
54- OrganizationalUnitId = self .get_root_ou_id ()
55- )
56- ou_ids = [ou ["Id" ] for ou in self .get_ous_for_root ()]
57- for ou_id in ou_ids :
58- ou_accounts = self .get_accounts_for_ou (ou_id = ou_id )
59- if account_id in [account_object ["Id" ] for account_object in ou_accounts ]:
60- return self .orgs_client .describe_organizational_unit (
61- OrganizationalUnitId = ou_id
62- )
159+ list_root_response = self .orgs_client .list_roots ()
160+ root_ou : OrganizationalUnitTypeDef = {
161+ "Id" : list_root_response ["Roots" ][0 ]["Id" ],
162+ "Arn" : list_root_response ["Roots" ][0 ]["Arn" ],
163+ "Name" : list_root_response ["Roots" ][0 ]["Name" ],
164+ }
165+ return root_ou
166+
167+ ous = self .get_all_org_ous ()
168+ for ou in ous :
169+ account_ids = [acct ["Id" ] for acct in self .get_accounts_for_ou (ou ["Id" ])]
170+ if account_id in account_ids :
171+ return ou
63172 return None
64173
65174 def get_accounts_for_ou (self , ou_id : str ) -> List [AccountTypeDef ]:
@@ -70,6 +179,15 @@ def get_accounts_for_ou(self, ou_id: str) -> List[AccountTypeDef]:
70179 accounts .extend (page ["Accounts" ])
71180 return accounts
72181
182+ def get_account_ids_in_ous (self , ou_names : List [str ]) -> List [str ]:
183+ ou_ids = self .get_ou_ids_from_ou_names (target_ou_names = ou_names )
184+ account_ids = []
185+ for ou_id in ou_ids :
186+ account_ids .extend (
187+ [acct ["Id" ] for acct in self .get_accounts_for_ou (ou_id = ou_id )]
188+ )
189+ return account_ids
190+
73191 def account_id_is_member_of_root (self , account_id : str ) -> bool :
74192 root_id = self .get_root_ou_id ()
75193 accounts_under_root = self .get_accounts_for_ou (ou_id = root_id )
@@ -78,9 +196,11 @@ def account_id_is_member_of_root(self, account_id: str) -> bool:
78196 def ou_contains_account (self , ou_name : str , account_id : str ) -> bool :
79197 if ou_name == OrganizationsAgent .ROOT_OU :
80198 return self .account_id_is_member_of_root (account_id = account_id )
81- current_ou = self .get_ou_id_for_account_id (account_id = account_id )
199+ current_ou = self .get_ou_from_account_id (account_id = account_id )
82200 if current_ou :
83- current_ou_name = current_ou ["OrganizationalUnit" ]["Name" ]
84- if current_ou_name == ou_name :
201+ if ou_name == current_ou ["Name" ]:
85202 return True
86203 return False
204+
205+ def list_tags_for_resource (self , resource : str ) -> List [TagTypeDef ]:
206+ return self .orgs_client .list_tags_for_resource (ResourceId = resource )["Tags" ]
0 commit comments