Commit 8d00e202 authored by David S. Miller's avatar David S. Miller
Browse files

Merge branch 'tipc-multicast-through-replication'



Jon Maloy says:

====================
tipc: emulate multicast through replication

TIPC multicast messages are currently distributed via L2 broadcast
or IP multicast to all nodes in the cluster, irrespective of the
number of real destinations of the message.

In this series we introduce an option to transport messages via
replication ("replicast") across a selected number of unicast links,
instead of relying on the underlying media. This option is used when
true broadcast/multicast is not supported by the media, or when the
number of true destinations is much smaller than the cluster size.
====================

Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parents a5e8c070 01fd12bb
Loading
Loading
Loading
Loading
+4 −2
Original line number Diff line number Diff line
/*
 * include/uapi/linux/tipc.h: Header for TIPC socket interface
 *
 * Copyright (c) 2003-2006, Ericsson AB
 * Copyright (c) 2003-2006, 2015-2016 Ericsson AB
 * Copyright (c) 2005, 2010-2011, Wind River Systems
 * All rights reserved.
 *
@@ -220,7 +220,7 @@ struct sockaddr_tipc {
#define TIPC_DESTNAME	3	/* destination name */

/*
 * TIPC-specific socket option values
 * TIPC-specific socket option names
 */

#define TIPC_IMPORTANCE		127	/* Default: TIPC_LOW_IMPORTANCE */
@@ -229,6 +229,8 @@ struct sockaddr_tipc {
#define TIPC_CONN_TIMEOUT	130	/* Default: 8000 (ms)  */
#define TIPC_NODE_RECVQ_DEPTH	131	/* Default: none (read only) */
#define TIPC_SOCK_RECVQ_DEPTH	132	/* Default: none (read only) */
#define TIPC_MCAST_BROADCAST    133     /* Default: TIPC selects. No arg */
#define TIPC_MCAST_REPLICAST    134     /* Default: TIPC selects. No arg */

/*
 * Maximum sizes of TIPC bearer-related names (including terminating NULL)
+172 −28
Original line number Diff line number Diff line
/*
 * net/tipc/bcast.c: TIPC broadcast code
 *
 * Copyright (c) 2004-2006, 2014-2015, Ericsson AB
 * Copyright (c) 2004-2006, 2014-2016, Ericsson AB
 * Copyright (c) 2004, Intel Corporation.
 * Copyright (c) 2005, 2010-2011, Wind River Systems
 * All rights reserved.
@@ -39,9 +39,8 @@
#include "socket.h"
#include "msg.h"
#include "bcast.h"
#include "name_distr.h"
#include "link.h"
#include "node.h"
#include "name_table.h"

#define	BCLINK_WIN_DEFAULT	50	/* bcast link window size (default) */
#define	BCLINK_WIN_MIN	        32	/* bcast minimum link window size */
@@ -54,12 +53,20 @@ const char tipc_bclink_name[] = "broadcast-link";
 * @inputq: data input queue; will only carry SOCK_WAKEUP messages
 * @dest: array keeping number of reachable destinations per bearer
 * @primary_bearer: a bearer having links to all broadcast destinations, if any
 * @bcast_support: indicates if primary bearer, if any, supports broadcast
 * @rcast_support: indicates if all peer nodes support replicast
 * @rc_ratio: dest count as percentage of cluster size where send method changes
 * @bc_threshold: calculated drom rc_ratio; if dests > threshold use broadcast
 */
struct tipc_bc_base {
	struct tipc_link *link;
	struct sk_buff_head inputq;
	int dests[MAX_BEARERS];
	int primary_bearer;
	bool bcast_support;
	bool rcast_support;
	int rc_ratio;
	int bc_threshold;
};

static struct tipc_bc_base *tipc_bc_base(struct net *net)
@@ -69,7 +76,20 @@ static struct tipc_bc_base *tipc_bc_base(struct net *net)

int tipc_bcast_get_mtu(struct net *net)
{
	return tipc_link_mtu(tipc_bc_sndlink(net));
	return tipc_link_mtu(tipc_bc_sndlink(net)) - INT_H_SIZE;
}

void tipc_bcast_disable_rcast(struct net *net)
{
	tipc_bc_base(net)->rcast_support = false;
}

static void tipc_bcbase_calc_bc_threshold(struct net *net)
{
	struct tipc_bc_base *bb = tipc_bc_base(net);
	int cluster_size = tipc_link_bc_peers(tipc_bc_sndlink(net));

	bb->bc_threshold = 1 + (cluster_size * bb->rc_ratio / 100);
}

/* tipc_bcbase_select_primary(): find a bearer with links to all destinations,
@@ -79,9 +99,10 @@ static void tipc_bcbase_select_primary(struct net *net)
{
	struct tipc_bc_base *bb = tipc_bc_base(net);
	int all_dests =  tipc_link_bc_peers(bb->link);
	int i, mtu;
	int i, mtu, prim;

	bb->primary_bearer = INVALID_BEARER_ID;
	bb->bcast_support = true;

	if (!all_dests)
		return;
@@ -93,7 +114,7 @@ static void tipc_bcbase_select_primary(struct net *net)
		mtu = tipc_bearer_mtu(net, i);
		if (mtu < tipc_link_mtu(bb->link))
			tipc_link_set_mtu(bb->link, mtu);

		bb->bcast_support &= tipc_bearer_bcast_support(net, i);
		if (bb->dests[i] < all_dests)
			continue;

@@ -103,6 +124,9 @@ static void tipc_bcbase_select_primary(struct net *net)
		if ((i ^ tipc_own_addr(net)) & 1)
			break;
	}
	prim = bb->primary_bearer;
	if (prim != INVALID_BEARER_ID)
		bb->bcast_support = tipc_bearer_bcast_support(net, prim);
}

void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id)
@@ -170,42 +194,128 @@ static void tipc_bcbase_xmit(struct net *net, struct sk_buff_head *xmitq)
	__skb_queue_purge(&_xmitq);
}

/* tipc_bcast_xmit - deliver buffer chain to all nodes in cluster
 *                    and to identified node local sockets
static void tipc_bcast_select_xmit_method(struct net *net, int dests,
					  struct tipc_mc_method *method)
{
	struct tipc_bc_base *bb = tipc_bc_base(net);
	unsigned long exp = method->expires;

	/* Broadcast supported by used bearer/bearers? */
	if (!bb->bcast_support) {
		method->rcast = true;
		return;
	}
	/* Any destinations which don't support replicast ? */
	if (!bb->rcast_support) {
		method->rcast = false;
		return;
	}
	/* Can current method be changed ? */
	method->expires = jiffies + TIPC_METHOD_EXPIRE;
	if (method->mandatory || time_before(jiffies, exp))
		return;

	/* Determine method to use now */
	method->rcast = dests <= bb->bc_threshold;
}

