Commit 72308ecb authored by Jakub Kicinski

Merge branch 'mptcp-improve-multiple-xmit-streams-support'

Paolo Abeni says:

====================
mptcp: improve multiple xmit streams support

This series improves MPTCP handling of multiple concurrent
xmit streams.

The to-be-transmitted data is enqueued to a subflow only when
the send window is open, keeping the subflows' xmit queues shorter
and allowing for faster switch-over.
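
As a rough sketch of that gating (the helper name is made up here
for illustration; the msk->snd_nxt and msk->wnd_end fields are the
ones visible in the diffs below):

	static bool mptcp_window_open(const struct mptcp_sock *msk)
	{
		/* room left between what the msk has already sent
		 * and the right edge of the peer's announced window?
		 */
		return before64(READ_ONCE(msk->snd_nxt),
				atomic64_read(&msk->wnd_end));
	}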

The above requires more accurate msk socket state tracking
and some additional infrastructure to allow pushing the data
pending in the msk xmit queue as soon as the MPTCP send window
opens (patches 6-10).
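
Part of that infrastructure is a mptcp_schedule_work() helper; the
pm.c hunk below folds the previously open-coded scheduling into it.
Assuming it simply wraps the pattern it replaces, it would look
roughly like:

	void mptcp_schedule_work(struct sock *sk)
	{
		/* the work item keeps a reference on the msk until
		 * the worker has run
		 */
		if (schedule_work(&mptcp_sk(sk)->work))
			sock_hold(sk);
	}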

As a side effect, the MPTCP socket can enqueue data to subflows
after close() time, to completely spool the data sitting in the
msk xmit queue. Dealing with this requires some infrastructure and
core TCP changes (patches 1-5).
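
The core TCP side of this is the __tcp_close() split, quoted from
the tcp.c hunk below: the close logic becomes callable with the
socket lock already held, while tcp_close() keeps its old behavior:

	void tcp_close(struct sock *sk, long timeout)
	{
		lock_sock(sk);
		__tcp_close(sk, timeout);
		release_sock(sk);
		sock_put(sk);
	}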

Finally, patches 11-12 introduce a more accurate tracking of the other
end's receive window.
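
The heart of that tracking, from the options.c hunk below: each
MPTCP-level ACK advances the msk-wide window edge with a lockless
cmpxchg loop and, when the window moves forward with data still
queued, schedules the worker to push it:

	new_wnd_end = new_snd_una + tcp_sk(ssk)->snd_wnd;

	while (after64(new_wnd_end, old_wnd_end)) {
		wnd_end = old_wnd_end;
		old_wnd_end = atomic64_cmpxchg(&msk->wnd_end, wnd_end,
					       new_wnd_end);
		if (old_wnd_end == wnd_end) {
			/* window opened: push any pending data */
			if (mptcp_send_head(sk))
				mptcp_schedule_work(sk);
			break;
		}
	}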

Overall this refactors the MPTCP xmit path without introducing
new features; the new code is covered by the existing self-tests.

v2 -> v3:
 - rebased,
 - fixed checkpatch issue in patch 1/13
 - fixed some state tracking issues in patch 8/13

v1 -> v2:
 - this is just a repost, to cope with patchwork issues, no changes
   at all
====================

Link: https://lore.kernel.org/r/cover.1605458224.git.pabeni@redhat.com


Signed-off-by: Jakub Kicinski <kuba@kernel.org>
parents c0a645a7 7ed90803
+4 −0
@@ -322,6 +322,7 @@ void tcp_shutdown(struct sock *sk, int how);
int tcp_v4_early_demux(struct sk_buff *skb);
int tcp_v4_rcv(struct sk_buff *skb);

void tcp_remove_empty_skb(struct sock *sk, struct sk_buff *skb);
int tcp_v4_tw_remember_stamp(struct inet_timewait_sock *tw);
int tcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t size);
int tcp_sendmsg_locked(struct sock *sk, struct msghdr *msg, size_t size);
@@ -329,6 +330,8 @@ int tcp_sendpage(struct sock *sk, struct page *page, int offset, size_t size,
		 int flags);
int tcp_sendpage_locked(struct sock *sk, struct page *page, int offset,
			size_t size, int flags);
struct sk_buff *tcp_build_frag(struct sock *sk, int size_goal, int flags,
			       struct page *page, int offset, size_t *size);
ssize_t do_tcp_sendpages(struct sock *sk, struct page *page, int offset,
		 size_t size, int flags);
int tcp_send_mss(struct sock *sk, int *size_goal, int flags);
@@ -392,6 +395,7 @@ void tcp_update_metrics(struct sock *sk);
void tcp_init_metrics(struct sock *sk);
void tcp_metrics_init(void);
bool tcp_peer_is_proven(struct request_sock *req, struct dst_entry *dst);
void __tcp_close(struct sock *sk, long timeout);
void tcp_close(struct sock *sk, long timeout);
void tcp_init_sock(struct sock *sk);
void tcp_init_transfer(struct sock *sk, int bpf_op, struct sk_buff *skb);
+74 −54
@@ -954,7 +954,7 @@ int tcp_send_mss(struct sock *sk, int *size_goal, int flags)
 * importantly be able to generate EPOLLOUT for Edge Trigger epoll()
 * users.
 */
static void tcp_remove_empty_skb(struct sock *sk, struct sk_buff *skb)
void tcp_remove_empty_skb(struct sock *sk, struct sk_buff *skb)
{
	if (skb && !skb->len) {
		tcp_unlink_write_queue(skb, sk);
@@ -964,55 +964,24 @@ static void tcp_remove_empty_skb(struct sock *sk, struct sk_buff *skb)
	}
}

ssize_t do_tcp_sendpages(struct sock *sk, struct page *page, int offset,
			 size_t size, int flags)
struct sk_buff *tcp_build_frag(struct sock *sk, int size_goal, int flags,
			       struct page *page, int offset, size_t *size)
{
	struct tcp_sock *tp = tcp_sk(sk);
	int mss_now, size_goal;
	int err;
	ssize_t copied;
	long timeo = sock_sndtimeo(sk, flags & MSG_DONTWAIT);

	if (IS_ENABLED(CONFIG_DEBUG_VM) &&
	    WARN_ONCE(!sendpage_ok(page),
		      "page must not be a Slab one and have page_count > 0"))
		return -EINVAL;

	/* Wait for a connection to finish. One exception is TCP Fast Open
	 * (passive side) where data is allowed to be sent before a connection
	 * is fully established.
	 */
	if (((1 << sk->sk_state) & ~(TCPF_ESTABLISHED | TCPF_CLOSE_WAIT)) &&
	    !tcp_passive_fastopen(sk)) {
		err = sk_stream_wait_connect(sk, &timeo);
		if (err != 0)
			goto out_err;
	}

	sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk);

	mss_now = tcp_send_mss(sk, &size_goal, flags);
	copied = 0;

	err = -EPIPE;
	if (sk->sk_err || (sk->sk_shutdown & SEND_SHUTDOWN))
		goto out_err;

	while (size > 0) {
	struct sk_buff *skb = tcp_write_queue_tail(sk);
		int copy, i;
	struct tcp_sock *tp = tcp_sk(sk);
	bool can_coalesce;
	int copy, i;

	if (!skb || (copy = size_goal - skb->len) <= 0 ||
	    !tcp_skb_can_collapse_to(skb)) {
new_segment:
		if (!sk_stream_memory_free(sk))
				goto wait_for_space;
			return NULL;

		skb = sk_stream_alloc_skb(sk, 0, sk->sk_allocation,
					  tcp_rtx_and_write_queues_empty(sk));
		if (!skb)
				goto wait_for_space;
			return NULL;

#ifdef CONFIG_TLS_DEVICE
		skb->decrypted = !!(flags & MSG_SENDPAGE_DECRYPTED);
@@ -1021,8 +990,8 @@ new_segment:
		copy = size_goal;
	}

		if (copy > size)
			copy = size;
	if (copy > *size)
		copy = *size;

	i = skb_shinfo(skb)->nr_frags;
	can_coalesce = skb_can_coalesce(skb, i, page, offset);
@@ -1031,7 +1000,7 @@ new_segment:
		goto new_segment;
	}
	if (!sk_wmem_schedule(sk, copy))
			goto wait_for_space;
		return NULL;

	if (can_coalesce) {
		skb_frag_size_add(&skb_shinfo(skb)->frags[i - 1], copy);
@@ -1053,6 +1022,52 @@ new_segment:
	TCP_SKB_CB(skb)->end_seq += copy;
	tcp_skb_pcount_set(skb, 0);

	*size = copy;
	return skb;
}

ssize_t do_tcp_sendpages(struct sock *sk, struct page *page, int offset,
			 size_t size, int flags)
{
	struct tcp_sock *tp = tcp_sk(sk);
	int mss_now, size_goal;
	int err;
	ssize_t copied;
	long timeo = sock_sndtimeo(sk, flags & MSG_DONTWAIT);

	if (IS_ENABLED(CONFIG_DEBUG_VM) &&
	    WARN_ONCE(!sendpage_ok(page),
		      "page must not be a Slab one and have page_count > 0"))
		return -EINVAL;

	/* Wait for a connection to finish. One exception is TCP Fast Open
	 * (passive side) where data is allowed to be sent before a connection
	 * is fully established.
	 */
	if (((1 << sk->sk_state) & ~(TCPF_ESTABLISHED | TCPF_CLOSE_WAIT)) &&
	    !tcp_passive_fastopen(sk)) {
		err = sk_stream_wait_connect(sk, &timeo);
		if (err != 0)
			goto out_err;
	}

	sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk);

	mss_now = tcp_send_mss(sk, &size_goal, flags);
	copied = 0;

	err = -EPIPE;
	if (sk->sk_err || (sk->sk_shutdown & SEND_SHUTDOWN))
		goto out_err;

	while (size > 0) {
		struct sk_buff *skb;
		size_t copy = size;

		skb = tcp_build_frag(sk, size_goal, flags, page, offset, &copy);
		if (!skb)
			goto wait_for_space;

		if (!copied)
			TCP_SKB_CB(skb)->tcp_flags &= ~TCPHDR_PSH;

@@ -2405,13 +2420,12 @@ bool tcp_check_oom(struct sock *sk, int shift)
	return too_many_orphans || out_of_socket_memory;
}

void tcp_close(struct sock *sk, long timeout)
void __tcp_close(struct sock *sk, long timeout)
{
	struct sk_buff *skb;
	int data_was_unread = 0;
	int state;

	lock_sock(sk);
	sk->sk_shutdown = SHUTDOWN_MASK;

	if (sk->sk_state == TCP_LISTEN) {
@@ -2575,6 +2589,12 @@ adjudge_to_death:
out:
	bh_unlock_sock(sk);
	local_bh_enable();
}

void tcp_close(struct sock *sk, long timeout)
{
	lock_sock(sk);
	__tcp_close(sk, timeout);
	release_sock(sk);
	sock_put(sk);
}
+23 −7
@@ -492,7 +492,7 @@ static bool mptcp_established_options_dss(struct sock *sk, struct sk_buff *skb,
	bool ret = false;

	mpext = skb ? mptcp_get_ext(skb) : NULL;
	snd_data_fin_enable = READ_ONCE(msk->snd_data_fin_enable);
	snd_data_fin_enable = mptcp_data_fin_enabled(msk);

	if (!skb || (mpext && mpext->use_map) || snd_data_fin_enable) {
		unsigned int map_size;
@@ -809,11 +809,14 @@ static u64 expand_ack(u64 old_ack, u64 cur_ack, bool use_64bit)
	return cur_ack;
}

static void update_una(struct mptcp_sock *msk,
static void ack_update_msk(struct mptcp_sock *msk,
			   const struct sock *ssk,
			   struct mptcp_options_received *mp_opt)
{
	u64 new_snd_una, snd_una, old_snd_una = atomic64_read(&msk->snd_una);
	u64 write_seq = READ_ONCE(msk->write_seq);
	u64 new_wnd_end, wnd_end, old_wnd_end = atomic64_read(&msk->wnd_end);
	u64 snd_nxt = READ_ONCE(msk->snd_nxt);
	struct sock *sk = (struct sock *)msk;

	/* avoid ack expansion on update conflict, to reduce the risk of
	 * wrongly expanding to a future ack sequence number, which is way
@@ -822,15 +825,28 @@ static void update_una(struct mptcp_sock *msk,
	new_snd_una = expand_ack(old_snd_una, mp_opt->data_ack, mp_opt->ack64);

	/* ACK for data not even sent yet? Ignore. */
	if (after64(new_snd_una, write_seq))
	if (after64(new_snd_una, snd_nxt))
		new_snd_una = old_snd_una;

	new_wnd_end = new_snd_una + tcp_sk(ssk)->snd_wnd;

	while (after64(new_wnd_end, old_wnd_end)) {
		wnd_end = old_wnd_end;
		old_wnd_end = atomic64_cmpxchg(&msk->wnd_end, wnd_end,
					       new_wnd_end);
		if (old_wnd_end == wnd_end) {
			if (mptcp_send_head(sk))
				mptcp_schedule_work(sk);
			break;
		}
	}

	while (after64(new_snd_una, old_snd_una)) {
		snd_una = old_snd_una;
		old_snd_una = atomic64_cmpxchg(&msk->snd_una, snd_una,
					       new_snd_una);
		if (old_snd_una == snd_una) {
			mptcp_data_acked((struct sock *)msk);
			mptcp_data_acked(sk);
			break;
		}
	}
@@ -930,7 +946,7 @@ void mptcp_incoming_options(struct sock *sk, struct sk_buff *skb)
	 * monodirectional flows will stuck
	 */
	if (mp_opt.use_ack)
		update_una(msk, &mp_opt);
		ack_update_msk(msk, sk, &mp_opt);

	/* Zero-data-length packets are dropped by the caller and not
	 * propagated to the MPTCP layer, so the skb extension does not
+1 −2
@@ -89,8 +89,7 @@ static bool mptcp_pm_schedule_work(struct mptcp_sock *msk,
		return false;

	msk->pm.status |= BIT(new_status);
	if (schedule_work(&msk->work))
		sock_hold((struct sock *)msk);
	mptcp_schedule_work((struct sock *)msk);
	return true;
}

+2 −4
@@ -416,14 +416,13 @@ void mptcp_pm_nl_rm_addr_received(struct mptcp_sock *msk)
	list_for_each_entry_safe(subflow, tmp, &msk->conn_list, node) {
		struct sock *ssk = mptcp_subflow_tcp_sock(subflow);
		int how = RCV_SHUTDOWN | SEND_SHUTDOWN;
		long timeout = 0;

		if (msk->pm.rm_id != subflow->remote_id)
			continue;

		spin_unlock_bh(&msk->pm.lock);
		mptcp_subflow_shutdown(sk, ssk, how);
		__mptcp_close_ssk(sk, ssk, subflow, timeout);
		__mptcp_close_ssk(sk, ssk, subflow);
		spin_lock_bh(&msk->pm.lock);

		msk->pm.add_addr_accepted--;
@@ -452,14 +451,13 @@ void mptcp_pm_nl_rm_subflow_received(struct mptcp_sock *msk, u8 rm_id)
	list_for_each_entry_safe(subflow, tmp, &msk->conn_list, node) {
		struct sock *ssk = mptcp_subflow_tcp_sock(subflow);
		int how = RCV_SHUTDOWN | SEND_SHUTDOWN;
		long timeout = 0;

		if (rm_id != subflow->local_id)
			continue;

		spin_unlock_bh(&msk->pm.lock);
		mptcp_subflow_shutdown(sk, ssk, how);
		__mptcp_close_ssk(sk, ssk, subflow, timeout);
		__mptcp_close_ssk(sk, ssk, subflow);
		spin_lock_bh(&msk->pm.lock);

		msk->pm.local_addr_used--;