# Copyright 2014 Huawei Technologies Co. Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Compass api client library. """ import json import logging import requests class Client(object): """compass restful api wrapper""" def __init__(self, url, headers=None, proxies=None, stream=None): logging.info('create api client %s', url) self.url_ = url self.session_ = requests.Session() if headers: self.session_.headers.update(headers) self.session_.headers.update({ 'Accept': 'application/json' }) if proxies is not None: self.session_.proxies = proxies if stream is not None: self.session_.stream = stream def __del__(self): self.session_.close() @classmethod def _get_response(cls, resp): response_object = {} try: response_object = resp.json() except Exception as error: logging.error('failed to load object from %s: %s', resp.url, resp.content) logging.exception(error) response_object['status'] = 'Json Parsing Failed' response_object['message'] = resp.content return resp.status_code, response_object def _get(self, req_url, data=None): url = '%s%s' % (self.url_, req_url) logging.debug('get %s with data %s', url, data) if data: resp = self.session_.get(url, params=data) else: resp = self.session_.get(url) return self._get_response(resp) def _post(self, req_url, data=None): url = '%s%s' % (self.url_, req_url) logging.debug('post %s with data %s', url, data) if data: resp = self.session_.post(url, json.dumps(data)) else: resp = self.session_.post(url) return self._get_response(resp) def 
_put(self, req_url, data=None): """encapsulate put method.""" url = '%s%s' % (self.url_, req_url) logging.debug('put %s with data %s', url, data) if data: resp = self.session_.put(url, json.dumps(data)) else: resp = self.session_.put(url) return self._get_response(resp) def _patch(self, req_url, data=None): url = '%s%s' % (self.url_, req_url) logging.debug('patch %s with data %s', url, data) if data: resp = self.session_.patch(url, json.dumps(data)) else: resp = self.session_.patch(url) return self._get_response(resp) def _delete(self, req_url): url = '%s%s' % (self.url_, req_url) logging.debug('delete %s', url) return self._get_response(self.session_.delete(url)) def login(self, email, password): credential = {} credential['email'] = email credential['password'] = password return self._post('/users/login', data=credential) def get_token(self, email, password): credential = {} credential['email'] = email credential['password'] = password status, resp = self._post('/users/token', data=credential) if status < 400: self.session_.headers.update({'X-Auth-Token': resp['token']}) return status, resp def get_users(self): users = self._get('/users') return users def list_switches( self, switch_ips=None, switch_ip_networks=None): """list switches.""" params = {} if switch_ips: params['switchIp'] = switch_ips if switch_ip_networks: params['switchIpNetwork'] = switch_ip_networks switchlist = self._get('/switches', data=params) return switchlist def get_switch(self, switch_id): return self._get('/switches/%s' % switch_id) def add_switch( self, switch_ip, version=None, community=None, raw_data=None): data = {} if raw_data: data = raw_data else: data['ip'] = switch_ip data['credentials'] = {} if version: data['credentials']['version'] = version if community: data['credentials']['community'] = community return self._post('/switches', data=data) def update_switch(self, switch_id, state='initialized', version='2c', community='public', raw_data={}): data = {} if raw_data: data = 
raw_data else: data['credentials'] = {} if version: data['credentials']['version'] = version if community: data['credentials']['community'] = community if state: data['state'] = state return self._put('/switches/%s' % switch_id, data=data) def delete_switch(self, switch_id): return self._delete('/switches/%s' % switch_id) def list_switch_machines(self, switch_id, port=None, vlans=None,
/*
* QFloat Module
*
* Copyright IBM, Corp. 2009
*
* Authors:
* Anthony Liguori <aliguori@us.ibm.com>
*
* This work is licensed under the terms of the GNU LGPL, version 2.1 or later.
* See the COPYING.LIB file in the top-level directory.
*
*/
#include "qapi/qmp/qfloat.h"
#include "qapi/qmp/qobject.h"
#include "qemu-common.h"
/* Forward declaration: the descriptor below needs the destructor's address,
 * but the destructor itself is defined further down in this file. */
static void qfloat_destroy_obj(QObject *obj);

/*
 * Type descriptor for QFloat objects. qfloat_from_double() passes its
 * address to QOBJECT_INIT() for every object it creates.
 *
 * .destroy - callback that frees a QFloat (presumably invoked by the
 *            QObject layer when the object is released; confirm there)
 * .code    - type tag that qobject_to_qfloat() compares against
 */
static const QType qfloat_type = {
    .destroy = qfloat_destroy_obj,
    .code = QTYPE_QFLOAT,
};
/**
 * qfloat_from_double(): Create a new QFloat from a double
 *
 * @value: the number to store in the new object
 *
 * The object is allocated with g_malloc() and initialised with
 * QOBJECT_INIT() using the QFloat type descriptor.
 *
 * Return strong reference.
 */
QFloat *qfloat_from_double(double value)
{
    QFloat *new_float = g_malloc(sizeof(*new_float));

    new_float->value = value;
    QOBJECT_INIT(new_float, &qfloat_type);

    return new_float;
}
/**
 * qfloat_get_double(): Read the number held by a QFloat
 *
 * @qf: object to read; it is dereferenced without a NULL check
 *
 * Returns the 'value' field of @qf.
 */
double qfloat_get_double(const QFloat *qf)
{
    const double stored = qf->value;

    return stored;
}
/**
 * qobject_to_qfloat(): Convert a QObject into a QFloat
 *
 * @obj: object to convert; may be NULL
 *
 * Returns the QFloat that embeds @obj as its 'base' member, or NULL when
 * @obj is NULL or its type code is not QTYPE_QFLOAT.
 */
QFloat *qobject_to_qfloat(const QObject *obj)
{
    /* Test for NULL first so qobject_type() is never handed a null pointer. */
    if (obj == NULL || qobject_type(obj) != QTYPE_QFLOAT) {
        return NULL;
    }

    return container_of(obj, QFloat, base);
}
/**
 * qfloat_destroy_obj(): Free all memory allocated by a QFloat object
 *
 * @obj: base object of the QFloat to free; must not be NULL (asserted)
 *
 * Registered as the .destroy callback in qfloat_type.
 */
static void qfloat_destroy_obj(QObject *obj)
{
    QFloat *doomed;

    assert(obj != NULL);

    /* Map the base object back to its enclosing QFloat, then release it. */
    doomed = qobject_to_qfloat(obj);
    g_free(doomed);
}