Commit 9e55eef4 authored by Chuck Lever's avatar Chuck Lever
Browse files

SUNRPC: Refactor xs_sendpages()



Re-locate xs_sendpages() so that it can be shared with server code.

Signed-off-by: default avatarChuck Lever <chuck.lever@oracle.com>
parent 0dabe948
Loading
Loading
Loading
Loading
+0 −14
Original line number Diff line number Diff line
@@ -188,20 +188,6 @@ extern int xdr_buf_read_mic(struct xdr_buf *, struct xdr_netobj *, unsigned int)
extern int read_bytes_from_xdr_buf(struct xdr_buf *, unsigned int, void *, unsigned int);
extern int write_bytes_to_xdr_buf(struct xdr_buf *, unsigned int, void *, unsigned int);

/*
 * Helper structure for copying from an sk_buff.
 */
struct xdr_skb_reader {
	struct sk_buff	*skb;
	unsigned int	offset;
	size_t		count;
	__wsum		csum;
};

typedef size_t (*xdr_skb_read_actor)(struct xdr_skb_reader *desc, void *to, size_t len);

extern int csum_partial_copy_to_xdr(struct xdr_buf *, struct sk_buff *);

extern int xdr_encode_word(struct xdr_buf *, unsigned int, u32);
extern int xdr_decode_word(struct xdr_buf *, unsigned int, u32 *);

+141 −0
Original line number Diff line number Diff line
@@ -14,9 +14,24 @@
#include <linux/types.h>
#include <linux/pagemap.h>
#include <linux/udp.h>
#include <linux/sunrpc/msg_prot.h>
#include <linux/sunrpc/xdr.h>
#include <linux/export.h>

#include "socklib.h"

/*
 * Helper structure for copying from an sk_buff.
 */
struct xdr_skb_reader {
	struct sk_buff	*skb;
	unsigned int	offset;
	size_t		count;
	__wsum		csum;
};

typedef size_t (*xdr_skb_read_actor)(struct xdr_skb_reader *desc, void *to,
				     size_t len);

/**
 * xdr_skb_read_bits - copy some data bits from skb to internal buffer
@@ -186,3 +201,129 @@ no_checksum:
	return 0;
}
EXPORT_SYMBOL_GPL(csum_partial_copy_to_xdr);

static inline int xprt_sendmsg(struct socket *sock, struct msghdr *msg,
			       size_t seek)
{
	if (seek)
		iov_iter_advance(&msg->msg_iter, seek);
	return sock_sendmsg(sock, msg);
}

static int xprt_send_kvec(struct socket *sock, struct msghdr *msg,
			  struct kvec *vec, size_t seek)
{
	iov_iter_kvec(&msg->msg_iter, WRITE, vec, 1, vec->iov_len);
	return xprt_sendmsg(sock, msg, seek);
}

static int xprt_send_pagedata(struct socket *sock, struct msghdr *msg,
			      struct xdr_buf *xdr, size_t base)
{
	int err;

	err = xdr_alloc_bvec(xdr, GFP_KERNEL);
	if (err < 0)
		return err;

	iov_iter_bvec(&msg->msg_iter, WRITE, xdr->bvec, xdr_buf_pagecount(xdr),
		      xdr->page_len + xdr->page_base);
	return xprt_sendmsg(sock, msg, base + xdr->page_base);
}

/* Common case:
 *  - stream transport
 *  - sending from byte 0 of the message
 *  - the message is wholly contained in @xdr's head iovec
 */
static int xprt_send_rm_and_kvec(struct socket *sock, struct msghdr *msg,
				 rpc_fraghdr marker, struct kvec *vec,
				 size_t base)
{
	struct kvec iov[2] = {
		[0] = {
			.iov_base	= &marker,
			.iov_len	= sizeof(marker)
		},
		[1] = *vec,
	};
	size_t len = iov[0].iov_len + iov[1].iov_len;

	iov_iter_kvec(&msg->msg_iter, WRITE, iov, 2, len);
	return xprt_sendmsg(sock, msg, base);
}

/**
 * xprt_sock_sendmsg - write an xdr_buf directly to a socket
 * @sock: open socket to send on
 * @msg: socket message metadata
 * @xdr: xdr_buf containing this request
 * @base: starting position in the buffer
 * @marker: stream record marker field
 * @sent_p: return the total number of bytes successfully queued for sending
 *
 * Return values:
 *   On success, returns zero and fills in @sent_p.
 *   %-ENOTSOCK if  @sock is not a struct socket.
 */
