forked from CiscoDevNet/dnac-python-path-trace
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpath_trace.py
executable file
·396 lines (331 loc) · 14.3 KB
/
path_trace.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
#! /usr/bin/env python
from env_lab import apicem
from time import sleep
import json
import requests
import sys
import urllib3
from requests.auth import HTTPBasicAuth
# Silence the InsecureRequestWarning that urllib3 would otherwise emit:
# the requests below are all sent with verify=False (no certificate check)
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Request headers shared by the API helper functions in this file.
# "x-auth-token" starts out empty; the helpers overwrite it in place with
# the ticket they are given before sending their request.
headers = {
'content-type': "application/json",
'x-auth-token': ""
}
def apic_login(host, username, password):
    """
    Use the REST API to log into DNA Center and retrieve an auth token.
    Input Parameters are:
      host: controller address placed into the request URL
      username: user name sent with HTTP Basic authentication
      password: password sent with HTTP Basic authentication
    Returns the value of the "Token" field of the JSON response body.
    Raises requests.exceptions.HTTPError when the controller answers with
    a 4xx/5xx status code.
    """
    url = "https://{}/api/system/v1/auth/token".format(host)

    # Credentials travel as HTTP Basic auth; no request body is sent
    response = requests.request("POST", url,
                                auth=HTTPBasicAuth(username, password),
                                headers=headers, verify=False)

    # Surface the HTTP error itself instead of an opaque KeyError on "Token"
    # when the login request is not accepted
    response.raise_for_status()
    return response.json()["Token"]
def host_list(apic, ticket, ip=None, mac=None, name=None):
    """
    Use the REST API to retrieve the list of hosts.
    Input Parameters are:
      apic: controller address placed into the request URL
      ticket: auth token sent in the "x-auth-token" header
    Optional parameters to filter by:
      ip: IP address (sent as hostIp)
      mac: MAC address (sent as hostMac)
      name: Hostname (sent as hostName)
    Returns the "response" element of the JSON reply.
    """
    url = "https://{}/api/v1/host".format(apic)
    headers["x-auth-token"] = ticket

    # Only filters that were actually provided are sent. They are handed to
    # requests as query parameters so the values get URL-encoded instead of
    # being pasted into the URL verbatim.
    candidates = (("hostIp", ip), ("hostMac", mac), ("hostName", name))
    params = {key: value for key, value in candidates if value}

    # Make API request and return the response body
    response = requests.request("GET", url, headers=headers, params=params,
                                verify=False)
    return response.json()["response"]
def verify_single_host(host, ip):
    """
    Check that a host query produced exactly one result.
    Input Parameters are:
      host: list of hosts returned by the query
      ip: IP address that was queried, used in the error messages
    When the list is empty or holds more than one entry, an error message
    is printed and the program exits with status 1. Otherwise the function
    simply returns.
    """
    found = len(host)
    if found == 1:
        return

    if found == 0:
        print("Error: No host with IP address {} was found".format(ip))
    else:
        print("Error: Multiple hosts with IP address {} were found".format(ip))
        # Dump the ambiguous matches to help the operator
        print(json.dumps(host, indent=2))
    sys.exit(1)
def print_host_details(host):
    """
    Print to screen interesting details about a given host.
    Input Parameters are:
      host: dictionary object of a host returned from APIC-EM
    Standard Output Details:
      Host Name (hostName) - If available
      Host IP (hostIp)
      Host MAC (hostMac)
      Network Type (hostType) - wired/wireless
      Host Sub Type (subType)
      VLAN (vlanId)
      Connected Network Device (connectedNetworkDeviceIpAddress)
    Wired Host Details:
      Connected Interface Name (connectedInterfaceName)
    Wireless Host Details:
      Connected AP Name (connectedAPName)
    Any detail missing from the dictionary is printed as "Unavailable".
    The dictionary passed in is left unmodified.
    """
    def detail(key):
        # Look the value up without inserting placeholders into the
        # caller's dictionary
        return host.get(key, "Unavailable")

    host_type = host.get("hostType")

    # Print Standard Details
    print("Host Name: {}".format(detail("hostName")))
    print("Network Type: {}".format(detail("hostType")))
    print("Connected Network Device: {}".format(detail("connectedNetworkDeviceIpAddress")))  # noqa: E501

    # Print Wired/Wireless Details
    if host_type == "wired":
        print("Connected Interface Name: {}".format(detail("connectedInterfaceName")))  # noqa: E501
    if host_type == "wireless":
        print("Connected AP Name: {}".format(detail("connectedAPName")))

    # Print More Standard Details
    print("VLAN: {}".format(detail("vlanId")))
    print("Host IP: {}".format(detail("hostIp")))
    print("Host MAC: {}".format(detail("hostMac")))
    print("Host Sub Type: {}".format(detail("subType")))

    # Blank line at the end
    print("")
