pyIPAM - Docker Plugin for IPAM written in Python
Jacek Kowalski
2020-05-22 40a1f97aa6f7ef85c73fcf942f7872b61c64a0fd
commit | author | age
659253 1 import ipaddress
JK 2 import random
3
4 from docker_plugin_api.Plugin import InputValidationException
5
6
7 def random_hex(len=4):
8     return ''.join(random.choice('0123456789abcdef') for _ in range(len))
9
10
11 class Pool:
12     def __init__(self, pool: str = None, options: dict = None, subPool: str = None, v6: bool = None):
13         if pool == '':
14             pool = None
15         if options is None:
16             options = {}
17         if subPool == '':
18             subPool = None
19
20         if pool is None and subPool is not None:
21             raise InputValidationException('Trying to allocate subPool without pool')
22         if pool is None and v6 is None:
23             raise InputValidationException('Trying to get random pool without specifying IP version (v6 field)')
24
25         if pool is None:
26             if subPool is not None:
27                 raise InputValidationException('Trying to allocate subPool without pool')
28             elif v6 is None:
29                 raise InputValidationException('Trying to get random pool without specifying IP version (v6 field)')
30
31             if v6:
32                 pool = 'fd00:{}:{}:{}::/64'.format(random_hex(), random_hex(), random_hex())
33             else:
34                 pool = '172.{}.{}.0/24'.format(random.randrange(16, 32), random.randrange(0, 256))
35
36         self.pool = ipaddress.ip_network(pool, strict=False)
37         if subPool is not None:
38             self.subpool = ipaddress.ip_network(subPool, strict=False)
39         else:
40             self.subpool = self.pool
41
42         if not self.subpool.subnet_of(self.pool):
43             raise InputValidationException('Subpool must be a subnet of pool')
44
45         self.allocations = set()
46         self.current = self.subpool.hosts()
47
48         self.v6 = isinstance(self.pool, ipaddress.IPv6Network)
49
50     def __eq__(self, pool: 'Pool') -> bool:
51         return self.v6 == pool.v6 and self.pool == pool.pool and self.subpool == pool.subpool
52
53     def overlaps(self, pool: 'Pool') -> bool:
54         if self.v6 != pool.v6:
55             raise InputValidationException('Cannot compare v6 and non-v6 pools')
56         return self.pool.overlaps(pool.pool)
57
58     def _is_allocated(self, address: str):
59         return str(address) in self.allocations
60
61     def _find_next_address(self):
62         for address in self.current:
63             if not self._is_allocated(address):
64                 return address
65         self.current = self.subpool.hosts()
66         for address in self.current:
67             if not self._is_allocated(address):
68                 return address
69         raise InputValidationException('No free addresses in pool')
70
71     def allocate(self, address: str = None) -> str:
72         if address is None or address == '':
73             address = self._find_next_address()
74         else:
75             address = ipaddress.ip_address(address)
76
77         if self.pool.network_address == address:
78             raise InputValidationException('Cannot allocate network address to a host')
79         if not self.v6 and self.pool.broadcast_address == address:
80             raise InputValidationException('Cannot allocate broadcast address to a host')
81         if address not in self.pool:
82             raise InputValidationException('Requested address does not belong to a pool')
83
84         address = str(address)
85         if self._is_allocated(address):
86             raise InputValidationException('Requested address {} is already used'.format(address))
87         self.allocations.add(address)
88
89         return '{}/{}'.format(address, self.pool.prefixlen)
90
91     def deallocate(self, address: str):
92         address = ipaddress.ip_address(address)
93         address = str(address)
94         if address in self.allocations:
95             self.allocations.remove(address)
96
97     def __str__(self):
98         return str(self.pool)
99
100
101 class Space:
102     def __init__(self, name: str):
103         self.name = name
104         self.pools = {}
105         self.pools6 = {}
106
107     def add_pool(self, pool: Pool) -> str:
108         check = self.pools6 if pool.v6 else self.pools
109         for id, p in check.items():
110             if pool == p:
111                 return id
112             if pool.overlaps(p):
113                 raise InputValidationException('There is already defined pool {} that overlaps this one'.format(id))
114         check[str(pool)] = pool
115         return str(pool)
116
117     def get_pool(self, pool: str) -> Pool:
118         pool = str(pool)
119         if pool in self.pools:
120             return self.pools[pool]
121         elif pool in self.pools6:
122             return self.pools6[pool]
123         else:
124             raise InputValidationException('Unknown pool {}'.format(pool))
125
126     def remove_pool(self, pool: str):
127         pool = str(pool)
128         if pool in self.pools:
129             del self.pools[pool]
130         elif pool in self.pools6:
131             del self.pools6[pool]
132         else:
133             raise InputValidationException('Unknown pool {}'.format(pool))
134
135
136 __all__ = ['random_hex', 'Pool', 'Space']