import ipaddress import random from docker_plugin_api.Plugin import InputValidationException def random_hex(len=4): return ''.join(random.choice('0123456789abcdef') for _ in range(len)) class Pool: def __init__(self, pool: str = None, options: dict = None, subPool: str = None, v6: bool = None): if pool == '': pool = None if options is None: options = {} if subPool == '': subPool = None if pool is None and subPool is not None: raise InputValidationException('Trying to allocate subPool without pool') if pool is None and v6 is None: raise InputValidationException('Trying to get random pool without specifying IP version (v6 field)') if pool is None: if subPool is not None: raise InputValidationException('Trying to allocate subPool without pool') elif v6 is None: raise InputValidationException('Trying to get random pool without specifying IP version (v6 field)') if v6: pool = 'fd00:{}:{}:{}::/64'.format(random_hex(), random_hex(), random_hex()) else: pool = '172.{}.{}.0/24'.format(random.randrange(16, 32), random.randrange(0, 256)) self.pool = ipaddress.ip_network(pool, strict=False) if subPool is not None: self.subpool = ipaddress.ip_network(subPool, strict=False) else: self.subpool = self.pool if not self.subpool.subnet_of(self.pool): raise InputValidationException('Subpool must be a subnet of pool') self.allocations = set() self.current = self.subpool.hosts() self.v6 = isinstance(self.pool, ipaddress.IPv6Network) def __eq__(self, pool: 'Pool') -> bool: return self.v6 == pool.v6 and self.pool == pool.pool and self.subpool == pool.subpool def overlaps(self, pool: 'Pool') -> bool: if self.v6 != pool.v6: raise InputValidationException('Cannot compare v6 and non-v6 pools') return self.pool.overlaps(pool.pool) def _is_allocated(self, address: str): return str(address) in self.allocations def _find_next_address(self): for address in self.current: if not self._is_allocated(address): return address self.current = self.subpool.hosts() for address in self.current: if not self._is_allocated(address): return address raise InputValidationException('No free addresses in pool') def allocate(self, address: str = None) -> str: if address is None or address == '': address = self._find_next_address() else: address = ipaddress.ip_address(address) if self.pool.network_address == address: raise InputValidationException('Cannot allocate network address to a host') if not self.v6 and self.pool.broadcast_address == address: raise InputValidationException('Cannot allocate broadcast address to a host') if address not in self.pool: raise InputValidationException('Requested address does not belong to a pool') address = str(address) if self._is_allocated(address): raise InputValidationException('Requested address {} is already used'.format(address)) self.allocations.add(address) return '{}/{}'.format(address, self.pool.prefixlen) def deallocate(self, address: str): address = ipaddress.ip_address(address) address = str(address) if address in self.allocations: self.allocations.remove(address) def __str__(self): return str(self.pool) class Space: def __init__(self, name: str): self.name = name self.pools = {} self.pools6 = {} def add_pool(self, pool: Pool) -> str: check = self.pools6 if pool.v6 else self.pools for id, p in check.items(): if pool == p: return id if pool.overlaps(p): raise InputValidationException('There is already defined pool {} that overlaps this one'.format(id)) check[str(pool)] = pool return str(pool) def get_pool(self, pool: str) -> Pool: pool = str(pool) if pool in self.pools: return self.pools[pool] elif pool in self.pools6: return self.pools6[pool] else: raise InputValidationException('Unknown pool {}'.format(pool)) def remove_pool(self, pool: str): pool = str(pool) if pool in self.pools: del self.pools[pool] elif pool in self.pools6: del self.pools6[pool] else: raise InputValidationException('Unknown pool {}'.format(pool)) __all__ = ['random_hex', 'Pool', 'Space']