def network_device_list(apic, ticket, id=None):
    """
    Fetch network devices from the controller through the REST API.
    Input Parameters are:
      apic: controller address placed into the request URL
      ticket: auth token sent in the "x-auth-token" header
      id: optional device id; when given, only that device is requested
    Returns a list in both cases: the "response" element as-is for the
    full listing, or the single device wrapped in a one-element list.
    """
    headers["x-auth-token"] = ticket
    single_device = bool(id)

    endpoint = "https://{}/api/v1/network-device".format(apic)
    if single_device:
        # Address the individual device resource
        endpoint = endpoint + "/{}".format(id)

    reply = requests.request("GET", endpoint, headers=headers, verify=False)
    payload = reply.json()["response"]

    # Wrap a single device so callers can always index into a list
    return [payload] if single_device else payload
def interface_details(apic, ticket, id):
    """
    Fetch the details of one interface, selected by id, via the REST API.
    Input Parameters are:
      apic: controller address placed into the request URL
      ticket: auth token sent in the "x-auth-token" header
      id: id of the interface to look up
    Returns the "response" element of the JSON reply.
    """
    headers["x-auth-token"] = ticket
    endpoint = "https://{}/api/v1/interface/{}".format(apic, id)
    reply = requests.request("GET", endpoint, headers=headers, verify=False)
    body = reply.json()
    return body["response"]
def print_network_device_details(network_device):
    """
    Write a summary of one network device to standard output.
    Input Parameters are:
      network_device: dict object of a network device returned from APIC-EM
    One line is printed per field, in this order, followed by a blank line:
      hostname, managementIpAddress, locationName, type, platformId, role,
      serialNumber, softwareVersion, upTime, reachabilityStatus, errorCode,
      errorDescription
    """
    # (line template, dictionary key) pairs in output order
    layout = (
        ("Device Hostname: {}", "hostname"),
        ("Management IP: {}", "managementIpAddress"),
        ("Device Location: {}", "locationName"),
        ("Device Type: {}", "type"),
        ("Platform Id: {}", "platformId"),
        ("Device Role: {}", "role"),
        ("Serial Number: {}", "serialNumber"),
        ("Software Version: {}", "softwareVersion"),
        ("Up Time: {}", "upTime"),
        ("Reachability Status: {}", "reachabilityStatus"),
        ("Error Code: {}", "errorCode"),
        ("Error Description: {}", "errorDescription"),
    )
    for template, key in layout:
        print(template.format(network_device[key]))

    # Trailing blank line separates this summary from what follows
    print("")
def print_interface_details(interface):
    """
    Write a summary of one interface to standard output.
    Input Parameters are:
      interface: dictionary object of an interface returned from APIC-EM
    One line is printed per field, in this order, followed by a blank line:
      portName, interfaceType, adminStatus, status, mediaType, speed,
      duplex, portMode, vlanId, voiceVlan
    """
    # (line template, dictionary key) pairs in output order
    layout = (
        ("Port Name: {}", "portName"),
        ("Interface Type: {}", "interfaceType"),
        ("Admin Status: {}", "adminStatus"),
        ("Operational Status: {}", "status"),
        ("Media Type: {}", "mediaType"),
        ("Speed: {}", "speed"),
        ("Duplex Setting: {}", "duplex"),
        ("Port Mode: {}", "portMode"),
        ("Interface VLAN: {}", "vlanId"),
        ("Voice VLAN: {}", "voiceVlan"),
    )
    for template, key in layout:
        print(template.format(interface[key]))

    # Trailing blank line separates this summary from what follows
    print("")
def run_flow_analysis(apic, ticket, source_ip, destination_ip):
    """
    Use the REST API to initiate a Flow Analysis (Path Trace) from a given
    source_ip to destination_ip. Function will wait for analysis to complete,
    and return the results.
    Input Parameters are:
      apic: controller address placed into the request URL
      ticket: auth token sent in the "x-auth-token" header
      source_ip: source address of the flow to analyse
      destination_ip: destination address of the flow to analyse
    Returns the "response" element of the completed analysis.
    Prints an error and exits with status 1 when the analysis cannot be
    initiated or when the controller reports its status as "FAILED".
    """
    base_url = "https://{}/api/v1/flow-analysis".format(apic)
    headers["x-auth-token"] = ticket

    # initiate flow analysis
    body = {"destIP": destination_ip, "sourceIP": source_ip}
    initiate_response = requests.post(base_url, headers=headers, verify=False,
                                      json=body)

    # Verify successfully initiated. If not error and exit
    if initiate_response.status_code != 202:
        print("Error: Flow Analysis Initiation Failed")
        print(initiate_response.text)
        sys.exit(1)

    flow_analysis_id = initiate_response.json()["response"]["flowAnalysisId"]
    detail_url = base_url + "/{}".format(flow_analysis_id)

    # Poll until the analysis reaches a terminal state
    while True:
        detail_response = requests.get(detail_url, headers=headers,
                                       verify=False)
        details = detail_response.json()["response"]
        status = details["request"]["status"]

        if status == "COMPLETED":
            return details

        # A failed analysis never becomes COMPLETED; stop instead of
        # polling forever.
        # NOTE(review): "FAILED" / "failureReason" follow the controller's
        # flow-analysis API - confirm against the deployed version.
        if status == "FAILED":
            print("Error: Flow Analysis Failed")
            print(details["request"].get("failureReason",
                                         "No failure reason provided"))
            sys.exit(1)

        print("Flow analysis not complete yet, waiting 5 seconds")
        sleep(5)
