| | |
| | | uses: docker/login-action@v4 |
| | | with: |
| | | registry: ghcr.io |
| | | username: ${{ github.actor }} |
| | | username: ${{ github.repository_owner }} |
| | | password: ${{ secrets.GITHUB_TOKEN }} |
| | | |
| | | - name: Prepare and push Docker plugin |
| | |
| | | - name: Login to Docker Hub |
| | | uses: docker/login-action@v4 |
| | | with: |
| | | username: ${{ github.actor }} |
| | | username: ${{ github.repository_owner }} |
| | | password: ${{ secrets.DOCKER_PASSWORD }} |
| | | |
| | | - name: Prepare and push Docker plugin |
| | | run: | |
| | | export NAME="${{ github.actor }}/pyipam" |
| | | export NAME="${{ github.repository_owner }}/pyipam" |
| | | export VERSIONS="latest ${GITHUB_REF/refs\/tags\//}" |
| | | ./package.sh |
| | |
| | | check[str(pool)] = pool |
| | | return str(pool) |
| | | |
| | | def get_pool(self, pool: str) -> Pool: |
| | | def has_pool(self, pool_id: str) -> bool: |
| | | return pool_id in self.pools or pool_id in self.pools6 |
| | | |
| | | def get_pool(self, pool: str | Pool) -> Pool: |
| | | pool = str(pool) |
| | | if pool in self.pools: |
| | | return self.pools[pool] |
| | |
| | | else: |
| | | raise InputValidationException('Unknown pool {}'.format(pool)) |
| | | |
| | | def remove_pool(self, pool: str): |
| | | def remove_pool(self, pool: str | Pool): |
| | | pool = str(pool) |
| | | if pool in self.pools: |
| | | del self.pools[pool] |
| | |
| | | from docker_plugin_api.Plugin import Blueprint |
| | | import flask |
| | | from .Ipam import * |
| | | from .IpamDriverData import * |
| | | from docker_plugin_api.IpamDriverEntities import * |
| | | |
| | |
| | | @app.route('/IpamDriver.RequestPool', methods=['POST']) |
| | | def RequestPool(): |
| | | request = RequestPoolEntity(**flask.request.get_json(force=True)) |
| | | space = spaces[request.AddressSpace] |
| | | pool = Pool(pool=request.Pool, subPool=request.SubPool, options=request.Options, v6=request.V6) |
| | | pool_id = space.add_pool(pool) |
| | | full_id = '{}-{}'.format(space.name, pool_id) |
| | | full_id, pool = create_pool(request) |
| | | return { |
| | | 'PoolID': full_id, |
| | | 'Pool': str(pool), |
| | |
| | | @app.route('/IpamDriver.ReleasePool', methods=['POST']) |
| | | def ReleasePool(): |
| | | request = ReleasePoolEntity(**flask.request.get_json(force=True)) |
| | | space, pool = get_space_pool(request.PoolID) |
| | | space.remove_pool(pool) |
| | | remove_pool(request.PoolID) |
| | | return {} |
| | | |
| | | |
| | |
| | | import shelve |
| | | from typing import Dict |
| | | |
| | | from docker_plugin_api.IpamDriverEntities import * |
| | | |
| | | from .Ipam import * |
| | | |
| | | |
| | | pools_store = shelve.open('pools', writeback=True) |
| | | pools: Dict[str, RequestPoolEntity] = pools_store |
| | | |
| | | |
| | | def pools_sync(): |
| | | pools_store.sync() |
| | | |
| | | |
| | | spaces = { |
| | | 'local': Space('local'), |
| | | 'global': Space('global'), |
| | | } |
| | | |
| | | |
| | | def create_pool(request: RequestPoolEntity) -> tuple[str, Pool]: |
| | | space = spaces[request.AddressSpace] |
| | | pool = Pool(pool=request.Pool, subPool=request.SubPool, options=request.Options, v6=request.V6) |
| | | pool_id = space.add_pool(pool) |
| | | full_id = '{}-{}'.format(space.name, pool_id) |
| | | pools[full_id] = request |
| | | pools_sync() |
| | | return full_id, pool |
| | | |
| | | |
| | | def remove_pool(full_id: str): |
| | | space, pool = get_space_pool(full_id) |
| | | space.remove_pool(pool) |
| | | if full_id in pools: |
| | | del pools[full_id] |
| | | pools_sync() |
| | | |
| | | |
| | | def get_space_pool(full_id: str): |
| | | space_id, pool_id = full_id.rsplit('-', 2) |
| | | space = spaces[space_id] |
| | | if not space.has_pool(pool_id) and full_id in pools: |
| | | create_pool(pools[full_id]) |
| | | return space, space.get_pool(pool_id) |
| | | |
| | | |
| | | __all__ = ['spaces', 'get_space_pool'] |
| | | __all__ = ['spaces', 'get_space_pool', 'pools', 'pools_sync', 'create_pool', 'remove_pool'] |