diff options
author | Yang Zhang <yang.z.zhang@intel.com> | 2015-08-28 09:58:54 +0800 |
---|---|---|
committer | Yang Zhang <yang.z.zhang@intel.com> | 2015-09-01 12:44:00 +0800 |
commit | e44e3482bdb4d0ebde2d8b41830ac2cdb07948fb (patch) | |
tree | 66b09f592c55df2878107a468a91d21506104d3f /qemu/scripts/qtest.py | |
parent | 9ca8dbcc65cfc63d6f5ef3312a33184e1d726e00 (diff) |
Add qemu 2.4.0
Change-Id: Ic99cbad4b61f8b127b7dc74d04576c0bcbaaf4f5
Signed-off-by: Yang Zhang <yang.z.zhang@intel.com>
Diffstat (limited to 'qemu/scripts/qtest.py')
-rw-r--r-- | qemu/scripts/qtest.py | 71 |
1 files changed, 71 insertions, 0 deletions
diff --git a/qemu/scripts/qtest.py b/qemu/scripts/qtest.py new file mode 100644 index 000000000..a9714453a --- /dev/null +++ b/qemu/scripts/qtest.py @@ -0,0 +1,71 @@ +# QEMU qtest library +# +# Copyright (C) 2015 Red Hat Inc. +# +# Authors: +# Fam Zheng <famz@redhat.com> +# +# This work is licensed under the terms of the GNU GPL, version 2. See +# the COPYING file in the top-level directory. +# +# Based on qmp.py. +# + +import errno +import socket + +class QEMUQtestProtocol(object): + def __init__(self, address, server=False): + """ + Create a QEMUQtestProtocol object. + + @param address: QEMU address, can be either a unix socket path (string) + or a tuple in the form ( address, port ) for a TCP + connection + @param server: server mode, listens on the socket (bool) + @raise socket.error on socket connection errors + @note No connection is established, this is done by the connect() or + accept() methods + """ + self._address = address + self._sock = self._get_sock() + if server: + self._sock.bind(self._address) + self._sock.listen(1) + + def _get_sock(self): + if isinstance(self._address, tuple): + family = socket.AF_INET + else: + family = socket.AF_UNIX + return socket.socket(family, socket.SOCK_STREAM) + + def connect(self): + """ + Connect to the qtest socket. + + @raise socket.error on socket connection errors + """ + self._sock.connect(self._address) + + def accept(self): + """ + Await connection from QEMU. + + @raise socket.error on socket connection errors + """ + self._sock, _ = self._sock.accept() + + def cmd(self, qtest_cmd): + """ + Send a qtest command on the wire. + + @param qtest_cmd: qtest command text to be sent + """ + self._sock.sendall(qtest_cmd + "\n") + + def close(self): + self._sock.close() + + def settimeout(self, timeout): + self._sock.settimeout(timeout) |