and: it seems we sometimes try to clean the segment being written
to. We must avoid that.
+ (long ago I wrote::
+ FIXME When pin fails, we need to remove PinPending from everything!!!
+ and never followed up ... I wonder?
+ )
+
+25Aug2009
+ Orphan handling.
+ Every orphan block goes on a per-fs list and gets removed only
+ if the B_Orphan bit is clear.
+ There are two times when we want to expedite orphan handling.
+ 1/ on rmdir we need to know if the directory is really empty.
+ This requires that we expedite the orphan handling of all
+ blocks. As soon as we find a non-orphan, we can give up.
+ Then we need to make sure the index tree has collapsed. WE
+ can borrow that code from truncate.
+
+ 2/ When writing past Trunc_next. We just pass the block to
+ special orphan handling.
+
+ This requires that orphan handling is re-entrant.
+ For dir, that is protected by i_mutex, but rmdir needs to come
+ in under the radar.
+ For trunc, the iolock on the index blocks should be enough.
+ I wonder if IOLock can be used on dir as well... allowing
+ parallel orphan handling in the one dir even!!.
+
+ We need to ensure exclusion of orphan handling, including:
+ - only one orphan handler at a time
+ - don't run orphan handler while still processing action
+ that makes it an orphan.
+ Maybe if we just use IOLock for that? Does that work? Maybe
+ but it gets messy for directories (on first attempt anyway).
+ For directories we can just use i_mutex.
+ Maybe i_mutex for files as well?
lafs_checkpoint_unlock(fs);
lafs_dirty_inode(inode);
lafs_inode_checkpin(inode);
- lafs_dir_handle_orphans(dir);
clear_bit(B_PinPending, &inodb->b.flags);
putdref(inodb, MKREF(inode_update));
return 0;
lafs_inode_checkpin(inode);
lafs_inode_checkpin(dir);
lafs_checkpoint_unlock(fs);
- lafs_dir_handle_orphans(dir);
return 0;
abort_unlock:
lafs_checkpoint_unlock(fs);
}
lafs_checkpoint_unlock(fs);
- lafs_dir_handle_orphans(old_dir);
return 0;
abort_unlock:
* If ->root is 0, punch a hole
*/
-static int dir_handle_orphan(struct inode *dir,
- struct datablock *b);
-
-/* Called with ino->i_sem down */
-void lafs_dir_handle_orphans(struct inode *ino)
-{
- struct list_head tmp;
- int all_failed = 0;
-
- while (!all_failed) {
- all_failed = 1;
- list_add(&tmp, &LAFSI(ino)->md.file.dirorphans);
- list_del_init(&LAFSI(ino)->md.file.dirorphans);
-
- while (!list_empty(&tmp)) {
- struct datablock *db;
- db = list_entry(tmp.next,
- struct datablock, orphans);
- list_del_init(&db->orphans);
- if (dir_handle_orphan(ino, db) == 0)
- all_failed = 0;
- }
- }
-}
-
-static int dir_handle_orphan(struct inode *dir, struct datablock *b)
+void lafs_dir_handle_orphan(struct datablock *b)
{
+ struct inode *dir = b->b.inode;
struct fs *fs = fs_from_inode(dir);
int bits = dir->i_blkbits-8;
u32 seed = LAFSI(dir)->md.file.seed;
- u32 hash = (b->b.fileaddr-1) & MaxDirHash;
+ u32 hash;
char *buf, *buf2;
struct datablock *b2;
u8 piece, firstpiece;
struct dir_ent de;
int err;
+ /* FIXME if there was an IOerror, this should not be a BUG */
+ LAFS_BUG(!test_bit(B_Valid, &b->b.flags), &b->b);
dprintk("HANDLE ORPHAN h=%x %s\n", (unsigned)hash, strblk(&b->b));
- err = lafs_read_block(b);
- if (err)
- goto abort;
+
lafs_checkpoint_lock(fs);
+
+ /* First test: Does a chain of deleted entries extend beyond
+ * the end of this block. i.e. is the last entry deleted.
+ * If so, look at the next block and see if the chain is still
+ * anchored, or if it can all be released.
+ */
buf = map_dblock(b);
+ hash = (b->b.fileaddr-1) & MaxDirHash;
if (lafs_dir_find(buf, bits, seed, hash, &piece) &&
lafs_dir_extract(buf, bits, &de, piece, NULL)->target == 0) {
loff_t bnum;
b2 = lafs_get_block(dir, bnum, NULL, GFP_KERNEL,
MKREF(dir_orphan));
- err = -ENOMEM;
if (!b2)
goto abort;
- err = lafs_read_block(b2);
+ err = lafs_read_block_async(b2);
if (err)
goto abortb2;
unmap_dblock(b2, buf2);
putdref(b2, MKREF(dir_orphan));
err = lafs_pin_dblock(b);
- // FIXME what about EAGAIN??
if (err)
goto abort;
buf = map_dblock(b);
buf = map_dblock(b);
}
+ /* Second test: if we have an unanchored chain at the start
+ * of the block, then schedule orphan handling for previous block,
+ * and remove the unanchor.
+ */
lafs_dir_find(buf, bits, seed, 0, &firstpiece);
hash = LAFSI(dir)->md.file.seed;
if (firstpiece &&
unmap_dblock(b, buf);
b2 = lafs_get_block(dir, hash, NULL, GFP_KERNEL,
MKREF(dir_orphan));
- err = -ENOMEM;
if (!b2)
goto abort;
- err = lafs_read_block(b2);
+ err = lafs_read_block_async(b2);
if (err)
goto abortb2;
if (lafs_dir_find(buf2, bits, seed, (hash-1) & MaxDirHash,
&piece) &&
lafs_dir_extract(buf2, bits, &de, piece, NULL)->target == 0)
- lafs_make_orphan(fs, b2);
+ err = lafs_make_orphan_nb(fs, b2);
unmap_dblock(b2, buf2);
putdref(b2, MKREF(dir_orphan));
+ if (err)
+ goto abort;
buf = map_dblock(b);
lafs_dir_del_ent(buf, bits, seed, hash);
lafs_dirty_dblock(b);
loff_t bnum;
unmap_dblock(b, buf);
- lafs_erase_dblock(b);
bnum = 1;
- if (lafs_find_next(dir, &bnum) == 0) {
+ err = lafs_find_next(dir, &bnum);
+ if (err < 0)
+ goto abort;
+ if (err == 0) {
if (b->b.fileaddr == 0)
i_size_write(dir, 0);
else {
b2 = lafs_get_block(dir, 0, NULL, GFP_KERNEL,
MKREF(dir_orphan));
- err = -ENOMEM;
if (!b2)
goto abort;
- err = lafs_read_block(b2);
+ err = lafs_read_block_async(b2);
if (err)
goto abortb2;
buf2 = map_dblock(b2);
}
lafs_dirty_inode(dir);
}
+ lafs_erase_dblock(b);
} else {
unmap_dblock(b, buf);
}
lafs_checkpoint_unlock(fs);
- putdref(b, MKREF(dirorphan));
lafs_orphan_release(fs, b);
- return 0;
+ return;
abortb2:
putdref(b2, MKREF(dir_orphan));
abort:
lafs_checkpoint_unlock(fs);
- list_move(&b->orphans, &LAFSI(dir)->md.file.dirorphans);
- return err;
}
/*--------------------------------------------------------------------
init_special_inode(ino, ino->i_mode, ino->i_rdev);
break;
}
-
- INIT_LIST_HEAD(&i->dirorphans);
break;
}
}
}
static int inode_map_free(struct fs *fs, struct inode *filesys, u32 inum);
-static int lafs_inode_handle_orphan(struct datablock *b);
-static void inode_handle_orphan_loop(struct inode *ino);
void lafs_delete_inode(struct inode *ino)
{
putdref(b, MKREF(delete_inode));
clear_inode(ino);
-
- inode_handle_orphan_loop(ino);
-}
-
-static void inode_handle_orphan_loop(struct inode *ino)
-{
- struct datablock *b = lafs_inode_dblock(ino, 0, MKREF(hol));
-
- if (IS_ERR(b))
- return;
- while (test_bit(B_Orphan, &b->b.flags) &&
- (test_bit(I_Trunc, &LAFSI(ino)->iflags) ||
- LAFSI(ino)->type == 0))
- switch (lafs_inode_handle_orphan(b)) {
- case 2: /* wait for checkpoint to finish... */
- msleep(500);
- }
- putdref(b, MKREF(hol));
}
static int prune(void *data, u32 addr, u64 paddr, int len)
return i;
}
-static int lafs_inode_handle_orphan(struct datablock *b)
+void lafs_inode_handle_orphan(struct datablock *b)
{
struct indexblock *ib, *ib2;
struct inode *ino = b->my_inode;
struct fs *fs = fs_from_inode(ino);
u32 trunc_next, next_trunc;
- /* FIXME if the trunc has completed, we don't really
- * need the iblock...
- */
- ib = lafs_make_iblock(ino, 1, 0, MKREF(inode_handle_orphan));
- if (IS_ERR(ib))
- return PTR_ERR(ib);
+ restart:
if (!test_bit(I_Trunc, &LAFSI(ino)->iflags)) {
- if (ino->i_nlink) {
- /* Not an orphan any more */
- lafs_orphan_release(fs, b);
- } else {
+ if (ino->i_nlink == 0) {
LAFS_BUG(LAFSI(ino)->type != 0, &b->b);
- lafs_orphan_release(fs, b);
- if (test_bit(B_Dirty, &ib->b.flags)) {
- lafs_iolock_written(&ib->b);
- lafs_cluster_allocate(&ib->b, 0);
- }
lafs_erase_dblock(b);
clear_bit(I_Deleting, &LAFSI(ino)->iflags);
}
- putiref(ib, MKREF(inode_handle_orphan));
- /* nothing more to do */
- return 0;
+ lafs_orphan_release(fs, b);
+ return;
}
+ ib = lafs_make_iblock(ino, 1, 0, MKREF(inode_handle_orphan));
+ if (IS_ERR(ib))
+ return;
+
/* Here is the guts of 'truncate'. We find the next leaf index
* block and discard all the addresses there-in.
*/
clear_bit(I_Trunc, &LAFSI(ino)->iflags);
wake_up(&fs->trunc_wait);
putiref(ib, MKREF(inode_handle_orphan));
- return 1; /* Try again whenever */
+ goto restart;
}
if (fs->checkpointing) {
putiref(ib, MKREF(inode_handle_orphan));
- return 2; /* means "Try again after the checkpoint" */
+ return; /* Try again after the checkpoint */
}
/* FIXME I need some sort of lock to safely walk
* down this list */
clear_bit(I_Trunc, &LAFSI(ino)->iflags);
wake_up(&fs->trunc_wait);
putiref(ib, MKREF(inode_handle_orphan));
- return 1; /* Try again whenever */
+ goto restart;
}
+ getiref(ib2, MKREF(inode_handle_orphan2));
/* ib2 is an index block beyond EOF with no
* Pinned children.
* Incorporating it should unpin it.
*/
- getiref(ib2, MKREF(inode_handle_orphan2));
- lafs_iolock_written(&ib2->b);
+
+ /* lafs_iolock_written(&ib2->b); - without blocking */
+ if (test_and_set_bit(B_IOLock, &ib2->b.flags)) {
+ putiref(ib2, MKREF(inode_handle_orphan2));
+ goto out2;
+ }
+ set_iolock_info(&ib2->b);
+ if (test_bit(B_Writeback, &ib2->b.flags)) {
+ lafs_iounlock_block(&ib2->b);
+ putiref(ib2, MKREF(inode_handle_orphan2));
+ goto out2;
+ }
do
lafs_incorporate(fs, ib2);
while (ib2->uninc_table.pending_cnt || ib2->uninc);
else
lafs_iounlock_block(&ib2->b);
- putiref(ib2, MKREF(inode_handle_orphan2));
- printk(".");
if (!list_empty(&ib2->b.siblings)) {
static int cnt = 10;
printk("looping on %s\n", strblk(&ib2->b));
if (cnt < 0)
BUG();
}
+ putiref(ib2, MKREF(inode_handle_orphan2));
if (ib->uninc) {
- lafs_iolock_written(&ib->b);
- while (ib->uninc)
- lafs_incorporate(fs, ib);
- lafs_iounlock_block(&ib->b);
+ if (!test_and_set_bit(B_IOLock, &ib->b.flags)) {
+ set_iolock_info(&ib->b);
+ if (!test_bit(B_Writeback, &ib->b.flags))
+ while (ib->uninc)
+ lafs_incorporate(fs, ib);
+ lafs_iounlock_block(&ib->b);
+ }
}
+ out2:
putiref(ib, MKREF(inode_handle_orphan));
- return 1; /* Try again whenever */
+ goto restart;
}
putiref(ib, MKREF(inode_handle_orphan));
ib = lafs_leaf_find(ino, trunc_next, 1, &next_trunc,
- 0, MKREF(inode_handle_orphan3));
+ 1, MKREF(inode_handle_orphan3));
if (IS_ERR(ib))
- BUG(); // FIXME what can I do here?
+ return;
+
/* Ok, trunc_next seems to refer to a block that exists.
* We need to erase it..
*
clear_bit(I_Trunc, &LAFSI(ino)->iflags);
lafs_iounlock_block(&ib->b);
putiref(ib, MKREF(inode_handle_orphan3));
- return 1;
+ goto restart;
}
lafs_checkpoint_lock(fs);
lafs_iounlock_block(&ib->b);
putiref(ib, MKREF(inode_handle_orphan3));
lafs_checkpoint_unlock(fs);
- return 2;
+ return;
}
if (!test_bit(B_Valid, &ib->b.flags) &&
lafs_checkpoint_unlock(fs);
putiref(ib, MKREF(inode_handle_orphan3));
- return 1;
}
void lafs_dirty_inode(struct inode *ino)
abort:
inode_map_new_abort(&imni);
lafs_cluster_update_abort(&ui);
+ printk("After abort: %s\n", strblk(&imni.ib->b));
return ERR_PTR(err);
}
ino->i_sb->s_blocksize - 1)
>> ino->i_sb->s_blocksize_bits;
putdref(db, MKREF(trunc));
-
- inode_handle_orphan_loop(ino);
}
struct inode_operations lafs_special_ino_operations = {
lafs_iocheck_block(dblk(b), 1);
lafs_io_wake(b);
- if (test_bit(B_Async, &b->flags))
+ if (test_bit(B_Async, &b->flags) ||
+ test_bit(B_Orphan, &b->flags))
lafs_wake_cleaner(fs_from_inode(b->inode));
}
int lafs_lock_inode(struct inode *ino);
void lafs_inode_fillblock(struct inode *ino);
struct datablock *lafs_inode_dblock(struct inode *ino, int async, REFARG);
+void lafs_inode_handle_orphan(struct datablock *b);
struct datablock *lafs_get_block(struct inode *ino, unsigned long index,
struct page *p, int gfp, REFARG);
int lafs_dir_empty(char *block);
void lafs_dir_make_index(char *orig, char *new, int psz, u32 target);
-void lafs_dir_handle_orphans(struct inode *ino);
+void lafs_dir_handle_orphan(struct datablock *db);
int lafs_release_page(struct page *page, gfp_t gfp_flags);
void lafs_invalidate_page(struct page *page, unsigned long offset);
int lafs_make_orphan(struct fs *fs, struct datablock *db);
int lafs_make_orphan_nb(struct fs *fs, struct datablock *db);
void lafs_orphan_release(struct fs *fs, struct datablock *b);
-unsigned long lafs_run_orphans(struct fs *fs);
+long lafs_run_orphans(struct fs *fs);
+int lafs_drop_orphan(struct fs *fs, struct datablock *db);
/* Segment.c */
int lafs_prealloc(struct block *b, int type);
/* Committed to being an orphan now */
b->orphan_slot = om->nextfree++;
- getdref(b, MKREF(orphan_list));
spin_lock(&fs->lock);
set_bit(B_Orphan, &b->b.flags);
- list_add_tail(&b->orphans, &fs->pending_orphans);
+ if (list_empty(&b->orphans)) {
+ getdref(b, MKREF(orphan_list));
+ list_add_tail(&b->orphans, &fs->pending_orphans);
+ }
spin_unlock(&fs->lock);
dprintk("%p->orphan_slot=%d (%lu,%lu,%lu) %s\n", b, b->orphan_slot,
LAFSI(b->b.inode)->filesys->i_ino,
}
om->nextfree--;
om->reserved++;
- spin_lock(&fs->lock);
clear_bit(B_Orphan, &b->b.flags);
- list_del_init(&b->orphans);
- spin_unlock(&fs->lock);
- putdref(b, MKREF(orphan_list));
+ /* NOTE we don't remove the block from the orphan
+ * list yet. This allows lafs_run_orphans to keep
+ * track of the current location in the list.
+ * Any other called of handle_orphan must call
+ * drop_orphan afterwards.
+ * so don't: list_del_init(&b->orphans);
+ * and don't: putdref(b, MKREF(orphan_list));
+ */
/* Now drop the reservation we just synthesised */
orphan_abort(fs);
lafs_checkpoint_unlock(fs);
}
-unsigned long lafs_run_orphans(struct fs *fs)
+static struct mutex *orphan_mutex(struct datablock *db)
+{
+ switch (LAFSI(db->b.inode)->type) {
+ case TypeInodeFile:
+ if (db->my_inode == NULL)
+ return NULL;
+ return &db->my_inode->i_mutex;
+ case TypeDir:
+ return &db->b.inode->i_mutex;
+ default:
+ BUG();
+ }
+}
+
+long lafs_run_orphans(struct fs *fs)
{
+ struct datablock *db, *next, *toput = NULL;
+
+ spin_lock(&fs->lock);
+ list_for_each_entry_safe(db, next, &fs->pending_orphans, orphans) {
+ struct mutex *mx = orphan_mutex(db);
+ if (!mx || !mutex_trylock(mx))
+ continue;
+ spin_unlock(&fs->lock);
+
+ /* The 'orphan_list' reference cannot be dropped
+ * while we own the mutex, so our reference to db is safe
+ */
+
+ if (toput) {
+ putdref(toput, MKREF(orphan_list));
+ toput = NULL;
+ }
+ switch(LAFSI(db->b.inode)->type) {
+ case TypeInodeFile:
+ lafs_inode_handle_orphan(db);
+ break;
+ case TypeDir:
+ lafs_dir_handle_orphan(db);
+ break;
+ }
+
+ spin_lock(&fs->lock);
+ /* the 'next' entry might have been deleted, so
+ * get the current value. 'db' cannot have been removed
+ * from the list as we held the mutex.
+ */
+ mutex_unlock(mx);
+ next = list_entry(db->orphans.next, struct datablock,
+ orphans);
+ if (!test_bit(B_Orphan, &db->b.flags)) {
+ list_del_init(&db->orphans);
+ toput = db;
+ }
+ }
+ spin_unlock(&fs->lock);
+ if (toput)
+ putdref(toput, MKREF(orphan_list));
+ if (list_empty(&fs->pending_orphans))
+ /* unmount might be waiting... */
+ wake_up(&fs->async_complete);
return MAX_SCHEDULE_TIMEOUT;
}
+
+int lafs_drop_orphan(struct fs *fs, struct datablock *db)
+{
+ /* This block was an orphan but isn't any more.
+ * Remove it from the list.
+ * It must be IOlocked, and this call must not come
+ * from lafs_run_orphans.
+ */
+ struct mutex *mx = orphan_mutex(db);
+ BUG_ON(mx && !mutex_is_locked(mx));
+ if (test_bit(B_Orphan, &db->b.flags))
+ return 0;
+ spin_lock(&fs->lock);
+ if (test_bit(B_Orphan, &db->b.flags) ||
+ list_empty(&db->orphans)) {
+ /* Is an orphan again, or it is already removed */
+ spin_unlock(&fs->lock);
+ return 0;
+ } else {
+ list_del_init(&db->orphans);
+ spin_unlock(&fs->lock);
+ putdref(db, MKREF(orphan_list));
+ return 1;
+ }
+}
struct list_head free_index;
- struct list_head orphans; /* on list of pending truncates */
loff_t trunc_next; /* next block to truncate from */
union {
struct fs_md {
/* u32 linkcount; */
/* for directories */
u32 seed;
- /* lists of blocks which need orphan-treatment */
- struct list_head dirorphans;
} file;
struct orphan_md {
u32 nextfree;
/* This is the main sb, not a snapshot or
* subordinate fs
*/
+ wait_event(fs->async_complete,
+ list_empty(&fs->pending_orphans));
lafs_checkpoint_lock(fs);
/* Don't incorporate any more segusage/quota updates. */
lafs_checkpoint_start(fs);
li->update_cluster = 0;
init_rwsem(&li->ind_sem);
- INIT_LIST_HEAD(&li->orphans);
INIT_LIST_HEAD(&li->free_index);
return &li->vfs_inode;