diff options
Diffstat (limited to 'cyborg_enhancement/mitaka_version/cyborg/cyborg/agent/resource_tracker.py')
-rw-r--r-- | cyborg_enhancement/mitaka_version/cyborg/cyborg/agent/resource_tracker.py | 206 |
1 files changed, 206 insertions, 0 deletions
diff --git a/cyborg_enhancement/mitaka_version/cyborg/cyborg/agent/resource_tracker.py b/cyborg_enhancement/mitaka_version/cyborg/cyborg/agent/resource_tracker.py new file mode 100644 index 0000000..d17646c --- /dev/null +++ b/cyborg_enhancement/mitaka_version/cyborg/cyborg/agent/resource_tracker.py @@ -0,0 +1,206 @@ +# Copyright (c) 2018 Intel. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Track resources like FPGA GPU and QAT for a host. Provides the +conductor with useful information about availability through the accelerator +model. +""" + +from oslo_log import log as logging +from oslo_messaging.rpc.client import RemoteError +from oslo_utils import uuidutils + +from cyborg.accelerator.drivers.fpga.base import FPGADriver +from cyborg.accelerator.drivers.gpu.base import GPUDriver +from cyborg.common import utils +from cyborg import objects + + +LOG = logging.getLogger(__name__) + +AGENT_RESOURCE_SEMAPHORE = "agent_resources" + +DEPLOYABLE_VERSION = "1.0" + +# need to change the driver field name +DEPLOYABLE_HOST_MAPS = {"assignable": "assignable", + "pcie_address": "devices", + "board": "product_id", + "type": "function", + "vendor": "vendor_id", + "name": "name"} + + +class ResourceTracker(object): + """Agent helper class for keeping track of resource usage when hardware + Accelerator resources updated. Update the Deployable DB through conductor. + """ + + def __init__(self, host, cond_api): + # FIXME (Shaohe) local cache for Accelerator. + # Will fix it in next release. + self.fpgas = None + self.host = host + self.conductor_api = cond_api + self.fpga_driver = FPGADriver() + self.gpu_driver = GPUDriver() + + @utils.synchronized(AGENT_RESOURCE_SEMAPHORE) + def claim(self, context): + pass + + def _fpga_compare_and_update(self, host_dev, acclerator): + need_updated = False + for k, v in DEPLOYABLE_HOST_MAPS.items(): + if acclerator[k] != host_dev[v]: + need_updated = True + acclerator[k] = host_dev[v] + return need_updated + + def _gen_deployable_from_host_dev(self, host_dev): + dep = {} + for k, v in DEPLOYABLE_HOST_MAPS.items(): + dep[k] = host_dev[v] + dep["host"] = self.host + dep["version"] = DEPLOYABLE_VERSION + dep["availability"] = "free" + dep["uuid"] = uuidutils.generate_uuid() + return dep + + @utils.synchronized(AGENT_RESOURCE_SEMAPHORE) + def update_usage(self, context): + """Update the resource usage and stats after a change in an + instance + """ + def create_deployable(fpgas, bdf, parent_uuid=None): + fpga = fpgas[bdf] + dep = self._gen_deployable_from_host_dev(fpga) + # if parent_uuid: + dep["parent_uuid"] = parent_uuid + obj_dep = objects.Deployable(context, **dep) + new_dep = self.conductor_api.deployable_create(context, obj_dep) + return new_dep + + self.update_gpu_usage(context) + # NOTE(Shaohe Feng) need more agreement on how to keep consistency. + fpgas = self._get_fpga_devices() + bdfs = set(fpgas.keys()) + deployables = self.conductor_api.deployable_get_by_host( + context, self.host) + + # NOTE(Shaohe Feng) when no "pcie_address" in deployable? + accls = dict([(v["pcie_address"], v) for v in deployables + if v["type"] == "FPGA"]) + accl_bdfs = set(accls.keys()) + + # Firstly update + for mutual in accl_bdfs & bdfs: + accl = accls[mutual] + if self._fpga_compare_and_update(fpgas[mutual], accl): + try: + self.conductor_api.deployable_update(context, accl) + except RemoteError as e: + LOG.error(e) + # Add + new = bdfs - accl_bdfs + new_pf = set([n for n in new if fpgas[n]["function"] == "pf"]) + for n in new_pf: + new_dep = create_deployable(fpgas, n) + accls[n] = new_dep + sub_vf = set() + if "regions" in n: + sub_vf = set([sub["devices"] for sub in fpgas[n]["regions"]]) + for vf in sub_vf & new: + new_dep = create_deployable(fpgas, vf, new_dep["uuid"]) + accls[vf] = new_dep + new.remove(vf) + for n in new - new_pf: + p_bdf = fpgas[n]["parent_devices"] + p_accl = accls[p_bdf] + p_uuid = p_accl["uuid"] + new_dep = create_deployable(fpgas, n, p_uuid) + + # Delete + for obsolete in accl_bdfs - bdfs: + try: + self.conductor_api.deployable_delete(context, accls[obsolete]) + except RemoteError as e: + LOG.error(e) + del accls[obsolete] + + def _get_fpga_devices(self): + + def form_dict(devices, fpgas): + for v in devices: + fpgas[v["devices"]] = v + if "regions" in v: + form_dict(v["regions"], fpgas) + + fpgas = {} + vendors = self.fpga_driver.discover_vendors() + for v in vendors: + driver = self.fpga_driver.create(v) + form_dict(driver.discover(), fpgas) + return fpgas + + def update_gpu_usage(self, context): + """Update the gpu resource usage and stats after a change in an + instance, for the original update_usage specified update fpga, define a + new func update gpu here. + """ + def create_deployable(gpus, bdf, parent_uuid=None): + gpu = gpus[bdf] + dep = self._gen_deployable_from_host_dev(gpu) + # if parent_uuid: + dep["parent_uuid"] = parent_uuid + obj_dep = objects.Deployable(context, **dep) + new_dep = self.conductor_api.deployable_create(context, obj_dep) + return new_dep + gpus = self._get_gpu_devices() + deployables = self.conductor_api.deployable_get_by_host( + context, self.host) + + accls = dict([(v["pcie_address"], v) for v in deployables + if v["type"] == "GPU"]) + all_gpus = dict([(v["devices"], v) for v in gpus]) + + # Add + new = set(all_gpus.keys()) - set(accls.keys()) + new_gpus = [all_gpus[n] for n in new] + for n in new_gpus: + dep = self._gen_deployable_from_host_dev(n) + # if parent_uuid: + dep["parent_uuid"] = None + obj_dep = objects.Deployable(context, **dep) + self.conductor_api.deployable_create(context, obj_dep) + + # Delete + not_exists = set(accls.keys()) - set(all_gpus.keys()) + for obsolete in not_exists: + try: + self.conductor_api.deployable_delete(context, accls[obsolete]) + except RemoteError as e: + LOG.error(e) + del accls[obsolete] + + def _get_gpu_devices(self): + gpus = [] + vendors = self.gpu_driver.discover_vendors() + for v in vendors: + driver = self.gpu_driver.create(v) + if driver: + gpus.extend(driver.discover()) + return gpus |