From: NeilBrown Date: Fri, 13 Aug 2010 11:48:36 +0000 (+1000) Subject: Separate thread management from the cleaning. X-Git-Url: http://git.neil.brown.name/?a=commitdiff_plain;h=7bc09a2943b80104da4e061ccc6e38c5c9b466cf;p=LaFS.git Separate thread management from the cleaning. The thread does a lot more than just 'clean' so don't call it the 'cleaner' any more - just the 'thread' or 'lafsd'. Signed-off-by: NeilBrown --- diff --git a/Makefile b/Makefile index 11c5ad1..6b40ab7 100644 --- a/Makefile +++ b/Makefile @@ -12,7 +12,7 @@ obj-m += lafs.o lafs-y := super.o io.o roll.o dir.o inode.o index.o block.o file.o link.o dir-avl.o \ snapshot.o quota.o summary.o modify.o checkpoint.o cluster.o orphan.o \ - segments.o clean.o + segments.o clean.o thread.o else diff --git a/checkpoint.c b/checkpoint.c index 5c2dd61..baf4e23 100644 --- a/checkpoint.c +++ b/checkpoint.c @@ -509,7 +509,7 @@ static void finish_checkpoint(struct fs *fs, int youth) fs->checkpoint_youth = youth; fs->newblocks = 0; - lafs_wake_cleaner(fs); + lafs_wake_thread(fs); } unsigned long lafs_do_checkpoint(struct fs *fs) @@ -556,7 +556,7 @@ unsigned long long lafs_checkpoint_start(struct fs *fs) * space to check again */ clear_bit(CleanerBlocks, &fs->fsstate); fs->prime_sb->s_dirt = 0; - lafs_wake_cleaner(fs); + lafs_wake_thread(fs); return cp; } diff --git a/clean.c b/clean.c index b140737..653f3d2 100644 --- a/clean.c +++ b/clean.c @@ -7,168 +7,6 @@ */ #include "lafs.h" -#include - -/* - * This is the cleaner thread that runs whenever the filesystem is - * mounted writable. - * It does a lot more than clean, though that is what it does most of. - * Where possible, the thread does not block on IO, though it might - * block on memory allocation. - * Some tasks need to read in data from disk to complete. These just - * schedule the read and signal that they should be re-tried later - * when the read might have completed. 
- * - * Such reads are marked as async and the 'struct fs' tracks how many - * async reads are pending. Tasks are retried when this number gets low. - * - * The particular tasks are: - * Cleaning. This goes through stages. - * choose a segment (or a collection of segments) - * Read the write-cluster header for that segment (as there can be - * multiple write clusters, we might come back here several times) - * Follow the indexes to all blocks that could credibly be in that cluster - * and load the block if it is found. - * As blocks are found, schedule them for the cleaner cluster. - * Occasionally flush the cleaner cluster. - * - * Orphan handling - * Orphans are kept on 2 lists using the datablock.orphans list. - * - orphans that can be processed now - * - orphans that can be processed after some async IO has completed - * To process an orphan we call a handler based on the inode type. - * This can be TypeInodeFile (for truncates, unlinks) and - * TypeDirectory for directory cleaning. - * These will need to take an i_mutex. If they fail, they are put on the - * delayed list and will be retried after async IO completes, or a - * time has passed. - * - * Run a checkpoint - * This blocks any other tasks from running until the checkpoint - * finishes. It will block on writing out the clusters. - * Any cleaner-segment will be flushed first - * This is triggered on a sys_sync or each time a configurable number of - * segments has been written. In the later case we don't start the - * checkpoint until the segments currently being cleaned are finished - * with. - * - * Scan the segment usage files. - * This is a lazy scan which decays youth if needed, and looks for - * segments that should be cleaned or re-used. - * - * ?? Call cluster_flush if a cluster has been pending for a while - * This really shouldn't be needed.... - * - * - * Every time we wake up, we give every task a chance to do work. - * Each task is responsible for its own rate-limiting. 
- * Each task can return a wakeup time. We set a timeout to wake at the - * soonest of these. - * We may be woken sooner by another process requesting action. - */ - -static unsigned long do_clean(struct fs *fs); - -static int cleaner(void *data) -{ - struct fs *fs = data; - long timeout = MAX_SCHEDULE_TIMEOUT; - long to; - set_bit(CleanerNeeded, &fs->fsstate); - - while (!kthread_should_stop()) { - /* We need to wait INTERRUPTIBLE so that - * we don't add to the load-average. - * That means we need to be sure no signals are - * pending - */ - if (signal_pending(current)) - flush_signals(current); - - wait_event_interruptible_timeout - (fs->async_complete, - kthread_should_stop() || - test_bit(CleanerNeeded, &fs->fsstate), - timeout); - clear_bit(CleanerNeeded, &fs->fsstate); - - if (test_bit(FlushNeeded, &fs->fsstate) || - test_bit(SecondFlushNeeded, &fs->fsstate)) { - /* only push a flush now if it can happen - * immediately. - */ - struct wc *wc = &fs->wc[0]; - if (mutex_trylock(&wc->lock)) { - int can_flush = 1; - int which = (wc->pending_next + 3) % 4; - if (wc->pending_vfy_type[which] == VerifyNext && - atomic_read(&wc->pending_cnt[which]) > 1) - can_flush = 0; - which = (which + 3) % 4; - if (wc->pending_vfy_type[which] == VerifyNext2 && - atomic_read(&wc->pending_cnt[which]) > 1) - can_flush = 0; - mutex_unlock(&wc->lock); - if (can_flush) - lafs_cluster_flush(fs, 0); - } - } - - timeout = MAX_SCHEDULE_TIMEOUT; - to = lafs_do_checkpoint(fs); - if (to < timeout) - timeout = to; - - to = lafs_run_orphans(fs); - if (to < timeout) - timeout = to; - - to = lafs_scan_seg(fs); - if (to < timeout) - timeout = to; - - to = do_clean(fs); - if (to < timeout) - timeout = to; - - lafs_clusters_done(fs); - cond_resched(); - } - return 0; -} - -int lafs_start_cleaner(struct fs *fs) -{ - if (test_and_set_bit(CleanerRunning, &fs->fsstate)) - return 0; /* already running */ - - fs->cleaner_thread = kthread_run(cleaner, fs, "lafs_cleaner"); - if (fs->cleaner_thread == NULL) - 
clear_bit(CleanerRunning, &fs->fsstate); - return fs->cleaner_thread ? 0 : -ENOMEM; -} - -void lafs_stop_cleaner(struct fs *fs) -{ - if (fs->cleaner_thread) - kthread_stop(fs->cleaner_thread); - fs->cleaner_thread = NULL; -} - -void lafs_wake_cleaner(struct fs *fs) -{ - set_bit(CleanerNeeded, &fs->fsstate); - wake_up(&fs->async_complete); -} - -void lafs_trigger_flush(struct block *b) -{ - struct fs *fs = fs_from_inode(b->inode); - - if (test_bit(B_Writeback, &b->flags) && - !test_and_set_bit(FlushNeeded, &fs->fsstate)) - lafs_wake_cleaner(fs); -} static int mark_cleaning(struct block *b) { @@ -367,7 +205,7 @@ static int try_clean(struct fs *fs, struct toclean *tc) tc->ss = 0; } tc->ch = NULL; - lafs_wake_cleaner(fs); + lafs_wake_thread(fs); break; } if (((((char *)tc->desc) - (char *)tc->gh)+3)/4 @@ -566,14 +404,14 @@ void lafs_unclean(struct datablock *db) iput(db->b.inode); if (test_and_clear_bit(B_Async, &db->b.flags)) { putdref(db, MKREF(async)); - lafs_wake_cleaner(fs); + lafs_wake_thread(fs); } } mutex_unlock(&fs->cleaner.lock); } } -static unsigned long do_clean(struct fs *fs) +unsigned long lafs_do_clean(struct fs *fs) { /* * If the cleaner is inactive, we need to decide whether to diff --git a/cluster.c b/cluster.c index cfd70ea..bf46087 100644 --- a/cluster.c +++ b/cluster.c @@ -1439,7 +1439,7 @@ static void cluster_end_io(struct bio *bio, int err, wake_up(&wc->pending_wait); if (test_bit(FlushNeeded, &fs->fsstate) || test_bit(SecondFlushNeeded, &fs->fsstate)) - lafs_wake_cleaner(fs); + lafs_wake_thread(fs); } } diff --git a/file.c b/file.c index 3d763ce..0da3335 100644 --- a/file.c +++ b/file.c @@ -429,7 +429,7 @@ static void lafs_sync_page(struct page *page) set_bit(FlushNeeded, &fs->fsstate); else if (want_flush == 1) set_bit(SecondFlushNeeded, &fs->fsstate); - lafs_wake_cleaner(fs); + lafs_wake_thread(fs); } static int diff --git a/inode.c b/inode.c index f47f99e..e6ffbd4 100644 --- a/inode.c +++ b/inode.c @@ -188,7 +188,7 @@ lafs_iget(struct 
super_block *sb, ino_t inum, int async) out: if (b && test_and_clear_bit(B_Async, &b->b.flags)) { putdref(b, MKREF(async)); - lafs_wake_cleaner(fs_from_sb(sb)); + lafs_wake_thread(fs_from_sb(sb)); } putdref(b, MKREF(iget)); return ino; diff --git a/io.c b/io.c index 5fafb99..138cd98 100644 --- a/io.c +++ b/io.c @@ -101,7 +101,7 @@ bi_async_complete(struct bio *bio, int error) else ac->state = 4; bio_put(bio); - lafs_wake_cleaner(ac->fs); + lafs_wake_thread(ac->fs); } static void @@ -243,7 +243,7 @@ lafs_iounlock_block(struct block *b) lafs_io_wake(b); if (test_bit(B_Async, &b->flags)) - lafs_wake_cleaner(fs_from_inode(b->inode)); + lafs_wake_thread(fs_from_inode(b->inode)); } void lafs_writeback_done(struct block *b) @@ -256,7 +256,7 @@ void lafs_writeback_done(struct block *b) clear_bit(B_Writeback, &b->flags); lafs_io_wake(b); if (test_bit(B_Async, &b->flags)) - lafs_wake_cleaner(fs_from_inode(b->inode)); + lafs_wake_thread(fs_from_inode(b->inode)); } else lafs_iocheck_writeback(dblk(b), 1); } @@ -327,7 +327,7 @@ void lafs_iocheck_writeback(struct datablock *db, int unlock) if (unlock) { lafs_io_wake(&db->b); if (test_bit(B_Async, &db->b.flags)) - lafs_wake_cleaner(fs_from_inode(db->b.inode)); + lafs_wake_thread(fs_from_inode(db->b.inode)); } } diff --git a/lafs.h b/lafs.h index 3542a1a..7ccef89 100644 --- a/lafs.h +++ b/lafs.h @@ -693,10 +693,13 @@ unsigned long lafs_scan_seg(struct fs *fs); int lafs_clean_count(struct fs *fs); /* Cleaner */ -int lafs_start_cleaner(struct fs *fs); -void lafs_stop_cleaner(struct fs *fs); -void lafs_wake_cleaner(struct fs *fs); +unsigned long lafs_do_clean(struct fs *fs); void lafs_unclean(struct datablock *db); + +/* Thread management */ +int lafs_start_thread(struct fs *fs); +void lafs_stop_thread(struct fs *fs); +void lafs_wake_thread(struct fs *fs); void lafs_trigger_flush(struct block *b); /* cluster.c */ diff --git a/orphan.c b/orphan.c index 9eada14..b514c1c 100644 --- a/orphan.c +++ b/orphan.c @@ -585,7 +585,7 @@ void 
lafs_add_orphan(struct fs *fs, struct datablock *db) getdref(db, MKREF(orphan_list)); } spin_unlock(&fs->lock); - lafs_wake_cleaner(fs); + lafs_wake_thread(fs); } void lafs_orphan_forget(struct fs *fs, struct datablock *db) diff --git a/segments.c b/segments.c index e878339..66c7c0f 100644 --- a/segments.c +++ b/segments.c @@ -658,7 +658,7 @@ int lafs_space_alloc(struct fs *fs, int credits, int why) fs->cleaner.need > watermark + fs->max_segment) { fs->cleaner.need = watermark + fs->max_segment; set_bit(CleanerBlocks, &fs->fsstate); - lafs_wake_cleaner(fs); + lafs_wake_thread(fs); } } else if (why == NewSpace) clear_bit(EmergencyClean, &fs->fsstate); @@ -2005,5 +2005,5 @@ void lafs_dump_usage(void) return; dfs->scan.trace = 1; dfs->scan.done = 0; - lafs_wake_cleaner(dfs); + lafs_wake_thread(dfs); } diff --git a/state.h b/state.h index 806716b..0489fbe 100644 --- a/state.h +++ b/state.h @@ -85,8 +85,8 @@ struct fs { int rolled; /* set when rollforward has completed */ unsigned long fsstate; #define CheckpointNeeded 0 -#define CleanerRunning 1 -#define CleanerNeeded 2 +#define ThreadRunning 1 +#define ThreadNeeded 2 #define FinalCheckpoint 3 #define CleanerDisabled 4 #define OrphansRunning 5 @@ -139,7 +139,7 @@ struct fs { struct page *chead; } seg[4]; } cleaner; - struct task_struct *cleaner_thread; + struct task_struct *thread; unsigned long newblocks; /* number of blocks written since checkpoint * FIXME this should probably be a count diff --git a/super.c b/super.c index 4eddc61..7faf067 100644 --- a/super.c +++ b/super.c @@ -757,7 +757,7 @@ static void lafs_kill_sb(struct super_block *sb) BUG_ON(!list_empty(&fs->clean_leafs)); flush_scheduled_work(); - lafs_stop_cleaner(fs); + lafs_stop_thread(fs); kfree(fs->state); kfree(fs->ss); @@ -906,7 +906,7 @@ lafs_get_sb(struct file_system_type *fs_type, * filesystem */ err = lafs_mount(fs); if (err == 0) - err = lafs_start_cleaner(fs); + err = lafs_start_thread(fs); if (err) deactivate_locked_super(fs->prime_sb); else { 
@@ -1325,7 +1325,7 @@ static void lafs_drop_inode(struct inode *inode) generic_drop_inode(inode); if (db && test_bit(B_Async, &db->b.flags)) - lafs_wake_cleaner(fs); + lafs_wake_thread(fs); if (db) putdref(db, MKREF(drop)); } diff --git a/thread.c b/thread.c new file mode 100644 index 0000000..6f89119 --- /dev/null +++ b/thread.c @@ -0,0 +1,171 @@ + +/* + * fs/lafs/thread.c + * Copyright (C) 2005-2010 + * Neil Brown + * Released under the GPL, version 2 + */ + +#include "lafs.h" +#include + + +/* + * This is the management thread that runs whenever the filesystem is + * mounted writable. + * It does a lot more than clean, though that is what it does most of. + * Where possible, the thread does not block on IO, though it might + * block on memory allocation. + * Some tasks need to read in data from disk to complete. These just + * schedule the read and signal that they should be re-tried later + * when the read might have completed. + * + * Such reads are marked as async and the 'struct fs' tracks how many + * async reads are pending. Tasks are retried when this number gets low. + * + * The particular tasks are: + * Cleaning. This goes through stages. + * choose a segment (or a collection of segments) + * Read the write-cluster header for that segment (as there can be + * multiple write clusters, we might come back here several times) + * Follow the indexes to all blocks that could credibly be in that cluster + * and load the block if it is found. + * As blocks are found, schedule them for the cleaner cluster. + * Occasionally flush the cleaner cluster. + * + * Orphan handling + * Orphans are kept on 2 lists using the datablock.orphans list. + * - orphans that can be processed now + * - orphans that can be processed after some async IO has completed + * To process an orphan we call a handler based on the inode type. + * This can be TypeInodeFile (for truncates, unlinks) and + * TypeDirectory for directory cleaning. + * These will need to take an i_mutex. 
If they fail, they are put on the
+ * delayed list and will be retried after async IO completes, or a
+ * time has passed.
+ *
+ * Run a checkpoint
+ * This blocks any other tasks from running until the checkpoint
+ * finishes. It will block on writing out the clusters.
+ * Any cleaner-segment will be flushed first
+ * This is triggered on a sys_sync or each time a configurable number of
+ * segments has been written. In the latter case we don't start the
+ * checkpoint until the segments currently being cleaned are finished
+ * with.
+ *
+ * Scan the segment usage files.
+ * This is a lazy scan which decays youth if needed, and looks for
+ * segments that should be cleaned or re-used.
+ *
+ * ?? Call cluster_flush if a cluster has been pending for a while
+ * This really shouldn't be needed....
+ *
+ *
+ * Every time we wake up, we give every task a chance to do work.
+ * Each task is responsible for its own rate-limiting.
+ * Each task can return a wakeup time. We set a timeout to wake at the
+ * soonest of these.
+ * We may be woken sooner by another process requesting action.
+ */
+
+
+static int lafsd(void *data)
+{
+ struct fs *fs = data;
+ long timeout = MAX_SCHEDULE_TIMEOUT;
+ long to;
+ set_bit(ThreadNeeded, &fs->fsstate);
+
+ while (!kthread_should_stop()) {
+ /* We need to wait INTERRUPTIBLE so that
+ * we don't add to the load-average.
+ * That means we need to be sure no signals are
+ * pending
+ */
+ if (signal_pending(current))
+ flush_signals(current);
+
+ wait_event_interruptible_timeout
+ (fs->async_complete,
+ kthread_should_stop() ||
+ test_bit(ThreadNeeded, &fs->fsstate),
+ timeout);
+ clear_bit(ThreadNeeded, &fs->fsstate);
+
+ if (test_bit(FlushNeeded, &fs->fsstate) ||
+ test_bit(SecondFlushNeeded, &fs->fsstate)) {
+ /* only push a flush now if it can happen
+ * immediately.
+ */ + struct wc *wc = &fs->wc[0]; + if (mutex_trylock(&wc->lock)) { + int can_flush = 1; + int which = (wc->pending_next + 3) % 4; + if (wc->pending_vfy_type[which] == VerifyNext && + atomic_read(&wc->pending_cnt[which]) > 1) + can_flush = 0; + which = (which + 3) % 4; + if (wc->pending_vfy_type[which] == VerifyNext2 && + atomic_read(&wc->pending_cnt[which]) > 1) + can_flush = 0; + mutex_unlock(&wc->lock); + if (can_flush) + lafs_cluster_flush(fs, 0); + } + } + + timeout = MAX_SCHEDULE_TIMEOUT; + to = lafs_do_checkpoint(fs); + if (to < timeout) + timeout = to; + + to = lafs_run_orphans(fs); + if (to < timeout) + timeout = to; + + to = lafs_scan_seg(fs); + if (to < timeout) + timeout = to; + + to = lafs_do_clean(fs); + if (to < timeout) + timeout = to; + + lafs_clusters_done(fs); + cond_resched(); + } + return 0; +} + +int lafs_start_thread(struct fs *fs) +{ + if (test_and_set_bit(ThreadRunning, &fs->fsstate)) + return 0; /* already running */ + + fs->thread = kthread_run(lafsd, fs, "lafsd-%d", fs->prime_sb->s_dev); + if (fs->thread == NULL) + clear_bit(ThreadRunning, &fs->fsstate); + return fs->thread ? 0 : -ENOMEM; +} + +void lafs_stop_thread(struct fs *fs) +{ + if (fs->thread) + kthread_stop(fs->thread); + fs->thread = NULL; +} + +void lafs_wake_thread(struct fs *fs) +{ + set_bit(ThreadNeeded, &fs->fsstate); + wake_up(&fs->async_complete); +} + +void lafs_trigger_flush(struct block *b) +{ + struct fs *fs = fs_from_inode(b->inode); + + if (test_bit(B_Writeback, &b->flags) && + !test_and_set_bit(FlushNeeded, &fs->fsstate)) + lafs_wake_thread(fs); +}