summaryrefslogtreecommitdiffstats
path: root/qemu/block/nbd-client.c
diff options
context:
space:
mode:
Diffstat (limited to 'qemu/block/nbd-client.c')
-rw-r--r--qemu/block/nbd-client.c113
1 files changed, 71 insertions, 42 deletions
diff --git a/qemu/block/nbd-client.c b/qemu/block/nbd-client.c
index e1bb9198c..878e879ac 100644
--- a/qemu/block/nbd-client.c
+++ b/qemu/block/nbd-client.c
@@ -26,8 +26,8 @@
* THE SOFTWARE.
*/
+#include "qemu/osdep.h"
#include "nbd-client.h"
-#include "qemu/sockets.h"
#define HANDLE_TO_INDEX(bs, handle) ((handle) ^ ((uint64_t)(intptr_t)bs))
#define INDEX_TO_HANDLE(bs, index) ((index) ^ ((uint64_t)(intptr_t)bs))
@@ -47,13 +47,21 @@ static void nbd_teardown_connection(BlockDriverState *bs)
{
NbdClientSession *client = nbd_get_client_session(bs);
+ if (!client->ioc) { /* Already closed */
+ return;
+ }
+
/* finish any pending coroutines */
- shutdown(client->sock, 2);
+ qio_channel_shutdown(client->ioc,
+ QIO_CHANNEL_SHUTDOWN_BOTH,
+ NULL);
nbd_recv_coroutines_enter_all(client);
nbd_client_detach_aio_context(bs);
- closesocket(client->sock);
- client->sock = -1;
+ object_unref(OBJECT(client->sioc));
+ client->sioc = NULL;
+ object_unref(OBJECT(client->ioc));
+ client->ioc = NULL;
}
static void nbd_reply_ready(void *opaque)
@@ -63,12 +71,16 @@ static void nbd_reply_ready(void *opaque)
uint64_t i;
int ret;
+ if (!s->ioc) { /* Already closed */
+ return;
+ }
+
if (s->reply.handle == 0) {
/* No reply already in flight. Fetch a header. It is possible
* that another thread has done the same thing in parallel, so
* the socket is not readable anymore.
*/
- ret = nbd_receive_reply(s->sock, &s->reply);
+ ret = nbd_receive_reply(s->ioc, &s->reply);
if (ret == -EAGAIN) {
return;
}
@@ -119,32 +131,36 @@ static int nbd_co_send_request(BlockDriverState *bs,
}
}
+ g_assert(qemu_in_coroutine());
assert(i < MAX_NBD_REQUESTS);
request->handle = INDEX_TO_HANDLE(s, i);
+
+ if (!s->ioc) {
+ qemu_co_mutex_unlock(&s->send_mutex);
+ return -EPIPE;
+ }
+
s->send_coroutine = qemu_coroutine_self();
aio_context = bdrv_get_aio_context(bs);
- aio_set_fd_handler(aio_context, s->sock,
+ aio_set_fd_handler(aio_context, s->sioc->fd, false,
nbd_reply_ready, nbd_restart_write, bs);
if (qiov) {
- if (!s->is_unix) {
- socket_set_cork(s->sock, 1);
- }
- rc = nbd_send_request(s->sock, request);
+ qio_channel_set_cork(s->ioc, true);
+ rc = nbd_send_request(s->ioc, request);
if (rc >= 0) {
- ret = qemu_co_sendv(s->sock, qiov->iov, qiov->niov,
- offset, request->len);
+ ret = nbd_wr_syncv(s->ioc, qiov->iov, qiov->niov,
+ offset, request->len, 0);
if (ret != request->len) {
rc = -EIO;
}
}
- if (!s->is_unix) {
- socket_set_cork(s->sock, 0);
- }
+ qio_channel_set_cork(s->ioc, false);
} else {
- rc = nbd_send_request(s->sock, request);
+ rc = nbd_send_request(s->ioc, request);
}
- aio_set_fd_handler(aio_context, s->sock, nbd_reply_ready, NULL, bs);
+ aio_set_fd_handler(aio_context, s->sioc->fd, false,
+ nbd_reply_ready, NULL, bs);
s->send_coroutine = NULL;
qemu_co_mutex_unlock(&s->send_mutex);
return rc;
@@ -160,12 +176,13 @@ static void nbd_co_receive_reply(NbdClientSession *s,
* peek at the next reply and avoid yielding if it's ours? */
qemu_coroutine_yield();
*reply = s->reply;
- if (reply->handle != request->handle) {
+ if (reply->handle != request->handle ||
+ !s->ioc) {
reply->error = EIO;
} else {
if (qiov && reply->error == 0) {
- ret = qemu_co_recvv(s->sock, qiov->iov, qiov->niov,
- offset, request->len);
+ ret = nbd_wr_syncv(s->ioc, qiov->iov, qiov->niov,
+ offset, request->len, 1);
if (ret != request->len) {
reply->error = EIO;
}
@@ -226,15 +243,15 @@ static int nbd_co_readv_1(BlockDriverState *bs, int64_t sector_num,
static int nbd_co_writev_1(BlockDriverState *bs, int64_t sector_num,
int nb_sectors, QEMUIOVector *qiov,
- int offset)
+ int offset, int *flags)
{
NbdClientSession *client = nbd_get_client_session(bs);
struct nbd_request request = { .type = NBD_CMD_WRITE };
struct nbd_reply reply;
ssize_t ret;
- if (!bdrv_enable_write_cache(bs) &&
- (client->nbdflags & NBD_FLAG_SEND_FUA)) {
+ if ((*flags & BDRV_REQ_FUA) && (client->nbdflags & NBD_FLAG_SEND_FUA)) {
+ *flags &= ~BDRV_REQ_FUA;
request.type |= NBD_CMD_FLAG_FUA;
}
@@ -274,12 +291,13 @@ int nbd_client_co_readv(BlockDriverState *bs, int64_t sector_num,
}
int nbd_client_co_writev(BlockDriverState *bs, int64_t sector_num,
- int nb_sectors, QEMUIOVector *qiov)
+ int nb_sectors, QEMUIOVector *qiov, int *flags)
{
int offset = 0;
int ret;
while (nb_sectors > NBD_MAX_SECTORS) {
- ret = nbd_co_writev_1(bs, sector_num, NBD_MAX_SECTORS, qiov, offset);
+ ret = nbd_co_writev_1(bs, sector_num, NBD_MAX_SECTORS, qiov, offset,
+ flags);
if (ret < 0) {
return ret;
}
@@ -287,7 +305,7 @@ int nbd_client_co_writev(BlockDriverState *bs, int64_t sector_num,
sector_num += NBD_MAX_SECTORS;
nb_sectors -= NBD_MAX_SECTORS;
}
- return nbd_co_writev_1(bs, sector_num, nb_sectors, qiov, offset);
+ return nbd_co_writev_1(bs, sector_num, nb_sectors, qiov, offset, flags);
}
int nbd_client_co_flush(BlockDriverState *bs)
@@ -301,10 +319,6 @@ int nbd_client_co_flush(BlockDriverState *bs)
return 0;
}
- if (client->nbdflags & NBD_FLAG_SEND_FUA) {
- request.type |= NBD_CMD_FLAG_FUA;
- }
-
request.from = 0;
request.len = 0;
@@ -348,14 +362,15 @@ int nbd_client_co_discard(BlockDriverState *bs, int64_t sector_num,
void nbd_client_detach_aio_context(BlockDriverState *bs)
{
aio_set_fd_handler(bdrv_get_aio_context(bs),
- nbd_get_client_session(bs)->sock, NULL, NULL, NULL);
+ nbd_get_client_session(bs)->sioc->fd,
+ false, NULL, NULL, NULL);
}
void nbd_client_attach_aio_context(BlockDriverState *bs,
AioContext *new_context)
{
- aio_set_fd_handler(new_context, nbd_get_client_session(bs)->sock,
- nbd_reply_ready, NULL, bs);
+ aio_set_fd_handler(new_context, nbd_get_client_session(bs)->sioc->fd,
+ false, nbd_reply_ready, NULL, bs);
}
void nbd_client_close(BlockDriverState *bs)
@@ -367,16 +382,20 @@ void nbd_client_close(BlockDriverState *bs)
.len = 0
};
- if (client->sock == -1) {
+ if (client->ioc == NULL) {
return;
}
- nbd_send_request(client->sock, &request);
+ nbd_send_request(client->ioc, &request);
nbd_teardown_connection(bs);
}
-int nbd_client_init(BlockDriverState *bs, int sock, const char *export,
+int nbd_client_init(BlockDriverState *bs,
+ QIOChannelSocket *sioc,
+ const char *export,
+ QCryptoTLSCreds *tlscreds,
+ const char *hostname,
Error **errp)
{
NbdClientSession *client = nbd_get_client_session(bs);
@@ -384,22 +403,32 @@ int nbd_client_init(BlockDriverState *bs, int sock, const char *export,
/* NBD handshake */
logout("session init %s\n", export);
- qemu_set_block(sock);
- ret = nbd_receive_negotiate(sock, export,
- &client->nbdflags, &client->size, errp);
+ qio_channel_set_blocking(QIO_CHANNEL(sioc), true, NULL);
+
+ ret = nbd_receive_negotiate(QIO_CHANNEL(sioc), export,
+ &client->nbdflags,
+ tlscreds, hostname,
+ &client->ioc,
+ &client->size, errp);
if (ret < 0) {
logout("Failed to negotiate with the NBD server\n");
- closesocket(sock);
return ret;
}
qemu_co_mutex_init(&client->send_mutex);
qemu_co_mutex_init(&client->free_sema);
- client->sock = sock;
+ client->sioc = sioc;
+ object_ref(OBJECT(client->sioc));
+
+ if (!client->ioc) {
+ client->ioc = QIO_CHANNEL(sioc);
+ object_ref(OBJECT(client->ioc));
+ }
/* Now that we're connected, set the socket to be non-blocking and
* kick the reply mechanism. */
- qemu_set_nonblock(sock);
+ qio_channel_set_blocking(QIO_CHANNEL(sioc), false, NULL);
+
nbd_client_attach_aio_context(bs, bdrv_get_aio_context(bs));
logout("Established connection with NBD server\n");