#include <linux/unistd.h>
#include <linux/sunrpc/clnt.h>
#include <linux/file.h>
+#include <linux/workqueue.h>
#include <net/sock.h>
#include <net/checksum.h>
#endif
#define XPRT_MAX_BACKOFF (8)
+#define XPRT_IDLE_TIMEOUT (5*60*HZ)
/*
* Local functions
{
struct rpc_rqst *req = task->tk_rqstp;
- if (!xprt->snd_task) {
- if (xprt->nocong || __xprt_get_cong(xprt, task)) {
- xprt->snd_task = task;
- if (req) {
- req->rq_bytes_sent = 0;
- req->rq_ntrans++;
- }
- }
+ if (test_and_set_bit(XPRT_LOCKED, &xprt->sockstate)) {
+ if (task == xprt->snd_task)
+ return 1;
+ if (task == NULL)
+ return 0;
+ goto out_sleep;
}
- if (xprt->snd_task != task) {
- dprintk("RPC: %4d TCP write queue full\n", task->tk_pid);
- task->tk_timeout = 0;
- task->tk_status = -EAGAIN;
- if (req && req->rq_ntrans)
- rpc_sleep_on(&xprt->resend, task, NULL, NULL);
- else
- rpc_sleep_on(&xprt->sending, task, NULL, NULL);
+ if (xprt->nocong || __xprt_get_cong(xprt, task)) {
+ xprt->snd_task = task;
+ if (req) {
+ req->rq_bytes_sent = 0;
+ req->rq_ntrans++;
+ }
+ return 1;
}
- return xprt->snd_task == task;
+ smp_mb__before_clear_bit();
+ clear_bit(XPRT_LOCKED, &xprt->sockstate);
+ smp_mb__after_clear_bit();
+out_sleep:
+ dprintk("RPC: %4d failed to lock socket %p\n", task->tk_pid, xprt);
+ task->tk_timeout = 0;
+ task->tk_status = -EAGAIN;
+ if (req && req->rq_ntrans)
+ rpc_sleep_on(&xprt->resend, task, NULL, NULL);
+ else
+ rpc_sleep_on(&xprt->sending, task, NULL, NULL);
+ return 0;
}
static inline int
{
struct rpc_task *task;
- if (xprt->snd_task)
+ if (test_and_set_bit(XPRT_LOCKED, &xprt->sockstate))
return;
+ if (!xprt->nocong && RPCXPRT_CONGESTED(xprt))
+ goto out_unlock;
task = rpc_wake_up_next(&xprt->resend);
if (!task) {
- if (!xprt->nocong && RPCXPRT_CONGESTED(xprt))
- return;
task = rpc_wake_up_next(&xprt->sending);
if (!task)
- return;
+ goto out_unlock;
}
if (xprt->nocong || __xprt_get_cong(xprt, task)) {
struct rpc_rqst *req = task->tk_rqstp;
req->rq_bytes_sent = 0;
req->rq_ntrans++;
}
+ return;
}
+out_unlock:
+ smp_mb__before_clear_bit();
+ clear_bit(XPRT_LOCKED, &xprt->sockstate);
+ smp_mb__after_clear_bit();
}
/*
static void
__xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
- if (xprt->snd_task == task)
+ if (xprt->snd_task == task) {
xprt->snd_task = NULL;
- __xprt_lock_write_next(xprt);
+ smp_mb__before_clear_bit();
+ clear_bit(XPRT_LOCKED, &xprt->sockstate);
+ smp_mb__after_clear_bit();
+ __xprt_lock_write_next(xprt);
+ }
}
static inline void
sock_release(sock);
}
+static void
+xprt_socket_autoclose(void *args)
+{
+ struct rpc_xprt *xprt = (struct rpc_xprt *)args;
+
+ xprt_close(xprt);
+ xprt_release_write(xprt, NULL);
+}
+
/*
* Mark a transport as disconnected
*/
spin_unlock_bh(&xprt->sock_lock);
}
+/*
+ * Used to allow disconnection when we've been idle
+ */
+static void
+xprt_init_autodisconnect(unsigned long data)
+{
+ struct rpc_xprt *xprt = (struct rpc_xprt *)data;
+
+ spin_lock(&xprt->sock_lock);
+ if (!list_empty(&xprt->recv) || xprt->shutdown)
+ goto out_abort;
+ if (test_and_set_bit(XPRT_LOCKED, &xprt->sockstate))
+ goto out_abort;
+ spin_unlock(&xprt->sock_lock);
+ /* Let keventd close the socket */
+ schedule_work(&xprt->task_cleanup);
+ return;
+out_abort:
+ spin_unlock(&xprt->sock_lock);
+}
+
/*
* Attempt to connect a TCP socket.
*
spin_lock(&xprt->xprt_lock);
do_xprt_reserve(task);
spin_unlock(&xprt->xprt_lock);
+ if (task->tk_rqstp)
+ del_timer_sync(&xprt->timer);
}
}
__xprt_put_cong(xprt, req);
if (!list_empty(&req->rq_list))
list_del(&req->rq_list);
+ xprt->last_used = jiffies;
+ if (list_empty(&xprt->recv) && !xprt->shutdown)
+ mod_timer(&xprt->timer, xprt->last_used + XPRT_IDLE_TIMEOUT);
spin_unlock_bh(&xprt->sock_lock);
task->tk_rqstp = NULL;
memset(req, 0, sizeof(*req)); /* mark unused */
init_waitqueue_head(&xprt->cong_wait);
INIT_LIST_HEAD(&xprt->recv);
+ INIT_WORK(&xprt->task_cleanup, xprt_socket_autoclose, xprt);
+ init_timer(&xprt->timer);
+ xprt->timer.function = xprt_init_autodisconnect;
+ xprt->timer.data = (unsigned long) xprt;
+ xprt->last_used = jiffies;
/* Set timeout parameters */
if (to) {
rpc_wake_up(&xprt->backlog);
if (waitqueue_active(&xprt->cong_wait))
wake_up(&xprt->cong_wait);
+ del_timer_sync(&xprt->timer);
}
/*