Commit d9ca1de8 authored by Paolo Abeni, committed by Jakub Kicinski

mptcp: move page frag allocation in mptcp_sendmsg()

mptcp_sendmsg() is refactored so that it first copies the data
provided by user space into the send queue, and then tries to
spool the send queue via mptcp_sendmsg_frag().

There is a subtle change in the MPTCP-level collapsing of
consecutive data fragments: it is now allowed only on unsent
data.

mptcp_sendmsg_frag() no longer needs to deal with msghdr data
and can be simplified significantly.

snd_nxt and write_seq are now tracked independently.

Overall this allows for some significant cleanup, and will allow
sending the pending MPTCP data on msk snd_una updates in a later
patch.

Co-developed-by: Florian Westphal <fw@strlen.de>
Signed-off-by: Florian Westphal <fw@strlen.de>
Signed-off-by: Paolo Abeni <pabeni@redhat.com>
Signed-off-by: Jakub Kicinski <kuba@kernel.org>
parent e16163b6
+189 −217
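
The refactor is easiest to see as two independent counters. Below is a
minimal user-space model (illustrative only, none of it is kernel code):
fragments carry data_len/already_sent counters, the copy phase advances
write_seq, and the spool phase advances snd_nxt, so the two sequence
counters move independently exactly as the commit message describes.

#include <stdint.h>
#include <stdio.h>

/* one entry of the modelled MPTCP send queue */
struct frag {
	uint64_t data_seq;	/* MPTCP-level sequence of the first byte */
	int data_len;		/* bytes copied into the fragment */
	int already_sent;	/* bytes already spooled to a subflow */
};

static uint64_t write_seq;	/* advances in the copy phase (sendmsg) */
static uint64_t snd_nxt;	/* advances in the spool phase (push_pending) */

/* phase 1: copy user data into the send queue */
static void append_data(struct frag *f, int len)
{
	f->data_len += len;
	write_seq += len;
}

/* phase 2: spool queued-but-unsent bytes, in scheduler-sized chunks */
static void push_pending(struct frag *f, int chunk)
{
	while (f->already_sent < f->data_len) {
		int ret = f->data_len - f->already_sent;

		if (ret > chunk)
			ret = chunk;
		f->already_sent += ret;
		snd_nxt += ret;
	}
}

int main(void)
{
	struct frag f = { .data_seq = 0 };

	append_data(&f, 3000);
	printf("after copy:  write_seq=%llu snd_nxt=%llu\n",
	       (unsigned long long)write_seq, (unsigned long long)snd_nxt);
	push_pending(&f, 1400);
	printf("after spool: write_seq=%llu snd_nxt=%llu\n",
	       (unsigned long long)write_seq, (unsigned long long)snd_nxt);
	return 0;
}
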
--- a/net/mptcp/protocol.c
+++ b/net/mptcp/protocol.c
@@ -43,6 +43,7 @@ struct mptcp_skb_cb {
 static struct percpu_counter mptcp_sockets_allocated;
 
 static void __mptcp_destroy_sock(struct sock *sk);
+static void __mptcp_check_send_data_fin(struct sock *sk);
 
 /* If msk has an initial subflow socket, and the MP_CAPABLE handshake has not
  * completed yet or has failed, return the subflow socket.
@@ -814,6 +815,7 @@ static bool mptcp_frag_can_collapse_to(const struct mptcp_sock *msk,
 				       const struct mptcp_data_frag *df)
 {
 	return df && pfrag->page == df->page &&
+		pfrag->size - pfrag->offset > 0 &&
 		df->data_seq + df->data_len == msk->write_seq;
 }
 
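The extra condition means a full page frag can no longer be collapsed
into; and since the caller now passes the pending tail fragment (see
mptcp_pending_tail() in the mptcp_sendmsg hunk below), collapsing is only
attempted on unsent data, as the commit message notes. A compilable
user-space restatement of the predicate (the structs are simplified
stand-ins, not the kernel types):

#include <stdbool.h>
#include <stdint.h>
#include <stddef.h>

struct page_frag { void *page; unsigned int size, offset; };
struct data_frag { void *page; uint64_t data_seq; int data_len; };

/* true when new data may extend the tail fragment in place */
static bool frag_can_collapse_to(uint64_t write_seq,
				 const struct page_frag *pfrag,
				 const struct data_frag *df)
{
	return df &&					/* a tail fragment exists */
	       pfrag->page == df->page &&		/* same backing page */
	       pfrag->size - pfrag->offset > 0 &&	/* page still has room */
	       df->data_seq + df->data_len == write_seq; /* sequence-contiguous */
}

int main(void)
{
	char page[4096];
	struct page_frag pf = { .page = page, .size = 4096, .offset = 100 };
	struct data_frag df = { .page = page, .data_seq = 0, .data_len = 100 };

	return frag_can_collapse_to(100, &pf, &df) ? 0 : 1;
}
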
@@ -864,6 +866,8 @@ static void mptcp_clean_una(struct sock *sk)
 		if (after64(dfrag->data_seq + dfrag->data_len, snd_una))
 			break;
 
+		if (WARN_ON_ONCE(dfrag == msk->first_pending))
+			break;
 		dfrag_clear(sk, dfrag);
 		cleaned = true;
 	}
@@ -872,12 +876,13 @@ static void mptcp_clean_una(struct sock *sk)
 	if (dfrag && after64(snd_una, dfrag->data_seq)) {
 		u64 delta = snd_una - dfrag->data_seq;
 
-		if (WARN_ON_ONCE(delta > dfrag->data_len))
+		if (WARN_ON_ONCE(delta > dfrag->already_sent))
 			goto out;
 
 		dfrag->data_seq += delta;
 		dfrag->offset += delta;
 		dfrag->data_len -= delta;
+		dfrag->already_sent -= delta;
 
 		dfrag_uncharge(sk, delta);
 		cleaned = true;
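A worked example of the partial-ack trimming above, assuming a fragment at
data_seq 1000 holding 3000 bytes of which 2500 were sent: when snd_una
moves to 3000, delta is 2000 and all four counters shift consistently. The
WARN_ON_ONCE() now compares delta against already_sent because the peer
can only acknowledge bytes that were actually sent. User-space
illustration only:

#include <assert.h>
#include <stdint.h>

struct data_frag { uint64_t data_seq; int offset, data_len, already_sent; };

int main(void)
{
	struct data_frag d = { .data_seq = 1000, .offset = 64,
			       .data_len = 3000, .already_sent = 2500 };
	uint64_t snd_una = 3000;		/* MPTCP-level cumulative ack */
	uint64_t delta = snd_una - d.data_seq;	/* 2000 bytes fully acked */

	assert(delta <= (uint64_t)d.already_sent);	/* only sent data is acked */
	d.data_seq += delta;
	d.offset += delta;
	d.data_len -= delta;
	d.already_sent -= delta;
	assert(d.data_seq == 3000 && d.data_len == 1000 && d.already_sent == 500);
	return 0;
}
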
@@ -911,12 +916,23 @@ static void mptcp_clean_una_wakeup(struct sock *sk)
  */
 static bool mptcp_page_frag_refill(struct sock *sk, struct page_frag *pfrag)
 {
+	struct mptcp_subflow_context *subflow;
+	struct mptcp_sock *msk = mptcp_sk(sk);
+	bool first = true;
+
 	if (likely(skb_page_frag_refill(32U + sizeof(struct mptcp_data_frag),
 					pfrag, sk->sk_allocation)))
 		return true;
 
-	sk->sk_prot->enter_memory_pressure(sk);
 	sk_stream_moderate_sndbuf(sk);
+	mptcp_for_each_subflow(msk, subflow) {
+		struct sock *ssk = mptcp_subflow_tcp_sock(subflow);
+
+		if (first)
+			tcp_enter_memory_pressure(ssk);
+		sk_stream_moderate_sndbuf(ssk);
+		first = false;
+	}
 	return false;
 }
 
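On allocation failure the pressure state now propagates to every subflow,
but tcp_enter_memory_pressure() is entered only once, on the first
subflow, while every sndbuf gets moderated. A user-space schematic of the
first-only pattern (the helpers are printf stand-ins, not kernel APIs):

#include <stdbool.h>
#include <stdio.h>

static void enter_pressure(int id)  { printf("subflow %d: enter memory pressure\n", id); }
static void moderate_sndbuf(int id) { printf("subflow %d: moderate sndbuf\n", id); }

int main(void)
{
	bool first = true;

	for (int id = 0; id < 3; id++) {	/* stands for mptcp_for_each_subflow() */
		if (first)
			enter_pressure(id);	/* protocol-wide state: once is enough */
		moderate_sndbuf(id);		/* per-subflow: shrink each sndbuf */
		first = false;
	}
	return 0;
}
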
@@ -932,6 +948,7 @@ mptcp_carve_data_frag(const struct mptcp_sock *msk, struct page_frag *pfrag,
 	dfrag->data_seq = msk->write_seq;
 	dfrag->overhead = offset - orig_offset + sizeof(struct mptcp_data_frag);
 	dfrag->offset = offset + sizeof(struct mptcp_data_frag);
+	dfrag->already_sent = 0;
 	dfrag->page = pfrag->page;
 
 	return dfrag;
@@ -940,121 +957,58 @@ mptcp_carve_data_frag(const struct mptcp_sock *msk, struct page_frag *pfrag,
 struct mptcp_sendmsg_info {
 	int mss_now;
 	int size_goal;
+	u16 limit;
+	u16 sent;
+	unsigned int flags;
 };
 
 static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk,
-			      struct msghdr *msg, struct mptcp_data_frag *dfrag,
+			      struct mptcp_data_frag *dfrag,
 			      struct mptcp_sendmsg_info *info)
 {
-	int avail_size, offset, ret, frag_truesize = 0;
-	bool dfrag_collapsed, can_collapse = false;
+	u64 data_seq = dfrag->data_seq + info->sent;
 	struct mptcp_sock *msk = mptcp_sk(sk);
 	struct mptcp_ext *mpext = NULL;
-	bool retransmission = !!dfrag;
 	struct sk_buff *skb, *tail;
-	struct page_frag *pfrag;
-	struct page *page;
-	u64 *write_seq;
-	size_t psize;
-
-	/* use the mptcp page cache so that we can easily move the data
-	 * from one substream to another, but do per subflow memory accounting
-	 * Note: pfrag is used only !retransmission, but the compiler if
-	 * fooled into a warning if we don't init here
-	 */
-	pfrag = sk_page_frag(sk);
-	if (!retransmission) {
-		write_seq = &msk->write_seq;
-		page = pfrag->page;
-	} else {
-		write_seq = &dfrag->data_seq;
-		page = dfrag->page;
-	}
+	bool can_collapse = false;
+	int avail_size;
+	size_t ret;
 
+	pr_debug("msk=%p ssk=%p sending dfrag at seq=%lld len=%d already sent=%d",
+		 msk, ssk, dfrag->data_seq, dfrag->data_len, info->sent);
+
-	/* compute copy limit */
-	info->mss_now = tcp_send_mss(ssk, &info->size_goal, msg->msg_flags);
+	/* compute send limit */
+	info->mss_now = tcp_send_mss(ssk, &info->size_goal, info->flags);
 	avail_size = info->size_goal;
 	skb = tcp_write_queue_tail(ssk);
 	if (skb) {
-		mpext = skb_ext_find(skb, SKB_EXT_MPTCP);
-
 		/* Limit the write to the size available in the
 		 * current skb, if any, so that we create at most a new skb.
 		 * Explicitly tells TCP internals to avoid collapsing on later
 		 * queue management operation, to avoid breaking the ext <->
 		 * SSN association set here
 		 */
+		mpext = skb_ext_find(skb, SKB_EXT_MPTCP);
 		can_collapse = (info->size_goal - skb->len > 0) &&
-			      mptcp_skb_can_collapse_to(*write_seq, skb, mpext);
+			 mptcp_skb_can_collapse_to(data_seq, skb, mpext);
 		if (!can_collapse)
 			TCP_SKB_CB(skb)->eor = 1;
 		else
 			avail_size = info->size_goal - skb->len;
 	}
 
-	if (!retransmission) {
-		/* reuse tail pfrag, if possible, or carve a new one from the
-		 * page allocator
-		 */
-		dfrag = mptcp_rtx_tail(sk);
-		offset = pfrag->offset;
-		dfrag_collapsed = mptcp_frag_can_collapse_to(msk, pfrag, dfrag);
-		if (!dfrag_collapsed) {
-			dfrag = mptcp_carve_data_frag(msk, pfrag, offset);
-			offset = dfrag->offset;
-			frag_truesize = dfrag->overhead;
-		}
-		psize = min_t(size_t, pfrag->size - offset, avail_size);
-
-		/* Copy to page */
-		pr_debug("left=%zu", msg_data_left(msg));
-		psize = copy_page_from_iter(pfrag->page, offset,
-					    min_t(size_t, msg_data_left(msg),
-						  psize),
-					    &msg->msg_iter);
-		pr_debug("left=%zu", msg_data_left(msg));
-		if (!psize)
-			return -EINVAL;
-
-		if (!sk_wmem_schedule(sk, psize + dfrag->overhead)) {
-			iov_iter_revert(&msg->msg_iter, psize);
-			return -ENOMEM;
-		}
-	} else {
-		offset = dfrag->offset;
-		psize = min_t(size_t, dfrag->data_len, avail_size);
-	}
+	if (WARN_ON_ONCE(info->sent > info->limit ||
+			 info->limit > dfrag->data_len))
+		return 0;
 
-	tail = tcp_build_frag(ssk, psize, msg->msg_flags, page, offset, &psize);
+	ret = info->limit - info->sent;
+	tail = tcp_build_frag(ssk, avail_size, info->flags, dfrag->page,
+			      dfrag->offset + info->sent, &ret);
 	if (!tail) {
 		tcp_remove_empty_skb(sk, tcp_write_queue_tail(ssk));
 		return -ENOMEM;
 	}
 
-	ret = psize;
-	frag_truesize += ret;
-	if (!retransmission) {
-		if (unlikely(ret < psize))
-			iov_iter_revert(&msg->msg_iter, psize - ret);
-
-		/* send successful, keep track of sent data for mptcp-level
-		 * retransmission
-		 */
-		dfrag->data_len += ret;
-		if (!dfrag_collapsed) {
-			get_page(dfrag->page);
-			list_add_tail(&dfrag->list, &msk->rtx_queue);
-			sk_wmem_queued_add(sk, frag_truesize);
-		} else {
-			sk_wmem_queued_add(sk, ret);
-		}
-
-		/* charge data on mptcp rtx queue to the master socket
-		 * Note: we charge such data both to sk and ssk
-		 */
-		sk->sk_forward_alloc -= frag_truesize;
-	}
-
 	/* if the tail skb is still the cached one, collapsing really happened.
 	 */
 	if (skb == tail) {
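With the copy phase gone, the helper now just transmits the byte range
(info->sent, info->limit] of a fragment that already sits in the rtx
queue; the DSS sequence and the page offset are both derived from
info->sent. Illustrative user-space arithmetic, mirroring the patch's
field names with hypothetical values:

#include <stdint.h>
#include <stdio.h>

struct data_frag { uint64_t data_seq; int offset, data_len; };

int main(void)
{
	struct data_frag d = { .data_seq = 5000, .offset = 128, .data_len = 3000 };
	int sent = 1000;		/* info->sent: bytes already pushed */
	int limit = 3000;		/* info->limit: at most data_len */

	uint64_t seq = d.data_seq + sent;	/* DSS mapping for this chunk */
	int page_off = d.offset + sent;		/* where the chunk lives on the page */
	int max_len = limit - sent;		/* tcp_build_frag() may send less */

	printf("chunk: seq=%llu page_off=%d len<=%d\n",
	       (unsigned long long)seq, page_off, max_len);
	return 0;
}
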
@@ -1067,7 +1021,7 @@ static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk,
 	msk->cached_ext = NULL;
 
 	memset(mpext, 0, sizeof(*mpext));
-	mpext->data_seq = *write_seq;
+	mpext->data_seq = data_seq;
 	mpext->subflow_seq = mptcp_subflow_ctx(ssk)->rel_write_seq;
 	mpext->data_len = ret;
 	mpext->use_map = 1;
@@ -1078,11 +1032,7 @@ static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk,
 		 mpext->dsn64);
 
 out:
-	if (!retransmission)
-		pfrag->offset += frag_truesize;
-	WRITE_ONCE(*write_seq, *write_seq + ret);
 	mptcp_subflow_ctx(ssk)->rel_write_seq += ret;
 
 	return ret;
 }
 
@@ -1210,19 +1160,86 @@ static void ssk_check_wmem(struct mptcp_sock *msk)
 		mptcp_nospace(msk);
 }
 
-static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
+static void mptcp_push_release(struct sock *sk, struct sock *ssk,
+			       struct mptcp_sendmsg_info *info)
 {
+	mptcp_set_timeout(sk, ssk);
+	tcp_push(ssk, 0, info->mss_now, tcp_sk(ssk)->nonagle, info->size_goal);
+	release_sock(ssk);
+}
+
+static void mptcp_push_pending(struct sock *sk, unsigned int flags)
+{
+	struct sock *prev_ssk = NULL, *ssk = NULL;
 	struct mptcp_sock *msk = mptcp_sk(sk);
 	struct mptcp_sendmsg_info info = {
-		.mss_now = 0,
-		.size_goal = 0,
+				.flags = flags,
 	};
+	struct mptcp_data_frag *dfrag;
+	int len, copied = 0;
+	u32 sndbuf;
+
+	while ((dfrag = mptcp_send_head(sk))) {
+		info.sent = dfrag->already_sent;
+		info.limit = dfrag->data_len;
+		len = dfrag->data_len - dfrag->already_sent;
+		while (len > 0) {
+			int ret = 0;
+
+			prev_ssk = ssk;
+			__mptcp_flush_join_list(msk);
+			ssk = mptcp_subflow_get_send(msk, &sndbuf);
+
+			/* do auto tuning */
+			if (!(sk->sk_userlocks & SOCK_SNDBUF_LOCK) &&
+			    sndbuf > READ_ONCE(sk->sk_sndbuf))
+				WRITE_ONCE(sk->sk_sndbuf, sndbuf);
+
+			/* try to keep the subflow socket lock across
+			 * consecutive xmit on the same socket
+			 */
+			if (ssk != prev_ssk && prev_ssk)
+				mptcp_push_release(sk, prev_ssk, &info);
+			if (!ssk)
+				goto out;
+
+			if (ssk != prev_ssk || !prev_ssk)
+				lock_sock(ssk);
+
+			ret = mptcp_sendmsg_frag(sk, ssk, dfrag, &info);
+			if (ret <= 0) {
+				mptcp_push_release(sk, ssk, &info);
+				goto out;
+			}
+
+			info.sent += ret;
+			dfrag->already_sent += ret;
+			msk->snd_nxt += ret;
+			msk->snd_burst -= ret;
+			copied += ret;
+			len -= ret;
+		}
+		WRITE_ONCE(msk->first_pending, mptcp_send_next(sk));
+	}
+
+	/* at this point we held the socket lock for the last subflow we used */
+	if (ssk)
+		mptcp_push_release(sk, ssk, &info);
+
+out:
+	/* start the timer, if it's not pending */
+	if (!mptcp_timer_pending(sk))
+		mptcp_reset_timer(sk);
+	if (copied)
+		__mptcp_check_send_data_fin(sk);
+}
+
+static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
+{
+	struct mptcp_sock *msk = mptcp_sk(sk);
 	struct page_frag *pfrag;
 	size_t copied = 0;
-	struct sock *ssk;
 	int ret = 0;
-	u32 sndbuf;
-	bool tx_ok;
 	long timeo;
 
 	if (msg->msg_flags & ~(MSG_MORE | MSG_DONTWAIT | MSG_NOSIGNAL))
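mptcp_push_pending() above tries to keep the subflow socket lock while
consecutive chunks land on the same subflow, releasing and pushing only
when the scheduler switches. A self-contained model of that lock-retention
pattern (pick_subflow/lock_subflow/push_release are stand-ins for the
scheduler and socket-lock calls, not kernel APIs):

#include <stdio.h>

struct subflow { int id; };

static struct subflow flows[2] = { { .id = 0 }, { .id = 1 } };

/* stand-in scheduler: alternate subflows every two chunks */
static struct subflow *pick_subflow(int chunk)
{
	return &flows[(chunk / 2) % 2];
}

static void lock_subflow(struct subflow *s) { printf("lock        %d\n", s->id); }
static void push_release(struct subflow *s) { printf("push+unlock %d\n", s->id); }

int main(void)
{
	struct subflow *prev = NULL, *ssk = NULL;

	for (int chunk = 0; chunk < 6; chunk++) {
		prev = ssk;
		ssk = pick_subflow(chunk);
		if (ssk != prev && prev)
			push_release(prev);	/* scheduler switched: flush old */
		if (ssk != prev || !prev)
			lock_subflow(ssk);	/* first chunk on this subflow */
		printf("send chunk %d on %d\n", chunk, ssk->id);
	}
	if (ssk)
		push_release(ssk);		/* lock still held on the last one */
	return 0;
}
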
@@ -1239,129 +1256,93 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
 	}
 
 	pfrag = sk_page_frag(sk);
-restart:
 	mptcp_clean_una(sk);
 
+	while (msg_data_left(msg)) {
+		struct mptcp_data_frag *dfrag;
+		int frag_truesize = 0;
+		bool dfrag_collapsed;
+		size_t psize, offset;
+
 		if (sk->sk_err || (sk->sk_shutdown & SEND_SHUTDOWN)) {
 			ret = -EPIPE;
 			goto out;
 		}
 
-	__mptcp_flush_join_list(msk);
-	ssk = mptcp_subflow_get_send(msk, &sndbuf);
-	while (!sk_stream_memory_free(sk) ||
-	       !ssk ||
-	       !mptcp_page_frag_refill(ssk, pfrag)) {
-		if (ssk) {
-			/* make sure retransmit timer is
-			 * running before we wait for memory.
-			 *
-			 * The retransmit timer might be needed
-			 * to make the peer send an up-to-date
-			 * MPTCP Ack.
+		/* reuse tail pfrag, if possible, or carve a new one from the
+		 * page allocator
 		 */
-			mptcp_set_timeout(sk, ssk);
-			if (!mptcp_timer_pending(sk))
-				mptcp_reset_timer(sk);
-		}
-
-		mptcp_nospace(msk);
-		ret = sk_stream_wait_memory(sk, &timeo);
-		if (ret)
-			goto out;
-
-		mptcp_clean_una(sk);
-
-		ssk = mptcp_subflow_get_send(msk, &sndbuf);
-		if (list_empty(&msk->conn_list)) {
-			ret = -ENOTCONN;
-			goto out;
-		}
+		dfrag = mptcp_pending_tail(sk);
+		dfrag_collapsed = mptcp_frag_can_collapse_to(msk, pfrag, dfrag);
+		if (!dfrag_collapsed) {
+			if (!sk_stream_memory_free(sk)) {
+				mptcp_push_pending(sk, msg->msg_flags);
+				if (!sk_stream_memory_free(sk))
+					goto wait_for_memory;
+			}
+			if (!mptcp_page_frag_refill(sk, pfrag))
+				goto wait_for_memory;
+
-	/* do auto tuning */
-	if (!(sk->sk_userlocks & SOCK_SNDBUF_LOCK) &&
-	    sndbuf > READ_ONCE(sk->sk_sndbuf))
-		WRITE_ONCE(sk->sk_sndbuf, sndbuf);
-
-	pr_debug("conn_list->subflow=%p", ssk);
-
-	lock_sock(ssk);
-	tx_ok = msg_data_left(msg);
-	while (tx_ok) {
-		ret = mptcp_sendmsg_frag(sk, ssk, msg, NULL, &info);
-		if (ret < 0) {
-			if (ret == -EAGAIN && timeo > 0) {
-				mptcp_set_timeout(sk, ssk);
-				release_sock(ssk);
-				goto restart;
-			}
-			break;
+			dfrag = mptcp_carve_data_frag(msk, pfrag, pfrag->offset);
+			frag_truesize = dfrag->overhead;
 		}
 
-		/* burst can be negative, we will try move to the next subflow
-		 * at selection time, if possible.
+		/* we do not bound vs wspace, to allow a single packet.
+		 * memory accounting will prevent execessive memory usage
+		 * anyway
 		 */
-		msk->snd_burst -= ret;
-		copied += ret;
-
-		tx_ok = msg_data_left(msg);
-		if (!tx_ok)
-			break;
-
-		if (!sk_stream_memory_free(ssk) ||
-		    !mptcp_page_frag_refill(ssk, pfrag) ||
-		    !mptcp_ext_cache_refill(msk)) {
-			tcp_push(ssk, msg->msg_flags, info.mss_now,
-				 tcp_sk(ssk)->nonagle, info.size_goal);
-			mptcp_set_timeout(sk, ssk);
-			release_sock(ssk);
-			goto restart;
+		offset = dfrag->offset + dfrag->data_len;
+		psize = pfrag->size - offset;
+		psize = min_t(size_t, psize, msg_data_left(msg));
+		if (!sk_wmem_schedule(sk, psize + frag_truesize))
+			goto wait_for_memory;
 
+		if (copy_page_from_iter(dfrag->page, offset, psize,
+					&msg->msg_iter) != psize) {
+			ret = -EFAULT;
+			goto out;
 		}
 
-		/* memory is charged to mptcp level socket as well, i.e.
-		 * if msg is very large, mptcp socket may run out of buffer
-		 * space.  mptcp_clean_una() will release data that has
-		 * been acked at mptcp level in the mean time, so there is
-		 * a good chance we can continue sending data right away.
-		 *
-		 * Normally, when the tcp subflow can accept more data, then
-		 * so can the MPTCP socket.  However, we need to cope with
-		 * peers that might lag behind in their MPTCP-level
-		 * acknowledgements, i.e.  data might have been acked at
-		 * tcp level only.  So, we must also check the MPTCP socket
-		 * limits before we send more data.
-		 */
-		if (unlikely(!sk_stream_memory_free(sk))) {
-			tcp_push(ssk, msg->msg_flags, info.mss_now,
-				 tcp_sk(ssk)->nonagle, info.size_goal);
-			mptcp_clean_una(sk);
-			if (!sk_stream_memory_free(sk)) {
-				/* can't send more for now, need to wait for
-				 * MPTCP-level ACKs from peer.
-				 *
-				 * Wakeup will happen via mptcp_clean_una().
+		/* data successfully copied into the write queue */
+		copied += psize;
+		dfrag->data_len += psize;
+		frag_truesize += psize;
+		pfrag->offset += frag_truesize;
+		WRITE_ONCE(msk->write_seq, msk->write_seq + psize);
+
+		/* charge data on mptcp pending queue to the msk socket
+		 * Note: we charge such data both to sk and ssk
 		 */
-				mptcp_set_timeout(sk, ssk);
-				release_sock(ssk);
-				goto restart;
-			}
-		}
+		sk_wmem_queued_add(sk, frag_truesize);
+		sk->sk_forward_alloc -= frag_truesize;
+		if (!dfrag_collapsed) {
+			get_page(dfrag->page);
+			list_add_tail(&dfrag->list, &msk->rtx_queue);
+			if (!msk->first_pending)
+				WRITE_ONCE(msk->first_pending, dfrag);
 		}
+		pr_debug("msk=%p dfrag at seq=%lld len=%d sent=%d new=%d", msk,
+			 dfrag->data_seq, dfrag->data_len, dfrag->already_sent,
+			 !dfrag_collapsed);
 
-	mptcp_set_timeout(sk, ssk);
-	if (copied) {
-		tcp_push(ssk, msg->msg_flags, info.mss_now,
-			 tcp_sk(ssk)->nonagle, info.size_goal);
+		if (!mptcp_ext_cache_refill(msk))
+			goto wait_for_memory;
+		continue;
 
-		/* start the timer, if it's not pending */
-		if (!mptcp_timer_pending(sk))
+wait_for_memory:
+		mptcp_nospace(msk);
+		mptcp_clean_una(sk);
+		if (mptcp_timer_pending(sk))
 			mptcp_reset_timer(sk);
+		ret = sk_stream_wait_memory(sk, &timeo);
+		if (ret)
+			goto out;
 	}
 
-	release_sock(ssk);
+	if (copied)
+		mptcp_push_pending(sk, msg->msg_flags);
 
 out:
-	msk->snd_nxt = msk->write_seq;
 	ssk_check_wmem(msk);
 	release_sock(sk);
 	return copied ? : ret;
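The copy phase's accounting in one place: the slice size is bounded by the
room left in the page frag and by the data remaining in the message, the
fragment and write_seq grow by exactly the copied bytes, and the fragment
overhead is charged only when a fragment was freshly carved. User-space
arithmetic with hypothetical sizes (48 stands in for the carve overhead):

#include <stdint.h>
#include <stdio.h>

#define MIN(a, b) ((a) < (b) ? (a) : (b))

int main(void)
{
	unsigned int pfrag_size = 32768, pfrag_offset = 30000;
	int dfrag_offset = 30000 + 48;	/* page offset, past the carve header */
	int dfrag_data_len = 0;		/* freshly carved fragment */
	int frag_truesize = 48;		/* dfrag->overhead for a new fragment */
	size_t msg_left = 10000;	/* bytes still in the user iov */
	uint64_t write_seq = 7000;

	size_t offset = dfrag_offset + dfrag_data_len;
	size_t psize = MIN(pfrag_size - offset, msg_left);	/* 2720 bytes fit */

	dfrag_data_len += psize;	/* fragment grows ... */
	frag_truesize += psize;		/* ... as does the charged truesize */
	pfrag_offset += frag_truesize;	/* page frag space is consumed */
	write_seq += psize;		/* copy phase advances write_seq only */

	printf("copied %zu bytes, write_seq=%llu, pfrag_offset=%u\n",
	       psize, (unsigned long long)write_seq, pfrag_offset);
	return 0;
}
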
@@ -1700,7 +1681,7 @@ static struct sock *mptcp_subflow_get_retrans(const struct mptcp_sock *msk)
 	sock_owned_by_me((const struct sock *)msk);
 
 	if (__mptcp_check_fallback(msk))
-		return msk->first;
+		return NULL;
 
 	mptcp_for_each_subflow(msk, subflow) {
 		struct sock *ssk = mptcp_subflow_tcp_sock(subflow);
@@ -1843,12 +1824,7 @@ static void mptcp_worker(struct work_struct *work)
 	struct sock *ssk, *sk = &msk->sk.icsk_inet.sk;
 	struct mptcp_sendmsg_info info = {};
 	struct mptcp_data_frag *dfrag;
-	int orig_len, orig_offset;
-	u64 orig_write_seq;
 	size_t copied = 0;
-	struct msghdr msg = {
-		.msg_flags = MSG_DONTWAIT,
-	};
 	int state, ret;
 
 	lock_sock(sk);
@@ -1901,18 +1877,17 @@ static void mptcp_worker(struct work_struct *work)
 
 	lock_sock(ssk);
 
-	orig_len = dfrag->data_len;
-	orig_offset = dfrag->offset;
-	orig_write_seq = dfrag->data_seq;
-	while (dfrag->data_len > 0) {
-		ret = mptcp_sendmsg_frag(sk, ssk, &msg, dfrag, &info);
+	/* limit retransmission to the bytes already sent on some subflows */
+	info.sent = 0;
+	info.limit = dfrag->already_sent;
+	while (info.sent < dfrag->already_sent) {
+		ret = mptcp_sendmsg_frag(sk, ssk, dfrag, &info);
 		if (ret < 0)
 			break;
 
 		MPTCP_INC_STATS(sock_net(sk), MPTCP_MIB_RETRANSSEGS);
 		copied += ret;
-		dfrag->data_len -= ret;
-		dfrag->offset += ret;
+		info.sent += ret;
 
 		if (!mptcp_ext_cache_refill(msk))
 			break;
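Retransmission now replays only the (0, already_sent] window of the
fragment and, unlike the old code, no longer mutates and then restores the
fragment's own fields. The same worked model as above, assuming 2500 of
3000 queued bytes had been sent and a hypothetical 1000-byte per-call
limit:

#include <stdio.h>

int main(void)
{
	int data_len = 3000, already_sent = 2500;
	int sent = 0;			/* info.sent */
	int limit = already_sent;	/* info.limit: only replay sent bytes */
	int size_goal = 1000;		/* hypothetical per-call tcp limit */

	while (sent < already_sent) {
		int ret = limit - sent;

		if (ret > size_goal)
			ret = size_goal;
		printf("retransmit %d bytes at offset %d\n", ret, sent);
		sent += ret;
	}
	printf("%d of %d queued bytes replayed\n", sent, data_len);
	return 0;
}
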
@@ -1921,10 +1896,6 @@ static void mptcp_worker(struct work_struct *work)
 		tcp_push(ssk, 0, info.mss_now, tcp_sk(ssk)->nonagle,
 			 info.size_goal);
 
-	dfrag->data_seq = orig_write_seq;
-	dfrag->offset = orig_offset;
-	dfrag->data_len = orig_len;
-
 	mptcp_set_timeout(sk, ssk);
 	release_sock(ssk);
 
@@ -1996,6 +1967,7 @@ static void __mptcp_clear_xmit(struct sock *sk)
 
 	sk_stop_timer(sk, &msk->sk.icsk_retransmit_timer);
 
+	WRITE_ONCE(msk->first_pending, NULL);
 	list_for_each_entry_safe(dfrag, dtmp, &msk->rtx_queue, list)
 		dfrag_clear(sk, dfrag);
 }