Commit b37fe1f9 authored by Yan, Zheng's avatar Yan, Zheng Committed by Ilya Dryomov
Browse files

ceph: support versioned reply

In versioned reply, inodestat, dirstat and lease are encoded with
version, compat_version and struct_len.

Based on a patch from Jos Collin <jcollin@redhat.com>.

Link: http://tracker.ceph.com/issues/26936


Signed-off-by: default avatar"Yan, Zheng" <zyan@redhat.com>
Signed-off-by: default avatarIlya Dryomov <idryomov@gmail.com>
parent 75c9627e
Loading
Loading
Loading
Loading
+165 −49
Original line number Diff line number Diff line
@@ -65,6 +65,29 @@ static const struct ceph_connection_operations mds_con_ops;
 * mds reply parsing
 */

static int parse_reply_info_quota(void **p, void *end,
				  struct ceph_mds_reply_info_in *info)
{
	u8 struct_v, struct_compat;
	u32 struct_len;

	ceph_decode_8_safe(p, end, struct_v, bad);
	ceph_decode_8_safe(p, end, struct_compat, bad);
	/* struct_v is expected to be >= 1. we only
	 * understand encoding with struct_compat == 1. */
	if (!struct_v || struct_compat != 1)
		goto bad;
	ceph_decode_32_safe(p, end, struct_len, bad);
	ceph_decode_need(p, end, struct_len, bad);
	end = *p + struct_len;
	ceph_decode_64_safe(p, end, info->max_bytes, bad);
	ceph_decode_64_safe(p, end, info->max_files, bad);
	*p = end;
	return 0;
bad:
	return -EIO;
}

/*
 * parse individual inode info
 */
@@ -72,8 +95,24 @@ static int parse_reply_info_in(void **p, void *end,
			       struct ceph_mds_reply_info_in *info,
			       u64 features)
{
	int err = -EIO;
	int err = 0;
	u8 struct_v = 0;

	if (features == (u64)-1) {
		u32 struct_len;
		u8 struct_compat;
		ceph_decode_8_safe(p, end, struct_v, bad);
		ceph_decode_8_safe(p, end, struct_compat, bad);
		/* struct_v is expected to be >= 1. we only understand
		 * encoding with struct_compat == 1. */
		if (!struct_v || struct_compat != 1)
			goto bad;
		ceph_decode_32_safe(p, end, struct_len, bad);
		ceph_decode_need(p, end, struct_len, bad);
		end = *p + struct_len;
	}

	ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad);
	info->in = *p;
	*p += sizeof(struct ceph_mds_reply_inode) +
		sizeof(*info->in->fragtree.splits) *