/* tipc_bcast_xmit - broadcast the buffer chain to all external nodes
 * @net: the applicable net namespace
 * @list: chain of buffers containing message
 * @pkts: chain of buffers containing message
 * @cong_link_cnt: set to 1 if broadcast link is congested, otherwise 0
 * Consumes the buffer chain.
 * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
 * Returns 0 if success, otherwise errno: -EHOSTUNREACH,-EMSGSIZE
 */
int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list)
static int tipc_bcast_xmit(struct net *net, struct sk_buff_head *pkts,
			   u16 *cong_link_cnt)
{
	struct tipc_link *l = tipc_bc_sndlink(net);
	struct sk_buff_head xmitq, inputq, rcvq;
	struct sk_buff_head xmitq;
	int rc = 0;

	__skb_queue_head_init(&rcvq);
	__skb_queue_head_init(&xmitq);
	skb_queue_head_init(&inputq);

	/* Prepare message clone for local node */
	if (unlikely(!tipc_msg_reassemble(list, &rcvq)))
		return -EHOSTUNREACH;

	tipc_bcast_lock(net);
	if (tipc_link_bc_peers(l))
		rc = tipc_link_xmit(l, list, &xmitq);
		rc = tipc_link_xmit(l, pkts, &xmitq);
	tipc_bcast_unlock(net);

	/* Don't send to local node if adding to link failed */
	if (unlikely(rc && (rc != -ELINKCONG))) {
		__skb_queue_purge(&rcvq);
	tipc_bcbase_xmit(net, &xmitq);
	__skb_queue_purge(pkts);
	if (rc == -ELINKCONG) {
		*cong_link_cnt = 1;
		rc = 0;
	}
	return rc;
}

	/* Broadcast to all nodes, inluding local node */
	tipc_bcbase_xmit(net, &xmitq);
	tipc_sk_mcast_rcv(net, &rcvq, &inputq);
	__skb_queue_purge(list);
/* tipc_rcast_xmit - replicate and send a message to given destination nodes
 * @net: the applicable net namespace
 * @pkts: chain of buffers containing message
 * @dests: list of destination nodes
 * @cong_link_cnt: returns number of congested links
 * @cong_links: returns identities of congested links
 * Returns 0 if success, otherwise errno
 */
static int tipc_rcast_xmit(struct net *net, struct sk_buff_head *pkts,
			   struct tipc_nlist *dests, u16 *cong_link_cnt)
{
	struct sk_buff_head _pkts;
	struct u32_item *n, *tmp;
	u32 dst, selector;

	selector = msg_link_selector(buf_msg(skb_peek(pkts)));
	__skb_queue_head_init(&_pkts);

	list_for_each_entry_safe(n, tmp, &dests->list, list) {
		dst = n->value;
		if (!tipc_msg_pskb_copy(dst, pkts, &_pkts))
			return -ENOMEM;

		/* Any other return value than -ELINKCONG is ignored */
		if (tipc_node_xmit(net, &_pkts, dst, selector) == -ELINKCONG)
			(*cong_link_cnt)++;
	}
	return 0;
}

/* tipc_mcast_xmit - deliver message to indicated destination nodes
 *                   and to identified node local sockets
 * @net: the applicable net namespace
 * @pkts: chain of buffers containing message
 * @method: send method to be used
 * @dests: destination nodes for message.
 * @cong_link_cnt: returns number of encountered congested destination links
 * Consumes buffer chain.
 * Returns 0 if success, otherwise errno
 */
int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
		    struct tipc_mc_method *method, struct tipc_nlist *dests,
		    u16 *cong_link_cnt)
{
	struct sk_buff_head inputq, localq;
	int rc = 0;

	skb_queue_head_init(&inputq);
	skb_queue_head_init(&localq);

	/* Clone packets before they are consumed by next call */
	if (dests->local && !tipc_msg_reassemble(pkts, &localq)) {
		rc = -ENOMEM;
		goto exit;
	}
	/* Send according to determined transmit method */
	if (dests->remote) {
		tipc_bcast_select_xmit_method(net, dests->remote, method);
		if (method->rcast)
			rc = tipc_rcast_xmit(net, pkts, dests, cong_link_cnt);
		else
			rc = tipc_bcast_xmit(net, pkts, cong_link_cnt);
	}

	if (dests->local)
		tipc_sk_mcast_rcv(net, &localq, &inputq);
exit:
	/* This queue should normally be empty by now */
	__skb_queue_purge(pkts);
	return rc;
}

