summaryrefslogtreecommitdiffstats
path: root/cyborg_enhancement/mitaka_version/cyborg/cyborg/agent/resource_tracker.py
diff options
context:
space:
mode:
Diffstat (limited to 'cyborg_enhancement/mitaka_version/cyborg/cyborg/agent/resource_tracker.py')
-rw-r--r--cyborg_enhancement/mitaka_version/cyborg/cyborg/agent/resource_tracker.py206
1 files changed, 206 insertions, 0 deletions
diff --git a/cyborg_enhancement/mitaka_version/cyborg/cyborg/agent/resource_tracker.py b/cyborg_enhancement/mitaka_version/cyborg/cyborg/agent/resource_tracker.py
new file mode 100644
index 0000000..d17646c
--- /dev/null
+++ b/cyborg_enhancement/mitaka_version/cyborg/cyborg/agent/resource_tracker.py
@@ -0,0 +1,206 @@
+# Copyright (c) 2018 Intel.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Track resources like FPGA GPU and QAT for a host. Provides the
+conductor with useful information about availability through the accelerator
+model.
+"""
+
+from oslo_log import log as logging
+from oslo_messaging.rpc.client import RemoteError
+from oslo_utils import uuidutils
+
+from cyborg.accelerator.drivers.fpga.base import FPGADriver
+from cyborg.accelerator.drivers.gpu.base import GPUDriver
+from cyborg.common import utils
+from cyborg import objects
+
+
+LOG = logging.getLogger(__name__)
+
+AGENT_RESOURCE_SEMAPHORE = "agent_resources"
+
+DEPLOYABLE_VERSION = "1.0"
+
+# need to change the driver field name
+DEPLOYABLE_HOST_MAPS = {"assignable": "assignable",
+ "pcie_address": "devices",
+ "board": "product_id",
+ "type": "function",
+ "vendor": "vendor_id",
+ "name": "name"}
+
+
+class ResourceTracker(object):
+ """Agent helper class for keeping track of resource usage when hardware
+ Accelerator resources updated. Update the Deployable DB through conductor.
+ """
+
+ def __init__(self, host, cond_api):
+ # FIXME (Shaohe) local cache for Accelerator.
+ # Will fix it in next release.
+ self.fpgas = None
+ self.host = host
+ self.conductor_api = cond_api
+ self.fpga_driver = FPGADriver()
+ self.gpu_driver = GPUDriver()
+
+ @utils.synchronized(AGENT_RESOURCE_SEMAPHORE)
+ def claim(self, context):
+ pass
+
+ def _fpga_compare_and_update(self, host_dev, acclerator):
+ need_updated = False
+ for k, v in DEPLOYABLE_HOST_MAPS.items():
+ if acclerator[k] != host_dev[v]:
+ need_updated = True
+ acclerator[k] = host_dev[v]
+ return need_updated
+
+ def _gen_deployable_from_host_dev(self, host_dev):
+ dep = {}
+ for k, v in DEPLOYABLE_HOST_MAPS.items():
+ dep[k] = host_dev[v]
+ dep["host"] = self.host
+ dep["version"] = DEPLOYABLE_VERSION
+ dep["availability"] = "free"
+ dep["uuid"] = uuidutils.generate_uuid()
+ return dep
+
+ @utils.synchronized(AGENT_RESOURCE_SEMAPHORE)
+ def update_usage(self, context):
+ """Update the resource usage and stats after a change in an
+ instance
+ """
+ def create_deployable(fpgas, bdf, parent_uuid=None):
+ fpga = fpgas[bdf]
+ dep = self._gen_deployable_from_host_dev(fpga)
+ # if parent_uuid:
+ dep["parent_uuid"] = parent_uuid
+ obj_dep = objects.Deployable(context, **dep)
+ new_dep = self.conductor_api.deployable_create(context, obj_dep)
+ return new_dep
+
+ self.update_gpu_usage(context)
+ # NOTE(Shaohe Feng) need more agreement on how to keep consistency.
+ fpgas = self._get_fpga_devices()
+ bdfs = set(fpgas.keys())
+ deployables = self.conductor_api.deployable_get_by_host(
+ context, self.host)
+
+ # NOTE(Shaohe Feng) when no "pcie_address" in deployable?
+ accls = dict([(v["pcie_address"], v) for v in deployables
+ if v["type"] == "FPGA"])
+ accl_bdfs = set(accls.keys())
+
+ # Firstly update
+ for mutual in accl_bdfs & bdfs:
+ accl = accls[mutual]
+ if self._fpga_compare_and_update(fpgas[mutual], accl):
+ try:
+ self.conductor_api.deployable_update(context, accl)
+ except RemoteError as e:
+ LOG.error(e)
+ # Add
+ new = bdfs - accl_bdfs
+ new_pf = set([n for n in new if fpgas[n]["function"] == "pf"])
+ for n in new_pf:
+ new_dep = create_deployable(fpgas, n)
+ accls[n] = new_dep
+ sub_vf = set()
+ if "regions" in n:
+ sub_vf = set([sub["devices"] for sub in fpgas[n]["regions"]])
+ for vf in sub_vf & new:
+ new_dep = create_deployable(fpgas, vf, new_dep["uuid"])
+ accls[vf] = new_dep
+ new.remove(vf)
+ for n in new - new_pf:
+ p_bdf = fpgas[n]["parent_devices"]
+ p_accl = accls[p_bdf]
+ p_uuid = p_accl["uuid"]
+ new_dep = create_deployable(fpgas, n, p_uuid)
+
+ # Delete
+ for obsolete in accl_bdfs - bdfs:
+ try:
+ self.conductor_api.deployable_delete(context, accls[obsolete])
+ except RemoteError as e:
+ LOG.error(e)
+ del accls[obsolete]
+
+ def _get_fpga_devices(self):
+
+ def form_dict(devices, fpgas):
+ for v in devices:
+ fpgas[v["devices"]] = v
+ if "regions" in v:
+ form_dict(v["regions"], fpgas)
+
+ fpgas = {}
+ vendors = self.fpga_driver.discover_vendors()
+ for v in vendors:
+ driver = self.fpga_driver.create(v)
+ form_dict(driver.discover(), fpgas)
+ return fpgas
+
+ def update_gpu_usage(self, context):
+ """Update the gpu resource usage and stats after a change in an
+ instance, for the original update_usage specified update fpga, define a
+ new func update gpu here.
+ """
+ def create_deployable(gpus, bdf, parent_uuid=None):
+ gpu = gpus[bdf]
+ dep = self._gen_deployable_from_host_dev(gpu)
+ # if parent_uuid:
+ dep["parent_uuid"] = parent_uuid
+ obj_dep = objects.Deployable(context, **dep)
+ new_dep = self.conductor_api.deployable_create(context, obj_dep)
+ return new_dep
+ gpus = self._get_gpu_devices()
+ deployables = self.conductor_api.deployable_get_by_host(
+ context, self.host)
+
+ accls = dict([(v["pcie_address"], v) for v in deployables
+ if v["type"] == "GPU"])
+ all_gpus = dict([(v["devices"], v) for v in gpus])
+
+ # Add
+ new = set(all_gpus.keys()) - set(accls.keys())
+ new_gpus = [all_gpus[n] for n in new]
+ for n in new_gpus:
+ dep = self._gen_deployable_from_host_dev(n)
+ # if parent_uuid:
+ dep["parent_uuid"] = None
+ obj_dep = objects.Deployable(context, **dep)
+ self.conductor_api.deployable_create(context, obj_dep)
+
+ # Delete
+ not_exists = set(accls.keys()) - set(all_gpus.keys())
+ for obsolete in not_exists:
+ try:
+ self.conductor_api.deployable_delete(context, accls[obsolete])
+ except RemoteError as e:
+ LOG.error(e)
+ del accls[obsolete]
+
+ def _get_gpu_devices(self):
+ gpus = []
+ vendors = self.gpu_driver.discover_vendors()
+ for v in vendors:
+ driver = self.gpu_driver.create(v)
+ if driver:
+ gpus.extend(driver.discover())
+ return gpus