diff --git a/src/allmydata/immutable/checker.py b/src/allmydata/immutable/checker.py
index 0268d8d..ce2c15f 100644
--- a/src/allmydata/immutable/checker.py
+++ b/src/allmydata/immutable/checker.py
@@ -480,6 +480,8 @@ class Checker(log.PrefixingLogMixin):
         fcs = file_cancel_secret_hash(secret_holder.get_cancel_secret(),
                                       self._verifycap.get_storage_index())
         self.file_cancel_secret = fcs
+        self._num_active_block_fetches = 0
+        self._max_active_block_fetches = 0
 
     def _get_renewal_secret(self, seed):
         return bucket_renewal_secret_hash(self.file_renewal_secret, seed)
@@ -554,6 +556,12 @@ class Checker(log.PrefixingLogMixin):
                  f_value=str(f.value),
                  level=log.WEIRD, umid="hEGuQg")
 
+    def _debug_start_block_fetch(self):
+        self._num_active_block_fetches += 1
+        if self._num_active_block_fetches > self._max_active_block_fetches:
+            self._max_active_block_fetches = self._num_active_block_fetches
+    def _debug_finish_block_fetch(self):
+        self._num_active_block_fetches -= 1
 
     def _download_and_verify(self, serverid, sharenum, bucket):
         """Start an attempt to download and verify every block in this bucket
@@ -612,19 +620,24 @@ class Checker(log.PrefixingLogMixin):
                 return d
         d.addCallback(_got_ueb)
 
-        def _discard_result(r):
-            assert isinstance(r, str), r
-            # to free up the RAM
-            return None
         def _get_blocks(vrbp):
-            ds = []
-            for blocknum in range(veup.num_segments):
+            def _discard_result(r):
+                assert isinstance(r, str), r
+                self._debug_finish_block_fetch()
+                # to free up the RAM
+                return None
+            def _get_block(ign, blocknum):
+                self._debug_start_block_fetch()
                 db = vrbp.get_block(blocknum)
                 db.addCallback(_discard_result)
-                ds.append(db)
-            # this gatherResults will fire once every block of this share has
-            # been downloaded and verified, or else it will errback.
-            return deferredutil.gatherResults(ds)
+                return db
+            dbs = defer.succeed(None)
+            for blocknum in range(veup.num_segments):
+                dbs.addCallback(_get_block, blocknum)
+            # The Deferred we return will fire after every block of this
+            # share has been downloaded and verified successfully, or else it
+            # will errback as soon as the first error is observed.
+            return dbs
         d.addCallback(_get_blocks)
 
         # if none of those errbacked, the blocks (and the hashes above them)
@@ -788,6 +801,7 @@ class Checker(log.PrefixingLogMixin):
             cr.set_needs_rebalancing(d['count-good-share-hosts']
                                      < d['count-shares-good'])
             cr.set_data(d)
+            cr._debug_max_active_block_fetches = self._max_active_block_fetches
             return cr
 
diff --git a/src/allmydata/test/test_checker.py b/src/allmydata/test/test_checker.py
index b302c10..4670018 100644
--- a/src/allmydata/test/test_checker.py
+++ b/src/allmydata/test/test_checker.py
@@ -319,3 +319,33 @@ class AddLease(GridTestMixin, unittest.TestCase):
         d.addCallback(lambda ign: self.failUnless(really_did_break))
         return d
 
+
+class TooParallel(GridTestMixin, unittest.TestCase):
+    # bug #1395: immutable verifier was aggressively parallelized, checking
+    # all blocks of all shares at the same time, blowing our memory budget
+    # and crashing with MemoryErrors on >1GB files.
+
+    def test_immutable(self):
+        self.basedir = "checker/TooParallel/immutable"
+        self.set_up_grid(num_servers=4)
+        c0 = self.g.clients[0]
+        c0.DEFAULT_ENCODING_PARAMETERS = { "k": 1,
+                                           "happy": 4,
+                                           "n": 4,
+                                           "max_segment_size": 5,
+                                           }
+        self.uris = {}
+        DATA = "data" * 100 # 400/5 = 80 blocks
+        d = c0.upload(Data(DATA, convergence=""))
+        def _do_check(ur):
+            n = c0.create_node_from_uri(ur.uri)
+            return n.check(Monitor(), verify=True)
+        d.addCallback(_do_check)
+        def _check(cr):
+            # the verifier works on all 4 shares in parallel, but only
+            # fetches one block from each share at a time, so we expect to
+            # see at most 4 parallel fetches
+            self.failUnlessEqual(cr._debug_max_active_block_fetches, 4)
+        d.addCallback(_check)
+        return d
+
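The heart of the patch is the switch in _get_blocks from deferredutil.gatherResults(ds), which starts every block fetch at once, to a single Deferred chain, which starts each fetch only after the previous one has finished. Below is a minimal self-contained sketch of that chaining pattern (not part of the patch): fetch_block() and the active/peak counters are hypothetical stand-ins for vrbp.get_block() and the _num_active_block_fetches/_max_active_block_fetches instrumentation.

from twisted.internet import defer

active = [0]  # fetches currently in flight
peak = [0]    # high-water mark of `active`

def fetch_block(blocknum):
    # pretend to fetch one block; defer.succeed fires immediately, so
    # the whole chain below runs synchronously in this sketch
    active[0] += 1
    peak[0] = max(peak[0], active[0])
    return defer.succeed("block-%d" % blocknum)

def _discard_result(r):
    active[0] -= 1
    return None  # drop the block data to free the RAM

def fetch_serially(num_blocks):
    def _get_block(ign, blocknum):
        d = fetch_block(blocknum)
        d.addCallback(_discard_result)
        return d
    dbs = defer.succeed(None)
    for blocknum in range(num_blocks):
        # each fetch starts only when the previous one has finished;
        # an errback skips the remaining callbacks, so the chain stops
        # at the first failure, just like the patched verifier
        dbs.addCallback(_get_block, blocknum)
    return dbs

fetch_serially(80)
assert peak[0] == 1  # never more than one fetch in flight per chain

The verifier still runs one such chain per share, so four shares give at most four concurrent fetches, which is exactly what the test asserts via cr._debug_max_active_block_fetches.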