@@ -313,6 +423,7 @@ void tipc_bcast_add_peer(struct net *net, struct tipc_link *uc_l,
	tipc_bcast_lock(net);
	tipc_link_add_bc_peer(snd_l, uc_l, xmitq);
	tipc_bcbase_select_primary(net);
	tipc_bcbase_calc_bc_threshold(net);
	tipc_bcast_unlock(net);
}

@@ -331,6 +442,7 @@ void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_l)
	tipc_bcast_lock(net);
	tipc_link_remove_bc_peer(snd_l, rcv_l, &xmitq);
	tipc_bcbase_select_primary(net);
	tipc_bcbase_calc_bc_threshold(net);
	tipc_bcast_unlock(net);

	tipc_bcbase_xmit(net, &xmitq);
@@ -413,6 +525,8 @@ int tipc_bcast_init(struct net *net)
		goto enomem;
	bb->link = l;
	tn->bcl = l;
	bb->rc_ratio = 25;
	bb->rcast_support = true;
	return 0;
enomem:
	kfree(bb);
@@ -428,3 +542,33 @@ void tipc_bcast_stop(struct net *net)
	kfree(tn->bcbase);
	kfree(tn->bcl);
}

void tipc_nlist_init(struct tipc_nlist *nl, u32 self)
{
	memset(nl, 0, sizeof(*nl));
	INIT_LIST_HEAD(&nl->list);
	nl->self = self;
}

void tipc_nlist_add(struct tipc_nlist *nl, u32 node)
{
	if (node == nl->self)
		nl->local = true;
	else if (u32_push(&nl->list, node))
		nl->remote++;
}

void tipc_nlist_del(struct tipc_nlist *nl, u32 node)
{
	if (node == nl->self)
		nl->local = false;
	else if (u32_del(&nl->list, node))
		nl->remote--;
}