def print_flow_analysis_details(flow_analysis, apic=None, ticket=None):
    """
    Print to screen interesting details about the flow analysis.
    Input Parameters are:
      flow_analysis: dictionary object of a flow analysis returned from APIC-EM
      apic: optional controller address used for the interface lookups;
            defaults to the module-level apicem["host"]
      ticket: optional auth token used for the interface lookups;
              defaults to the module-level login set by the script entry
              point
    The first and last entries of "networkElementsInfo" are treated as the
    endpoints and are not printed as hops.
    """
    hops = flow_analysis["networkElementsInfo"]
    last_index = len(hops) - 1

    print("Number of Hops from Source to Destination: {}".format(len(hops)))
    print()

    # Print Details per hop
    print("Flow Details: ")
    # Hop 1 (index 0) and the last hop (index len - 1) represent the endpoints
    for i, hop in enumerate(hops):
        if i == 0 or i == last_index:
            continue

        print("*" * 40)
        print("Hop {}: Network Device {}".format(i, hop["name"]))
        # If the hop is "UNKNOWN" continue along
        if hop["name"] == "UNKNOWN":
            print()
            continue

        print("Device IP: {}".format(hop["ip"]))
        print("Device Role: {}".format(hop["role"]))

        # If type is an Access Point, skip interface details
        if hop["type"] == "Unified AP":
            continue
        print()

        # Resolve the controller/token only when a lookup is really needed,
        # so callers that pass them explicitly (or whose path has no such
        # hops) do not depend on the script-level globals
        if apic is None:
            apic = apicem["host"]
        if ticket is None:
            ticket = login

        # Print details about each interface
        print("Ingress Interface")
        print("-" * 20)
        ingress = interface_details(apic, ticket,
                                    hop["ingressInterface"]["physicalInterface"]["id"])  # noqa: E501
        print_interface_details(ingress)
        print("Egress Interface")
        print("-" * 20)
        egress = interface_details(apic, ticket,
                                   hop["egressInterface"]["physicalInterface"]["id"])  # noqa: E501
        print_interface_details(egress)

        # Print blank line at end
        print("")
# Entry point for program
if __name__ == '__main__':
    import argparse

    # The two endpoint addresses arrive as positional command line arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("source_ip", help="Source IP Address")
    parser.add_argument("destination_ip", help="Destination IP Address")
    args = parser.parse_args()
    source_ip, destination_ip = args.source_ip, args.destination_ip

    # Announce what is about to be traced
    print("Running Troubleshooting Script for ")
    print(" Source IP: {} ".format(source_ip))
    print(" Destination IP: {}".format(destination_ip))
    print("")

    # Authenticate against the controller. The token is kept in the
    # module-level name `login`, which print_flow_analysis_details reads.
    login = apic_login(apicem["host"], apicem["username"], apicem["password"])
    controller = apicem["host"]

    # Step 1: Identify involved hosts
    source_host = host_list(controller, login, ip=source_ip)
    destination_host = host_list(controller, login, ip=destination_ip)

    # Each address must resolve to exactly one host
    verify_single_host(source_host, source_ip)
    verify_single_host(destination_host, destination_ip)

    # (host heading, connection heading, host record), source first
    endpoints = (
        ("Source Host Details:",
         "Source Host Network Connection Details:",
         source_host[0]),
        ("Destination Host Details:",
         "Destination Host Network Connection Details:",
         destination_host[0]),
    )

    for host_heading, _, endpoint in endpoints:
        print(host_heading)
        print("-" * 25)
        print_host_details(endpoint)

    # Step 2: Where are they in the network?
    for _, connection_heading, endpoint in endpoints:
        attached_device = network_device_list(
            controller, login, id=endpoint["connectedNetworkDeviceId"])
        print(connection_heading)
        print("-" * 45)
        print_network_device_details(attached_device[0])

        # Only wired hosts have an attached interface to report
        if endpoint["hostType"] == "wired":
            attached_interface = interface_details(
                controller, login, id=endpoint["connectedInterfaceId"])
            print("Attached Interface:")
            print("-" * 20)
            print_interface_details(attached_interface)

    # Step 3: What path does the traffic take?
    # Step 4: Are there any problems along the path?
    print("Running Flow Analysis from {} to {}".format(source_ip, destination_ip))  # noqa: E501
    print("-" * 55)
    flow_analysis = run_flow_analysis(controller, login,
                                      source_ip, destination_ip)

    # Report the hops that were found
    print_flow_analysis_details(flow_analysis)