diff --git a/src/allmydata/mutable/checker.py b/src/allmydata/mutable/checker.py
index ea288a0..6b629ec 100644
--- a/src/allmydata/mutable/checker.py
+++ b/src/allmydata/mutable/checker.py
@@ -3,11 +3,12 @@ from allmydata.uri import from_string
 from allmydata.util import base32, log
 from allmydata.check_results import CheckAndRepairResults, CheckResults
 
-from allmydata.mutable.common import MODE_CHECK, CorruptShareError
+from allmydata.mutable.common import MODE_CHECK, MODE_WRITE, CorruptShareError
 from allmydata.mutable.servermap import ServerMap, ServermapUpdater
 from allmydata.mutable.retrieve import Retrieve # for verifying
 
 class MutableChecker:
+    SERVERMAP_MODE = MODE_CHECK
 
     def __init__(self, node, storage_broker, history, monitor):
         self._node = node
@@ -26,7 +27,8 @@ class MutableChecker:
         # of finding all of the shares, and getting a good idea of
         # recoverability, etc, without verifying.
         u = ServermapUpdater(self._node, self._storage_broker, self._monitor,
-                             servermap, MODE_CHECK, add_lease=add_lease)
+                             servermap, self.SERVERMAP_MODE,
+                             add_lease=add_lease)
         if self._history:
             self._history.notify_mapupdate(u.get_status())
         d = u.update()
@@ -241,6 +243,8 @@ class MutableChecker:
 
 
 class MutableCheckAndRepairer(MutableChecker):
+    SERVERMAP_MODE = MODE_WRITE # needed to get the privkey
+
     def __init__(self, node, storage_broker, history, monitor):
         MutableChecker.__init__(self, node, storage_broker, history, monitor)
         self.cr_results = CheckAndRepairResults(self._storage_index)
@@ -264,7 +268,7 @@ class MutableCheckAndRepairer(MutableChecker):
             self.cr_results.repair_attempted = False
             return
         self.cr_results.repair_attempted = True
-        d = self._node.repair(self.results)
+        d = self._node.repair(self.results, monitor=self._monitor)
         def _repair_finished(repair_results):
             self.cr_results.repair_successful = repair_results.get_successful()
             r = CheckResults(from_string(self._node.get_uri()), self._storage_index)
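
The SERVERMAP_MODE switch above is a plain class-attribute override: the inherited servermap-update code reads the attribute through self, so the check-and-repair subclass gets a privkey-fetching update without re-implementing the method. A minimal standalone sketch of the pattern (Checker, CheckAndRepairer and update_servermap are invented names, not Tahoe classes):

MODE_CHECK = "MODE_CHECK"
MODE_WRITE = "MODE_WRITE"

class Checker:
    SERVERMAP_MODE = MODE_CHECK

    def update_servermap(self):
        # the base-class method picks up whatever the concrete class declares
        return "updating servermap in %s" % self.SERVERMAP_MODE

class CheckAndRepairer(Checker):
    # repair needs the privkey, so the subclass asks for a stronger mode
    SERVERMAP_MODE = MODE_WRITE

assert Checker().update_servermap() == "updating servermap in MODE_CHECK"
assert CheckAndRepairer().update_servermap() == "updating servermap in MODE_WRITE"
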
diff --git a/src/allmydata/mutable/common.py b/src/allmydata/mutable/common.py
index 9ce11c5..9ce8e37 100644
--- a/src/allmydata/mutable/common.py
+++ b/src/allmydata/mutable/common.py
@@ -6,6 +6,7 @@ MODE_ANYTHING = "MODE_ANYTHING" # one recoverable version
 MODE_WRITE = "MODE_WRITE" # replace all shares, probably.. not for initial
                           # creation
 MODE_READ = "MODE_READ"
+MODE_REPAIR = "MODE_REPAIR" # query all peers, get the privkey
 
 class NotWriteableError(Exception):
     pass
diff --git a/src/allmydata/mutable/filenode.py b/src/allmydata/mutable/filenode.py
index 74ed7f0..61ccd39 100644
--- a/src/allmydata/mutable/filenode.py
+++ b/src/allmydata/mutable/filenode.py
@@ -308,9 +308,10 @@ class MutableFileNode:
 
     #################################
     # IRepairable
 
-    def repair(self, check_results, force=False):
+    def repair(self, check_results, force=False, monitor=None):
         assert ICheckResults(check_results)
-        r = Repairer(self, check_results)
+        r = Repairer(self, check_results, self._storage_broker,
+                     self._history, monitor)
         d = r.start(force)
         return d
diff --git a/src/allmydata/mutable/publish.py b/src/allmydata/mutable/publish.py
index 4f61ad1..c611a6f 100644
--- a/src/allmydata/mutable/publish.py
+++ b/src/allmydata/mutable/publish.py
@@ -15,7 +15,7 @@ from allmydata.storage.server import si_b2a
 from pycryptopp.cipher.aes import AES
 from foolscap.api import eventually, fireEventually
 
-from allmydata.mutable.common import MODE_WRITE, MODE_CHECK, \
+from allmydata.mutable.common import MODE_WRITE, MODE_CHECK, MODE_REPAIR, \
      UncoordinatedWriteError, NotEnoughServersError
 from allmydata.mutable.servermap import ServerMap
 from allmydata.mutable.layout import get_version_from_checkstring,\
@@ -187,7 +187,7 @@ class Publish:
         # servermap was updated in MODE_WRITE, so we can depend upon the
         # serverlist computed by that process instead of computing our own.
         assert self._servermap
-        assert self._servermap.get_last_update()[0] in (MODE_WRITE, MODE_CHECK)
+        assert self._servermap.get_last_update()[0] in (MODE_WRITE, MODE_CHECK, MODE_REPAIR)
         # we will push a version that is one larger than anything present
         # in the grid, according to the servermap.
         self._new_seqnum = self._servermap.highest_seqnum() + 1
@@ -373,7 +373,7 @@ class Publish:
         # servermap was updated in MODE_WRITE, so we can depend upon the
         # serverlist computed by that process instead of computing our own.
         if self._servermap:
-            assert self._servermap.get_last_update()[0] in (MODE_WRITE, MODE_CHECK)
+            assert self._servermap.get_last_update()[0] in (MODE_WRITE, MODE_CHECK, MODE_REPAIR)
         # we will push a version that is one larger than anything present
         # in the grid, according to the servermap.
         self._new_seqnum = self._servermap.highest_seqnum() + 1
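
Both asserts in Publish enumerate the servermap modes that are considered complete enough to publish from; a map built in MODE_READ may have stopped after finding enough shares to download, so it stays excluded. If further modes are ever added, one way to keep the two call sites from drifting apart is a single shared whitelist. A small illustrative sketch, not something the patch introduces (PUBLISHABLE_MODES and check_servermap_mode are invented names):

MODE_CHECK = "MODE_CHECK"
MODE_WRITE = "MODE_WRITE"
MODE_READ = "MODE_READ"
MODE_REPAIR = "MODE_REPAIR"

# modes that query broadly enough for a subsequent publish to trust
PUBLISHABLE_MODES = frozenset([MODE_WRITE, MODE_CHECK, MODE_REPAIR])

def check_servermap_mode(servermap_mode):
    assert servermap_mode in PUBLISHABLE_MODES, servermap_mode

check_servermap_mode(MODE_REPAIR)  # accepted after this patch
check_servermap_mode(MODE_WRITE)   # always accepted
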
diff --git a/src/allmydata/mutable/repairer.py b/src/allmydata/mutable/repairer.py
index d0bfeff..94641e4 100644
--- a/src/allmydata/mutable/repairer.py
+++ b/src/allmydata/mutable/repairer.py
@@ -3,6 +3,8 @@ from zope.interface import implements
 from twisted.internet import defer
 from allmydata.interfaces import IRepairResults, ICheckResults
 from allmydata.mutable.publish import MutableData
+from allmydata.mutable.common import MODE_REPAIR
+from allmydata.mutable.servermap import ServerMap, ServermapUpdater
 
 class RepairResults:
     implements(IRepairResults)
@@ -23,10 +25,13 @@ class MustForceRepairError(Exception):
     pass
 
 class Repairer:
-    def __init__(self, node, check_results):
+    def __init__(self, node, check_results, storage_broker, history, monitor):
         self.node = node
         self.check_results = ICheckResults(check_results)
         assert check_results.storage_index == self.node.get_storage_index()
+        self._storage_broker = storage_broker
+        self._history = history
+        self._monitor = monitor
 
     def start(self, force=False):
         # download, then re-publish. If a server had a bad share, try to
@@ -55,8 +60,17 @@ class Repairer:
         # old shares: replace old shares with the latest version
         # bogus shares (bad sigs): replace the bad one with a good one
 
-        smap = self.check_results.get_servermap()
+        # first, update the servermap in MODE_REPAIR, which finds all shares
+        # and makes sure we get the privkey.
+        u = ServermapUpdater(self.node, self._storage_broker, self._monitor,
+                             ServerMap(), MODE_REPAIR)
+        if self._history:
+            self._history.notify_mapupdate(u.get_status())
+        d = u.update()
+        d.addCallback(self._got_full_servermap, force)
+        return d
 
+    def _got_full_servermap(self, smap, force):
         best_version = smap.best_recoverable_version()
         if not best_version:
             # the file is damaged beyond repair
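
The rewired start() above no longer trusts the servermap that was captured at check time; it rebuilds one in MODE_REPAIR and only hands control to _got_full_servermap once that map (and the privkey) is available. A standalone sketch of that control flow, with a stub standing in for ServermapUpdater and the publish step (FakeUpdater and SketchRepairer are invented names; only twisted.internet.defer is real):

from twisted.internet import defer

class FakeUpdater:
    def __init__(self, mode):
        self.mode = mode
    def update(self):
        # the real update() returns a Deferred that eventually fires with a
        # fully populated ServerMap
        return defer.succeed({"mode": self.mode, "privkey": "present"})

class SketchRepairer:
    def start(self, force=False):
        u = FakeUpdater("MODE_REPAIR")
        d = u.update()
        # extra positional args are passed through to the callback, which is
        # how the patch forwards `force`
        d.addCallback(self._got_full_servermap, force)
        return d

    def _got_full_servermap(self, smap, force):
        # a MODE_REPAIR map means every server was queried and the privkey
        # was fetched, so the re-publish can proceed
        return "repairing with %s (force=%s)" % (smap["mode"], force)

results = []
SketchRepairer().start(force=True).addCallback(results.append)
assert results == ["repairing with MODE_REPAIR (force=True)"]
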
diff --git a/src/allmydata/mutable/servermap.py b/src/allmydata/mutable/servermap.py
index df75518..e908e42 100644
--- a/src/allmydata/mutable/servermap.py
+++ b/src/allmydata/mutable/servermap.py
@@ -12,8 +12,8 @@ from allmydata.storage.server import si_b2a
 from allmydata.interfaces import IServermapUpdaterStatus
 from pycryptopp.publickey import rsa
 
-from allmydata.mutable.common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
-     CorruptShareError
+from allmydata.mutable.common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, \
+     MODE_READ, MODE_REPAIR, CorruptShareError
 from allmydata.mutable.layout import SIGNED_PREFIX_LENGTH, MDMFSlotReadProxy
 
 class UpdateStatus:
@@ -426,7 +426,7 @@ class ServermapUpdater:
             self._read_size = 1000
         self._need_privkey = False
-        if mode == MODE_WRITE and not self._node.get_privkey():
+        if mode in (MODE_WRITE, MODE_REPAIR) and not self._node.get_privkey():
             self._need_privkey = True
         # check+repair: repair requires the privkey, so if we didn't happen
         # to ask for it during the check, we'll have problems doing the
@@ -497,7 +497,7 @@ class ServermapUpdater:
         # might not wait for all of their answers to come back)
         self.num_servers_to_query = k + self.EPSILON
 
-        if self.mode == MODE_CHECK:
+        if self.mode in (MODE_CHECK, MODE_REPAIR):
             # We want to query all of the servers.
             initial_servers_to_query = list(full_serverlist)
             must_query = set(initial_servers_to_query)
@@ -1063,7 +1063,7 @@ class ServermapUpdater:
                      parent=lp)
             return self._done()
 
-        if self.mode == MODE_CHECK:
+        if self.mode in (MODE_CHECK, MODE_REPAIR):
             # we used self._must_query, and we know there aren't any
             # responses still waiting, so that means we must be done
             self.log("done", parent=lp)
diff --git a/src/allmydata/test/test_mutable.py b/src/allmydata/test/test_mutable.py
index c878117..a13a00e 100644
--- a/src/allmydata/test/test_mutable.py
+++ b/src/allmydata/test/test_mutable.py
@@ -39,6 +39,12 @@ from allmydata.test.test_download import PausingConsumer, \
      PausingAndStoppingConsumer, StoppingConsumer, \
      ImmediatelyStoppingConsumer
 
+def eventuaaaaaly(res=None):
+    d = fireEventually(res)
+    d.addCallback(fireEventually)
+    d.addCallback(fireEventually)
+    return d
+
 
 # this "FakeStorage" exists to put the share data in RAM and avoid using real
 # network connections, both to speed up the tests and to reduce the amount of
@@ -69,8 +75,8 @@ class FakeStorage:
     def read(self, peerid, storage_index):
         shares = self._peers.get(peerid, {})
         if self._sequence is None:
-            return defer.succeed(shares)
-        d = defer.Deferred()
+            return eventuaaaaaly(shares)
+        d = eventuaaaaaly()
         if not self._pending:
             self._pending_timer = reactor.callLater(1.0, self._fire_readers)
         if peerid not in self._pending:
@@ -233,11 +239,12 @@ def make_storagebroker(s=None, num_peers=10):
         storage_broker.test_add_rref(peerid, fss, ann)
     return storage_broker
 
-def make_nodemaker(s=None, num_peers=10):
+def make_nodemaker(s=None, num_peers=10, keysize=TEST_RSA_KEY_SIZE):
     storage_broker = make_storagebroker(s, num_peers)
     sh = client.SecretHolder("lease secret", "convergence secret")
     keygen = client.KeyGenerator()
-    keygen.set_default_keysize(TEST_RSA_KEY_SIZE)
+    if keysize:
+        keygen.set_default_keysize(keysize)
     nodemaker = NodeMaker(storage_broker, sh, None,
                           None, None,
                           {"k": 3, "n": 10}, SDMF_VERSION, keygen)
@@ -957,6 +964,20 @@
         d.addCallback(_created)
         return d
 
+    def publish_empty_sdmf(self):
+        self.CONTENTS = ""
+        self.uploadable = MutableData(self.CONTENTS)
+        self._storage = FakeStorage()
+        self._nodemaker = make_nodemaker(self._storage, keysize=None)
+        self._storage_broker = self._nodemaker.storage_broker
+        d = self._nodemaker.create_mutable_file(self.uploadable,
+                                                version=SDMF_VERSION)
+        def _created(node):
+            self._fn = node
+            self._fn2 = self._nodemaker.create_from_cap(node.get_uri())
+        d.addCallback(_created)
+        return d
+
 
     def publish_multiple(self, version=0):
         self.CONTENTS = ["Contents 0",
@@ -2157,6 +2178,26 @@ class Repair(unittest.TestCase, PublishMixin, ShouldFailMixin):
         d.addCallback(_check_results)
         return d
 
+    def test_repair_empty(self):
+        # bug 1689: delete one share of an empty mutable file, then repair.
+        # In the buggy version, the check that precedes the retrieve+publish
+        # cycle uses MODE_READ, instead of MODE_REPAIR, and fails to get the
+        # privkey that repair needs.
+        d = self.publish_empty_sdmf()
+        def _delete_one_share(ign):
+            shares = self._storage._peers
+            for peerid in shares:
+                for shnum in list(shares[peerid]):
+                    if shnum == 0:
+                        del shares[peerid][shnum]
+        d.addCallback(_delete_one_share)
+        d.addCallback(lambda ign: self._fn2.check(Monitor()))
+        d.addCallback(lambda check_results: self._fn2.repair(check_results))
+        def _check(crr):
+            self.failUnlessEqual(crr.get_successful(), True)
+        d.addCallback(_check)
+        return d
+
 class DevNullDictionary(dict):
     def __setitem__(self, key, value):
         return
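
test_repair_empty only asserts that the repair reports success. A companion test could additionally confirm that the deleted share is written back, reusing the same helpers; the method below is a hypothetical sketch (test_repair_empty_restores_share is an invented name) that would sit next to test_repair_empty inside the Repair class, and it assumes the repair's publish step re-places shnum 0 somewhere in the fake storage:

    def test_repair_empty_restores_share(self):
        # hypothetical follow-on to test_repair_empty, not part of the patch
        d = self.publish_empty_sdmf()
        def _delete_share_0(ign):
            for shares in self._storage._peers.values():
                shares.pop(0, None)
        d.addCallback(_delete_share_0)
        d.addCallback(lambda ign: self._fn2.check(Monitor()))
        d.addCallback(lambda check_results: self._fn2.repair(check_results))
        def _check(crr):
            self.failUnlessEqual(crr.get_successful(), True)
            # at least one fake server should hold shnum 0 again
            self.failUnless(any(0 in shares
                                for shares in self._storage._peers.values()))
        d.addCallback(_check)
        return d
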