void tipc_nlist_purge(struct tipc_nlist *nl)
{
	u32_list_purge(&nl->list);
	nl->remote = 0;
	nl->local = 0;
}
+31 −2
Original line number Diff line number Diff line
@@ -42,9 +42,35 @@
struct tipc_node;
struct tipc_msg;
struct tipc_nl_msg;
struct tipc_node_map;
struct tipc_nlist;
struct tipc_nitem;
extern const char tipc_bclink_name[];

#define TIPC_METHOD_EXPIRE msecs_to_jiffies(5000)

struct tipc_nlist {
	struct list_head list;
	u32 self;
	u16 remote;
	bool local;
};

void tipc_nlist_init(struct tipc_nlist *nl, u32 self);
void tipc_nlist_purge(struct tipc_nlist *nl);
void tipc_nlist_add(struct tipc_nlist *nl, u32 node);
void tipc_nlist_del(struct tipc_nlist *nl, u32 node);

/* Cookie to be used between socket and broadcast layer
 * @rcast: replicast (instead of broadcast) was used at previous xmit
 * @mandatory: broadcast/replicast indication was set by user
 * @expires: re-evaluate non-mandatory transmit method if we are past this
 */
struct tipc_mc_method {
	bool rcast;
	bool mandatory;
	unsigned long expires;
};

int tipc_bcast_init(struct net *net);
void tipc_bcast_stop(struct net *net);
void tipc_bcast_add_peer(struct net *net, struct tipc_link *l,
@@ -53,7 +79,10 @@ void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_bcl);
void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id);
void tipc_bcast_dec_bearer_dst_cnt(struct net *net, int bearer_id);
int  tipc_bcast_get_mtu(struct net *net);
int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list);
void tipc_bcast_disable_rcast(struct net *net);
int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
		    struct tipc_mc_method *method, struct tipc_nlist *dests,
		    u16 *cong_link_cnt);
int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb);
void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l,
			struct tipc_msg *hdr);
+14 −1
Original line number Diff line number Diff line
@@ -431,7 +431,7 @@ int tipc_enable_l2_media(struct net *net, struct tipc_bearer *b,
	memset(&b->bcast_addr, 0, sizeof(b->bcast_addr));
	memcpy(b->bcast_addr.value, dev->broadcast, b->media->hwaddr_len);
	b->bcast_addr.media_id = b->media->type_id;
	b->bcast_addr.broadcast = 1;
	b->bcast_addr.broadcast = TIPC_BROADCAST_SUPPORT;
	b->mtu = dev->mtu;
	b->media->raw2addr(b, &b->addr, (char *)dev->dev_addr);
	rcu_assign_pointer(dev->tipc_ptr, b);
@@ -482,6 +482,19 @@ int tipc_l2_send_msg(struct net *net, struct sk_buff *skb,
	return 0;
}

bool tipc_bearer_bcast_support(struct net *net, u32 bearer_id)
{
	bool supp = false;
	struct tipc_bearer *b;

	rcu_read_lock();
	b = bearer_get(net, bearer_id);
	if (b)
		supp = (b->bcast_addr.broadcast == TIPC_BROADCAST_SUPPORT);
	rcu_read_unlock();
	return supp;
}

int tipc_bearer_mtu(struct net *net, u32 bearer_id)
{
	int mtu = 0;
+7 −1
Original line number Diff line number Diff line
@@ -60,9 +60,14 @@
#define TIPC_MEDIA_TYPE_IB	2
#define TIPC_MEDIA_TYPE_UDP	3

/* minimum bearer MTU */
/* Minimum bearer MTU */
#define TIPC_MIN_BEARER_MTU	(MAX_H_SIZE + INT_H_SIZE)

/* Identifiers for distinguishing between broadcast/multicast and replicast
 */
#define TIPC_BROADCAST_SUPPORT  1
#define TIPC_REPLICAST_SUPPORT  2

/**
 * struct tipc_media_addr - destination address used by TIPC bearers
 * @value: address info (format defined by media)
@@ -210,6 +215,7 @@ int tipc_bearer_setup(void);
void tipc_bearer_cleanup(void);
void tipc_bearer_stop(struct net *net);
int tipc_bearer_mtu(struct net *net, u32 bearer_id);
bool tipc_bearer_bcast_support(struct net *net, u32 bearer_id);
void tipc_bearer_xmit_skb(struct net *net, u32 bearer_id,
			  struct sk_buff *skb,
			  struct tipc_media_addr *dest);
Loading