From: NeilBrown Date: Wed, 2 Sep 2009 00:09:49 +0000 (+1000) Subject: Run orphans from the cleaner thread, not immediately. X-Git-Url: http://git.neil.brown.name/?a=commitdiff_plain;h=9714d9fed94da64b14eeb02c28f30b4437e59186;p=LaFS.git Run orphans from the cleaner thread, not immediately. This involves: making dir_handle_orphans and inode_handle_orphans non-blocking. removing lots of explicit calls to handle_orphans. --- diff --git a/README b/README index 60e283d..65aa005 100644 --- a/README +++ b/README @@ -3329,3 +3329,37 @@ FIXED orphans don't get cleaned up. It seems a 'create' fails and leaves and: it seems we sometimes try to clean the segment being written to. We must avoid that. + (long ago I wrote:: + FIXME When pin fails, we need to remove PinPending from everything!!! + and never followed up ... I wonder? + ) + +25Aug2009 + Orphan handling. + Every orphan block goes on a per-fs list and gets removed only + if the B_Orphan bit is clear. + There are two times when we want to expedite orphan handling. + 1/ on rmdir we need to know if the directory is really empty. + This requires that we expedite the orphan handling of all + blocks. As soon as we find a non-orphan, we can give up. + Then we need to make sure the index tree has collapsed. WE + can borrow that code from truncate. + + 2/ When writing past Trunc_next. We just pass the block to + special orphan handling. + + This requires that orphan handling is re-entrant. + For dir, that is protected by i_mutex, but rmdir needs to come + in under the radar. + For trunc, the iolock on the index blocks should be enough. + I wonder if IOLock can be used on dir as well... allowing + parallel orphan handling in the one dir even!!. + + We need to ensure exclusion of orphan handling, including: + - only one orphan handler at a time + - don't run orphan handler while still processing action + that makes it an orphan. + Maybe if we just use IOLock for that? Does that work? Maybe + but it gets messy for directories (on first attempt anyway). + For directories we can just use i_mutex. + Maybe i_mutex for files as well? diff --git a/dir.c b/dir.c index b15852a..ccaa33b 100644 --- a/dir.c +++ b/dir.c @@ -734,7 +734,6 @@ lafs_unlink(struct inode *dir, struct dentry *de) lafs_checkpoint_unlock(fs); lafs_dirty_inode(inode); lafs_inode_checkpin(inode); - lafs_dir_handle_orphans(dir); clear_bit(B_PinPending, &inodb->b.flags); putdref(inodb, MKREF(inode_update)); return 0; @@ -801,7 +800,6 @@ lafs_rmdir(struct inode *dir, struct dentry *de) lafs_inode_checkpin(inode); lafs_inode_checkpin(dir); lafs_checkpoint_unlock(fs); - lafs_dir_handle_orphans(dir); return 0; abort_unlock: lafs_checkpoint_unlock(fs); @@ -1178,7 +1176,6 @@ lafs_rename(struct inode *old_dir, struct dentry *old_dentry, } lafs_checkpoint_unlock(fs); - lafs_dir_handle_orphans(old_dir); return 0; abort_unlock: @@ -1225,50 +1222,33 @@ lafs_rename(struct inode *old_dir, struct dentry *old_dentry, * If ->root is 0, punch a hole */ -static int dir_handle_orphan(struct inode *dir, - struct datablock *b); - -/* Called with ino->i_sem down */ -void lafs_dir_handle_orphans(struct inode *ino) -{ - struct list_head tmp; - int all_failed = 0; - - while (!all_failed) { - all_failed = 1; - list_add(&tmp, &LAFSI(ino)->md.file.dirorphans); - list_del_init(&LAFSI(ino)->md.file.dirorphans); - - while (!list_empty(&tmp)) { - struct datablock *db; - db = list_entry(tmp.next, - struct datablock, orphans); - list_del_init(&db->orphans); - if (dir_handle_orphan(ino, db) == 0) - all_failed = 0; - } - } -} - -static int dir_handle_orphan(struct inode *dir, struct datablock *b) +void lafs_dir_handle_orphan(struct datablock *b) { + struct inode *dir = b->b.inode; struct fs *fs = fs_from_inode(dir); int bits = dir->i_blkbits-8; u32 seed = LAFSI(dir)->md.file.seed; - u32 hash = (b->b.fileaddr-1) & MaxDirHash; + u32 hash; char *buf, *buf2; struct datablock *b2; u8 piece, firstpiece; struct dir_ent de; int err; + /* FIXME if there was an IOerror, this should not be a BUG */ + LAFS_BUG(!test_bit(B_Valid, &b->b.flags), &b->b); dprintk("HANDLE ORPHAN h=%x %s\n", (unsigned)hash, strblk(&b->b)); - err = lafs_read_block(b); - if (err) - goto abort; + lafs_checkpoint_lock(fs); + + /* First test: Does a chain of deleted entries extend beyond + * the end of this block. i.e. is the last entry deleted. + * If so, look at the next block and see if the chain is still + * anchored, or if it can all be released. + */ buf = map_dblock(b); + hash = (b->b.fileaddr-1) & MaxDirHash; if (lafs_dir_find(buf, bits, seed, hash, &piece) && lafs_dir_extract(buf, bits, &de, piece, NULL)->target == 0) { loff_t bnum; @@ -1280,10 +1260,9 @@ static int dir_handle_orphan(struct inode *dir, struct datablock *b) b2 = lafs_get_block(dir, bnum, NULL, GFP_KERNEL, MKREF(dir_orphan)); - err = -ENOMEM; if (!b2) goto abort; - err = lafs_read_block(b2); + err = lafs_read_block_async(b2); if (err) goto abortb2; @@ -1294,7 +1273,6 @@ static int dir_handle_orphan(struct inode *dir, struct datablock *b) unmap_dblock(b2, buf2); putdref(b2, MKREF(dir_orphan)); err = lafs_pin_dblock(b); - // FIXME what about EAGAIN?? if (err) goto abort; buf = map_dblock(b); @@ -1316,6 +1294,10 @@ static int dir_handle_orphan(struct inode *dir, struct datablock *b) buf = map_dblock(b); } + /* Second test: if we have an unanchored chain at the start + * of the block, then schedule orphan handling for previous block, + * and remove the unanchor. + */ lafs_dir_find(buf, bits, seed, 0, &firstpiece); hash = LAFSI(dir)->md.file.seed; if (firstpiece && @@ -1324,10 +1306,9 @@ static int dir_handle_orphan(struct inode *dir, struct datablock *b) unmap_dblock(b, buf); b2 = lafs_get_block(dir, hash, NULL, GFP_KERNEL, MKREF(dir_orphan)); - err = -ENOMEM; if (!b2) goto abort; - err = lafs_read_block(b2); + err = lafs_read_block_async(b2); if (err) goto abortb2; @@ -1335,9 +1316,11 @@ static int dir_handle_orphan(struct inode *dir, struct datablock *b) if (lafs_dir_find(buf2, bits, seed, (hash-1) & MaxDirHash, &piece) && lafs_dir_extract(buf2, bits, &de, piece, NULL)->target == 0) - lafs_make_orphan(fs, b2); + err = lafs_make_orphan_nb(fs, b2); unmap_dblock(b2, buf2); putdref(b2, MKREF(dir_orphan)); + if (err) + goto abort; buf = map_dblock(b); lafs_dir_del_ent(buf, bits, seed, hash); lafs_dirty_dblock(b); @@ -1347,18 +1330,19 @@ static int dir_handle_orphan(struct inode *dir, struct datablock *b) loff_t bnum; unmap_dblock(b, buf); - lafs_erase_dblock(b); bnum = 1; - if (lafs_find_next(dir, &bnum) == 0) { + err = lafs_find_next(dir, &bnum); + if (err < 0) + goto abort; + if (err == 0) { if (b->b.fileaddr == 0) i_size_write(dir, 0); else { b2 = lafs_get_block(dir, 0, NULL, GFP_KERNEL, MKREF(dir_orphan)); - err = -ENOMEM; if (!b2) goto abort; - err = lafs_read_block(b2); + err = lafs_read_block_async(b2); if (err) goto abortb2; buf2 = map_dblock(b2); @@ -1369,21 +1353,19 @@ static int dir_handle_orphan(struct inode *dir, struct datablock *b) } lafs_dirty_inode(dir); } + lafs_erase_dblock(b); } else { unmap_dblock(b, buf); } lafs_checkpoint_unlock(fs); - putdref(b, MKREF(dirorphan)); lafs_orphan_release(fs, b); - return 0; + return; abortb2: putdref(b2, MKREF(dir_orphan)); abort: lafs_checkpoint_unlock(fs); - list_move(&b->orphans, &LAFSI(dir)->md.file.dirorphans); - return err; } /*-------------------------------------------------------------------- diff --git a/inode.c b/inode.c index 6b3a0ba..d6e916d 100644 --- a/inode.c +++ b/inode.c @@ -240,8 +240,6 @@ if (!ino->i_nlink) ino->i_nlink = 1; init_special_inode(ino, ino->i_mode, ino->i_rdev); break; } - - INIT_LIST_HEAD(&i->dirorphans); break; } } @@ -452,8 +450,6 @@ void lafs_clear_inode(struct inode *ino) } static int inode_map_free(struct fs *fs, struct inode *filesys, u32 inum); -static int lafs_inode_handle_orphan(struct datablock *b); -static void inode_handle_orphan_loop(struct inode *ino); void lafs_delete_inode(struct inode *ino) { @@ -491,24 +487,6 @@ void lafs_delete_inode(struct inode *ino) putdref(b, MKREF(delete_inode)); clear_inode(ino); - - inode_handle_orphan_loop(ino); -} - -static void inode_handle_orphan_loop(struct inode *ino) -{ - struct datablock *b = lafs_inode_dblock(ino, 0, MKREF(hol)); - - if (IS_ERR(b)) - return; - while (test_bit(B_Orphan, &b->b.flags) && - (test_bit(I_Trunc, &LAFSI(ino)->iflags) || - LAFSI(ino)->type == 0)) - switch (lafs_inode_handle_orphan(b)) { - case 2: /* wait for checkpoint to finish... */ - msleep(500); - } - putdref(b, MKREF(hol)); } static int prune(void *data, u32 addr, u64 paddr, int len) @@ -571,39 +549,29 @@ static int prune_some(void *data, u32 addr, u64 paddr, int len) return i; } -static int lafs_inode_handle_orphan(struct datablock *b) +void lafs_inode_handle_orphan(struct datablock *b) { struct indexblock *ib, *ib2; struct inode *ino = b->my_inode; struct fs *fs = fs_from_inode(ino); u32 trunc_next, next_trunc; - /* FIXME if the trunc has completed, we don't really - * need the iblock... - */ - ib = lafs_make_iblock(ino, 1, 0, MKREF(inode_handle_orphan)); - if (IS_ERR(ib)) - return PTR_ERR(ib); + restart: if (!test_bit(I_Trunc, &LAFSI(ino)->iflags)) { - if (ino->i_nlink) { - /* Not an orphan any more */ - lafs_orphan_release(fs, b); - } else { + if (ino->i_nlink == 0) { LAFS_BUG(LAFSI(ino)->type != 0, &b->b); - lafs_orphan_release(fs, b); - if (test_bit(B_Dirty, &ib->b.flags)) { - lafs_iolock_written(&ib->b); - lafs_cluster_allocate(&ib->b, 0); - } lafs_erase_dblock(b); clear_bit(I_Deleting, &LAFSI(ino)->iflags); } - putiref(ib, MKREF(inode_handle_orphan)); - /* nothing more to do */ - return 0; + lafs_orphan_release(fs, b); + return; } + ib = lafs_make_iblock(ino, 1, 0, MKREF(inode_handle_orphan)); + if (IS_ERR(ib)) + return; + /* Here is the guts of 'truncate'. We find the next leaf index * block and discard all the addresses there-in. */ @@ -629,11 +597,11 @@ static int lafs_inode_handle_orphan(struct datablock *b) clear_bit(I_Trunc, &LAFSI(ino)->iflags); wake_up(&fs->trunc_wait); putiref(ib, MKREF(inode_handle_orphan)); - return 1; /* Try again whenever */ + goto restart; } if (fs->checkpointing) { putiref(ib, MKREF(inode_handle_orphan)); - return 2; /* means "Try again after the checkpoint" */ + return; /* Try again after the checkpoint */ } /* FIXME I need some sort of lock to safely walk * down this list */ @@ -670,15 +638,26 @@ static int lafs_inode_handle_orphan(struct datablock *b) clear_bit(I_Trunc, &LAFSI(ino)->iflags); wake_up(&fs->trunc_wait); putiref(ib, MKREF(inode_handle_orphan)); - return 1; /* Try again whenever */ + goto restart; } + getiref(ib2, MKREF(inode_handle_orphan2)); /* ib2 is an index block beyond EOF with no * Pinned children. * Incorporating it should unpin it. */ - getiref(ib2, MKREF(inode_handle_orphan2)); - lafs_iolock_written(&ib2->b); + + /* lafs_iolock_written(&ib2->b); - without blocking */ + if (test_and_set_bit(B_IOLock, &ib2->b.flags)) { + putiref(ib2, MKREF(inode_handle_orphan2)); + goto out2; + } + set_iolock_info(&ib2->b); + if (test_bit(B_Writeback, &ib2->b.flags)) { + lafs_iounlock_block(&ib2->b); + putiref(ib2, MKREF(inode_handle_orphan2)); + goto out2; + } do lafs_incorporate(fs, ib2); while (ib2->uninc_table.pending_cnt || ib2->uninc); @@ -691,8 +670,6 @@ static int lafs_inode_handle_orphan(struct datablock *b) else lafs_iounlock_block(&ib2->b); - putiref(ib2, MKREF(inode_handle_orphan2)); - printk("."); if (!list_empty(&ib2->b.siblings)) { static int cnt = 10; printk("looping on %s\n", strblk(&ib2->b)); @@ -700,23 +677,29 @@ static int lafs_inode_handle_orphan(struct datablock *b) if (cnt < 0) BUG(); } + putiref(ib2, MKREF(inode_handle_orphan2)); if (ib->uninc) { - lafs_iolock_written(&ib->b); - while (ib->uninc) - lafs_incorporate(fs, ib); - lafs_iounlock_block(&ib->b); + if (!test_and_set_bit(B_IOLock, &ib->b.flags)) { + set_iolock_info(&ib->b); + if (!test_bit(B_Writeback, &ib->b.flags)) + while (ib->uninc) + lafs_incorporate(fs, ib); + lafs_iounlock_block(&ib->b); + } } + out2: putiref(ib, MKREF(inode_handle_orphan)); - return 1; /* Try again whenever */ + goto restart; } putiref(ib, MKREF(inode_handle_orphan)); ib = lafs_leaf_find(ino, trunc_next, 1, &next_trunc, - 0, MKREF(inode_handle_orphan3)); + 1, MKREF(inode_handle_orphan3)); if (IS_ERR(ib)) - BUG(); // FIXME what can I do here? + return; + /* Ok, trunc_next seems to refer to a block that exists. * We need to erase it.. * @@ -730,7 +713,7 @@ static int lafs_inode_handle_orphan(struct datablock *b) clear_bit(I_Trunc, &LAFSI(ino)->iflags); lafs_iounlock_block(&ib->b); putiref(ib, MKREF(inode_handle_orphan3)); - return 1; + goto restart; } lafs_checkpoint_lock(fs); @@ -738,7 +721,7 @@ static int lafs_inode_handle_orphan(struct datablock *b) lafs_iounlock_block(&ib->b); putiref(ib, MKREF(inode_handle_orphan3)); lafs_checkpoint_unlock(fs); - return 2; + return; } if (!test_bit(B_Valid, &ib->b.flags) && @@ -802,7 +785,6 @@ static int lafs_inode_handle_orphan(struct datablock *b) lafs_checkpoint_unlock(fs); putiref(ib, MKREF(inode_handle_orphan3)); - return 1; } void lafs_dirty_inode(struct inode *ino) @@ -1343,6 +1325,7 @@ lafs_new_inode(struct fs *fs, struct inode *dir, int type, int inum, int mode, abort: inode_map_new_abort(&imni); lafs_cluster_update_abort(&ui); + printk("After abort: %s\n", strblk(&imni.ib->b)); return ERR_PTR(err); } @@ -1447,8 +1430,6 @@ void lafs_truncate(struct inode *ino) ino->i_sb->s_blocksize - 1) >> ino->i_sb->s_blocksize_bits; putdref(db, MKREF(trunc)); - - inode_handle_orphan_loop(ino); } struct inode_operations lafs_special_ino_operations = { diff --git a/io.c b/io.c index 994c2ab..d2a08ed 100644 --- a/io.c +++ b/io.c @@ -225,7 +225,8 @@ lafs_iounlock_block(struct block *b) lafs_iocheck_block(dblk(b), 1); lafs_io_wake(b); - if (test_bit(B_Async, &b->flags)) + if (test_bit(B_Async, &b->flags) || + test_bit(B_Orphan, &b->flags)) lafs_wake_cleaner(fs_from_inode(b->inode)); } diff --git a/lafs.h b/lafs.h index e943073..a82e394 100644 --- a/lafs.h +++ b/lafs.h @@ -136,6 +136,7 @@ void lafs_dirty_inode(struct inode *ino); int lafs_lock_inode(struct inode *ino); void lafs_inode_fillblock(struct inode *ino); struct datablock *lafs_inode_dblock(struct inode *ino, int async, REFARG); +void lafs_inode_handle_orphan(struct datablock *b); struct datablock *lafs_get_block(struct inode *ino, unsigned long index, struct page *p, int gfp, REFARG); @@ -503,7 +504,7 @@ void lafs_dir_set_target(char *block, int psz, struct dir_ent *de, int pnum); int lafs_dir_empty(char *block); void lafs_dir_make_index(char *orig, char *new, int psz, u32 target); -void lafs_dir_handle_orphans(struct inode *ino); +void lafs_dir_handle_orphan(struct datablock *db); int lafs_release_page(struct page *page, gfp_t gfp_flags); void lafs_invalidate_page(struct page *page, unsigned long offset); @@ -582,7 +583,8 @@ struct block *lafs_get_flushable(struct fs *fs, int phase); int lafs_make_orphan(struct fs *fs, struct datablock *db); int lafs_make_orphan_nb(struct fs *fs, struct datablock *db); void lafs_orphan_release(struct fs *fs, struct datablock *b); -unsigned long lafs_run_orphans(struct fs *fs); +long lafs_run_orphans(struct fs *fs); +int lafs_drop_orphan(struct fs *fs, struct datablock *db); /* Segment.c */ int lafs_prealloc(struct block *b, int type); diff --git a/orphan.c b/orphan.c index ae4a5e5..3cbbf55 100644 --- a/orphan.c +++ b/orphan.c @@ -163,10 +163,12 @@ static void orphan_commit(struct fs *fs, struct datablock *b, struct datablock * /* Committed to being an orphan now */ b->orphan_slot = om->nextfree++; - getdref(b, MKREF(orphan_list)); spin_lock(&fs->lock); set_bit(B_Orphan, &b->b.flags); - list_add_tail(&b->orphans, &fs->pending_orphans); + if (list_empty(&b->orphans)) { + getdref(b, MKREF(orphan_list)); + list_add_tail(&b->orphans, &fs->pending_orphans); + } spin_unlock(&fs->lock); dprintk("%p->orphan_slot=%d (%lu,%lu,%lu) %s\n", b, b->orphan_slot, LAFSI(b->b.inode)->filesys->i_ino, @@ -427,11 +429,15 @@ void lafs_orphan_release(struct fs *fs, struct datablock *b) } om->nextfree--; om->reserved++; - spin_lock(&fs->lock); clear_bit(B_Orphan, &b->b.flags); - list_del_init(&b->orphans); - spin_unlock(&fs->lock); - putdref(b, MKREF(orphan_list)); + /* NOTE we don't remove the block from the orphan + * list yet. This allows lafs_run_orphans to keep + * track of the current location in the list. + * Any other called of handle_orphan must call + * drop_orphan afterwards. + * so don't: list_del_init(&b->orphans); + * and don't: putdref(b, MKREF(orphan_list)); + */ /* Now drop the reservation we just synthesised */ orphan_abort(fs); @@ -441,7 +447,91 @@ void lafs_orphan_release(struct fs *fs, struct datablock *b) lafs_checkpoint_unlock(fs); } -unsigned long lafs_run_orphans(struct fs *fs) +static struct mutex *orphan_mutex(struct datablock *db) +{ + switch (LAFSI(db->b.inode)->type) { + case TypeInodeFile: + if (db->my_inode == NULL) + return NULL; + return &db->my_inode->i_mutex; + case TypeDir: + return &db->b.inode->i_mutex; + default: + BUG(); + } +} + +long lafs_run_orphans(struct fs *fs) { + struct datablock *db, *next, *toput = NULL; + + spin_lock(&fs->lock); + list_for_each_entry_safe(db, next, &fs->pending_orphans, orphans) { + struct mutex *mx = orphan_mutex(db); + if (!mx || !mutex_trylock(mx)) + continue; + spin_unlock(&fs->lock); + + /* The 'orphan_list' reference cannot be dropped + * while we own the mutex, so our reference to db is safe + */ + + if (toput) { + putdref(toput, MKREF(orphan_list)); + toput = NULL; + } + switch(LAFSI(db->b.inode)->type) { + case TypeInodeFile: + lafs_inode_handle_orphan(db); + break; + case TypeDir: + lafs_dir_handle_orphan(db); + break; + } + + spin_lock(&fs->lock); + /* the 'next' entry might have been deleted, so + * get the current value. 'db' cannot have been removed + * from the list as we held the mutex. + */ + mutex_unlock(mx); + next = list_entry(db->orphans.next, struct datablock, + orphans); + if (!test_bit(B_Orphan, &db->b.flags)) { + list_del_init(&db->orphans); + toput = db; + } + } + spin_unlock(&fs->lock); + if (toput) + putdref(toput, MKREF(orphan_list)); + if (list_empty(&fs->pending_orphans)) + /* unmount might be waiting... */ + wake_up(&fs->async_complete); return MAX_SCHEDULE_TIMEOUT; } + +int lafs_drop_orphan(struct fs *fs, struct datablock *db) +{ + /* This block was an orphan but isn't any more. + * Remove it from the list. + * It must be IOlocked, and this call must not come + * from lafs_run_orphans. + */ + struct mutex *mx = orphan_mutex(db); + BUG_ON(mx && !mutex_is_locked(mx)); + if (test_bit(B_Orphan, &db->b.flags)) + return 0; + spin_lock(&fs->lock); + if (test_bit(B_Orphan, &db->b.flags) || + list_empty(&db->orphans)) { + /* Is an orphan again, or it is already removed */ + spin_unlock(&fs->lock); + return 0; + } else { + list_del_init(&db->orphans); + spin_unlock(&fs->lock); + putdref(db, MKREF(orphan_list)); + return 1; + } +} diff --git a/state.h b/state.h index 1016eec..d6240ed 100644 --- a/state.h +++ b/state.h @@ -481,7 +481,6 @@ struct lafs_inode { struct list_head free_index; - struct list_head orphans; /* on list of pending truncates */ loff_t trunc_next; /* next block to truncate from */ union { struct fs_md { @@ -523,8 +522,6 @@ struct lafs_inode { /* u32 linkcount; */ /* for directories */ u32 seed; - /* lists of blocks which need orphan-treatment */ - struct list_head dirorphans; } file; struct orphan_md { u32 nextfree; diff --git a/super.c b/super.c index 9c409b9..9a48b48 100644 --- a/super.c +++ b/super.c @@ -683,6 +683,8 @@ lafs_put_super(struct super_block *sb) /* This is the main sb, not a snapshot or * subordinate fs */ + wait_event(fs->async_complete, + list_empty(&fs->pending_orphans)); lafs_checkpoint_lock(fs); /* Don't incorporate any more segusage/quota updates. */ lafs_checkpoint_start(fs); @@ -901,7 +903,6 @@ static struct inode *lafs_alloc_inode(struct super_block *sb) li->update_cluster = 0; init_rwsem(&li->ind_sem); - INIT_LIST_HEAD(&li->orphans); INIT_LIST_HEAD(&li->free_index); return &li->vfs_inode;