@@ -91,6 +130,35 @@ static int parse_reply_info_in(void **p, void *end,
	info->xattr_data = *p;
	*p += info->xattr_len;

	if (features == (u64)-1) {
		/* inline data */
		ceph_decode_64_safe(p, end, info->inline_version, bad);
		ceph_decode_32_safe(p, end, info->inline_len, bad);
		ceph_decode_need(p, end, info->inline_len, bad);
		info->inline_data = *p;
		*p += info->inline_len;
		/* quota */
		err = parse_reply_info_quota(p, end, info);
		if (err < 0)
			goto out_bad;
		/* pool namespace */
		ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
		if (info->pool_ns_len > 0) {
			ceph_decode_need(p, end, info->pool_ns_len, bad);
			info->pool_ns_data = *p;
			*p += info->pool_ns_len;
		}
		/* btime, change_attr */
		{
			struct ceph_timespec btime;
			u64 change_attr;
			ceph_decode_need(p, end, sizeof(btime), bad);
			ceph_decode_copy(p, &btime, sizeof(btime));
			ceph_decode_64_safe(p, end, change_attr, bad);
		}

		*p = end;
	} else {
		if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
			ceph_decode_64_safe(p, end, info->inline_version, bad);
			ceph_decode_32_safe(p, end, info->inline_len, bad);
@@ -101,20 +169,9 @@ static int parse_reply_info_in(void **p, void *end,
			info->inline_version = CEPH_INLINE_NONE;

		if (features & CEPH_FEATURE_MDS_QUOTA) {
		u8 struct_v, struct_compat;
		u32 struct_len;

		/*
		 * both struct_v and struct_compat are expected to be >= 1
		 */
		ceph_decode_8_safe(p, end, struct_v, bad);
		ceph_decode_8_safe(p, end, struct_compat, bad);
		if (!struct_v || !struct_compat)
			goto bad;
		ceph_decode_32_safe(p, end, struct_len, bad);
		ceph_decode_need(p, end, struct_len, bad);
		ceph_decode_64_safe(p, end, info->max_bytes, bad);
		ceph_decode_64_safe(p, end, info->max_files, bad);
			err = parse_reply_info_quota(p, end, info);
			if (err < 0)
				goto out_bad;
		} else {
			info->max_bytes = 0;
			info->max_files = 0;
@@ -130,12 +187,72 @@ static int parse_reply_info_in(void **p, void *end,
				*p += info->pool_ns_len;
			}
		}

	}
	return 0;
bad:
	err = -EIO;
out_bad:
	return err;
}

static int parse_reply_info_dir(void **p, void *end,
				struct ceph_mds_reply_dirfrag **dirfrag,
				u64 features)
{
	if (features == (u64)-1) {
		u8 struct_v, struct_compat;
		u32 struct_len;
		ceph_decode_8_safe(p, end, struct_v, bad);
		ceph_decode_8_safe(p, end, struct_compat, bad);
		/* struct_v is expected to be >= 1. we only understand
		 * encoding whose struct_compat == 1. */
		if (!struct_v || struct_compat != 1)
			goto bad;
		ceph_decode_32_safe(p, end, struct_len, bad);
		ceph_decode_need(p, end, struct_len, bad);
		end = *p + struct_len;
	}

	ceph_decode_need(p, end, sizeof(**dirfrag), bad);
	*dirfrag = *p;
	*p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist);
	if (unlikely(*p > end))
		goto bad;
	if (features == (u64)-1)
		*p = end;
	return 0;
bad:
	return -EIO;
}

static int parse_reply_info_lease(void **p, void *end,
				  struct ceph_mds_reply_lease **lease,
				  u64 features)
{
	if (features == (u64)-1) {
		u8 struct_v, struct_compat;
		u32 struct_len;
		ceph_decode_8_safe(p, end, struct_v, bad);
		ceph_decode_8_safe(p, end, struct_compat, bad);
		/* struct_v is expected to be >= 1. we only understand
		 * encoding whose struct_compat == 1. */
		if (!struct_v || struct_compat != 1)
			goto bad;
		ceph_decode_32_safe(p, end, struct_len, bad);
		ceph_decode_need(p, end, struct_len, bad);
		end = *p + struct_len;
	}

	ceph_decode_need(p, end, sizeof(**lease), bad);
	*lease = *p;
	*p += sizeof(**lease);
	if (features == (u64)-1)
		*p = end;
	return 0;
bad:
	return -EIO;
}

/*
 * parse a normal reply, which may contain a (dir+)dentry and/or a
 * target inode.
@@ -151,20 +268,18 @@ static int parse_reply_info_trace(void **p, void *end,
		if (err < 0)
			goto out_bad;

		if (unlikely(*p + sizeof(*info->dirfrag) > end))
			goto bad;
		info->dirfrag = *p;
		*p += sizeof(*info->dirfrag) +
			sizeof(u32)*le32_to_cpu(info->dirfrag->ndist);
		if (unlikely(*p > end))
			goto bad;
		err = parse_reply_info_dir(p, end, &info->dirfrag, features);
		if (err < 0)
			goto out_bad;

		ceph_decode_32_safe(p, end, info->dname_len, bad);
		ceph_decode_need(p, end, info->dname_len, bad);
		info->dname = *p;
		*p += info->dname_len;
		info->dlease = *p;
		*p += sizeof(*info->dlease);

		err = parse_reply_info_lease(p, end, &info->dlease, features);
		if (err < 0)
			goto out_bad;
	}

	if (info->head->is_target) {
@@ -187,20 +302,16 @@ out_bad:
/*
 * parse readdir results
 */
