Commit fce7a974 authored by Yan, Zheng's avatar Yan, Zheng Committed by Ilya Dryomov
Browse files

ceph: refactor ceph_sync_read()



Avoid allocating memory for the entire user request: striped_read()
does a synchronous OSD request per object, so it doesn't need more than
object size worth of pages at a time.

[ Preserve the comment, changelog. ]

Signed-off-by: default avatar"Yan, Zheng" <zyan@redhat.com>
Signed-off-by: default avatarIlya Dryomov <idryomov@gmail.com>
parent 74c9e6bf
Loading
Loading
Loading
Loading
+106 −113
Original line number Diff line number Diff line
@@ -556,91 +556,27 @@ enum {
	READ_INLINE =  3,
};

/*
 * Read a range of bytes striped over one or more objects.  Iterate over
 * objects we stripe over.  (That's not atomic, but good enough for now.)
 *
 * If we get a short result from the OSD, check against i_size; we need to
 * only return a short read to the caller if we hit EOF.
 */
static int striped_read(struct inode *inode,
			u64 pos, u64 len,
			struct page **pages, int num_pages,
			int page_align, int *checkeof)
{
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_inode_info *ci = ceph_inode(inode);
	u64 this_len;
	loff_t i_size;
	int page_idx;
	int ret, read = 0;
	bool hit_stripe, was_short;

	/*
	 * we may need to do multiple reads.  not atomic, unfortunately.
	 */
more:
	this_len = len;
	page_idx = (page_align + read) >> PAGE_SHIFT;
	ret = ceph_osdc_readpages(&fsc->client->osdc, ceph_vino(inode),
				  &ci->i_layout, pos, &this_len,
				  ci->i_truncate_seq, ci->i_truncate_size,
				  pages + page_idx, num_pages - page_idx,
				  ((page_align + read) & ~PAGE_MASK));
	if (ret == -ENOENT)
		ret = 0;
	hit_stripe = this_len < len;
	was_short = ret >= 0 && ret < this_len;
	dout("striped_read %llu~%llu (read %u) got %d%s%s\n", pos, len, read,
	     ret, hit_stripe ? " HITSTRIPE" : "", was_short ? " SHORT" : "");

	i_size = i_size_read(inode);
	if (ret >= 0) {
		if (was_short && (pos + ret < i_size)) {
			int zlen = min(this_len - ret, i_size - pos - ret);
			int zoff = page_align + read + ret;
			dout(" zero gap %llu to %llu\n",
			     pos + ret, pos + ret + zlen);
			ceph_zero_page_vector_range(zoff, zlen, pages);
			ret += zlen;
		}

		read += ret;
		pos += ret;
		len -= ret;

		/* hit stripe and need continue*/
		if (len && hit_stripe && pos < i_size)
			goto more;
	}

	if (read > 0) {
		ret = read;
		/* did we bounce off eof? */
		if (pos + len > i_size)
			*checkeof = CHECK_EOF;
	}

	dout("striped_read returns %d\n", ret);
	return ret;
}

/*
 * Completely synchronous read and write methods.  Direct from __user
 * buffer to osd, or directly to user pages (if O_DIRECT).
 *
 * If the read spans object boundary, just do multiple reads.
 * If the read spans object boundary, just do multiple reads.  (That's not
 * atomic, but good enough for now.)
 *
 * If we get a short result from the OSD, check against i_size; we need to
 * only return a short read to the caller if we hit EOF.
 */
