Commit 0d9c1ab3 authored by Ilya Dryomov
Browse files

libceph: preallocate message data items



Currently message data items are allocated with ceph_msg_data_create()
in setup_request_data() inside send_request().  send_request() has never
been allowed to fail, so each allocation is followed by a BUG_ON:

  data = ceph_msg_data_create(...);
  BUG_ON(!data);

It's been this way since support for multiple message data items was
added in commit 6644ed7b ("libceph: make message data be a pointer")
in 3.10.

There is no reason to delay the allocation of message data items until
the last possible moment and we certainly don't need a linked list of
them as they are only ever appended to the end and never erased.  Make
ceph_msg_new2() take max_data_items and adapt the rest of the code.

Reported-by: Jerry Lee <leisurelysw24@gmail.com>
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
parent 26f887e0
Loading
Loading
Loading
Loading
+2 −2
Original line number Diff line number Diff line
@@ -2071,7 +2071,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
	if (req->r_old_dentry_drop)
		len += req->r_old_dentry->d_name.len;

	msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS, false);
	msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false);
	if (!msg) {
		msg = ERR_PTR(-ENOMEM);
		goto out_free2;
@@ -3129,7 +3129,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
	if (!pagelist)
		goto fail_nopagelist;

	reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS, false);
	reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
	if (!reply)
		goto fail_nomsg;

+5 −19
Original line number Diff line number Diff line
@@ -82,22 +82,6 @@ enum ceph_msg_data_type {
	CEPH_MSG_DATA_BVECS,	/* data source/destination is a bio_vec array */
};

static __inline__ bool ceph_msg_data_type_valid(enum ceph_msg_data_type type)
{
	switch (type) {
	case CEPH_MSG_DATA_NONE:
	case CEPH_MSG_DATA_PAGES:
	case CEPH_MSG_DATA_PAGELIST:
#ifdef CONFIG_BLOCK
	case CEPH_MSG_DATA_BIO:
#endif /* CONFIG_BLOCK */
	case CEPH_MSG_DATA_BVECS:
		return true;
	default:
		return false;
	}
}

#ifdef CONFIG_BLOCK

