CiscoTool_v1/netmiko_Api/utils/netmiko_test.py

231 lines
9.3 KiB
Python

from netmiko import ConnectHandler, NetmikoAuthenticationException, ReadTimeout, ConnectionException, NetMikoTimeoutException
from getpass import getpass
from pprint import pprint
import sys
import socket
# type = sys.argv[1]
# ip = sys.argv[2]
# username = sys.argv[3]
# password = sys.argv[4]
# device_type= (ssh | telnet) cisco_ios | cisco_ios_telnet
# ip
# host
# username
# password= string | getpass() #Yeu cau nhap password
# port
#secret
type = "cisco_ios_telnet"
host = sys.argv[2]
username = ""
password = ""
# R1 = {
# 'device_type':"cisco_ios_telnet",
# 'ip':"172.16.20.17",
# 'username':"",
# 'password':"",
# 'port': 23,
# # 'password':
# # 'secret':'',
# }
R1 = {
'device_type':type,
'ip':host,
'username':username,
'password':password,
'port': int(sys.argv[3]),
# 'password': '',
'secret':'',
}
def config_ip_port(connect, port, ip, netmask):
connect.enable()
connect.config_mode()
print("------Config ip address port "+port+"-------")
output = connect.send_config_set([
"int "+port,
"ip addr "+ip+" "+netmask,
"no shut"])
output += connect.save_config()
return output
def config_ip_vlan(connect, vlan, name, rangePort, ip, netMask):
connect.enable()
connect.config_mode()
print("------Configure ip address vlan "+vlan+"-------")
output = connect.send_config_set([
"vlan "+ vlan,
"name "+name,
"exit",
"inter vlan "+vlan,
"ip addr "+ ip + " "+netMask,
"no shut",
"exit",
"inter range "+rangePort,
"switchport access vlan "+vlan
])
output += connect.save_config()
return output
def config_ip_vlan_from_file(connect, file_name):
print("------Configure ip address vlan from"+file_name+"-------")
connect.enable()
connect.config_mode()
output = connect.send_config_from_file(file_name)
output += connect.save_config()
return output
def test_device(connect):
output = connect.find_prompt()
output += "||"
output += connect.send_command("show inv")
output += connect.send_command("show ver")
output += connect.send_command("show env all")
connect.enable()
output += connect.send_command("show post")
return output
def convert_interface_ranges(interfaces):
array_interface = []
other_interface = []
result = []
for x in interfaces.split("\n"):
if ("up" in x or "down" in x) and x.split(" ")[0]!="" and "Embedded" not in x:
if "Giga" in x or "Fast" in x:
array_interface.append(x.split(" ")[0])
else:
other_interface.append(x.split(" ")[0])
if "ASA" in "asfasf":
return array_interface+other_interface
else:
shorthand_ranges = {}
for interface in array_interface:
if len(interface.split("/"))==2:
if interface.split("/")[0] not in shorthand_ranges:
shorthand_ranges[interface.split("/")[0]] = [interface]
else:
shorthand_ranges[interface.split("/")[0]].append(interface)
else:
if len(interface.split("/"))==3:
if interface.split("/")[0]+"/"+interface.split("/")[1] not in shorthand_ranges:
shorthand_ranges[interface.split("/")[0]+"/"+interface.split("/")[1]] = [interface]
else:
shorthand_ranges[interface.split("/")[0]+"/"+interface.split("/")[1]].append(interface)
else:
if len(interface.split("/"))==4:
if interface.split("/")[0]+"/"+interface.split("/")[1]+"/"+interface.split("/")[2] not in shorthand_ranges:
shorthand_ranges[interface.split("/")[0]+"/"+interface.split("/")[1]+"/"+interface.split("/")[2]] = [interface]
else:
shorthand_ranges[interface.split("/")[0]+"/"+interface.split("/")[1]+"/"+interface.split("/")[2]].append(interface)
for x in shorthand_ranges:
if len(shorthand_ranges[x]) > 2:
result.append(shorthand_ranges[x][0]+"-"+shorthand_ranges[x][-1].split("/")[len(shorthand_ranges[x][-1].split("/"))-1])
else:
if len(shorthand_ranges[x]) == 2:
result.append(shorthand_ranges[x][0])
result.append(shorthand_ranges[x][1])
else:
result.append(shorthand_ranges[x][0])
return result+other_interface
output = ""
check=""
list = []
list1 = []
list2 = []
connect = {}
def startPort(list, output, list1, check, net_connect):
try:
if check == "":
check += net_connect.send_command( "show ip interface brief",read_timeout=300)
check += net_connect.send_command( "show interface ip brief",read_timeout=300)
output += check
list = convert_interface_ranges(check)
for x in list:
if x not in list1:
if "-" in x and ("Fa" in x or "Gi" in x):
output += net_connect.send_config_set([
"interface range "+ x,
"no shut"
], strip_prompt=False, strip_command=False, delay_factor=400,read_timeout=30)
list1.append(x)
# print(output)
else:
output += net_connect.send_config_set([
"interface "+ x,
"no shut"
], strip_prompt=False, strip_command=False, delay_factor=400,read_timeout=30)
list1.append(x)
# print(output)
print(output)
except NetmikoAuthenticationException:
print("Netmiko Authentication xxError\n(1) Can not connect to device\n(2) Can not load IOS image (switch:, rommon)")
except NetMikoTimeoutException:
print("NetMikoTimeoutException xxError")
except ConnectionRefusedError as err:
print("Connection Refused xxError\n(1) Connection exists on device\n(2) Your connection is interrupted by other connection") #Connection Refused
except ValueError:
print("Mode xxError")
except ReadTimeout:
# output = "TIME OUT\n"+output
startPort(list, output, list1, check, net_connect)
except ConnectionException:
print("Connection xxError")
try:
# print(R1)
with ConnectHandler(**R1) as net_connect:
connect = net_connect
#change mode
net_connect.enable()
# net_connect.config_mode()
if sys.argv[4] == "clear":
output = net_connect.send_command_timing( sys.argv[1], strip_prompt=False, strip_command=False, delay_factor=4)
if "[confirm]" in output:
output += net_connect.send_command("\n", delay_factor=4, expect_string=r"#")
net_connect.disconnect()
print(output)
else:
output += net_connect.send_command_timing("\n" , strip_prompt=False, strip_command=False, delay_factor=400, last_read=1.0, read_timeout=55)
if "yes/no" in output:
output += net_connect.send_command_timing("no" , strip_prompt=False, strip_command=False, delay_factor=400, last_read=15.0, read_timeout=120)
output += net_connect.send_command_timing("\n" , strip_prompt=False, strip_command=False, delay_factor=400, last_read=15.0, read_timeout=55)
net_connect.send_command_timing( "no", strip_prompt=False, strip_command=False, delay_factor=4, read_timeout=55)
net_connect.send_command_timing( "\n", strip_prompt=False, strip_command=False, delay_factor=4, read_timeout=55)
net_connect.enable()
for x in sys.argv[1].split(","):
output+="###"+x+"\n"
output += net_connect.send_command(x, read_timeout=120)
if "More" in output:
output += net_connect.send_command_timing(" " , strip_prompt=False, strip_command=False, delay_factor=400, last_read=1.0, read_timeout=60)
# print(output)
if sys.argv[1]!="show inv":
startPort(list, output, list1, check, connect)
net_connect.disconnect()
else:
print(output)
net_connect.disconnect()
# print(output)
except NetmikoAuthenticationException:
print("Netmiko Authentication xxError\n(1) Can not connect to device\n(2) Can not load IOS image (switch:, rommon)")
except NetMikoTimeoutException:
print("NetMikoTimeoutException xxError")
except ConnectionRefusedError as err:
print("Connection Refused xxError\n(1) Connection exists on device\n(2) Your connection is interrupted by other connection") #Connection Refused
except ValueError:
print("Mode xxError")
except ReadTimeout:
output = "TIME OUT\n"+output
startPort(list1, output, list1, check, connect)
except ConnectionException:
print("Connection xxError")