diff -rN -u old-ticket1395/src/allmydata/immutable/checker.py new-ticket1395/src/allmydata/immutable/checker.py
--- old-ticket1395/src/allmydata/immutable/checker.py 2011-06-17 00:16:49.018717504 -0600
+++ new-ticket1395/src/allmydata/immutable/checker.py 2011-06-17 00:16:49.188717504 -0600
@@ -617,14 +617,18 @@
             # to free up the RAM
             return None
         def _get_blocks(vrbp):
-            ds = []
-            for blocknum in range(veup.num_segments):
+            def _get_block(ign, blocknum):
                 db = vrbp.get_block(blocknum)
                 db.addCallback(_discard_result)
-                ds.append(db)
-            # this gatherResults will fire once every block of this share has
-            # been downloaded and verified, or else it will errback.
-            return deferredutil.gatherResults(ds)
+                return db
+            dbs = defer.succeed(None)
+            for blocknum in range(veup.num_segments):
+                dbs.addCallback(_get_block, blocknum)
+            # The Deferred we return will fire after every block of this
+            # share has been downloaded and verified successfully, or else it
+            # will errback as soon as the first error is observed.
+            return dbs
+
         d.addCallback(_get_blocks)
 
         # if none of those errbacked, the blocks (and the hashes above them)
diff -rN -u old-ticket1395/src/allmydata/test/test_checker.py new-ticket1395/src/allmydata/test/test_checker.py
--- old-ticket1395/src/allmydata/test/test_checker.py 2011-06-17 00:16:49.061717504 -0600
+++ new-ticket1395/src/allmydata/test/test_checker.py 2011-06-17 00:16:49.202717504 -0600
@@ -1,4 +1,3 @@
-
 import simplejson
 from twisted.trial import unittest
 from allmydata import check_results, uri
@@ -319,3 +318,83 @@
 
         d.addCallback(lambda ign: self.failUnless(really_did_break))
         return d
+
+class CounterHolder(object):
+    """Shared counters recording how many block fetches are in flight."""
+    def __init__(self):
+        self._num_active_block_fetches = 0
+        self._max_active_block_fetches = 0
+
+from allmydata.immutable.checker import ValidatedReadBucketProxy
+class MockVRBP(ValidatedReadBucketProxy):
+    """A ValidatedReadBucketProxy that counts concurrent get_block() calls
+    in the CounterHolder it was given."""
+    def __init__(self, sharenum, bucket, share_hash_tree, num_blocks, block_size, share_size, counterholder):
+        ValidatedReadBucketProxy.__init__(self, sharenum, bucket,
+                                          share_hash_tree, num_blocks,
+                                          block_size, share_size)
+        self.counterholder = counterholder
+
+    def get_block(self, blocknum):
+        self.counterholder._num_active_block_fetches += 1
+        if self.counterholder._num_active_block_fetches > self.counterholder._max_active_block_fetches:
+            self.counterholder._max_active_block_fetches = self.counterholder._num_active_block_fetches
+        d = ValidatedReadBucketProxy.get_block(self, blocknum)
+        def _mark_no_longer_active(res):
+            self.counterholder._num_active_block_fetches -= 1
+            return res
+        d.addBoth(_mark_no_longer_active)
+        return d
+
+class TooParallel(GridTestMixin, unittest.TestCase):
+    # bug #1395: immutable verifier was aggressively parallelized, checking all
+    # blocks of all shares at the same time, blowing our memory budget and
+    # crashing with MemoryErrors on >1GB files.
+
+    def test_immutable(self):
+        import allmydata.immutable.checker
+        from twisted.internet import defer
+        origVRBP = allmydata.immutable.checker.ValidatedReadBucketProxy
+
+        self.basedir = "checker/TooParallel/immutable"
+
+        # If any code asks to instantiate a ValidatedReadBucketProxy,
+        # we give them a MockVRBP which is configured to use our
+        # CounterHolder.
+        counterholder = CounterHolder()
+        def make_mock_VRBP(*args, **kwargs):
+            return MockVRBP(counterholder=counterholder, *args, **kwargs)
+        allmydata.immutable.checker.ValidatedReadBucketProxy = make_mock_VRBP
+
+        # Do the grid setup and upload inside a callback, so that an
+        # exception raised there becomes an errback and _clean_up still
+        # restores the real ValidatedReadBucketProxy.
+        d = defer.succeed(None)
+        def _start(ign):
+            self.set_up_grid(num_servers=4)
+            self.c0 = self.g.clients[0]
+            self.c0.DEFAULT_ENCODING_PARAMETERS = { "k": 1,
+                                                    "happy": 4,
+                                                    "n": 4,
+                                                    "max_segment_size": 5,
+                                                    }
+            self.uris = {}
+            DATA = "data" * 100 # 400/5 = 80 blocks
+            return self.c0.upload(Data(DATA, convergence=""))
+        d.addCallback(_start)
+        def _do_check(ur):
+            n = self.c0.create_node_from_uri(ur.uri)
+            return n.check(Monitor(), verify=True)
+        d.addCallback(_do_check)
+        def _check(cr):
+            # the verifier works on all 4 shares in parallel, but only
+            # fetches one block from each share at a time, so we expect to
+            # see 4 parallel fetches
+            self.failUnlessEqual(counterholder._max_active_block_fetches, 4)
+        d.addCallback(_check)
+        def _clean_up(res):
+            allmydata.immutable.checker.ValidatedReadBucketProxy = origVRBP
+            return res
+        d.addBoth(_clean_up)
+        return d
+    test_immutable.timeout = 10