git.neil.brown.name Git - LaFS.git/commitdiff
Honour EmptyIndex during index lookup.
authorNeilBrown <neilb@suse.de>
Tue, 22 Jun 2010 05:32:05 +0000 (15:32 +1000)
committerNeilBrown <neilb@suse.de>
Tue, 22 Jun 2010 20:59:47 +0000 (06:59 +1000)
If we find an EmptyIndex that isn't first in the parent, we must
choose an earlier block.
We must check EmptyIndex after getting the lock.
If we have to drop a lock to do IO, and the unlocked block has a
->parent pointer, then we need to retry from the top.
'next' needs special care as it could point to an
EmptyIndex block, so it is possible for leaves earlier in the tree to
have a higher fileaddr (unlikely but possible).

Signed-off-by: NeilBrown <neilb@suse.de>
index.c

diff --git a/index.c b/index.c
index 4483ec1ed0ceec250c16d80951e36d519db429b5..49c2c789c333c1929dc44aeaa5e3c8c07addff07 100644 (file)
--- a/index.c
+++ b/index.c
@@ -1309,10 +1309,19 @@ index_lookup(void *bf, int len, u32 target, u32 *addrp, u32 *nextp)
        cp += lo*10;
        p = decode48(cp);
        addr = decode32(cp);
-       *addrp = addr;
+
+       if (p == 0)
+               /* Nothing in this block */
+               return 0;
+
+       if (addr > target) {
+               /* target is before first address */
+               p = 0;
+               cp -= 10;
+       } else
+               *addrp = addr;
 
        if (cp+10 <= (unsigned char *) buf + len && nextp) {
-               /* FIXME what about non-dense blocks? */
                u64 p2 = decode48(cp);
                u32 nxt = decode32(cp);
                if (p2)
@@ -1332,23 +1341,51 @@ int lafs_index_empty(struct indexblock *ib)
        if (test_bit(B_InoIdx, &ib->b.flags))
                phys = index_lookup(buf + li->metadata_size,
                                    blocksize - li->metadata_size,
-                                   ib->b.fileaddr, &addr, NULL);
+                                   0xFFFFFFFF, &addr, NULL);
        else
                phys = index_lookup(buf, blocksize,
-                                   ib->b.fileaddr, &addr, NULL);
+                                   0xFFFFFFFF, &addr, NULL);
        unmap_iblock(ib, buf);
 
        return phys == 0;
 }
 