struct ceph_bio_iter {
@@ -181,7 +165,6 @@ struct ceph_bvec_iter {
} while (0)

struct ceph_msg_data {
	struct list_head		links;	/* ceph_msg->data */
	enum ceph_msg_data_type		type;
	union {
#ifdef CONFIG_BLOCK
@@ -202,7 +185,6 @@ struct ceph_msg_data {

struct ceph_msg_data_cursor {
	size_t			total_resid;	/* across all data items */
	struct list_head	*data_head;	/* = &ceph_msg->data */

	struct ceph_msg_data	*data;		/* current data item */
	size_t			resid;		/* bytes not yet consumed */
@@ -240,7 +222,9 @@ struct ceph_msg {
	struct ceph_buffer *middle;

	size_t				data_length;
	struct list_head		data;
	struct ceph_msg_data		*data;
	int				num_data_items;
	int				max_data_items;
	struct ceph_msg_data_cursor	cursor;

	struct ceph_connection *con;
@@ -381,6 +365,8 @@ void ceph_msg_data_add_bio(struct ceph_msg *msg, struct ceph_bio_iter *bio_pos,
void ceph_msg_data_add_bvecs(struct ceph_msg *msg,
			     struct ceph_bvec_iter *bvec_pos);

struct ceph_msg *ceph_msg_new2(int type, int front_len, int max_data_items,
			       gfp_t flags, bool can_fail);
extern struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
				     bool can_fail);

+6 −5
Original line number Diff line number Diff line
@@ -13,14 +13,15 @@ struct ceph_msgpool {
	mempool_t *pool;
	int type;               /* preallocated message type */
	int front_len;          /* preallocated payload size */
	int max_data_items;
};

extern int ceph_msgpool_init(struct ceph_msgpool *pool, int type,
			     int front_len, int size, bool blocking,
int ceph_msgpool_init(struct ceph_msgpool *pool, int type,
		      int front_len, int max_data_items, int size,
		      const char *name);
extern void ceph_msgpool_destroy(struct ceph_msgpool *pool);
extern struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *,
					 int front_len);
struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *pool, int front_len,
				  int max_data_items);
extern void ceph_msgpool_put(struct ceph_msgpool *, struct ceph_msg *);

#endif
+39 −67
Original line number Diff line number Diff line
@@ -156,7 +156,6 @@ static bool con_flag_test_and_set(struct ceph_connection *con,
/* Slab caches for frequently-allocated structures */

static struct kmem_cache	*ceph_msg_cache;
static struct kmem_cache	*ceph_msg_data_cache;

/* static tag bytes (protocol control messages) */
static char tag_msg = CEPH_MSGR_TAG_MSG;
@@ -235,23 +234,11 @@ static int ceph_msgr_slab_init(void)
	if (!ceph_msg_cache)
		return -ENOMEM;

	BUG_ON(ceph_msg_data_cache);
	ceph_msg_data_cache = KMEM_CACHE(ceph_msg_data, 0);
	if (ceph_msg_data_cache)
	return 0;

	kmem_cache_destroy(ceph_msg_cache);
	ceph_msg_cache = NULL;

	return -ENOMEM;
}

static void ceph_msgr_slab_exit(void)
{
	BUG_ON(!ceph_msg_data_cache);
	kmem_cache_destroy(ceph_msg_data_cache);
	ceph_msg_data_cache = NULL;

	BUG_ON(!ceph_msg_cache);
	kmem_cache_destroy(ceph_msg_cache);
	ceph_msg_cache = NULL;
@@ -1141,16 +1128,13 @@ static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor)
static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length)
{
	struct ceph_msg_data_cursor *cursor = &msg->cursor;
	struct ceph_msg_data *data;

	BUG_ON(!length);
	BUG_ON(length > msg->data_length);
	BUG_ON(list_empty(&msg->data));
	BUG_ON(!msg->num_data_items);

	cursor->data_head = &msg->data;
	cursor->total_resid = length;
	data = list_first_entry(&msg->data, struct ceph_msg_data, links);
	cursor->data = data;
	cursor->data = msg->data;

	__ceph_msg_data_cursor_init(cursor);
}
@@ -1231,8 +1215,7 @@ static void ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor,

	if (!cursor->resid && cursor->total_resid) {
		WARN_ON(!cursor->last_piece);
		BUG_ON(list_is_last(&cursor->data->links, cursor->data_head));
		cursor->data = list_next_entry(cursor->data, links);
		cursor->data++;
		__ceph_msg_data_cursor_init(cursor);
		new_piece = true;
	}
@@ -1248,9 +1231,6 @@ static size_t sizeof_footer(struct ceph_connection *con)

static void prepare_message_data(struct ceph_msg *msg, u32 data_len)
{
	BUG_ON(!msg);
	BUG_ON(!data_len);

	/* Initialize data cursor */

	ceph_msg_data_cursor_init(msg, (size_t)data_len);
@@ -1590,7 +1570,7 @@ static int write_partial_message_data(struct ceph_connection *con)

	dout("%s %p msg %p\n", __func__, con, msg);

	if (list_empty(&msg->data))
	if (!msg->num_data_items)
		return -EINVAL;

	/*
@@ -2347,8 +2327,7 @@ static int read_partial_msg_data(struct ceph_connection *con)
	u32 crc = 0;
	int ret;

	BUG_ON(!msg);
	if (list_empty(&msg->data))
	if (!msg->num_data_items)
		return -EIO;

	if (do_datacrc)
@@ -3256,32 +3235,16 @@ bool ceph_con_keepalive_expired(struct ceph_connection *con,
	return false;
}

static struct ceph_msg_data *ceph_msg_data_create(enum ceph_msg_data_type type)
static struct ceph_msg_data *ceph_msg_data_add(struct ceph_msg *msg)
{
	struct ceph_msg_data *data;

	if (WARN_ON(!ceph_msg_data_type_valid(type)))
		return NULL;

	data = kmem_cache_zalloc(ceph_msg_data_cache, GFP_NOFS);
	if (!data)
		return NULL;

	data->type = type;
	INIT_LIST_HEAD(&data->links);

	return data;
	BUG_ON(msg->num_data_items >= msg->max_data_items);
	return &msg->data[msg->num_data_items++];
}

static void ceph_msg_data_destroy(struct ceph_msg_data *data)
{
	if (!data)
		return;

	WARN_ON(!list_empty(&data->links));
	if (data->type == CEPH_MSG_DATA_PAGELIST)
		ceph_pagelist_release(data->pagelist);
	kmem_cache_free(ceph_msg_data_cache, data);
}

void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
@@ -3292,13 +3255,12 @@ void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
	BUG_ON(!pages);
	BUG_ON(!length);

	data = ceph_msg_data_create(CEPH_MSG_DATA_PAGES);
	BUG_ON(!data);
	data = ceph_msg_data_add(msg);
	data->type = CEPH_MSG_DATA_PAGES;
	data->pages = pages;
	data->length = length;
	data->alignment = alignment & ~PAGE_MASK;

	list_add_tail(&data->links, &msg->data);
	msg->data_length += length;
}
EXPORT_SYMBOL(ceph_msg_data_add_pages);
@@ -3311,12 +3273,11 @@ void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
	BUG_ON(!pagelist);
	BUG_ON(!pagelist->length);

	data = ceph_msg_data_create(CEPH_MSG_DATA_PAGELIST);
	BUG_ON(!data);
	data = ceph_msg_data_add(msg);
	data->type = CEPH_MSG_DATA_PAGELIST;
	refcount_inc(&pagelist->refcnt);
	data->pagelist = pagelist;

	list_add_tail(&data->links, &msg->data);
	msg->data_length += pagelist->length;
}
EXPORT_SYMBOL(ceph_msg_data_add_pagelist);
@@ -3327,12 +3288,11 @@ void ceph_msg_data_add_bio(struct ceph_msg *msg, struct ceph_bio_iter *bio_pos,
{
	struct ceph_msg_data *data;

	data = ceph_msg_data_create(CEPH_MSG_DATA_BIO);
	BUG_ON(!data);
	data = ceph_msg_data_add(msg);
	data->type = CEPH_MSG_DATA_BIO;
	data->bio_pos = *bio_pos;
	data->bio_length = length;

	list_add_tail(&data->links, &msg->data);
	msg->data_length += length;
}
EXPORT_SYMBOL(ceph_msg_data_add_bio);
@@ -3343,11 +3303,10 @@ void ceph_msg_data_add_bvecs(struct ceph_msg *msg,
{
	struct ceph_msg_data *data;

	data = ceph_msg_data_create(CEPH_MSG_DATA_BVECS);
	BUG_ON(!data);
	data = ceph_msg_data_add(msg);
	data->type = CEPH_MSG_DATA_BVECS;
	data->bvec_pos = *bvec_pos;

	list_add_tail(&data->links, &msg->data);
	msg->data_length += bvec_pos->iter.bi_size;
}
EXPORT_SYMBOL(ceph_msg_data_add_bvecs);
@@ -3356,8 +3315,8 @@ EXPORT_SYMBOL(ceph_msg_data_add_bvecs);
 * construct a new message with given type, size
 * the new msg has a ref count of 1.
 */
struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
			      bool can_fail)
struct ceph_msg *ceph_msg_new2(int type, int front_len, int max_data_items,
			       gfp_t flags, bool can_fail)
{
	struct ceph_msg *m;

@@ -3371,7 +3330,6 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,

	INIT_LIST_HEAD(&m->list_head);
	kref_init(&m->kref);
	INIT_LIST_HEAD(&m->data);

	/* front */
	if (front_len) {
@@ -3386,6 +3344,15 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
	}
	m->front_alloc_len = m->front.iov_len = front_len;

	if (max_data_items) {
		m->data = kmalloc_array(max_data_items, sizeof(*m->data),
					flags);
		if (!m->data)
			goto out2;

		m->max_data_items = max_data_items;
	}

	dout("ceph_msg_new %p front %d\n", m, front_len);
	return m;

@@ -3402,6 +3369,13 @@ out:
	}
	return NULL;
}
EXPORT_SYMBOL(ceph_msg_new2);

struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
			      bool can_fail)
{
	return ceph_msg_new2(type, front_len, 0, flags, can_fail);
}
EXPORT_SYMBOL(ceph_msg_new);

/*
@@ -3497,13 +3471,14 @@ static void ceph_msg_free(struct ceph_msg *m)
{
	dout("%s %p\n", __func__, m);
	kvfree(m->front.iov_base);
	kfree(m->data);
	kmem_cache_free(ceph_msg_cache, m);
}

static void ceph_msg_release(struct kref *kref)
{
	struct ceph_msg *m = container_of(kref, struct ceph_msg, kref);
	struct ceph_msg_data *data, *next;
	int i;

	dout("%s %p\n", __func__, m);
	WARN_ON(!list_empty(&m->list_head));
@@ -3516,11 +3491,8 @@ static void ceph_msg_release(struct kref *kref)
		m->middle = NULL;
	}

	list_for_each_entry_safe(data, next, &m->data, links) {
		list_del_init(&data->links);
		ceph_msg_data_destroy(data);
	}
	m->data_length = 0;
	for (i = 0; i < m->num_data_items; i++)
		ceph_msg_data_destroy(&m->data[i]);

	if (m->pool)
		ceph_msgpool_put(m->pool, m);
+17 −8
Original line number Diff line number Diff line
@@ -14,7 +14,8 @@ static void *msgpool_alloc(gfp_t gfp_mask, void *arg)
	struct ceph_msgpool *pool = arg;
	struct ceph_msg *msg;

	msg = ceph_msg_new(pool->type, pool->front_len, gfp_mask, true);
	msg = ceph_msg_new2(pool->type, pool->front_len, pool->max_data_items,
			    gfp_mask, true);
	if (!msg) {
		dout("msgpool_alloc %s failed\n", pool->name);
	} else {
@@ -35,11 +36,13 @@ static void msgpool_free(void *element, void *arg)
}

int ceph_msgpool_init(struct ceph_msgpool *pool, int type,
		      int front_len, int size, bool blocking, const char *name)
		      int front_len, int max_data_items, int size,
		      const char *name)
{
	dout("msgpool %s init\n", name);
	pool->type = type;
	pool->front_len = front_len;
	pool->max_data_items = max_data_items;
	pool->pool = mempool_create(size, msgpool_alloc, msgpool_free, pool);
	if (!pool->pool)
		return -ENOMEM;
@@ -53,18 +56,21 @@ void ceph_msgpool_destroy(struct ceph_msgpool *pool)
	mempool_destroy(pool->pool);
}

struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *pool,
				  int front_len)
struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *pool, int front_len,
				  int max_data_items)
{
	struct ceph_msg *msg;

	if (front_len > pool->front_len) {
		dout("msgpool_get %s need front %d, pool size is %d\n",
		       pool->name, front_len, pool->front_len);
	if (front_len > pool->front_len ||
	    max_data_items > pool->max_data_items) {
		pr_warn_ratelimited("%s need %d/%d, pool %s has %d/%d\n",
		    __func__, front_len, max_data_items, pool->name,
		    pool->front_len, pool->max_data_items);
		WARN_ON_ONCE(1);

		/* try to alloc a fresh message */
		return ceph_msg_new(pool->type, front_len, GFP_NOFS, false);
		return ceph_msg_new2(pool->type, front_len, max_data_items,
				     GFP_NOFS, false);
	}

	msg = mempool_alloc(pool->pool, GFP_NOFS);
@@ -80,6 +86,9 @@ void ceph_msgpool_put(struct ceph_msgpool *pool, struct ceph_msg *msg)
	msg->front.iov_len = pool->front_len;
	msg->hdr.front_len = cpu_to_le32(pool->front_len);

	msg->data_length = 0;
	msg->num_data_items = 0;

	kref_init(&msg->kref);  /* retake single ref */
	mempool_free(msg, pool->pool);
}
Loading