import shelve from typing import Dict from docker_plugin_api.IpamDriverEntities import * from .Ipam import * pools_store = shelve.open('pools', writeback=True) pools: Dict[str, RequestPoolEntity] = pools_store def pools_sync(): pools_store.sync() spaces = { 'local': Space('local'), 'global': Space('global'), } def create_pool(request: RequestPoolEntity) -> tuple[str, Pool]: space = spaces[request.AddressSpace] pool = Pool(pool=request.Pool, subPool=request.SubPool, options=request.Options, v6=request.V6) pool_id = space.add_pool(pool) full_id = '{}-{}'.format(space.name, pool_id) pools[full_id] = request pools_sync() return full_id, pool def remove_pool(full_id: str): space, pool = get_space_pool(full_id) space.remove_pool(pool) if full_id in pools: del pools[full_id] pools_sync() def get_space_pool(full_id: str): space_id, pool_id = full_id.rsplit('-', 2) space = spaces[space_id] if not space.has_pool(pool_id) and full_id in pools: create_pool(pools[full_id]) return space, space.get_pool(pool_id) __all__ = ['spaces', 'get_space_pool', 'pools', 'pools_sync', 'create_pool', 'remove_pool']