+static struct indexblock *find_better(struct inode *ino,
+                                     struct indexblock *ib,
+                                     u32 addr, u32 *next, REFARG)
+{
+       struct indexblock *rv = ib;
+       spin_lock(&ino->i_data.private_lock);
+       list_for_each_entry_continue(ib, &ib->b.parent->children, b.siblings) {
+               if (!test_bit(B_PrimaryRef, &ib->b.flags))
+                       break;
+               if (test_bit(B_EmptyIndex, &ib->b.flags))
+                       continue;
+               if (ib->b.fileaddr > addr) {
+                       if (next && *next > ib->b.fileaddr)
+                               *next = ib->b.fileaddr;
+                       break;
+               }
+               /* This appears to be better. */
+               rv = ib;
+       }
+       getref_locked(&rv->b, REF);
+       spin_unlock(&ino->i_data.private_lock);
+       return rv;
+}
+
 struct indexblock *
 lafs_leaf_find(struct inode *inode, u32 addr, int adopt, u32 *next,
               int async, REFARG)
 {
        /* Find the leaf index block which refers to this address (if any do)
         * Returns the leaf IOLocked.
+        * *next will be set to an address that is higher than addr such
+        * that there are no other leafs before *next.  There might not
+        * actually be a leaf at *next though.
         */
        struct indexblock *ib, *ib2;
+       struct indexblock *hold = NULL;
        u64 iphys;
        u32 iaddr = 0;
        void *buf;
@@ -1361,80 +1398,60 @@ lafs_leaf_find(struct inode *inode, u32 addr, int adopt, u32 *next,
        BUG_ON(li->dblock == NULL);
        BUG_ON(atomic_read(&li->dblock->b.refcnt) == 0);
 
- retry:
+       /* We come back to restart if we needed to unlock to
+        * perform IO and we cannot be sure the parent didn't change.
+        * We hold the last seen index block in 'hold' so that if
+        * no changes happened (the common case) we are certain
+        * that no IO will be required to get back to where
+        * we were.
+        */
+ restart:
        if (next)
                *next = 0xFFFFFFFF;
 
        ib = lafs_make_iblock(inode, adopt, async, REF);
+       err = PTR_ERR(ib);
        if (IS_ERR(ib))
-               return ib;
+               goto err;
        offset = li->metadata_size;
 
        err = -EAGAIN;
        if (!async)
                lafs_iolock_block(&ib->b);
        else if (!lafs_iolock_block_async(&ib->b))
-                goto err;
-               
+                goto err_ib;
+
        while (ib->depth > 1) {
                /* internal index block */
-               /* ALERT:  If the index tree has only just grown, then
-                * this block could be uninitialised - all child blocks
-                * can only be found in the indexing tree.
-                * This actually works as iaddr will be the same as
-                * addr, and iphys will be zero
-                * On the other hand, if we are truncating we might
-                * think there is something here when there isn't.
-                * So we don't use lafs_leaf_find during truncation.
+               /* NOTE:  index block could be empty, or could have
+                * nothing as early as 'addr'.  In that case '0' is
+                * returned implying that the block is either in
+                * cache, or needs to be synthesised as an empty
+                * index block (one level down).
                 */
+               struct indexblock *better;
+               u32 target = addr;
+
+       retry:
                buf = map_iblock(ib);
+               iaddr = ib->b.fileaddr;
                iphys = index_lookup(buf + offset, blocksize - offset,
-                                    addr, &iaddr, next);
+                                    target, &iaddr,
+                                    target == addr ? next : NULL);
                unmap_iblock(ib, buf);
 
                ib2 = lafs_iblock_get(inode, iaddr, ib->depth-1, iphys, REF);
 
                if (!ib2) {
-                       /* if iphys==0, then we were expecting to find a
-                        * cached index block.  If we didn't, something is
-                        * wrong.
-                        */
-                       BUG_ON(iphys==0);
                        err = -ENOMEM;
                        lafs_iounlock_block(&ib->b);
-                       goto err;
-               }
-               if (ib2->b.physaddr != iphys &&
-                   !test_bit(B_Uninc, &ib2->b.flags)) {
-                       dprintk("WARN %llu != %llu\n",
-                               ib2->b.physaddr, iphys);
+                       goto err_ib;
                }
 
-               if (test_bit(B_PhysValid, &ib2->b.flags) &&
-                   ib2->b.physaddr == 0) {
-                       /* this block has been invalidated, we must be
-                        * racing with some sort of truncate.
-                        * try again from the top.
-                        * FIXME should wait for something here.
-                        */
-                       putiref(ib2, REF);
-                       lafs_iounlock_block(&ib->b);
-                       down_read(&ib->b.inode->i_alloc_sem);
-                       /* ib2 should have been dealt with by now */
-                       up_read(&ib->b.inode->i_alloc_sem);
-                       putiref(ib, REF);
-                       goto retry;
-               }
-#ifdef FIXME
-               if (!(ib2->b.flags & B_Linked)) {
-                       struct block *b2 = peer_find(fs, inode->filesys,
-                                                    inode, iaddr,
-                                                    depth, iphys);
-                       if (b2)
-                               list_add(&ib2->peers, &b2->peers);
-                       set_bit(B_Linked, &ib2->b.flags);
-               }
-#endif
+               better = find_better(inode, ib2, addr, next, REF);
+               putiref(ib2, REF);
+               ib2 = better;
+               
                if (!test_bit(B_Valid, &ib2->b.flags)) {
                        lafs_iounlock_block(&ib->b);
                        err = lafs_load_block(&ib2->b);
@@ -1446,55 +1463,61 @@ lafs_leaf_find(struct inode *inode, u32 addr, int adopt, u32 *next,
                                err = lafs_wait_block(&ib2->b);
                        if (err)
                                goto err_ib2;
+
                        err = -EAGAIN;
                        if (!async)
                                lafs_iolock_block(&ib->b);
                        else if (!lafs_iolock_block_async(&ib->b))
                                goto err_ib2;
+
+                       /* While we did IO, the parent could have been
+                        * split, so it is now the wrong parent, or it
+                        * could have become EmptyIndex, though that is
+                        * much less likely.  If it did, then it must
+                        * have had a parent, so the ref we hold means
+                        * it still has a parent.
+                        * So if ib->b.parent, then we might need to 
+                        * retry from the top, and holding a ref on ib
+                        * means that we don't risk live-locking if
+                        * memory for index blocks is very tight.
+                        * If there is no parent, then there is 
+                        * no risk that ib changed.
+                        */
+                       if (ib->b.parent) {
+                               if (hold)
+                                       putiref(hold, MKREF(hold));
+                               hold = getiref(ib, MKREF(hold));
+                               putiref(ib2, REF);
+                               putiref(ib, REF);
+                               goto restart;
+                       }
                }
+
                err = -EAGAIN;
                if (!async)
                        lafs_iolock_block(&ib2->b);
                else if (!lafs_iolock_block_async(&ib2->b))
                        goto err_ib2;
 
-               /* This block might have been split, in which case
-                * we need to consider adjacent siblings.
-                */
-               spin_lock(&inode->i_data.private_lock);
-               while (ib2->b.parent) {
-                       struct block *nxt;
-                       if (ib2->b.siblings.next == &ib2->b.parent->children)
-                               break;
-                       nxt = list_entry(ib2->b.siblings.next, struct block,
-                                        siblings);
-                       if (!test_bit(B_PrimaryRef, &nxt->flags))
-                               break;
-                       if (nxt->fileaddr < ib2->b.fileaddr)
-                               break;
-                       if (nxt->fileaddr > addr &&
-                           (nxt->physaddr != 0 || test_bit(B_Valid,
-                                                           &nxt->flags))) {
-                               if (next && *next > nxt->fileaddr)
-                                       *next = nxt->fileaddr;
-                               break;
-                       }
-                       getref_locked(nxt, REF);
-                       spin_unlock(&inode->i_data.private_lock);
-                       if (!async)
-                               lafs_iolock_block(nxt);
-                       else if (!lafs_iolock_block_async(nxt)) {
-                               putref(nxt, REF);
-                               err = -EAGAIN;
-                               goto err_ib2;
-                       }
-
+               if (ib2->b.fileaddr != ib->b.fileaddr &&
+                   test_bit(B_EmptyIndex, &ib2->b.flags)) {
+                       /* need to look earlier in the parent */
+                       target = ib2->b.fileaddr - 1;
                        lafs_iounlock_block(&ib2->b);
                        putiref(ib2, REF);
-                       ib2 = iblk(nxt);
-                       spin_lock(&inode->i_data.private_lock);
+                       goto retry;
+               }
+
+#ifdef FIXME
+               if (!(ib2->b.flags & B_Linked)) {
+                       struct block *b2 = peer_find(fs, inode->filesys,
+                                                    inode, iaddr,
+                                                    depth, iphys);
+                       if (b2)
+                               list_add(&ib2->peers, &b2->peers);
+                       set_bit(B_Linked, &ib2->b.flags);
                }
-               spin_unlock(&inode->i_data.private_lock);
+#endif
                if (adopt)
                        block_adopt(&ib2->b, ib);
                lafs_iounlock_block(&ib->b);
@@ -1507,8 +1530,11 @@ lafs_leaf_find(struct inode *inode, u32 addr, int adopt, u32 *next,
 
  err_ib2:
        putiref(ib2, REF);
- err:
+ err_ib:
        putiref(ib, REF);
+err:
+       if (hold)
+               putiref(hold, MKREF(hold));
        return ERR_PTR(err);
 }
 
@@ -1528,7 +1554,7 @@ static int __lafs_find_next(struct inode *ino, loff_t *addrp)
        struct indexblock *leaf;
        struct fs *fs = fs_from_inode(ino);
        struct block *b;
-       u32 nexti, nextd;
+       u32 nexti, nextd, nextl;
        int rv = 0;
        int offset;
        char *buf;
@@ -1580,19 +1606,28 @@ static int __lafs_find_next(struct inode *ino, loff_t *addrp)
                        rv = 2;
                }
        }
-
+       /* If rv==2, nextd may not be high enough as it might
+        * be the address of an EmptyIndex block.  So only
+        * use it if nothing better is found.
+        */
+       if (rv == 2)
+               nextl = 0xFFFFFFFF;
+       else
+               nextl = nextd;
        list_for_each_entry(b, &leaf->children, siblings)
                if (b->fileaddr >= *addrp &&
-                   b->fileaddr <= nextd &&
+                   b->fileaddr <= nextl &&
                    test_bit(B_Valid, &b->flags)) {
-                       nextd = b->fileaddr;
+                       nextd = nextl = b->fileaddr;
                        rv = 1;
                }
 
        if (leaf->uninc_table.pending_cnt) {
                spin_lock(&leaf->b.inode->i_data.private_lock);
-               rv = table_find_first(&leaf->uninc_table, *addrp, &nextd)
-                       ?: rv;
+               if (table_find_first(&leaf->uninc_table, *addrp, &nextl)) {
+                       nextd = nextl;
+                       rv = 1;
+               }
                spin_unlock(&leaf->b.inode->i_data.private_lock);
        }
        lafs_iounlock_block(&leaf->b);