int xprt_sock_sendmsg(struct socket *sock, struct msghdr *msg,
		      struct xdr_buf *xdr, unsigned int base,
		      rpc_fraghdr marker, unsigned int *sent_p)
{
	unsigned int rmsize = marker ? sizeof(marker) : 0;
	unsigned int remainder = rmsize + xdr->len - base;
	unsigned int want;
	int err = 0;

	*sent_p = 0;

	if (unlikely(!sock))
		return -ENOTSOCK;

	msg->msg_flags |= MSG_MORE;
	want = xdr->head[0].iov_len + rmsize;
	if (base < want) {
		unsigned int len = want - base;

		remainder -= len;
		if (remainder == 0)
			msg->msg_flags &= ~MSG_MORE;
		if (rmsize)
			err = xprt_send_rm_and_kvec(sock, msg, marker,
						    &xdr->head[0], base);
		else
			err = xprt_send_kvec(sock, msg, &xdr->head[0], base);
		if (remainder == 0 || err != len)
			goto out;
		*sent_p += err;
		base = 0;
	} else {
		base -= want;
	}

	if (base < xdr->page_len) {
		unsigned int len = xdr->page_len - base;

		remainder -= len;
		if (remainder == 0)
			msg->msg_flags &= ~MSG_MORE;
		err = xprt_send_pagedata(sock, msg, xdr, base);
		if (remainder == 0 || err != len)
			goto out;
		*sent_p += err;
		base = 0;
	} else {
		base -= xdr->page_len;
	}

	if (base >= xdr->tail[0].iov_len)
		return 0;
	msg->msg_flags &= ~MSG_MORE;
	err = xprt_send_kvec(sock, msg, &xdr->tail[0], base);
out:
	if (err > 0) {
		*sent_p += err;
		err = 0;
	}
	return err;
}

net/sunrpc/socklib.h

0 → 100644
+15 −0
Original line number Diff line number Diff line
/* SPDX-License-Identifier: GPL-2.0 */
/*
 * Copyright (C) 1995-1997 Olaf Kirch <okir@monad.swb.de>
 * Copyright (C) 2020, Oracle.
 */

#ifndef _NET_SUNRPC_SOCKLIB_H_
#define _NET_SUNRPC_SOCKLIB_H_

int csum_partial_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb);
int xprt_sock_sendmsg(struct socket *sock, struct msghdr *msg,
		      struct xdr_buf *xdr, unsigned int base,
		      rpc_fraghdr marker, unsigned int *sent_p);

#endif /* _NET_SUNRPC_SOCKLIB_H_ */
+1 −0
Original line number Diff line number Diff line
@@ -55,6 +55,7 @@
#include <linux/sunrpc/stats.h>
#include <linux/sunrpc/xprt.h>

#include "socklib.h"
#include "sunrpc.h"

#define RPCDBG_FACILITY	RPCDBG_SVCXPRT
+20 −129
Original line number Diff line number Diff line
@@ -54,6 +54,7 @@

#include <trace/events/sunrpc.h>

#include "socklib.h"
#include "sunrpc.h"

static void xs_close(struct rpc_xprt *xprt);
@@ -749,125 +750,6 @@ xs_stream_start_connect(struct sock_xprt *transport)

#define XS_SENDMSG_FLAGS	(MSG_DONTWAIT | MSG_NOSIGNAL)

static int xs_sendmsg(struct socket *sock, struct msghdr *msg, size_t seek)
{
	if (seek)
		iov_iter_advance(&msg->msg_iter, seek);
	return sock_sendmsg(sock, msg);
}

static int xs_send_kvec(struct socket *sock, struct msghdr *msg, struct kvec *vec, size_t seek)
{
	iov_iter_kvec(&msg->msg_iter, WRITE, vec, 1, vec->iov_len);
	return xs_sendmsg(sock, msg, seek);
}

static int xs_send_pagedata(struct socket *sock, struct msghdr *msg, struct xdr_buf *xdr, size_t base)
{
	int err;

	err = xdr_alloc_bvec(xdr, GFP_KERNEL);
	if (err < 0)
		return err;

	iov_iter_bvec(&msg->msg_iter, WRITE, xdr->bvec,
			xdr_buf_pagecount(xdr),
			xdr->page_len + xdr->page_base);
	return xs_sendmsg(sock, msg, base + xdr->page_base);
}

#define xs_record_marker_len() sizeof(rpc_fraghdr)

/* Common case:
 *  - stream transport
 *  - sending from byte 0 of the message
 *  - the message is wholly contained in @xdr's head iovec
 */
static int xs_send_rm_and_kvec(struct socket *sock, struct msghdr *msg,
		rpc_fraghdr marker, struct kvec *vec, size_t base)
{
	struct kvec iov[2] = {
		[0] = {
			.iov_base	= &marker,
			.iov_len	= sizeof(marker)
		},
		[1] = *vec,
	};
	size_t len = iov[0].iov_len + iov[1].iov_len;

	iov_iter_kvec(&msg->msg_iter, WRITE, iov, 2, len);
	return xs_sendmsg(sock, msg, base);
}

/**
 * xs_sendpages - write pages directly to a socket
 * @sock: socket to send on
 * @addr: UDP only -- address of destination
 * @addrlen: UDP only -- length of destination address
 * @xdr: buffer containing this request
 * @base: starting position in the buffer
 * @rm: stream record marker field
 * @sent_p: return the total number of bytes successfully queued for sending
 *
 */
