Wed Sep 23 21:19:32 PDT 2009  Kevan Carstensen
  * Alter CiphertextDownloader to work with servers_of_happiness

Tue Nov 3 19:32:41 PST 2009  Kevan Carstensen
  * Alter the signature of set_shareholders in IEncoder to add a 'servermap' parameter, which gives IEncoders enough information to perform a sane check for servers_of_happiness.

Wed Nov 4 03:12:22 PST 2009  Kevan Carstensen
  * Alter 'immutable/encode.py' and 'immutable/upload.py' to use servers_of_happiness instead of shares_of_happiness.

Mon Nov 16 11:28:05 PST 2009  Kevan Carstensen
  * Alter Tahoe2PeerSelector to make sure that it recognizes existing shares on readonly servers, fixing an issue in #778

Mon Nov 16 13:24:59 PST 2009  Kevan Carstensen
  * Change stray "shares_of_happiness" to "servers_of_happiness"

Tue Nov 17 17:45:42 PST 2009  Kevan Carstensen
  * Eliminate overcounting iof servers_of_happiness in Tahoe2PeerSelector; also reorganize some things.

Sun Nov 22 16:24:05 PST 2009  Kevan Carstensen
  * Alter the error message returned when peer selection fails

  The Tahoe2PeerSelector returned either NoSharesError or NotEnoughSharesError
  for a variety of error conditions that weren't informatively described by them.
  This patch creates a new error, UploadHappinessError, replaces uses of
  NoSharesError and NotEnoughSharesError with it, and alters the error message
  raised with the errors to be more in line with the new servers_of_happiness
  behavior. See ticket #834 for more information.

Fri Dec 4 20:30:37 PST 2009  Kevan Carstensen
  * Change "UploadHappinessError" to "UploadUnhappinessError"

Wed Dec 30 13:03:44 PST 2009  Kevan Carstensen
  * Alter the error message when an upload fails, per some comments in #778.

  When I first implemented #778, I just altered the error messages to refer to
  servers where they referred to shares. The resulting error messages weren't
  very good. These are a bit better.

Fri Dec 4 19:40:05 PST 2009  "Kevan Carstensen"
  * Alter wording in 'interfaces.py' to be correct wrt #778

Fri May 7 15:11:47 PDT 2010  Kevan Carstensen
  * Fix up the behavior of #778, per reviewers' comments

   - Make some important utility functions clearer and more thoroughly documented.
   - Assert in upload.servers_of_happiness that the buckets attributes of PeerTrackers passed to it are mutually disjoint.
   - Get rid of some silly non-Pythonisms that I didn't see when I first wrote these patches.
   - Make sure that should_add_server returns true when queried about a shnum that it doesn't know about yet.
   - Change Tahoe2PeerSelector.preexisting_shares to map a shareid to a set of peerids, alter dependencies to deal with that.
   - Remove upload.should_add_servers, because it is no longer necessary
   - Move upload.shares_of_happiness and upload.shares_by_server to a utility file.
   - Change some points in Tahoe2PeerSelector.
   - Compute servers_of_happiness using a bipartite matching algorithm that we know is optimal instead of an ad-hoc greedy algorithm that isn't.
   - Change servers_of_happiness to just take a sharemap as an argument, change its callers to merge existing_shares and used_peers before calling it.
   - Change an error message in the encoder to be more appropriate for servers of happiness.
   - Clarify the wording of an error message in immutable/upload.py
   - Refactor a happiness failure message to happinessutil.py, and make immutable/upload.py and immutable/encode.py use it.

New patches:

[Alter CiphertextDownloader to work with servers_of_happiness
Kevan Carstensen **20090924041932
 Ignore-this: e81edccf0308c2d3bedbc4cf217da197
] hunk ./src/allmydata/immutable/download.py 1039
             # Repairer (uploader) needs the encodingparams.
             self._target.set_encodingparams((
                 self._verifycap.needed_shares,
-                self._verifycap.total_shares, # I don't think the target actually cares about "happy".
+                0, # see ticket #778 for why this is
                 self._verifycap.total_shares,
                 self._vup.segment_size
                 ))
[Alter the signature of set_shareholders in IEncoder to add a 'servermap' parameter, which gives IEncoders enough information to perform a sane check for servers_of_happiness.
Kevan Carstensen **20091104033241
 Ignore-this: b3a6649a8ac66431beca1026a31fed94
] {
hunk ./src/allmydata/interfaces.py 1341
         Once this is called, set_size() and set_params() may not be called.
         """
 
-    def set_shareholders(shareholders):
+    def set_shareholders(shareholders, servermap):
         """Tell the encoder where to put the encoded shares. 'shareholders'
         must be a dictionary that maps share number (an integer ranging from
hunk ./src/allmydata/interfaces.py 1344
-        0 to n-1) to an instance that provides IStorageBucketWriter. This
-        must be performed before start() can be called."""
+        0 to n-1) to an instance that provides IStorageBucketWriter.
+        'servermap' is a dictionary that maps share number (as defined above)
+        to a peerid. This must be performed before start() can be called."""
 
     def start():
         """Begin the encode/upload process. This involves reading encrypted
}
[Alter 'immutable/encode.py' and 'immutable/upload.py' to use servers_of_happiness instead of shares_of_happiness.
Kevan Carstensen **20091104111222
 Ignore-this: abb3283314820a8bbf9b5d0cbfbb57c8
] {
hunk ./src/allmydata/immutable/encode.py 121
         assert not self._codec
         k, happy, n, segsize = params
         self.required_shares = k
-        self.shares_of_happiness = happy
+        self.servers_of_happiness = happy
         self.num_shares = n
         self.segment_size = segsize
         self.log("got encoding parameters: %d/%d/%d %d" % (k,happy,n, segsize))
hunk ./src/allmydata/immutable/encode.py 179
         if name == "storage_index":
             return self._storage_index
         elif name == "share_counts":
-            return (self.required_shares, self.shares_of_happiness,
+            return (self.required_shares, self.servers_of_happiness,
                     self.num_shares)
         elif name == "num_segments":
             return self.num_segments
hunk ./src/allmydata/immutable/encode.py 194
         else:
             raise KeyError("unknown parameter name '%s'" % name)
 
-    def set_shareholders(self, landlords):
+    def set_shareholders(self, landlords, servermap):
         assert isinstance(landlords, dict)
         for k in landlords:
             assert IStorageBucketWriter.providedBy(landlords[k])
hunk ./src/allmydata/immutable/encode.py 199
         self.landlords = landlords.copy()
+        assert isinstance(servermap, dict)
+        self.servermap = servermap.copy()
 
     def start(self):
         """ Returns a Deferred that will fire with the verify cap (an instance of
hunk ./src/allmydata/immutable/encode.py 491
             # even more UNUSUAL
             self.log("they weren't in our list of landlords", parent=ln,
                      level=log.WEIRD, umid="TQGFRw")
-        if len(self.landlords) < self.shares_of_happiness:
-            msg = "lost too many shareholders during upload (still have %d, want %d): %s" % \
-                  (len(self.landlords), self.shares_of_happiness, why)
-            if self.landlords:
+        del(self.servermap[shareid])
+        servers_left = list(set(self.servermap.values()))
+        if len(servers_left) < self.servers_of_happiness:
+            msg = "lost too many servers during upload (still have %d, want %d): %s" % \
+                  (len(servers_left),
+                   self.servers_of_happiness, why)
+            if servers_left:
                 raise NotEnoughSharesError(msg)
             else:
                 raise NoSharesError(msg)
hunk
./src/allmydata/immutable/encode.py 502 self.log("but we can still continue with %s shares, we'll be happy " - "with at least %s" % (len(self.landlords), - self.shares_of_happiness), + "with at least %s" % (len(servers_left), + self.servers_of_happiness), parent=ln) def _gather_responses(self, dl): hunk ./src/allmydata/immutable/upload.py 131 self.buckets.update(b) return (alreadygot, set(b.keys())) +def servers_with_shares(existing_shares, used_peers=None): + servers = [] + if used_peers: + peers = list(used_peers.copy()) + # We do this because the preexisting shares list goes by peerid. + peers = [x.peerid for x in peers] + servers.extend(peers) + servers.extend(existing_shares.values()) + return list(set(servers)) + +def shares_by_server(existing_shares): + servers = {} + for server in set(existing_shares.values()): + servers[server] = set([x for x in existing_shares.keys() + if existing_shares[x] == server]) + return servers + class Tahoe2PeerSelector: def __init__(self, upload_id, logparent=None, upload_status=None): hunk ./src/allmydata/immutable/upload.py 164 def get_shareholders(self, storage_broker, secret_holder, storage_index, share_size, block_size, - num_segments, total_shares, shares_of_happiness): + num_segments, total_shares, servers_of_happiness): """ @return: (used_peers, already_peers), where used_peers is a set of PeerTracker instances that have agreed to hold some shares hunk ./src/allmydata/immutable/upload.py 177 self._status.set_status("Contacting Peers..") self.total_shares = total_shares - self.shares_of_happiness = shares_of_happiness + self.servers_of_happiness = servers_of_happiness self.homeless_shares = range(total_shares) # self.uncontacted_peers = list() # peers we haven't asked yet hunk ./src/allmydata/immutable/upload.py 242 d = defer.maybeDeferred(self._loop) return d + def _loop(self): if not self.homeless_shares: hunk ./src/allmydata/immutable/upload.py 245 - # all done - msg = ("placed all %d shares, " - "sent %d queries to %d 
peers, " - "%d queries placed some shares, %d placed none, " - "got %d errors" % - (self.total_shares, - self.query_count, self.num_peers_contacted, - self.good_query_count, self.bad_query_count, - self.error_count)) - log.msg("peer selection successful for %s: %s" % (self, msg), + effective_happiness = servers_with_shares( + self.preexisting_shares, + self.use_peers) + if self.servers_of_happiness <= len(effective_happiness): + msg = ("placed all %d shares, " + "sent %d queries to %d peers, " + "%d queries placed some shares, %d placed none, " + "got %d errors" % + (self.total_shares, + self.query_count, self.num_peers_contacted, + self.good_query_count, self.bad_query_count, + self.error_count)) + log.msg("peer selection successful for %s: %s" % (self, msg), parent=self._log_parent) hunk ./src/allmydata/immutable/upload.py 259 - return (self.use_peers, self.preexisting_shares) + return (self.use_peers, self.preexisting_shares) + else: + delta = self.servers_of_happiness - len(effective_happiness) + shares = shares_by_server(self.preexisting_shares) + # Each server in shares maps to a set of shares stored on it. + # Since we want to keep at least one share on each server + # that has one (otherwise we'd only be making + # the situation worse by removing distinct servers), + # each server has len(its shares) - 1 to spread around. 
+ shares_to_spread = sum([len(list(sharelist)) - 1 + for (server, sharelist) + in shares.items()]) + if delta <= len(self.uncontacted_peers) and \ + shares_to_spread >= delta: + # Loop through the allocated shares, removing + items = shares.items() + while len(self.homeless_shares) < delta: + servernum, sharelist = items.pop() + if len(sharelist) > 1: + share = sharelist.pop() + self.homeless_shares.append(share) + del(self.preexisting_shares[share]) + items.append((servernum, sharelist)) + return self._loop() + else: + raise NotEnoughSharesError("shares could only be placed on %d " + "servers (%d were requested)" % + (len(effective_happiness), + self.servers_of_happiness)) if self.uncontacted_peers: peer = self.uncontacted_peers.pop(0) hunk ./src/allmydata/immutable/upload.py 336 else: # no more peers. If we haven't placed enough shares, we fail. placed_shares = self.total_shares - len(self.homeless_shares) - if placed_shares < self.shares_of_happiness: + effective_happiness = servers_with_shares( + self.preexisting_shares, + self.use_peers) + if len(effective_happiness) < self.servers_of_happiness: msg = ("placed %d shares out of %d total (%d homeless), " hunk ./src/allmydata/immutable/upload.py 341 - "want to place %d, " + "want to place on %d servers, " "sent %d queries to %d peers, " "%d queries placed some shares, %d placed none, " "got %d errors" % hunk ./src/allmydata/immutable/upload.py 347 (self.total_shares - len(self.homeless_shares), self.total_shares, len(self.homeless_shares), - self.shares_of_happiness, + self.servers_of_happiness, self.query_count, self.num_peers_contacted, self.good_query_count, self.bad_query_count, self.error_count)) hunk ./src/allmydata/immutable/upload.py 394 level=log.NOISY, parent=self._log_parent) progress = False for s in alreadygot: + if self.preexisting_shares.has_key(s): + old_size = len(servers_with_shares(self.preexisting_shares)) + new_candidate = self.preexisting_shares.copy() + new_candidate[s] = peer.peerid + 
new_size = len(servers_with_shares(new_candidate)) + if old_size >= new_size: continue self.preexisting_shares[s] = peer.peerid if s in self.homeless_shares: self.homeless_shares.remove(s) hunk ./src/allmydata/immutable/upload.py 825 for peer in used_peers: assert isinstance(peer, PeerTracker) buckets = {} + servermap = already_peers.copy() for peer in used_peers: buckets.update(peer.buckets) for shnum in peer.buckets: hunk ./src/allmydata/immutable/upload.py 830 self._peer_trackers[shnum] = peer + servermap[shnum] = peer.peerid assert len(buckets) == sum([len(peer.buckets) for peer in used_peers]) hunk ./src/allmydata/immutable/upload.py 832 - encoder.set_shareholders(buckets) + encoder.set_shareholders(buckets, servermap) def _encrypted_done(self, verifycap): """ Returns a Deferred that will fire with the UploadResults instance. """ replace ./src/allmydata/immutable/upload.py [A-Za-z_0-9] _servers_with_shares _servers_with_unique_shares replace ./src/allmydata/immutable/upload.py [A-Za-z_0-9] servers_with_shares servers_with_unique_shares } [Alter Tahoe2PeerSelector to make sure that it recognizes existing shares on readonly servers, fixing an issue in #778 Kevan Carstensen **20091116192805 Ignore-this: 15289f4d709e03851ed0587b286fd955 ] { hunk ./src/allmydata/immutable/upload.py 117 d.addCallback(self._got_reply) return d + def query_allocated(self): + d = self._storageserver.callRemote("get_buckets", + self.storage_index) + d.addCallback(self._got_allocate_reply) + return d + + def _got_allocate_reply(self, buckets): + return (self.peerid, buckets) + def _got_reply(self, (alreadygot, buckets)): #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets))) b = {} hunk ./src/allmydata/immutable/upload.py 195 self._started_second_pass = False self.use_peers = set() # PeerTrackers that have shares assigned to them self.preexisting_shares = {} # sharenum -> peerid holding the share + # We don't try to allocate shares to these servers, since they've + # said that 
they're incapable of storing shares of the size that + # we'd want to store. We keep them around because they may have + # existing shares for this storage index, which we want to know + # about for accurate servers_of_happiness accounting + self.readonly_peers = [] peers = storage_broker.get_servers_for_index(storage_index) if not peers: hunk ./src/allmydata/immutable/upload.py 227 (peerid, conn) = peer v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"] return v1["maximum-immutable-share-size"] - peers = [peer for peer in peers - if _get_maxsize(peer) >= allocated_size] - if not peers: - raise NoServersError("no peers could accept an allocated_size of %d" % allocated_size) + new_peers = [peer for peer in peers + if _get_maxsize(peer) >= allocated_size] + old_peers = list(set(peers).difference(set(new_peers))) + peers = new_peers # decide upon the renewal/cancel secrets, to include them in the # allocate_buckets query. hunk ./src/allmydata/immutable/upload.py 241 storage_index) file_cancel_secret = file_cancel_secret_hash(client_cancel_secret, storage_index) - - trackers = [ PeerTracker(peerid, conn, - share_size, block_size, - num_segments, num_share_hashes, - storage_index, - bucket_renewal_secret_hash(file_renewal_secret, - peerid), - bucket_cancel_secret_hash(file_cancel_secret, + def _make_trackers(peers): + return [ PeerTracker(peerid, conn, + share_size, block_size, + num_segments, num_share_hashes, + storage_index, + bucket_renewal_secret_hash(file_renewal_secret, peerid), hunk ./src/allmydata/immutable/upload.py 248 - ) - for (peerid, conn) in peers ] - self.uncontacted_peers = trackers - - d = defer.maybeDeferred(self._loop) + bucket_cancel_secret_hash(file_cancel_secret, + peerid)) + for (peerid, conn) in peers] + self.uncontacted_peers = _make_trackers(peers) + self.readonly_peers = _make_trackers(old_peers) + # Talk to the readonly servers to get an idea of what servers + # have what shares (if any) for this storage index + d = 
defer.maybeDeferred(self._existing_shares) + d.addCallback(lambda ign: self._loop()) return d hunk ./src/allmydata/immutable/upload.py 259 + def _existing_shares(self): + if self.readonly_peers: + peer = self.readonly_peers.pop() + assert isinstance(peer, PeerTracker) + d = peer.query_allocated() + d.addCallback(self._handle_allocate_response) + return d + + def _handle_allocate_response(self, (peer, buckets)): + for bucket in buckets: + self.preexisting_shares[bucket] = peer + if self.homeless_shares: + self.homeless_shares.remove(bucket) + return self._existing_shares() def _loop(self): if not self.homeless_shares: } [Change stray "shares_of_happiness" to "servers_of_happiness" Kevan Carstensen **20091116212459 Ignore-this: 1c971ba8c3c4d2e7ba9f020577b28b73 ] { hunk ./docs/architecture.txt 183 place a quantity known as "shares of happiness", we'll do the upload anyways. If we cannot place at least this many, the upload is declared a failure. -The current defaults use k=3, shares_of_happiness=7, and N=10, meaning that +The current defaults use k=3, servers_of_happiness=7, and N=10, meaning that we'll try to place 10 shares, we'll be happy if we can place 7, and we need to get back any 3 to recover the file. This results in a 3.3x expansion factor. In general, you should set N about equal to the number of nodes in hunk ./src/allmydata/immutable/upload.py 411 pass else: # No more peers, so this upload might fail (it depends upon - # whether we've hit shares_of_happiness or not). Log the last + # whether we've hit servers_of_happiness or not). 
Log the last # failure we got: if a coding error causes all peers to fail # in the same way, this allows the common failure to be seen # by the uploader and should help with debugging hunk ./src/allmydata/interfaces.py 809 class NotEnoughSharesError(Exception): """Download was unable to get enough shares, or upload was unable to - place 'shares_of_happiness' shares.""" + place 'servers_of_happiness' shares.""" class NoSharesError(Exception): """Upload or Download was unable to get any shares at all.""" hunk ./src/allmydata/interfaces.py 1308 pushed. 'share_counts': return a tuple describing how many shares are used: - (needed_shares, shares_of_happiness, total_shares) + (needed_shares, servers_of_happiness, total_shares) 'num_segments': return an int with the number of segments that will be encoded. hunk ./src/allmydata/test/test_encode.py 768 def test_lost_one_shareholder(self): # we have enough shareholders when we start, but one segment in we # lose one of them. The upload should still succeed, as long as we - # still have 'shares_of_happiness' peers left. + # still have 'servers_of_happiness' peers left. modemap = dict([(i, "good") for i in range(9)] + [(i, "lost") for i in range(9, 10)]) return self.send_and_recover((4,8,10), bucket_modes=modemap) hunk ./src/allmydata/test/test_encode.py 776 def test_lost_one_shareholder_early(self): # we have enough shareholders when we choose peers, but just before # we send the 'start' message, we lose one of them. The upload should - # still succeed, as long as we still have 'shares_of_happiness' peers + # still succeed, as long as we still have 'servers_of_happiness' peers # left. modemap = dict([(i, "good") for i in range(9)] + [(i, "lost-early") for i in range(9, 10)]) } [Eliminate overcounting iof servers_of_happiness in Tahoe2PeerSelector; also reorganize some things. 
Kevan Carstensen **20091118014542 Ignore-this: a6cb032cbff74f4f9d4238faebd99868 ] { hunk ./src/allmydata/immutable/upload.py 141 return (alreadygot, set(b.keys())) def servers_with_unique_shares(existing_shares, used_peers=None): + """ + I accept a dict of shareid -> peerid mappings (and optionally a list + of PeerTracker instances) and return a list of servers that have shares. + """ servers = [] hunk ./src/allmydata/immutable/upload.py 146 + existing_shares = existing_shares.copy() if used_peers: hunk ./src/allmydata/immutable/upload.py 148 + peerdict = {} + for peer in used_peers: + peerdict.update(dict([(i, peer.peerid) for i in peer.buckets])) + for k in peerdict.keys(): + if existing_shares.has_key(k): + # Prevent overcounting; favor the bucket, and not the + # prexisting share. + del(existing_shares[k]) peers = list(used_peers.copy()) # We do this because the preexisting shares list goes by peerid. peers = [x.peerid for x in peers] hunk ./src/allmydata/immutable/upload.py 164 return list(set(servers)) def shares_by_server(existing_shares): + """ + I accept a dict of shareid -> peerid mappings, and return a dict + of peerid -> shareid mappings + """ servers = {} for server in set(existing_shares.values()): servers[server] = set([x for x in existing_shares.keys() hunk ./src/allmydata/immutable/upload.py 174 if existing_shares[x] == server]) return servers +def should_add_server(existing_shares, server, bucket): + """ + I tell my caller whether the servers_of_happiness number will be + increased or decreased if a particular server is added as the peer + already holding a particular share. I take a dictionary, a peerid, + and a bucket as arguments, and return a boolean. 
+ """ + old_size = len(servers_with_unique_shares(existing_shares)) + new_candidate = existing_shares.copy() + new_candidate[bucket] = server + new_size = len(servers_with_unique_shares(new_candidate)) + return old_size < new_size + class Tahoe2PeerSelector: def __init__(self, upload_id, logparent=None, upload_status=None): hunk ./src/allmydata/immutable/upload.py 294 peer = self.readonly_peers.pop() assert isinstance(peer, PeerTracker) d = peer.query_allocated() - d.addCallback(self._handle_allocate_response) + d.addCallback(self._handle_existing_response) return d hunk ./src/allmydata/immutable/upload.py 297 - def _handle_allocate_response(self, (peer, buckets)): + def _handle_existing_response(self, (peer, buckets)): for bucket in buckets: hunk ./src/allmydata/immutable/upload.py 299 - self.preexisting_shares[bucket] = peer - if self.homeless_shares: - self.homeless_shares.remove(bucket) + if should_add_server(self.preexisting_shares, peer, bucket): + self.preexisting_shares[bucket] = peer + if self.homeless_shares and bucket in self.homeless_shares: + self.homeless_shares.remove(bucket) return self._existing_shares() def _loop(self): hunk ./src/allmydata/immutable/upload.py 346 items.append((servernum, sharelist)) return self._loop() else: - raise NotEnoughSharesError("shares could only be placed on %d " - "servers (%d were requested)" % - (len(effective_happiness), - self.servers_of_happiness)) + raise NotEnoughSharesError("shares could only be placed " + "on %d servers (%d were requested)" % + (len(effective_happiness), + self.servers_of_happiness)) if self.uncontacted_peers: peer = self.uncontacted_peers.pop(0) hunk ./src/allmydata/immutable/upload.py 425 # we placed enough to be happy, so we're done if self._status: self._status.set_status("Placed all shares") - return self.use_peers + return (self.use_peers, self.preexisting_shares) def _got_response(self, res, peer, shares_to_ask, put_peer_here): if isinstance(res, failure.Failure): hunk 
./src/allmydata/immutable/upload.py 456 level=log.NOISY, parent=self._log_parent) progress = False for s in alreadygot: - if self.preexisting_shares.has_key(s): - old_size = len(servers_with_unique_shares(self.preexisting_shares)) - new_candidate = self.preexisting_shares.copy() - new_candidate[s] = peer.peerid - new_size = len(servers_with_unique_shares(new_candidate)) - if old_size >= new_size: continue - self.preexisting_shares[s] = peer.peerid - if s in self.homeless_shares: - self.homeless_shares.remove(s) - progress = True + if should_add_server(self.preexisting_shares, + peer.peerid, s): + self.preexisting_shares[s] = peer.peerid + if s in self.homeless_shares: + self.homeless_shares.remove(s) + progress = True # the PeerTracker will remember which shares were allocated on # that peer. We just have to remember to use them. } [Alter the error message returned when peer selection fails Kevan Carstensen **20091123002405 Ignore-this: b2a7dc163edcab8d9613bfd6907e5166 The Tahoe2PeerSelector returned either NoSharesError or NotEnoughSharesError for a variety of error conditions that weren't informatively described by them. This patch creates a new error, UploadHappinessError, replaces uses of NoSharesError and NotEnoughSharesError with it, and alters the error message raised with the errors to be more in line with the new servers_of_happiness behavior. See ticket #834 for more information. 
] { hunk ./src/allmydata/immutable/encode.py 14 from allmydata.util.assertutil import _assert, precondition from allmydata.codec import CRSEncoder from allmydata.interfaces import IEncoder, IStorageBucketWriter, \ - IEncryptedUploadable, IUploadStatus, NotEnoughSharesError, NoSharesError + IEncryptedUploadable, IUploadStatus, UploadHappinessError + """ The goal of the encoder is to turn the original file into a series of hunk ./src/allmydata/immutable/encode.py 498 msg = "lost too many servers during upload (still have %d, want %d): %s" % \ (len(servers_left), self.servers_of_happiness, why) - if servers_left: - raise NotEnoughSharesError(msg) - else: - raise NoSharesError(msg) + raise UploadHappinessError(msg) self.log("but we can still continue with %s shares, we'll be happy " "with at least %s" % (len(servers_left), self.servers_of_happiness), hunk ./src/allmydata/immutable/encode.py 508 d = defer.DeferredList(dl, fireOnOneErrback=True) def _eatNotEnoughSharesError(f): # all exceptions that occur while talking to a peer are handled - # in _remove_shareholder. That might raise NotEnoughSharesError, + # in _remove_shareholder. That might raise UploadHappinessError, # which will cause the DeferredList to errback but which should hunk ./src/allmydata/immutable/encode.py 510 - # otherwise be consumed. Allow non-NotEnoughSharesError exceptions + # otherwise be consumed. Allow non-UploadHappinessError exceptions # to pass through as an unhandled errback. We use this in lieu of # consumeErrors=True to allow coding errors to be logged. 
hunk ./src/allmydata/immutable/encode.py 513 - f.trap(NotEnoughSharesError, NoSharesError) + f.trap(UploadHappinessError) return None for d0 in dl: d0.addErrback(_eatNotEnoughSharesError) hunk ./src/allmydata/immutable/upload.py 20 from allmydata.util.rrefutil import add_version_to_remote_reference from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \ IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \ - NotEnoughSharesError, NoSharesError, NoServersError, \ - InsufficientVersionError + NoServersError, InsufficientVersionError, UploadHappinessError from allmydata.immutable import layout from pycryptopp.cipher.aes import AES hunk ./src/allmydata/immutable/upload.py 119 def query_allocated(self): d = self._storageserver.callRemote("get_buckets", self.storage_index) - d.addCallback(self._got_allocate_reply) return d hunk ./src/allmydata/immutable/upload.py 121 - def _got_allocate_reply(self, buckets): - return (self.peerid, buckets) - def _got_reply(self, (alreadygot, buckets)): #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets))) b = {} hunk ./src/allmydata/immutable/upload.py 187 def __init__(self, upload_id, logparent=None, upload_status=None): self.upload_id = upload_id self.query_count, self.good_query_count, self.bad_query_count = 0,0,0 + # Peers that are working normally, but full. 
+ self.full_count = 0 self.error_count = 0 self.num_peers_contacted = 0 self.last_failure_msg = None hunk ./src/allmydata/immutable/upload.py 291 peer = self.readonly_peers.pop() assert isinstance(peer, PeerTracker) d = peer.query_allocated() - d.addCallback(self._handle_existing_response) + d.addBoth(self._handle_existing_response, peer.peerid) + self.num_peers_contacted += 1 + self.query_count += 1 + log.msg("asking peer %s for any existing shares for upload id %s" + % (idlib.shortnodeid_b2a(peer.peerid), self.upload_id), + level=log.NOISY, parent=self._log_parent) + if self._status: + self._status.set_status("Contacting Peer %s to find " + "any existing shares" + % idlib.shortnodeid_b2a(peer.peerid)) return d hunk ./src/allmydata/immutable/upload.py 303 - def _handle_existing_response(self, (peer, buckets)): - for bucket in buckets: - if should_add_server(self.preexisting_shares, peer, bucket): - self.preexisting_shares[bucket] = peer - if self.homeless_shares and bucket in self.homeless_shares: - self.homeless_shares.remove(bucket) + def _handle_existing_response(self, res, peer): + if isinstance(res, failure.Failure): + log.msg("%s got error during existing shares check: %s" + % (idlib.shortnodeid_b2a(peer), res), + level=log.UNUSUAL, parent=self._log_parent) + self.error_count += 1 + self.bad_query_count += 1 + else: + buckets = res + log.msg("response from peer %s: alreadygot=%s" + % (idlib.shortnodeid_b2a(peer), tuple(sorted(buckets))), + level=log.NOISY, parent=self._log_parent) + for bucket in buckets: + if should_add_server(self.preexisting_shares, peer, bucket): + self.preexisting_shares[bucket] = peer + if self.homeless_shares and bucket in self.homeless_shares: + self.homeless_shares.remove(bucket) + self.full_count += 1 + self.bad_query_count += 1 return self._existing_shares() def _loop(self): hunk ./src/allmydata/immutable/upload.py 365 items.append((servernum, sharelist)) return self._loop() else: - raise NotEnoughSharesError("shares could only be 
placed " + raise UploadHappinessError("shares could only be placed " "on %d servers (%d were requested)" % (len(effective_happiness), self.servers_of_happiness)) hunk ./src/allmydata/immutable/upload.py 424 msg = ("placed %d shares out of %d total (%d homeless), " "want to place on %d servers, " "sent %d queries to %d peers, " - "%d queries placed some shares, %d placed none, " - "got %d errors" % + "%d queries placed some shares, %d placed none " + "(of which %d placed none due to the server being" + " full and %d placed none due to an error)" % (self.total_shares - len(self.homeless_shares), self.total_shares, len(self.homeless_shares), self.servers_of_happiness, hunk ./src/allmydata/immutable/upload.py 432 self.query_count, self.num_peers_contacted, self.good_query_count, self.bad_query_count, - self.error_count)) + self.full_count, self.error_count)) msg = "peer selection failed for %s: %s" % (self, msg) if self.last_failure_msg: msg += " (%s)" % (self.last_failure_msg,) hunk ./src/allmydata/immutable/upload.py 437 log.msg(msg, level=log.UNUSUAL, parent=self._log_parent) - if placed_shares: - raise NotEnoughSharesError(msg) - else: - raise NoSharesError(msg) + raise UploadHappinessError(msg) else: # we placed enough to be happy, so we're done if self._status: hunk ./src/allmydata/immutable/upload.py 451 log.msg("%s got error during peer selection: %s" % (peer, res), level=log.UNUSUAL, parent=self._log_parent) self.error_count += 1 + self.bad_query_count += 1 self.homeless_shares = list(shares_to_ask) + self.homeless_shares if (self.uncontacted_peers or self.contacted_peers hunk ./src/allmydata/immutable/upload.py 479 self.preexisting_shares[s] = peer.peerid if s in self.homeless_shares: self.homeless_shares.remove(s) - progress = True # the PeerTracker will remember which shares were allocated on # that peer. We just have to remember to use them. 
hunk ./src/allmydata/immutable/upload.py 495 self.good_query_count += 1 else: self.bad_query_count += 1 + self.full_count += 1 if still_homeless: # In networks with lots of space, this is very unusual and hunk ./src/allmydata/interfaces.py 808 """ class NotEnoughSharesError(Exception): - """Download was unable to get enough shares, or upload was unable to - place 'servers_of_happiness' shares.""" + """Download was unable to get enough shares""" class NoSharesError(Exception): hunk ./src/allmydata/interfaces.py 811 - """Upload or Download was unable to get any shares at all.""" + """Download was unable to get any shares at all.""" + +class UploadHappinessError(Exception): + """Upload was unable to satisfy 'servers_of_happiness'""" class UnableToFetchCriticalDownloadDataError(Exception): """I was unable to fetch some piece of critical data which is supposed to } [Change "UploadHappinessError" to "UploadUnhappinessError" Kevan Carstensen **20091205043037 Ignore-this: 236b64ab19836854af4993bb5c1b221a ] { replace ./src/allmydata/immutable/encode.py [A-Za-z_0-9] UploadHappinessError UploadUnhappinessError replace ./src/allmydata/immutable/upload.py [A-Za-z_0-9] UploadHappinessError UploadUnhappinessError replace ./src/allmydata/interfaces.py [A-Za-z_0-9] UploadHappinessError UploadUnhappinessError } [Alter the error message when an upload fails, per some comments in #778. Kevan Carstensen **20091230210344 Ignore-this: ba97422b2f9737c46abeb828727beb1 When I first implemented #778, I just altered the error messages to refer to servers where they referred to shares. The resulting error messages weren't very good. These are a bit better. 
] { hunk ./src/allmydata/immutable/upload.py 200 def get_shareholders(self, storage_broker, secret_holder, storage_index, share_size, block_size, - num_segments, total_shares, servers_of_happiness): + num_segments, total_shares, needed_shares, + servers_of_happiness): """ @return: (used_peers, already_peers), where used_peers is a set of PeerTracker instances that have agreed to hold some shares hunk ./src/allmydata/immutable/upload.py 215 self.total_shares = total_shares self.servers_of_happiness = servers_of_happiness + self.needed_shares = needed_shares self.homeless_shares = range(total_shares) # self.uncontacted_peers = list() # peers we haven't asked yet hunk ./src/allmydata/immutable/upload.py 230 # existing shares for this storage index, which we want to know # about for accurate servers_of_happiness accounting self.readonly_peers = [] + # These peers have shares -- any shares -- for our SI. We keep track + # of these to write an error message with them later. + self.peers_with_shares = [] peers = storage_broker.get_servers_for_index(storage_index) if not peers: hunk ./src/allmydata/immutable/upload.py 317 self.bad_query_count += 1 else: buckets = res + if buckets: + self.peers_with_shares.append(peer) log.msg("response from peer %s: alreadygot=%s" % (idlib.shortnodeid_b2a(peer), tuple(sorted(buckets))), level=log.NOISY, parent=self._log_parent) hunk ./src/allmydata/immutable/upload.py 331 self.bad_query_count += 1 return self._existing_shares() + def _get_progress_message(self): + if not self.homeless_shares: + msg = "placed all %d shares, " % (self.total_shares) + else: + msg = ("placed %d shares out of %d total (%d homeless), " % + (self.total_shares - len(self.homeless_shares), + self.total_shares, + len(self.homeless_shares))) + return (msg + "want to place shares on at least %d servers such that " + "any %d of them have enough shares to recover the file, " + "sent %d queries to %d peers, " + "%d queries placed some shares, %d placed none " + "(of 
which %d placed none due to the server being" + " full and %d placed none due to an error)" % + (self.servers_of_happiness, self.needed_shares, + self.query_count, self.num_peers_contacted, + self.good_query_count, self.bad_query_count, + self.full_count, self.error_count)) + + def _loop(self): if not self.homeless_shares: effective_happiness = servers_with_unique_shares( hunk ./src/allmydata/immutable/upload.py 357 self.preexisting_shares, self.use_peers) if self.servers_of_happiness <= len(effective_happiness): - msg = ("placed all %d shares, " - "sent %d queries to %d peers, " - "%d queries placed some shares, %d placed none, " - "got %d errors" % - (self.total_shares, - self.query_count, self.num_peers_contacted, - self.good_query_count, self.bad_query_count, - self.error_count)) - log.msg("peer selection successful for %s: %s" % (self, msg), - parent=self._log_parent) + msg = ("peer selection successful for %s: %s" % (self, + self._get_progress_message())) + log.msg(msg, parent=self._log_parent) return (self.use_peers, self.preexisting_shares) else: delta = self.servers_of_happiness - len(effective_happiness) hunk ./src/allmydata/immutable/upload.py 375 if delta <= len(self.uncontacted_peers) and \ shares_to_spread >= delta: # Loop through the allocated shares, removing + # one from each server that has more than one and putting + # it back into self.homeless_shares until we've done + # this delta times. items = shares.items() while len(self.homeless_shares) < delta: servernum, sharelist = items.pop() hunk ./src/allmydata/immutable/upload.py 388 items.append((servernum, sharelist)) return self._loop() else: - raise UploadUnhappinessError("shares could only be placed " - "on %d servers (%d were requested)" % - (len(effective_happiness), - self.servers_of_happiness)) + peer_count = len(list(set(self.peers_with_shares))) + # If peer_count < needed_shares, then the second error + # message is nonsensical, so we use this one. 
+ if peer_count < self.needed_shares: + msg = ("shares could only be placed or found on %d " + "server(s). " + "We were asked to place shares on at least %d " + "server(s) such that any %d of them have " + "enough shares to recover the file." % + (peer_count, + self.servers_of_happiness, + self.needed_shares)) + # Otherwise, if we've placed on at least needed_shares + # peers, but there isn't an x-happy subset of those peers + # for x < needed_shares, we use this error message. + elif len(effective_happiness) < self.needed_shares: + msg = ("shares could be placed or found on %d " + "server(s), but they are not spread out evenly " + "enough to ensure that any %d of these servers " + "would have enough shares to recover the file. " + "We were asked to place " + "shares on at least %d servers such that any " + "%d of them have enough shares to recover the " + "file." % + (peer_count, + self.needed_shares, + self.servers_of_happiness, + self.needed_shares)) + # Otherwise, if there is an x-happy subset of peers where + # x >= needed_shares, but x < shares_of_happiness, then + # we use this message. + else: + msg = ("shares could only be placed on %d server(s) " + "such that any %d of them have enough shares " + "to recover the file, but we were asked to use " + "at least %d such servers." 
% + (len(effective_happiness), + self.needed_shares, + self.servers_of_happiness)) + raise UploadUnhappinessError(msg) if self.uncontacted_peers: peer = self.uncontacted_peers.pop(0) hunk ./src/allmydata/immutable/upload.py 480 self.preexisting_shares, self.use_peers) if len(effective_happiness) < self.servers_of_happiness: - msg = ("placed %d shares out of %d total (%d homeless), " - "want to place on %d servers, " - "sent %d queries to %d peers, " - "%d queries placed some shares, %d placed none " - "(of which %d placed none due to the server being" - " full and %d placed none due to an error)" % - (self.total_shares - len(self.homeless_shares), - self.total_shares, len(self.homeless_shares), - self.servers_of_happiness, - self.query_count, self.num_peers_contacted, - self.good_query_count, self.bad_query_count, - self.full_count, self.error_count)) - msg = "peer selection failed for %s: %s" % (self, msg) + msg = ("peer selection failed for %s: %s" % (self, + self._get_progress_message())) if self.last_failure_msg: msg += " (%s)" % (self.last_failure_msg,) log.msg(msg, level=log.UNUSUAL, parent=self._log_parent) hunk ./src/allmydata/immutable/upload.py 534 self.use_peers.add(peer) progress = True + if allocated or alreadygot: + self.peers_with_shares.append(peer.peerid) + not_yet_present = set(shares_to_ask) - set(alreadygot) still_homeless = not_yet_present - set(allocated) hunk ./src/allmydata/immutable/upload.py 931 d = peer_selector.get_shareholders(storage_broker, secret_holder, storage_index, share_size, block_size, - num_segments, n, desired) + num_segments, n, k, desired) def _done(res): self._peer_selection_elapsed = time.time() - peer_selection_started return res } [Alter wording in 'interfaces.py' to be correct wrt #778 "Kevan Carstensen" **20091205034005 Ignore-this: c9913c700ac14e7a63569458b06980e0 ] hunk ./src/allmydata/interfaces.py 1277 def set_params(params): """Override the default encoding parameters. 
'params' is a tuple of (k,d,n), where 'k' is the number of required shares, 'd' is the - shares_of_happiness, and 'n' is the total number of shares that will + servers_of_happiness, and 'n' is the total number of shares that will be created. Encoding parameters can be set in three ways. 1: The Encoder class [Fix up the behavior of #778, per reviewers' comments Kevan Carstensen **20100507221147 Ignore-this: a55aa984472284e4fd8bdbdf706c918a - Make some important utility functions clearer and more thoroughly documented. - Assert in upload.servers_of_happiness that the buckets attributes of PeerTrackers passed to it are mutually disjoint. - Get rid of some silly non-Pythonisms that I didn't see when I first wrote these patches. - Make sure that should_add_server returns true when queried about a shnum that it doesn't know about yet. - Change Tahoe2PeerSelector.preexisting_shares to map a shareid to a set of peerids, alter dependencies to deal with that. - Remove upload.should_add_servers, because it is no longer necessary - Move upload.shares_of_happiness and upload.shares_by_server to a utility file. - Change some points in Tahoe2PeerSelector. - Compute servers_of_happiness using a bipartite matching algorithm that we know is optimal instead of an ad-hoc greedy algorithm that isn't. - Change servers_of_happiness to just take a sharemap as an argument, change its callers to merge existing_shares and used_peers before calling it. - Change an error message in the encoder to be more appropriate for servers of happiness. - Clarify the wording of an error message in immutable/upload.py - Refactor a happiness failure message to happinessutil.py, and make immutable/upload.py and immutable/encode.py use it. 
] { hunk ./src/allmydata/immutable/encode.py 10 from allmydata import uri from allmydata.storage.server import si_b2a from allmydata.hashtree import HashTree -from allmydata.util import mathutil, hashutil, base32, log +from allmydata.util import mathutil, hashutil, base32, log, happinessutil from allmydata.util.assertutil import _assert, precondition from allmydata.codec import CRSEncoder from allmydata.interfaces import IEncoder, IStorageBucketWriter, \ hunk ./src/allmydata/immutable/encode.py 201 assert IStorageBucketWriter.providedBy(landlords[k]) self.landlords = landlords.copy() assert isinstance(servermap, dict) + for v in servermap.itervalues(): + assert isinstance(v, set) self.servermap = servermap.copy() def start(self): hunk ./src/allmydata/immutable/encode.py 489 level=log.UNUSUAL, failure=why) if shareid in self.landlords: self.landlords[shareid].abort() + peerid = self.landlords[shareid].get_peerid() + assert peerid del self.landlords[shareid] hunk ./src/allmydata/immutable/encode.py 492 + self.servermap[shareid].remove(peerid) + if not self.servermap[shareid]: + del self.servermap[shareid] else: # even more UNUSUAL self.log("they weren't in our list of landlords", parent=ln, hunk ./src/allmydata/immutable/encode.py 499 level=log.WEIRD, umid="TQGFRw") - del(self.servermap[shareid]) - servers_left = list(set(self.servermap.values())) - if len(servers_left) < self.servers_of_happiness: - msg = "lost too many servers during upload (still have %d, want %d): %s" % \ - (len(servers_left), - self.servers_of_happiness, why) + happiness = happinessutil.servers_of_happiness(self.servermap) + if happiness < self.servers_of_happiness: + peerids = set(happinessutil.shares_by_server(self.servermap).keys()) + msg = happinessutil.failure_message(len(peerids), + self.required_shares, + self.servers_of_happiness, + happiness) + msg = "%s: %s" % (msg, why) raise UploadUnhappinessError(msg) self.log("but we can still continue with %s shares, we'll be happy " hunk 
./src/allmydata/immutable/encode.py 509 - "with at least %s" % (len(servers_left), + "with at least %s" % (happiness, self.servers_of_happiness), parent=ln) hunk ./src/allmydata/immutable/encode.py 515 def _gather_responses(self, dl): d = defer.DeferredList(dl, fireOnOneErrback=True) - def _eatNotEnoughSharesError(f): + def _eatUploadUnhappinessError(f): # all exceptions that occur while talking to a peer are handled # in _remove_shareholder. That might raise UploadUnhappinessError, # which will cause the DeferredList to errback but which should hunk ./src/allmydata/immutable/encode.py 525 f.trap(UploadUnhappinessError) return None for d0 in dl: - d0.addErrback(_eatNotEnoughSharesError) + d0.addErrback(_eatUploadUnhappinessError) return d def finish_hashing(self): hunk ./src/allmydata/immutable/layout.py 245 def abort(self): return self._rref.callRemoteOnly("abort") + + def get_peerid(self): + if self._nodeid: + return self._nodeid + return None + class WriteBucketProxy_v2(WriteBucketProxy): fieldsize = 8 fieldstruct = ">Q" hunk ./src/allmydata/immutable/upload.py 16 from allmydata.storage.server import si_b2a from allmydata.immutable import encode from allmydata.util import base32, dictutil, idlib, log, mathutil +from allmydata.util.happinessutil import servers_of_happiness, \ + shares_by_server, merge_peers, \ + failure_message from allmydata.util.assertutil import precondition from allmydata.util.rrefutil import add_version_to_remote_reference from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \ hunk ./src/allmydata/immutable/upload.py 120 return d def query_allocated(self): - d = self._storageserver.callRemote("get_buckets", - self.storage_index) - return d + return self._storageserver.callRemote("get_buckets", + self.storage_index) def _got_reply(self, (alreadygot, buckets)): #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets))) hunk ./src/allmydata/immutable/upload.py 137 self.buckets.update(b) return (alreadygot, set(b.keys())) 
-def servers_with_unique_shares(existing_shares, used_peers=None): - """ - I accept a dict of shareid -> peerid mappings (and optionally a list - of PeerTracker instances) and return a list of servers that have shares. - """ - servers = [] - existing_shares = existing_shares.copy() - if used_peers: - peerdict = {} - for peer in used_peers: - peerdict.update(dict([(i, peer.peerid) for i in peer.buckets])) - for k in peerdict.keys(): - if existing_shares.has_key(k): - # Prevent overcounting; favor the bucket, and not the - # prexisting share. - del(existing_shares[k]) - peers = list(used_peers.copy()) - # We do this because the preexisting shares list goes by peerid. - peers = [x.peerid for x in peers] - servers.extend(peers) - servers.extend(existing_shares.values()) - return list(set(servers)) - -def shares_by_server(existing_shares): - """ - I accept a dict of shareid -> peerid mappings, and return a dict - of peerid -> shareid mappings - """ - servers = {} - for server in set(existing_shares.values()): - servers[server] = set([x for x in existing_shares.keys() - if existing_shares[x] == server]) - return servers - -def should_add_server(existing_shares, server, bucket): - """ - I tell my caller whether the servers_of_happiness number will be - increased or decreased if a particular server is added as the peer - already holding a particular share. I take a dictionary, a peerid, - and a bucket as arguments, and return a boolean. 
- """ - old_size = len(servers_with_unique_shares(existing_shares)) - new_candidate = existing_shares.copy() - new_candidate[bucket] = server - new_size = len(servers_with_unique_shares(new_candidate)) - return old_size < new_size class Tahoe2PeerSelector: hunk ./src/allmydata/immutable/upload.py 162 @return: (used_peers, already_peers), where used_peers is a set of PeerTracker instances that have agreed to hold some shares for us (the shnum is stashed inside the PeerTracker), - and already_peers is a dict mapping shnum to a peer - which claims to already have the share. + and already_peers is a dict mapping shnum to a set of peers + which claim to already have the share. """ if self._status: hunk ./src/allmydata/immutable/upload.py 174 self.needed_shares = needed_shares self.homeless_shares = range(total_shares) - # self.uncontacted_peers = list() # peers we haven't asked yet self.contacted_peers = [] # peers worth asking again self.contacted_peers2 = [] # peers that we have asked again self._started_second_pass = False hunk ./src/allmydata/immutable/upload.py 178 self.use_peers = set() # PeerTrackers that have shares assigned to them - self.preexisting_shares = {} # sharenum -> peerid holding the share - # We don't try to allocate shares to these servers, since they've - # said that they're incapable of storing shares of the size that - # we'd want to store. We keep them around because they may have - # existing shares for this storage index, which we want to know - # about for accurate servers_of_happiness accounting - self.readonly_peers = [] - # These peers have shares -- any shares -- for our SI. We keep track - # of these to write an error message with them later. - self.peers_with_shares = [] + self.preexisting_shares = {} # shareid => set(peerids) holding shareid + # We don't try to allocate shares to these servers, since they've said + # that they're incapable of storing shares of the size that we'd want + # to store. 
We keep them around because they may have existing shares + # for this storage index, which we want to know about for accurate + # servers_of_happiness accounting + # (this is eventually a list, but it is initialized later) + self.readonly_peers = None + # These peers have shares -- any shares -- for our SI. We keep + # track of these to write an error message with them later. + self.peers_with_shares = set([]) hunk ./src/allmydata/immutable/upload.py 190 - peers = storage_broker.get_servers_for_index(storage_index) - if not peers: - raise NoServersError("client gave us zero peers") - # this needed_hashes computation should mirror # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree # (instead of a HashTree) because we don't require actual hashing hunk ./src/allmydata/immutable/upload.py 202 num_share_hashes, EXTENSION_SIZE, None) allocated_size = wbp.get_allocated_size() + all_peers = storage_broker.get_servers_for_index(storage_index) + if not all_peers: + raise NoServersError("client gave us zero peers") # filter the list of peers according to which ones can accomodate # this request. This excludes older peers (which used a 4-byte size hunk ./src/allmydata/immutable/upload.py 214 (peerid, conn) = peer v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"] return v1["maximum-immutable-share-size"] - new_peers = [peer for peer in peers - if _get_maxsize(peer) >= allocated_size] - old_peers = list(set(peers).difference(set(new_peers))) - peers = new_peers + writable_peers = [peer for peer in all_peers + if _get_maxsize(peer) >= allocated_size] + readonly_peers = set(all_peers) - set(writable_peers) # decide upon the renewal/cancel secrets, to include them in the # allocate_buckets query. 
hunk ./src/allmydata/immutable/upload.py 228 file_cancel_secret = file_cancel_secret_hash(client_cancel_secret, storage_index) def _make_trackers(peers): - return [ PeerTracker(peerid, conn, - share_size, block_size, - num_segments, num_share_hashes, - storage_index, - bucket_renewal_secret_hash(file_renewal_secret, - peerid), - bucket_cancel_secret_hash(file_cancel_secret, - peerid)) + return [PeerTracker(peerid, conn, + share_size, block_size, + num_segments, num_share_hashes, + storage_index, + bucket_renewal_secret_hash(file_renewal_secret, + peerid), + bucket_cancel_secret_hash(file_cancel_secret, + peerid)) for (peerid, conn) in peers] hunk ./src/allmydata/immutable/upload.py 237 - self.uncontacted_peers = _make_trackers(peers) - self.readonly_peers = _make_trackers(old_peers) - # Talk to the readonly servers to get an idea of what servers - # have what shares (if any) for this storage index + self.uncontacted_peers = _make_trackers(writable_peers) + self.readonly_peers = _make_trackers(readonly_peers) + # We now ask peers that can't hold any new shares about existing + # shares that they might have for our SI. Once this is done, we + # start placing the shares that we haven't already accounted + # for. d = defer.maybeDeferred(self._existing_shares) d.addCallback(lambda ign: self._loop()) return d hunk ./src/allmydata/immutable/upload.py 248 def _existing_shares(self): + """ + I loop through the list of peers that aren't accepting any new + shares for this upload, asking each of them to tell me about the + shares they already have for this upload's SI. + """ if self.readonly_peers: peer = self.readonly_peers.pop() assert isinstance(peer, PeerTracker) hunk ./src/allmydata/immutable/upload.py 270 return d def _handle_existing_response(self, res, peer): + """ + I handle responses to the queries sent by + Tahoe2PeerSelector._existing_shares. 
+ """ if isinstance(res, failure.Failure): log.msg("%s got error during existing shares check: %s" % (idlib.shortnodeid_b2a(peer), res), hunk ./src/allmydata/immutable/upload.py 283 else: buckets = res if buckets: - self.peers_with_shares.append(peer) + self.peers_with_shares.add(peer) log.msg("response from peer %s: alreadygot=%s" % (idlib.shortnodeid_b2a(peer), tuple(sorted(buckets))), level=log.NOISY, parent=self._log_parent) hunk ./src/allmydata/immutable/upload.py 288 for bucket in buckets: - if should_add_server(self.preexisting_shares, peer, bucket): - self.preexisting_shares[bucket] = peer - if self.homeless_shares and bucket in self.homeless_shares: - self.homeless_shares.remove(bucket) + self.preexisting_shares.setdefault(bucket, set()).add(peer) + if self.homeless_shares and bucket in self.homeless_shares: + self.homeless_shares.remove(bucket) self.full_count += 1 self.bad_query_count += 1 return self._existing_shares() hunk ./src/allmydata/immutable/upload.py 317 def _loop(self): if not self.homeless_shares: - effective_happiness = servers_with_unique_shares( - self.preexisting_shares, - self.use_peers) - if self.servers_of_happiness <= len(effective_happiness): + merged = merge_peers(self.preexisting_shares, self.use_peers) + effective_happiness = servers_of_happiness(merged) + if self.servers_of_happiness <= effective_happiness: msg = ("peer selection successful for %s: %s" % (self, self._get_progress_message())) log.msg(msg, parent=self._log_parent) hunk ./src/allmydata/immutable/upload.py 325 return (self.use_peers, self.preexisting_shares) else: - delta = self.servers_of_happiness - len(effective_happiness) + # We're not okay right now, but maybe we can fix it by + # redistributing some shares. 
In cases where one or two + # servers has, before the upload, all or most of the + # shares for a given SI, this can work by allowing _loop + # a chance to spread those out over the other peers, + delta = self.servers_of_happiness - effective_happiness shares = shares_by_server(self.preexisting_shares) # Each server in shares maps to a set of shares stored on it. # Since we want to keep at least one share on each server hunk ./src/allmydata/immutable/upload.py 342 in shares.items()]) if delta <= len(self.uncontacted_peers) and \ shares_to_spread >= delta: - # Loop through the allocated shares, removing - # one from each server that has more than one and putting - # it back into self.homeless_shares until we've done - # this delta times. items = shares.items() while len(self.homeless_shares) < delta: hunk ./src/allmydata/immutable/upload.py 344 - servernum, sharelist = items.pop() + # Loop through the allocated shares, removing + # one from each server that has more than one + # and putting it back into self.homeless_shares + # until we've done this delta times. + server, sharelist = items.pop() if len(sharelist) > 1: share = sharelist.pop() self.homeless_shares.append(share) hunk ./src/allmydata/immutable/upload.py 352 - del(self.preexisting_shares[share]) - items.append((servernum, sharelist)) + self.preexisting_shares[share].remove(server) + if not self.preexisting_shares[share]: + del self.preexisting_shares[share] + items.append((server, sharelist)) return self._loop() else: hunk ./src/allmydata/immutable/upload.py 358 - peer_count = len(list(set(self.peers_with_shares))) + # Redistribution won't help us; fail. + peer_count = len(self.peers_with_shares) # If peer_count < needed_shares, then the second error # message is nonsensical, so we use this one. hunk ./src/allmydata/immutable/upload.py 362 - if peer_count < self.needed_shares: - msg = ("shares could only be placed or found on %d " - "server(s). 
" - "We were asked to place shares on at least %d " - "server(s) such that any %d of them have " - "enough shares to recover the file." % - (peer_count, - self.servers_of_happiness, - self.needed_shares)) - # Otherwise, if we've placed on at least needed_shares - # peers, but there isn't an x-happy subset of those peers - # for x < needed_shares, we use this error message. - elif len(effective_happiness) < self.needed_shares: - msg = ("shares could be placed or found on %d " - "server(s), but they are not spread out evenly " - "enough to ensure that any %d of these servers " - "would have enough shares to recover the file. " - "We were asked to place " - "shares on at least %d servers such that any " - "%d of them have enough shares to recover the " - "file." % - (peer_count, - self.needed_shares, - self.servers_of_happiness, - self.needed_shares)) - # Otherwise, if there is an x-happy subset of peers where - # x >= needed_shares, but x < shares_of_happiness, then - # we use this message. - else: - msg = ("shares could only be placed on %d server(s) " - "such that any %d of them have enough shares " - "to recover the file, but we were asked to use " - "at least %d such servers." % - (len(effective_happiness), - self.needed_shares, - self.servers_of_happiness)) + msg = failure_message(peer_count, + self.needed_shares, + self.servers_of_happiness, + effective_happiness) raise UploadUnhappinessError(msg) if self.uncontacted_peers: hunk ./src/allmydata/immutable/upload.py 415 else: # no more peers. If we haven't placed enough shares, we fail. 
placed_shares = self.total_shares - len(self.homeless_shares) - effective_happiness = servers_with_unique_shares( - self.preexisting_shares, - self.use_peers) - if len(effective_happiness) < self.servers_of_happiness: + merged = merge_peers(self.preexisting_shares, self.use_peers) + effective_happiness = servers_of_happiness(merged) + if effective_happiness < self.servers_of_happiness: msg = ("peer selection failed for %s: %s" % (self, self._get_progress_message())) if self.last_failure_msg: hunk ./src/allmydata/immutable/upload.py 460 level=log.NOISY, parent=self._log_parent) progress = False for s in alreadygot: - if should_add_server(self.preexisting_shares, - peer.peerid, s): - self.preexisting_shares[s] = peer.peerid - if s in self.homeless_shares: - self.homeless_shares.remove(s) + self.preexisting_shares.setdefault(s, set()).add(peer.peerid) + if s in self.homeless_shares: + self.homeless_shares.remove(s) # the PeerTracker will remember which shares were allocated on # that peer. We just have to remember to use them. 
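The hunks above replace the length of a server list with servers_of_happiness(merge_peers(...)), computed over a shareid -> set(peerid) map. The following is a minimal, self-contained sketch of that idea, not the code this patch adds. It assumes Python 3, uses plain dicts instead of PeerTracker instances, and swaps in a simple recursive augmenting-path matching (Kuhn's algorithm) for the patch's Edmonds-Karp flow computation. The names merge_maps and happiness are hypothetical.

```python
def merge_maps(preexisting, allocated):
    """Union two shareid -> set(peerid) dicts without mutating either.

    Hypothetical stand-in for merge_peers; it takes a second dict
    rather than a set of PeerTracker objects.
    """
    merged = {sh: set(peers) for sh, peers in preexisting.items()}
    for sh, peers in allocated.items():
        merged.setdefault(sh, set()).update(peers)
    return merged


def happiness(sharemap):
    """Return the size of a maximum matching between servers and shares."""
    # Invert shareid -> set(peerid) into peerid -> set(shareid).
    by_server = {}
    for sh, peers in sharemap.items():
        for peer in peers:
            by_server.setdefault(peer, set()).add(sh)

    owner = {}  # shareid -> peerid currently matched to that share

    def try_assign(peer, seen):
        # Try to give `peer` a share, re-homing other peers if needed.
        for sh in sorted(by_server[peer]):
            if sh in seen:
                continue
            seen.add(sh)
            if sh not in owner or try_assign(owner[sh], seen):
                owner[sh] = peer
                return True
        return False

    return sum(1 for peer in sorted(by_server) if try_assign(peer, set()))


# The layout used in the servers_of_happiness docstring.
layout = {1: {"s1"}, 2: {"s1", "s5"}, 3: {"s1", "s3"},
          4: {"s1", "s4"}, 6: {"s2"}}
print(happiness(layout))

# Two shares already on server A, two newly allocated to server B.
merged = merge_maps({0: {"A"}, 1: {"A"}}, {1: {"B"}, 2: {"B"}})
print(happiness(merged))
```

Counting matched servers rather than distinct servers is what keeps a server holding many shares from inflating the result: three shares that all sit on one server give a happiness of 1, however many shares there are.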
hunk ./src/allmydata/immutable/upload.py 471 progress = True if allocated or alreadygot: - self.peers_with_shares.append(peer.peerid) + self.peers_with_shares.add(peer.peerid) not_yet_present = set(shares_to_ask) - set(alreadygot) still_homeless = not_yet_present - set(allocated) hunk ./src/allmydata/immutable/upload.py 877 def set_shareholders(self, (used_peers, already_peers), encoder): """ @param used_peers: a sequence of PeerTracker objects - @paran already_peers: a dict mapping sharenum to a peerid that - claims to already have this share + @paran already_peers: a dict mapping sharenum to a set of peerids + that claim to already have this share """ self.log("_send_shares, used_peers is %s" % (used_peers,)) # record already-present shares in self._results hunk ./src/allmydata/immutable/upload.py 893 buckets.update(peer.buckets) for shnum in peer.buckets: self._peer_trackers[shnum] = peer - servermap[shnum] = peer.peerid + servermap.setdefault(shnum, set()).add(peer.peerid) assert len(buckets) == sum([len(peer.buckets) for peer in used_peers]) encoder.set_shareholders(buckets, servermap) hunk ./src/allmydata/interfaces.py 1348 must be a dictionary that maps share number (an integer ranging from 0 to n-1) to an instance that provides IStorageBucketWriter. 'servermap' is a dictionary that maps share number (as defined above) - to a peerid. This must be performed before start() can be called.""" + to a set of peerids. This must be performed before start() can be + called.""" def start(): """Begin the encode/upload process. This involves reading encrypted addfile ./src/allmydata/util/happinessutil.py hunk ./src/allmydata/util/happinessutil.py 1 +""" +I contain utilities useful for calculating servers_of_happiness, and for +reporting it in messages +""" + +def failure_message(peer_count, k, happy, effective_happy): + # If peer_count < needed_shares, this error message makes more + # sense than any of the others, so use it. 
+ if peer_count < k: + msg = ("shares could only be placed or found on %d " + "server(s). " + "We were asked to place shares on at least %d " + "server(s) such that any %d of them have " + "enough shares to recover the file." % + (peer_count, happy, k)) + # Otherwise, if we've placed on at least needed_shares + # peers, but there isn't an x-happy subset of those peers + # for x >= needed_shares, we use this error message. + elif effective_happy < k: + msg = ("shares could be placed or found on %d " + "server(s), but they are not spread out evenly " + "enough to ensure that any %d of these servers " + "would have enough shares to recover the file. " + "We were asked to place " + "shares on at least %d servers such that any " + "%d of them have enough shares to recover the " + "file." % + (peer_count, k, happy, k)) + # Otherwise, if there is an x-happy subset of peers where + # x >= needed_shares, but x < servers_of_happiness, then + # we use this message. + else: + msg = ("shares could only be placed on %d server(s) " + "such that any %d of them have enough shares " + "to recover the file, but we were asked to " + "place shares on at least %d such servers." % + (effective_happy, k, happy)) + return msg + + +def shares_by_server(servermap): + """ + I accept a dict of shareid -> set(peerid) mappings, and return a + dict of peerid -> set(shareid) mappings. My argument is a dictionary + with sets of peers, indexed by shares, and I transform that into a + dictionary of sets of shares, indexed by peerids. + """ + ret = {} + for shareid, peers in servermap.iteritems(): + assert isinstance(peers, set) + for peerid in peers: + ret.setdefault(peerid, set()).add(shareid) + return ret + +def merge_peers(servermap, used_peers=None): + """ + I accept a dict of shareid -> set(peerid) mappings, and optionally a + set of PeerTrackers. If no set of PeerTrackers is provided, I return + my first argument unmodified. 
Otherwise, I update a copy of my first + argument to include the shareid -> peerid mappings implied in the + set of PeerTrackers, returning the resulting dict. + """ + if not used_peers: + return servermap + + assert(isinstance(servermap, dict)) + assert(isinstance(used_peers, set)) + + # Since we mutate servermap, and are called outside of a + # context where it is okay to do that, make a copy of servermap and + # work with it. + servermap = servermap.copy() + for peer in used_peers: + for shnum in peer.buckets: + servermap.setdefault(shnum, set()).add(peer.peerid) + return servermap + +def servers_of_happiness(sharemap): + """ + I accept 'sharemap', a dict of shareid -> set(peerid) mappings. I + return the 'servers_of_happiness' number that sharemap results in. + + To calculate the 'servers_of_happiness' number for the sharemap, I + construct a bipartite graph with servers in one partition of vertices + and shares in the other, and with an edge between a server s and a share t + if s is to store t. I then compute the size of a maximum matching in + the resulting graph; this is then returned as the 'servers_of_happiness' + for my arguments. + + For example, consider the following layout: + + server 1: shares 1, 2, 3, 4 + server 2: share 6 + server 3: share 3 + server 4: share 4 + server 5: share 2 + + From this, we can construct the following graph: + + L = {server 1, server 2, server 3, server 4, server 5} + R = {share 1, share 2, share 3, share 4, share 6} + V = L U R + E = {(server 1, share 1), (server 1, share 2), (server 1, share 3), + (server 1, share 4), (server 2, share 6), (server 3, share 3), + (server 4, share 4), (server 5, share 2)} + G = (V, E) + + Note that G is bipartite since every edge in E has one endpoint in L + and one endpoint in R. + + A matching in a graph G is a subset M of E such that, for any vertex + v in V, v is incident to at most one edge of M. A maximum matching + in G is a matching that is no smaller than any other matching.
For + this graph, a matching of cardinality 5 is: + + M = {(server 1, share 1), (server 2, share 6), + (server 3, share 3), (server 4, share 4), + (server 5, share 2)} + + Since G is bipartite, and since |L| = 5, we cannot have an M' such + that |M'| > |M|. Then M is a maximum matching in G. Intuitively, and + as long as k <= 5, we can see that the layout above has + servers_of_happiness = 5, which matches the results here. + """ + if sharemap == {}: + return 0 + sharemap = shares_by_server(sharemap) + graph = flow_network_for(sharemap) + # This is an implementation of the Ford-Fulkerson method for finding + # a maximum flow in a flow network applied to a bipartite graph. + # Specifically, it is the Edmonds-Karp algorithm, since it uses a + # BFS to find the shortest augmenting path at each iteration, if one + # exists. + # + # The implementation here is an adaptation of an algorithm described in + # "Introduction to Algorithms", Cormen et al, 2nd ed., pp 658-662. + dim = len(graph) + flow_function = [[0 for sh in xrange(dim)] for s in xrange(dim)] + residual_graph, residual_function = residual_network(graph, flow_function) + while augmenting_path_for(residual_graph): + path = augmenting_path_for(residual_graph) + # Delta is the largest amount that we can increase flow across + # all of the edges in path. Because of the way that the residual + # function is constructed, f[u][v] for a particular edge (u, v) + # is the amount of unused capacity on that edge. Taking the + # minimum of a list of those values for each edge in the + # augmenting path gives us our delta. + delta = min(map(lambda (u, v): residual_function[u][v], path)) + for (u, v) in path: + flow_function[u][v] += delta + flow_function[v][u] -= delta + residual_graph, residual_function = residual_network(graph, + flow_function) + num_servers = len(sharemap) + # The value of a flow is the total flow out of the source vertex + # (vertex 0, in our graph).
We could just as well sum across all of + # f[0], but we know that vertex 0 only has edges to the servers in + # our graph, so we can stop after summing flow across those. The + # value of a flow computed in this way is the size of a maximum + # matching on the bipartite graph described above. + return sum([flow_function[0][v] for v in xrange(1, num_servers+1)]) + +def flow_network_for(sharemap): + """ + I take my argument, a dict of peerid -> set(shareid) mappings, and + turn it into a flow network suitable for use with Edmonds-Karp. I + then return the adjacency list representation of that network. + + Specifically, I build G = (V, E), where: + V = { peerid in sharemap } U { shareid in sharemap } U {s, t} + E = {(s, peerid) for each peerid} + U {(peerid, shareid) if peerid is to store shareid } + U {(shareid, t) for each shareid} + + s and t will be source and sink nodes when my caller starts treating + the graph I return like a flow network. Without s and t, the + returned graph is bipartite. + """ + # Servers don't have integral identifiers, and we can't make any + # assumptions about the way shares are indexed -- it's possible that + # there are missing shares, for example. So before making a graph, + # we re-index so that all of our vertices have integral indices, and + # that there aren't any holes. We start indexing at 1, so that we + # can add a source node at index 0. + sharemap, num_shares = reindex(sharemap, base_index=1) + num_servers = len(sharemap) + graph = [] # index -> [index], an adjacency list + # Add an entry at the top (index 0) that has an edge to every server + # in sharemap + graph.append(sharemap.keys()) + # For each server, add an entry that has an edge to every share that it + # contains (or will contain). + for k in sharemap: + graph.append(sharemap[k]) + # For each share, add an entry that has an edge to the sink. 
+ sink_num = num_servers + num_shares + 1 + for i in xrange(num_shares): + graph.append([sink_num]) + # Add an empty entry for the sink, which has no outbound edges. + graph.append([]) + return graph + +def reindex(sharemap, base_index): + """ + Given sharemap, I map peerids and shareids to integers that don't + conflict with each other, so they're useful as indices in a graph. I + return a sharemap that is reindexed appropriately, and also the + number of distinct shares in the resulting sharemap as a convenience + for my caller. base_index tells me where to start indexing. + """ + shares = {} # shareid -> vertex index + num = base_index + ret = {} # peerid -> [shareid], a reindexed sharemap. + # Number the servers first + for k in sharemap: + ret[num] = sharemap[k] + num += 1 + # Number the shares + for k in ret: + for shnum in ret[k]: + if not shares.has_key(shnum): + shares[shnum] = num + num += 1 + ret[k] = map(lambda x: shares[x], ret[k]) + return (ret, len(shares)) + +def residual_network(graph, f): + """ + I return the residual network and residual capacity function of the + flow network represented by my graph and f arguments. graph is a + flow network in adjacency-list form, and f is a flow in graph. + """ + new_graph = [[] for i in xrange(len(graph))] + cf = [[0 for s in xrange(len(graph))] for sh in xrange(len(graph))] + for i in xrange(len(graph)): + for v in graph[i]: + if f[i][v] == 1: + # We add an edge (v, i) with cf[v,i] = 1. This means + # that we can remove 1 unit of flow from the edge (i, v) + new_graph[v].append(i) + cf[v][i] = 1 + cf[i][v] = -1 + else: + # We add the edge (i, v), since we're not using it right + # now. + new_graph[i].append(v) + cf[i][v] = 1 + cf[v][i] = -1 + return (new_graph, cf) + +def augmenting_path_for(graph): + """ + I return an augmenting path, if there is one, from the source node + to the sink node in the flow network represented by my graph argument. + If there is no augmenting path, I return False. 
I assume that the + source node is at index 0 of graph, and the sink node is at the last + index. I also assume that graph is a flow network in adjacency list + form. + """ + bfs_tree = bfs(graph, 0) + if bfs_tree[len(graph) - 1]: + n = len(graph) - 1 + path = [] # [(u, v)], where u and v are vertices in the graph + while n != 0: + path.insert(0, (bfs_tree[n], n)) + n = bfs_tree[n] + return path + return False + +def bfs(graph, s): + """ + Perform a BFS on graph starting at s, where graph is a graph in + adjacency list form, and s is a node in graph. I return the + predecessor table that the BFS generates. + """ + # This is an adaptation of the BFS described in "Introduction to + # Algorithms", Cormen et al, 2nd ed., p. 532. + # WHITE vertices are those that we haven't seen or explored yet. + WHITE = 0 + # GRAY vertices are those we have seen, but haven't explored yet + GRAY = 1 + # BLACK vertices are those we have seen and explored + BLACK = 2 + color = [WHITE for i in xrange(len(graph))] + predecessor = [None for i in xrange(len(graph))] + distance = [-1 for i in xrange(len(graph))] + queue = [s] # vertices that we haven't explored yet. + color[s] = GRAY + distance[s] = 0 + while queue: + n = queue.pop(0) + for v in graph[n]: + if color[v] == WHITE: + color[v] = GRAY + distance[v] = distance[n] + 1 + predecessor[v] = n + queue.append(v) + color[n] = BLACK + return predecessor } Context: [Dependency on Windmill test framework is not needed yet. 
david-sarah@jacaranda.org**20100504161043 Ignore-this: be088712bec650d4ef24766c0026ebc8 ] [tests: pass z to tar so that BSD tar will know to ungzip zooko@zooko.com**20100504090628 Ignore-this: 1339e493f255e8fc0b01b70478f23a09 ] [setup: update comments and URLs in setup.cfg zooko@zooko.com**20100504061653 Ignore-this: f97692807c74bcab56d33100c899f829 ] [setup: reorder and extend the show-tool-versions script, the better to glean information about our new buildslaves zooko@zooko.com**20100504045643 Ignore-this: 836084b56b8d4ee8f1de1f4efb706d36 ] [CLI: Support for https url in option --node-url Francois Deppierraz **20100430185609 Ignore-this: 1717176b4d27c877e6bc67a944d9bf34 This patch modifies the regular expression used for verifying of '--node-url' parameter. Support for accessing a Tahoe gateway over HTTPS was already present, thanks to Python's urllib. ] [backupdb.did_create_directory: use REPLACE INTO, not INSERT INTO + ignore error Brian Warner **20100428050803 Ignore-this: 1fca7b8f364a21ae413be8767161e32f This handles the case where we upload a new tahoe directory for a previously-processed local directory, possibly creating a new dircap (if the metadata had changed). Now we replace the old dirhash->dircap record. The previous behavior left the old record in place (with the old dircap and timestamps), so we'd never stop creating new directories and never converge on a null backup. ] ["tahoe webopen": add --info flag, to get ?t=info Brian Warner **20100424233003 Ignore-this: 126b0bb6db340fabacb623d295eb45fa Also fix some trailing whitespace. ] [docs: install.html http-equiv refresh to quickstart.html zooko@zooko.com**20100421165708 Ignore-this: 52b4b619f9dde5886ae2cd7f1f3b734b ] [docs: install.html -> quickstart.html zooko@zooko.com**20100421155757 Ignore-this: 6084e203909306bed93efb09d0e6181d It is not called "installing" because that implies that it is going to change the configuration of your operating system. 
It is not called "building" because that implies that you need developer tools like a compiler. Also I added a stern warning against looking at the "InstallDetails" wiki page, which I have renamed to "AdvancedInstall". ] [Fix another typo in tahoe_storagespace munin plugin david-sarah@jacaranda.org**20100416220935 Ignore-this: ad1f7aa66b554174f91dfb2b7a3ea5f3 ] [Add dependency on windmill >= 1.3 david-sarah@jacaranda.org**20100416190404 Ignore-this: 4437a7a464e92d6c9012926b18676211 ] [licensing: phrase the OpenSSL-exemption in the vocabulary of copyright instead of computer technology, and replicate the exemption from the GPL to the TGPPL zooko@zooko.com**20100414232521 Ignore-this: a5494b2f582a295544c6cad3f245e91 ] [munin-tahoe_storagespace freestorm77@gmail.com**20100221203626 Ignore-this: 14d6d6a587afe1f8883152bf2e46b4aa Plugin configuration rename ] [setup: add licensing declaration for setuptools (noticed by the FSF compliance folks) zooko@zooko.com**20100309184415 Ignore-this: 2dfa7d812d65fec7c72ddbf0de609ccb ] [setup: fix error in licensing declaration from Shawn Willden, as noted by the FSF compliance division zooko@zooko.com**20100309163736 Ignore-this: c0623d27e469799d86cabf67921a13f8 ] [CREDITS to Jacob Appelbaum zooko@zooko.com**20100304015616 Ignore-this: 70db493abbc23968fcc8db93f386ea54 ] [desert-island-build-with-proper-versions jacob@appelbaum.net**20100304013858] [docs: a few small edits to try to guide newcomers through the docs zooko@zooko.com**20100303231902 Ignore-this: a6aab44f5bf5ad97ea73e6976bc4042d These edits were suggested by my watching over Jake Appelbaum's shoulder as he completely ignored/skipped/missed install.html and also as he decided that debian.txt wouldn't help him with basic installation. Then I threw in a few docs edits that have been sitting around in my sandbox asking to be committed for months. 
] [TAG allmydata-tahoe-1.6.1 david-sarah@jacaranda.org**20100228062314 Ignore-this: eb5f03ada8ea953ee7780e7fe068539 ] Patch bundle hash: 76de9ddd55631daa395a88a9dabc3c46ebc43997
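
Reviewer note (not part of the patch bundle): the worked example in the servers_of_happiness docstring can be checked independently with a much smaller matching routine. The sketch below is an assumption-laden illustration, not the patch's method: it is written for Python 3 (the patch targets Python 2), it uses plain depth-first augmenting paths instead of the Edmonds-Karp flow network, and the name max_matching_size is hypothetical. It accepts the same shareid -> set(peerid) shape that servers_of_happiness takes.

```python
# Illustrative sketch only; not part of the patch. Python 3.

def max_matching_size(sharemap):
    """Return the size of a maximum matching between shares and servers.

    sharemap is a dict of shareid -> set(peerid). Each share is matched
    to at most one server and each server to at most one share.
    """
    matched_share = {}  # peerid -> shareid currently matched to that peer

    def try_place(shareid, seen):
        # Try each server that holds this share. If a server is already
        # taken, see whether its current share can move somewhere else.
        for peerid in sorted(sharemap[shareid]):
            if peerid in seen:
                continue
            seen.add(peerid)
            if (peerid not in matched_share
                    or try_place(matched_share[peerid], seen)):
                matched_share[peerid] = shareid
                return True
        return False

    return sum(1 for shareid in sharemap if try_place(shareid, set()))


# The layout from the docstring, inverted to shareid -> set(peerid).
docstring_layout = {
    1: {"server 1"},
    2: {"server 1", "server 5"},
    3: {"server 1", "server 3"},
    4: {"server 1", "server 4"},
    6: {"server 2"},
}

# A layout where three shares are crowded onto two servers.
crowded_layout = {
    1: {"A"},
    2: {"A"},
    3: {"A", "B"},
}

print(max_matching_size(docstring_layout))
print(max_matching_size(crowded_layout))
```

For the docstring layout this agrees with the matching of cardinality 5 given there. The crowded layout shows why counting servers that hold any share is not enough: only two shares can be assigned to distinct servers.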