aboutsummaryrefslogtreecommitdiffstats
path: root/Testcases/vnc_api/gen/resource_test.py
diff options
context:
space:
mode:
Diffstat (limited to 'Testcases/vnc_api/gen/resource_test.py')
-rw-r--r--Testcases/vnc_api/gen/resource_test.py5173
1 files changed, 0 insertions, 5173 deletions
diff --git a/Testcases/vnc_api/gen/resource_test.py b/Testcases/vnc_api/gen/resource_test.py
deleted file mode 100644
index 032586a..0000000
--- a/Testcases/vnc_api/gen/resource_test.py
+++ /dev/null
@@ -1,5173 +0,0 @@
-'''
-This module defines the fixture classes for all config elements
-'''
-
-# AUTO-GENERATED file from IFMapApiGenerator. Do Not Edit!
-
-import cfixture
-from vnc_api import vnc_api
-from cfgm_common.exceptions import *
-
-from generatedssuper import GeneratedsSuper
-
-class DomainTestFixtureGen(cfixture.ContrailFixture):
- """
- Fixture for :class:`.Domain`
- """
- def __init__(self, conn_drv, domain_name=None, parent_fixt=None, auto_prop_val=False, domain_limits=None, api_access_list=None, id_perms=None, display_name=None):
- '''
- Create DomainTestFixtureGen object
-
- constructor
-
- Args:
- conn_drv (:class:`ConnectionDriver`): connection driver (eg. :class:`vnc_api.vnc_api.VncApi`, :class:`novaclient.client.Client`, etc)
-
- Kwargs:
- domain_name (str): Name of domain
- parent_fixt (:class:`.ConfigRootTestFixtureGen`): Parent fixture
- domain_limits (instance): instance of :class:`DomainLimitsType`
- api_access_list (instance): instance of :class:`ApiAccessListType`
- id_perms (instance): instance of :class:`IdPermsType`
- display_name (instance): instance of :class:`xsd:string`
-
- '''
- super(DomainTestFixtureGen, self).__init__()
- self._conn_drv = conn_drv
- if not domain_name:
- self._name = 'default-domain'
- else:
- self._name = domain_name
- self._obj = None
- self._parent_fixt = parent_fixt
- self._auto_prop_val = auto_prop_val
- self.domain_limits = domain_limits
- self.api_access_list = api_access_list
- self.id_perms = id_perms
- self.display_name = display_name
- #end __init__
-
- def _update_links (self, update_server):
- return None
- #end _update_links
-
-
- def populate (self):
- self._obj.set_domain_limits(self.domain_limits or vnc_api.gen.resource_xsd.DomainLimitsType.populate())
- self._obj.set_api_access_list(self.api_access_list or vnc_api.gen.resource_xsd.ApiAccessListType.populate())
- self._obj.set_id_perms(self.id_perms or vnc_api.gen.resource_xsd.IdPermsType.populate())
- self._obj.set_display_name(self.display_name or GeneratedsSuper.populate_string("display_name"))
- #end populate
-
- def setUp(self):
- super(DomainTestFixtureGen, self).setUp()
- # child of config-root
- self._obj = vnc_api.Domain(self._name)
- try:
- self._obj = self._conn_drv.domain_read (fq_name=self._obj.get_fq_name())
- self._update_links (update_server=True)
- except NoIdError:
- self._update_links (update_server=False)
- if self._auto_prop_val:
- self.populate ()
- else:
- self._obj.domain_limits = self.domain_limits
- self._obj.api_access_list = self.api_access_list
- self._obj.id_perms = self.id_perms
- self._obj.display_name = self.display_name
- self._conn_drv.domain_create(self._obj)
- # read back for server allocated values
- self._obj = self._conn_drv.domain_read(id = self._obj.uuid)
- #end setUp
-
- def cleanUp(self):
- if self._obj.get_projects() or self._obj.get_namespaces() or self._obj.get_service_templates() or self._obj.get_virtual_DNSs():
- return
- parent_fixt = getattr(self, '_parent_fixt', None)
- if parent_fixt:
- # non config-root child
- parent_obj = self._parent_fixt.getObj()
- # remove child from parent obj
- for child_obj in parent_obj.get_domains():
- if type(child_obj) == dict:
- child_uuid = child_obj['uuid']
- else:
- child_uuid = child_obj.uuid
- if child_uuid == self._obj.uuid:
- parent_obj.domains.remove(child_obj)
- break
-
- self._conn_drv.domain_delete(id = self._obj.uuid)
- #end cleanUp
-
- def getObj(self):
- return self._obj
- #end getObj
-
-#end class DomainTestFixtureGen
-
-class GlobalVrouterConfigTestFixtureGen(cfixture.ContrailFixture):
- """
- Fixture for :class:`.GlobalVrouterConfig`
- """
- def __init__(self, conn_drv, global_vrouter_config_name=None, parent_fixt=None, auto_prop_val=False, linklocal_services=None, encapsulation_priorities=None, vxlan_network_identifier_mode=None, id_perms=None, display_name=None):
- '''
- Create GlobalVrouterConfigTestFixtureGen object
-
- constructor
-
- Args:
- conn_drv (:class:`ConnectionDriver`): connection driver (eg. :class:`vnc_api.vnc_api.VncApi`, :class:`novaclient.client.Client`, etc)
-
- Kwargs:
- global_vrouter_config_name (str): Name of global_vrouter_config
- parent_fixt (:class:`.GlobalSystemConfigTestFixtureGen`): Parent fixture
- linklocal_services (instance): instance of :class:`LinklocalServicesTypes`
- encapsulation_priorities (instance): instance of :class:`EncapsulationPrioritiesType`
- vxlan_network_identifier_mode (instance): instance of :class:`xsd:string`
- id_perms (instance): instance of :class:`IdPermsType`
- display_name (instance): instance of :class:`xsd:string`
-
- '''
- super(GlobalVrouterConfigTestFixtureGen, self).__init__()
- self._conn_drv = conn_drv
- if not global_vrouter_config_name:
- self._name = 'default-global-vrouter-config'
- else:
- self._name = global_vrouter_config_name
- self._obj = None
- self._parent_fixt = parent_fixt
- self._auto_prop_val = auto_prop_val
- self.linklocal_services = linklocal_services
- self.encapsulation_priorities = encapsulation_priorities
- self.vxlan_network_identifier_mode = vxlan_network_identifier_mode
- self.id_perms = id_perms
- self.display_name = display_name
- #end __init__
-
- def _update_links (self, update_server):
- return None
- #end _update_links
-
-
- def populate (self):
- self._obj.set_linklocal_services(self.linklocal_services or vnc_api.gen.resource_xsd.LinklocalServicesTypes.populate())
- self._obj.set_encapsulation_priorities(self.encapsulation_priorities or vnc_api.gen.resource_xsd.EncapsulationPrioritiesType.populate())
- self._obj.set_vxlan_network_identifier_mode(self.vxlan_network_identifier_mode or GeneratedsSuper.populate_string("vxlan_network_identifier_mode"))
- self._obj.set_id_perms(self.id_perms or vnc_api.gen.resource_xsd.IdPermsType.populate())
- self._obj.set_display_name(self.display_name or GeneratedsSuper.populate_string("display_name"))
- #end populate
-
- def setUp(self):
- super(GlobalVrouterConfigTestFixtureGen, self).setUp()
- if not self._parent_fixt:
- self._parent_fixt = self.useFixture(GlobalSystemConfigTestFixtureGen(self._conn_drv, 'default-global-system-config'))
-
- self._obj = vnc_api.GlobalVrouterConfig(self._name, self._parent_fixt.getObj ())
- try:
- self._obj = self._conn_drv.global_vrouter_config_read (fq_name=self._obj.get_fq_name())
- self._update_links (update_server=True)
- except NoIdError:
- self._update_links (update_server=False)
- if self._auto_prop_val:
- self.populate ()
- else:
- self._obj.linklocal_services = self.linklocal_services
- self._obj.encapsulation_priorities = self.encapsulation_priorities
- self._obj.vxlan_network_identifier_mode = self.vxlan_network_identifier_mode
- self._obj.id_perms = self.id_perms
- self._obj.display_name = self.display_name
- self._conn_drv.global_vrouter_config_create(self._obj)
- # read back for server allocated values
- self._obj = self._conn_drv.global_vrouter_config_read(id = self._obj.uuid)
- #end setUp
-
- def cleanUp(self):
- parent_fixt = getattr(self, '_parent_fixt', None)
- if parent_fixt:
- # non config-root child
- parent_obj = self._parent_fixt.getObj()
- # remove child from parent obj
- for child_obj in parent_obj.get_global_vrouter_configs():
- if type(child_obj) == dict:
- child_uuid = child_obj['uuid']
- else:
- child_uuid = child_obj.uuid
- if child_uuid == self._obj.uuid:
- parent_obj.global_vrouter_configs.remove(child_obj)
- break
-
- self._conn_drv.global_vrouter_config_delete(id = self._obj.uuid)
- #end cleanUp
-
- def getObj(self):
- return self._obj
- #end getObj
-
-#end class GlobalVrouterConfigTestFixtureGen
-
-class InstanceIpTestFixtureGen(cfixture.ContrailFixture):
- """
- Fixture for :class:`.InstanceIp`
- """
- def __init__(self, conn_drv, instance_ip_name=None, auto_prop_val=False, virtual_network_refs = None, virtual_machine_interface_refs = None, instance_ip_address=None, instance_ip_family=None, instance_ip_mode=None, subnet_uuid=None, id_perms=None, display_name=None):
- '''
- Create InstanceIpTestFixtureGen object
-
- constructor
-
- Args:
- conn_drv (:class:`ConnectionDriver`): connection driver (eg. :class:`vnc_api.vnc_api.VncApi`, :class:`novaclient.client.Client`, etc)
-
- Kwargs:
- %s_name (str): Name of %s
- virtual_network (list): list of :class:`VirtualNetwork` type
- virtual_machine_interface (list): list of :class:`VirtualMachineInterface` type
- instance_ip_address (instance): instance of :class:`xsd:string`
- instance_ip_family (instance): instance of :class:`xsd:string`
- instance_ip_mode (instance): instance of :class:`xsd:string`
- subnet_uuid (instance): instance of :class:`xsd:string`
- id_perms (instance): instance of :class:`IdPermsType`
- display_name (instance): instance of :class:`xsd:string`
-
- '''
- super(InstanceIpTestFixtureGen, self).__init__()
- self._conn_drv = conn_drv
- if not instance_ip_name:
- self._name = 'default-instance-ip'
- else:
- self._name = instance_ip_name
- self._obj = None
- self._auto_prop_val = auto_prop_val
- if virtual_network_refs:
- for ln in virtual_network_refs:
- self.add_virtual_network (ln)
- if virtual_machine_interface_refs:
- for ln in virtual_machine_interface_refs:
- self.add_virtual_machine_interface (ln)
- self.instance_ip_address = instance_ip_address
- self.instance_ip_family = instance_ip_family
- self.instance_ip_mode = instance_ip_mode
- self.subnet_uuid = subnet_uuid
- self.id_perms = id_perms
- self.display_name = display_name
- #end __init__
-
- def _update_links (self, update_server):
- for ln in self.get_virtual_networks ():
- self.add_virtual_network (ln.fixture (), update_server = update_server, add_link = False)
- for ln in self.get_virtual_machine_interfaces ():
- self.add_virtual_machine_interface (ln.fixture (), update_server = update_server, add_link = False)
- return None
- #end _update_links
-
- def add_virtual_network (self, lo, update_server = True, add_link = True):
- '''
- add :class:`VirtualNetwork` link to :class:`InstanceIp`
- Args:
- lo (:class:`VirtualNetwork`): obj to link
- '''
- if self._obj:
- self._obj.add_virtual_network (lo)
- if update_server:
- self._conn_drv.instance_ip_update (self._obj)
-
- if add_link:
- self.add_link('virtual_network', cfixture.ConrtailLink('virtual_network', 'instance_ip', 'virtual_network', ['ref'], lo))
- #end add_virtual_network_link
-
- def get_virtual_networks (self):
- return self.get_links ('virtual_network')
- #end get_virtual_networks
- def add_virtual_machine_interface (self, lo, update_server = True, add_link = True):
- '''
- add :class:`VirtualMachineInterface` link to :class:`InstanceIp`
- Args:
- lo (:class:`VirtualMachineInterface`): obj to link
- '''
- if self._obj:
- self._obj.add_virtual_machine_interface (lo)
- if update_server:
- self._conn_drv.instance_ip_update (self._obj)
-
- if add_link:
- self.add_link('virtual_machine_interface', cfixture.ConrtailLink('virtual_machine_interface', 'instance_ip', 'virtual_machine_interface', ['ref'], lo))
- #end add_virtual_machine_interface_link
-
- def get_virtual_machine_interfaces (self):
- return self.get_links ('virtual_machine_interface')
- #end get_virtual_machine_interfaces
-
- def populate (self):
- self._obj.set_instance_ip_address(self.instance_ip_address or GeneratedsSuper.populate_string("instance_ip_address"))
- self._obj.set_instance_ip_family(self.instance_ip_family or GeneratedsSuper.populate_string("instance_ip_family"))
- self._obj.set_instance_ip_mode(self.instance_ip_mode or GeneratedsSuper.populate_string("instance_ip_mode"))
- self._obj.set_subnet_uuid(self.subnet_uuid or GeneratedsSuper.populate_string("subnet_uuid"))
- self._obj.set_id_perms(self.id_perms or vnc_api.gen.resource_xsd.IdPermsType.populate())
- self._obj.set_display_name(self.display_name or GeneratedsSuper.populate_string("display_name"))
- #end populate
-
- def setUp(self):
- super(InstanceIpTestFixtureGen, self).setUp()
- self._obj = vnc_api.InstanceIp(self._name)
- try:
- self._obj = self._conn_drv.instance_ip_read (fq_name=self._obj.get_fq_name())
- self._update_links (update_server=True)
- except NoIdError:
- self._update_links (update_server=False)
- if self._auto_prop_val:
- self.populate ()
- else:
- self._obj.instance_ip_address = self.instance_ip_address
- self._obj.instance_ip_family = self.instance_ip_family
- self._obj.instance_ip_mode = self.instance_ip_mode
- self._obj.subnet_uuid = self.subnet_uuid
- self._obj.id_perms = self.id_perms
- self._obj.display_name = self.display_name
- self._conn_drv.instance_ip_create(self._obj)
- # read back for server allocated values
- self._obj = self._conn_drv.instance_ip_read(id = self._obj.uuid)
- #end setUp
-
- def cleanUp(self):
- self._conn_drv.instance_ip_delete(id = self._obj.uuid)
- #end cleanUp
-
- def getObj(self):
- return self._obj
- #end getObj
-
-#end class InstanceIpTestFixtureGen
-
-class NetworkPolicyTestFixtureGen(cfixture.ContrailFixture):
- """
- Fixture for :class:`.NetworkPolicy`
- """
- def __init__(self, conn_drv, network_policy_name=None, parent_fixt=None, auto_prop_val=False, network_policy_entries=None, id_perms=None, display_name=None):
- '''
- Create NetworkPolicyTestFixtureGen object
-
- constructor
-
- Args:
- conn_drv (:class:`ConnectionDriver`): connection driver (eg. :class:`vnc_api.vnc_api.VncApi`, :class:`novaclient.client.Client`, etc)
-
- Kwargs:
- network_policy_name (str): Name of network_policy
- parent_fixt (:class:`.ProjectTestFixtureGen`): Parent fixture
- network_policy_entries (instance): instance of :class:`PolicyEntriesType`
- id_perms (instance): instance of :class:`IdPermsType`
- display_name (instance): instance of :class:`xsd:string`
-
- '''
- super(NetworkPolicyTestFixtureGen, self).__init__()
- self._conn_drv = conn_drv
- if not network_policy_name:
- self._name = 'default-network-policy'
- else:
- self._name = network_policy_name
- self._obj = None
- self._parent_fixt = parent_fixt
- self._auto_prop_val = auto_prop_val
- self.network_policy_entries = network_policy_entries
- self.id_perms = id_perms
- self.display_name = display_name
- #end __init__
-
- def _update_links (self, update_server):
- return None
- #end _update_links
-
-
- def populate (self):
- self._obj.set_network_policy_entries(self.network_policy_entries or vnc_api.gen.resource_xsd.PolicyEntriesType.populate())
- self._obj.set_id_perms(self.id_perms or vnc_api.gen.resource_xsd.IdPermsType.populate())
- self._obj.set_display_name(self.display_name or GeneratedsSuper.populate_string("display_name"))
- #end populate
-
- def setUp(self):
- super(NetworkPolicyTestFixtureGen, self).setUp()
- if not self._parent_fixt:
- self._parent_fixt = self.useFixture(ProjectTestFixtureGen(self._conn_drv, 'default-project'))
-
- self._obj = vnc_api.NetworkPolicy(self._name, self._parent_fixt.getObj ())
- try:
- self._obj = self._conn_drv.network_policy_read (fq_name=self._obj.get_fq_name())
- self._update_links (update_server=True)
- except NoIdError:
- self._update_links (update_server=False)
- if self._auto_prop_val:
- self.populate ()
- else:
- self._obj.network_policy_entries = self.network_policy_entries
- self._obj.id_perms = self.id_perms
- self._obj.display_name = self.display_name
- self._conn_drv.network_policy_create(self._obj)
- # read back for server allocated values
- self._obj = self._conn_drv.network_policy_read(id = self._obj.uuid)
- #end setUp
-
- def cleanUp(self):
- if self._obj.get_virtual_network_back_refs():
- return
- parent_fixt = getattr(self, '_parent_fixt', None)
- if parent_fixt:
- # non config-root child
- parent_obj = self._parent_fixt.getObj()
- # remove child from parent obj
- for child_obj in parent_obj.get_network_policys():
- if type(child_obj) == dict:
- child_uuid = child_obj['uuid']
- else:
- child_uuid = child_obj.uuid
- if child_uuid == self._obj.uuid:
- parent_obj.network_policys.remove(child_obj)
- break
-
- self._conn_drv.network_policy_delete(id = self._obj.uuid)
- #end cleanUp
-
- def getObj(self):
- return self._obj
- #end getObj
-
-#end class NetworkPolicyTestFixtureGen
-
-class LoadbalancerPoolTestFixtureGen(cfixture.ContrailFixture):
- """
- Fixture for :class:`.LoadbalancerPool`
- """
- def __init__(self, conn_drv, loadbalancer_pool_name=None, parent_fixt=None, auto_prop_val=False, service_instance_refs = None, virtual_machine_interface_refs = None, service_appliance_set_refs = None, loadbalancer_healthmonitor_refs = None, loadbalancer_pool_properties=None, loadbalancer_pool_provider=None, id_perms=None, display_name=None):
- '''
- Create LoadbalancerPoolTestFixtureGen object
-
- constructor
-
- Args:
- conn_drv (:class:`ConnectionDriver`): connection driver (eg. :class:`vnc_api.vnc_api.VncApi`, :class:`novaclient.client.Client`, etc)
-
- Kwargs:
- loadbalancer_pool_name (str): Name of loadbalancer_pool
- parent_fixt (:class:`.ProjectTestFixtureGen`): Parent fixture
- service_instance (list): list of :class:`ServiceInstance` type
- virtual_machine_interface (list): list of :class:`VirtualMachineInterface` type
- service_appliance_set (list): list of :class:`ServiceApplianceSet` type
- loadbalancer_healthmonitor (list): list of :class:`LoadbalancerHealthmonitor` type
- loadbalancer_pool_properties (instance): instance of :class:`LoadbalancerPoolType`
- loadbalancer_pool_provider (instance): instance of :class:`xsd:string`
- id_perms (instance): instance of :class:`IdPermsType`
- display_name (instance): instance of :class:`xsd:string`
-
- '''
- super(LoadbalancerPoolTestFixtureGen, self).__init__()
- self._conn_drv = conn_drv
- if not loadbalancer_pool_name:
- self._name = 'default-loadbalancer-pool'
- else:
- self._name = loadbalancer_pool_name
- self._obj = None
- self._parent_fixt = parent_fixt
- self._auto_prop_val = auto_prop_val
- if service_instance_refs:
- for ln in service_instance_refs:
- self.add_service_instance (ln)
- if virtual_machine_interface_refs:
- for ln in virtual_machine_interface_refs:
- self.add_virtual_machine_interface (ln)
- if service_appliance_set_refs:
- for ln in service_appliance_set_refs:
- self.add_service_appliance_set (ln)
- if loadbalancer_healthmonitor_refs:
- for ln in loadbalancer_healthmonitor_refs:
- self.add_loadbalancer_healthmonitor (ln)
- self.loadbalancer_pool_properties = loadbalancer_pool_properties
- self.loadbalancer_pool_provider = loadbalancer_pool_provider
- self.id_perms = id_perms
- self.display_name = display_name
- #end __init__
-
- def _update_links (self, update_server):
- for ln in self.get_service_instances ():
- self.add_service_instance (ln.fixture (), update_server = update_server, add_link = False)
- for ln in self.get_virtual_machine_interfaces ():
- self.add_virtual_machine_interface (ln.fixture (), update_server = update_server, add_link = False)
- for ln in self.get_service_appliance_sets ():
- self.add_service_appliance_set (ln.fixture (), update_server = update_server, add_link = False)
- for ln in self.get_loadbalancer_healthmonitors ():
- self.add_loadbalancer_healthmonitor (ln.fixture (), update_server = update_server, add_link = False)
- return None
- #end _update_links
-
- def add_service_instance (self, lo, update_server = True, add_link = True):
- '''
- add :class:`ServiceInstance` link to :class:`LoadbalancerPool`
- Args:
- lo (:class:`ServiceInstance`): obj to link
- '''
- if self._obj:
- self._obj.add_service_instance (lo)
- if update_server:
- self._conn_drv.loadbalancer_pool_update (self._obj)
-
- if add_link:
- self.add_link('service_instance', cfixture.ConrtailLink('service_instance', 'loadbalancer_pool', 'service_instance', ['ref'], lo))
- #end add_service_instance_link
-
- def get_service_instances (self):
- return self.get_links ('service_instance')
- #end get_service_instances
- def add_virtual_machine_interface (self, lo, update_server = True, add_link = True):
- '''
- add :class:`VirtualMachineInterface` link to :class:`LoadbalancerPool`
- Args:
- lo (:class:`VirtualMachineInterface`): obj to link
- '''
- if self._obj:
- self._obj.add_virtual_machine_interface (lo)
- if update_server:
- self._conn_drv.loadbalancer_pool_update (self._obj)
-
- if add_link:
- self.add_link('virtual_machine_interface', cfixture.ConrtailLink('virtual_machine_interface', 'loadbalancer_pool', 'virtual_machine_interface', ['ref'], lo))
- #end add_virtual_machine_interface_link
-
- def get_virtual_machine_interfaces (self):
- return self.get_links ('virtual_machine_interface')
- #end get_virtual_machine_interfaces
- def add_service_appliance_set (self, lo, update_server = True, add_link = True):
- '''
- add :class:`ServiceApplianceSet` link to :class:`LoadbalancerPool`
- Args:
- lo (:class:`ServiceApplianceSet`): obj to link
- '''
- if self._obj:
- self._obj.add_service_appliance_set (lo)
- if update_server:
- self._conn_drv.loadbalancer_pool_update (self._obj)
-
- if add_link:
- self.add_link('service_appliance_set', cfixture.ConrtailLink('service_appliance_set', 'loadbalancer_pool', 'service_appliance_set', ['ref'], lo))
- #end add_service_appliance_set_link
-
- def get_service_appliance_sets (self):
- return self.get_links ('service_appliance_set')
- #end get_service_appliance_sets
- def add_loadbalancer_healthmonitor (self, lo, update_server = True, add_link = True):
- '''
- add :class:`LoadbalancerHealthmonitor` link to :class:`LoadbalancerPool`
- Args:
- lo (:class:`LoadbalancerHealthmonitor`): obj to link
- '''
- if self._obj:
- self._obj.add_loadbalancer_healthmonitor (lo)
- if update_server:
- self._conn_drv.loadbalancer_pool_update (self._obj)
-
- if add_link:
- self.add_link('loadbalancer_healthmonitor', cfixture.ConrtailLink('loadbalancer_healthmonitor', 'loadbalancer_pool', 'loadbalancer_healthmonitor', ['ref'], lo))
- #end add_loadbalancer_healthmonitor_link
-
- def get_loadbalancer_healthmonitors (self):
- return self.get_links ('loadbalancer_healthmonitor')
- #end get_loadbalancer_healthmonitors
-
- def populate (self):
- self._obj.set_loadbalancer_pool_properties(self.loadbalancer_pool_properties or vnc_api.gen.resource_xsd.LoadbalancerPoolType.populate())
- self._obj.set_loadbalancer_pool_provider(self.loadbalancer_pool_provider or GeneratedsSuper.populate_string("loadbalancer_pool_provider"))
- self._obj.set_id_perms(self.id_perms or vnc_api.gen.resource_xsd.IdPermsType.populate())
- self._obj.set_display_name(self.display_name or GeneratedsSuper.populate_string("display_name"))
- #end populate
-
- def setUp(self):
- super(LoadbalancerPoolTestFixtureGen, self).setUp()
- if not self._parent_fixt:
- self._parent_fixt = self.useFixture(ProjectTestFixtureGen(self._conn_drv, 'default-project'))
-
- self._obj = vnc_api.LoadbalancerPool(self._name, self._parent_fixt.getObj ())
- try:
- self._obj = self._conn_drv.loadbalancer_pool_read (fq_name=self._obj.get_fq_name())
- self._update_links (update_server=True)
- except NoIdError:
- self._update_links (update_server=False)
- if self._auto_prop_val:
- self.populate ()
- else:
- self._obj.loadbalancer_pool_properties = self.loadbalancer_pool_properties
- self._obj.loadbalancer_pool_provider = self.loadbalancer_pool_provider
- self._obj.id_perms = self.id_perms
- self._obj.display_name = self.display_name
- self._conn_drv.loadbalancer_pool_create(self._obj)
- # read back for server allocated values
- self._obj = self._conn_drv.loadbalancer_pool_read(id = self._obj.uuid)
- #end setUp
-
- def cleanUp(self):
- if self._obj.get_virtual_ip_back_refs():
- return
- if self._obj.get_loadbalancer_members():
- return
- parent_fixt = getattr(self, '_parent_fixt', None)
- if parent_fixt:
- # non config-root child
- parent_obj = self._parent_fixt.getObj()
- # remove child from parent obj
- for child_obj in parent_obj.get_loadbalancer_pools():
- if type(child_obj) == dict:
- child_uuid = child_obj['uuid']
- else:
- child_uuid = child_obj.uuid
- if child_uuid == self._obj.uuid:
- parent_obj.loadbalancer_pools.remove(child_obj)
- break
-
- self._conn_drv.loadbalancer_pool_delete(id = self._obj.uuid)
- #end cleanUp
-
- def getObj(self):
- return self._obj
- #end getObj
-
-#end class LoadbalancerPoolTestFixtureGen
-
-class VirtualDnsRecordTestFixtureGen(cfixture.ContrailFixture):
- """
- Fixture for :class:`.VirtualDnsRecord`
- """
- def __init__(self, conn_drv, virtual_DNS_record_name=None, parent_fixt=None, auto_prop_val=False, virtual_DNS_record_data=None, id_perms=None, display_name=None):
- '''
- Create VirtualDnsRecordTestFixtureGen object
-
- constructor
-
- Args:
- conn_drv (:class:`ConnectionDriver`): connection driver (eg. :class:`vnc_api.vnc_api.VncApi`, :class:`novaclient.client.Client`, etc)
-
- Kwargs:
- virtual_DNS_record_name (str): Name of virtual_DNS_record
- parent_fixt (:class:`.VirtualDnsTestFixtureGen`): Parent fixture
- virtual_DNS_record_data (instance): instance of :class:`VirtualDnsRecordType`
- id_perms (instance): instance of :class:`IdPermsType`
- display_name (instance): instance of :class:`xsd:string`
-
- '''
- super(VirtualDnsRecordTestFixtureGen, self).__init__()
- self._conn_drv = conn_drv
- if not virtual_DNS_record_name:
- self._name = 'default-virtual-DNS-record'
- else:
- self._name = virtual_DNS_record_name
- self._obj = None
- self._parent_fixt = parent_fixt
- self._auto_prop_val = auto_prop_val
- self.virtual_DNS_record_data = virtual_DNS_record_data
- self.id_perms = id_perms
- self.display_name = display_name
- #end __init__
-
- def _update_links (self, update_server):
- return None
- #end _update_links
-
-
- def populate (self):
- self._obj.set_virtual_DNS_record_data(self.virtual_DNS_record_data or vnc_api.gen.resource_xsd.VirtualDnsRecordType.populate())
- self._obj.set_id_perms(self.id_perms or vnc_api.gen.resource_xsd.IdPermsType.populate())
- self._obj.set_display_name(self.display_name or GeneratedsSuper.populate_string("display_name"))
- #end populate
-
- def setUp(self):
- super(VirtualDnsRecordTestFixtureGen, self).setUp()
- if not self._parent_fixt:
- self._parent_fixt = self.useFixture(VirtualDnsTestFixtureGen(self._conn_drv, 'default-virtual-DNS'))
-
- self._obj = vnc_api.VirtualDnsRecord(self._name, self._parent_fixt.getObj ())
- try:
- self._obj = self._conn_drv.virtual_DNS_record_read (fq_name=self._obj.get_fq_name())
- self._update_links (update_server=True)
- except NoIdError:
- self._update_links (update_server=False)
- if self._auto_prop_val:
- self.populate ()
- else:
- self._obj.virtual_DNS_record_data = self.virtual_DNS_record_data
- self._obj.id_perms = self.id_perms
- self._obj.display_name = self.display_name
- self._conn_drv.virtual_DNS_record_create(self._obj)
- # read back for server allocated values
- self._obj = self._conn_drv.virtual_DNS_record_read(id = self._obj.uuid)
- #end setUp
-
- def cleanUp(self):
- parent_fixt = getattr(self, '_parent_fixt', None)
- if parent_fixt:
- # non config-root child
- parent_obj = self._parent_fixt.getObj()
- # remove child from parent obj
- for child_obj in parent_obj.get_virtual_DNS_records():
- if type(child_obj) == dict:
- child_uuid = child_obj['uuid']
- else:
- child_uuid = child_obj.uuid
- if child_uuid == self._obj.uuid:
- parent_obj.virtual_DNS_records.remove(child_obj)
- break
-
- self._conn_drv.virtual_DNS_record_delete(id = self._obj.uuid)
- #end cleanUp
-
- def getObj(self):
- return self._obj
- #end getObj
-
-#end class VirtualDnsRecordTestFixtureGen
-
-class RouteTargetTestFixtureGen(cfixture.ContrailFixture):
- """
- Fixture for :class:`.RouteTarget`
- """
- def __init__(self, conn_drv, route_target_name=None, auto_prop_val=False, id_perms=None, display_name=None):
- '''
- Create RouteTargetTestFixtureGen object
-
- constructor
-
- Args:
- conn_drv (:class:`ConnectionDriver`): connection driver (eg. :class:`vnc_api.vnc_api.VncApi`, :class:`novaclient.client.Client`, etc)
-
- Kwargs:
- %s_name (str): Name of %s
- id_perms (instance): instance of :class:`IdPermsType`
- display_name (instance): instance of :class:`xsd:string`
-
- '''
- super(RouteTargetTestFixtureGen, self).__init__()
- self._conn_drv = conn_drv
- if not route_target_name:
- self._name = 'default-route-target'
- else:
- self._name = route_target_name
- self._obj = None
- self._auto_prop_val = auto_prop_val
- self.id_perms = id_perms
- self.display_name = display_name
- #end __init__
-
- def _update_links (self, update_server):
- return None
- #end _update_links
-
-
- def populate (self):
- self._obj.set_id_perms(self.id_perms or vnc_api.gen.resource_xsd.IdPermsType.populate())
- self._obj.set_display_name(self.display_name or GeneratedsSuper.populate_string("display_name"))
- #end populate
-
- def setUp(self):
- super(RouteTargetTestFixtureGen, self).setUp()
- self._obj = vnc_api.RouteTarget(self._name)
- try:
- self._obj = self._conn_drv.route_target_read (fq_name=self._obj.get_fq_name())
- self._update_links (update_server=True)
- except NoIdError:
- self._update_links (update_server=False)
- if self._auto_prop_val:
- self.populate ()
- else:
- self._obj.id_perms = self.id_perms
- self._obj.display_name = self.display_name
- self._conn_drv.route_target_create(self._obj)
- # read back for server allocated values
- self._obj = self._conn_drv.route_target_read(id = self._obj.uuid)
- #end setUp
-
- def cleanUp(self):
- if self._obj.get_logical_router_back_refs() or self._obj.get_routing_instance_back_refs():
- return
- self._conn_drv.route_target_delete(id = self._obj.uuid)
- #end cleanUp
-
- def getObj(self):
- return self._obj
- #end getObj
-
-#end class RouteTargetTestFixtureGen
-
-class FloatingIpTestFixtureGen(cfixture.ContrailFixture):
- """
- Fixture for :class:`.FloatingIp`
- """
- def __init__(self, conn_drv, floating_ip_name=None, parent_fixt=None, auto_prop_val=False, project_refs = None, virtual_machine_interface_refs = None, floating_ip_address=None, floating_ip_is_virtual_ip=None, floating_ip_fixed_ip_address=None, floating_ip_address_family=None, id_perms=None, display_name=None):
- '''
- Create FloatingIpTestFixtureGen object
-
- constructor
-
- Args:
- conn_drv (:class:`ConnectionDriver`): connection driver (eg. :class:`vnc_api.vnc_api.VncApi`, :class:`novaclient.client.Client`, etc)
-
- Kwargs:
- floating_ip_name (str): Name of floating_ip
- parent_fixt (:class:`.FloatingIpPoolTestFixtureGen`): Parent fixture
- project (list): list of :class:`Project` type
- virtual_machine_interface (list): list of :class:`VirtualMachineInterface` type
- floating_ip_address (instance): instance of :class:`xsd:string`
- floating_ip_is_virtual_ip (instance): instance of :class:`xsd:boolean`
- floating_ip_fixed_ip_address (instance): instance of :class:`xsd:string`
- floating_ip_address_family (instance): instance of :class:`xsd:string`
- id_perms (instance): instance of :class:`IdPermsType`
- display_name (instance): instance of :class:`xsd:string`
-
- '''
- super(FloatingIpTestFixtureGen, self).__init__()
- self._conn_drv = conn_drv
- if not floating_ip_name:
- self._name = 'default-floating-ip'
- else:
- self._name = floating_ip_name
- self._obj = None
- self._parent_fixt = parent_fixt
- self._auto_prop_val = auto_prop_val
- if project_refs:
- for ln in project_refs:
- self.add_project (ln)
- if virtual_machine_interface_refs:
- for ln in virtual_machine_interface_refs:
- self.add_virtual_machine_interface (ln)
- self.floating_ip_address = floating_ip_address
- self.floating_ip_is_virtual_ip = floating_ip_is_virtual_ip
- self.floating_ip_fixed_ip_address = floating_ip_fixed_ip_address
- self.floating_ip_address_family = floating_ip_address_family
- self.id_perms = id_perms
- self.display_name = display_name
- #end __init__
-
- def _update_links (self, update_server):
- for ln in self.get_projects ():
- self.add_project (ln.fixture (), update_server = update_server, add_link = False)
- for ln in self.get_virtual_machine_interfaces ():
- self.add_virtual_machine_interface (ln.fixture (), update_server = update_server, add_link = False)
- return None
- #end _update_links
-
- def add_project (self, lo, update_server = True, add_link = True):
- '''
- add :class:`Project` link to :class:`FloatingIp`
- Args:
- lo (:class:`Project`): obj to link
- '''
- if self._obj:
- self._obj.add_project (lo)
- if update_server:
- self._conn_drv.floating_ip_update (self._obj)
-
- if add_link:
- self.add_link('project', cfixture.ConrtailLink('project', 'floating_ip', 'project', ['ref'], lo))
- #end add_project_link
-
- def get_projects (self):
- return self.get_links ('project')
- #end get_projects
- def add_virtual_machine_interface (self, lo, update_server = True, add_link = True):
- '''
- add :class:`VirtualMachineInterface` link to :class:`FloatingIp`
- Args:
- lo (:class:`VirtualMachineInterface`): obj to link
- '''
- if self._obj:
- self._obj.add_virtual_machine_interface (lo)
- if update_server:
- self._conn_drv.floating_ip_update (self._obj)
-
- if add_link:
- self.add_link('virtual_machine_interface', cfixture.ConrtailLink('virtual_machine_interface', 'floating_ip', 'virtual_machine_interface', ['ref'], lo))
- #end add_virtual_machine_interface_link
-
- def get_virtual_machine_interfaces (self):
- return self.get_links ('virtual_machine_interface')
- #end get_virtual_machine_interfaces
-
- def populate (self):
- self._obj.set_floating_ip_address(self.floating_ip_address or GeneratedsSuper.populate_string("floating_ip_address"))
- self._obj.set_floating_ip_is_virtual_ip(self.floating_ip_is_virtual_ip or GeneratedsSuper.populate_boolean("floating_ip_is_virtual_ip"))
- self._obj.set_floating_ip_fixed_ip_address(self.floating_ip_fixed_ip_address or GeneratedsSuper.populate_string("floating_ip_fixed_ip_address"))
- self._obj.set_floating_ip_address_family(self.floating_ip_address_family or GeneratedsSuper.populate_string("floating_ip_address_family"))
- self._obj.set_id_perms(self.id_perms or vnc_api.gen.resource_xsd.IdPermsType.populate())
- self._obj.set_display_name(self.display_name or GeneratedsSuper.populate_string("display_name"))
- #end populate
-
- def setUp(self):
- super(FloatingIpTestFixtureGen, self).setUp()
- if not self._parent_fixt:
- self._parent_fixt = self.useFixture(FloatingIpPoolTestFixtureGen(self._conn_drv, 'default-floating-ip-pool'))
-
- self._obj = vnc_api.FloatingIp(self._name, self._parent_fixt.getObj ())
- try:
- self._obj = self._conn_drv.floating_ip_read (fq_name=self._obj.get_fq_name())
- self._update_links (update_server=True)
- except NoIdError:
- self._update_links (update_server=False)
- if self._auto_prop_val:
- self.populate ()
- else:
- self._obj.floating_ip_address = self.floating_ip_address
- self._obj.floating_ip_is_virtual_ip = self.floating_ip_is_virtual_ip
- self._obj.floating_ip_fixed_ip_address = self.floating_ip_fixed_ip_address
- self._obj.floating_ip_address_family = self.floating_ip_address_family
- self._obj.id_perms = self.id_perms
- self._obj.display_name = self.display_name
- self._conn_drv.floating_ip_create(self._obj)
- # read back for server allocated values
- self._obj = self._conn_drv.floating_ip_read(id = self._obj.uuid)
- #end setUp
-
- def cleanUp(self):
- if self._obj.get_customer_attachment_back_refs():
- return
- parent_fixt = getattr(self, '_parent_fixt', None)
- if parent_fixt:
- # non config-root child
- parent_obj = self._parent_fixt.getObj()
- # remove child from parent obj
- for child_obj in parent_obj.get_floating_ips():
- if type(child_obj) == dict:
- child_uuid = child_obj['uuid']
- else:
- child_uuid = child_obj.uuid
- if child_uuid == self._obj.uuid:
- parent_obj.floating_ips.remove(child_obj)
- break
-
- self._conn_drv.floating_ip_delete(id = self._obj.uuid)
- #end cleanUp
-
- def getObj(self):
- return self._obj
- #end getObj
-
-#end class FloatingIpTestFixtureGen
-
-class FloatingIpPoolTestFixtureGen(cfixture.ContrailFixture):
- """
- Fixture for :class:`.FloatingIpPool`
- """
- def __init__(self, conn_drv, floating_ip_pool_name=None, parent_fixt=None, auto_prop_val=False, floating_ip_pool_prefixes=None, id_perms=None, display_name=None):
- '''
- Create FloatingIpPoolTestFixtureGen object
-
- constructor
-
- Args:
- conn_drv (:class:`ConnectionDriver`): connection driver (eg. :class:`vnc_api.vnc_api.VncApi`, :class:`novaclient.client.Client`, etc)
-
- Kwargs:
- floating_ip_pool_name (str): Name of floating_ip_pool
- parent_fixt (:class:`.VirtualNetworkTestFixtureGen`): Parent fixture
- floating_ip_pool_prefixes (instance): instance of :class:`FloatingIpPoolType`
- id_perms (instance): instance of :class:`IdPermsType`
- display_name (instance): instance of :class:`xsd:string`
-
- '''
- super(FloatingIpPoolTestFixtureGen, self).__init__()
- self._conn_drv = conn_drv
- if not floating_ip_pool_name:
- self._name = 'default-floating-ip-pool'
- else:
- self._name = floating_ip_pool_name
- self._obj = None
- self._parent_fixt = parent_fixt
- self._auto_prop_val = auto_prop_val
- self.floating_ip_pool_prefixes = floating_ip_pool_prefixes
- self.id_perms = id_perms
- self.display_name = display_name
- #end __init__
-
- def _update_links (self, update_server):
- return None
- #end _update_links
-
-
- def populate (self):
- self._obj.set_floating_ip_pool_prefixes(self.floating_ip_pool_prefixes or vnc_api.gen.resource_xsd.FloatingIpPoolType.populate())
- self._obj.set_id_perms(self.id_perms or vnc_api.gen.resource_xsd.IdPermsType.populate())
- self._obj.set_display_name(self.display_name or GeneratedsSuper.populate_string("display_name"))
- #end populate
-
- def setUp(self):
- super(FloatingIpPoolTestFixtureGen, self).setUp()
- if not self._parent_fixt:
- self._parent_fixt = self.useFixture(VirtualNetworkTestFixtureGen(self._conn_drv, 'default-virtual-network'))
-
- self._obj = vnc_api.FloatingIpPool(self._name, self._parent_fixt.getObj ())
- try:
- self._obj = self._conn_drv.floating_ip_pool_read (fq_name=self._obj.get_fq_name())
- self._update_links (update_server=True)
- except NoIdError:
- self._update_links (update_server=False)
- if self._auto_prop_val:
- self.populate ()
- else:
- self._obj.floating_ip_pool_prefixes = self.floating_ip_pool_prefixes
- self._obj.id_perms = self.id_perms
- self._obj.display_name = self.display_name
- self._conn_drv.floating_ip_pool_create(self._obj)
- # read back for server allocated values
- self._obj = self._conn_drv.floating_ip_pool_read(id = self._obj.uuid)
- #end setUp
-
- def cleanUp(self):
- if self._obj.get_project_back_refs():
- return
- if self._obj.get_floating_ips():
- return
- parent_fixt = getattr(self, '_parent_fixt', None)
- if parent_fixt:
- # non config-root child
- parent_obj = self._parent_fixt.getObj()
- # remove child from parent obj
- for child_obj in parent_obj.get_floating_ip_pools():
- if type(child_obj) == dict:
- child_uuid = child_obj['uuid']
- else:
- child_uuid = child_obj.uuid
- if child_uuid == self._obj.uuid:
- parent_obj.floating_ip_pools.remove(child_obj)
- break
-
- self._conn_drv.floating_ip_pool_delete(id = self._obj.uuid)
- #end cleanUp
-
- def getObj(self):
- return self._obj
- #end getObj
-
-#end class FloatingIpPoolTestFixtureGen
-
-class PhysicalRouterTestFixtureGen(cfixture.ContrailFixture):
- """
- Fixture for :class:`.PhysicalRouter`
- """
- def __init__(self, conn_drv, physical_router_name=None, parent_fixt=None, auto_prop_val=False, virtual_router_refs = None, bgp_router_refs = None, virtual_network_refs = None, physical_router_management_ip=None, physical_router_dataplane_ip=None, physical_router_vendor_name=None, physical_router_product_name=None, physical_router_vnc_managed=None, physical_router_user_credentials=None, physical_router_snmp_credentials=None, physical_router_junos_service_ports=None, id_perms=None, display_name=None):
- '''
- Create PhysicalRouterTestFixtureGen object
-
- constructor
-
- Args:
- conn_drv (:class:`ConnectionDriver`): connection driver (eg. :class:`vnc_api.vnc_api.VncApi`, :class:`novaclient.client.Client`, etc)
-
- Kwargs:
- physical_router_name (str): Name of physical_router
- parent_fixt (:class:`.GlobalSystemConfigTestFixtureGen`): Parent fixture
- virtual_router (list): list of :class:`VirtualRouter` type
- bgp_router (list): list of :class:`BgpRouter` type
- virtual_network (list): list of :class:`VirtualNetwork` type
- physical_router_management_ip (instance): instance of :class:`xsd:string`
- physical_router_dataplane_ip (instance): instance of :class:`xsd:string`
- physical_router_vendor_name (instance): instance of :class:`xsd:string`
- physical_router_product_name (instance): instance of :class:`xsd:string`
- physical_router_vnc_managed (instance): instance of :class:`xsd:boolean`
- physical_router_user_credentials (instance): instance of :class:`UserCredentials`
- physical_router_snmp_credentials (instance): instance of :class:`SNMPCredentials`
- physical_router_junos_service_ports (instance): instance of :class:`JunosServicePorts`
- id_perms (instance): instance of :class:`IdPermsType`
- display_name (instance): instance of :class:`xsd:string`
-
- '''
- super(PhysicalRouterTestFixtureGen, self).__init__()
- self._conn_drv = conn_drv
- if not physical_router_name:
- self._name = 'default-physical-router'
- else:
- self._name = physical_router_name
- self._obj = None
- self._parent_fixt = parent_fixt
- self._auto_prop_val = auto_prop_val
- if virtual_router_refs:
- for ln in virtual_router_refs:
- self.add_virtual_router (ln)
- if bgp_router_refs:
- for ln in bgp_router_refs:
- self.add_bgp_router (ln)
- if virtual_network_refs:
- for ln in virtual_network_refs:
- self.add_virtual_network (ln)
- self.physical_router_management_ip = physical_router_management_ip
- self.physical_router_dataplane_ip = physical_router_dataplane_ip
- self.physical_router_vendor_name = physical_router_vendor_name
- self.physical_router_product_name = physical_router_product_name
- self.physical_router_vnc_managed = physical_router_vnc_managed
- self.physical_router_user_credentials = physical_router_user_credentials
- self.physical_router_snmp_credentials = physical_router_snmp_credentials
- self.physical_router_junos_service_ports = physical_router_junos_service_ports
- self.id_perms = id_perms
- self.display_name = display_name
- #end __init__
-
- def _update_links (self, update_server):
- for ln in self.get_virtual_routers ():
- self.add_virtual_router (ln.fixture (), update_server = update_server, add_link = False)
- for ln in self.get_bgp_routers ():
- self.add_bgp_router (ln.fixture (), update_server = update_server, add_link = False)
- for ln in self.get_virtual_networks ():
- self.add_virtual_network (ln.fixture (), update_server = update_server, add_link = False)
- return None
- #end _update_links
-
- def add_virtual_router (self, lo, update_server = True, add_link = True):
- '''
- add :class:`VirtualRouter` link to :class:`PhysicalRouter`
- Args:
- lo (:class:`VirtualRouter`): obj to link
- '''
- if self._obj:
- self._obj.add_virtual_router (lo)
- if update_server:
- self._conn_drv.physical_router_update (self._obj)
-
- if add_link:
- self.add_link('virtual_router', cfixture.ConrtailLink('virtual_router', 'physical_router', 'virtual_router', ['ref'], lo))
- #end add_virtual_router_link
-
- def get_virtual_routers (self):
- return self.get_links ('virtual_router')
- #end get_virtual_routers
- def add_bgp_router (self, lo, update_server = True, add_link = True):
- '''
- add :class:`BgpRouter` link to :class:`PhysicalRouter`
- Args:
- lo (:class:`BgpRouter`): obj to link
- '''
- if self._obj:
- self._obj.add_bgp_router (lo)
- if update_server:
- self._conn_drv.physical_router_update (self._obj)
-
- if add_link:
- self.add_link('bgp_router', cfixture.ConrtailLink('bgp_router', 'physical_router', 'bgp_router', ['ref'], lo))
- #end add_bgp_router_link
-
- def get_bgp_routers (self):
- return self.get_links ('bgp_router')
- #end get_bgp_routers
- def add_virtual_network (self, lo, update_server = True, add_link = True):
- '''
- add :class:`VirtualNetwork` link to :class:`PhysicalRouter`
- Args:
- lo (:class:`VirtualNetwork`): obj to link
- '''
- if self._obj:
- self._obj.add_virtual_network (lo)
- if update_server:
- self._conn_drv.physical_router_update (self._obj)
-
- if add_link:
- self.add_link('virtual_network', cfixture.ConrtailLink('virtual_network', 'physical_router', 'virtual_network', ['ref'], lo))
- #end add_virtual_network_link
-
- def get_virtual_networks (self):
- return self.get_links ('virtual_network')
- #end get_virtual_networks
-
- def populate (self):
- self._obj.set_physical_router_management_ip(self.physical_router_management_ip or GeneratedsSuper.populate_string("physical_router_management_ip"))
- self._obj.set_physical_router_dataplane_ip(self.physical_router_dataplane_ip or GeneratedsSuper.populate_string("physical_router_dataplane_ip"))
- self._obj.set_physical_router_vendor_name(self.physical_router_vendor_name or GeneratedsSuper.populate_string("physical_router_vendor_name"))
- self._obj.set_physical_router_product_name(self.physical_router_product_name or GeneratedsSuper.populate_string("physical_router_product_name"))
- self._obj.set_physical_router_vnc_managed(self.physical_router_vnc_managed or GeneratedsSuper.populate_boolean("physical_router_vnc_managed"))
- self._obj.set_physical_router_user_credentials(self.physical_router_user_credentials or [vnc_api.gen.resource_xsd.UserCredentials.populate()])
- self._obj.set_physical_router_snmp_credentials(self.physical_router_snmp_credentials or vnc_api.gen.resource_xsd.SNMPCredentials.populate())
- self._obj.set_physical_router_junos_service_ports(self.physical_router_junos_service_ports or vnc_api.gen.resource_xsd.JunosServicePorts.populate())
- self._obj.set_id_perms(self.id_perms or vnc_api.gen.resource_xsd.IdPermsType.populate())
- self._obj.set_display_name(self.display_name or GeneratedsSuper.populate_string("display_name"))
- #end populate
-
- def setUp(self):
- super(PhysicalRouterTestFixtureGen, self).setUp()
- if not self._parent_fixt:
- self._parent_fixt = self.useFixture(GlobalSystemConfigTestFixtureGen(self._conn_drv, 'default-global-system-config'))
-
- self._obj = vnc_api.PhysicalRouter(self._name, self._parent_fixt.getObj ())
- try:
- self._obj = self._conn_drv.physical_router_read (fq_name=self._obj.get_fq_name())
- self._update_links (update_server=True)
- except NoIdError:
- self._update_links (update_server=False)
- if self._auto_prop_val:
- self.populate ()
- else:
- self._obj.physical_router_management_ip = self.physical_router_management_ip
- self._obj.physical_router_dataplane_ip = self.physical_router_dataplane_ip
- self._obj.physical_router_vendor_name = self.physical_router_vendor_name
- self._obj.physical_router_product_name = self.physical_router_product_name
- self._obj.physical_router_vnc_managed = self.physical_router_vnc_managed
- self._obj.physical_router_user_credentials = self.physical_router_user_credentials
- self._obj.physical_router_snmp_credentials = self.physical_router_snmp_credentials
- self._obj.physical_router_junos_service_ports = self.physical_router_junos_service_ports
- self._obj.id_perms = self.id_perms
- self._obj.display_name = self.display_name
- self._conn_drv.physical_router_create(self._obj)
- # read back for server allocated values
- self._obj = self._conn_drv.physical_router_read(id = self._obj.uuid)
- #end setUp
-
- def cleanUp(self):
- if self._obj.get_physical_interfaces() or self._obj.get_logical_interfaces():
- return
- parent_fixt = getattr(self, '_parent_fixt', None)
- if parent_fixt:
- # non config-root child
- parent_obj = self._parent_fixt.getObj()
- # remove child from parent obj
- for child_obj in parent_obj.get_physical_routers():
- if type(child_obj) == dict:
- child_uuid = child_obj['uuid']
- else:
- child_uuid = child_obj.uuid
- if child_uuid == self._obj.uuid:
- parent_obj.physical_routers.remove(child_obj)
- break
-
- self._conn_drv.physical_router_delete(id = self._obj.uuid)
- #end cleanUp
-
- def getObj(self):
- return self._obj
- #end getObj
-
-#end class PhysicalRouterTestFixtureGen
-
-class BgpRouterTestFixtureGen(cfixture.ContrailFixture):
- """
- Fixture for :class:`.BgpRouter`
- """
- def __init__(self, conn_drv, bgp_router_name=None, parent_fixt=None, auto_prop_val=False, bgp_router_ref_infos = None, bgp_router_parameters=None, id_perms=None, display_name=None):
- '''
- Create BgpRouterTestFixtureGen object
-
- constructor
-
- Args:
- conn_drv (:class:`ConnectionDriver`): connection driver (eg. :class:`vnc_api.vnc_api.VncApi`, :class:`novaclient.client.Client`, etc)
-
- Kwargs:
- bgp_router_name (str): Name of bgp_router
- parent_fixt (:class:`.RoutingInstanceTestFixtureGen`): Parent fixture
- bgp_router (list): list of tuple (:class:`BgpRouter`, :class: `BgpPeeringAttributes`) type
- bgp_router_parameters (instance): instance of :class:`BgpRouterParams`
- id_perms (instance): instance of :class:`IdPermsType`
- display_name (instance): instance of :class:`xsd:string`
-
- '''
- super(BgpRouterTestFixtureGen, self).__init__()
- self._conn_drv = conn_drv
- if not bgp_router_name:
- self._name = 'default-bgp-router'
- else:
- self._name = bgp_router_name
- self._obj = None
- self._parent_fixt = parent_fixt
- self._auto_prop_val = auto_prop_val
- if bgp_router_ref_infos:
- for ln, ref in bgp_router_ref_infos:
- self.add_bgp_router (ln, ref)
- self.bgp_router_parameters = bgp_router_parameters
- self.id_perms = id_perms
- self.display_name = display_name
- #end __init__
-
- def _update_links (self, update_server):
- for ln in self.get_bgp_routers ():
- self.add_bgp_router (*ln.fixture (), update_server = update_server, add_link = False)
- return None
- #end _update_links
-
- def add_bgp_router (self, lo, ref, update_server = True, add_link = True):
- '''
- add :class:`BgpRouter` link to :class:`BgpRouter`
- Args:
- lo (:class:`BgpRouter`): obj to link
- ref (:class:`BgpPeeringAttributes`): property of the link object
- '''
- if self._obj:
- self._obj.add_bgp_router (lo, ref)
- if update_server:
- self._conn_drv.bgp_router_update (self._obj)
-
- if add_link:
- self.add_link('bgp_router', cfixture.ConrtailLink('bgp_router', 'bgp_router', 'bgp_router', ['ref'], (lo, ref)))
- #end add_bgp_router_link
-
- def get_bgp_routers (self):
- return self.get_links ('bgp_router')
- #end get_bgp_routers
-
- def populate (self):
- self._obj.set_bgp_router_parameters(self.bgp_router_parameters or vnc_api.gen.resource_xsd.BgpRouterParams.populate())
- self._obj.set_id_perms(self.id_perms or vnc_api.gen.resource_xsd.IdPermsType.populate())
- self._obj.set_display_name(self.display_name or GeneratedsSuper.populate_string("display_name"))
- #end populate
-
- def setUp(self):
- super(BgpRouterTestFixtureGen, self).setUp()
- if not self._parent_fixt:
- self._parent_fixt = self.useFixture(RoutingInstanceTestFixtureGen(self._conn_drv, 'default-routing-instance'))
-
- self._obj = vnc_api.BgpRouter(self._name, self._parent_fixt.getObj ())
- try:
- self._obj = self._conn_drv.bgp_router_read (fq_name=self._obj.get_fq_name())
- self._update_links (update_server=True)
- except NoIdError:
- self._update_links (update_server=False)
- if self._auto_prop_val:
- self.populate ()
- else:
- self._obj.bgp_router_parameters = self.bgp_router_parameters
- self._obj.id_perms = self.id_perms
- self._obj.display_name = self.display_name
- self._conn_drv.bgp_router_create(self._obj)
- # read back for server allocated values
- self._obj = self._conn_drv.bgp_router_read(id = self._obj.uuid)
- #end setUp
-
- def cleanUp(self):
- if self._obj.get_global_system_config_back_refs() or self._obj.get_physical_router_back_refs() or self._obj.get_virtual_router_back_refs() or self._obj.get_bgp_router_back_refs():
- return
- parent_fixt = getattr(self, '_parent_fixt', None)
- if parent_fixt:
- # non config-root child
- parent_obj = self._parent_fixt.getObj()
- # remove child from parent obj
- for child_obj in parent_obj.get_bgp_routers():
- if type(child_obj) == dict:
- child_uuid = child_obj['uuid']
- else:
- child_uuid = child_obj.uuid
- if child_uuid == self._obj.uuid:
- parent_obj.bgp_routers.remove(child_obj)
- break
-
- self._conn_drv.bgp_router_delete(id = self._obj.uuid)
- #end cleanUp
-
- def getObj(self):
- return self._obj
- #end getObj
-
-#end class BgpRouterTestFixtureGen
-
-class VirtualRouterTestFixtureGen(cfixture.ContrailFixture):
- """
- Fixture for :class:`.VirtualRouter`
- """
- def __init__(self, conn_drv, virtual_router_name=None, parent_fixt=None, auto_prop_val=False, bgp_router_refs = None, virtual_machine_refs = None, virtual_router_type=None, virtual_router_ip_address=None, id_perms=None, display_name=None):
- '''
- Create VirtualRouterTestFixtureGen object
-
- constructor
-
- Args:
- conn_drv (:class:`ConnectionDriver`): connection driver (eg. :class:`vnc_api.vnc_api.VncApi`, :class:`novaclient.client.Client`, etc)
-
- Kwargs:
- virtual_router_name (str): Name of virtual_router
- parent_fixt (:class:`.GlobalSystemConfigTestFixtureGen`): Parent fixture
- bgp_router (list): list of :class:`BgpRouter` type
- virtual_machine (list): list of :class:`VirtualMachine` type
- virtual_router_type (instance): instance of :class:`xsd:string`
- virtual_router_ip_address (instance): instance of :class:`xsd:string`
- id_perms (instance): instance of :class:`IdPermsType`
- display_name (instance): instance of :class:`xsd:string`
-
- '''
- super(VirtualRouterTestFixtureGen, self).__init__()
- self._conn_drv = conn_drv
- if not virtual_router_name:
- self._name = 'default-virtual-router'
- else:
- self._name = virtual_router_name
- self._obj = None
- self._parent_fixt = parent_fixt
- self._auto_prop_val = auto_prop_val
- if bgp_router_refs:
- for ln in bgp_router_refs:
- self.add_bgp_router (ln)
- if virtual_machine_refs:
- for ln in virtual_machine_refs:
- self.add_virtual_machine (ln)
- self.virtual_router_type = virtual_router_type
- self.virtual_router_ip_address = virtual_router_ip_address
- self.id_perms = id_perms
- self.display_name = display_name
- #end __init__
-
- def _update_links (self, update_server):
- for ln in self.get_bgp_routers ():
- self.add_bgp_router (ln.fixture (), update_server = update_server, add_link = False)
- for ln in self.get_virtual_machines ():
- self.add_virtual_machine (ln.fixture (), update_server = update_server, add_link = False)
- return None
- #end _update_links
-
- def add_bgp_router (self, lo, update_server = True, add_link = True):
- '''
- add :class:`BgpRouter` link to :class:`VirtualRouter`
- Args:
- lo (:class:`BgpRouter`): obj to link
- '''
- if self._obj:
- self._obj.add_bgp_router (lo)
- if update_server:
- self._conn_drv.virtual_router_update (self._obj)
-
- if add_link:
- self.add_link('bgp_router', cfixture.ConrtailLink('bgp_router', 'virtual_router', 'bgp_router', ['ref'], lo))
- #end add_bgp_router_link
-
- def get_bgp_routers (self):
- return self.get_links ('bgp_router')
- #end get_bgp_routers
- def add_virtual_machine (self, lo, update_server = True, add_link = True):
- '''
- add :class:`VirtualMachine` link to :class:`VirtualRouter`
- Args:
- lo (:class:`VirtualMachine`): obj to link
- '''
- if self._obj:
- self._obj.add_virtual_machine (lo)
- if update_server:
- self._conn_drv.virtual_router_update (self._obj)
-
- if add_link:
- self.add_link('virtual_machine', cfixture.ConrtailLink('virtual_machine', 'virtual_router', 'virtual_machine', ['ref'], lo))
- #end add_virtual_machine_link
-
- def get_virtual_machines (self):
- return self.get_links ('virtual_machine')
- #end get_virtual_machines
-
- def populate (self):
- self._obj.set_virtual_router_type(self.virtual_router_type or GeneratedsSuper.populate_string("virtual_router_type"))
- self._obj.set_virtual_router_ip_address(self.virtual_router_ip_address or GeneratedsSuper.populate_string("virtual_router_ip_address"))
- self._obj.set_id_perms(self.id_perms or vnc_api.gen.resource_xsd.IdPermsType.populate())
- self._obj.set_display_name(self.display_name or GeneratedsSuper.populate_string("display_name"))
- #end populate
-
- def setUp(self):
- super(VirtualRouterTestFixtureGen, self).setUp()
- if not self._parent_fixt:
- self._parent_fixt = self.useFixture(GlobalSystemConfigTestFixtureGen(self._conn_drv, 'default-global-system-config'))
-
- self._obj = vnc_api.VirtualRouter(self._name, self._parent_fixt.getObj ())
- try:
- self._obj = self._conn_drv.virtual_router_read (fq_name=self._obj.get_fq_name())
- self._update_links (update_server=True)
- except NoIdError:
- self._update_links (update_server=False)
- if self._auto_prop_val:
- self.populate ()
- else:
- self._obj.virtual_router_type = self.virtual_router_type
- self._obj.virtual_router_ip_address = self.virtual_router_ip_address
- self._obj.id_perms = self.id_perms
- self._obj.display_name = self.display_name
- self._conn_drv.virtual_router_create(self._obj)
- # read back for server allocated values
- self._obj = self._conn_drv.virtual_router_read(id = self._obj.uuid)
- #end setUp
-
- def cleanUp(self):
- if self._obj.get_physical_router_back_refs() or self._obj.get_provider_attachment_back_refs():
- return
- parent_fixt = getattr(self, '_parent_fixt', None)
- if parent_fixt:
- # non config-root child
- parent_obj = self._parent_fixt.getObj()
- # remove child from parent obj
- for child_obj in parent_obj.get_virtual_routers():
- if type(child_obj) == dict:
- child_uuid = child_obj['uuid']
- else:
- child_uuid = child_obj.uuid
- if child_uuid == self._obj.uuid:
- parent_obj.virtual_routers.remove(child_obj)
- break
-
- self._conn_drv.virtual_router_delete(id = self._obj.uuid)
- #end cleanUp
-
- def getObj(self):
- return self._obj
- #end getObj
-
-#end class VirtualRouterTestFixtureGen
-
-class SubnetTestFixtureGen(cfixture.ContrailFixture):
- """
- Fixture for :class:`.Subnet`
- """
- def __init__(self, conn_drv, subnet_name=None, auto_prop_val=False, virtual_machine_interface_refs = None, subnet_ip_prefix=None, id_perms=None, display_name=None):
- '''
- Create SubnetTestFixtureGen object
-
- constructor
-
- Args:
- conn_drv (:class:`ConnectionDriver`): connection driver (eg. :class:`vnc_api.vnc_api.VncApi`, :class:`novaclient.client.Client`, etc)
-
- Kwargs:
- %s_name (str): Name of %s
- virtual_machine_interface (list): list of :class:`VirtualMachineInterface` type
- subnet_ip_prefix (instance): instance of :class:`SubnetType`
- id_perms (instance): instance of :class:`IdPermsType`
- display_name (instance): instance of :class:`xsd:string`
-
- '''
- super(SubnetTestFixtureGen, self).__init__()
- self._conn_drv = conn_drv
- if not subnet_name:
- self._name = 'default-subnet'
- else:
- self._name = subnet_name
- self._obj = None
- self._auto_prop_val = auto_prop_val
- if virtual_machine_interface_refs:
- for ln in virtual_machine_interface_refs:
- self.add_virtual_machine_interface (ln)
- self.subnet_ip_prefix = subnet_ip_prefix
- self.id_perms = id_perms
- self.display_name = display_name
- #end __init__
-
- def _update_links (self, update_server):
- for ln in self.get_virtual_machine_interfaces ():
- self.add_virtual_machine_interface (ln.fixture (), update_server = update_server, add_link = False)
- return None
- #end _update_links
-
- def add_virtual_machine_interface (self, lo, update_server = True, add_link = True):
- '''
- add :class:`VirtualMachineInterface` link to :class:`Subnet`
- Args:
- lo (:class:`VirtualMachineInterface`): obj to link
- '''
- if self._obj:
- self._obj.add_virtual_machine_interface (lo)
- if update_server:
- self._conn_drv.subnet_update (self._obj)
-
- if add_link:
- self.add_link('virtual_machine_interface', cfixture.ConrtailLink('virtual_machine_interface', 'subnet', 'virtual_machine_interface', ['ref'], lo))
- #end add_virtual_machine_interface_link
-
- def get_virtual_machine_interfaces (self):
- return self.get_links ('virtual_machine_interface')
- #end get_virtual_machine_interfaces
-
- def populate (self):
- self._obj.set_subnet_ip_prefix(self.subnet_ip_prefix or vnc_api.gen.resource_xsd.SubnetType.populate())
- self._obj.set_id_perms(self.id_perms or vnc_api.gen.resource_xsd.IdPermsType.populate())
- self._obj.set_display_name(self.display_name or GeneratedsSuper.populate_string("display_name"))
- #end populate
-
- def setUp(self):
- super(SubnetTestFixtureGen, self).setUp()
- self._obj = vnc_api.Subnet(self._name)
- try:
- self._obj = self._conn_drv.subnet_read (fq_name=self._obj.get_fq_name())
- self._update_links (update_server=True)
- except NoIdError:
- self._update_links (update_server=False)
- if self._auto_prop_val:
- self.populate ()
- else:
- self._obj.subnet_ip_prefix = self.subnet_ip_prefix
- self._obj.id_perms = self.id_perms
- self._obj.display_name = self.display_name
- self._conn_drv.subnet_create(self._obj)
- # read back for server allocated values
- self._obj = self._conn_drv.subnet_read(id = self._obj.uuid)
- #end setUp
-
- def cleanUp(self):
- self._conn_drv.subnet_delete(id = self._obj.uuid)
- #end cleanUp
-
- def getObj(self):
- return self._obj
- #end getObj
-
-#end class SubnetTestFixtureGen
-
-class GlobalSystemConfigTestFixtureGen(cfixture.ContrailFixture):
- """
- Fixture for :class:`.GlobalSystemConfig`
- """
- def __init__(self, conn_drv, global_system_config_name=None, parent_fixt=None, auto_prop_val=False, bgp_router_refs = None, autonomous_system=None, config_version=None, plugin_tuning=None, ibgp_auto_mesh=None, ip_fabric_subnets=None, id_perms=None, display_name=None):
- '''
- Create GlobalSystemConfigTestFixtureGen object
-
- constructor
-
- Args:
- conn_drv (:class:`ConnectionDriver`): connection driver (eg. :class:`vnc_api.vnc_api.VncApi`, :class:`novaclient.client.Client`, etc)
-
- Kwargs:
- global_system_config_name (str): Name of global_system_config
- parent_fixt (:class:`.ConfigRootTestFixtureGen`): Parent fixture
- bgp_router (list): list of :class:`BgpRouter` type
- autonomous_system (instance): instance of :class:`xsd:integer`
- config_version (instance): instance of :class:`xsd:string`
- plugin_tuning (instance): instance of :class:`PluginProperties`
- ibgp_auto_mesh (instance): instance of :class:`xsd:boolean`
- ip_fabric_subnets (instance): instance of :class:`SubnetListType`
- id_perms (instance): instance of :class:`IdPermsType`
- display_name (instance): instance of :class:`xsd:string`
-
- '''
- super(GlobalSystemConfigTestFixtureGen, self).__init__()
- self._conn_drv = conn_drv
- if not global_system_config_name:
- self._name = 'default-global-system-config'
- else:
- self._name = global_system_config_name
- self._obj = None
- self._parent_fixt = parent_fixt
- self._auto_prop_val = auto_prop_val
- if bgp_router_refs:
- for ln in bgp_router_refs:
- self.add_bgp_router (ln)
- self.autonomous_system = autonomous_system
- self.config_version = config_version
- self.plugin_tuning = plugin_tuning
- self.ibgp_auto_mesh = ibgp_auto_mesh
- self.ip_fabric_subnets = ip_fabric_subnets
- self.id_perms = id_perms
- self.display_name = display_name
- #end __init__
-
- def _update_links (self, update_server):
- for ln in self.get_bgp_routers ():
- self.add_bgp_router (ln.fixture (), update_server = update_server, add_link = False)
- return None
- #end _update_links
-
- def add_bgp_router (self, lo, update_server = True, add_link = True):
- '''
- add :class:`BgpRouter` link to :class:`GlobalSystemConfig`
- Args:
- lo (:class:`BgpRouter`): obj to link
- '''
- if self._obj:
- self._obj.add_bgp_router (lo)
- if update_server:
- self._conn_drv.global_system_config_update (self._obj)
-
- if add_link:
- self.add_link('bgp_router', cfixture.ConrtailLink('bgp_router', 'global_system_config', 'bgp_router', ['ref'], lo))
- #end add_bgp_router_link
-
- def get_bgp_routers (self):
- return self.get_links ('bgp_router')
- #end get_bgp_routers
-
- def populate (self):
- self._obj.set_autonomous_system(self.autonomous_system or GeneratedsSuper.populate_integer("autonomous_system"))
- self._obj.set_config_version(self.config_version or GeneratedsSuper.populate_string("config_version"))
- self._obj.set_plugin_tuning(self.plugin_tuning or vnc_api.gen.resource_xsd.PluginProperties.populate())
- self._obj.set_ibgp_auto_mesh(self.ibgp_auto_mesh or GeneratedsSuper.populate_boolean("ibgp_auto_mesh"))
- self._obj.set_ip_fabric_subnets(self.ip_fabric_subnets or vnc_api.gen.resource_xsd.SubnetListType.populate())
- self._obj.set_id_perms(self.id_perms or vnc_api.gen.resource_xsd.IdPermsType.populate())
- self._obj.set_display_name(self.display_name or GeneratedsSuper.populate_string("display_name"))
- #end populate
-
- def setUp(self):
- super(GlobalSystemConfigTestFixtureGen, self).setUp()
- # child of config-root
- self._obj = vnc_api.GlobalSystemConfig(self._name)
- try:
- self._obj = self._conn_drv.global_system_config_read (fq_name=self._obj.get_fq_name())
- self._update_links (update_server=True)
- except NoIdError:
- self._update_links (update_server=False)
- if self._auto_prop_val:
- self.populate ()
- else:
- self._obj.autonomous_system = self.autonomous_system
- self._obj.config_version = self.config_version
- self._obj.plugin_tuning = self.plugin_tuning
- self._obj.ibgp_auto_mesh = self.ibgp_auto_mesh
- self._obj.ip_fabric_subnets = self.ip_fabric_subnets
- self._obj.id_perms = self.id_perms
- self._obj.display_name = self.display_name
- self._conn_drv.global_system_config_create(self._obj)
- # read back for server allocated values
- self._obj = self._conn_drv.global_system_config_read(id = self._obj.uuid)
- #end setUp
-
- def cleanUp(self):
- if self._obj.get_global_vrouter_configs() or self._obj.get_physical_routers() or self._obj.get_virtual_routers() or self._obj.get_config_nodes() or self._obj.get_analytics_nodes() or self._obj.get_database_nodes() or self._obj.get_service_appliance_sets():
- return
- parent_fixt = getattr(self, '_parent_fixt', None)
- if parent_fixt:
- # non config-root child
- parent_obj = self._parent_fixt.getObj()
- # remove child from parent obj
- for child_obj in parent_obj.get_global_system_configs():
- if type(child_obj) == dict:
- child_uuid = child_obj['uuid']
- else:
- child_uuid = child_obj.uuid
- if child_uuid == self._obj.uuid:
- parent_obj.global_system_configs.remove(child_obj)
- break
-
- self._conn_drv.global_system_config_delete(id = self._obj.uuid)
- #end cleanUp
-
- def getObj(self):
- return self._obj
- #end getObj
-
-#end class GlobalSystemConfigTestFixtureGen
-
-class ServiceApplianceTestFixtureGen(cfixture.ContrailFixture):
- """
- Fixture for :class:`.ServiceAppliance`
- """
- def __init__(self, conn_drv, service_appliance_name=None, parent_fixt=None, auto_prop_val=False, service_appliance_user_credentials=None, service_appliance_ip_address=None, service_appliance_properties=None, id_perms=None, display_name=None):
- '''
- Create ServiceApplianceTestFixtureGen object
-
- constructor
-
- Args:
- conn_drv (:class:`ConnectionDriver`): connection driver (eg. :class:`vnc_api.vnc_api.VncApi`, :class:`novaclient.client.Client`, etc)
-
- Kwargs:
- service_appliance_name (str): Name of service_appliance
- parent_fixt (:class:`.ServiceApplianceSetTestFixtureGen`): Parent fixture
- service_appliance_user_credentials (instance): instance of :class:`UserCredentials`
- service_appliance_ip_address (instance): instance of :class:`xsd:string`
- service_appliance_properties (instance): instance of :class:`KeyValuePairs`
- id_perms (instance): instance of :class:`IdPermsType`
- display_name (instance): instance of :class:`xsd:string`
-
- '''
- super(ServiceApplianceTestFixtureGen, self).__init__()
- self._conn_drv = conn_drv
- if not service_appliance_name:
- self._name = 'default-service-appliance'
- else:
- self._name = service_appliance_name
- self._obj = None
- self._parent_fixt = parent_fixt
- self._auto_prop_val = auto_prop_val
- self.service_appliance_user_credentials = service_appliance_user_credentials
- self.service_appliance_ip_address = service_appliance_ip_address
- self.service_appliance_properties = service_appliance_properties
- self.id_perms = id_perms
- self.display_name = display_name
- #end __init__
-
- def _update_links (self, update_server):
- return None
- #end _update_links
-
-
- def populate (self):
- self._obj.set_service_appliance_user_credentials(self.service_appliance_user_credentials or [vnc_api.gen.resource_xsd.UserCredentials.populate()])
- self._obj.set_service_appliance_ip_address(self.service_appliance_ip_address or GeneratedsSuper.populate_string("service_appliance_ip_address"))
- self._obj.set_service_appliance_properties(self.service_appliance_properties or vnc_api.gen.resource_xsd.KeyValuePairs.populate())
- self._obj.set_id_perms(self.id_perms or vnc_api.gen.resource_xsd.IdPermsType.populate())
- self._obj.set_display_name(self.display_name or GeneratedsSuper.populate_string("display_name"))
- #end populate
-
- def setUp(self):
- super(ServiceApplianceTestFixtureGen, self).setUp()
- if not self._parent_fixt:
- self._parent_fixt = self.useFixture(ServiceApplianceSetTestFixtureGen(self._conn_drv, 'default-service-appliance-set'))
-
- self._obj = vnc_api.ServiceAppliance(self._name, self._parent_fixt.getObj ())
- try:
- self._obj = self._conn_drv.service_appliance_read (fq_name=self._obj.get_fq_name())
- self._update_links (update_server=True)
- except NoIdError:
- self._update_links (update_server=False)
- if self._auto_prop_val:
- self.populate ()
- else:
- self._obj.service_appliance_user_credentials = self.service_appliance_user_credentials
- self._obj.service_appliance_ip_address = self.service_appliance_ip_address
- self._obj.service_appliance_properties = self.service_appliance_properties
- self._obj.id_perms = self.id_perms
- self._obj.display_name = self.display_name
- self._conn_drv.service_appliance_create(self._obj)
- # read back for server allocated values
- self._obj = self._conn_drv.service_appliance_read(id = self._obj.uuid)
- #end setUp
-
- def cleanUp(self):
- parent_fixt = getattr(self, '_parent_fixt', None)
- if parent_fixt:
- # non config-root child
- parent_obj = self._parent_fixt.getObj()
- # remove child from parent obj
- for child_obj in parent_obj.get_service_appliances():
- if type(child_obj) == dict:
- child_uuid = child_obj['uuid']
- else:
- child_uuid = child_obj.uuid
- if child_uuid == self._obj.uuid:
- parent_obj.service_appliances.remove(child_obj)
- break
-
- self._conn_drv.service_appliance_delete(id = self._obj.uuid)
- #end cleanUp
-
- def getObj(self):
- return self._obj
- #end getObj
-
-#end class ServiceApplianceTestFixtureGen
-
-class ServiceInstanceTestFixtureGen(cfixture.ContrailFixture):
- """
- Fixture for :class:`.ServiceInstance`
- """
- def __init__(self, conn_drv, service_instance_name=None, parent_fixt=None, auto_prop_val=False, service_template_refs = None, service_instance_properties=None, id_perms=None, display_name=None):
- '''
- Create ServiceInstanceTestFixtureGen object
-
- constructor
-
- Args:
- conn_drv (:class:`ConnectionDriver`): connection driver (eg. :class:`vnc_api.vnc_api.VncApi`, :class:`novaclient.client.Client`, etc)
-
- Kwargs:
- service_instance_name (str): Name of service_instance
- parent_fixt (:class:`.ProjectTestFixtureGen`): Parent fixture
- service_template (list): list of :class:`ServiceTemplate` type
- service_instance_properties (instance): instance of :class:`ServiceInstanceType`
- id_perms (instance): instance of :class:`IdPermsType`
- display_name (instance): instance of :class:`xsd:string`
-
- '''
- super(ServiceInstanceTestFixtureGen, self).__init__()
- self._conn_drv = conn_drv
- if not service_instance_name:
- self._name = 'default-service-instance'
- else:
- self._name = service_instance_name
- self._obj = None
- self._parent_fixt = parent_fixt
- self._auto_prop_val = auto_prop_val
- if service_template_refs:
- for ln in service_template_refs:
- self.add_service_template (ln)
- self.service_instance_properties = service_instance_properties
- self.id_perms = id_perms
- self.display_name = display_name
- #end __init__
-
- def _update_links (self, update_server):
- for ln in self.get_service_templates ():
- self.add_service_template (ln.fixture (), update_server = update_server, add_link = False)
- return None
- #end _update_links
-
- def add_service_template (self, lo, update_server = True, add_link = True):
- '''
- add :class:`ServiceTemplate` link to :class:`ServiceInstance`
- Args:
- lo (:class:`ServiceTemplate`): obj to link
- '''
- if self._obj:
- self._obj.add_service_template (lo)
- if update_server:
- self._conn_drv.service_instance_update (self._obj)
-
- if add_link:
- self.add_link('service_template', cfixture.ConrtailLink('service_template', 'service_instance', 'service_template', ['ref'], lo))
- #end add_service_template_link
-
- def get_service_templates (self):
- return self.get_links ('service_template')
- #end get_service_templates
-
- def populate (self):
- self._obj.set_service_instance_properties(self.service_instance_properties or vnc_api.gen.resource_xsd.ServiceInstanceType.populate())
- self._obj.set_id_perms(self.id_perms or vnc_api.gen.resource_xsd.IdPermsType.populate())
- self._obj.set_display_name(self.display_name or GeneratedsSuper.populate_string("display_name"))
- #end populate
-
- def setUp(self):
- super(ServiceInstanceTestFixtureGen, self).setUp()
- if not self._parent_fixt:
- self._parent_fixt = self.useFixture(ProjectTestFixtureGen(self._conn_drv, 'default-project'))
-
- self._obj = vnc_api.ServiceInstance(self._name, self._parent_fixt.getObj ())
- try:
- self._obj = self._conn_drv.service_instance_read (fq_name=self._obj.get_fq_name())
- self._update_links (update_server=True)
- except NoIdError:
- self._update_links (update_server=False)
- if self._auto_prop_val:
- self.populate ()
- else:
- self._obj.service_instance_properties = self.service_instance_properties
- self._obj.id_perms = self.id_perms
- self._obj.display_name = self.display_name
- self._conn_drv.service_instance_create(self._obj)
- # read back for server allocated values
- self._obj = self._conn_drv.service_instance_read(id = self._obj.uuid)
- #end setUp
-
- def cleanUp(self):
- if self._obj.get_logical_router_back_refs() or self._obj.get_loadbalancer_pool_back_refs():
- return
- parent_fixt = getattr(self, '_parent_fixt', None)
- if parent_fixt:
- # non config-root child
- parent_obj = self._parent_fixt.getObj()
- # remove child from parent obj
- for child_obj in parent_obj.get_service_instances():
- if type(child_obj) == dict:
- child_uuid = child_obj['uuid']
- else:
- child_uuid = child_obj.uuid
- if child_uuid == self._obj.uuid:
- parent_obj.service_instances.remove(child_obj)
- break
-
- self._conn_drv.service_instance_delete(id = self._obj.uuid)
- #end cleanUp
-
- def getObj(self):
- return self._obj
- #end getObj
-
-#end class ServiceInstanceTestFixtureGen
-
-class NamespaceTestFixtureGen(cfixture.ContrailFixture):
- """
- Fixture for :class:`.Namespace`
- """
- def __init__(self, conn_drv, namespace_name=None, parent_fixt=None, auto_prop_val=False, namespace_cidr=None, id_perms=None, display_name=None):
- '''
- Create NamespaceTestFixtureGen object
-
- constructor
-
- Args:
- conn_drv (:class:`ConnectionDriver`): connection driver (eg. :class:`vnc_api.vnc_api.VncApi`, :class:`novaclient.client.Client`, etc)
-
- Kwargs:
- namespace_name (str): Name of namespace
- parent_fixt (:class:`.DomainTestFixtureGen`): Parent fixture
- namespace_cidr (instance): instance of :class:`SubnetType`
- id_perms (instance): instance of :class:`IdPermsType`
- display_name (instance): instance of :class:`xsd:string`
-
- '''
- super(NamespaceTestFixtureGen, self).__init__()
- self._conn_drv = conn_drv
- if not namespace_name:
- self._name = 'default-namespace'
- else:
- self._name = namespace_name
- self._obj = None
- self._parent_fixt = parent_fixt
- self._auto_prop_val = auto_prop_val
- self.namespace_cidr = namespace_cidr
- self.id_perms = id_perms
- self.display_name = display_name
- #end __init__
-
- def _update_links (self, update_server):
- return None
- #end _update_links
-
-
- def populate (self):
- self._obj.set_namespace_cidr(self.namespace_cidr or vnc_api.gen.resource_xsd.SubnetType.populate())
- self._obj.set_id_perms(self.id_perms or vnc_api.gen.resource_xsd.IdPermsType.populate())
- self._obj.set_display_name(self.display_name or GeneratedsSuper.populate_string("display_name"))
- #end populate
-
- def setUp(self):
- super(NamespaceTestFixtureGen, self).setUp()
- if not self._parent_fixt:
- self._parent_fixt = self.useFixture(DomainTestFixtureGen(self._conn_drv, 'default-domain'))
-
- self._obj = vnc_api.Namespace(self._name, self._parent_fixt.getObj ())
- try:
- self._obj = self._conn_drv.namespace_read (fq_name=self._obj.get_fq_name())
- self._update_links (update_server=True)
- except NoIdError:
- self._update_links (update_server=False)
- if self._auto_prop_val:
- self.populate ()
- else:
- self._obj.namespace_cidr = self.namespace_cidr
- self._obj.id_perms = self.id_perms
- self._obj.display_name = self.display_name
- self._conn_drv.namespace_create(self._obj)
- # read back for server allocated values
- self._obj = self._conn_drv.namespace_read(id = self._obj.uuid)
- #end setUp
-
- def cleanUp(self):
- if self._obj.get_project_back_refs():
- return
- parent_fixt = getattr(self, '_parent_fixt', None)
- if parent_fixt:
- # non config-root child
- parent_obj = self._parent_fixt.getObj()
- # remove child from parent obj
- for child_obj in parent_obj.get_namespaces():
- if type(child_obj) == dict:
- child_uuid = child_obj['uuid']
- else:
- child_uuid = child_obj.uuid
- if child_uuid == self._obj.uuid:
- parent_obj.namespaces.remove(child_obj)
- break
-
- self._conn_drv.namespace_delete(id = self._obj.uuid)
- #end cleanUp
-
- def getObj(self):
- return self._obj
- #end getObj
-
-#end class NamespaceTestFixtureGen
-
-class LogicalInterfaceTestFixtureGen(cfixture.ContrailFixture):
- """
- Fixture for :class:`.LogicalInterface`
- """
- def __init__(self, conn_drv, logical_interface_name=None, parent_fixt=None, auto_prop_val=False, virtual_machine_interface_refs = None, logical_interface_vlan_tag=None, logical_interface_type=None, id_perms=None, display_name=None):
- '''
- Create LogicalInterfaceTestFixtureGen object
-
- constructor
-
- Args:
- conn_drv (:class:`ConnectionDriver`): connection driver (eg. :class:`vnc_api.vnc_api.VncApi`, :class:`novaclient.client.Client`, etc)
-
- Kwargs:
- logical_interface_name (str): Name of logical_interface
- parent_fixt (:class:`.PhysicalRouterTestFixtureGen`): Parent fixture
- virtual_machine_interface (list): list of :class:`VirtualMachineInterface` type
- logical_interface_vlan_tag (instance): instance of :class:`xsd:integer`
- logical_interface_type (instance): instance of :class:`xsd:string`
- id_perms (instance): instance of :class:`IdPermsType`
- display_name (instance): instance of :class:`xsd:string`
-
- '''
- super(LogicalInterfaceTestFixtureGen, self).__init__()
- self._conn_drv = conn_drv
- if not logical_interface_name:
- self._name = 'default-logical-interface'
- else:
- self._name = logical_interface_name
- self._obj = None
- self._parent_fixt = parent_fixt
- self._auto_prop_val = auto_prop_val
- if virtual_machine_interface_refs:
- for ln in virtual_machine_interface_refs:
- self.add_virtual_machine_interface (ln)
- self.logical_interface_vlan_tag = logical_interface_vlan_tag
- self.logical_interface_type = logical_interface_type
- self.id_perms = id_perms
- self.display_name = display_name
- #end __init__
-
- def _update_links (self, update_server):
- for ln in self.get_virtual_machine_interfaces ():
- self.add_virtual_machine_interface (ln.fixture (), update_server = update_server, add_link = False)
- return None
- #end _update_links
-
- def add_virtual_machine_interface (self, lo, update_server = True, add_link = True):
- '''
- add :class:`VirtualMachineInterface` link to :class:`LogicalInterface`
- Args:
- lo (:class:`VirtualMachineInterface`): obj to link
- '''
- if self._obj:
- self._obj.add_virtual_machine_interface (lo)
- if update_server:
- self._conn_drv.logical_interface_update (self._obj)
-
- if add_link:
- self.add_link('virtual_machine_interface', cfixture.ConrtailLink('virtual_machine_interface', 'logical_interface', 'virtual_machine_interface', ['ref'], lo))
- #end add_virtual_machine_interface_link
-
- def get_virtual_machine_interfaces (self):
- return self.get_links ('virtual_machine_interface')
- #end get_virtual_machine_interfaces
-
- def populate (self):
- self._obj.set_logical_interface_vlan_tag(self.logical_interface_vlan_tag or GeneratedsSuper.populate_integer("logical_interface_vlan_tag"))
- self._obj.set_logical_interface_type(self.logical_interface_type or GeneratedsSuper.populate_string("logical_interface_type"))
- self._obj.set_id_perms(self.id_perms or vnc_api.gen.resource_xsd.IdPermsType.populate())
- self._obj.set_display_name(self.display_name or GeneratedsSuper.populate_string("display_name"))
- #end populate
-
- def setUp(self):
- super(LogicalInterfaceTestFixtureGen, self).setUp()
- if not self._parent_fixt:
- raise AmbiguousParentError("[[u'default-global-system-config', u'default-physical-router'], [u'default-global-system-config', u'default-physical-router', u'default-physical-interface']]")
-
- self._obj = vnc_api.LogicalInterface(self._name, self._parent_fixt.getObj ())
- try:
- self._obj = self._conn_drv.logical_interface_read (fq_name=self._obj.get_fq_name())
- self._update_links (update_server=True)
- except NoIdError:
- self._update_links (update_server=False)
- if self._auto_prop_val:
- self.populate ()
- else:
- self._obj.logical_interface_vlan_tag = self.logical_interface_vlan_tag
- self._obj.logical_interface_type = self.logical_interface_type
- self._obj.id_perms = self.id_perms
- self._obj.display_name = self.display_name
- self._conn_drv.logical_interface_create(self._obj)
- # read back for server allocated values
- self._obj = self._conn_drv.logical_interface_read(id = self._obj.uuid)
- #end setUp
-
- def cleanUp(self):
- parent_fixt = getattr(self, '_parent_fixt', None)
- if parent_fixt:
- # non config-root child
- parent_obj = self._parent_fixt.getObj()
- # remove child from parent obj
- for child_obj in parent_obj.get_logical_interfaces():
- if type(child_obj) == dict:
- child_uuid = child_obj['uuid']
- else:
- child_uuid = child_obj.uuid
- if child_uuid == self._obj.uuid:
- parent_obj.logical_interfaces.remove(child_obj)
- break
-
- self._conn_drv.logical_interface_delete(id = self._obj.uuid)
- #end cleanUp
-
- def getObj(self):
- return self._obj
- #end getObj
-
-#end class LogicalInterfaceTestFixtureGen
-
-class RouteTableTestFixtureGen(cfixture.ContrailFixture):
- """
- Fixture for :class:`.RouteTable`
- """
- def __init__(self, conn_drv, route_table_name=None, parent_fixt=None, auto_prop_val=False, routes=None, id_perms=None, display_name=None):
- '''
- Create RouteTableTestFixtureGen object
-
- constructor
-
- Args:
- conn_drv (:class:`ConnectionDriver`): connection driver (eg. :class:`vnc_api.vnc_api.VncApi`, :class:`novaclient.client.Client`, etc)
-
- Kwargs:
- route_table_name (str): Name of route_table
- parent_fixt (:class:`.ProjectTestFixtureGen`): Parent fixture
- routes (instance): instance of :class:`RouteTableType`
- id_perms (instance): instance of :class:`IdPermsType`
- display_name (instance): instance of :class:`xsd:string`
-
- '''
- super(RouteTableTestFixtureGen, self).__init__()
- self._conn_drv = conn_drv
- if not route_table_name:
- self._name = 'default-route-table'
- else:
- self._name = route_table_name
- self._obj = None
- self._parent_fixt = parent_fixt
- self._auto_prop_val = auto_prop_val
- self.routes = routes
- self.id_perms = id_perms
- self.display_name = display_name
- #end __init__
-
- def _update_links (self, update_server):
- return None
- #end _update_links
-
-
- def populate (self):
- self._obj.set_routes(self.routes or vnc_api.gen.resource_xsd.RouteTableType.populate())
- self._obj.set_id_perms(self.id_perms or vnc_api.gen.resource_xsd.IdPermsType.populate())
- self._obj.set_display_name(self.display_name or GeneratedsSuper.populate_string("display_name"))
- #end populate
-
- def setUp(self):
- super(RouteTableTestFixtureGen, self).setUp()
- if not self._parent_fixt:
- self._parent_fixt = self.useFixture(ProjectTestFixtureGen(self._conn_drv, 'default-project'))
-
- self._obj = vnc_api.RouteTable(self._name, self._parent_fixt.getObj ())
- try:
- self._obj = self._conn_drv.route_table_read (fq_name=self._obj.get_fq_name())
- self._update_links (update_server=True)
- except NoIdError:
- self._update_links (update_server=False)
- if self._auto_prop_val:
- self.populate ()
- else:
- self._obj.routes = self.routes
- self._obj.id_perms = self.id_perms
- self._obj.display_name = self.display_name
- self._conn_drv.route_table_create(self._obj)
- # read back for server allocated values
- self._obj = self._conn_drv.route_table_read(id = self._obj.uuid)
- #end setUp
-
- def cleanUp(self):
- if self._obj.get_virtual_network_back_refs():
- return
- parent_fixt = getattr(self, '_parent_fixt', None)
- if parent_fixt:
- # non config-root child
- parent_obj = self._parent_fixt.getObj()
- # remove child from parent obj
- for child_obj in parent_obj.get_route_tables():
- if type(child_obj) == dict:
- child_uuid = child_obj['uuid']
- else:
- child_uuid = child_obj.uuid
- if child_uuid == self._obj.uuid:
- parent_obj.route_tables.remove(child_obj)
- break
-
- self._conn_drv.route_table_delete(id = self._obj.uuid)
- #end cleanUp
-
- def getObj(self):
- return self._obj
- #end getObj
-
-#end class RouteTableTestFixtureGen
-
-class PhysicalInterfaceTestFixtureGen(cfixture.ContrailFixture):
- """
- Fixture for :class:`.PhysicalInterface`
- """
- def __init__(self, conn_drv, physical_interface_name=None, parent_fixt=None, auto_prop_val=False, id_perms=None, display_name=None):
- '''
- Create PhysicalInterfaceTestFixtureGen object
-
- constructor
-
- Args:
- conn_drv (:class:`ConnectionDriver`): connection driver (eg. :class:`vnc_api.vnc_api.VncApi`, :class:`novaclient.client.Client`, etc)
-
- Kwargs:
- physical_interface_name (str): Name of physical_interface
- parent_fixt (:class:`.PhysicalRouterTestFixtureGen`): Parent fixture
- id_perms (instance): instance of :class:`IdPermsType`
- display_name (instance): instance of :class:`xsd:string`
-
- '''
- super(PhysicalInterfaceTestFixtureGen, self).__init__()
- self._conn_drv = conn_drv
- if not physical_interface_name:
- self._name = 'default-physical-interface'
- else:
- self._name = physical_interface_name
- self._obj = None
- self._parent_fixt = parent_fixt
- self._auto_prop_val = auto_prop_val
- self.id_perms = id_perms
- self.display_name = display_name
- #end __init__
-
- def _update_links (self, update_server):
- return None
- #end _update_links
-
-
- def populate (self):
- self._obj.set_id_perms(self.id_perms or vnc_api.gen.resource_xsd.IdPermsType.populate())
- self._obj.set_display_name(self.display_name or GeneratedsSuper.populate_string("display_name"))
- #end populate
-
- def setUp(self):
- super(PhysicalInterfaceTestFixtureGen, self).setUp()
- if not self._parent_fixt:
- self._parent_fixt = self.useFixture(PhysicalRouterTestFixtureGen(self._conn_drv, 'default-physical-router'))
-
- self._obj = vnc_api.PhysicalInterface(self._name, self._parent_fixt.getObj ())
- try:
- self._obj = self._conn_drv.physical_interface_read (fq_name=self._obj.get_fq_name())
- self._update_links (update_server=True)
- except NoIdError:
- self._update_links (update_server=False)
- if self._auto_prop_val:
- self.populate ()
- else:
- self._obj.id_perms = self.id_perms
- self._obj.display_name = self.display_name
- self._conn_drv.physical_interface_create(self._obj)
- # read back for server allocated values
- self._obj = self._conn_drv.physical_interface_read(id = self._obj.uuid)
- #end setUp
-
- def cleanUp(self):
- if self._obj.get_logical_interfaces():
- return
- parent_fixt = getattr(self, '_parent_fixt', None)
- if parent_fixt:
- # non config-root child
- parent_obj = self._parent_fixt.getObj()
- # remove child from parent obj
- for child_obj in parent_obj.get_physical_interfaces():
- if type(child_obj) == dict:
- child_uuid = child_obj['uuid']
- else:
- child_uuid = child_obj.uuid
- if child_uuid == self._obj.uuid:
- parent_obj.physical_interfaces.remove(child_obj)
- break
-
- self._conn_drv.physical_interface_delete(id = self._obj.uuid)
- #end cleanUp
-
- def getObj(self):
- return self._obj
- #end getObj
-
-#end class PhysicalInterfaceTestFixtureGen
-
-class AccessControlListTestFixtureGen(cfixture.ContrailFixture):
- """
- Fixture for :class:`.AccessControlList`
- """
- def __init__(self, conn_drv, access_control_list_name=None, parent_fixt=None, auto_prop_val=False, access_control_list_entries=None, id_perms=None, display_name=None):
- '''
- Create AccessControlListTestFixtureGen object
-
- constructor
-
- Args:
- conn_drv (:class:`ConnectionDriver`): connection driver (eg. :class:`vnc_api.vnc_api.VncApi`, :class:`novaclient.client.Client`, etc)
-
- Kwargs:
- access_control_list_name (str): Name of access_control_list
- parent_fixt (:class:`.VirtualNetworkTestFixtureGen`): Parent fixture
- access_control_list_entries (instance): instance of :class:`AclEntriesType`
- id_perms (instance): instance of :class:`IdPermsType`
- display_name (instance): instance of :class:`xsd:string`
-
- '''
- super(AccessControlListTestFixtureGen, self).__init__()
- self._conn_drv = conn_drv
- if not access_control_list_name:
- self._name = 'default-access-control-list'
- else:
- self._name = access_control_list_name
- self._obj = None
- self._parent_fixt = parent_fixt
- self._auto_prop_val = auto_prop_val
- self.access_control_list_entries = access_control_list_entries
- self.id_perms = id_perms
- self.display_name = display_name
- #end __init__
-
- def _update_links (self, update_server):
- return None
- #end _update_links
-
-
- def populate (self):
- self._obj.set_access_control_list_entries(self.access_control_list_entries or vnc_api.gen.resource_xsd.AclEntriesType.populate())
- self._obj.set_id_perms(self.id_perms or vnc_api.gen.resource_xsd.IdPermsType.populate())
- self._obj.set_display_name(self.display_name or GeneratedsSuper.populate_string("display_name"))
- #end populate
-
- def setUp(self):
- super(AccessControlListTestFixtureGen, self).setUp()
- if not self._parent_fixt:
- raise AmbiguousParentError("[[u'default-domain', u'default-project', u'default-virtual-network'], [u'default-domain', u'default-project', u'default-security-group']]")
-
- self._obj = vnc_api.AccessControlList(self._name, self._parent_fixt.getObj ())
- try:
- self._obj = self._conn_drv.access_control_list_read (fq_name=self._obj.get_fq_name())
- self._update_links (update_server=True)
- except NoIdError:
- self._update_links (update_server=False)
- if self._auto_prop_val:
- self.populate ()
- else:
- self._obj.access_control_list_entries = self.access_control_list_entries
- self._obj.id_perms = self.id_perms
- self._obj.display_name = self.display_name
- self._conn_drv.access_control_list_create(self._obj)
- # read back for server allocated values
- self._obj = self._conn_drv.access_control_list_read(id = self._obj.uuid)
- #end setUp
-
- def cleanUp(self):
- parent_fixt = getattr(self, '_parent_fixt', None)
- if parent_fixt:
- # non config-root child
- parent_obj = self._parent_fixt.getObj()
- # remove child from parent obj
- for child_obj in parent_obj.get_access_control_lists():
- if type(child_obj) == dict:
- child_uuid = child_obj['uuid']
- else:
- child_uuid = child_obj.uuid
- if child_uuid == self._obj.uuid:
- parent_obj.access_control_lists.remove(child_obj)
- break
-
- self._conn_drv.access_control_list_delete(id = self._obj.uuid)
- #end cleanUp
-
- def getObj(self):
- return self._obj
- #end getObj
-
-#end class AccessControlListTestFixtureGen
-
-class AnalyticsNodeTestFixtureGen(cfixture.ContrailFixture):
- """
- Fixture for :class:`.AnalyticsNode`
- """
- def __init__(self, conn_drv, analytics_node_name=None, parent_fixt=None, auto_prop_val=False, analytics_node_ip_address=None, id_perms=None, display_name=None):
- '''
- Create AnalyticsNodeTestFixtureGen object
-
- constructor
-
- Args:
- conn_drv (:class:`ConnectionDriver`): connection driver (eg. :class:`vnc_api.vnc_api.VncApi`, :class:`novaclient.client.Client`, etc)
-
- Kwargs:
- analytics_node_name (str): Name of analytics_node
- parent_fixt (:class:`.GlobalSystemConfigTestFixtureGen`): Parent fixture
- analytics_node_ip_address (instance): instance of :class:`xsd:string`
- id_perms (instance): instance of :class:`IdPermsType`
- display_name (instance): instance of :class:`xsd:string`
-
- '''
- super(AnalyticsNodeTestFixtureGen, self).__init__()
- self._conn_drv = conn_drv
- if not analytics_node_name:
- self._name = 'default-analytics-node'
- else:
- self._name = analytics_node_name
- self._obj = None
- self._parent_fixt = parent_fixt
- self._auto_prop_val = auto_prop_val
- self.analytics_node_ip_address = analytics_node_ip_address
- self.id_perms = id_perms
- self.display_name = display_name
- #end __init__
-
- def _update_links (self, update_server):
- return None
- #end _update_links
-
-
- def populate (self):
- self._obj.set_analytics_node_ip_address(self.analytics_node_ip_address or GeneratedsSuper.populate_string("analytics_node_ip_address"))
- self._obj.set_id_perms(self.id_perms or vnc_api.gen.resource_xsd.IdPermsType.populate())
- self._obj.set_display_name(self.display_name or GeneratedsSuper.populate_string("display_name"))
- #end populate
-
- def setUp(self):
- super(AnalyticsNodeTestFixtureGen, self).setUp()
- if not self._parent_fixt:
- self._parent_fixt = self.useFixture(GlobalSystemConfigTestFixtureGen(self._conn_drv, 'default-global-system-config'))
-
- self._obj = vnc_api.AnalyticsNode(self._name, self._parent_fixt.getObj ())
- try:
- self._obj = self._conn_drv.analytics_node_read (fq_name=self._obj.get_fq_name())
- self._update_links (update_server=True)
- except NoIdError:
- self._update_links (update_server=False)
- if self._auto_prop_val:
- self.populate ()
- else:
- self._obj.analytics_node_ip_address = self.analytics_node_ip_address
- self._obj.id_perms = self.id_perms
- self._obj.display_name = self.display_name
- self._conn_drv.analytics_node_create(self._obj)
- # read back for server allocated values
- self._obj = self._conn_drv.analytics_node_read(id = self._obj.uuid)
- #end setUp
-
- def cleanUp(self):
- parent_fixt = getattr(self, '_parent_fixt', None)
- if parent_fixt:
- # non config-root child
- parent_obj = self._parent_fixt.getObj()
- # remove child from parent obj
- for child_obj in parent_obj.get_analytics_nodes():
- if type(child_obj) == dict:
- child_uuid = child_obj['uuid']
- else:
- child_uuid = child_obj.uuid
- if child_uuid == self._obj.uuid:
- parent_obj.analytics_nodes.remove(child_obj)
- break
-
- self._conn_drv.analytics_node_delete(id = self._obj.uuid)
- #end cleanUp
-
- def getObj(self):
- return self._obj
- #end getObj
-
-#end class AnalyticsNodeTestFixtureGen
-
-class VirtualDnsTestFixtureGen(cfixture.ContrailFixture):
- """
- Fixture for :class:`.VirtualDns`
- """
- def __init__(self, conn_drv, virtual_DNS_name=None, parent_fixt=None, auto_prop_val=False, virtual_DNS_data=None, id_perms=None, display_name=None):
- '''
- Create VirtualDnsTestFixtureGen object
-
- constructor
-
- Args:
- conn_drv (:class:`ConnectionDriver`): connection driver (eg. :class:`vnc_api.vnc_api.VncApi`, :class:`novaclient.client.Client`, etc)
-
- Kwargs:
- virtual_DNS_name (str): Name of virtual_DNS
- parent_fixt (:class:`.DomainTestFixtureGen`): Parent fixture
- virtual_DNS_data (instance): instance of :class:`VirtualDnsType`
- id_perms (instance): instance of :class:`IdPermsType`
- display_name (instance): instance of :class:`xsd:string`
-
- '''
- super(VirtualDnsTestFixtureGen, self).__init__()
- self._conn_drv = conn_drv
- if not virtual_DNS_name:
- self._name = 'default-virtual-DNS'
- else:
- self._name = virtual_DNS_name
- self._obj = None
- self._parent_fixt = parent_fixt
- self._auto_prop_val = auto_prop_val
- self.virtual_DNS_data = virtual_DNS_data
- self.id_perms = id_perms
- self.display_name = display_name
- #end __init__
-
- def _update_links (self, update_server):
- return None
- #end _update_links
-
-
- def populate (self):
- self._obj.set_virtual_DNS_data(self.virtual_DNS_data or vnc_api.gen.resource_xsd.VirtualDnsType.populate())
- self._obj.set_id_perms(self.id_perms or vnc_api.gen.resource_xsd.IdPermsType.populate())
- self._obj.set_display_name(self.display_name or GeneratedsSuper.populate_string("display_name"))
- #end populate
-
- def setUp(self):
- super(VirtualDnsTestFixtureGen, self).setUp()
- if not self._parent_fixt:
- self._parent_fixt = self.useFixture(DomainTestFixtureGen(self._conn_drv, 'default-domain'))
-
- self._obj = vnc_api.VirtualDns(self._name, self._parent_fixt.getObj ())
- try:
- self._obj = self._conn_drv.virtual_DNS_read (fq_name=self._obj.get_fq_name())
- self._update_links (update_server=True)
- except NoIdError:
- self._update_links (update_server=False)
- if self._auto_prop_val:
- self.populate ()
- else:
- self._obj.virtual_DNS_data = self.virtual_DNS_data
- self._obj.id_perms = self.id_perms
- self._obj.display_name = self.display_name
- self._conn_drv.virtual_DNS_create(self._obj)
- # read back for server allocated values
- self._obj = self._conn_drv.virtual_DNS_read(id = self._obj.uuid)
- #end setUp
-
- def cleanUp(self):
- if self._obj.get_network_ipam_back_refs():
- return
- if self._obj.get_virtual_DNS_records():
- return
- parent_fixt = getattr(self, '_parent_fixt', None)
- if parent_fixt:
- # non config-root child
- parent_obj = self._parent_fixt.getObj()
- # remove child from parent obj
- for child_obj in parent_obj.get_virtual_DNSs():
- if type(child_obj) == dict:
- child_uuid = child_obj['uuid']
- else:
- child_uuid = child_obj.uuid
- if child_uuid == self._obj.uuid:
- parent_obj.virtual_DNSs.remove(child_obj)
- break
-
- self._conn_drv.virtual_DNS_delete(id = self._obj.uuid)
- #end cleanUp
-
- def getObj(self):
- return self._obj
- #end getObj
-
-#end class VirtualDnsTestFixtureGen
-
-class CustomerAttachmentTestFixtureGen(cfixture.ContrailFixture):
- """
- Fixture for :class:`.CustomerAttachment`
- """
- def __init__(self, conn_drv, customer_attachment_name=None, auto_prop_val=False, virtual_machine_interface_refs = None, floating_ip_refs = None, attachment_address=None, id_perms=None, display_name=None):
- '''
- Create CustomerAttachmentTestFixtureGen object
-
- constructor
-
- Args:
- conn_drv (:class:`ConnectionDriver`): connection driver (eg. :class:`vnc_api.vnc_api.VncApi`, :class:`novaclient.client.Client`, etc)
-
- Kwargs:
- %s_name (str): Name of %s
- virtual_machine_interface (list): list of :class:`VirtualMachineInterface` type
- floating_ip (list): list of :class:`FloatingIp` type
- attachment_address (instance): instance of :class:`AttachmentAddressType`
- id_perms (instance): instance of :class:`IdPermsType`
- display_name (instance): instance of :class:`xsd:string`
-
- '''
- super(CustomerAttachmentTestFixtureGen, self).__init__()
- self._conn_drv = conn_drv
- if not customer_attachment_name:
- self._name = 'default-customer-attachment'
- else:
- self._name = customer_attachment_name
- self._obj = None
- self._auto_prop_val = auto_prop_val
- if virtual_machine_interface_refs:
- for ln in virtual_machine_interface_refs:
- self.add_virtual_machine_interface (ln)
- if floating_ip_refs:
- for ln in floating_ip_refs:
- self.add_floating_ip (ln)
- self.attachment_address = attachment_address
- self.id_perms = id_perms
- self.display_name = display_name
- #end __init__
-
- def _update_links (self, update_server):
- for ln in self.get_virtual_machine_interfaces ():
- self.add_virtual_machine_interface (ln.fixture (), update_server = update_server, add_link = False)
- for ln in self.get_floating_ips ():
- self.add_floating_ip (ln.fixture (), update_server = update_server, add_link = False)
- return None
- #end _update_links
-
- def add_virtual_machine_interface (self, lo, update_server = True, add_link = True):
- '''
- add :class:`VirtualMachineInterface` link to :class:`CustomerAttachment`
- Args:
- lo (:class:`VirtualMachineInterface`): obj to link
- '''
- if self._obj:
- self._obj.add_virtual_machine_interface (lo)
- if update_server:
- self._conn_drv.customer_attachment_update (self._obj)
-
- if add_link:
- self.add_link('virtual_machine_interface', cfixture.ConrtailLink('virtual_machine_interface', 'customer_attachment', 'virtual_machine_interface', ['ref'], lo))
- #end add_virtual_machine_interface_link
-
- def get_virtual_machine_interfaces (self):
- return self.get_links ('virtual_machine_interface')
- #end get_virtual_machine_interfaces
- def add_floating_ip (self, lo, update_server = True, add_link = True):
- '''
- add :class:`FloatingIp` link to :class:`CustomerAttachment`
- Args:
- lo (:class:`FloatingIp`): obj to link
- '''
- if self._obj:
- self._obj.add_floating_ip (lo)
- if update_server:
- self._conn_drv.customer_attachment_update (self._obj)
-
- if add_link:
- self.add_link('floating_ip', cfixture.ConrtailLink('floating_ip', 'customer_attachment', 'floating_ip', ['ref'], lo))
- #end add_floating_ip_link
-
- def get_floating_ips (self):
- return self.get_links ('floating_ip')
- #end get_floating_ips
-
- def populate (self):
- self._obj.set_attachment_address(self.attachment_address or vnc_api.gen.resource_xsd.AttachmentAddressType.populate())
- self._obj.set_id_perms(self.id_perms or vnc_api.gen.resource_xsd.IdPermsType.populate())
- self._obj.set_display_name(self.display_name or GeneratedsSuper.populate_string("display_name"))
- #end populate
-
- def setUp(self):
- super(CustomerAttachmentTestFixtureGen, self).setUp()
- self._obj = vnc_api.CustomerAttachment(self._name)
- try:
- self._obj = self._conn_drv.customer_attachment_read (fq_name=self._obj.get_fq_name())
- self._update_links (update_server=True)
- except NoIdError:
- self._update_links (update_server=False)
- if self._auto_prop_val:
- self.populate ()
- else:
- self._obj.attachment_address = self.attachment_address
- self._obj.id_perms = self.id_perms
- self._obj.display_name = self.display_name
- self._conn_drv.customer_attachment_create(self._obj)
- # read back for server allocated values
- self._obj = self._conn_drv.customer_attachment_read(id = self._obj.uuid)
- #end setUp
-
- def cleanUp(self):
- self._conn_drv.customer_attachment_delete(id = self._obj.uuid)
- #end cleanUp
-
- def getObj(self):
- return self._obj
- #end getObj
-
-#end class CustomerAttachmentTestFixtureGen
-
-class ServiceApplianceSetTestFixtureGen(cfixture.ContrailFixture):
- """
- Fixture for :class:`.ServiceApplianceSet`
- """
- def __init__(self, conn_drv, service_appliance_set_name=None, parent_fixt=None, auto_prop_val=False, service_appliance_set_properties=None, service_appliance_driver=None, service_appliance_ha_mode=None, id_perms=None, display_name=None):
- '''
- Create ServiceApplianceSetTestFixtureGen object
-
- constructor
-
- Args:
- conn_drv (:class:`ConnectionDriver`): connection driver (eg. :class:`vnc_api.vnc_api.VncApi`, :class:`novaclient.client.Client`, etc)
-
- Kwargs:
- service_appliance_set_name (str): Name of service_appliance_set
- parent_fixt (:class:`.GlobalSystemConfigTestFixtureGen`): Parent fixture
- service_appliance_set_properties (instance): instance of :class:`KeyValuePairs`
- service_appliance_driver (instance): instance of :class:`xsd:string`
- service_appliance_ha_mode (instance): instance of :class:`xsd:string`
- id_perms (instance): instance of :class:`IdPermsType`
- display_name (instance): instance of :class:`xsd:string`
-
- '''
- super(ServiceApplianceSetTestFixtureGen, self).__init__()
- self._conn_drv = conn_drv
- if not service_appliance_set_name:
- self._name = 'default-service-appliance-set'
- else:
- self._name = service_appliance_set_name
- self._obj = None
- self._parent_fixt = parent_fixt
- self._auto_prop_val = auto_prop_val
- self.service_appliance_set_properties = service_appliance_set_properties
- self.service_appliance_driver = service_appliance_driver
- self.service_appliance_ha_mode = service_appliance_ha_mode
- self.id_perms = id_perms
- self.display_name = display_name
- #end __init__
-
- def _update_links (self, update_server):
- return None
- #end _update_links
-
-
- def populate (self):
- self._obj.set_service_appliance_set_properties(self.service_appliance_set_properties or vnc_api.gen.resource_xsd.KeyValuePairs.populate())
- self._obj.set_service_appliance_driver(self.service_appliance_driver or GeneratedsSuper.populate_string("service_appliance_driver"))
- self._obj.set_service_appliance_ha_mode(self.service_appliance_ha_mode or GeneratedsSuper.populate_string("service_appliance_ha_mode"))
- self._obj.set_id_perms(self.id_perms or vnc_api.gen.resource_xsd.IdPermsType.populate())
- self._obj.set_display_name(self.display_name or GeneratedsSuper.populate_string("display_name"))
- #end populate
-
- def setUp(self):
- super(ServiceApplianceSetTestFixtureGen, self).setUp()
- if not self._parent_fixt:
- self._parent_fixt = self.useFixture(GlobalSystemConfigTestFixtureGen(self._conn_drv, 'default-global-system-config'))
-
- self._obj = vnc_api.ServiceApplianceSet(self._name, self._parent_fixt.getObj ())
- try:
- self._obj = self._conn_drv.service_appliance_set_read (fq_name=self._obj.get_fq_name())
- self._update_links (update_server=True)
- except NoIdError:
- self._update_links (update_server=False)
- if self._auto_prop_val:
- self.populate ()
- else:
- self._obj.service_appliance_set_properties = self.service_appliance_set_properties
- self._obj.service_appliance_driver = self.service_appliance_driver
- self._obj.service_appliance_ha_mode = self.service_appliance_ha_mode
- self._obj.id_perms = self.id_perms
- self._obj.display_name = self.display_name
- self._conn_drv.service_appliance_set_create(self._obj)
- # read back for server allocated values
- self._obj = self._conn_drv.service_appliance_set_read(id = self._obj.uuid)
- #end setUp
-
- def cleanUp(self):
- if self._obj.get_loadbalancer_pool_back_refs():
- return
- if self._obj.get_service_appliances():
- return
- parent_fixt = getattr(self, '_parent_fixt', None)
- if parent_fixt:
- # non config-root child
- parent_obj = self._parent_fixt.getObj()
- # remove child from parent obj
- for child_obj in parent_obj.get_service_appliance_sets():
- if type(child_obj) == dict:
- child_uuid = child_obj['uuid']
- else:
- child_uuid = child_obj.uuid
- if child_uuid == self._obj.uuid:
- parent_obj.service_appliance_sets.remove(child_obj)
- break
-
- self._conn_drv.service_appliance_set_delete(id = self._obj.uuid)
- #end cleanUp
-
- def getObj(self):
- return self._obj
- #end getObj
-
-#end class ServiceApplianceSetTestFixtureGen
-
-class ConfigNodeTestFixtureGen(cfixture.ContrailFixture):
- """
- Fixture for :class:`.ConfigNode`
- """
- def __init__(self, conn_drv, config_node_name=None, parent_fixt=None, auto_prop_val=False, config_node_ip_address=None, id_perms=None, display_name=None):
- '''
- Create ConfigNodeTestFixtureGen object
-
- constructor
-
- Args:
- conn_drv (:class:`ConnectionDriver`): connection driver (eg. :class:`vnc_api.vnc_api.VncApi`, :class:`novaclient.client.Client`, etc)
-
- Kwargs:
- config_node_name (str): Name of config_node
- parent_fixt (:class:`.GlobalSystemConfigTestFixtureGen`): Parent fixture
- config_node_ip_address (instance): instance of :class:`xsd:string`
- id_perms (instance): instance of :class:`IdPermsType`
- display_name (instance): instance of :class:`xsd:string`
-
- '''
- super(ConfigNodeTestFixtureGen, self).__init__()
- self._conn_drv = conn_drv
- if not config_node_name:
- self._name = 'default-config-node'
- else:
- self._name = config_node_name
- self._obj = None
- self._parent_fixt = parent_fixt
- self._auto_prop_val = auto_prop_val
- self.config_node_ip_address = config_node_ip_address
- self.id_perms = id_perms
- self.display_name = display_name
- #end __init__
-
- def _update_links (self, update_server):
- return None
- #end _update_links
-
-
- def populate (self):
- self._obj.set_config_node_ip_address(self.config_node_ip_address or GeneratedsSuper.populate_string("config_node_ip_address"))
- self._obj.set_id_perms(self.id_perms or vnc_api.gen.resource_xsd.IdPermsType.populate())
- self._obj.set_display_name(self.display_name or GeneratedsSuper.populate_string("display_name"))
- #end populate
-
- def setUp(self):
- super(ConfigNodeTestFixtureGen, self).setUp()
- if not self._parent_fixt:
- self._parent_fixt = self.useFixture(GlobalSystemConfigTestFixtureGen(self._conn_drv, 'default-global-system-config'))
-
- self._obj = vnc_api.ConfigNode(self._name, self._parent_fixt.getObj ())
- try:
- self._obj = self._conn_drv.config_node_read (fq_name=self._obj.get_fq_name())
- self._update_links (update_server=True)
- except NoIdError:
- self._update_links (update_server=False)
- if self._auto_prop_val:
- self.populate ()
- else:
- self._obj.config_node_ip_address = self.config_node_ip_address
- self._obj.id_perms = self.id_perms
- self._obj.display_name = self.display_name
- self._conn_drv.config_node_create(self._obj)
- # read back for server allocated values
- self._obj = self._conn_drv.config_node_read(id = self._obj.uuid)
- #end setUp
-
- def cleanUp(self):
- parent_fixt = getattr(self, '_parent_fixt', None)
- if parent_fixt:
- # non config-root child
- parent_obj = self._parent_fixt.getObj()
- # remove child from parent obj
- for child_obj in parent_obj.get_config_nodes():
- if type(child_obj) == dict:
- child_uuid = child_obj['uuid']
- else:
- child_uuid = child_obj.uuid
- if child_uuid == self._obj.uuid:
- parent_obj.config_nodes.remove(child_obj)
- break
-
- self._conn_drv.config_node_delete(id = self._obj.uuid)
- #end cleanUp
-
- def getObj(self):
- return self._obj
- #end getObj
-
-#end class ConfigNodeTestFixtureGen
-
-class QosQueueTestFixtureGen(cfixture.ContrailFixture):
- """
- Fixture for :class:`.QosQueue`
- """
- def __init__(self, conn_drv, qos_queue_name=None, parent_fixt=None, auto_prop_val=False, min_bandwidth=None, max_bandwidth=None, id_perms=None, display_name=None):
- '''
- Create QosQueueTestFixtureGen object
-
- constructor
-
- Args:
- conn_drv (:class:`ConnectionDriver`): connection driver (eg. :class:`vnc_api.vnc_api.VncApi`, :class:`novaclient.client.Client`, etc)
-
- Kwargs:
- qos_queue_name (str): Name of qos_queue
- parent_fixt (:class:`.ProjectTestFixtureGen`): Parent fixture
- min_bandwidth (instance): instance of :class:`xsd:integer`
- max_bandwidth (instance): instance of :class:`xsd:integer`
- id_perms (instance): instance of :class:`IdPermsType`
- display_name (instance): instance of :class:`xsd:string`
-
- '''
- super(QosQueueTestFixtureGen, self).__init__()
- self._conn_drv = conn_drv
- if not qos_queue_name:
- self._name = 'default-qos-queue'
- else:
- self._name = qos_queue_name
- self._obj = None
- self._parent_fixt = parent_fixt
- self._auto_prop_val = auto_prop_val
- self.min_bandwidth = min_bandwidth
- self.max_bandwidth = max_bandwidth
- self.id_perms = id_perms
- self.display_name = display_name
- #end __init__
-
- def _update_links (self, update_server):
- return None
- #end _update_links
-
-
- def populate (self):
- self._obj.set_min_bandwidth(self.min_bandwidth or GeneratedsSuper.populate_integer("min_bandwidth"))
- self._obj.set_max_bandwidth(self.max_bandwidth or GeneratedsSuper.populate_integer("max_bandwidth"))
- self._obj.set_id_perms(self.id_perms or vnc_api.gen.resource_xsd.IdPermsType.populate())
- self._obj.set_display_name(self.display_name or GeneratedsSuper.populate_string("display_name"))
- #end populate
-
- def setUp(self):
- super(QosQueueTestFixtureGen, self).setUp()
- if not self._parent_fixt:
- self._parent_fixt = self.useFixture(ProjectTestFixtureGen(self._conn_drv, 'default-project'))
-
- self._obj = vnc_api.QosQueue(self._name, self._parent_fixt.getObj ())
- try:
- self._obj = self._conn_drv.qos_queue_read (fq_name=self._obj.get_fq_name())
- self._update_links (update_server=True)
- except NoIdError:
- self._update_links (update_server=False)
- if self._auto_prop_val:
- self.populate ()
- else:
- self._obj.min_bandwidth = self.min_bandwidth
- self._obj.max_bandwidth = self.max_bandwidth
- self._obj.id_perms = self.id_perms
- self._obj.display_name = self.display_name
- self._conn_drv.qos_queue_create(self._obj)
- # read back for server allocated values
- self._obj = self._conn_drv.qos_queue_read(id = self._obj.uuid)
- #end setUp
-
- def cleanUp(self):
- if self._obj.get_qos_forwarding_class_back_refs():
- return
- parent_fixt = getattr(self, '_parent_fixt', None)
- if parent_fixt:
- # non config-root child
- parent_obj = self._parent_fixt.getObj()
- # remove child from parent obj
- for child_obj in parent_obj.get_qos_queues():
- if type(child_obj) == dict:
- child_uuid = child_obj['uuid']
- else:
- child_uuid = child_obj.uuid
- if child_uuid == self._obj.uuid:
- parent_obj.qos_queues.remove(child_obj)
- break
-
- self._conn_drv.qos_queue_delete(id = self._obj.uuid)
- #end cleanUp
-
- def getObj(self):
- return self._obj
- #end getObj
-
-#end class QosQueueTestFixtureGen
-
-class VirtualMachineTestFixtureGen(cfixture.ContrailFixture):
- """
- Fixture for :class:`.VirtualMachine`
- """
- def __init__(self, conn_drv, virtual_machine_name=None, auto_prop_val=False, service_instance_refs = None, id_perms=None, display_name=None):
- '''
- Create VirtualMachineTestFixtureGen object
-
- constructor
-
- Args:
- conn_drv (:class:`ConnectionDriver`): connection driver (eg. :class:`vnc_api.vnc_api.VncApi`, :class:`novaclient.client.Client`, etc)
-
- Kwargs:
- %s_name (str): Name of %s
- service_instance (list): list of :class:`ServiceInstance` type
- id_perms (instance): instance of :class:`IdPermsType`
- display_name (instance): instance of :class:`xsd:string`
-
- '''
- super(VirtualMachineTestFixtureGen, self).__init__()
- self._conn_drv = conn_drv
- if not virtual_machine_name:
- self._name = 'default-virtual-machine'
- else:
- self._name = virtual_machine_name
- self._obj = None
- self._auto_prop_val = auto_prop_val
- if service_instance_refs:
- for ln in service_instance_refs:
- self.add_service_instance (ln)
- self.id_perms = id_perms
- self.display_name = display_name
- #end __init__
-
- def _update_links (self, update_server):
- for ln in self.get_service_instances ():
- self.add_service_instance (ln.fixture (), update_server = update_server, add_link = False)
- return None
- #end _update_links
-
- def add_service_instance (self, lo, update_server = True, add_link = True):
- '''
- add :class:`ServiceInstance` link to :class:`VirtualMachine`
- Args:
- lo (:class:`ServiceInstance`): obj to link
- '''
- if self._obj:
- self._obj.add_service_instance (lo)
- if update_server:
- self._conn_drv.virtual_machine_update (self._obj)
-
- if add_link:
- self.add_link('service_instance', cfixture.ConrtailLink('service_instance', 'virtual_machine', 'service_instance', ['ref', 'derived'], lo))
- #end add_service_instance_link
-
- def get_service_instances (self):
- return self.get_links ('service_instance')
- #end get_service_instances
-
- def populate (self):
- self._obj.set_id_perms(self.id_perms or vnc_api.gen.resource_xsd.IdPermsType.populate())
- self._obj.set_display_name(self.display_name or GeneratedsSuper.populate_string("display_name"))
- #end populate
-
- def setUp(self):
- super(VirtualMachineTestFixtureGen, self).setUp()
- self._obj = vnc_api.VirtualMachine(self._name)
- try:
- self._obj = self._conn_drv.virtual_machine_read (fq_name=self._obj.get_fq_name())
- self._update_links (update_server=True)
- except NoIdError:
- self._update_links (update_server=False)
- if self._auto_prop_val:
- self.populate ()
- else:
- self._obj.id_perms = self.id_perms
- self._obj.display_name = self.display_name
- self._conn_drv.virtual_machine_create(self._obj)
- # read back for server allocated values
- self._obj = self._conn_drv.virtual_machine_read(id = self._obj.uuid)
- #end setUp
-
- def cleanUp(self):
- if self._obj.get_virtual_machine_interface_back_refs() or self._obj.get_virtual_router_back_refs():
- return
- if self._obj.get_virtual_machine_interfaces():
- return
- self._conn_drv.virtual_machine_delete(id = self._obj.uuid)
- #end cleanUp
-
- def getObj(self):
- return self._obj
- #end getObj
-
-#end class VirtualMachineTestFixtureGen
-
-class InterfaceRouteTableTestFixtureGen(cfixture.ContrailFixture):
- """
- Fixture for :class:`.InterfaceRouteTable`
- """
- def __init__(self, conn_drv, interface_route_table_name=None, parent_fixt=None, auto_prop_val=False, interface_route_table_routes=None, id_perms=None, display_name=None):
- '''
- Create InterfaceRouteTableTestFixtureGen object
-
- constructor
-
- Args:
- conn_drv (:class:`ConnectionDriver`): connection driver (eg. :class:`vnc_api.vnc_api.VncApi`, :class:`novaclient.client.Client`, etc)
-
- Kwargs:
- interface_route_table_name (str): Name of interface_route_table
- parent_fixt (:class:`.ProjectTestFixtureGen`): Parent fixture
- interface_route_table_routes (instance): instance of :class:`RouteTableType`
- id_perms (instance): instance of :class:`IdPermsType`
- display_name (instance): instance of :class:`xsd:string`
-
- '''
- super(InterfaceRouteTableTestFixtureGen, self).__init__()
- self._conn_drv = conn_drv
- if not interface_route_table_name:
- self._name = 'default-interface-route-table'
- else:
- self._name = interface_route_table_name
- self._obj = None
- self._parent_fixt = parent_fixt
- self._auto_prop_val = auto_prop_val
- self.interface_route_table_routes = interface_route_table_routes
- self.id_perms = id_perms
- self.display_name = display_name
- #end __init__
-
- def _update_links (self, update_server):
- return None
- #end _update_links
-
-
- def populate (self):
- self._obj.set_interface_route_table_routes(self.interface_route_table_routes or vnc_api.gen.resource_xsd.RouteTableType.populate())
- self._obj.set_id_perms(self.id_perms or vnc_api.gen.resource_xsd.IdPermsType.populate())
- self._obj.set_display_name(self.display_name or GeneratedsSuper.populate_string("display_name"))
- #end populate
-
- def setUp(self):
- super(InterfaceRouteTableTestFixtureGen, self).setUp()
- if not self._parent_fixt:
- self._parent_fixt = self.useFixture(ProjectTestFixtureGen(self._conn_drv, 'default-project'))
-
- self._obj = vnc_api.InterfaceRouteTable(self._name, self._parent_fixt.getObj ())
- try:
- self._obj = self._conn_drv.interface_route_table_read (fq_name=self._obj.get_fq_name())
- self._update_links (update_server=True)
- except NoIdError:
- self._update_links (update_server=False)
- if self._auto_prop_val:
- self.populate ()
- else:
- self._obj.interface_route_table_routes = self.interface_route_table_routes
- self._obj.id_perms = self.id_perms
- self._obj.display_name = self.display_name
- self._conn_drv.interface_route_table_create(self._obj)
- # read back for server allocated values
- self._obj = self._conn_drv.interface_route_table_read(id = self._obj.uuid)
- #end setUp
-
- def cleanUp(self):
- if self._obj.get_virtual_machine_interface_back_refs():
- return
- parent_fixt = getattr(self, '_parent_fixt', None)
- if parent_fixt:
- # non config-root child
- parent_obj = self._parent_fixt.getObj()
- # remove child from parent obj
- for child_obj in parent_obj.get_interface_route_tables():
- if type(child_obj) == dict:
- child_uuid = child_obj['uuid']
- else:
- child_uuid = child_obj.uuid
- if child_uuid == self._obj.uuid:
- parent_obj.interface_route_tables.remove(child_obj)
- break
-
- self._conn_drv.interface_route_table_delete(id = self._obj.uuid)
- #end cleanUp
-
- def getObj(self):
- return self._obj
- #end getObj
-
-#end class InterfaceRouteTableTestFixtureGen
-
-class ServiceTemplateTestFixtureGen(cfixture.ContrailFixture):
- """
- Fixture for :class:`.ServiceTemplate`
- """
- def __init__(self, conn_drv, service_template_name=None, parent_fixt=None, auto_prop_val=False, service_template_properties=None, id_perms=None, display_name=None):
- '''
- Create ServiceTemplateTestFixtureGen object
-
- constructor
-
- Args:
- conn_drv (:class:`ConnectionDriver`): connection driver (eg. :class:`vnc_api.vnc_api.VncApi`, :class:`novaclient.client.Client`, etc)
-
- Kwargs:
- service_template_name (str): Name of service_template
- parent_fixt (:class:`.DomainTestFixtureGen`): Parent fixture
- service_template_properties (instance): instance of :class:`ServiceTemplateType`
- id_perms (instance): instance of :class:`IdPermsType`
- display_name (instance): instance of :class:`xsd:string`
-
- '''
- super(ServiceTemplateTestFixtureGen, self).__init__()
- self._conn_drv = conn_drv
- if not service_template_name:
- self._name = 'default-service-template'
- else:
- self._name = service_template_name
- self._obj = None
- self._parent_fixt = parent_fixt
- self._auto_prop_val = auto_prop_val
- self.service_template_properties = service_template_properties
- self.id_perms = id_perms
- self.display_name = display_name
- #end __init__
-
- def _update_links (self, update_server):
- return None
- #end _update_links
-
-
- def populate (self):
- self._obj.set_service_template_properties(self.service_template_properties or vnc_api.gen.resource_xsd.ServiceTemplateType.populate())
- self._obj.set_id_perms(self.id_perms or vnc_api.gen.resource_xsd.IdPermsType.populate())
- self._obj.set_display_name(self.display_name or GeneratedsSuper.populate_string("display_name"))
- #end populate
-
- def setUp(self):
- super(ServiceTemplateTestFixtureGen, self).setUp()
- if not self._parent_fixt:
- self._parent_fixt = self.useFixture(DomainTestFixtureGen(self._conn_drv, 'default-domain'))
-
- self._obj = vnc_api.ServiceTemplate(self._name, self._parent_fixt.getObj ())
- try:
- self._obj = self._conn_drv.service_template_read (fq_name=self._obj.get_fq_name())
- self._update_links (update_server=True)
- except NoIdError:
- self._update_links (update_server=False)
- if self._auto_prop_val:
- self.populate ()
- else:
- self._obj.service_template_properties = self.service_template_properties
- self._obj.id_perms = self.id_perms
- self._obj.display_name = self.display_name
- self._conn_drv.service_template_create(self._obj)
- # read back for server allocated values
- self._obj = self._conn_drv.service_template_read(id = self._obj.uuid)
- #end setUp
-
- def cleanUp(self):
- if self._obj.get_service_instance_back_refs():
- return
- parent_fixt = getattr(self, '_parent_fixt', None)
- if parent_fixt:
- # non config-root child
- parent_obj = self._parent_fixt.getObj()
- # remove child from parent obj
- for child_obj in parent_obj.get_service_templates():
- if type(child_obj) == dict:
- child_uuid = child_obj['uuid']
- else:
- child_uuid = child_obj.uuid
- if child_uuid == self._obj.uuid:
- parent_obj.service_templates.remove(child_obj)
- break
-
- self._conn_drv.service_template_delete(id = self._obj.uuid)
- #end cleanUp
-
- def getObj(self):
- return self._obj
- #end getObj
-
-#end class ServiceTemplateTestFixtureGen
-
-class VirtualIpTestFixtureGen(cfixture.ContrailFixture):
- """
- Fixture for :class:`.VirtualIp`
- """
- def __init__(self, conn_drv, virtual_ip_name=None, parent_fixt=None, auto_prop_val=False, loadbalancer_pool_refs = None, virtual_machine_interface_refs = None, virtual_ip_properties=None, id_perms=None, display_name=None):
- '''
- Create VirtualIpTestFixtureGen object
-
- constructor
-
- Args:
- conn_drv (:class:`ConnectionDriver`): connection driver (eg. :class:`vnc_api.vnc_api.VncApi`, :class:`novaclient.client.Client`, etc)
-
- Kwargs:
- virtual_ip_name (str): Name of virtual_ip
- parent_fixt (:class:`.ProjectTestFixtureGen`): Parent fixture
- loadbalancer_pool (list): list of :class:`LoadbalancerPool` type
- virtual_machine_interface (list): list of :class:`VirtualMachineInterface` type
- virtual_ip_properties (instance): instance of :class:`VirtualIpType`
- id_perms (instance): instance of :class:`IdPermsType`
- display_name (instance): instance of :class:`xsd:string`
-
- '''
- super(VirtualIpTestFixtureGen, self).__init__()
- self._conn_drv = conn_drv
- if not virtual_ip_name:
- self._name = 'default-virtual-ip'
- else:
- self._name = virtual_ip_name
- self._obj = None
- self._parent_fixt = parent_fixt
- self._auto_prop_val = auto_prop_val
- if loadbalancer_pool_refs:
- for ln in loadbalancer_pool_refs:
- self.add_loadbalancer_pool (ln)
- if virtual_machine_interface_refs:
- for ln in virtual_machine_interface_refs:
- self.add_virtual_machine_interface (ln)
- self.virtual_ip_properties = virtual_ip_properties
- self.id_perms = id_perms
- self.display_name = display_name
- #end __init__
-
- def _update_links (self, update_server):
- for ln in self.get_loadbalancer_pools ():
- self.add_loadbalancer_pool (ln.fixture (), update_server = update_server, add_link = False)
- for ln in self.get_virtual_machine_interfaces ():
- self.add_virtual_machine_interface (ln.fixture (), update_server = update_server, add_link = False)
- return None
- #end _update_links
-
- def add_loadbalancer_pool (self, lo, update_server = True, add_link = True):
- '''
- add :class:`LoadbalancerPool` link to :class:`VirtualIp`
- Args:
- lo (:class:`LoadbalancerPool`): obj to link
- '''
- if self._obj:
- self._obj.add_loadbalancer_pool (lo)
- if update_server:
- self._conn_drv.virtual_ip_update (self._obj)
-
- if add_link:
- self.add_link('loadbalancer_pool', cfixture.ConrtailLink('loadbalancer_pool', 'virtual_ip', 'loadbalancer_pool', ['ref'], lo))
- #end add_loadbalancer_pool_link
-
- def get_loadbalancer_pools (self):
- return self.get_links ('loadbalancer_pool')
- #end get_loadbalancer_pools
- def add_virtual_machine_interface (self, lo, update_server = True, add_link = True):
- '''
- add :class:`VirtualMachineInterface` link to :class:`VirtualIp`
- Args:
- lo (:class:`VirtualMachineInterface`): obj to link
- '''
- if self._obj:
- self._obj.add_virtual_machine_interface (lo)
- if update_server:
- self._conn_drv.virtual_ip_update (self._obj)
-
- if add_link:
- self.add_link('virtual_machine_interface', cfixture.ConrtailLink('virtual_machine_interface', 'virtual_ip', 'virtual_machine_interface', ['ref'], lo))
- #end add_virtual_machine_interface_link
-
- def get_virtual_machine_interfaces (self):
- return self.get_links ('virtual_machine_interface')
- #end get_virtual_machine_interfaces
-
- def populate (self):
- self._obj.set_virtual_ip_properties(self.virtual_ip_properties or vnc_api.gen.resource_xsd.VirtualIpType.populate())
- self._obj.set_id_perms(self.id_perms or vnc_api.gen.resource_xsd.IdPermsType.populate())
- self._obj.set_display_name(self.display_name or GeneratedsSuper.populate_string("display_name"))
- #end populate
-
- def setUp(self):
- super(VirtualIpTestFixtureGen, self).setUp()
- if not self._parent_fixt:
- self._parent_fixt = self.useFixture(ProjectTestFixtureGen(self._conn_drv, 'default-project'))
-
- self._obj = vnc_api.VirtualIp(self._name, self._parent_fixt.getObj ())
- try:
- self._obj = self._conn_drv.virtual_ip_read (fq_name=self._obj.get_fq_name())
- self._update_links (update_server=True)
- except NoIdError:
- self._update_links (update_server=False)
- if self._auto_prop_val:
- self.populate ()
- else:
- self._obj.virtual_ip_properties = self.virtual_ip_properties
- self._obj.id_perms = self.id_perms
- self._obj.display_name = self.display_name
- self._conn_drv.virtual_ip_create(self._obj)
- # read back for server allocated values
- self._obj = self._conn_drv.virtual_ip_read(id = self._obj.uuid)
- #end setUp
-
- def cleanUp(self):
- parent_fixt = getattr(self, '_parent_fixt', None)
- if parent_fixt:
- # non config-root child
- parent_obj = self._parent_fixt.getObj()
- # remove child from parent obj
- for child_obj in parent_obj.get_virtual_ips():
- if type(child_obj) == dict:
- child_uuid = child_obj['uuid']
- else:
- child_uuid = child_obj.uuid
- if child_uuid == self._obj.uuid:
- parent_obj.virtual_ips.remove(child_obj)
- break
-
- self._conn_drv.virtual_ip_delete(id = self._obj.uuid)
- #end cleanUp
-
- def getObj(self):
- return self._obj
- #end getObj
-
-#end class VirtualIpTestFixtureGen
-
-class LoadbalancerMemberTestFixtureGen(cfixture.ContrailFixture):
- """
- Fixture for :class:`.LoadbalancerMember`
- """
- def __init__(self, conn_drv, loadbalancer_member_name=None, parent_fixt=None, auto_prop_val=False, loadbalancer_member_properties=None, id_perms=None, display_name=None):
- '''
- Create LoadbalancerMemberTestFixtureGen object
-
- constructor
-
- Args:
- conn_drv (:class:`ConnectionDriver`): connection driver (eg. :class:`vnc_api.vnc_api.VncApi`, :class:`novaclient.client.Client`, etc)
-
- Kwargs:
- loadbalancer_member_name (str): Name of loadbalancer_member
- parent_fixt (:class:`.LoadbalancerPoolTestFixtureGen`): Parent fixture
- loadbalancer_member_properties (instance): instance of :class:`LoadbalancerMemberType`
- id_perms (instance): instance of :class:`IdPermsType`
- display_name (instance): instance of :class:`xsd:string`
-
- '''
- super(LoadbalancerMemberTestFixtureGen, self).__init__()
- self._conn_drv = conn_drv
- if not loadbalancer_member_name:
- self._name = 'default-loadbalancer-member'
- else:
- self._name = loadbalancer_member_name
- self._obj = None
- self._parent_fixt = parent_fixt
- self._auto_prop_val = auto_prop_val
- self.loadbalancer_member_properties = loadbalancer_member_properties
- self.id_perms = id_perms
- self.display_name = display_name
- #end __init__
-
- def _update_links (self, update_server):
- return None
- #end _update_links
-
-
- def populate (self):
- self._obj.set_loadbalancer_member_properties(self.loadbalancer_member_properties or vnc_api.gen.resource_xsd.LoadbalancerMemberType.populate())
- self._obj.set_id_perms(self.id_perms or vnc_api.gen.resource_xsd.IdPermsType.populate())
- self._obj.set_display_name(self.display_name or GeneratedsSuper.populate_string("display_name"))
- #end populate
-
- def setUp(self):
- super(LoadbalancerMemberTestFixtureGen, self).setUp()
- if not self._parent_fixt:
- self._parent_fixt = self.useFixture(LoadbalancerPoolTestFixtureGen(self._conn_drv, 'default-loadbalancer-pool'))
-
- self._obj = vnc_api.LoadbalancerMember(self._name, self._parent_fixt.getObj ())
- try:
- self._obj = self._conn_drv.loadbalancer_member_read (fq_name=self._obj.get_fq_name())
- self._update_links (update_server=True)
- except NoIdError:
- self._update_links (update_server=False)
- if self._auto_prop_val:
- self.populate ()
- else:
- self._obj.loadbalancer_member_properties = self.loadbalancer_member_properties
- self._obj.id_perms = self.id_perms
- self._obj.display_name = self.display_name
- self._conn_drv.loadbalancer_member_create(self._obj)
- # read back for server allocated values
- self._obj = self._conn_drv.loadbalancer_member_read(id = self._obj.uuid)
- #end setUp
-
- def cleanUp(self):
- parent_fixt = getattr(self, '_parent_fixt', None)
- if parent_fixt:
- # non config-root child
- parent_obj = self._parent_fixt.getObj()
- # remove child from parent obj
- for child_obj in parent_obj.get_loadbalancer_members():
- if type(child_obj) == dict:
- child_uuid = child_obj['uuid']
- else:
- child_uuid = child_obj.uuid
- if child_uuid == self._obj.uuid:
- parent_obj.loadbalancer_members.remove(child_obj)
- break
-
- self._conn_drv.loadbalancer_member_delete(id = self._obj.uuid)
- #end cleanUp
-
- def getObj(self):
- return self._obj
- #end getObj
-
-#end class LoadbalancerMemberTestFixtureGen
-
-class SecurityGroupTestFixtureGen(cfixture.ContrailFixture):
- """
- Fixture for :class:`.SecurityGroup`
- """
- def __init__(self, conn_drv, security_group_name=None, parent_fixt=None, auto_prop_val=False, security_group_id=None, configured_security_group_id=None, security_group_entries=None, id_perms=None, display_name=None):
- '''
- Create SecurityGroupTestFixtureGen object
-
- constructor
-
- Args:
- conn_drv (:class:`ConnectionDriver`): connection driver (eg. :class:`vnc_api.vnc_api.VncApi`, :class:`novaclient.client.Client`, etc)
-
- Kwargs:
- security_group_name (str): Name of security_group
- parent_fixt (:class:`.ProjectTestFixtureGen`): Parent fixture
- security_group_id (instance): instance of :class:`xsd:string`
- configured_security_group_id (instance): instance of :class:`xsd:integer`
- security_group_entries (instance): instance of :class:`PolicyEntriesType`
- id_perms (instance): instance of :class:`IdPermsType`
- display_name (instance): instance of :class:`xsd:string`
-
- '''
- super(SecurityGroupTestFixtureGen, self).__init__()
- self._conn_drv = conn_drv
- if not security_group_name:
- self._name = 'default-security-group'
- else:
- self._name = security_group_name
- self._obj = None
- self._parent_fixt = parent_fixt
- self._auto_prop_val = auto_prop_val
- self.security_group_id = security_group_id
- self.configured_security_group_id = configured_security_group_id
- self.security_group_entries = security_group_entries
- self.id_perms = id_perms
- self.display_name = display_name
- #end __init__
-
- def _update_links (self, update_server):
- return None
- #end _update_links
-
-
- def populate (self):
- self._obj.set_security_group_id(self.security_group_id or GeneratedsSuper.populate_string("security_group_id"))
- self._obj.set_configured_security_group_id(self.configured_security_group_id or GeneratedsSuper.populate_integer("configured_security_group_id"))
- self._obj.set_security_group_entries(self.security_group_entries or vnc_api.gen.resource_xsd.PolicyEntriesType.populate())
- self._obj.set_id_perms(self.id_perms or vnc_api.gen.resource_xsd.IdPermsType.populate())
- self._obj.set_display_name(self.display_name or GeneratedsSuper.populate_string("display_name"))
- #end populate
-
- def setUp(self):
- super(SecurityGroupTestFixtureGen, self).setUp()
- if not self._parent_fixt:
- self._parent_fixt = self.useFixture(ProjectTestFixtureGen(self._conn_drv, 'default-project'))
-
- self._obj = vnc_api.SecurityGroup(self._name, self._parent_fixt.getObj ())
- try:
- self._obj = self._conn_drv.security_group_read (fq_name=self._obj.get_fq_name())
- self._update_links (update_server=True)
- except NoIdError:
- self._update_links (update_server=False)
- if self._auto_prop_val:
- self.populate ()
- else:
- self._obj.security_group_id = self.security_group_id
- self._obj.configured_security_group_id = self.configured_security_group_id
- self._obj.security_group_entries = self.security_group_entries
- self._obj.id_perms = self.id_perms
- self._obj.display_name = self.display_name
- self._conn_drv.security_group_create(self._obj)
- # read back for server allocated values
- self._obj = self._conn_drv.security_group_read(id = self._obj.uuid)
- #end setUp
-
- def cleanUp(self):
- if self._obj.get_virtual_machine_interface_back_refs():
- return
- parent_fixt = getattr(self, '_parent_fixt', None)
- if parent_fixt:
- # non config-root child
- parent_obj = self._parent_fixt.getObj()
- # remove child from parent obj
- for child_obj in parent_obj.get_security_groups():
- if type(child_obj) == dict:
- child_uuid = child_obj['uuid']
- else:
- child_uuid = child_obj.uuid
- if child_uuid == self._obj.uuid:
- parent_obj.security_groups.remove(child_obj)
- break
-
- self._conn_drv.security_group_delete(id = self._obj.uuid)
- #end cleanUp
-
- def getObj(self):
- return self._obj
- #end getObj
-
-#end class SecurityGroupTestFixtureGen
-
-class ProviderAttachmentTestFixtureGen(cfixture.ContrailFixture):
- """
- Fixture for :class:`.ProviderAttachment`
- """
- def __init__(self, conn_drv, provider_attachment_name=None, auto_prop_val=False, virtual_router_refs = None, id_perms=None, display_name=None):
- '''
- Create ProviderAttachmentTestFixtureGen object
-
- constructor
-
- Args:
- conn_drv (:class:`ConnectionDriver`): connection driver (eg. :class:`vnc_api.vnc_api.VncApi`, :class:`novaclient.client.Client`, etc)
-
- Kwargs:
- %s_name (str): Name of %s
- virtual_router (list): list of :class:`VirtualRouter` type
- id_perms (instance): instance of :class:`IdPermsType`
- display_name (instance): instance of :class:`xsd:string`
-
- '''
- super(ProviderAttachmentTestFixtureGen, self).__init__()
- self._conn_drv = conn_drv
- if not provider_attachment_name:
- self._name = 'default-provider-attachment'
- else:
- self._name = provider_attachment_name
- self._obj = None
- self._auto_prop_val = auto_prop_val
- if virtual_router_refs:
- for ln in virtual_router_refs:
- self.add_virtual_router (ln)
- self.id_perms = id_perms
- self.display_name = display_name
- #end __init__
-
- def _update_links (self, update_server):
- for ln in self.get_virtual_routers ():
- self.add_virtual_router (ln.fixture (), update_server = update_server, add_link = False)
- return None
- #end _update_links
-
- def add_virtual_router (self, lo, update_server = True, add_link = True):
- '''
- add :class:`VirtualRouter` link to :class:`ProviderAttachment`
- Args:
- lo (:class:`VirtualRouter`): obj to link
- '''
- if self._obj:
- self._obj.add_virtual_router (lo)
- if update_server:
- self._conn_drv.provider_attachment_update (self._obj)
-
- if add_link:
- self.add_link('virtual_router', cfixture.ConrtailLink('virtual_router', 'provider_attachment', 'virtual_router', ['ref'], lo))
- #end add_virtual_router_link
-
- def get_virtual_routers (self):
- return self.get_links ('virtual_router')
- #end get_virtual_routers
-
- def populate (self):
- self._obj.set_id_perms(self.id_perms or vnc_api.gen.resource_xsd.IdPermsType.populate())
- self._obj.set_display_name(self.display_name or GeneratedsSuper.populate_string("display_name"))
- #end populate
-
- def setUp(self):
- super(ProviderAttachmentTestFixtureGen, self).setUp()
- self._obj = vnc_api.ProviderAttachment(self._name)
- try:
- self._obj = self._conn_drv.provider_attachment_read (fq_name=self._obj.get_fq_name())
- self._update_links (update_server=True)
- except NoIdError:
- self._update_links (update_server=False)
- if self._auto_prop_val:
- self.populate ()
- else:
- self._obj.id_perms = self.id_perms
- self._obj.display_name = self.display_name
- self._conn_drv.provider_attachment_create(self._obj)
- # read back for server allocated values
- self._obj = self._conn_drv.provider_attachment_read(id = self._obj.uuid)
- #end setUp
-
- def cleanUp(self):
- self._conn_drv.provider_attachment_delete(id = self._obj.uuid)
- #end cleanUp
-
- def getObj(self):
- return self._obj
- #end getObj
-
-#end class ProviderAttachmentTestFixtureGen
-
-class VirtualMachineInterfaceTestFixtureGen(cfixture.ContrailFixture):
- """
- Fixture for :class:`.VirtualMachineInterface`
- """
- def __init__(self, conn_drv, virtual_machine_interface_name=None, parent_fixt=None, auto_prop_val=False, qos_forwarding_class_refs = None, security_group_refs = None, virtual_machine_interface_refs = None, virtual_machine_refs = None, virtual_network_refs = None, routing_instance_ref_infos = None, interface_route_table_refs = None, virtual_machine_interface_mac_addresses=None, virtual_machine_interface_dhcp_option_list=None, virtual_machine_interface_host_routes=None, virtual_machine_interface_allowed_address_pairs=None, vrf_assign_table=None, virtual_machine_interface_device_owner=None, virtual_machine_interface_properties=None, id_perms=None, display_name=None):
- '''
- Create VirtualMachineInterfaceTestFixtureGen object
-
- constructor
-
- Args:
- conn_drv (:class:`ConnectionDriver`): connection driver (eg. :class:`vnc_api.vnc_api.VncApi`, :class:`novaclient.client.Client`, etc)
-
- Kwargs:
- virtual_machine_interface_name (str): Name of virtual_machine_interface
- parent_fixt (:class:`.VirtualMachineTestFixtureGen`): Parent fixture
- qos_forwarding_class (list): list of :class:`QosForwardingClass` type
- security_group (list): list of :class:`SecurityGroup` type
- virtual_machine_interface (list): list of :class:`VirtualMachineInterface` type
- virtual_machine (list): list of :class:`VirtualMachine` type
- virtual_network (list): list of :class:`VirtualNetwork` type
- routing_instance (list): list of tuple (:class:`RoutingInstance`, :class: `PolicyBasedForwardingRuleType`) type
- interface_route_table (list): list of :class:`InterfaceRouteTable` type
- virtual_machine_interface_mac_addresses (instance): instance of :class:`MacAddressesType`
- virtual_machine_interface_dhcp_option_list (instance): instance of :class:`DhcpOptionsListType`
- virtual_machine_interface_host_routes (instance): instance of :class:`RouteTableType`
- virtual_machine_interface_allowed_address_pairs (instance): instance of :class:`AllowedAddressPairs`
- vrf_assign_table (instance): instance of :class:`VrfAssignTableType`
- virtual_machine_interface_device_owner (instance): instance of :class:`xsd:string`
- virtual_machine_interface_properties (instance): instance of :class:`VirtualMachineInterfacePropertiesType`
- id_perms (instance): instance of :class:`IdPermsType`
- display_name (instance): instance of :class:`xsd:string`
-
- '''
- super(VirtualMachineInterfaceTestFixtureGen, self).__init__()
- self._conn_drv = conn_drv
- if not virtual_machine_interface_name:
- self._name = 'default-virtual-machine-interface'
- else:
- self._name = virtual_machine_interface_name
- self._obj = None
- self._parent_fixt = parent_fixt
- self._auto_prop_val = auto_prop_val
- if qos_forwarding_class_refs:
- for ln in qos_forwarding_class_refs:
- self.add_qos_forwarding_class (ln)
- if security_group_refs:
- for ln in security_group_refs:
- self.add_security_group (ln)
- if virtual_machine_interface_refs:
- for ln in virtual_machine_interface_refs:
- self.add_virtual_machine_interface (ln)
- if virtual_machine_refs:
- for ln in virtual_machine_refs:
- self.add_virtual_machine (ln)
- if virtual_network_refs:
- for ln in virtual_network_refs:
- self.add_virtual_network (ln)
- if routing_instance_ref_infos:
- for ln, ref in routing_instance_ref_infos:
- self.add_routing_instance (ln, ref)
- if interface_route_table_refs:
- for ln in interface_route_table_refs:
- self.add_interface_route_table (ln)
- self.virtual_machine_interface_mac_addresses = virtual_machine_interface_mac_addresses
- self.virtual_machine_interface_dhcp_option_list = virtual_machine_interface_dhcp_option_list
- self.virtual_machine_interface_host_routes = virtual_machine_interface_host_routes
- self.virtual_machine_interface_allowed_address_pairs = virtual_machine_interface_allowed_address_pairs
- self.vrf_assign_table = vrf_assign_table
- self.virtual_machine_interface_device_owner = virtual_machine_interface_device_owner
- self.virtual_machine_interface_properties = virtual_machine_interface_properties
- self.id_perms = id_perms
- self.display_name = display_name
- #end __init__
-
- def _update_links (self, update_server):
- for ln in self.get_qos_forwarding_classs ():
- self.add_qos_forwarding_class (ln.fixture (), update_server = update_server, add_link = False)
- for ln in self.get_security_groups ():
- self.add_security_group (ln.fixture (), update_server = update_server, add_link = False)
- for ln in self.get_virtual_machine_interfaces ():
- self.add_virtual_machine_interface (ln.fixture (), update_server = update_server, add_link = False)
- for ln in self.get_virtual_machines ():
- self.add_virtual_machine (ln.fixture (), update_server = update_server, add_link = False)
- for ln in self.get_virtual_networks ():
- self.add_virtual_network (ln.fixture (), update_server = update_server, add_link = False)
- for ln in self.get_routing_instances ():
- self.add_routing_instance (*ln.fixture (), update_server = update_server, add_link = False)
- for ln in self.get_interface_route_tables ():
- self.add_interface_route_table (ln.fixture (), update_server = update_server, add_link = False)
- return None
- #end _update_links
-
- def add_qos_forwarding_class (self, lo, update_server = True, add_link = True):
- '''
- add :class:`QosForwardingClass` link to :class:`VirtualMachineInterface`
- Args:
- lo (:class:`QosForwardingClass`): obj to link
- '''
- if self._obj:
- self._obj.add_qos_forwarding_class (lo)
- if update_server:
- self._conn_drv.virtual_machine_interface_update (self._obj)
-
- if add_link:
- self.add_link('qos_forwarding_class', cfixture.ConrtailLink('qos_forwarding_class', 'virtual_machine_interface', 'qos_forwarding_class', ['ref'], lo))
- #end add_qos_forwarding_class_link
-
- def get_qos_forwarding_classs (self):
- return self.get_links ('qos_forwarding_class')
- #end get_qos_forwarding_classs
- def add_security_group (self, lo, update_server = True, add_link = True):
- '''
- add :class:`SecurityGroup` link to :class:`VirtualMachineInterface`
- Args:
- lo (:class:`SecurityGroup`): obj to link
- '''
- if self._obj:
- self._obj.add_security_group (lo)
- if update_server:
- self._conn_drv.virtual_machine_interface_update (self._obj)
-
- if add_link:
- self.add_link('security_group', cfixture.ConrtailLink('security_group', 'virtual_machine_interface', 'security_group', ['ref'], lo))
- #end add_security_group_link
-
- def get_security_groups (self):
- return self.get_links ('security_group')
- #end get_security_groups
- def add_virtual_machine_interface (self, lo, update_server = True, add_link = True):
- '''
- add :class:`VirtualMachineInterface` link to :class:`VirtualMachineInterface`
- Args:
- lo (:class:`VirtualMachineInterface`): obj to link
- '''
- if self._obj:
- self._obj.add_virtual_machine_interface (lo)
- if update_server:
- self._conn_drv.virtual_machine_interface_update (self._obj)
-
- if add_link:
- self.add_link('virtual_machine_interface', cfixture.ConrtailLink('virtual_machine_interface', 'virtual_machine_interface', 'virtual_machine_interface', ['ref'], lo))
- #end add_virtual_machine_interface_link
-
- def get_virtual_machine_interfaces (self):
- return self.get_links ('virtual_machine_interface')
- #end get_virtual_machine_interfaces
- def add_virtual_machine (self, lo, update_server = True, add_link = True):
- '''
- add :class:`VirtualMachine` link to :class:`VirtualMachineInterface`
- Args:
- lo (:class:`VirtualMachine`): obj to link
- '''
- if self._obj:
- self._obj.add_virtual_machine (lo)
- if update_server:
- self._conn_drv.virtual_machine_interface_update (self._obj)
-
- if add_link:
- self.add_link('virtual_machine', cfixture.ConrtailLink('virtual_machine', 'virtual_machine_interface', 'virtual_machine', ['ref'], lo))
- #end add_virtual_machine_link
-
- def get_virtual_machines (self):
- return self.get_links ('virtual_machine')
- #end get_virtual_machines
- def add_virtual_network (self, lo, update_server = True, add_link = True):
- '''
- add :class:`VirtualNetwork` link to :class:`VirtualMachineInterface`
- Args:
- lo (:class:`VirtualNetwork`): obj to link
- '''
- if self._obj:
- self._obj.add_virtual_network (lo)
- if update_server:
- self._conn_drv.virtual_machine_interface_update (self._obj)
-
- if add_link:
- self.add_link('virtual_network', cfixture.ConrtailLink('virtual_network', 'virtual_machine_interface', 'virtual_network', ['ref'], lo))
- #end add_virtual_network_link
-
- def get_virtual_networks (self):
- return self.get_links ('virtual_network')
- #end get_virtual_networks
- def add_routing_instance (self, lo, ref, update_server = True, add_link = True):
- '''
- add :class:`RoutingInstance` link to :class:`VirtualMachineInterface`
- Args:
- lo (:class:`RoutingInstance`): obj to link
- ref (:class:`PolicyBasedForwardingRuleType`): property of the link object
- '''
- if self._obj:
- self._obj.add_routing_instance (lo, ref)
- if update_server:
- self._conn_drv.virtual_machine_interface_update (self._obj)
-
- if add_link:
- self.add_link('routing_instance', cfixture.ConrtailLink('routing_instance', 'virtual_machine_interface', 'routing_instance', ['ref'], (lo, ref)))
- #end add_routing_instance_link
-
- def get_routing_instances (self):
- return self.get_links ('routing_instance')
- #end get_routing_instances
- def add_interface_route_table (self, lo, update_server = True, add_link = True):
- '''
- add :class:`InterfaceRouteTable` link to :class:`VirtualMachineInterface`
- Args:
- lo (:class:`InterfaceRouteTable`): obj to link
- '''
- if self._obj:
- self._obj.add_interface_route_table (lo)
- if update_server:
- self._conn_drv.virtual_machine_interface_update (self._obj)
-
- if add_link:
- self.add_link('interface_route_table', cfixture.ConrtailLink('interface_route_table', 'virtual_machine_interface', 'interface_route_table', ['ref'], lo))
- #end add_interface_route_table_link
-
- def get_interface_route_tables (self):
- return self.get_links ('interface_route_table')
- #end get_interface_route_tables
-
- def populate (self):
- self._obj.set_virtual_machine_interface_mac_addresses(self.virtual_machine_interface_mac_addresses or vnc_api.gen.resource_xsd.MacAddressesType.populate())
- self._obj.set_virtual_machine_interface_dhcp_option_list(self.virtual_machine_interface_dhcp_option_list or vnc_api.gen.resource_xsd.DhcpOptionsListType.populate())
- self._obj.set_virtual_machine_interface_host_routes(self.virtual_machine_interface_host_routes or vnc_api.gen.resource_xsd.RouteTableType.populate())
- self._obj.set_virtual_machine_interface_allowed_address_pairs(self.virtual_machine_interface_allowed_address_pairs or vnc_api.gen.resource_xsd.AllowedAddressPairs.populate())
- self._obj.set_vrf_assign_table(self.vrf_assign_table or vnc_api.gen.resource_xsd.VrfAssignTableType.populate())
- self._obj.set_virtual_machine_interface_device_owner(self.virtual_machine_interface_device_owner or GeneratedsSuper.populate_string("virtual_machine_interface_device_owner"))
- self._obj.set_virtual_machine_interface_properties(self.virtual_machine_interface_properties or vnc_api.gen.resource_xsd.VirtualMachineInterfacePropertiesType.populate())
- self._obj.set_id_perms(self.id_perms or vnc_api.gen.resource_xsd.IdPermsType.populate())
- self._obj.set_display_name(self.display_name or GeneratedsSuper.populate_string("display_name"))
- #end populate
-
- def setUp(self):
- super(VirtualMachineInterfaceTestFixtureGen, self).setUp()
- if not self._parent_fixt:
- raise AmbiguousParentError("[[u'default-virtual-machine'], [u'default-domain', u'default-project']]")
-
- self._obj = vnc_api.VirtualMachineInterface(self._name, self._parent_fixt.getObj ())
- try:
- self._obj = self._conn_drv.virtual_machine_interface_read (fq_name=self._obj.get_fq_name())
- self._update_links (update_server=True)
- except NoIdError:
- self._update_links (update_server=False)
- if self._auto_prop_val:
- self.populate ()
- else:
- self._obj.virtual_machine_interface_mac_addresses = self.virtual_machine_interface_mac_addresses
- self._obj.virtual_machine_interface_dhcp_option_list = self.virtual_machine_interface_dhcp_option_list
- self._obj.virtual_machine_interface_host_routes = self.virtual_machine_interface_host_routes
- self._obj.virtual_machine_interface_allowed_address_pairs = self.virtual_machine_interface_allowed_address_pairs
- self._obj.vrf_assign_table = self.vrf_assign_table
- self._obj.virtual_machine_interface_device_owner = self.virtual_machine_interface_device_owner
- self._obj.virtual_machine_interface_properties = self.virtual_machine_interface_properties
- self._obj.id_perms = self.id_perms
- self._obj.display_name = self.display_name
- self._conn_drv.virtual_machine_interface_create(self._obj)
- # read back for server allocated values
- self._obj = self._conn_drv.virtual_machine_interface_read(id = self._obj.uuid)
- #end setUp
-
- def cleanUp(self):
- if self._obj.get_virtual_machine_interface_back_refs() or self._obj.get_instance_ip_back_refs() or self._obj.get_subnet_back_refs() or self._obj.get_floating_ip_back_refs() or self._obj.get_logical_interface_back_refs() or self._obj.get_customer_attachment_back_refs() or self._obj.get_logical_router_back_refs() or self._obj.get_loadbalancer_pool_back_refs() or self._obj.get_virtual_ip_back_refs():
- return
- parent_fixt = getattr(self, '_parent_fixt', None)
- if parent_fixt:
- # non config-root child
- parent_obj = self._parent_fixt.getObj()
- # remove child from parent obj
- for child_obj in parent_obj.get_virtual_machine_interfaces():
- if type(child_obj) == dict:
- child_uuid = child_obj['uuid']
- else:
- child_uuid = child_obj.uuid
- if child_uuid == self._obj.uuid:
- parent_obj.virtual_machine_interfaces.remove(child_obj)
- break
-
- self._conn_drv.virtual_machine_interface_delete(id = self._obj.uuid)
- #end cleanUp
-
- def getObj(self):
- return self._obj
- #end getObj
-
-#end class VirtualMachineInterfaceTestFixtureGen
-
-class LoadbalancerHealthmonitorTestFixtureGen(cfixture.ContrailFixture):
- """
- Fixture for :class:`.LoadbalancerHealthmonitor`
- """
- def __init__(self, conn_drv, loadbalancer_healthmonitor_name=None, parent_fixt=None, auto_prop_val=False, loadbalancer_healthmonitor_properties=None, id_perms=None, display_name=None):
- '''
- Create LoadbalancerHealthmonitorTestFixtureGen object
-
- constructor
-
- Args:
- conn_drv (:class:`ConnectionDriver`): connection driver (eg. :class:`vnc_api.vnc_api.VncApi`, :class:`novaclient.client.Client`, etc)
-
- Kwargs:
- loadbalancer_healthmonitor_name (str): Name of loadbalancer_healthmonitor
- parent_fixt (:class:`.ProjectTestFixtureGen`): Parent fixture
- loadbalancer_healthmonitor_properties (instance): instance of :class:`LoadbalancerHealthmonitorType`
- id_perms (instance): instance of :class:`IdPermsType`
- display_name (instance): instance of :class:`xsd:string`
-
- '''
- super(LoadbalancerHealthmonitorTestFixtureGen, self).__init__()
- self._conn_drv = conn_drv
- if not loadbalancer_healthmonitor_name:
- self._name = 'default-loadbalancer-healthmonitor'
- else:
- self._name = loadbalancer_healthmonitor_name
- self._obj = None
- self._parent_fixt = parent_fixt
- self._auto_prop_val = auto_prop_val
- self.loadbalancer_healthmonitor_properties = loadbalancer_healthmonitor_properties
- self.id_perms = id_perms
- self.display_name = display_name
- #end __init__
-
- def _update_links (self, update_server):
- return None
- #end _update_links
-
-
- def populate (self):
- self._obj.set_loadbalancer_healthmonitor_properties(self.loadbalancer_healthmonitor_properties or vnc_api.gen.resource_xsd.LoadbalancerHealthmonitorType.populate())
- self._obj.set_id_perms(self.id_perms or vnc_api.gen.resource_xsd.IdPermsType.populate())
- self._obj.set_display_name(self.display_name or GeneratedsSuper.populate_string("display_name"))
- #end populate
-
- def setUp(self):
- super(LoadbalancerHealthmonitorTestFixtureGen, self).setUp()
- if not self._parent_fixt:
- self._parent_fixt = self.useFixture(ProjectTestFixtureGen(self._conn_drv, 'default-project'))
-
- self._obj = vnc_api.LoadbalancerHealthmonitor(self._name, self._parent_fixt.getObj ())
- try:
- self._obj = self._conn_drv.loadbalancer_healthmonitor_read (fq_name=self._obj.get_fq_name())
- self._update_links (update_server=True)
- except NoIdError:
- self._update_links (update_server=False)
- if self._auto_prop_val:
- self.populate ()
- else:
- self._obj.loadbalancer_healthmonitor_properties = self.loadbalancer_healthmonitor_properties
- self._obj.id_perms = self.id_perms
- self._obj.display_name = self.display_name
- self._conn_drv.loadbalancer_healthmonitor_create(self._obj)
- # read back for server allocated values
- self._obj = self._conn_drv.loadbalancer_healthmonitor_read(id = self._obj.uuid)
- #end setUp
-
- def cleanUp(self):
- if self._obj.get_loadbalancer_pool_back_refs():
- return
- parent_fixt = getattr(self, '_parent_fixt', None)
- if parent_fixt:
- # non config-root child
- parent_obj = self._parent_fixt.getObj()
- # remove child from parent obj
- for child_obj in parent_obj.get_loadbalancer_healthmonitors():
- if type(child_obj) == dict:
- child_uuid = child_obj['uuid']
- else:
- child_uuid = child_obj.uuid
- if child_uuid == self._obj.uuid:
- parent_obj.loadbalancer_healthmonitors.remove(child_obj)
- break
-
- self._conn_drv.loadbalancer_healthmonitor_delete(id = self._obj.uuid)
- #end cleanUp
-
- def getObj(self):
- return self._obj
- #end getObj
-
-#end class LoadbalancerHealthmonitorTestFixtureGen
-
-class VirtualNetworkTestFixtureGen(cfixture.ContrailFixture):
- """
- Fixture for :class:`.VirtualNetwork`
- """
- def __init__(self, conn_drv, virtual_network_name=None, parent_fixt=None, auto_prop_val=False, qos_forwarding_class_refs = None, network_ipam_ref_infos = None, network_policy_ref_infos = None, route_table_refs = None, virtual_network_properties=None, virtual_network_network_id=None, route_target_list=None, router_external=None, is_shared=None, external_ipam=None, flood_unknown_unicast=None, id_perms=None, display_name=None):
- '''
- Create VirtualNetworkTestFixtureGen object
-
- constructor
-
- Args:
- conn_drv (:class:`ConnectionDriver`): connection driver (eg. :class:`vnc_api.vnc_api.VncApi`, :class:`novaclient.client.Client`, etc)
-
- Kwargs:
- virtual_network_name (str): Name of virtual_network
- parent_fixt (:class:`.ProjectTestFixtureGen`): Parent fixture
- qos_forwarding_class (list): list of :class:`QosForwardingClass` type
- network_ipam (list): list of tuple (:class:`NetworkIpam`, :class: `VnSubnetsType`) type
- network_policy (list): list of tuple (:class:`NetworkPolicy`, :class: `VirtualNetworkPolicyType`) type
- route_table (list): list of :class:`RouteTable` type
- virtual_network_properties (instance): instance of :class:`VirtualNetworkType`
- virtual_network_network_id (instance): instance of :class:`xsd:integer`
- route_target_list (instance): instance of :class:`RouteTargetList`
- router_external (instance): instance of :class:`xsd:boolean`
- is_shared (instance): instance of :class:`xsd:boolean`
- external_ipam (instance): instance of :class:`xsd:boolean`
- flood_unknown_unicast (instance): instance of :class:`xsd:boolean`
- id_perms (instance): instance of :class:`IdPermsType`
- display_name (instance): instance of :class:`xsd:string`
-
- '''
- super(VirtualNetworkTestFixtureGen, self).__init__()
- self._conn_drv = conn_drv
- if not virtual_network_name:
- self._name = 'default-virtual-network'
- else:
- self._name = virtual_network_name
- self._obj = None
- self._parent_fixt = parent_fixt
- self._auto_prop_val = auto_prop_val
- if qos_forwarding_class_refs:
- for ln in qos_forwarding_class_refs:
- self.add_qos_forwarding_class (ln)
- if network_ipam_ref_infos:
- for ln, ref in network_ipam_ref_infos:
- self.add_network_ipam (ln, ref)
- if network_policy_ref_infos:
- for ln, ref in network_policy_ref_infos:
- self.add_network_policy (ln, ref)
- if route_table_refs:
- for ln in route_table_refs:
- self.add_route_table (ln)
- self.virtual_network_properties = virtual_network_properties
- self.virtual_network_network_id = virtual_network_network_id
- self.route_target_list = route_target_list
- self.router_external = router_external
- self.is_shared = is_shared
- self.external_ipam = external_ipam
- self.flood_unknown_unicast = flood_unknown_unicast
- self.id_perms = id_perms
- self.display_name = display_name
- #end __init__
-
- def _update_links (self, update_server):
- for ln in self.get_qos_forwarding_classs ():
- self.add_qos_forwarding_class (ln.fixture (), update_server = update_server, add_link = False)
- for ln in self.get_network_ipams ():
- self.add_network_ipam (*ln.fixture (), update_server = update_server, add_link = False)
- for ln in self.get_network_policys ():
- self.add_network_policy (*ln.fixture (), update_server = update_server, add_link = False)
- for ln in self.get_route_tables ():
- self.add_route_table (ln.fixture (), update_server = update_server, add_link = False)
- return None
- #end _update_links
-
- def add_qos_forwarding_class (self, lo, update_server = True, add_link = True):
- '''
- add :class:`QosForwardingClass` link to :class:`VirtualNetwork`
- Args:
- lo (:class:`QosForwardingClass`): obj to link
- '''
- if self._obj:
- self._obj.add_qos_forwarding_class (lo)
- if update_server:
- self._conn_drv.virtual_network_update (self._obj)
-
- if add_link:
- self.add_link('qos_forwarding_class', cfixture.ConrtailLink('qos_forwarding_class', 'virtual_network', 'qos_forwarding_class', ['ref'], lo))
- #end add_qos_forwarding_class_link
-
- def get_qos_forwarding_classs (self):
- return self.get_links ('qos_forwarding_class')
- #end get_qos_forwarding_classs
- def add_network_ipam (self, lo, ref, update_server = True, add_link = True):
- '''
- add :class:`NetworkIpam` link to :class:`VirtualNetwork`
- Args:
- lo (:class:`NetworkIpam`): obj to link
- ref (:class:`VnSubnetsType`): property of the link object
- '''
- if self._obj:
- self._obj.add_network_ipam (lo, ref)
- if update_server:
- self._conn_drv.virtual_network_update (self._obj)
-
- if add_link:
- self.add_link('network_ipam', cfixture.ConrtailLink('network_ipam', 'virtual_network', 'network_ipam', ['ref'], (lo, ref)))
- #end add_network_ipam_link
-
- def get_network_ipams (self):
- return self.get_links ('network_ipam')
- #end get_network_ipams
- def add_network_policy (self, lo, ref, update_server = True, add_link = True):
- '''
- add :class:`NetworkPolicy` link to :class:`VirtualNetwork`
- Args:
- lo (:class:`NetworkPolicy`): obj to link
- ref (:class:`VirtualNetworkPolicyType`): property of the link object
- '''
- if self._obj:
- self._obj.add_network_policy (lo, ref)
- if update_server:
- self._conn_drv.virtual_network_update (self._obj)
-
- if add_link:
- self.add_link('network_policy', cfixture.ConrtailLink('network_policy', 'virtual_network', 'network_policy', ['ref'], (lo, ref)))
- #end add_network_policy_link
-
- def get_network_policys (self):
- return self.get_links ('network_policy')
- #end get_network_policys
- def add_route_table (self, lo, update_server = True, add_link = True):
- '''
- add :class:`RouteTable` link to :class:`VirtualNetwork`
- Args:
- lo (:class:`RouteTable`): obj to link
- '''
- if self._obj:
- self._obj.add_route_table (lo)
- if update_server:
- self._conn_drv.virtual_network_update (self._obj)
-
- if add_link:
- self.add_link('route_table', cfixture.ConrtailLink('route_table', 'virtual_network', 'route_table', ['ref'], lo))
- #end add_route_table_link
-
- def get_route_tables (self):
- return self.get_links ('route_table')
- #end get_route_tables
-
- def populate (self):
- self._obj.set_virtual_network_properties(self.virtual_network_properties or vnc_api.gen.resource_xsd.VirtualNetworkType.populate())
- self._obj.set_virtual_network_network_id(self.virtual_network_network_id or GeneratedsSuper.populate_integer("virtual_network_network_id"))
- self._obj.set_route_target_list(self.route_target_list or vnc_api.gen.resource_xsd.RouteTargetList.populate())
- self._obj.set_router_external(self.router_external or GeneratedsSuper.populate_boolean("router_external"))
- self._obj.set_is_shared(self.is_shared or GeneratedsSuper.populate_boolean("is_shared"))
- self._obj.set_external_ipam(self.external_ipam or GeneratedsSuper.populate_boolean("external_ipam"))
- self._obj.set_flood_unknown_unicast(self.flood_unknown_unicast or GeneratedsSuper.populate_boolean("flood_unknown_unicast"))
- self._obj.set_id_perms(self.id_perms or vnc_api.gen.resource_xsd.IdPermsType.populate())
- self._obj.set_display_name(self.display_name or GeneratedsSuper.populate_string("display_name"))
- #end populate
-
- def setUp(self):
- super(VirtualNetworkTestFixtureGen, self).setUp()
- if not self._parent_fixt:
- self._parent_fixt = self.useFixture(ProjectTestFixtureGen(self._conn_drv, 'default-project'))
-
- self._obj = vnc_api.VirtualNetwork(self._name, self._parent_fixt.getObj ())
- try:
- self._obj = self._conn_drv.virtual_network_read (fq_name=self._obj.get_fq_name())
- self._update_links (update_server=True)
- except NoIdError:
- self._update_links (update_server=False)
- if self._auto_prop_val:
- self.populate ()
- else:
- self._obj.virtual_network_properties = self.virtual_network_properties
- self._obj.virtual_network_network_id = self.virtual_network_network_id
- self._obj.route_target_list = self.route_target_list
- self._obj.router_external = self.router_external
- self._obj.is_shared = self.is_shared
- self._obj.external_ipam = self.external_ipam
- self._obj.flood_unknown_unicast = self.flood_unknown_unicast
- self._obj.id_perms = self.id_perms
- self._obj.display_name = self.display_name
- self._conn_drv.virtual_network_create(self._obj)
- # read back for server allocated values
- self._obj = self._conn_drv.virtual_network_read(id = self._obj.uuid)
- #end setUp
-
- def cleanUp(self):
- if self._obj.get_virtual_machine_interface_back_refs() or self._obj.get_instance_ip_back_refs() or self._obj.get_physical_router_back_refs() or self._obj.get_logical_router_back_refs():
- return
- if self._obj.get_floating_ip_pools():
- return
- parent_fixt = getattr(self, '_parent_fixt', None)
- if parent_fixt:
- # non config-root child
- parent_obj = self._parent_fixt.getObj()
- # remove child from parent obj
- for child_obj in parent_obj.get_virtual_networks():
- if type(child_obj) == dict:
- child_uuid = child_obj['uuid']
- else:
- child_uuid = child_obj.uuid
- if child_uuid == self._obj.uuid:
- parent_obj.virtual_networks.remove(child_obj)
- break
-
- self._conn_drv.virtual_network_delete(id = self._obj.uuid)
- #end cleanUp
-
- def getObj(self):
- return self._obj
- #end getObj
-
-#end class VirtualNetworkTestFixtureGen
-
-class ProjectTestFixtureGen(cfixture.ContrailFixture):
- """
- Fixture for :class:`.Project`
- """
- def __init__(self, conn_drv, project_name=None, parent_fixt=None, auto_prop_val=False, namespace_ref_infos = None, floating_ip_pool_refs = None, quota=None, id_perms=None, display_name=None):
- '''
- Create ProjectTestFixtureGen object
-
- constructor
-
- Args:
- conn_drv (:class:`ConnectionDriver`): connection driver (eg. :class:`vnc_api.vnc_api.VncApi`, :class:`novaclient.client.Client`, etc)
-
- Kwargs:
- project_name (str): Name of project
- parent_fixt (:class:`.DomainTestFixtureGen`): Parent fixture
- namespace (list): list of tuple (:class:`Namespace`, :class: `SubnetType`) type
- floating_ip_pool (list): list of :class:`FloatingIpPool` type
- quota (instance): instance of :class:`QuotaType`
- id_perms (instance): instance of :class:`IdPermsType`
- display_name (instance): instance of :class:`xsd:string`
-
- '''
- super(ProjectTestFixtureGen, self).__init__()
- self._conn_drv = conn_drv
- if not project_name:
- self._name = 'default-project'
- else:
- self._name = project_name
- self._obj = None
- self._parent_fixt = parent_fixt
- self._auto_prop_val = auto_prop_val
- if namespace_ref_infos:
- for ln, ref in namespace_ref_infos:
- self.add_namespace (ln, ref)
- if floating_ip_pool_refs:
- for ln in floating_ip_pool_refs:
- self.add_floating_ip_pool (ln)
- self.quota = quota
- self.id_perms = id_perms
- self.display_name = display_name
- #end __init__
-
- def _update_links (self, update_server):
- for ln in self.get_namespaces ():
- self.add_namespace (*ln.fixture (), update_server = update_server, add_link = False)
- for ln in self.get_floating_ip_pools ():
- self.add_floating_ip_pool (ln.fixture (), update_server = update_server, add_link = False)
- return None
- #end _update_links
-
- def add_namespace (self, lo, ref, update_server = True, add_link = True):
- '''
- add :class:`Namespace` link to :class:`Project`
- Args:
- lo (:class:`Namespace`): obj to link
- ref (:class:`SubnetType`): property of the link object
- '''
- if self._obj:
- self._obj.add_namespace (lo, ref)
- if update_server:
- self._conn_drv.project_update (self._obj)
-
- if add_link:
- self.add_link('namespace', cfixture.ConrtailLink('namespace', 'project', 'namespace', ['ref'], (lo, ref)))
- #end add_namespace_link
-
- def get_namespaces (self):
- return self.get_links ('namespace')
- #end get_namespaces
- def add_floating_ip_pool (self, lo, update_server = True, add_link = True):
- '''
- add :class:`FloatingIpPool` link to :class:`Project`
- Args:
- lo (:class:`FloatingIpPool`): obj to link
- '''
- if self._obj:
- self._obj.add_floating_ip_pool (lo)
- if update_server:
- self._conn_drv.project_update (self._obj)
-
- if add_link:
- self.add_link('floating_ip_pool', cfixture.ConrtailLink('floating_ip_pool', 'project', 'floating_ip_pool', ['ref'], lo))
- #end add_floating_ip_pool_link
-
- def get_floating_ip_pools (self):
- return self.get_links ('floating_ip_pool')
- #end get_floating_ip_pools
-
- def populate (self):
- self._obj.set_quota(self.quota or vnc_api.gen.resource_xsd.QuotaType.populate())
- self._obj.set_id_perms(self.id_perms or vnc_api.gen.resource_xsd.IdPermsType.populate())
- self._obj.set_display_name(self.display_name or GeneratedsSuper.populate_string("display_name"))
- #end populate
-
- def setUp(self):
- super(ProjectTestFixtureGen, self).setUp()
- if not self._parent_fixt:
- self._parent_fixt = self.useFixture(DomainTestFixtureGen(self._conn_drv, 'default-domain'))
-
- self._obj = vnc_api.Project(self._name, self._parent_fixt.getObj ())
- try:
- self._obj = self._conn_drv.project_read (fq_name=self._obj.get_fq_name())
- self._update_links (update_server=True)
- except NoIdError:
- self._update_links (update_server=False)
- if self._auto_prop_val:
- self.populate ()
- else:
- self._obj.quota = self.quota
- self._obj.id_perms = self.id_perms
- self._obj.display_name = self.display_name
- self._conn_drv.project_create(self._obj)
- # read back for server allocated values
- self._obj = self._conn_drv.project_read(id = self._obj.uuid)
- #end setUp
-
- def cleanUp(self):
- if self._obj.get_floating_ip_back_refs():
- return
- if self._obj.get_security_groups() or self._obj.get_virtual_networks() or self._obj.get_qos_queues() or self._obj.get_qos_forwarding_classs() or self._obj.get_network_ipams() or self._obj.get_network_policys() or self._obj.get_virtual_machine_interfaces() or self._obj.get_service_instances() or self._obj.get_route_tables() or self._obj.get_interface_route_tables() or self._obj.get_logical_routers() or self._obj.get_loadbalancer_pools() or self._obj.get_loadbalancer_healthmonitors() or self._obj.get_virtual_ips():
- return
- parent_fixt = getattr(self, '_parent_fixt', None)
- if parent_fixt:
- # non config-root child
- parent_obj = self._parent_fixt.getObj()
- # remove child from parent obj
- for child_obj in parent_obj.get_projects():
- if type(child_obj) == dict:
- child_uuid = child_obj['uuid']
- else:
- child_uuid = child_obj.uuid
- if child_uuid == self._obj.uuid:
- parent_obj.projects.remove(child_obj)
- break
-
- self._conn_drv.project_delete(id = self._obj.uuid)
- #end cleanUp
-
- def getObj(self):
- return self._obj
- #end getObj
-
-#end class ProjectTestFixtureGen
-
-class QosForwardingClassTestFixtureGen(cfixture.ContrailFixture):
- """
- Fixture for :class:`.QosForwardingClass`
- """
- def __init__(self, conn_drv, qos_forwarding_class_name=None, parent_fixt=None, auto_prop_val=False, qos_queue_refs = None, dscp=None, trusted=None, id_perms=None, display_name=None):
- '''
- Create QosForwardingClassTestFixtureGen object
-
- constructor
-
- Args:
- conn_drv (:class:`ConnectionDriver`): connection driver (eg. :class:`vnc_api.vnc_api.VncApi`, :class:`novaclient.client.Client`, etc)
-
- Kwargs:
- qos_forwarding_class_name (str): Name of qos_forwarding_class
- parent_fixt (:class:`.ProjectTestFixtureGen`): Parent fixture
- qos_queue (list): list of :class:`QosQueue` type
- dscp (instance): instance of :class:`xsd:integer`
- trusted (instance): instance of :class:`xsd:boolean`
- id_perms (instance): instance of :class:`IdPermsType`
- display_name (instance): instance of :class:`xsd:string`
-
- '''
- super(QosForwardingClassTestFixtureGen, self).__init__()
- self._conn_drv = conn_drv
- if not qos_forwarding_class_name:
- self._name = 'default-qos-forwarding-class'
- else:
- self._name = qos_forwarding_class_name
- self._obj = None
- self._parent_fixt = parent_fixt
- self._auto_prop_val = auto_prop_val
- if qos_queue_refs:
- for ln in qos_queue_refs:
- self.add_qos_queue (ln)
- self.dscp = dscp
- self.trusted = trusted
- self.id_perms = id_perms
- self.display_name = display_name
- #end __init__
-
- def _update_links (self, update_server):
- for ln in self.get_qos_queues ():
- self.add_qos_queue (ln.fixture (), update_server = update_server, add_link = False)
- return None
- #end _update_links
-
- def add_qos_queue (self, lo, update_server = True, add_link = True):
- '''
- add :class:`QosQueue` link to :class:`QosForwardingClass`
- Args:
- lo (:class:`QosQueue`): obj to link
- '''
- if self._obj:
- self._obj.add_qos_queue (lo)
- if update_server:
- self._conn_drv.qos_forwarding_class_update (self._obj)
-
- if add_link:
- self.add_link('qos_queue', cfixture.ConrtailLink('qos_queue', 'qos_forwarding_class', 'qos_queue', ['ref'], lo))
- #end add_qos_queue_link
-
- def get_qos_queues (self):
- return self.get_links ('qos_queue')
- #end get_qos_queues
-
- def populate (self):
- self._obj.set_dscp(self.dscp or GeneratedsSuper.populate_integer("dscp"))
- self._obj.set_trusted(self.trusted or GeneratedsSuper.populate_boolean("trusted"))
- self._obj.set_id_perms(self.id_perms or vnc_api.gen.resource_xsd.IdPermsType.populate())
- self._obj.set_display_name(self.display_name or GeneratedsSuper.populate_string("display_name"))
- #end populate
-
- def setUp(self):
- super(QosForwardingClassTestFixtureGen, self).setUp()
- if not self._parent_fixt:
- self._parent_fixt = self.useFixture(ProjectTestFixtureGen(self._conn_drv, 'default-project'))
-
- self._obj = vnc_api.QosForwardingClass(self._name, self._parent_fixt.getObj ())
- try:
- self._obj = self._conn_drv.qos_forwarding_class_read (fq_name=self._obj.get_fq_name())
- self._update_links (update_server=True)
- except NoIdError:
- self._update_links (update_server=False)
- if self._auto_prop_val:
- self.populate ()
- else:
- self._obj.dscp = self.dscp
- self._obj.trusted = self.trusted
- self._obj.id_perms = self.id_perms
- self._obj.display_name = self.display_name
- self._conn_drv.qos_forwarding_class_create(self._obj)
- # read back for server allocated values
- self._obj = self._conn_drv.qos_forwarding_class_read(id = self._obj.uuid)
- #end setUp
-
- def cleanUp(self):
- if self._obj.get_virtual_network_back_refs() or self._obj.get_virtual_machine_interface_back_refs():
- return
- parent_fixt = getattr(self, '_parent_fixt', None)
- if parent_fixt:
- # non config-root child
- parent_obj = self._parent_fixt.getObj()
- # remove child from parent obj
- for child_obj in parent_obj.get_qos_forwarding_classs():
- if type(child_obj) == dict:
- child_uuid = child_obj['uuid']
- else:
- child_uuid = child_obj.uuid
- if child_uuid == self._obj.uuid:
- parent_obj.qos_forwarding_classs.remove(child_obj)
- break
-
- self._conn_drv.qos_forwarding_class_delete(id = self._obj.uuid)
- #end cleanUp
-
- def getObj(self):
- return self._obj
- #end getObj
-
-#end class QosForwardingClassTestFixtureGen
-
-class DatabaseNodeTestFixtureGen(cfixture.ContrailFixture):
- """
- Fixture for :class:`.DatabaseNode`
- """
- def __init__(self, conn_drv, database_node_name=None, parent_fixt=None, auto_prop_val=False, database_node_ip_address=None, id_perms=None, display_name=None):
- '''
- Create DatabaseNodeTestFixtureGen object
-
- constructor
-
- Args:
- conn_drv (:class:`ConnectionDriver`): connection driver (eg. :class:`vnc_api.vnc_api.VncApi`, :class:`novaclient.client.Client`, etc)
-
- Kwargs:
- database_node_name (str): Name of database_node
- parent_fixt (:class:`.GlobalSystemConfigTestFixtureGen`): Parent fixture
- database_node_ip_address (instance): instance of :class:`xsd:string`
- id_perms (instance): instance of :class:`IdPermsType`
- display_name (instance): instance of :class:`xsd:string`
-
- '''
- super(DatabaseNodeTestFixtureGen, self).__init__()
- self._conn_drv = conn_drv
- if not database_node_name:
- self._name = 'default-database-node'
- else:
- self._name = database_node_name
- self._obj = None
- self._parent_fixt = parent_fixt
- self._auto_prop_val = auto_prop_val
- self.database_node_ip_address = database_node_ip_address
- self.id_perms = id_perms
- self.display_name = display_name
- #end __init__
-
- def _update_links (self, update_server):
- return None
- #end _update_links
-
-
- def populate (self):
- self._obj.set_database_node_ip_address(self.database_node_ip_address or GeneratedsSuper.populate_string("database_node_ip_address"))
- self._obj.set_id_perms(self.id_perms or vnc_api.gen.resource_xsd.IdPermsType.populate())
- self._obj.set_display_name(self.display_name or GeneratedsSuper.populate_string("display_name"))
- #end populate
-
- def setUp(self):
- super(DatabaseNodeTestFixtureGen, self).setUp()
- if not self._parent_fixt:
- self._parent_fixt = self.useFixture(GlobalSystemConfigTestFixtureGen(self._conn_drv, 'default-global-system-config'))
-
- self._obj = vnc_api.DatabaseNode(self._name, self._parent_fixt.getObj ())
- try:
- self._obj = self._conn_drv.database_node_read (fq_name=self._obj.get_fq_name())
- self._update_links (update_server=True)
- except NoIdError:
- self._update_links (update_server=False)
- if self._auto_prop_val:
- self.populate ()
- else:
- self._obj.database_node_ip_address = self.database_node_ip_address
- self._obj.id_perms = self.id_perms
- self._obj.display_name = self.display_name
- self._conn_drv.database_node_create(self._obj)
- # read back for server allocated values
- self._obj = self._conn_drv.database_node_read(id = self._obj.uuid)
- #end setUp
-
- def cleanUp(self):
- parent_fixt = getattr(self, '_parent_fixt', None)
- if parent_fixt:
- # non config-root child
- parent_obj = self._parent_fixt.getObj()
- # remove child from parent obj
- for child_obj in parent_obj.get_database_nodes():
- if type(child_obj) == dict:
- child_uuid = child_obj['uuid']
- else:
- child_uuid = child_obj.uuid
- if child_uuid == self._obj.uuid:
- parent_obj.database_nodes.remove(child_obj)
- break
-
- self._conn_drv.database_node_delete(id = self._obj.uuid)
- #end cleanUp
-
- def getObj(self):
- return self._obj
- #end getObj
-
-#end class DatabaseNodeTestFixtureGen
-
-class RoutingInstanceTestFixtureGen(cfixture.ContrailFixture):
- """
- Fixture for :class:`.RoutingInstance`
- """
- def __init__(self, conn_drv, routing_instance_name=None, parent_fixt=None, auto_prop_val=False, routing_instance_ref_infos = None, route_target_ref_infos = None, service_chain_information=None, routing_instance_is_default=None, static_route_entries=None, default_ce_protocol=None, id_perms=None, display_name=None):
- '''
- Create RoutingInstanceTestFixtureGen object
-
- constructor
-
- Args:
- conn_drv (:class:`ConnectionDriver`): connection driver (eg. :class:`vnc_api.vnc_api.VncApi`, :class:`novaclient.client.Client`, etc)
-
- Kwargs:
- routing_instance_name (str): Name of routing_instance
- parent_fixt (:class:`.VirtualNetworkTestFixtureGen`): Parent fixture
- routing_instance (list): list of tuple (:class:`RoutingInstance`, :class: `ConnectionType`) type
- route_target (list): list of tuple (:class:`RouteTarget`, :class: `InstanceTargetType`) type
- service_chain_information (instance): instance of :class:`ServiceChainInfo`
- routing_instance_is_default (instance): instance of :class:`xsd:boolean`
- static_route_entries (instance): instance of :class:`StaticRouteEntriesType`
- default_ce_protocol (instance): instance of :class:`DefaultProtocolType`
- id_perms (instance): instance of :class:`IdPermsType`
- display_name (instance): instance of :class:`xsd:string`
-
- '''
- super(RoutingInstanceTestFixtureGen, self).__init__()
- self._conn_drv = conn_drv
- if not routing_instance_name:
- self._name = 'default-routing-instance'
- else:
- self._name = routing_instance_name
- self._obj = None
- self._parent_fixt = parent_fixt
- self._auto_prop_val = auto_prop_val
- if routing_instance_ref_infos:
- for ln, ref in routing_instance_ref_infos:
- self.add_routing_instance (ln, ref)
- if route_target_ref_infos:
- for ln, ref in route_target_ref_infos:
- self.add_route_target (ln, ref)
- self.service_chain_information = service_chain_information
- self.routing_instance_is_default = routing_instance_is_default
- self.static_route_entries = static_route_entries
- self.default_ce_protocol = default_ce_protocol
- self.id_perms = id_perms
- self.display_name = display_name
- #end __init__
-
- def _update_links (self, update_server):
- for ln in self.get_routing_instances ():
- self.add_routing_instance (*ln.fixture (), update_server = update_server, add_link = False)
- for ln in self.get_route_targets ():
- self.add_route_target (*ln.fixture (), update_server = update_server, add_link = False)
- return None
- #end _update_links
-
- def add_routing_instance (self, lo, ref, update_server = True, add_link = True):
- '''
- add :class:`RoutingInstance` link to :class:`RoutingInstance`
- Args:
- lo (:class:`RoutingInstance`): obj to link
- ref (:class:`ConnectionType`): property of the link object
- '''
- if self._obj:
- self._obj.add_routing_instance (lo, ref)
- if update_server:
- self._conn_drv.routing_instance_update (self._obj)
-
- if add_link:
- self.add_link('routing_instance', cfixture.ConrtailLink('routing_instance', 'routing_instance', 'routing_instance', ['ref'], (lo, ref)))
- #end add_routing_instance_link
-
- def get_routing_instances (self):
- return self.get_links ('routing_instance')
- #end get_routing_instances
- def add_route_target (self, lo, ref, update_server = True, add_link = True):
- '''
- add :class:`RouteTarget` link to :class:`RoutingInstance`
- Args:
- lo (:class:`RouteTarget`): obj to link
- ref (:class:`InstanceTargetType`): property of the link object
- '''
- if self._obj:
- self._obj.add_route_target (lo, ref)
- if update_server:
- self._conn_drv.routing_instance_update (self._obj)
-
- if add_link:
- self.add_link('route_target', cfixture.ConrtailLink('route_target', 'routing_instance', 'route_target', ['ref'], (lo, ref)))
- #end add_route_target_link
-
- def get_route_targets (self):
- return self.get_links ('route_target')
- #end get_route_targets
-
- def populate (self):
- self._obj.set_service_chain_information(self.service_chain_information or vnc_api.gen.resource_xsd.ServiceChainInfo.populate())
- self._obj.set_routing_instance_is_default(self.routing_instance_is_default or GeneratedsSuper.populate_boolean("routing_instance_is_default"))
- self._obj.set_static_route_entries(self.static_route_entries or vnc_api.gen.resource_xsd.StaticRouteEntriesType.populate())
- self._obj.set_default_ce_protocol(self.default_ce_protocol or vnc_api.gen.resource_xsd.DefaultProtocolType.populate())
- self._obj.set_id_perms(self.id_perms or vnc_api.gen.resource_xsd.IdPermsType.populate())
- self._obj.set_display_name(self.display_name or GeneratedsSuper.populate_string("display_name"))
- #end populate
-
- def setUp(self):
- super(RoutingInstanceTestFixtureGen, self).setUp()
- if not self._parent_fixt:
- self._parent_fixt = self.useFixture(VirtualNetworkTestFixtureGen(self._conn_drv, 'default-virtual-network'))
-
- self._obj = vnc_api.RoutingInstance(self._name, self._parent_fixt.getObj ())
- try:
- self._obj = self._conn_drv.routing_instance_read (fq_name=self._obj.get_fq_name())
- self._update_links (update_server=True)
- except NoIdError:
- self._update_links (update_server=False)
- if self._auto_prop_val:
- self.populate ()
- else:
- self._obj.service_chain_information = self.service_chain_information
- self._obj.routing_instance_is_default = self.routing_instance_is_default
- self._obj.static_route_entries = self.static_route_entries
- self._obj.default_ce_protocol = self.default_ce_protocol
- self._obj.id_perms = self.id_perms
- self._obj.display_name = self.display_name
- self._conn_drv.routing_instance_create(self._obj)
- # read back for server allocated values
- self._obj = self._conn_drv.routing_instance_read(id = self._obj.uuid)
- #end setUp
-
- def cleanUp(self):
- if self._obj.get_virtual_machine_interface_back_refs() or self._obj.get_routing_instance_back_refs():
- return
- if self._obj.get_bgp_routers():
- return
- parent_fixt = getattr(self, '_parent_fixt', None)
- if parent_fixt:
- # non config-root child
- parent_obj = self._parent_fixt.getObj()
- # remove child from parent obj
- for child_obj in parent_obj.get_routing_instances():
- if type(child_obj) == dict:
- child_uuid = child_obj['uuid']
- else:
- child_uuid = child_obj.uuid
- if child_uuid == self._obj.uuid:
- parent_obj.routing_instances.remove(child_obj)
- break
-
- self._conn_drv.routing_instance_delete(id = self._obj.uuid)
- #end cleanUp
-
- def getObj(self):
- return self._obj
- #end getObj
-
-#end class RoutingInstanceTestFixtureGen
-
-class NetworkIpamTestFixtureGen(cfixture.ContrailFixture):
- """
- Fixture for :class:`.NetworkIpam`
- """
- def __init__(self, conn_drv, network_ipam_name=None, parent_fixt=None, auto_prop_val=False, virtual_DNS_refs = None, network_ipam_mgmt=None, id_perms=None, display_name=None):
- '''
- Create NetworkIpamTestFixtureGen object
-
- constructor
-
- Args:
- conn_drv (:class:`ConnectionDriver`): connection driver (eg. :class:`vnc_api.vnc_api.VncApi`, :class:`novaclient.client.Client`, etc)
-
- Kwargs:
- network_ipam_name (str): Name of network_ipam
- parent_fixt (:class:`.ProjectTestFixtureGen`): Parent fixture
- virtual_DNS (list): list of :class:`VirtualDns` type
- network_ipam_mgmt (instance): instance of :class:`IpamType`
- id_perms (instance): instance of :class:`IdPermsType`
- display_name (instance): instance of :class:`xsd:string`
-
- '''
- super(NetworkIpamTestFixtureGen, self).__init__()
- self._conn_drv = conn_drv
- if not network_ipam_name:
- self._name = 'default-network-ipam'
- else:
- self._name = network_ipam_name
- self._obj = None
- self._parent_fixt = parent_fixt
- self._auto_prop_val = auto_prop_val
- if virtual_DNS_refs:
- for ln in virtual_DNS_refs:
- self.add_virtual_DNS (ln)
- self.network_ipam_mgmt = network_ipam_mgmt
- self.id_perms = id_perms
- self.display_name = display_name
- #end __init__
-
- def _update_links (self, update_server):
- for ln in self.get_virtual_DNSs ():
- self.add_virtual_DNS (ln.fixture (), update_server = update_server, add_link = False)
- return None
- #end _update_links
-
- def add_virtual_DNS (self, lo, update_server = True, add_link = True):
- '''
- add :class:`VirtualDns` link to :class:`NetworkIpam`
- Args:
- lo (:class:`VirtualDns`): obj to link
- '''
- if self._obj:
- self._obj.add_virtual_DNS (lo)
- if update_server:
- self._conn_drv.network_ipam_update (self._obj)
-
- if add_link:
- self.add_link('virtual_DNS', cfixture.ConrtailLink('virtual_DNS', 'network_ipam', 'virtual_DNS', ['ref'], lo))
- #end add_virtual_DNS_link
-
- def get_virtual_DNSs (self):
- return self.get_links ('virtual_DNS')
- #end get_virtual_DNSs
-
- def populate (self):
- self._obj.set_network_ipam_mgmt(self.network_ipam_mgmt or vnc_api.gen.resource_xsd.IpamType.populate())
- self._obj.set_id_perms(self.id_perms or vnc_api.gen.resource_xsd.IdPermsType.populate())
- self._obj.set_display_name(self.display_name or GeneratedsSuper.populate_string("display_name"))
- #end populate
-
- def setUp(self):
- super(NetworkIpamTestFixtureGen, self).setUp()
- if not self._parent_fixt:
- self._parent_fixt = self.useFixture(ProjectTestFixtureGen(self._conn_drv, 'default-project'))
-
- self._obj = vnc_api.NetworkIpam(self._name, self._parent_fixt.getObj ())
- try:
- self._obj = self._conn_drv.network_ipam_read (fq_name=self._obj.get_fq_name())
- self._update_links (update_server=True)
- except NoIdError:
- self._update_links (update_server=False)
- if self._auto_prop_val:
- self.populate ()
- else:
- self._obj.network_ipam_mgmt = self.network_ipam_mgmt
- self._obj.id_perms = self.id_perms
- self._obj.display_name = self.display_name
- self._conn_drv.network_ipam_create(self._obj)
- # read back for server allocated values
- self._obj = self._conn_drv.network_ipam_read(id = self._obj.uuid)
- #end setUp
-
- def cleanUp(self):
- if self._obj.get_virtual_network_back_refs():
- return
- parent_fixt = getattr(self, '_parent_fixt', None)
- if parent_fixt:
- # non config-root child
- parent_obj = self._parent_fixt.getObj()
- # remove child from parent obj
- for child_obj in parent_obj.get_network_ipams():
- if type(child_obj) == dict:
- child_uuid = child_obj['uuid']
- else:
- child_uuid = child_obj.uuid
- if child_uuid == self._obj.uuid:
- parent_obj.network_ipams.remove(child_obj)
- break
-
- self._conn_drv.network_ipam_delete(id = self._obj.uuid)
- #end cleanUp
-
- def getObj(self):
- return self._obj
- #end getObj
-
-#end class NetworkIpamTestFixtureGen
-
-class LogicalRouterTestFixtureGen(cfixture.ContrailFixture):
- """
- Fixture for :class:`.LogicalRouter`
- """
- def __init__(self, conn_drv, logical_router_name=None, parent_fixt=None, auto_prop_val=False, virtual_machine_interface_refs = None, route_target_refs = None, virtual_network_refs = None, service_instance_refs = None, id_perms=None, display_name=None):
- '''
- Create LogicalRouterTestFixtureGen object
-
- constructor
-
- Args:
- conn_drv (:class:`ConnectionDriver`): connection driver (eg. :class:`vnc_api.vnc_api.VncApi`, :class:`novaclient.client.Client`, etc)
-
- Kwargs:
- logical_router_name (str): Name of logical_router
- parent_fixt (:class:`.ProjectTestFixtureGen`): Parent fixture
- virtual_machine_interface (list): list of :class:`VirtualMachineInterface` type
- route_target (list): list of :class:`RouteTarget` type
- virtual_network (list): list of :class:`VirtualNetwork` type
- service_instance (list): list of :class:`ServiceInstance` type
- id_perms (instance): instance of :class:`IdPermsType`
- display_name (instance): instance of :class:`xsd:string`
-
- '''
- super(LogicalRouterTestFixtureGen, self).__init__()
- self._conn_drv = conn_drv
- if not logical_router_name:
- self._name = 'default-logical-router'
- else:
- self._name = logical_router_name
- self._obj = None
- self._parent_fixt = parent_fixt
- self._auto_prop_val = auto_prop_val
- if virtual_machine_interface_refs:
- for ln in virtual_machine_interface_refs:
- self.add_virtual_machine_interface (ln)
- if route_target_refs:
- for ln in route_target_refs:
- self.add_route_target (ln)
- if virtual_network_refs:
- for ln in virtual_network_refs:
- self.add_virtual_network (ln)
- if service_instance_refs:
- for ln in service_instance_refs:
- self.add_service_instance (ln)
- self.id_perms = id_perms
- self.display_name = display_name
- #end __init__
-
- def _update_links (self, update_server):
- for ln in self.get_virtual_machine_interfaces ():
- self.add_virtual_machine_interface (ln.fixture (), update_server = update_server, add_link = False)
- for ln in self.get_route_targets ():
- self.add_route_target (ln.fixture (), update_server = update_server, add_link = False)
- for ln in self.get_virtual_networks ():
- self.add_virtual_network (ln.fixture (), update_server = update_server, add_link = False)
- for ln in self.get_service_instances ():
- self.add_service_instance (ln.fixture (), update_server = update_server, add_link = False)
- return None
- #end _update_links
-
- def add_virtual_machine_interface (self, lo, update_server = True, add_link = True):
- '''
- add :class:`VirtualMachineInterface` link to :class:`LogicalRouter`
- Args:
- lo (:class:`VirtualMachineInterface`): obj to link
- '''
- if self._obj:
- self._obj.add_virtual_machine_interface (lo)
- if update_server:
- self._conn_drv.logical_router_update (self._obj)
-
- if add_link:
- self.add_link('virtual_machine_interface', cfixture.ConrtailLink('virtual_machine_interface', 'logical_router', 'virtual_machine_interface', ['ref'], lo))
- #end add_virtual_machine_interface_link
-
- def get_virtual_machine_interfaces (self):
- return self.get_links ('virtual_machine_interface')
- #end get_virtual_machine_interfaces
- def add_route_target (self, lo, update_server = True, add_link = True):
- '''
- add :class:`RouteTarget` link to :class:`LogicalRouter`
- Args:
- lo (:class:`RouteTarget`): obj to link
- '''
- if self._obj:
- self._obj.add_route_target (lo)
- if update_server:
- self._conn_drv.logical_router_update (self._obj)
-
- if add_link:
- self.add_link('route_target', cfixture.ConrtailLink('route_target', 'logical_router', 'route_target', ['ref'], lo))
- #end add_route_target_link
-
- def get_route_targets (self):
- return self.get_links ('route_target')
- #end get_route_targets
- def add_virtual_network (self, lo, update_server = True, add_link = True):
- '''
- add :class:`VirtualNetwork` link to :class:`LogicalRouter`
- Args:
- lo (:class:`VirtualNetwork`): obj to link
- '''
- if self._obj:
- self._obj.add_virtual_network (lo)
- if update_server:
- self._conn_drv.logical_router_update (self._obj)
-
- if add_link:
- self.add_link('virtual_network', cfixture.ConrtailLink('virtual_network', 'logical_router', 'virtual_network', ['ref'], lo))
- #end add_virtual_network_link
-
- def get_virtual_networks (self):
- return self.get_links ('virtual_network')
- #end get_virtual_networks
- def add_service_instance (self, lo, update_server = True, add_link = True):
- '''
- add :class:`ServiceInstance` link to :class:`LogicalRouter`
- Args:
- lo (:class:`ServiceInstance`): obj to link
- '''
- if self._obj:
- self._obj.add_service_instance (lo)
- if update_server:
- self._conn_drv.logical_router_update (self._obj)
-
- if add_link:
- self.add_link('service_instance', cfixture.ConrtailLink('service_instance', 'logical_router', 'service_instance', ['ref'], lo))
- #end add_service_instance_link
-
- def get_service_instances (self):
- return self.get_links ('service_instance')
- #end get_service_instances
-
- def populate (self):
- self._obj.set_id_perms(self.id_perms or vnc_api.gen.resource_xsd.IdPermsType.populate())
- self._obj.set_display_name(self.display_name or GeneratedsSuper.populate_string("display_name"))
- #end populate
-
- def setUp(self):
- super(LogicalRouterTestFixtureGen, self).setUp()
- if not self._parent_fixt:
- self._parent_fixt = self.useFixture(ProjectTestFixtureGen(self._conn_drv, 'default-project'))
-
- self._obj = vnc_api.LogicalRouter(self._name, self._parent_fixt.getObj ())
- try:
- self._obj = self._conn_drv.logical_router_read (fq_name=self._obj.get_fq_name())
- self._update_links (update_server=True)
- except NoIdError:
- self._update_links (update_server=False)
- if self._auto_prop_val:
- self.populate ()
- else:
- self._obj.id_perms = self.id_perms
- self._obj.display_name = self.display_name
- self._conn_drv.logical_router_create(self._obj)
- # read back for server allocated values
- self._obj = self._conn_drv.logical_router_read(id = self._obj.uuid)
- #end setUp
-
- def cleanUp(self):
- parent_fixt = getattr(self, '_parent_fixt', None)
- if parent_fixt:
- # non config-root child
- parent_obj = self._parent_fixt.getObj()
- # remove child from parent obj
- for child_obj in parent_obj.get_logical_routers():
- if type(child_obj) == dict:
- child_uuid = child_obj['uuid']
- else:
- child_uuid = child_obj.uuid
- if child_uuid == self._obj.uuid:
- parent_obj.logical_routers.remove(child_obj)
- break
-
- self._conn_drv.logical_router_delete(id = self._obj.uuid)
- #end cleanUp
-
- def getObj(self):
- return self._obj
- #end getObj
-
-#end class LogicalRouterTestFixtureGen
-