| | |
| | | import shelve |
| | | from typing import Dict |
| | | |
| | | from docker_plugin_api.IpamDriverEntities import * |
| | | |
| | | from .Ipam import * |
| | | |
| | | |
# Persistent store backing the pool registry: a shelf opened under the name
# 'pools'. It is opened with writeback=True, so accessed entries are cached
# in memory and only written back on sync()/close() (see the shelve
# documentation); pools_sync() is the flush point used by this module.
pools_store = shelve.open(filename='pools', writeback=True)

# Typed, dict-like view of the same shelf object: full pool id -> the request
# that was stored for it.
pools: Dict[str, RequestPoolEntity] = pools_store
| | | |
| | | |
def pools_sync() -> None:
    """Flush the ``pools`` shelf by calling ``sync()`` on ``pools_store``."""
    pools_store.sync()
| | | |
| | | |
# Address spaces known to this module, keyed by the same name that is passed
# to the Space constructor.
spaces = {name: Space(name) for name in ('local', 'global')}
| | | |
| | | |
def create_pool(request: RequestPoolEntity) -> tuple[str, Pool]:
    """Create a pool from ``request``, register it and persist the request.

    The pool is added to the space selected by ``request.AddressSpace`` and
    the request itself is stored in ``pools`` under the resulting full id, so
    that ``get_space_pool`` can re-create the pool from it later.

    Args:
        request: Pool request; its ``AddressSpace``, ``Pool``, ``SubPool``,
            ``Options`` and ``V6`` attributes are read.

    Returns:
        A ``(full_id, pool)`` tuple, where ``full_id`` is
        ``"<space name>-<id returned by space.add_pool>"``.

    Raises:
        KeyError: if ``request.AddressSpace`` is not a key of ``spaces``.
    """
    target_space = spaces[request.AddressSpace]
    new_pool = Pool(
        pool=request.Pool,
        subPool=request.SubPool,
        options=request.Options,
        v6=request.V6,
    )
    local_id = target_space.add_pool(new_pool)
    full_id = f'{target_space.name}-{local_id}'

    # Remember the originating request and flush it to the shelf right away.
    pools[full_id] = request
    pools_sync()
    return full_id, new_pool
| | | |
| | | |
def remove_pool(full_id: str) -> None:
    """Remove the pool identified by ``full_id`` from its space and the store.

    The pool is first resolved through ``get_space_pool`` and removed from
    its space. If a request is stored under ``full_id`` in ``pools``, that
    entry is deleted and the shelf is flushed.

    Args:
        full_id: Identifier in the ``"<space name>-<pool id>"`` form returned
            by ``create_pool``.
    """
    owner, target = get_space_pool(full_id)
    owner.remove_pool(target)

    # Nothing persisted under this id: no store entry to drop.
    if full_id not in pools:
        return
    del pools[full_id]
    pools_sync()
| | | |
| | | |
def get_space_pool(full_id: str):
    """Resolve a full pool identifier to its address space and pool.

    ``full_id`` has the form ``"<space name>-<pool id>"``, as built by
    ``create_pool``. If the space does not currently hold the pool but a
    request is stored under ``full_id`` in ``pools``, the pool is re-created
    from that stored request before the lookup.

    Args:
        full_id: Identifier of the pool to look up.

    Returns:
        A ``(space, pool)`` tuple, where ``pool`` is the result of
        ``space.get_pool(pool_id)``.

    Raises:
        ValueError: if ``full_id`` contains no ``-`` separator.
        KeyError: if the space name is not a key of ``spaces``.
    """
    # Split on the FIRST dash only. The space name comes first and the
    # registered names ('local', 'global') contain no dash, so any further
    # dashes belong to the pool id. The previous rsplit('-', 2) could yield
    # three parts for such ids and fail to unpack into two names.
    space_id, pool_id = full_id.split('-', 1)
    space = spaces[space_id]
    if not space.has_pool(pool_id) and full_id in pools:
        create_pool(pools[full_id])
    return space, space.get_pool(pool_id)
| | | |
| | | |
# Public names exported by a star-import of this module. The list is assigned
# exactly once; an earlier, shorter assignment was immediately overwritten by
# this one and had no effect.
__all__ = ['spaces', 'get_space_pool', 'pools', 'pools_sync', 'create_pool', 'remove_pool']