-
Notifications
You must be signed in to change notification settings - Fork 0
/
subnet_gap_finder.py
188 lines (143 loc) · 5.63 KB
/
subnet_gap_finder.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
#!/usr/bin/env python3
# Python libs
import argparse
import ipaddress
import logging
import sys
# Flag to turn off AWS functionality (for lightweight use case)
REQUIRE_BOTO = True
# Third-party libs
try:
if REQUIRE_BOTO:
from botocore.exceptions import ClientError
import boto3
from tabulate import tabulate
import netaddr
except ImportError as exc:
print(f'{exc}')
exit(1)
log = logging.getLogger(__name__)
def _convert_to_ips(str_nets):
'''
Convert a list of strings to a list of IP networks. Any garbage is thrown out.
'''
ip_nets = []
for net in str_nets:
try:
ip_nets.append(ipaddress.ip_network(net))
except ValueError:
continue
return sorted(ip_nets)
def find_ip_gaps(ip_nets):
'''
Given an input of a list of IP networks, compare the list to find gaps which can be used for new subnets.
'''
ret = []
try:
last_net = ip_nets[0]
except IndexError:
log.error(f'No IP networks found!')
return
for this_net in ip_nets:
if last_net == this_net:
continue
if int(this_net.network_address) - int(last_net.broadcast_address) > 1:
start_ip = last_net.broadcast_address + 1
end_ip = this_net.network_address - 1
log.info('Found gap from {0} to {1}'.format(start_ip, end_ip))
gap = {'start': start_ip, 'end': end_ip, 'cidrs': []}
for x in netaddr.cidr_merge(list(netaddr.iter_iprange(str(start_ip), str(end_ip)))):
log.info(f'Base CIDR for gap: {x}')
gap['cidrs'].append(x)
ret.append(gap)
last_net = this_net
return ret
def file_gaps(filename):
'''
Find subnet gaps in a file containing a newline-spearated list of networks.
'''
try:
with open(filename, 'r') as f:
str_nets = f.read().split('\n')
except FileNotFoundError as exc:
log.error(f'{exc}')
return
return find_ip_gaps(_convert_to_ips(str_nets))
def vpc_gaps(vpc_id):
'''
Connect to AWS and find subnets associated with a VPC. Gaps between the subnets will be reported. If no gaps
between subnets are found, We'll check for gaps between the last subnet and the end of the VPC space.
'''
if not REQUIRE_BOTO:
log.error(f'AWS functionality is disabled!')
return
try:
ec2_client = boto3.client('ec2')
sn_return = ec2_client.describe_subnets(Filters=[{'Name': 'vpc-id', 'Values': [vpc_id]}])
sn_cidrs = _convert_to_ips([sn.get('CidrBlock') for sn in sn_return.get('Subnets', [])])
except ClientError as exc:
log.error(f'Unable to get subnet info: {exc}')
return
gaps = find_ip_gaps(sn_cidrs)
if not gaps:
try:
vpc_return = ec2_client.describe_vpcs(Filters=[{'Name': 'vpc-id', 'Values': [vpc_id]}])
vpc_cidr = _convert_to_ips([vpc.get('CidrBlock') for vpc in vpc_return.get('Vpcs', [])])[0]
except ClientError as exc:
log.error(f'Unable to get VPC info: {exc}')
return
if int(vpc_cidr.broadcast_address) - int(sn_cidrs[-1].broadcast_address) > 1:
start_ip = sn_cidrs[-1].broadcast_address + 1
end_ip = vpc_cidr.broadcast_address
log.info('Found gap from {0} to END OF VPC {1}'.format(start_ip, end_ip))
gap = {'start': start_ip, 'end': end_ip, 'cidrs': []}
for x in netaddr.cidr_merge(list(netaddr.iter_iprange(str(start_ip), str(end_ip)))):
log.info(f'Base CIDR for gap: {x}')
gap['cidrs'].append(x)
gaps.append(gap)
return gaps
def _highlander(*args):
'''
Convenience function which ensures exactly one passed parameter has a value.
'''
return sum(bool(a) for a in args) == 1
if __name__ == '__main__':
parser = argparse.ArgumentParser(
description='Utility script to show unused gaps in IP space within a list of networks.'
)
parser.add_argument('--filename', '-f', dest='filename', type=str,
help='Path to a file containing a list of IP networks. '
'EITHER "filename" OR "vpcid" IS REQUIRED')
parser.add_argument('--loglevel', '-l', dest='loglevel', type=str, default='WARN',
help='Specify log level as text string: ERROR, WARNING, INFO, DEBUG')
parser.add_argument('--vpcid', '-i', dest='vpc_id', type=str,
help='ID of a VPC in AWS to query. Subnets will be checked as the list of networks. '
'EITHER "filename" OR "vpcid" IS REQUIRED.')
args = parser.parse_args()
try:
log.setLevel(args.loglevel.upper())
except ValueError as exc:
print(f'Failed to set log level ({exc}) Using default.')
log.setLevel(parser.get_default('loglevel'))
log_handler = logging.StreamHandler(sys.stdout)
log_formatter = logging.Formatter('%(asctime)s [%(levelname)-5.5s] %(message)s')
log_handler.setFormatter(log_formatter)
log.addHandler(log_handler)
if not _highlander(args.filename, args.vpc_id):
log.error('Either filename or VPC ID should be specified.')
exit(1)
if args.filename:
gaps = file_gaps(args.filename)
elif args.vpc_id:
gaps = vpc_gaps(args.vpc_id)
if not gaps:
log.error('No information returned!')
exit(1)
print(
tabulate(
[[g['start'], g['end'], ', '.join([str(n) for n in g['cidrs']])] for g in gaps],
headers=['Gap Start', 'Gap End', 'Gap CIDRs'],
tablefmt='orgtbl'
)
)
exit(0)