diff -rN -u old-ticket1395/src/allmydata/immutable/checker.py new-ticket1395/src/allmydata/immutable/checker.py --- old-ticket1395/src/allmydata/immutable/checker.py 2011-06-17 00:04:59.542717505 -0600 +++ new-ticket1395/src/allmydata/immutable/checker.py 2011-06-17 00:04:59.715717505 -0600 @@ -617,14 +617,18 @@ # to free up the RAM return None def _get_blocks(vrbp): - ds = [] - for blocknum in range(veup.num_segments): + def _get_block(ign, blocknum): db = vrbp.get_block(blocknum) db.addCallback(_discard_result) - ds.append(db) - # this gatherResults will fire once every block of this share has - # been downloaded and verified, or else it will errback. - return deferredutil.gatherResults(ds) + return db + dbs = defer.succeed(None) + for blocknum in range(veup.num_segments): + dbs.addCallback(_get_block, blocknum) + # The Deferred we return will fire after every block of this + # share has been downloaded and verified successfully, or else it + # will errback as soon as the first error is observed. 
+ return dbs + d.addCallback(_get_blocks) # if none of those errbacked, the blocks (and the hashes above them) diff -rN -u old-ticket1395/src/allmydata/test/test_checker.py new-ticket1395/src/allmydata/test/test_checker.py --- old-ticket1395/src/allmydata/test/test_checker.py 2011-06-17 00:04:59.586717505 -0600 +++ new-ticket1395/src/allmydata/test/test_checker.py 2011-06-17 00:04:59.729717505 -0600 @@ -1,3 +1,4 @@ +from allmydata.util import mockutil import simplejson from twisted.trial import unittest @@ -319,3 +320,67 @@ d.addCallback(lambda ign: self.failUnless(really_did_break)) return d + +class CounterHolder(object): + def __init__(self): + self._num_active_block_fetches = 0 + self._max_active_block_fetches = 0 + +from allmydata.immutable.checker import ValidatedReadBucketProxy +class MockVRBP(ValidatedReadBucketProxy): + def __init__(self, sharenum, bucket, share_hash_tree, num_blocks, block_size, share_size, counterholder): + ValidatedReadBucketProxy.__init__(self, sharenum, bucket, + share_hash_tree, num_blocks, + block_size, share_size) + self.counterholder = counterholder + + def get_block(self, blocknum): + self.counterholder._num_active_block_fetches += 1 + if self.counterholder._num_active_block_fetches > self.counterholder._max_active_block_fetches: + self.counterholder._max_active_block_fetches = self.counterholder._num_active_block_fetches + d = ValidatedReadBucketProxy.get_block(self, blocknum) + def _mark_no_longer_active(res): + self.counterholder._num_active_block_fetches -= 1 + return res + d.addBoth(_mark_no_longer_active) + return d + +class TooParallel(GridTestMixin, unittest.TestCase): + # bug #1395: immutable verifier was aggressively parallelized, checking all + # blocks of all shares at the same time, blowing our memory budget and + # crashing with MemoryErrors on >1GB files. 
+ + @mockutil.patch('allmydata.immutable.checker.ValidatedReadBucketProxy') + def test_immutable(self, mockVRBPC): + self.basedir = "checker/TooParallel/immutable" + + # If any code asks to instantiate a ValidatedReadBucketProxy, + # we give them a MockVRBP which is configured to use our + # CounterHolder. + counterholder = CounterHolder() + def make_mock_VRBP(*args, **kwargs): + return MockVRBP(counterholder=counterholder, *args, **kwargs) + mockVRBPC.side_effect = make_mock_VRBP + + self.set_up_grid(num_servers=4) + c0 = self.g.clients[0] + c0.DEFAULT_ENCODING_PARAMETERS = { "k": 1, + "happy": 4, + "n": 4, + "max_segment_size": 5, + } + self.uris = {} + DATA = "data" * 100 # 400/5 = 80 blocks + d = c0.upload(Data(DATA, convergence="")) + def _do_check(ur): + n = c0.create_node_from_uri(ur.uri) + return n.check(Monitor(), verify=True) + d.addCallback(_do_check) + def _check(cr): + # the verifier works on all 4 shares in parallel, but only + # fetches one block from each share at a time, so we expect to + # see 4 parallel fetches + self.failUnlessEqual(counterholder._max_active_block_fetches, 4) + d.addCallback(_check) + return d + test_immutable.timeout = 10 diff -rN -u old-ticket1395/src/allmydata/util/mockutil.py new-ticket1395/src/allmydata/util/mockutil.py --- old-ticket1395/src/allmydata/util/mockutil.py 1969-12-31 17:00:00.000000000 -0700 +++ new-ticket1395/src/allmydata/util/mockutil.py 2011-06-17 00:04:59.744717505 -0600 @@ -0,0 +1,53 @@ +import mock + +from mock import wraps, DEFAULT, _importer +from mock import _patch as original_under_patch + +Deferred = None +try: + from twisted.internet import defer + Deferred = defer.Deferred +except ImportError: + pass + +# copied from Michael Foord's mock.py and modified + +class _deferrable_under_patch(original_under_patch): + def decorate_callable(self, func): + if hasattr(func, 'patchings'): + func.patchings.append(self) + return func + + @wraps(func) + def patched(*args, **keywargs): + # don't use a with here 
(backwards compatibility with 2.5) + extra_args = [] + for patching in patched.patchings: + arg = patching.__enter__() + if patching.new is DEFAULT: + extra_args.append(arg) + args += tuple(extra_args) + def cleanup(res): + for patching in reversed(getattr(patched, 'patchings', [])): + patching.__exit__() + return res + singleton = {} + retval = singleton + try: + retval = func(*args, **keywargs) + except: + cleanup(None) + raise + if Deferred is None or not isinstance(retval, Deferred): + return cleanup(retval) + retval.addBoth(cleanup) + return retval + + patched.patchings = [self] + if hasattr(func, 'func_code'): + # not in Python 3 + patched.compat_co_firstlineno = getattr( + func, "compat_co_firstlineno", + func.func_code.co_firstlineno + ) + return patched