]> git.neil.brown.name Git - history.git/commitdiff
[PATCH] kNFSd: Use ->sendpage to send nfsd (and lockd) replies.
authorNeil Brown <neilb@cse.unsw.edu.au>
Wed, 6 Nov 2002 13:00:28 +0000 (05:00 -0800)
committerLinus Torvalds <torvalds@home.transmeta.com>
Wed, 6 Nov 2002 13:00:28 +0000 (05:00 -0800)
From Hirokazu Takahashi <taka@valinux.co.jp>

As all rpc server replies are now in well defined pages,
we can use ->sendpage to send these replies, and so
make use for zero-copy transmit on network cards that
support it.

fs/nfsd/nfs3xdr.c
fs/nfsd/nfsxdr.c
include/linux/sunrpc/svc.h
include/linux/sunrpc/svcsock.h
net/sunrpc/svc.c
net/sunrpc/svcsock.c

index 963bf3c7bf1daca7d21b2f8498319c90f5adab4f..9acf179940afa56d1b747146973e2af432908956 100644 (file)
@@ -338,7 +338,7 @@ nfs3svc_decode_readargs(struct svc_rqst *rqstp, u32 *p,
        v=0;
        while (len > 0) {
                pn = rqstp->rq_resused;
-               take_page(rqstp);
+               svc_take_page(rqstp);
                args->vec[v].iov_base = page_address(rqstp->rq_respages[pn]);
                args->vec[v].iov_len = len < PAGE_SIZE? len : PAGE_SIZE;
                v++;
@@ -603,7 +603,7 @@ nfs3svc_encode_readres(struct svc_rqst *rqstp, u32 *p,
                rqstp->rq_res.page_base = 0;
                rqstp->rq_res.page_len = resp->count;
                if (resp->count & 3) {
-                       /* need to page with tail */
+                       /* need to pad the tail */
                        rqstp->rq_res.tail[0].iov_base = p;
                        *p = 0;
                        rqstp->rq_res.tail[0].iov_len = 4 - (resp->count & 3);
index d0895793efb15564cf519ba864ca98bf924fe7cb..f10cc7207e82f7c48aa226dfd5e7af5a493fe6b5 100644 (file)
@@ -239,7 +239,7 @@ nfssvc_decode_readargs(struct svc_rqst *rqstp, u32 *p,
        v=0;
        while (len > 0) {
                pn=rqstp->rq_resused;
-               take_page(rqstp);
+               svc_take_page(rqstp);
                args->vec[v].iov_base = page_address(rqstp->rq_respages[pn]);
                args->vec[v].iov_len = len < PAGE_SIZE?len:PAGE_SIZE;
                v++;
@@ -388,7 +388,7 @@ nfssvc_encode_readres(struct svc_rqst *rqstp, u32 *p,
        rqstp->rq_res.page_base = 0;
        rqstp->rq_res.page_len = resp->count;
        if (resp->count & 3) {
-               /* need to pad with tail */
+               /* need to pad the tail */
                rqstp->rq_res.tail[0].iov_base = p;
                *p = 0;
                rqstp->rq_res.tail[0].iov_len = 4 - (resp->count&3);
index 24464d66411ad661faa9c3bd393b3db0bf30f0ff..2fa53b11e52c359a2568550a698e1ba3b61ff2fe 100644 (file)
@@ -15,6 +15,7 @@
 #include <linux/sunrpc/xdr.h>
 #include <linux/sunrpc/svcauth.h>
 #include <linux/wait.h>
+#include <linux/mm.h>
 
 /*
  * RPC service.
@@ -171,7 +172,7 @@ xdr_ressize_check(struct svc_rqst *rqstp, u32 *p)
        return vec->iov_len <= PAGE_SIZE;
 }
 
-static int inline take_page(struct svc_rqst *rqstp)
+static int inline svc_take_page(struct svc_rqst *rqstp)
 {
        if (rqstp->rq_arghi <= rqstp->rq_argused)
                return -ENOMEM;
@@ -180,6 +181,27 @@ static int inline take_page(struct svc_rqst *rqstp)
        return 0;
 }
 
+static void inline svc_pushback_allpages(struct svc_rqst *rqstp)
+{
+        while (rqstp->rq_resused) {
+               if (rqstp->rq_respages[--rqstp->rq_resused] == NULL)
+                       continue;
+               rqstp->rq_argpages[rqstp->rq_arghi++] =
+                       rqstp->rq_respages[rqstp->rq_resused];
+               rqstp->rq_respages[rqstp->rq_resused] = NULL;
+       }
+}
+
+static void inline svc_free_allpages(struct svc_rqst *rqstp)
+{
+        while (rqstp->rq_resused) {
+               if (rqstp->rq_respages[--rqstp->rq_resused] == NULL)
+                       continue;
+               put_page(rqstp->rq_respages[rqstp->rq_resused]);
+               rqstp->rq_respages[rqstp->rq_resused] = NULL;
+       }
+}
+
 struct svc_deferred_req {
        struct svc_serv         *serv;
        u32                     prot;   /* protocol (UDP or TCP) */
index 2f90342e4c76d3ac82ba1c211f7dff2df242f7c8..61f37d10b47dea7e6f3a55f330da90d8edf19b55 100644 (file)
@@ -37,6 +37,7 @@ struct svc_sock {
 
        struct list_head        sk_deferred;    /* deferred requests that need to
                                                 * be revisted */
+       struct semaphore        sk_sem;         /* to serialize sending data */
 
        int                     (*sk_recvfrom)(struct svc_rqst *rqstp);
        int                     (*sk_sendto)(struct svc_rqst *rqstp);
index 60cdc3cdb3004c88fef6c578131f8a61e79bb40e..f39325536c6add0911067fa243fd88cd8833a8b3 100644 (file)
@@ -138,8 +138,11 @@ svc_release_buffer(struct svc_rqst *rqstp)
 {
        while (rqstp->rq_arghi)
                put_page(rqstp->rq_argpages[--rqstp->rq_arghi]);
-       while (rqstp->rq_resused)
-               put_page(rqstp->rq_respages[--rqstp->rq_resused]);
+       while (rqstp->rq_resused) {
+               if (rqstp->rq_respages[--rqstp->rq_resused] == NULL)
+                       continue;
+               put_page(rqstp->rq_respages[rqstp->rq_resused]);
+       }
        rqstp->rq_argused = 0;
 }
 
@@ -264,13 +267,14 @@ svc_process(struct svc_serv *serv, struct svc_rqst *rqstp)
        /* setup response xdr_buf.
         * Initially it has just one page 
         */
-       take_page(rqstp); /* must succeed */
+       svc_take_page(rqstp); /* must succeed */
        resv->iov_base = page_address(rqstp->rq_respages[0]);
        resv->iov_len = 0;
        rqstp->rq_res.pages = rqstp->rq_respages+1;
        rqstp->rq_res.len = 0;
        rqstp->rq_res.page_base = 0;
        rqstp->rq_res.page_len = 0;
+       rqstp->rq_res.tail[0].iov_len = 0;
        /* tcp needs a space for the record length... */
        if (rqstp->rq_prot == IPPROTO_TCP)
                svc_putu32(resv, 0);
index 4894ce957549b23ff39746f057cca00c82871550..5d249cb0e92d8037a33c23bd79e83594a3bfa249 100644 (file)
@@ -273,6 +273,11 @@ svc_sock_release(struct svc_rqst *rqstp)
 
        svc_release_skb(rqstp);
 
+       svc_free_allpages(rqstp);
+       rqstp->rq_res.page_len = 0;
+       rqstp->rq_res.page_base = 0;
+
+
        /* Reset response buffer and release
         * the reservation.
         * But first, check that enough space was reserved
@@ -317,38 +322,82 @@ svc_wake_up(struct svc_serv *serv)
  * Generic sendto routine
  */
 static int
-svc_sendto(struct svc_rqst *rqstp, struct iovec *iov, int nr)
+svc_sendto(struct svc_rqst *rqstp, struct xdr_buf *xdr)
 {
        mm_segment_t    oldfs;
        struct svc_sock *svsk = rqstp->rq_sock;
        struct socket   *sock = svsk->sk_sock;
        struct msghdr   msg;
-       int             i, buflen, len;
+       int             slen;
+       int             len = 0;
+       int             result;
+       int             size;
+       struct page     **ppage = xdr->pages;
+       size_t          base = xdr->page_base;
+       unsigned int    pglen = xdr->page_len;
+       unsigned int    flags = MSG_MORE;
 
-       for (i = buflen = 0; i < nr; i++)
-               buflen += iov[i].iov_len;
+       slen = xdr->len;
 
        msg.msg_name    = &rqstp->rq_addr;
        msg.msg_namelen = sizeof(rqstp->rq_addr);
-       msg.msg_iov     = iov;
-       msg.msg_iovlen  = nr;
+       msg.msg_iov     = NULL;
+       msg.msg_iovlen  = 0;
        msg.msg_control = NULL;
        msg.msg_controllen = 0;
+       msg.msg_flags   = MSG_MORE;
 
-       /* This was MSG_DONTWAIT, but I now want it to wait.
-        * The only thing that it would wait for is memory and
-        * if we are fairly low on memory, then we aren't likely
-        * to make much progress anyway.
-        * sk->sndtimeo is set to 30seconds just in case.
-        */
-       msg.msg_flags   = 0;
+       /* Grab svsk->sk_sem to serialize outgoing data. */
+       down(&svsk->sk_sem);
 
+       /* set the destination */
        oldfs = get_fs(); set_fs(KERNEL_DS);
-       len = sock_sendmsg(sock, &msg, buflen);
+       len = sock_sendmsg(sock, &msg, 0);
        set_fs(oldfs);
+       if (len < 0)
+               goto out;
+
+       /* send head */
+       if (slen == xdr->head[0].iov_len)
+               flags = 0;
+       len = sock->ops->sendpage(sock, rqstp->rq_respages[0], 0, xdr->head[0].iov_len, flags);
+       if (len != xdr->head[0].iov_len)
+               goto out;
+       slen -= xdr->head[0].iov_len;
+       if (slen == 0)
+               goto out;
+
+       /* send page data */
+       size = PAGE_SIZE - base < pglen ? PAGE_SIZE - base : pglen;
+       while (pglen > 0) {
+               if (slen == size)
+                       flags = 0;
+               result = sock->ops->sendpage(sock, *ppage, base, size, flags);
+               if (result > 0)
+                       len += result;
+               if (result != size)
+                       goto out;
+               slen -= size;
+               pglen -= size;
+               size = PAGE_SIZE < pglen ? PAGE_SIZE : pglen;
+               base = 0;
+               ppage++;
+       }
+       /* send tail */
+       if (xdr->tail[0].iov_len) {
+               /* The tail *will* be in respages[0]; */
+               result = sock->ops->sendpage(sock, rqstp->rq_respages[0], 
+                                            ((unsigned long)xdr->tail[0].iov_base)& (PAGE_SIZE-1),
+                                            xdr->tail[0].iov_len, 0);
+
+               if (result > 0)
+                       len += result;
+       }
+out:
+       up(&svsk->sk_sem);
 
-       dprintk("svc: socket %p sendto([%p %Zu... ], %d, %d) = %d (addr %x)\n",
-                       rqstp->rq_sock, iov[0].iov_base, iov[0].iov_len, nr, buflen, len,
+       dprintk("svc: socket %p sendto([%p %Zu... ], %d) = %d (addr %x)\n",
+                       rqstp->rq_sock, xdr->head[0].iov_base, xdr->head[0].iov_len, xdr->len, len,
                rqstp->rq_addr.sin_addr.s_addr);
 
        return len;
@@ -550,35 +599,11 @@ static int
 svc_udp_sendto(struct svc_rqst *rqstp)
 {
        int             error;
-       struct iovec vec[RPCSVC_MAXPAGES];
-       int v;
-       int base, len;
 
-       /* Set up the first element of the reply iovec.
-        * Any other iovecs that may be in use have been taken
-        * care of by the server implementation itself.
-        */
-       vec[0] = rqstp->rq_res.head[0];
-       v=1;
-       base=rqstp->rq_res.page_base;
-       len = rqstp->rq_res.page_len;
-       while (len) {
-               vec[v].iov_base = page_address(rqstp->rq_res.pages[v-1]) + base;
-               vec[v].iov_len = PAGE_SIZE-base;
-               if (len <= vec[v].iov_len)
-                       vec[v].iov_len = len;
-               len -= vec[v].iov_len;
-               base = 0;
-               v++;
-       }
-       if (rqstp->rq_res.tail[0].iov_len) {
-               vec[v] = rqstp->rq_res.tail[0];
-               v++;
-       }
-       error = svc_sendto(rqstp, vec, v);
+       error = svc_sendto(rqstp, &rqstp->rq_res);
        if (error == -ECONNREFUSED)
                /* ICMP error on earlier request. */
-               error = svc_sendto(rqstp, vec, v);
+               error = svc_sendto(rqstp, &rqstp->rq_res);
 
        return error;
 }
@@ -940,9 +965,6 @@ static int
 svc_tcp_sendto(struct svc_rqst *rqstp)
 {
        struct xdr_buf  *xbufp = &rqstp->rq_res;
-       struct iovec vec[RPCSVC_MAXPAGES];
-       int v;
-       int base, len;
        int sent;
        u32 reclen;
 
@@ -953,25 +975,7 @@ svc_tcp_sendto(struct svc_rqst *rqstp)
        reclen = htonl(0x80000000|((xbufp->len ) - 4));
        memcpy(xbufp->head[0].iov_base, &reclen, 4);
 
-       vec[0] = rqstp->rq_res.head[0];
-       v=1;
-       base= xbufp->page_base;
-       len = xbufp->page_len;
-       while (len) {
-               vec[v].iov_base = page_address(xbufp->pages[v-1]) + base;
-               vec[v].iov_len = PAGE_SIZE-base;
-               if (len <= vec[v].iov_len)
-                       vec[v].iov_len = len;
-               len -= vec[v].iov_len;
-               base = 0;
-               v++;
-       }
-       if (xbufp->tail[0].iov_len) {
-               vec[v] = xbufp->tail[0];
-               v++;
-       }
-
-       sent = svc_sendto(rqstp, vec, v);
+       sent = svc_sendto(rqstp, &rqstp->rq_res);
        if (sent != xbufp->len) {
                printk(KERN_NOTICE "rpc-srv/tcp: %s: %s %d when sending %d bytes - shutting down socket\n",
                       rqstp->rq_sock->sk_server->sv_name,
@@ -1066,9 +1070,8 @@ svc_recv(struct svc_serv *serv, struct svc_rqst *rqstp, long timeout)
 
        /* Initialize the buffers */
        /* first reclaim pages that were moved to response list */
-       while (rqstp->rq_resused) 
-               rqstp->rq_argpages[rqstp->rq_arghi++] =
-                       rqstp->rq_respages[--rqstp->rq_resused];
+       svc_pushback_allpages(rqstp);
+
        /* now allocate needed pages.  If we get a failure, sleep briefly */
        pages = 2 + (serv->sv_bufsz + PAGE_SIZE -1) / PAGE_SIZE;
        while (rqstp->rq_arghi < pages) {
@@ -1238,6 +1241,7 @@ svc_setup_socket(struct svc_serv *serv, struct socket *sock,
        svsk->sk_server = serv;
        svsk->sk_lastrecv = CURRENT_TIME;
        INIT_LIST_HEAD(&svsk->sk_deferred);
+       sema_init(&svsk->sk_sem, 1);
 
        /* Initialize the socket */
        if (sock->type == SOCK_DGRAM)