static int parse_reply_info_dir(void **p, void *end,
static int parse_reply_info_readdir(void **p, void *end,
				struct ceph_mds_reply_info_parsed *info,
				u64 features)
{
	u32 num, i = 0;
	int err;

	info->dir_dir = *p;
	if (*p + sizeof(*info->dir_dir) > end)
		goto bad;
	*p += sizeof(*info->dir_dir) +
		sizeof(u32)*le32_to_cpu(info->dir_dir->ndist);
	if (*p > end)
		goto bad;
	err = parse_reply_info_dir(p, end, &info->dir_dir, features);
	if (err < 0)
		goto out_bad;

	ceph_decode_need(p, end, sizeof(num) + 2, bad);
	num = ceph_decode_32(p);
@@ -226,15 +337,16 @@ static int parse_reply_info_dir(void **p, void *end,
	while (num) {
		struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
		/* dentry */
		ceph_decode_need(p, end, sizeof(u32)*2, bad);
		rde->name_len = ceph_decode_32(p);
		ceph_decode_32_safe(p, end, rde->name_len, bad);
		ceph_decode_need(p, end, rde->name_len, bad);
		rde->name = *p;
		*p += rde->name_len;
		dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name);
		rde->lease = *p;
		*p += sizeof(struct ceph_mds_reply_lease);

		/* dentry lease */
		err = parse_reply_info_lease(p, end, &rde->lease, features);
		if (err)
			goto out_bad;
		/* inode */
		err = parse_reply_info_in(p, end, &rde->inode, features);
		if (err < 0)
@@ -285,7 +397,8 @@ static int parse_reply_info_create(void **p, void *end,
				  struct ceph_mds_reply_info_parsed *info,
				  u64 features)
{
	if (features & CEPH_FEATURE_REPLY_CREATE_INODE) {
	if (features == (u64)-1 ||
	    (features & CEPH_FEATURE_REPLY_CREATE_INODE)) {
		if (*p == end) {
			info->has_create_ino = false;
		} else {
@@ -314,7 +427,7 @@ static int parse_reply_info_extra(void **p, void *end,
	if (op == CEPH_MDS_OP_GETFILELOCK)
		return parse_reply_info_filelock(p, end, info, features);
	else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP)
		return parse_reply_info_dir(p, end, info, features);
		return parse_reply_info_readdir(p, end, info, features);
	else if (op == CEPH_MDS_OP_CREATE)
		return parse_reply_info_create(p, end, info, features);
	else
@@ -2657,6 +2770,9 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)

	dout("handle_reply tid %lld result %d\n", tid, result);
	rinfo = &req->r_reply_info;
	if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features))
		err = parse_reply_info(msg, rinfo, (u64)-1);
	else
		err = parse_reply_info(msg, rinfo, session->s_con.peer_features);
	mutex_unlock(&mdsc->mutex);

+1 −0
Original line number Diff line number Diff line
@@ -26,6 +26,7 @@
#define CEPHFS_FEATURES_CLIENT_SUPPORTED { 	\
	0, 1, 2, 3, 4, 5, 6, 7,			\
	CEPHFS_FEATURE_MIMIC,			\
	CEPHFS_FEATURE_REPLY_ENCODING,		\
	CEPHFS_FEATURE_LAZY_CAP_WANTED,		\
	CEPHFS_FEATURE_MULTI_RECONNECT,		\
}