static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
			      int *checkeof)
			      int *retry_op)
{
	struct file *file = iocb->ki_filp;
	struct inode *inode = file_inode(file);
	struct page **pages;
	u64 off = iocb->ki_pos;
	int num_pages;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_osd_client *osdc = &fsc->client->osdc;
	ssize_t ret;
	size_t len = iov_iter_count(to);
	u64 off = iocb->ki_pos;
	u64 len = iov_iter_count(to);

	dout("sync_read on file %p %llu~%u %s\n", file, off, (unsigned)len,
	     (file->f_flags & O_DIRECT) ? "O_DIRECT" : "");
@@ -653,21 +589,79 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
	 * but it will at least behave sensibly when they are
	 * in sequence.
	 */
	ret = filemap_write_and_wait_range(inode->i_mapping, off,
						off + len);
	ret = filemap_write_and_wait_range(inode->i_mapping, off, off + len);
	if (ret < 0)
		return ret;

	if (unlikely(to->type & ITER_PIPE)) {
	ret = 0;
	while ((len = iov_iter_count(to)) > 0) {
		struct ceph_osd_request *req;
		struct page **pages;
		int num_pages;
		size_t page_off;
		u64 i_size;
		bool more;

		req = ceph_osdc_new_request(osdc, &ci->i_layout,
					ci->i_vino, off, &len, 0, 1,
					CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
					NULL, ci->i_truncate_seq,
					ci->i_truncate_size, false);
		if (IS_ERR(req)) {
			ret = PTR_ERR(req);
			break;
		}

		more = len < iov_iter_count(to);

		if (unlikely(to->type & ITER_PIPE)) {
			ret = iov_iter_get_pages_alloc(to, &pages, len,
						       &page_off);
		if (ret <= 0)
			return -ENOMEM;
			if (ret <= 0) {
				ceph_osdc_put_request(req);
				ret = -ENOMEM;
				break;
			}
			num_pages = DIV_ROUND_UP(ret + page_off, PAGE_SIZE);
			if (ret < len) {
				len = ret;
				osd_req_op_extent_update(req, 0, len);
				more = false;
			}
		} else {
			num_pages = calc_pages_for(off, len);
			page_off = off & ~PAGE_MASK;
			pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
			if (IS_ERR(pages)) {
				ceph_osdc_put_request(req);
				ret = PTR_ERR(pages);
				break;
			}
		}

		osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_off,
						 false, false);
		ret = ceph_osdc_start_request(osdc, req, false);
		if (!ret)
			ret = ceph_osdc_wait_request(osdc, req);
		ceph_osdc_put_request(req);

		i_size = i_size_read(inode);
		dout("sync_read %llu~%llu got %zd i_size %llu%s\n",
		     off, len, ret, i_size, (more ? " MORE" : ""));

		if (ret == -ENOENT)
			ret = 0;
		if (ret >= 0 && ret < len && (off + ret < i_size)) {
			int zlen = min(len - ret, i_size - off - ret);
			int zoff = page_off + ret;
			dout("sync_read zero gap %llu~%llu\n",
                             off + ret, off + ret + zlen);
			ceph_zero_page_vector_range(zoff, zlen, pages);
			ret += zlen;
		}

		ret = striped_read(inode, off, ret, pages, num_pages,
				   page_off, checkeof);
		if (unlikely(to->type & ITER_PIPE)) {
			if (ret > 0) {
				iov_iter_advance(to, ret);
				off += ret;
@@ -676,38 +670,37 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
			}
			ceph_put_page_vector(pages, num_pages, false);
		} else {
		num_pages = calc_pages_for(off, len);
		pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
		if (IS_ERR(pages))
			return PTR_ERR(pages);

		ret = striped_read(inode, off, len, pages, num_pages,
				   (off & ~PAGE_MASK), checkeof);
		if (ret > 0) {
			int l, k = 0;
			size_t left = ret;

			while (left) {
				size_t page_off = off & ~PAGE_MASK;
				size_t copy = min_t(size_t, left,
						    PAGE_SIZE - page_off);
				l = copy_page_to_iter(pages[k++], page_off,
						      copy, to);
				off += l;
				left -= l;
				if (l < copy)
			int idx = 0;
			size_t left = ret > 0 ? ret : 0;
			while (left > 0) {
				size_t len, copied;
				page_off = off & ~PAGE_MASK;
				len = min_t(size_t, left, PAGE_SIZE - page_off);
				copied = copy_page_to_iter(pages[idx++],
							   page_off, len, to);
				off += copied;
				left -= copied;
				if (copied < len) {
					ret = -EFAULT;
					break;
				}
			}
			ceph_release_page_vector(pages, num_pages);
		}

		if (ret <= 0 || off >= i_size || !more)
			break;
	}

	if (off > iocb->ki_pos) {
		if (ret >= 0 &&
		    iov_iter_count(to) > 0 && off >= i_size_read(inode))
			*retry_op = CHECK_EOF;
		ret = off - iocb->ki_pos;
		iocb->ki_pos = off;
	}

	dout("sync_read result %zd\n", ret);
	dout("sync_read result %zd retry_op %d\n", ret, *retry_op);
	return ret;
}