static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base, rpc_fraghdr rm, int *sent_p)
{
	struct msghdr msg = {
		.msg_name = addr,
		.msg_namelen = addrlen,
		.msg_flags = XS_SENDMSG_FLAGS | MSG_MORE,
	};
	unsigned int rmsize = rm ? sizeof(rm) : 0;
	unsigned int remainder = rmsize + xdr->len - base;
	unsigned int want;
	int err = 0;

	if (unlikely(!sock))
		return -ENOTSOCK;

	want = xdr->head[0].iov_len + rmsize;
	if (base < want) {
		unsigned int len = want - base;
		remainder -= len;
		if (remainder == 0)
			msg.msg_flags &= ~MSG_MORE;
		if (rmsize)
			err = xs_send_rm_and_kvec(sock, &msg, rm,
					&xdr->head[0], base);
		else
			err = xs_send_kvec(sock, &msg, &xdr->head[0], base);
		if (remainder == 0 || err != len)
			goto out;
		*sent_p += err;
		base = 0;
	} else
		base -= want;

	if (base < xdr->page_len) {
		unsigned int len = xdr->page_len - base;
		remainder -= len;
		if (remainder == 0)
			msg.msg_flags &= ~MSG_MORE;
		err = xs_send_pagedata(sock, &msg, xdr, base);
		if (remainder == 0 || err != len)
			goto out;
		*sent_p += err;
		base = 0;
	} else
		base -= xdr->page_len;

	if (base >= xdr->tail[0].iov_len)
		return 0;
	msg.msg_flags &= ~MSG_MORE;
	err = xs_send_kvec(sock, &msg, &xdr->tail[0], base);
out:
	if (err > 0) {
		*sent_p += err;
		err = 0;
	}
	return err;
}

/**
 * xs_nospace - handle transmit was incomplete
 * @req: pointer to RPC request
@@ -959,8 +841,11 @@ static int xs_local_send_request(struct rpc_rqst *req)
	struct xdr_buf *xdr = &req->rq_snd_buf;
	rpc_fraghdr rm = xs_stream_record_marker(xdr);
	unsigned int msglen = rm ? req->rq_slen + sizeof(rm) : req->rq_slen;
	struct msghdr msg = {
		.msg_flags	= XS_SENDMSG_FLAGS,
	};
	unsigned int uninitialized_var(sent);
	int status;
	int sent = 0;

	/* Close the stream if the previous transmission was incomplete */
	if (xs_send_request_was_aborted(transport, req)) {
@@ -972,7 +857,7 @@ static int xs_local_send_request(struct rpc_rqst *req)
			req->rq_svec->iov_base, req->rq_svec->iov_len);

	req->rq_xtime = ktime_get();
	status = xs_sendpages(transport->sock, NULL, 0, xdr,
	status = xprt_sock_sendmsg(transport->sock, &msg, xdr,
				   transport->xmit.offset, rm, &sent);
	dprintk("RPC:       %s(%u) = %d\n",
			__func__, xdr->len - transport->xmit.offset, status);
@@ -1025,7 +910,12 @@ static int xs_udp_send_request(struct rpc_rqst *req)
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *xdr = &req->rq_snd_buf;
	int sent = 0;
	struct msghdr msg = {
		.msg_name	= xs_addr(xprt),
		.msg_namelen	= xprt->addrlen,
		.msg_flags	= XS_SENDMSG_FLAGS,
	};
	unsigned int uninitialized_var(sent);
	int status;

	xs_pktdump("packet data:",
@@ -1039,8 +929,7 @@ static int xs_udp_send_request(struct rpc_rqst *req)
		return -EBADSLT;

	req->rq_xtime = ktime_get();
	status = xs_sendpages(transport->sock, xs_addr(xprt), xprt->addrlen,
			      xdr, 0, 0, &sent);
	status = xprt_sock_sendmsg(transport->sock, &msg, xdr, 0, 0, &sent);

	dprintk("RPC:       xs_udp_send_request(%u) = %d\n",
			xdr->len, status);
@@ -1106,9 +995,12 @@ static int xs_tcp_send_request(struct rpc_rqst *req)
	struct xdr_buf *xdr = &req->rq_snd_buf;
	rpc_fraghdr rm = xs_stream_record_marker(xdr);
	unsigned int msglen = rm ? req->rq_slen + sizeof(rm) : req->rq_slen;
	struct msghdr msg = {
		.msg_flags	= XS_SENDMSG_FLAGS,
	};
	bool vm_wait = false;
	unsigned int uninitialized_var(sent);
	int status;
	int sent;

	/* Close the stream if the previous transmission was incomplete */
	if (xs_send_request_was_aborted(transport, req)) {
@@ -1129,8 +1021,7 @@ static int xs_tcp_send_request(struct rpc_rqst *req)
	 * called sendmsg(). */
	req->rq_xtime = ktime_get();
	while (1) {
		sent = 0;
		status = xs_sendpages(transport->sock, NULL, 0, xdr,
		status = xprt_sock_sendmsg(transport->sock, &msg, xdr,
					   transport->xmit.offset, rm, &sent);

		dprintk("RPC:       xs_tcp_send_request(%u) = %d\n",