Sun Apr 24 16:19:50 PDT 2011 Kevan Carstensen
  * immutable/upload.py: New uploader to work with an IPeerSelector object
Sun Apr 24 16:20:19 PDT 2011 Kevan Carstensen
  * immutable/encode.py: Alter the encoder to work with an IPeerSelector object
Sun Apr 24 16:20:47 PDT 2011 Kevan Carstensen
  * interfaces.py: Add IPeerSelector interfaces
Sun Apr 24 16:21:16 PDT 2011 Kevan Carstensen
  * test: fix existing tests to work with the new uploader + graph combination
Mon May 2 20:49:26 PDT 2011 Kevan Carstensen
  * util/happinessutil.py: Abstract happiness functions behind an IPeerSelector-conforming object
Mon May 2 20:50:09 PDT 2011 Kevan Carstensen
  * test/test_upload: Add tests for HappinessGraph object.

New patches:

[immutable/upload.py: New uploader to work with an IPeerSelector object
Kevan Carstensen **20110424231950
 Ignore-this: fb67dcc9e75e4b87a11b77d1352c7eeb
] {
hunk ./src/allmydata/immutable/upload.py 16
 from allmydata.storage.server import si_b2a from allmydata.immutable import encode from allmydata.util import base32, dictutil, idlib, log, mathutil -from allmydata.util.happinessutil import servers_of_happiness, \ - shares_by_server, merge_servers, \ - failure_message +from allmydata.util.happinessutil import HappinessGraph from allmydata.util.assertutil import precondition from allmydata.util.rrefutil import add_version_to_remote_reference from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
hunk ./src/allmydata/immutable/upload.py 74
 sharesize, blocksize, num_segments, num_share_hashes, storage_index, bucket_renewal_secret, bucket_cancel_secret): + self._server = server self.buckets = {} # k: shareid, v: IRemoteBucketWriter self.sharesize = sharesize
hunk ./src/allmydata/immutable/upload.py 95
 def __repr__(self): return ("<ServerTracker for server %s and SI %s>" - % (self._server.name(), si_b2a(self.storage_index)[:5])) + % (idlib.shortnodeid_b2a(self.serverid), + si_b2a(self.storage_index)[:5])) def get_serverid(self): return self._server.get_serverid()
hunk ./src/allmydata/immutable/upload.py 117
 def ask_about_existing_shares(self): rref = self._server.get_rref() - return rref.callRemote("get_buckets", self.storage_index) + return rref.callRemote("get_buckets", + self.storage_index) def _got_reply(self, (alreadygot, buckets)): #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
hunk ./src/allmydata/immutable/upload.py 156
 return "%s: %s" % (shnum, idlib.shortnodeid_b2a(bucketwriter._nodeid),) class Tahoe2ServerSelector(log.PrefixingLogMixin): - - def __init__(self, upload_id, logparent=None, upload_status=None): + """ + I'm responsible for selecting peers for a particular upload or + repair operation. I stick with the operation until it is done. + """ + # So how do we want this to work? Well, we're basically looking at + # two tasks: repairing and uploading, since these are the two tasks + # that place shares on the grid. + # + # When we're uploading, we're placing shares on the grid that were + # never there before. In this case, we're going to be fairly strict + # about satisfying our distribution criterion. + # + # When we're repairing, we're replacing shares that were there + # before. So we're interested in finding as many of those previously + # existing shares as we can, integrating them into our matching, and + # replacing shares intelligently. We may also want to proceed even + # in the case that we can't distribute shares as well as we might + # like, because doing so will improve the health of the file.
+ # + # Operations like repairs, check and repairs, and so on should be + # able to hint to us explicitly that we'll be working in the repair + # state, in which case we'll be circumspect about declaring failure + # and looking for other shares. Otherwise, we'll assume that we're + # in the normal upload state, in which case we'll be fairly lax + # about looking for pre-existing shares, and strict about declaring + # success or failure. + # + # The parent upload process will initialize us with as much + # information as it has. We'll then generate a matching based on + # that information and return that to the caller. If the client + # decides to go ahead with the upload, they'll keep me around for + # the upload process. When they lose peers, they'll tell us about + # them and we'll tell them whether they should continue with the + # upload or not. We'll also record things for the upload status at + # the end of the process. + def __init__(self, upload_id, logparent=None, upload_status=None, + repair=False): self.upload_id = upload_id self.query_count, self.good_query_count, self.bad_query_count = 0,0,0 # Servers that are working normally, but full. hunk ./src/allmydata/immutable/upload.py 202 self.last_failure_msg = None self._status = IUploadStatus(upload_status) log.PrefixingLogMixin.__init__(self, 'tahoe.immutable.upload', logparent, prefix=upload_id) + # Don't start in the repair state by default. + # + # Note that things that we see during our selection process can + # influence our decision about whether we're in a repair mode or + # not. + self.repair = repair + + # Trigger file discovery behavior if necessary. + self._first_time = True + self._needs_repair_check = False + self.log("starting", level=log.OPERATIONAL) def __repr__(self): hunk ./src/allmydata/immutable/upload.py 223 num_segments, total_shares, needed_shares, servers_of_happiness): """ - @return: (upload_trackers, already_servers), where upload_trackers is - a set of ServerTracker instances that have agreed to hold - some shares for us (the shareids are stashed inside the - ServerTracker), and already_servers is a dict mapping shnum - to a set of serverids which claim to already have the share. + @return: A Deferred that fires when we're done selecting. + The caller can then ask me for my write buckets. """ hunk ./src/allmydata/immutable/upload.py 226 + # How does this process work? + # + # Well, essentially, we know that we have shares 1-N to + # distribute. And we know we have some servers in the storage + # broker. Given that, we want to find a nice matching. But how? + # How does the current one work? + # + # - Prunes the list of servers by those that can store our blocks. + # - Takes the first few of those servers. + # - Tries to store shares on those servers. + # - Checks. + # + # Not that great. How should it work? + # - Prunes the list of servers by those that can store our + # blocks. + # - Ask some proportion of those servers DYHB. Small + # proportion if an upload, larger and more thorough portion if + # not an upload. If upload and we find existing shares, enter + # the repair state and look more thoroughly. + # - Rework happinessutil. It should take a list of servers, a + # list of shares, and a list of edges that we found in the + # previous step, then compute the maximum matching, and give + # us a list of servers that we need to try to allocate + # buckets on. + # - Allocate buckets. If unsuccessful, goto the happinessutil + # and remove the failed server. 
+ # - Once we're done, return the list to our caller. if self._status: self._status.set_status("Contacting Servers..") hunk ./src/allmydata/immutable/upload.py 261 self.servers_of_happiness = servers_of_happiness self.needed_shares = needed_shares - self.homeless_shares = set(range(total_shares)) - self.contacted_trackers = [] # servers worth asking again - self.contacted_trackers2 = [] # servers that we have asked again - self._started_second_pass = False + self.contacted_trackers = set() # used to make an error message. self.use_trackers = set() # ServerTrackers that have shares assigned # to them self.preexisting_shares = {} # shareid => set(serverids) holding shareid hunk ./src/allmydata/immutable/upload.py 265 - - # These servers have shares -- any shares -- for our SI. We keep - # track of these to write an error message with them later. - self.serverids_with_shares = set() - + self.allocated_shares = {} # XXX: Dubious whether this is necessary # this needed_hashes computation should mirror # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree # (instead of a HashTree) because we don't require actual hashing hunk ./src/allmydata/immutable/upload.py 279 None) allocated_size = wbp.get_allocated_size() all_servers = storage_broker.get_servers_for_psi(storage_index) + if not all_servers: raise NoServersError("client gave us zero servers") hunk ./src/allmydata/immutable/upload.py 291 v0 = server.get_rref().version v1 = v0["http://allmydata.org/tahoe/protocols/storage/v1"] return v1["maximum-immutable-share-size"] + + # We'll ask these peers to store shares later. writable_servers = [server for server in all_servers if _get_maxsize(server) >= allocated_size] hunk ./src/allmydata/immutable/upload.py 295 - readonly_servers = set(all_servers[:2*total_shares]) - set(writable_servers) # decide upon the renewal/cancel secrets, to include them in the # allocate_buckets query. hunk ./src/allmydata/immutable/upload.py 304 file_renewal_secret = file_renewal_secret_hash(client_renewal_secret, storage_index) file_cancel_secret = file_cancel_secret_hash(client_cancel_secret, + storage_index) hunk ./src/allmydata/immutable/upload.py 306 - def _make_trackers(servers): - trackers = [] - for s in servers: - seed = s.get_lease_seed() - renew = bucket_renewal_secret_hash(file_renewal_secret, seed) - cancel = bucket_cancel_secret_hash(file_cancel_secret, seed) - st = ServerTracker(s, - share_size, block_size, - num_segments, num_share_hashes, - storage_index, - renew, cancel) - trackers.append(st) - return trackers - self.uncontacted_trackers = _make_trackers(writable_servers) + self._trackers = {} + for server in all_servers: + seed = server.get_lease_seed() + renew = bucket_renewal_secret_hash(file_renewal_secret, seed) + cancel = bucket_cancel_secret_hash(file_cancel_secret, seed) + self._trackers[server.get_serverid()] = ServerTracker(server, + share_size, block_size, + num_segments, num_share_hashes, + storage_index, + renew, cancel) hunk ./src/allmydata/immutable/upload.py 317 - # We don't try to allocate shares to these servers, since they've - # said that they're incapable of storing shares of the size that we'd - # want to store. We ask them about existing shares for this storage - # index, which we want to know about for accurate - # servers_of_happiness accounting, then we forget about them. 
- readonly_trackers = _make_trackers(readonly_servers) + # We keep track of serverids separately, since the + # HappinessGraph only cares about ids, not necessarily how we + # choose to implement our server abstractions. + self.writable_serverids = [x.get_serverid() for x in writable_servers] + self.all_serverids = [x.get_serverid() for x in all_servers] + # We return this to the client. It fires when we're done, + # returning our graph instance. + self._done = defer.Deferred() + + self._graph = HappinessGraph(needed_shares, + servers_of_happiness, + total_shares) + + self._place_shares() + + # We'll fire self._done once we know that we're done. + return self._done + + + def _find_existing_shares(self, ignored=None): + """ + I poll some of the peers that I know about for existing shares. + I fire with None when I'm done doing that. + """ + self.log("Locating existing shares") + + if (self.repair and not self._needs_repair_check) or \ + (not self.repair and not self._first_time): + self.log("no existing shares found on the last pass, continuing") + return + + if self.repair and self._needs_repair_check: + self.log("in repair mode, checking all servers for existing shares") + servers_to_ask = set(self.all_serverids) + self._needs_repair_check = False + + else: + assert self._first_time + servers_to_ask = \ + set(self.all_serverids).difference(set(self.writable_serverids)) + self.log("checking %d readonly servers" % len(servers_to_ask)) + + self._first_time = False + + # don't ask peers more than once + servers_to_ask.difference_update(self.contacted_trackers) + + servers_to_ask = map(lambda x: self._trackers[x], servers_to_ask) + + responses = [] + for server in servers_to_ask: + self.log("Asking server %s for any shares that it already has" + % (idlib.shortnodeid_b2a(server.get_serverid()))) + d = server.ask_about_existing_shares() + self.contacted_trackers.add(server.get_serverid()) + d.addBoth(self._handle_existing_response, server) + responses.append(d) hunk ./src/allmydata/immutable/upload.py 375 - # We now ask servers that can't hold any new shares about existing - # shares that they might have for our SI. Once this is done, we - # start placing the shares that we haven't already accounted - # for. - ds = [] - if self._status and readonly_trackers: - self._status.set_status("Contacting readonly servers to find " - "any existing shares") - for tracker in readonly_trackers: - assert isinstance(tracker, ServerTracker) - d = tracker.ask_about_existing_shares() - d.addBoth(self._handle_existing_response, tracker) - ds.append(d) self.num_servers_contacted += 1 self.query_count += 1 hunk ./src/allmydata/immutable/upload.py 377 - self.log("asking server %s for any existing shares" % - (tracker.name(),), level=log.NOISY) - dl = defer.DeferredList(ds) - dl.addCallback(lambda ign: self._loop()) - return dl hunk ./src/allmydata/immutable/upload.py 378 + d = defer.DeferredList(responses) + # If we found any existing shares and need to look more + # thoroughly in another pass, this will catch it. + d.addCallback(self._find_existing_shares) + return d hunk ./src/allmydata/immutable/upload.py 384 - def _handle_existing_response(self, res, tracker): + def _handle_existing_response(self, res, server): """ I handle responses to the queries sent by hunk ./src/allmydata/immutable/upload.py 387 - Tahoe2ServerSelector._existing_shares. + Tahoe2ServerSelector._find_existing_shares. 
""" hunk ./src/allmydata/immutable/upload.py 389 - serverid = tracker.get_serverid() if isinstance(res, failure.Failure): hunk ./src/allmydata/immutable/upload.py 390 + serverid = server.get_serverid() self.log("%s got error during existing shares check: %s" hunk ./src/allmydata/immutable/upload.py 392 - % (tracker.name(), res), level=log.UNUSUAL) + % (idlib.shortnodeid_b2a(serverid), res), + level=log.UNUSUAL) + self._remove_server(server) self.error_count += 1 self.bad_query_count += 1 hunk ./src/allmydata/immutable/upload.py 397 + self.last_failure_msg = ("last failure (from %s) was: %s" % \ + (server, res)) + else: buckets = res hunk ./src/allmydata/immutable/upload.py 402 - if buckets: - self.serverids_with_shares.add(serverid) self.log("response to get_buckets() from server %s: alreadygot=%s" hunk ./src/allmydata/immutable/upload.py 403 - % (tracker.name(), tuple(sorted(buckets))), + % (server.name(), tuple(sorted(buckets))), level=log.NOISY) for bucket in buckets: hunk ./src/allmydata/immutable/upload.py 406 - self.preexisting_shares.setdefault(bucket, set()).add(serverid) - self.homeless_shares.discard(bucket) - self.full_count += 1 - self.bad_query_count += 1 + # Add the edge (peer, shnum) somewhere where it will be + # remembered. Preexisting shares is as logical a place + # as any, but we may as well just keep a list of tuples + # around, since that's a pretty easy way to do this. + serverid = server.get_serverid() + self._graph.add_server_with_share(serverid, bucket) + if not self.repair: + self._repair = True + self._needs_repair_check = True hunk ./src/allmydata/immutable/upload.py 416 + if buckets: + self.good_query_count += 1 + + else: + self.bad_query_count += 1 + self.full_count += 1 + + + def _place_shares(self): + """ + I handle the process of placing shares. + """ + # Share placement is one or more iterations of four steps. + # + # 1: Compute a matching in a bipartite graph. + # 2: Verify the suitability of the matching for what we're + # doing. + # 3: Create any write buckets that are needed for the placement. + # 4: Create write buckets for shares that are not already + # placed. + # + # Step 2 can vary depending on what we're doing. In a repair, + # for example, we may want to allow for any matching (so long as + # there is some server out there that we can upload to) because + # placing shares on the grid, even if they are not optimally + # distributed, can help the health of the file. + # + # Step 1 should not fail under any circumstances. If step 2 + # fails, the process will abort with a failure message to the + # caller. Step 3 may fail. If it does, we'll remove the failing + # peer from our graph and return to step 1. If step 4 fails, we + # may care, but probably won't. When we're done with these + # steps, we'll fire self._done with the graph that it can use to + # check everything as the process continues. + # + # Note that this loop will eventually terminate -- we have a + # finite number of peers, and we guarantee that we remove at + # least one peer from the graph on each failure in step 3, so we + # will eventually run out of peers. 
+ self.log("Attempting to place shares") + + if (self.repair and self._needs_repair_check) or self._first_time: + d = self._find_existing_shares() hunk ./src/allmydata/immutable/upload.py 460 - def _get_progress_message(self): - if not self.homeless_shares: - msg = "placed all %d shares, " % (self.total_shares) else: hunk ./src/allmydata/immutable/upload.py 461 - msg = ("placed %d shares out of %d total (%d homeless), " % - (self.total_shares - len(self.homeless_shares), - self.total_shares, - len(self.homeless_shares))) - return (msg + "want to place shares on at least %d servers such that " - "any %d of them have enough shares to recover the file, " - "sent %d queries to %d servers, " - "%d queries placed some shares, %d placed none " - "(of which %d placed none due to the server being" - " full and %d placed none due to an error)" % - (self.servers_of_happiness, self.needed_shares, - self.query_count, self.num_servers_contacted, - self.good_query_count, self.bad_query_count, - self.full_count, self.error_count)) + d = fireEventually() hunk ./src/allmydata/immutable/upload.py 463 + d.addCallback(lambda ignored: self._compute_matching()) + d.addCallback(self._check_matching) + d.addCallback(self._make_write_buckets) + d.addBoth(self._check_for_done) + return d hunk ./src/allmydata/immutable/upload.py 469 - def _loop(self): - if not self.homeless_shares: - merged = merge_servers(self.preexisting_shares, self.use_trackers) - effective_happiness = servers_of_happiness(merged) - if self.servers_of_happiness <= effective_happiness: - msg = ("server selection successful for %s: %s: pretty_print_merged: %s, " - "self.use_trackers: %s, self.preexisting_shares: %s") \ - % (self, self._get_progress_message(), - pretty_print_shnum_to_servers(merged), - [', '.join([str_shareloc(k,v) - for k,v in st.buckets.iteritems()]) - for st in self.use_trackers], - pretty_print_shnum_to_servers(self.preexisting_shares)) - self.log(msg, level=log.OPERATIONAL) - return (self.use_trackers, self.preexisting_shares) - else: - # We're not okay right now, but maybe we can fix it by - # redistributing some shares. In cases where one or two - # servers has, before the upload, all or most of the - # shares for a given SI, this can work by allowing _loop - # a chance to spread those out over the other servers, - delta = self.servers_of_happiness - effective_happiness - shares = shares_by_server(self.preexisting_shares) - # Each server in shares maps to a set of shares stored on it. - # Since we want to keep at least one share on each server - # that has one (otherwise we'd only be making - # the situation worse by removing distinct servers), - # each server has len(its shares) - 1 to spread around. - shares_to_spread = sum([len(list(sharelist)) - 1 - for (server, sharelist) - in shares.items()]) - if delta <= len(self.uncontacted_trackers) and \ - shares_to_spread >= delta: - items = shares.items() - while len(self.homeless_shares) < delta: - # Loop through the allocated shares, removing - # one from each server that has more than one - # and putting it back into self.homeless_shares - # until we've done this delta times. 
- server, sharelist = items.pop() - if len(sharelist) > 1: - share = sharelist.pop() - self.homeless_shares.add(share) - self.preexisting_shares[share].remove(server) - if not self.preexisting_shares[share]: - del self.preexisting_shares[share] - items.append((server, sharelist)) - for writer in self.use_trackers: - writer.abort_some_buckets(self.homeless_shares) - return self._loop() - else: - # Redistribution won't help us; fail. - server_count = len(self.serverids_with_shares) - failmsg = failure_message(server_count, - self.needed_shares, - self.servers_of_happiness, - effective_happiness) - servmsgtempl = "server selection unsuccessful for %r: %s (%s), merged=%s" - servmsg = servmsgtempl % ( - self, - failmsg, - self._get_progress_message(), - pretty_print_shnum_to_servers(merged) - ) - self.log(servmsg, level=log.INFREQUENT) - return self._failed("%s (%s)" % (failmsg, self._get_progress_message())) hunk ./src/allmydata/immutable/upload.py 470 - if self.uncontacted_trackers: - tracker = self.uncontacted_trackers.pop(0) - # TODO: don't pre-convert all serverids to ServerTrackers - assert isinstance(tracker, ServerTracker) + def _compute_matching(self, ignored=None): + """ + I fire when we know about a decent number of the existing + shares. I compute a matching given this information. I may be + called from a Deferred context, but can compute my results + synchronously and do not return a deferred. + """ + # How does this work? + # + # Well, at this point we've got some shares, some servers that + # we can write to, and some preexisting shares, represented as + # server-share edges. What do we want to do? + # + # - Compute a graph that represents the possibility of our + # shares and writable servers. + # + # The servers of happiness check is current written to take a + # sharemap -- a dict mapping a peer to a set of shares. We + # should assess whether this is still the best option for what + # we want to do. What we really want to do is split up the + # routines to check and to build the graph. Specifically, we + # want to define a complete graph for the set of peers that we + # know we can write to and the set of shares. Then we want to + # add edges to that graph. Then we want to run the check on + # the graph. + # + # This induces the following breakdown of methods, or + # something like it: + # + # - make_complete_bipartite_graph(shares, peers) + # - add_edge(edge, graph) + # - Or make the graph an object. + # + # - servers_of_happiness(graph) + # - Or make a method of the graph object. + # + self.log("Computing matching") hunk ./src/allmydata/immutable/upload.py 508 - shares_to_ask = set(sorted(self.homeless_shares)[:1]) - self.homeless_shares -= shares_to_ask - self.query_count += 1 - self.num_servers_contacted += 1 - if self._status: - self._status.set_status("Contacting Servers [%s] (first query)," - " %d shares left.." - % (tracker.name(), - len(self.homeless_shares))) - d = tracker.query(shares_to_ask) - d.addBoth(self._got_response, tracker, shares_to_ask, - self.contacted_trackers) - return d - elif self.contacted_trackers: - # ask a server that we've already asked. 
- if not self._started_second_pass: - self.log("starting second pass", - level=log.NOISY) - self._started_second_pass = True - num_shares = mathutil.div_ceil(len(self.homeless_shares), - len(self.contacted_trackers)) - tracker = self.contacted_trackers.pop(0) - shares_to_ask = set(sorted(self.homeless_shares)[:num_shares]) - self.homeless_shares -= shares_to_ask + # Only give the peer selector as many peers as it would need to + # maximize happiness. This helps to ensure that the shares are + # where the downloader expects them to be. + self.log("adding %d servers to the graph" % len(self.writable_serverids)) + self._graph.add_servers(self.writable_serverids) + + # Our successors will check this result to see that it is okay + # -- our only job is to compute it. + return self._graph.get_share_assignments() + + + def _check_matching(self, share_assignments): + """ + I check that the matching is okay. + """ + #self.log("Done computing matching and share assignments, " + # "got happiness %d" % happiness) + if self._graph.is_healthy(): + # Try to repair regardless of how healthy the file will be + # when we're done, since it will be more healthy than it + # was. + self.log("Upload is correctly distributed, continuing") + return share_assignments + + if self.repair and len(share_assignments) >= 1: + # If we have a matching at all (meaning that there is at + # least one peer still out there), return True -- we'll be + # helping the file availability. + self.log("Upload is not correctly distributed, but we're " + "repairing so we'll continue the process anyway") + return share_assignments + + # Otherwise, refuse to upload + self.log("Upload fails happiness criterion and we're not repairing, " + "dying") + + # Get the number of peers that we're using. + msg = self._graph.get_failure_message() + msg = "server selection failed for %s: %s (%s)" % \ + (self, msg, self._get_progress_message()) + + # We keep track of the failures that we see, and include the + # last one in the failure message if we have it. This can help + # in debugging. + if self.last_failure_msg: + msg += "\nlast failure was: %s" % self.last_failure_msg + + raise UploadUnhappinessError(msg) + + + def _make_write_buckets(self, share_assignments): + """ + I make a list of write buckets, which I can then return to my + callers who need them to do things. + """ + # So here we're going to ask self.g for the edges that it used + # in its matching. For each of those edges, we need to ensure + # that the peer holds the share. If the peer already holds the + # share, then we don't have to worry about it. Otherwise, we've + # got to use the PeerTracker to ask it to hold the share. + self.log("Got share assignments, starting to allocate shares") + + ds = [] + + for server in share_assignments: + server_tracker = self._trackers[server] + + shares = set(share_assignments[server]) + d = server_tracker.query(shares) self.query_count += 1 hunk ./src/allmydata/immutable/upload.py 578 - if self._status: - self._status.set_status("Contacting Servers [%s] (second query)," - " %d shares left.." - % (tracker.name(), - len(self.homeless_shares))) - d = tracker.query(shares_to_ask) - d.addBoth(self._got_response, tracker, shares_to_ask, - self.contacted_trackers2) - return d - elif self.contacted_trackers2: - # we've finished the second-or-later pass. Move all the remaining - # servers back into self.contacted_trackers for the next pass. 
- self.contacted_trackers.extend(self.contacted_trackers2) - self.contacted_trackers2[:] = [] - return self._loop() - else: - # no more servers. If we haven't placed enough shares, we fail. - merged = merge_servers(self.preexisting_shares, self.use_trackers) - effective_happiness = servers_of_happiness(merged) - if effective_happiness < self.servers_of_happiness: - msg = failure_message(len(self.serverids_with_shares), - self.needed_shares, - self.servers_of_happiness, - effective_happiness) - msg = ("server selection failed for %s: %s (%s)" % - (self, msg, self._get_progress_message())) - if self.last_failure_msg: - msg += " (%s)" % (self.last_failure_msg,) - self.log(msg, level=log.UNUSUAL) - return self._failed(msg) - else: - # we placed enough to be happy, so we're done - if self._status: - self._status.set_status("Placed all shares") - msg = ("server selection successful (no more servers) for %s: %s: %s" % (self, - self._get_progress_message(), pretty_print_shnum_to_servers(merged))) - self.log(msg, level=log.OPERATIONAL) - return (self.use_trackers, self.preexisting_shares) + self.contacted_trackers.add(server_tracker.get_serverid()) + d.addBoth(self._handle_response, server_tracker, + shares) + ds.append(d) + + # Once we satisfy the dispersal requirement, we also need to + # check for any shares that didn't get used in the maximal + # matching and put those somewhere. + return defer.DeferredList(ds) + + + def _check_for_done(self, results): + """ + I check to see whether we're done. If we are, I fire self._done + with our graph instance. If we aren't, I cause self._done to + errback with a message explaining why. + """ + self.log("Done asking servers; checking to see if we're done") + if isinstance(results, failure.Failure): + # We're checking for doneness because of an exception, most + # likely a happiness error. Raised and uncaught exceptions + # at this point signify upload failure, so we'll pass them + # to our caller and let them deal with it. + return self._done.errback(results) + + if self._graph.needs_recomputation(): + # We lost one or more servers during the last upload cycle. + # Re-do the loop. + return self._place_shares() + + # Otherwise, we're done. We need to tell the graph about the + # ServerTrackers that we're using so that the other methods can + # get them later. + self._graph.set_trackers(self.use_trackers) + self._cancel_unused_assignments() + self._done.callback(self._graph) + hunk ./src/allmydata/immutable/upload.py 616 - def _got_response(self, res, tracker, shares_to_ask, put_tracker_here): + def _cancel_unused_assignments(self): + unused = self._graph.get_obsolete_allocations() + for server in unused: + t = self._trackers[server] + t.abort_some_buckets(list(unused[server])) + + # TODO: remove from self.use_peers + + + def _handle_response(self, res, server, shares_to_ask): if isinstance(res, failure.Failure): # This is unusual, and probably indicates a bug or a network # problem. 
hunk ./src/allmydata/immutable/upload.py 629 - self.log("%s got error during server selection: %s" % (tracker, res), - level=log.UNUSUAL) + self.log("%s got error during server selection: %s" % + (server.name(), res), + level=log.UNUSUAL) + self._remove_server(server) self.error_count += 1 self.bad_query_count += 1 hunk ./src/allmydata/immutable/upload.py 635 - self.homeless_shares |= shares_to_ask - if (self.uncontacted_trackers - or self.contacted_trackers - or self.contacted_trackers2): - # there is still hope, so just loop - pass - else: - # No more servers, so this upload might fail (it depends upon - # whether we've hit servers_of_happiness or not). Log the last - # failure we got: if a coding error causes all servers to fail - # in the same way, this allows the common failure to be seen - # by the uploader and should help with debugging - msg = ("last failure (from %s) was: %s" % (tracker, res)) - self.last_failure_msg = msg + self.last_failure_msg = ("last failure (from %s) was: %s" % \ + (server.name(), res)) else: hunk ./src/allmydata/immutable/upload.py 638 + # Did we allocate the shares? (alreadygot, allocated) = res hunk ./src/allmydata/immutable/upload.py 640 - self.log("response to allocate_buckets() from server %s: alreadygot=%s, allocated=%s" - % (tracker.name(), - tuple(sorted(alreadygot)), tuple(sorted(allocated))), - level=log.NOISY) - progress = False - for s in alreadygot: - self.preexisting_shares.setdefault(s, set()).add(tracker.get_serverid()) - if s in self.homeless_shares: - self.homeless_shares.remove(s) - progress = True - elif s in shares_to_ask: - progress = True + # First, we worry about query counting. We define this query as + # good (or perhaps productive) if it resulted in shares being + # removed from homeless shares. Otherwise, it is bad. If we + # got to this point without allocating shares, we can assume + # that the server is full -- or at least not accepting + # shares from us -- because it still works and refused to + # accept the share. + shares_for_query = set(alreadygot) + shares_for_query.update(set(allocated)) hunk ./src/allmydata/immutable/upload.py 650 - # the ServerTracker will remember which shares were allocated on - # that peer. We just have to remember to use them. - if allocated: - self.use_trackers.add(tracker) - progress = True + asked_for_shares = set(shares_to_ask) hunk ./src/allmydata/immutable/upload.py 652 - if allocated or alreadygot: - self.serverids_with_shares.add(tracker.get_serverid()) + serverid = server.get_serverid() hunk ./src/allmydata/immutable/upload.py 654 - not_yet_present = set(shares_to_ask) - set(alreadygot) - still_homeless = not_yet_present - set(allocated) - - if progress: - # They accepted at least one of the shares that we asked - # them to accept, or they had a share that we didn't ask - # them to accept but that we hadn't placed yet, so this - # was a productive query + # If asked_for_shares is a subset of shares_for_query, then + # we were productive. Otherwise, we weren't. + # XXX: Not necessarily. + if asked_for_shares.issubset(shares_for_query): self.good_query_count += 1 hunk ./src/allmydata/immutable/upload.py 659 + else: self.bad_query_count += 1 self.full_count += 1 hunk ./src/allmydata/immutable/upload.py 664 - if still_homeless: - # In networks with lots of space, this is very unusual and - # probably indicates an error. In networks with servers that - # are full, it is merely unusual. In networks that are very - # full, it is common, and many uploads will fail. 
In most - # cases, this is obviously not fatal, and we'll just use some - # other servers. + self.log("Response from server %s: alreadygot %s, allocated %s" % + (server.name(), alreadygot, allocated)) + if alreadygot: + # Hm, we're finding shares for the SI. That's normal if we're + # in repair mode, and says that we should be in repair mode if + # we're not. + if not self.repair: + self.repair = True + # If we've already done a repair check, we don't + # need to do another one. + if not self._needs_repair_check: + self._needs_repair_check = True + + for shnum in alreadygot: + self._graph.add_server_with_share(serverid, shnum) + + if not allocated: + # Uh oh, they ran out of space. We'll remove them + # from consideration, and then add any already + # existing shaes back in, then recompute. + self._graph.mark_full_server(serverid) + if serverid in self.writable_serverids: + self.writable_serverids.remove(serverid) + # The graph will take care of the process of + # regenerating what we've done. hunk ./src/allmydata/immutable/upload.py 690 - # some shares are still homeless, keep trying to find them a - # home. The ones that were rejected get first priority. - self.homeless_shares |= still_homeless - # Since they were unable to accept all of our requests, so it - # is safe to assume that asking them again won't help. else: hunk ./src/allmydata/immutable/upload.py 691 - # if they *were* able to accept everything, they might be - # willing to accept even more. - put_tracker_here.append(tracker) + self.use_trackers.add(server) + for a in allocated: + if a in self.allocated_shares: + # We need to cancel the existing PeerTracker. + t = self.allocated_shares[a] + t.abort_some_buckets([a]) + self.allocated_shares[a] = server + + self._graph.confirm_share_allocation(serverid, a) + hunk ./src/allmydata/immutable/upload.py 702 - # now loop - return self._loop() + def _remove_server(self, server): + """ + I handle the event when a server breaks during share assignment. I + remove that server from the graph, mark all of its shares as + homeless, and remove any preexisting edges that I know about for + that server. + """ + self.log("in remove server for bad server %s" % server.name()) + serverid = server.get_serverid() + self._graph.mark_bad_server(serverid) + + if serverid in self.all_serverids: + self.all_serverids.remove(serverid) + + if serverid in self.writable_serverids: + self.writable_serverids.remove(serverid) + + if server in self.use_trackers: + self.use_trackers.remove(server) + + + def _get_progress_message(self): + homeless_shares = self._graph.get_homeless_shares() + if not homeless_shares: + msg = "placed all %d shares, " % (self.total_shares) + else: + msg = ("placed %d shares out of %d total (%d homeless), " % + (self.total_shares - len(homeless_shares), + self.total_shares, + len(homeless_shares))) + return (msg + "want to place shares on at least %d servers such that " + "any %d of them have enough shares to recover the file, " + "sent %d queries to %d servers, " + "%d queries placed some shares, %d placed none " + "(of which %d placed none due to the server being" + " full and %d placed none due to an error)" % + (self.servers_of_happiness, self.needed_shares, + self.query_count, len(self.contacted_trackers), + self.good_query_count, self.bad_query_count, + self.full_count, self.error_count)) def _failed(self, msg): hunk ./src/allmydata/immutable/upload.py 751 place shares for this file. I then raise an UploadUnhappinessError with my msg argument. 
""" - for tracker in self.use_trackers: - assert isinstance(tracker, ServerTracker) - tracker.abort() + for server in self.use_trackers: + assert isinstance(server, ServerTracker) + + server.abort() + raise UploadUnhappinessError(msg) hunk ./src/allmydata/immutable/upload.py 1127 d.addCallback(_done) return d - def set_shareholders(self, (upload_trackers, already_servers), encoder): + def set_shareholders(self, happiness_graph, encoder): """ hunk ./src/allmydata/immutable/upload.py 1129 - @param upload_trackers: a sequence of ServerTracker objects that - have agreed to hold some shares for us (the - shareids are stashed inside the ServerTracker) - @paran already_servers: a dict mapping sharenum to a set of serverids - that claim to already have this share + I fire when peer selection is done. I take the happiness graph + that the peer selector made (which knows which peers hold which + shares), and tell my encoder about that. The Encoder will then + try to upload those shares, using the happiness to deal with + failures along the way. """ hunk ./src/allmydata/immutable/upload.py 1135 - msgtempl = "set_shareholders; upload_trackers is %s, already_servers is %s" - values = ([', '.join([str_shareloc(k,v) - for k,v in st.buckets.iteritems()]) - for st in upload_trackers], already_servers) - self.log(msgtempl % values, level=log.OPERATIONAL) - # record already-present shares in self._results - self._results.preexisting_shares = len(already_servers) + # TODO: We need a way to get preexisting shares. + self._server_trackers = {} # k: shnum, v: instance of PeerTracker + upload_servers = happiness_graph.get_trackers() + for server in upload_servers: + assert isinstance(server, ServerTracker) hunk ./src/allmydata/immutable/upload.py 1141 - self._server_trackers = {} # k: shnum, v: instance of ServerTracker - for tracker in upload_trackers: - assert isinstance(tracker, ServerTracker) buckets = {} hunk ./src/allmydata/immutable/upload.py 1142 - servermap = already_servers.copy() - for tracker in upload_trackers: - buckets.update(tracker.buckets) - for shnum in tracker.buckets: - self._server_trackers[shnum] = tracker - servermap.setdefault(shnum, set()).add(tracker.get_serverid()) - assert len(buckets) == sum([len(tracker.buckets) - for tracker in upload_trackers]), \ + + for server in upload_servers: + buckets.update(server.buckets) + for shnum in server.buckets: + self._server_trackers[shnum] = server + assert len(buckets) == sum([len(server.buckets) for server in upload_servers]), \ "%s (%s) != %s (%s)" % ( len(buckets), buckets, hunk ./src/allmydata/immutable/upload.py 1151 - sum([len(tracker.buckets) for tracker in upload_trackers]), - [(t.buckets, t.get_serverid()) for t in upload_trackers] + sum([len(server.buckets) for server in upload_servers]), + [(s.buckets, s.serverid) for s in upload_servers] ) hunk ./src/allmydata/immutable/upload.py 1154 - encoder.set_shareholders(buckets, servermap) + encoder.set_shareholders(buckets, happiness_graph) def _encrypted_done(self, verifycap): """ Returns a Deferred that will fire with the UploadResults instance. 
""" } [immutable/encode.py: Alter the encoder to work with tann IPeerSelector object Kevan Carstensen **20110424232019 Ignore-this: d3206059675f684e7b97dafb04895e9e ] { hunk ./src/allmydata/immutable/encode.py 195 else: raise KeyError("unknown parameter name '%s'" % name) - def set_shareholders(self, landlords, servermap): + def set_shareholders(self, landlords, graph): assert isinstance(landlords, dict) for k in landlords: assert IStorageBucketWriter.providedBy(landlords[k]) hunk ./src/allmydata/immutable/encode.py 200 self.landlords = landlords.copy() - assert isinstance(servermap, dict) - for v in servermap.itervalues(): - assert isinstance(v, set) - self.servermap = servermap.copy() + self.g = graph + # TODO: Use this. + self.servermap = {} def start(self): """ Returns a Deferred that will fire with the verify cap (an instance of hunk ./src/allmydata/immutable/encode.py 484 self.landlords[shareid].abort() peerid = self.landlords[shareid].get_peerid() assert peerid + del self.landlords[shareid] hunk ./src/allmydata/immutable/encode.py 486 - self.servermap[shareid].remove(peerid) - if not self.servermap[shareid]: - del self.servermap[shareid] + self.g.mark_bad_server(peerid) else: # even more UNUSUAL self.log("they weren't in our list of landlords", parent=ln, hunk ./src/allmydata/immutable/encode.py 491 level=log.WEIRD, umid="TQGFRw") - happiness = happinessutil.servers_of_happiness(self.servermap) - if happiness < self.servers_of_happiness: - peerids = set(happinessutil.shares_by_server(self.servermap).keys()) - msg = happinessutil.failure_message(len(peerids), - self.required_shares, - self.servers_of_happiness, - happiness) - msg = "%s: %s" % (msg, why) - raise UploadUnhappinessError(msg) + # Ask the graph whether it makes sense to continue our work. + self.g.get_share_assignments(shallow=True) + if not self.g.is_healthy(): + edges = self.g.get_share_assignments() + peers = [x[0] for x in edges] + peerids = set(peers) + + msg = self.g.get_failure_message() + raise UploadUnhappinessError("%s: %s" % (msg, why)) + self.log("but we can still continue with %s shares, we'll be happy " hunk ./src/allmydata/immutable/encode.py 502 - "with at least %s" % (happiness, + "with at least %s" % (5, # XXX self.servers_of_happiness), parent=ln) } [interfaces.py: Add IPeerSelector interfaces Kevan Carstensen **20110424232047 Ignore-this: 63837f056d3f7753e76ca8105443e093 ] hunk ./src/allmydata/interfaces.py 606 """I am a node which represents a file: a sequence of bytes. I am not a container, like IDirectoryNode.""" +class IPeerSelector(Interface): + """ + I select peers for an upload, maximizing some measure of health. + + I keep track of the state of a grid relative to a file. This means + that I know about all of the peers that parts of that file could be + placed on, and about shares that have been placed on those peers. + Given this, I assign shares to peers in a way that maximizes the + file's health according to whichever definition of health I am + programmed with. I tell the uploader whether or not my assignment is + healthy. I keep track of failures during the process and update my + conclusions appropriately. + """ + def add_peer_with_share(peerid, shnum): + """ + Update my internal state to reflect the fact that peer peerid + holds share shnum. Called for shares that are detected before + peer selection begins. + """ + + def confirm_share_allocation(peerid, shnum): + """ + Confirm that an allocated peer=>share pairing has been + successfully established. 
+ """ + + def add_peers(peerids=set): + """ + Update my internal state to include the peers in peerids as + potential candidates for storing a file. + """ + + def mark_full_peer(peerid): + """ + Mark the peer peerid as full. This means that any + peer-with-share relationships I know about for peerid remain + valid, but that peerid will not be assigned any new shares. + """ + + def mark_bad_peer(peerid): + """ + Mark the peer peerid as bad. This is typically called when an + error is encountered when communicating with a peer. I will + disregard any existing peer => share relationships associated + with peerid, and will not attempt to assign it any more shares. + """ + + def get_share_assignments(shallow=False): + """ + Given what I know about the state of the grid, I'll attempt to + assign shares to peers in a way that maximizes my definition of + file health. I'll return a list of (share, peerid) tuples with + my decision. + + I have an optional shallow parameter. If True, I will not + attempt to assign any new shares before checking happiness; + instead, I will rely only on the shares that I know are already + placed. + """ + + def is_healthy(): + """ + I return whether the share assignments I'm currently using + reflect a healthy file, based on my internal definitions. + """ + + def needs_recomputation(): + """ + I return True if the share assignments I last returned may have + become stale. This is a hint to the caller that they should call + get_share_assignments again. + """ + class IImmutableFileNode(IFileNode): def read(consumer, offset=0, size=None): """Download a portion (possibly all) of the file's contents, making [test: fix existing tests to work with the new uploader + graph combination Kevan Carstensen **20110424232116 Ignore-this: 5adfadd33b693d06997183f70c1cfa1c ] { hunk ./src/allmydata/test/test_cli.py 2415 # enough shares. The one remaining share might be in either the # COMPLETE or the PENDING state. 
in_complete_msg = "ran out of shares: complete=sh0 pending= overdue= unused= need 3" - in_pending_msg = "ran out of shares: complete= pending=Share(sh0-on-fob7vqgd) overdue= unused= need 3" + in_pending_msg = "ran out of shares: complete= pending=Share(sh0-on" d.addCallback(lambda ign: self.do_cli("get", self.uri_1share)) def _check1((rc, out, err)): hunk ./src/allmydata/test/test_download.py 297 # find the shares that were used and delete them shares = self.n._cnode._node._shares shnums = sorted([s._shnum for s in shares]) - self.failUnlessEqual(shnums, [0,1,2,3]) + self.failUnlessEqual(len(shnums), 4) + self._dead_shnums = shnums # break the RIBucketReader references # (we don't break the RIStorageServer references, because that hunk ./src/allmydata/test/test_download.py 315 self.failUnlessEqual("".join(c.chunks), plaintext) shares = self.n._cnode._node._shares shnums = sorted([s._shnum for s in shares]) - self.failIfEqual(shnums, [0,1,2,3]) + self.failIfEqual(shnums, self._dead_shnums) d.addCallback(_check_failover) return d hunk ./src/allmydata/test/test_upload.py 148 for shnum in sharenums]), ) + def get_buckets(self, storage_index): + return [x[1] for x in filter(lambda x: x[0] == storage_index, + self.allocated)] + + class FakeBucketWriter: # a diagnostic version of storageserver.BucketWriter def __init__(self, size): hunk ./src/allmydata/test/test_upload.py 418 def test_first_error_all(self): self.make_node("first-fail") + self.set_encoding_parameters(k=25, happy=1, n=100) d = self.shouldFail(UploadUnhappinessError, "first_error_all", "server selection failed", upload_data, self.u, DATA) hunk ./src/allmydata/test/test_upload.py 444 # enough to ensure a second pass with 100 shares). mode = dict([(0,"good")] + [(i,"second-fail") for i in range(1,40)]) self.make_node(mode, 40) + self.set_encoding_parameters(k=25, happy=1, n=100) d = upload_data(self.u, DATA) d.addCallback(extract_uri) d.addCallback(self._check_large, SIZE_LARGE) hunk ./src/allmydata/test/test_upload.py 452 def test_second_error_all(self): self.make_node("second-fail") - d = self.shouldFail(UploadUnhappinessError, "second_error_all", + self.set_encoding_parameters(k=25, happy=1, n=100) + # The upload won't ask any peer to allocate shares more than once on + # the first try, so we have to upload twice to see that the + # errors are detected. 
+ d = upload_data(self.u, DATA) + d.addCallback(lambda ignored: + self.shouldFail(UploadUnhappinessError, "second_error_all", "server selection failed", hunk ./src/allmydata/test/test_upload.py 460 - upload_data, self.u, DATA) + upload_data, self.u, DATA)) def _check((f,)): self.failUnlessIn("placed 10 shares out of 100 total", str(f.value)) # there should also be a 'last failure was' message hunk ./src/allmydata/test/test_upload.py 465 self.failUnlessIn("ServerError", str(f.value)) - d.addCallback(_check) + #d.addCallback(_check) return d class FullServer(unittest.TestCase): hunk ./src/allmydata/test/test_upload.py 545 for s in self.node.last_servers: allocated = s.allocated self.failUnlessEqual(len(allocated), 2) - self.failUnlessEqual(s.queries, 2) + self.failUnlessEqual(s.queries, 1) d.addCallback(_check) return d hunk ./src/allmydata/test/test_upload.py 569 self.failUnlessEqual(s.queries, 1) got_one.append(s) else: - self.failUnlessEqual(s.queries, 2) + self.failUnlessEqual(s.queries, 1) got_two.append(s) self.failUnlessEqual(len(got_one), 49) self.failUnlessEqual(len(got_two), 1) hunk ./src/allmydata/test/test_upload.py 593 for s in self.node.last_servers: allocated = s.allocated self.failUnlessEqual(len(allocated), 4) - self.failUnlessEqual(s.queries, 2) + self.failUnlessEqual(s.queries, 1) d.addCallback(_check) return d hunk ./src/allmydata/test/test_upload.py 796 d = selector.get_shareholders(broker, sh, storage_index, share_size, block_size, num_segments, 10, 3, 4) - def _have_shareholders((upload_trackers, already_servers)): - assert servers_to_break <= len(upload_trackers) + def _have_shareholders(graph): + assert isinstance(graph, HappinessGraph) + + upload_servers = graph.get_trackers() + + assert servers_to_break <= len(upload_servers) for index in xrange(servers_to_break): hunk ./src/allmydata/test/test_upload.py 803 - tracker = list(upload_trackers)[index] - for share in tracker.buckets.keys(): - tracker.buckets[share].abort() + server = list(upload_servers)[index] + for share in server.buckets.keys(): + server.buckets[share].abort() buckets = {} hunk ./src/allmydata/test/test_upload.py 807 - servermap = already_servers.copy() - for tracker in upload_trackers: - buckets.update(tracker.buckets) - for bucket in tracker.buckets: - servermap.setdefault(bucket, set()).add(tracker.get_serverid()) - encoder.set_shareholders(buckets, servermap) + for server in upload_servers: + buckets.update(server.buckets) + + encoder.set_shareholders(buckets, graph) d = encoder.start() return d d.addCallback(_have_shareholders) hunk ./src/allmydata/test/test_upload.py 906 self.failUnlessEqual(data["count-shares-expected"], 12) d.addCallback(_check) return d + test_configure_parameters.timeout = 10 def _setUp(self, ns): hunk ./src/allmydata/test/test_upload.py 971 d.addCallback(lambda ign: self.failUnless(self._has_happy_share_distribution())) return d + test_aborted_shares.timeout = 20 def test_problem_layout_comment_52(self): hunk ./src/allmydata/test/test_upload.py 1210 d.addCallback(lambda ign: self.failUnless(self._has_happy_share_distribution())) return d - test_problem_layout_ticket_1124.todo = "Fix this after 1.7.1 release." 
def test_happiness_with_some_readonly_servers(self): # Try the following layout hunk ./src/allmydata/test/test_upload.py 1330 d.addCallback(_remove_server) d.addCallback(lambda ign: self.shouldFail(UploadUnhappinessError, - "test_dropped_servers_in_encoder", + "test_dropped_servers_in_encoder_1", "shares could be placed on only 3 server(s) " "such that any 3 of them have enough shares to " "recover the file, but we were asked to place " hunk ./src/allmydata/test/test_upload.py 1359 d.addCallback(_remove_server) d.addCallback(lambda ign: self.shouldFail(UploadUnhappinessError, - "test_dropped_servers_in_encoder", + "test_dropped_servers_in_encoder_2", "shares could be placed on only 3 server(s) " "such that any 3 of them have enough shares to " "recover the file, but we were asked to place " hunk ./src/allmydata/test/test_upload.py 1367 self._do_upload_with_broken_servers, 2)) return d - - def test_merge_servers(self): - # merge_servers merges a list of upload_servers and a dict of - # shareid -> serverid mappings. - shares = { - 1 : set(["server1"]), - 2 : set(["server2"]), - 3 : set(["server3"]), - 4 : set(["server4", "server5"]), - 5 : set(["server1", "server2"]), - } - # if not provided with a upload_servers argument, it should just - # return the first argument unchanged. - self.failUnlessEqual(shares, merge_servers(shares, set([]))) - trackers = [] - for (i, server) in [(i, "server%d" % i) for i in xrange(5, 9)]: - t = FakeServerTracker(server, [i]) - trackers.append(t) - expected = { - 1 : set(["server1"]), - 2 : set(["server2"]), - 3 : set(["server3"]), - 4 : set(["server4", "server5"]), - 5 : set(["server1", "server2", "server5"]), - 6 : set(["server6"]), - 7 : set(["server7"]), - 8 : set(["server8"]), - } - self.failUnlessEqual(expected, merge_servers(shares, set(trackers))) - shares2 = {} - expected = { - 5 : set(["server5"]), - 6 : set(["server6"]), - 7 : set(["server7"]), - 8 : set(["server8"]), - } - self.failUnlessEqual(expected, merge_servers(shares2, set(trackers))) - shares3 = {} - trackers = [] - expected = {} - for (i, server) in [(i, "server%d" % i) for i in xrange(10)]: - shares3[i] = set([server]) - t = FakeServerTracker(server, [i]) - trackers.append(t) - expected[i] = set([server]) - self.failUnlessEqual(expected, merge_servers(shares3, set(trackers))) - - - def test_servers_of_happiness_utility_function(self): - # These tests are concerned with the servers_of_happiness() - # utility function, and its underlying matching algorithm. Other - # aspects of the servers_of_happiness behavior are tested - # elsehwere These tests exist to ensure that - # servers_of_happiness doesn't under or overcount the happiness - # value for given inputs. - - # servers_of_happiness expects a dict of - # shnum => set(serverids) as a preexisting shares argument. - test1 = { - 1 : set(["server1"]), - 2 : set(["server2"]), - 3 : set(["server3"]), - 4 : set(["server4"]) - } - happy = servers_of_happiness(test1) - self.failUnlessEqual(4, happy) - test1[4] = set(["server1"]) - # We've added a duplicate server, so now servers_of_happiness - # should be 3 instead of 4. - happy = servers_of_happiness(test1) - self.failUnlessEqual(3, happy) - # The second argument of merge_servers should be a set of objects with - # serverid and buckets as attributes. In actual use, these will be - # ServerTracker instances, but for testing it is fine to make a - # FakeServerTracker whose job is to hold those instance variables to - # test that part. 
- trackers = [] - for (i, server) in [(i, "server%d" % i) for i in xrange(5, 9)]: - t = FakeServerTracker(server, [i]) - trackers.append(t) - # Recall that test1 is a server layout with servers_of_happiness - # = 3. Since there isn't any overlap between the shnum -> - # set([serverid]) correspondences in test1 and those in trackers, - # the result here should be 7. - test2 = merge_servers(test1, set(trackers)) - happy = servers_of_happiness(test2) - self.failUnlessEqual(7, happy) - # Now add an overlapping server to trackers. This is redundant, - # so it should not cause the previously reported happiness value - # to change. - t = FakeServerTracker("server1", [1]) - trackers.append(t) - test2 = merge_servers(test1, set(trackers)) - happy = servers_of_happiness(test2) - self.failUnlessEqual(7, happy) - test = {} - happy = servers_of_happiness(test) - self.failUnlessEqual(0, happy) - # Test a more substantial overlap between the trackers and the - # existing assignments. - test = { - 1 : set(['server1']), - 2 : set(['server2']), - 3 : set(['server3']), - 4 : set(['server4']), - } - trackers = [] - t = FakeServerTracker('server5', [4]) - trackers.append(t) - t = FakeServerTracker('server6', [3, 5]) - trackers.append(t) - # The value returned by servers_of_happiness is the size - # of a maximum matching in the bipartite graph that - # servers_of_happiness() makes between serverids and share - # numbers. It should find something like this: - # (server 1, share 1) - # (server 2, share 2) - # (server 3, share 3) - # (server 5, share 4) - # (server 6, share 5) - # - # and, since there are 5 edges in this matching, it should - # return 5. - test2 = merge_servers(test, set(trackers)) - happy = servers_of_happiness(test2) - self.failUnlessEqual(5, happy) - # Zooko's first puzzle: - # (from http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:156) - # - # server 1: shares 0, 1 - # server 2: shares 1, 2 - # server 3: share 2 - # - # This should yield happiness of 3. - test = { - 0 : set(['server1']), - 1 : set(['server1', 'server2']), - 2 : set(['server2', 'server3']), - } - self.failUnlessEqual(3, servers_of_happiness(test)) - # Zooko's second puzzle: - # (from http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:158) - # - # server 1: shares 0, 1 - # server 2: share 1 - # - # This should yield happiness of 2. 
- test = { - 0 : set(['server1']), - 1 : set(['server1', 'server2']), - } - self.failUnlessEqual(2, servers_of_happiness(test)) - - - def test_shares_by_server(self): - test = dict([(i, set(["server%d" % i])) for i in xrange(1, 5)]) - sbs = shares_by_server(test) - self.failUnlessEqual(set([1]), sbs["server1"]) - self.failUnlessEqual(set([2]), sbs["server2"]) - self.failUnlessEqual(set([3]), sbs["server3"]) - self.failUnlessEqual(set([4]), sbs["server4"]) - test1 = { - 1 : set(["server1"]), - 2 : set(["server1"]), - 3 : set(["server1"]), - 4 : set(["server2"]), - 5 : set(["server2"]) - } - sbs = shares_by_server(test1) - self.failUnlessEqual(set([1, 2, 3]), sbs["server1"]) - self.failUnlessEqual(set([4, 5]), sbs["server2"]) - # This should fail unless the serverid part of the mapping is a set - test2 = {1: "server1"} - self.shouldFail(AssertionError, - "test_shares_by_server", - "", - shares_by_server, test2) - - def test_existing_share_detection(self): self.basedir = self.mktemp() d = self._setup_and_upload() hunk ./src/allmydata/test/test_upload.py 1403 d.addCallback(lambda ign: self.failUnless(self._has_happy_share_distribution())) return d + test_existing_share_detection.timeout = 20 def test_query_counting(self): hunk ./src/allmydata/test/test_upload.py 1407 - # If server selection fails, Tahoe2ServerSelector prints out a lot + # If server selection fails, Tahoe2PeerSelector prints out a lot # of helpful diagnostic information, including query stats. # This test helps make sure that that information is accurate. self.basedir = self.mktemp() hunk ./src/allmydata/test/test_upload.py 1426 d.addCallback(_setup) d.addCallback(lambda c: self.shouldFail(UploadUnhappinessError, "test_query_counting", - "10 queries placed some shares", + "0 queries placed some shares", c.upload, upload.Data("data" * 10000, convergence=""))) # Now try with some readonly servers. 
We want to make sure that hunk ./src/allmydata/test/test_upload.py 1501 d.addCallback(lambda client: self.shouldFail(UploadUnhappinessError, "test_upper_limit_on_readonly_queries", - "sent 8 queries to 8 servers", + "sent 10 queries to 10 servers", client.upload, upload.Data('data' * 10000, convergence=""))) return d hunk ./src/allmydata/test/test_upload.py 1535 return client d.addCallback(_reset_encoding_parameters) d.addCallback(lambda client: - self.shouldFail(UploadUnhappinessError, "test_selection_exceptions", + self.shouldFail(UploadUnhappinessError, + "test_selection_exceptions_1", "placed 0 shares out of 10 " "total (10 homeless), want to place shares on at " "least 4 servers such that any 3 of them have " hunk ./src/allmydata/test/test_upload.py 1577 self.g.remove_server(self.g.servers_by_number[0].my_nodeid)) d.addCallback(_reset_encoding_parameters) d.addCallback(lambda client: - self.shouldFail(UploadUnhappinessError, "test_selection_exceptions", + self.shouldFail(UploadUnhappinessError, + "test_selection_exceptions_2", "placed 0 shares out of 10 " "total (10 homeless), want to place shares on at " "least 4 servers such that any 3 of them have " hunk ./src/allmydata/test/test_upload.py 1583 "enough shares to recover the file, " - "sent 5 queries to 5 servers, 0 queries placed " - "some shares, 5 placed none " + "sent 4 queries to 4 servers, 0 queries placed " + "some shares, 4 placed none " "(of which 4 placed none due to the server being " hunk ./src/allmydata/test/test_upload.py 1586 - "full and 1 placed none due to an error)", + "full and 0 placed none due to an error)", client.upload, upload.Data("data" * 10000, convergence=""))) # server 0, server 1 = empty, accepting shares hunk ./src/allmydata/test/test_upload.py 1599 self._add_server(server_number=1)) d.addCallback(_reset_encoding_parameters) d.addCallback(lambda client: - self.shouldFail(UploadUnhappinessError, "test_selection_exceptions", + self.shouldFail(UploadUnhappinessError, + "test_selection_exceptions_3", "shares could be placed or found on only 2 " "server(s). We were asked to place shares on at " "least 4 server(s) such that any 3 of them have " hunk ./src/allmydata/test/test_upload.py 1625 self._add_server(server_number=4)) d.addCallback(_reset_encoding_parameters, happy=7) d.addCallback(lambda client: - self.shouldFail(UploadUnhappinessError, "test_selection_exceptions", + self.shouldFail(UploadUnhappinessError, + "test_selection_exceptions_4", "shares could be placed on only 5 server(s) such " "that any 3 of them have enough shares to recover " "the file, but we were asked to place shares on " hunk ./src/allmydata/test/test_upload.py 1655 readonly=True)) d.addCallback(_reset_encoding_parameters, happy=7) d.addCallback(lambda client: - self.shouldFail(UploadUnhappinessError, "test_selection_exceptions", + self.shouldFail(UploadUnhappinessError, + "test_selection_exceptions_5", "shares could be placed or found on 4 server(s), " "but they are not spread out evenly enough to " "ensure that any 3 of these servers would have " hunk ./src/allmydata/test/test_upload.py 1701 d.addCallback(lambda ign: self.failUnless(self._has_happy_share_distribution())) return d - test_problem_layout_comment_187.todo = "this isn't fixed yet" def test_problem_layout_ticket_1118(self): # #1118 includes a report from a user who hit an assertion in hunk ./src/allmydata/test/test_upload.py 1730 return client d.addCallback(_setup) - # Note: actually it should succeed! See - # test_problem_layout_ticket_1128. 
But ticket 1118 is just to - # make it realize that it has failed, so if it raises - # UploadUnhappinessError then we'll give it the green light - # for now. d.addCallback(lambda ignored: hunk ./src/allmydata/test/test_upload.py 1731 - self.shouldFail(UploadUnhappinessError, - "test_problem_layout_ticket_1118", - "", - self.g.clients[0].upload, upload.Data("data" * 10000, - convergence=""))) + self.g.clients[0].upload(upload.Data("data" * 1000, + convergence=""))) return d def test_problem_layout_ticket_1128(self): hunk ./src/allmydata/test/test_upload.py 1768 d.addCallback(lambda ign: self.failUnless(self._has_happy_share_distribution())) return d - test_problem_layout_ticket_1128.todo = "Invent a smarter uploader that uploads successfully in this case." def test_upload_succeeds_with_some_homeless_shares(self): # If the upload is forced to stop trying to place shares before hunk ./src/allmydata/test/test_web.py 4276 " overdue= unused= need 3. Last failure: None") msg2 = msgbase + (" ran out of shares:" " complete=" - " pending=Share(sh0-on-xgru5)" + " pending=Share(sh0-on-ysbz4st7)" " overdue= unused= need 3. Last failure: None") self.failUnless(body == msg1 or body == msg2, body) d.addCallback(_check_one_share) } [util/happinessutil.py: Abstract happiness functions behind an IPeerSelector-conforming object Kevan Carstensen **20110503034926 Ignore-this: ad14c5044baf64068faf53862107c3 ] { hunk ./src/allmydata/util/happinessutil.py 5 I contain utilities useful for calculating servers_of_happiness, and for reporting it in messages """ +from allmydata.util import mathutil +from allmydata.interfaces import IPeerSelector +from zope.interface import implements from copy import deepcopy hunk ./src/allmydata/util/happinessutil.py 46 return msg -def shares_by_server(servermap): +class HappinessGraph: + implements(IPeerSelector) """ hunk ./src/allmydata/util/happinessutil.py 49 - I accept a dict of shareid -> set(peerid) mappings, and return a - dict of peerid -> set(shareid) mappings. My argument is a dictionary - with sets of peers, indexed by shares, and I transform that into a - dictionary of sets of shares, indexed by peerids. + I am a bipartite graph that the servers of happiness code uses to + determine whether or not an upload is correctly distributed. """ hunk ./src/allmydata/util/happinessutil.py 52 - ret = {} - for shareid, peers in servermap.iteritems(): - assert isinstance(peers, set) - for peerid in peers: - ret.setdefault(peerid, set()).add(shareid) - return ret + def __init__(self, k, happy, n, logparent=None): + # We should avoid needlessly recomputing the happiness matching. + # If no changes to the graph have occurred between calls to + # compute_happiness, return _computed_happiness. + self._needs_recomputation = True + self._computed_happiness = 0 hunk ./src/allmydata/util/happinessutil.py 59 -def merge_servers(servermap, upload_trackers=None): - """ - I accept a dict of shareid -> set(serverid) mappings, and optionally a - set of ServerTrackers. If no set of ServerTrackers is provided, I return - my first argument unmodified. Otherwise, I update a copy of my first - argument to include the shareid -> serverid mappings implied in the - set of ServerTrackers, returning the resulting dict. - """ - # Since we mutate servermap, and are called outside of a - # context where it is okay to do that, make a copy of servermap and - # work with it. 
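# (Editorial sketch, not part of the patch itself: the calling sequence below
# is inferred from the HappinessGraph methods defined in this patch and from
# the new tests in test_upload.py; the variable names are hypothetical. An
# uploader would drive the selector roughly like this:
#
#     g = HappinessGraph(k, happy, n)
#     g.add_servers(writable_serverids)         # servers we may write to
#     g.add_server_with_share(serverid, shnum)  # shares found on the grid
#     assignments = g.get_share_assignments()   # serverid -> set(shnum)
#     # ... allocate buckets; for each share a server accepts:
#     g.confirm_share_allocation(serverid, shnum)
#     # ... when a server fails or fills up:
#     g.mark_bad_server(serverid)   # or g.mark_full_server(serverid)
#     if not g.is_healthy():
#         raise UploadUnhappinessError(g.get_failure_message())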
- servermap = deepcopy(servermap) - if not upload_trackers: - return servermap + # set((serverid, shnum)) + # Used to keep track of relationships that we already know + # about. + self._existing_edges = set() + self._allocated_edges = set() + self._edges_used = set() + # The subset of allocated edges that was not used in my last + # returned share assignments. + self._obsolete_edges = set() hunk ./src/allmydata/util/happinessutil.py 69 - assert(isinstance(servermap, dict)) - assert(isinstance(upload_trackers, set)) + # serverid => set(shares) + # Returned to our caller when they ask for share assignments so + # that the graph computation doesn't have to take place twice. + self._assignment = {} hunk ./src/allmydata/util/happinessutil.py 74 - for tracker in upload_trackers: - for shnum in tracker.buckets: - servermap.setdefault(shnum, set()).add(tracker.get_serverid()) - return servermap + # set([serverid]) + # The set of peers that we can assign shares to. + self._writable_servers = [] hunk ./src/allmydata/util/happinessutil.py 78 -def servers_of_happiness(sharemap): - """ - I accept 'sharemap', a dict of shareid -> set(peerid) mappings. I - return the 'servers_of_happiness' number that sharemap results in. + self._needed_shares = k + self._total_shares = n + self._happy = happy hunk ./src/allmydata/util/happinessutil.py 82 - To calculate the 'servers_of_happiness' number for the sharemap, I - construct a bipartite graph with servers in one partition of vertices - and shares in the other, and with an edge between a server s and a share t - if s is to store t. I then compute the size of a maximum matching in - the resulting graph; this is then returned as the 'servers_of_happiness' - for my arguments. + # TODO: Add the logparent. hunk ./src/allmydata/util/happinessutil.py 84 - For example, consider the following layout: hunk ./src/allmydata/util/happinessutil.py 85 - server 1: shares 1, 2, 3, 4 - server 2: share 6 - server 3: share 3 - server 4: share 4 - server 5: share 2 + def add_servers(self, servers): + """ + I incorporate the serverids in 'servers' into my internal + representation in such a way as to allow assignment of shares to + them later. + """ + assert isinstance(servers, list) hunk ./src/allmydata/util/happinessutil.py 93 - From this, we can construct the following graph: + for server in servers: + if server not in self._writable_servers: + self._writable_servers.append(server) + self._needs_recomputation = True hunk ./src/allmydata/util/happinessutil.py 98 - L = {server 1, server 2, server 3, server 4, server 5} - R = {share 1, share 2, share 3, share 4, share 6} - V = L U R - E = {(server 1, share 1), (server 1, share 2), (server 1, share 3), - (server 1, share 4), (server 2, share 6), (server 3, share 3), - (server 4, share 4), (server 5, share 2)} - G = (V, E) hunk ./src/allmydata/util/happinessutil.py 99 - Note that G is bipartite since every edge in e has one endpoint in L - and one endpoint in R. + def confirm_share_allocation(self, server, share): + """ + Establish the allocation of share to server. hunk ./src/allmydata/util/happinessutil.py 103 - A matching in a graph G is a subset M of E such that, for any vertex - v in V, v is incident to at most one edge of M. A maximum matching - in G is a matching that is no smaller than any other matching. For - this graph, a matching of cardinality 5 is: + This records that allocation in my internal world representation + and supersedes any existing allocations that may have already + been made. 
+ """ + assert (server, share) in self._edges_used hunk ./src/allmydata/util/happinessutil.py 109 - M = {(server 1, share 1), (server 2, share 6), - (server 3, share 3), (server 4, share 4), - (server 5, share 2)} + # check for existing edges, removing them if necessary. + filtered_edges = filter(lambda x: x[1] == share, + self._allocated_edges) hunk ./src/allmydata/util/happinessutil.py 113 - Since G is bipartite, and since |L| = 5, we cannot have an M' such - that |M'| > |M|. Then M is a maximum matching in G. Intuitively, and - as long as k <= 5, we can see that the layout above has - servers_of_happiness = 5, which matches the results here. - """ - if sharemap == {}: - return 0 - sharemap = shares_by_server(sharemap) - graph = flow_network_for(sharemap) - # This is an implementation of the Ford-Fulkerson method for finding - # a maximum flow in a flow network applied to a bipartite graph. - # Specifically, it is the Edmonds-Karp algorithm, since it uses a - # BFS to find the shortest augmenting path at each iteration, if one - # exists. - # - # The implementation here is an adapation of an algorithm described in - # "Introduction to Algorithms", Cormen et al, 2nd ed., pp 658-662. - dim = len(graph) - flow_function = [[0 for sh in xrange(dim)] for s in xrange(dim)] - residual_graph, residual_function = residual_network(graph, flow_function) - while augmenting_path_for(residual_graph): - path = augmenting_path_for(residual_graph) - # Delta is the largest amount that we can increase flow across - # all of the edges in path. Because of the way that the residual - # function is constructed, f[u][v] for a particular edge (u, v) - # is the amount of unused capacity on that edge. Taking the - # minimum of a list of those values for each edge in the - # augmenting path gives us our delta. - delta = min(map(lambda (u, v): residual_function[u][v], path)) - for (u, v) in path: - flow_function[u][v] += delta - flow_function[v][u] -= delta - residual_graph, residual_function = residual_network(graph, - flow_function) - num_servers = len(sharemap) - # The value of a flow is the total flow out of the source vertex - # (vertex 0, in our graph). We could just as well sum across all of - # f[0], but we know that vertex 0 only has edges to the servers in - # our graph, so we can stop after summing flow across those. The - # value of a flow computed in this way is the size of a maximum - # matching on the bipartite graph described above. - return sum([flow_function[0][v] for v in xrange(1, num_servers+1)]) + # Otherwise, we're asking the uploader to do more work than it + # has to, since we're telling it to assign shares to something + # it already assigned shares to. + for f in filtered_edges: + assert f not in self._edges_used hunk ./src/allmydata/util/happinessutil.py 119 -def flow_network_for(sharemap): - """ - I take my argument, a dict of peerid -> set(shareid) mappings, and - turn it into a flow network suitable for use with Edmonds-Karp. I - then return the adjacency list representation of that network. 
+ self._allocated_edges.difference_update(filtered_edges) hunk ./src/allmydata/util/happinessutil.py 121 - Specifically, I build G = (V, E), where: - V = { peerid in sharemap } U { shareid in sharemap } U {s, t} - E = {(s, peerid) for each peerid} - U {(peerid, shareid) if peerid is to store shareid } - U {(shareid, t) for each shareid} + self._allocated_edges.add((server, share)) hunk ./src/allmydata/util/happinessutil.py 123 - s and t will be source and sink nodes when my caller starts treating - the graph I return like a flow network. Without s and t, the - returned graph is bipartite. - """ - # Servers don't have integral identifiers, and we can't make any - # assumptions about the way shares are indexed -- it's possible that - # there are missing shares, for example. So before making a graph, - # we re-index so that all of our vertices have integral indices, and - # that there aren't any holes. We start indexing at 1, so that we - # can add a source node at index 0. - sharemap, num_shares = reindex(sharemap, base_index=1) - num_servers = len(sharemap) - graph = [] # index -> [index], an adjacency list - # Add an entry at the top (index 0) that has an edge to every server - # in sharemap - graph.append(sharemap.keys()) - # For each server, add an entry that has an edge to every share that it - # contains (or will contain). - for k in sharemap: - graph.append(sharemap[k]) - # For each share, add an entry that has an edge to the sink. - sink_num = num_servers + num_shares + 1 - for i in xrange(num_shares): - graph.append([sink_num]) - # Add an empty entry for the sink, which has no outbound edges. - graph.append([]) - return graph + # we don't need to recompute after this. hunk ./src/allmydata/util/happinessutil.py 125 -def reindex(sharemap, base_index): - """ - Given sharemap, I map peerids and shareids to integers that don't - conflict with each other, so they're useful as indices in a graph. I - return a sharemap that is reindexed appropriately, and also the - number of distinct shares in the resulting sharemap as a convenience - for my caller. base_index tells me where to start indexing. - """ - shares = {} # shareid -> vertex index - num = base_index - ret = {} # peerid -> [shareid], a reindexed sharemap. - # Number the servers first - for k in sharemap: - ret[num] = sharemap[k] - num += 1 - # Number the shares - for k in ret: - for shnum in ret[k]: - if not shares.has_key(shnum): - shares[shnum] = num - num += 1 - ret[k] = map(lambda x: shares[x], ret[k]) - return (ret, len(shares)) hunk ./src/allmydata/util/happinessutil.py 126 -def residual_network(graph, f): - """ - I return the residual network and residual capacity function of the - flow network represented by my graph and f arguments. graph is a - flow network in adjacency-list form, and f is a flow in graph. - """ - new_graph = [[] for i in xrange(len(graph))] - cf = [[0 for s in xrange(len(graph))] for sh in xrange(len(graph))] - for i in xrange(len(graph)): - for v in graph[i]: - if f[i][v] == 1: - # We add an edge (v, i) with cf[v,i] = 1. This means - # that we can remove 1 unit of flow from the edge (i, v) - new_graph[v].append(i) - cf[v][i] = 1 - cf[i][v] = -1 - else: - # We add the edge (i, v), since we're not using it right - # now. - new_graph[i].append(v) - cf[i][v] = 1 - cf[v][i] = -1 - return (new_graph, cf) + def add_server_with_share(self, server, share): + """ + I record that the server identified by 'server' is currently + storing 'share'. 
+ """ + if (server, share) not in self._existing_edges: + self._existing_edges.add((server, share)) + self._needs_recomputation = True hunk ./src/allmydata/util/happinessutil.py 135 -def augmenting_path_for(graph): - """ - I return an augmenting path, if there is one, from the source node - to the sink node in the flow network represented by my graph argument. - If there is no augmenting path, I return False. I assume that the - source node is at index 0 of graph, and the sink node is at the last - index. I also assume that graph is a flow network in adjacency list - form. - """ - bfs_tree = bfs(graph, 0) - if bfs_tree[len(graph) - 1]: - n = len(graph) - 1 - path = [] # [(u, v)], where u and v are vertices in the graph - while n != 0: - path.insert(0, (bfs_tree[n], n)) - n = bfs_tree[n] - return path - return False hunk ./src/allmydata/util/happinessutil.py 136 -def bfs(graph, s): - """ - Perform a BFS on graph starting at s, where graph is a graph in - adjacency list form, and s is a node in graph. I return the - predecessor table that the BFS generates. - """ - # This is an adaptation of the BFS described in "Introduction to - # Algorithms", Cormen et al, 2nd ed., p. 532. - # WHITE vertices are those that we haven't seen or explored yet. - WHITE = 0 - # GRAY vertices are those we have seen, but haven't explored yet - GRAY = 1 - # BLACK vertices are those we have seen and explored - BLACK = 2 - color = [WHITE for i in xrange(len(graph))] - predecessor = [None for i in xrange(len(graph))] - distance = [-1 for i in xrange(len(graph))] - queue = [s] # vertices that we haven't explored yet. - color[s] = GRAY - distance[s] = 0 - while queue: - n = queue.pop(0) - for v in graph[n]: - if color[v] == WHITE: - color[v] = GRAY - distance[v] = distance[n] + 1 - predecessor[v] = n - queue.append(v) - color[n] = BLACK - return predecessor + def mark_bad_server(self, serverid): + """ + Given a serverid, I remove that serverid from my internal world + representation. This means that I will not attempt to allocate + shares to it, nor will I remember any existing shares that I've + been told that it holds. + """ + # remove serverid from writable peers, any unplaced share + # assignments. + self.mark_full_server(serverid) + + # If serverid has any existing shares in our estimation, remove + # those as well. + existing = filter(lambda x: x[0] != serverid, + list(self._existing_edges)) + existing = set(existing) + + if self._existing_edges != existing: + self._existing_edges = existing + self._needs_recomputation = True + + allocated = filter(lambda x: x[0] != serverid, + list(self._allocated_edges)) + allocated = set(allocated) + + if self._allocated_edges != allocated: + self._allocated_edges = allocated + self._needs_recomputation = True + + + def mark_full_server(self, serverid): + """ + Given a serverid, I mark that serverid as full in my graph. + """ + if serverid in self._writable_servers: + self._writable_servers.remove(serverid) + + new_edges = filter(lambda x: x[0] != serverid, list(self._edges_used)) + new_edges = set(new_edges) + + if new_edges != self._edges_used: + self._edges_used = new_edges + self._needs_recomputation = True + + + def get_obsolete_allocations(self): + """ + Return a sharemap-like structure of all of the allocations that + I've made and peers have followed that are obsolete. This allows + the uploader to cancel allocations as appropriate. 
+ """ + ret = {} + for (server, share) in self._obsolete_edges: + ret.setdefault(server, set()).add(share) + + return ret + + + def _assign_homeless_shares(self, used_edges): + """ + I'm a helper method to assign homeless shares. + + The bipartite matching algorithm only computes the matching. By + definition, the matching cannot contain duplicate edges, so if + we're trying to place more shares than there are servers, we'll + have some left over. This method assigns those, trying to build + off of the existing matching to distribute them evenly. + """ + assert isinstance(used_edges, set) + + all_shares = set(range(self._total_shares)) + + comp_edges = used_edges.union(self._existing_edges) + comp_edges = comp_edges.union(self._allocated_edges) + for (peer, share) in comp_edges: + # If a share exists on more than one peer we may have already + # removed it by the time we get to this statement, which is why the + # check is necessary. + if share in all_shares: all_shares.remove(share) + + # now all_shares is the set of all shares that we can't account + # for either as a result of our matching or as an existing share + # placement. these need to be distributed to peers. + peers_used = set([x[0] for x in used_edges]) + # remove any peers that we can't assign shares to. + existing_servers = set([x[0] for x in self._existing_edges]) + readonly_servers = \ + existing_servers.difference(self._writable_servers) + + peers_used.difference_update(readonly_servers) + + new_edges = set() + while all_shares and peers_used: + new_edges.update(zip(peers_used, all_shares)) + for share in [x[1] for x in new_edges]: + if share in all_shares: all_shares.remove(share) + + used_edges.update(new_edges) + + return used_edges + + + def _filter_edges(self, edges_to_filter): + """ + Return a canonical set of existing edges to use. + """ + our_sharemap = {} + + for (peer, share) in edges_to_filter: + our_sharemap.setdefault(peer, set()).add(share) + + edges_used = self._compute_matching(our_sharemap) + + return edges_used + + + def _build_sharemap(self, shallow): + """ + I'm a helper method responsible for synthesizing instance + information into a sharemap that can then be used by the + bipartite matching algorithm. + """ + our_sharemap = {} + + # Add the shares we already know about. + all_shares = set(range(self._total_shares)) + peers_used_already = set() + + # To obey our optimality guarantee, we need to consider readonly + # shares first, and separately from existing shares on peers we + # can write to. + readonly_edges = filter(lambda x: x[0] not in self._writable_servers, + self._existing_edges) + writable_edges = filter(lambda x: x not in readonly_edges, + self._existing_edges) + + for edges in (readonly_edges, writable_edges, self._allocated_edges): + edges = filter(lambda x: x[0] not in peers_used_already, edges) + edges = filter(lambda x: x[1] in all_shares, edges) + edges_to_use = self._filter_edges(edges) + + for (peer, share) in edges_to_use: + if peer not in peers_used_already: + our_sharemap.setdefault(peer, set()).add(share) + peers_used_already.add(peer) + # this happens when a share exists on more than one peer. + if share in all_shares: all_shares.remove(share) + + # Pick happy peers that we know we can write to. + if not shallow: + peers_to_use = [x for x in self._writable_servers + if x not in peers_used_already] + peers_to_use = peers_to_use[:len(all_shares)] + + # Assign. 
+ for peer in peers_to_use: + our_sharemap.setdefault(peer, set()).update(all_shares) + + + return our_sharemap + + + def _compute_happiness(self, shallow): + """ + I compute the happiness of the current state of the graph that I + represent, if necessary, and return it. + """ + # build the sharemap. + sharemap = self._build_sharemap(shallow) + + # compute which edges we're going to use. + edges_used = self._compute_matching(sharemap) + + all_edges = self._assign_homeless_shares(edges_used) + + # From these, remove any edges that we already know about + all_edges = set(all_edges) + all_edges.difference_update(self._existing_edges) + + unused_allocations = self._allocated_edges.difference(all_edges) + self._obsolete_edges.update(unused_allocations) + + all_edges.difference_update(self._allocated_edges) + + self._edges_used = all_edges + + + def get_share_assignments(self, shallow=False): + self._compute_happiness(shallow) + self._needs_recomputation = False + + result = {} + for (peer, share) in self._edges_used: + result.setdefault(peer, set()).add(share) + return result + + + def set_trackers(self, trackers): + self._peer_trackers = trackers + + + def get_trackers(self): + return self._peer_trackers + + + def is_healthy(self): + """ + I return whether the share assignments I last returned to the + caller represent a healthy share distribution according to the + server of happiness health metric. + """ + return (self._computed_happiness >= self._happy) + + + def needs_recomputation(self): + """ + I return True if the share assignments last returned to my + caller are possibly stale, and False otherwise. + """ + return self._needs_recomputation + + + def get_homeless_shares(self): + """ + I return a set of the shares that weren't placed in the last share + assignment. + """ + all_shares = set(range(self._total_shares)) + + # Remove the shares that we know exist already. + existing_shares = set([x[1] for x in self._existing_edges]) + all_shares.difference_update(existing_shares) + + relevant_shares = set([x[1] for x in self._allocated_edges]) + all_shares.difference_update(relevant_shares) + + return all_shares + + + def get_failure_message(self): + """ + I return a failure message suitable for printing in an exception + statement. + """ + peers = [x[0] for x in self._edges_used] + peers.extend([x[0] for x in self._existing_edges]) + peers = set(peers) + + return failure_message(len(peers), + self._needed_shares, + self._happy, + self._computed_happiness) + + + def _compute_matching(self, sharemap): + """ + I accept 'sharemap', a dict of shareid -> set(peerid) mappings. I + return the 'servers_of_happiness' number that sharemap results in. + + To calculate the 'servers_of_happiness' number for the sharemap, I + construct a bipartite graph with servers in one partition of vertices + and shares in the other, and with an edge between a server s and a share t + if s is to store t. I then compute the size of a maximum matching in + the resulting graph; this is then returned as the 'servers_of_happiness' + for my arguments. 
+ + For example, consider the following layout: + + server 1: shares 1, 2, 3, 4 + server 2: share 6 + server 3: share 3 + server 4: share 4 + server 5: share 2 + + From this, we can construct the following graph: + + L = {server 1, server 2, server 3, server 4, server 5} + R = {share 1, share 2, share 3, share 4, share 6} + V = L U R + E = {(server 1, share 1), (server 1, share 2), (server 1, share 3), + (server 1, share 4), (server 2, share 6), (server 3, share 3), + (server 4, share 4), (server 5, share 2)} + G = (V, E) + + Note that G is bipartite since every edge in e has one endpoint in L + and one endpoint in R. + + A matching in a graph G is a subset M of E such that, for any vertex + v in V, v is incident to at most one edge of M. A maximum matching + in G is a matching that is no smaller than any other matching. For + this graph, a matching of cardinality 5 is: + + M = {(server 1, share 1), (server 2, share 6), + (server 3, share 3), (server 4, share 4), + (server 5, share 2)} + + Since G is bipartite, and since |L| = 5, we cannot have an M' such + that |M'| > |M|. Then M is a maximum matching in G. Intuitively, and + as long as k <= 5, we can see that the layout above has + servers_of_happiness = 5, which matches the results here. + """ + if sharemap == {}: + self._computed_happiness = 0 + return set([]) + + # graph is what we're used to; vertices maps an index in the + # graph to its peerid or shnum. + # XXX: How do we tell the difference between them? Two dicts? + graph, peers, shares = self.flow_network_for(sharemap) + # This is an implementation of the Ford-Fulkerson method for finding + # a maximum flow in a flow network applied to a bipartite graph. + # Specifically, it is the Edmonds-Karp algorithm, since it uses a + # BFS to find the shortest augmenting path at each iteration, if one + # exists. + # + # The implementation here is an adapation of an algorithm described in + # "Introduction to Algorithms", Cormen et al, 2nd ed., pp 658-662. + dim = len(graph) + flow_function = [[0 for sh in xrange(dim)] for s in xrange(dim)] + residual_graph, residual_function = self.residual_network(graph, + flow_function) + while self.augmenting_path_for(residual_graph): + path = self.augmenting_path_for(residual_graph) + # Delta is the largest amount that we can increase flow across + # all of the edges in path. Because of the way that the residual + # function is constructed, f[u][v] for a particular edge (u, v) + # is the amount of unused capacity on that edge. Taking the + # minimum of a list of those values for each edge in the + # augmenting path gives us our delta. + delta = min(map(lambda (u, v): residual_function[u][v], path)) + for (u, v) in path: + flow_function[u][v] += delta + flow_function[v][u] -= delta + residual_graph, \ + residual_function = self.residual_network(graph, flow_function) + num_servers = len(sharemap) + # The value of a flow is the total flow out of the source vertex + # (vertex 0, in our graph). We could just as well sum across all of + # f[0], but we know that vertex 0 only has edges to the servers in + # our graph, so we can stop after summing flow across those. The + # value of a flow computed in this way is the size of a maximum + # matching on the bipartite graph described above. + happiness = sum([flow_function[0][v] for v in xrange(1, num_servers+1)]) + self._computed_happiness = happiness + + # Now we need to compute the edges that our matching used. 
+ # Our maximum flow will place one unit of flow across each of + # the edges that end up getting used. By construction, we know + # that the first num_servers edges that aren't the source node + # are the servers, so we're going to look for those that have + # a unit of positive outflow. + # + # (we could just look for a negative flow to the source node, + # but that wouldn't tell us anything about share assignments) + edges = set() + + # Exclude the source node, include the server nodes, exclude the + # share nodes. + graph = flow_function[1:num_servers+1] + assert len(graph) == num_servers + + for (i, x) in enumerate(graph): + # We're actually dealing with index i+1. + i += 1 + + if 1 in x: + # This node is getting used. + edges.add((i, x.index(1))) + + assert len(edges) == self._computed_happiness + + # Now transform the edges into actual edges. + edges = map(lambda (s, d): (peers[s], shares[d]), edges) + # The client can transform these into write buckets, if + # necessary. + return set(edges) + + + def flow_network_for(self, sharemap): + """ + I take my argument, a dict of peerid -> set(shareid) mappings, and + turn it into a flow network suitable for use with Edmonds-Karp. I + then return the adjacency list representation of that network. + + Specifically, I build G = (V, E), where: + V = { peerid in sharemap } U { shareid in sharemap } U {s, t} + E = {(s, peerid) for each peerid} + U {(peerid, shareid) if peerid is to store shareid } + U {(shareid, t) for each shareid} + + s and t will be source and sink nodes when my caller starts treating + the graph I return like a flow network. Without s and t, the + returned graph is bipartite. + """ + # Servers don't have integral identifiers, and we can't make any + # assumptions about the way shares are indexed -- it's possible that + # there are missing shares, for example. So before making a graph, + # we re-index so that all of our vertices have integral indices, and + # that there aren't any holes. We start indexing at 1, so that we + # can add a source node at index 0. + sharemap, shares, peers = self.reindex(sharemap, base_index=1) + num_servers = len(sharemap) + num_shares = len(shares) + graph = [] # index -> [index], an adjacency list + # Add an entry at the top (index 0) that has an edge to every server + # in sharemap + graph.append(sharemap.keys()) + # For each server, add an entry that has an edge to every share that it + # contains (or will contain). + for k in sharemap: + graph.append(sharemap[k]) + # For each share, add an entry that has an edge to the sink. + sink_num = num_servers + num_shares + 1 + for i in xrange(num_shares): + graph.append([sink_num]) + # Add an empty entry for the sink, which has no outbound edges. + graph.append([]) + return graph, peers, shares + + def reindex(self, sharemap, base_index): + """ + Given sharemap, I map peerids and shareids to integers that don't + conflict with each other, so they're useful as indices in a graph. I + return a sharemap that is reindexed appropriately, and also the + number of distinct shares in the resulting sharemap as a convenience + for my caller. base_index tells me where to start indexing. + """ + shares = {} # shareid => vertex index, then reversed and + # returned. + peers = {} # vertex index -> peerid + num = base_index + ret = {} # peerid -> [shareid], a reindexed sharemap. 
+ # Number the servers first + for k in sharemap: + ret[num] = sharemap[k] + peers[num] = k + num += 1 + # Number the shares + for k in ret: + for shnum in ret[k]: + if shnum not in shares: + shares[shnum] = num + num += 1 + ret[k] = map(lambda x: shares[x], ret[k]) + + # Reverse the sharemap so that we can use it later. + shares = dict([(n, s) for (s, n) in shares.items()]) + return (ret, shares, peers) + + def residual_network(self, graph, f): + """ + I return the residual network and residual capacity function of the + flow network represented by my graph and f arguments. graph is a + flow network in adjacency-list form, and f is a flow in graph. + """ + new_graph = [[] for i in xrange(len(graph))] + cf = [[0 for s in xrange(len(graph))] for sh in xrange(len(graph))] + for i in xrange(len(graph)): + for v in graph[i]: + if f[i][v] == 1: + # We add an edge (v, i) with cf[v,i] = 1. This means + # that we can remove 1 unit of flow from the edge (i, v) + new_graph[v].append(i) + cf[v][i] = 1 + cf[i][v] = -1 + else: + # We add the edge (i, v), since we're not using it right + # now. + new_graph[i].append(v) + cf[i][v] = 1 + cf[v][i] = -1 + return (new_graph, cf) + + def augmenting_path_for(self, graph): + """ + I return an augmenting path, if there is one, from the source node + to the sink node in the flow network represented by my graph argument. + If there is no augmenting path, I return False. I assume that the + source node is at index 0 of graph, and the sink node is at the last + index. I also assume that graph is a flow network in adjacency list + form. + """ + bfs_tree = self.bfs(graph, 0) + if bfs_tree[len(graph) - 1]: + n = len(graph) - 1 + path = [] # [(u, v)], where u and v are vertices in the graph + while n != 0: + path.insert(0, (bfs_tree[n], n)) + n = bfs_tree[n] + return path + return False + + def bfs(self, graph, s): + """ + Perform a BFS on graph starting at s, where graph is a graph in + adjacency list form, and s is a node in graph. I return the + predecessor table that the BFS generates. + """ + # This is an adaptation of the BFS described in "Introduction to + # Algorithms", Cormen et al, 2nd ed., p. 532. + # WHITE vertices are those that we haven't seen or explored yet. + WHITE = 0 + # GRAY vertices are those we have seen, but haven't explored yet + GRAY = 1 + # BLACK vertices are those we have seen and explored + BLACK = 2 + color = [WHITE for i in xrange(len(graph))] + predecessor = [None for i in xrange(len(graph))] + distance = [-1 for i in xrange(len(graph))] + queue = [s] # vertices that we haven't explored yet. + color[s] = GRAY + distance[s] = 0 + while queue: + n = queue.pop(0) + for v in graph[n]: + if color[v] == WHITE: + color[v] = GRAY + distance[v] = distance[n] + 1 + predecessor[v] = n + queue.append(v) + color[n] = BLACK + return predecessor } [test/test_upload: Add tests for HappinessGraph object. 
Kevan Carstensen **20110503035009 Ignore-this: b2a290bd0d5e2d71d83f01cee6c55c68 ] { hunk ./src/allmydata/test/test_upload.py 17 from allmydata.util import log from allmydata.util.assertutil import precondition from allmydata.util.deferredutil import DeferredListShouldSucceed +from allmydata.util.happinessutil import HappinessGraph from allmydata.test.no_network import GridTestMixin from allmydata.test.common_util import ShouldFailMixin hunk ./src/allmydata/test/test_upload.py 20 -from allmydata.util.happinessutil import servers_of_happiness, \ - shares_by_server, merge_servers from allmydata.storage_client import StorageFarmBroker from allmydata.storage.server import storage_index_to_dir hunk ./src/allmydata/test/test_upload.py 450 return d def test_second_error_all(self): + # XXX: This test is stupid and wrong; fix it. self.make_node("second-fail") self.set_encoding_parameters(k=25, happy=1, n=100) # The upload won't ask any peer to allocate shares more than once on hunk ./src/allmydata/test/test_upload.py 1906 f.close() return None +class HappinessGraphTests(unittest.TestCase, ShouldFailMixin): + """ + Tests for the HappinessGraph construct, which handles validation and + revaliation based on what we've seen on the grid. + """ + def make_servers_to(self, n=0): + """ + Make servers up to n + """ + return ["server%d" % i for i in xrange(n)] + + + def test_add_servers_with_excess_shares(self): + """ + Test the add_servers method in HappinessGraph when there are more + shares than there are writable servers. + """ + # First, test the case where there are fewer servers than shares. + # We should have happiness equal to the number of servers. + servers = self.make_servers_to(6) + + self.g = HappinessGraph(3, 7, 10) + self.g.add_servers(servers) + self.g.get_share_assignments() + + self.failUnlessEqual(self.g._computed_happiness, len(servers)) + self.failIf(self.g.is_healthy()) + + + def test_add_servers_with_excess_servers(self): + """ + Test the add_servers method in HappinessGraph when there are more + writable servers than there are shares. + """ + servers = self.make_servers_to(15) + self.g = HappinessGraph(3, 7, 10) + self.g.add_servers(servers) + self.g.get_share_assignments() + + self.failUnlessEqual(self.g._computed_happiness, 10) + self.failUnless(self.g.is_healthy()) + + + def test_add_server_with_share(self): + """ + HappinessGraph.add_server_with_share adds a single edge to its + internal graph representation. It is used by client code that + has found a single existing share that it wishes to add to the + graph. This is in contrast to the idea of representing possible + share assignments by adding a complete graph; we don't want to + represent that with read-only servers that we don't know we can + assign to. + """ + self.g = HappinessGraph(3, 8, 10) + added_count = 0 + for (sv, sh) in zip(self.make_servers_to(7), range(7)): + self.g.add_server_with_share(sv, sh) + added_count += 1 + self.g.get_share_assignments() + + self.failIf(self.g.is_healthy()) + self.failUnlessEqual(self.g._computed_happiness, added_count) + + self.g.add_server_with_share("server8", 7) + self.g.get_share_assignments() + self.failUnless(self.g.is_healthy()) + self.failUnlessEqual(self.g._computed_happiness, added_count + 1) + + + def test_add_server_with_share_duplicate_share(self): + """ + HappinessGraph shouldn't overcount happiness in the case where a + share is found on more than one server. 
+ """ + self.g = HappinessGraph(3, 8, 10) + + for server in self.make_servers_to(6): + self.g.add_server_with_share(server, 1) + self.g.get_share_assignments() + + self.failIf(self.g.is_healthy()) + self.failUnlessEqual(self.g._computed_happiness, 1) + + + def test_add_server_with_share_multiple_shares_on_server(self): + """ + HappinessGraph shouldn't overcount happiness in the case where a + server has more than one share. + """ + self.g = HappinessGraph(3, 8, 10) + + for share in range(6): + self.g.add_server_with_share("server1", share) + self.g.get_share_assignments() + + self.failIf(self.g.is_healthy()) + self.failUnlessEqual(self.g._computed_happiness, 1) + + + def test_add_server_with_share_after_assignments(self): + """ + add_server_with_share should interact in a useful way with the + existing share assignments after we've made them. + """ + self.g = HappinessGraph(3, 5, 8) + + servers = self.make_servers_to(6) + shares = [i for i in xrange(6)] + + self.g.add_servers(servers) + self.g.get_share_assignments() + self.failUnless(self.g.is_healthy()) + self.failUnlessEqual(self.g._computed_happiness, 6) + + # Now add some edges. We'll add duplicate edges, which shouldn't + # increase happiness any. + edges = zip(servers, shares) + for (server, share) in edges: + self.g.add_server_with_share(server, share) + self.g.get_share_assignments() + + self.failUnless(self.g.is_healthy()) + self.failUnlessEqual(self.g._computed_happiness, 6) + + # Now add some new edges, which should increase happiness. + self.g.add_server_with_share("server6", 6) + self.g.get_share_assignments() + self.failUnless(self.g.is_healthy()) + self.failUnlessEqual(self.g._computed_happiness, 7) + + self.g.add_server_with_share("server7", 7) + self.g.get_share_assignments() + self.failUnless(self.g.is_healthy()) + self.failUnlessEqual(self.g._computed_happiness, 8) + + + def test_mark_full_server(self): + """ + Marking a full server should mean that no new shares get assigned + to that server, but that existing shares added with + add_server_with_share are still counted. + """ + self.g = HappinessGraph(3, 5, 6) + + servers = self.make_servers_to(6) + shares = [i for i in xrange(6)] + + self.g.add_servers(servers) + self.g.get_share_assignments() + + self.failUnless(self.g.is_healthy()) + self.failUnlessEqual(self.g._computed_happiness, 6) + + # Now remove a couple of servers, and verify that the happiness + # score decreases accordingly. + self.g.mark_full_server("server5") + self.g.get_share_assignments() + + self.failUnless(self.g.is_healthy()) + self.failUnlessEqual(self.g._computed_happiness, 5) + + self.g.mark_full_server("server4") + self.g.get_share_assignments() + self.failIf(self.g.is_healthy()) + self.failUnlessEqual(self.g._computed_happiness, 4) + + + def test_mark_bad_server(self): + """ + If we mark a server bad, we should see existing shares on that + server vanish. + """ + self.g = HappinessGraph(3, 5, 6) + + servers = self.make_servers_to(6) + self.g.add_servers(servers) + + # Now make a couple additional servers and say that they have some shares. + self.g.add_server_with_share("server6", 4) + self.g.add_server_with_share("server7", 5) + + # We have all of these shares in the graph already, so happiness + # should be 6. + self.g.get_share_assignments() + self.failUnless(self.g.is_healthy()) + self.failUnlessEqual(self.g._computed_happiness, 6) + + # Now mark one of those servers bad. Nothing should change. 
+ self.g.mark_bad_server("server6") + self.g.get_share_assignments() + self.failUnless(self.g.is_healthy()) + self.failUnlessEqual(self.g._computed_happiness, 6) + + # Now mark one of the servers that we added initially bad. + self.g.mark_bad_server("server0") + self.g.mark_bad_server("server7") + assignments = self.g.get_share_assignments() + + for k in assignments: + self.failIfEqual(k, "server0") + + + def test_mark_nonexistent_server(self): + """ + Removing a nonexistent server shouldn't do anything. + """ + self.g = HappinessGraph(3, 5, 6) + + servers = self.make_servers_to(6) + self.g.add_servers(servers) + self.g.get_share_assignments() + self.failUnless(self.g.is_healthy()) + self.failUnlessEqual(self.g._computed_happiness, 6) + + # Now try removing a server that isn't there; nothing should + # happen in this case. + self.g.mark_bad_server("server7") + self.g.get_share_assignments() + self.failUnlessEqual(self.g._computed_happiness, 6) + + + def test_compute_happiness(self): + # The happiness of an unused graph should be zero. + self.g = HappinessGraph(3, 5, 10) + + self.g.get_share_assignments() + self.failIf(self.g.is_healthy()) + self.failUnlessEqual(self.g._computed_happiness, 0) + + self.g.add_server_with_share("server1", 1) + self.g.add_server_with_share("server2", 2) + self.g.add_server_with_share("server3", 3) + self.g.add_server_with_share("server4", 4) + self.g.get_share_assignments() + + self.failUnlessEqual(self.g._computed_happiness, 4) + + self.g.mark_bad_server("server4") + self.g.add_server_with_share("server1", 4) + self.g.get_share_assignments() + # We've added a duplicate server, so now servers_of_happiness + # should be 3 instead of 4. + self.failUnlessEqual(self.g._computed_happiness, 3) + + # Let's add four more servers with no overlap between themselves + # or the (server,share) tuples already in the matching. + for (server, share) in [("server%d" % i, i) for i in xrange(5, 9)]: + self.g.add_server_with_share(server, share) + + self.g.get_share_assignments() + self.failUnlessEqual(self.g._computed_happiness, 7) + + # Try adding a redundant server and share; observe that it does + # not affect the matching. + self.g.add_server_with_share("server1", 1) + self.g.get_share_assignments() + self.failUnlessEqual(self.g._computed_happiness, 7) + + # Test a more substantial overlap between the trackers and the + # existing assignments. + edges = [ + ("server1", 1), + ("server2", 2), + ("server3", 3), + ("server4", 4), + ("server5", 4), + ("server6", 3), + ("server6", 5), + ] + self.g = HappinessGraph(3, 5, 6) + for (server, share) in edges: self.g.add_server_with_share(server, share) + + # The value returned by servers_of_happiness is the size + # of a maximum matching in the bipartite graph that + # servers_of_happiness() makes between serverids and share + # numbers. It should find something like this: + # (server 1, share 1) + # (server 2, share 2) + # (server 3, share 3) + # (server 5, share 4) + # (server 6, share 5) + # + # and, since there are 5 edges in this matching, it should + # return 5. + self.g.get_share_assignments() + self.failUnlessEqual(self.g._computed_happiness, 5) + + # Now remove server 6, and watch the happiness drop to 4. + self.g.mark_bad_server("server6") + self.g.get_share_assignments() + self.failUnlessEqual(self.g._computed_happiness, 4) + + # Now remove server 5, and watch happiness stay where it is. 
+ self.g.mark_bad_server("server4") + self.g.get_share_assignments() + self.failUnlessEqual(self.g._computed_happiness, 4) + + # Zooko's first puzzle: + # (from http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:156) + # + # server 1: shares 0, 1 + # server 2: shares 1, 2 + # server 3: share 2 + # + # This should yield happiness of 3. + self.g = HappinessGraph(3, 3, 6) + + edges = [ + ("server1", 0), + ("server1", 1), + ("server2", 1), + ("server2", 2), + ("server3", 2) + ] + for (server, share) in edges: self.g.add_server_with_share(server, share) + + self.g.get_share_assignments() + self.failUnlessEqual(3, self.g._computed_happiness) + + # Zooko's second puzzle: + # (from http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:158) + # + # server 1: shares 0, 1 + # server 2: share 1 + # + # This should yield happiness of 2. + self.g = HappinessGraph(3, 3, 6) + + edges = [ + ("server1", 0), + ("server1", 1), + ("server2", 1), + ] + for (server, share) in edges: self.g.add_server_with_share(server, share) + + self.g.get_share_assignments() + self.failUnlessEqual(2, self.g._computed_happiness) + + + def test_shallow_compute_happiness(self): + """ + Shallow happiness computations depend only on the edges that + correspond to already existing share placements. These tell us + whether we're likely to be able to continue an upload without + resulting in unhappiness after we've started it. + """ + # Make more servers than shares. This means that we'll have one + # server with no shares assigned to it. + self.g = HappinessGraph(3, 5, 5) + servers = self.make_servers_to(6) + self.g.add_servers(servers) + results = self.g.get_share_assignments() + + self.failUnlessEqual(self.g._computed_happiness, 5) + + # Now we tell the graph that we managed to allocate space for + # all of these shares. + for k in results: + for s in results[k]: + self.g.add_server_with_share(k, s) + + # Now imagine that our upload has started, and that one of the + # servers that we're working with is broken. Uh oh. Remove that + # server from the list. + s = [k for k in results][0] + self.g.mark_bad_server(s) + + # Now recompute. At this point we just want the upload to die + # gracefully if it is unhappy, since it will be less work then + # recomputing the matching, reallocating buckets, and so on on + # the fly. Since we only know about 5 edges in our graph, and + # we've just removed one of them, the result should be four, + # even though we could theoretically go back, ask the remaining + # server to hold some share, and try again. + # (that's probably what we will do, though after starting the + # upload over again) + self.g.get_share_assignments(shallow=True) + self.failUnlessEqual(self.g._computed_happiness, 4) + + + def test_get_share_assignments(self): + """ + The HappinessGraph needs to be able to tell us about share + assignments correctly so that we know where to put things. + """ + self.g = HappinessGraph(3, 5, 10) + servers = self.make_servers_to(6) + shares = [i for i in xrange(6)] + edges = zip(servers, shares) + + for (server, share) in edges: + self.g.add_server_with_share(server, share) + + assignments = self.g.get_share_assignments() + self.failUnlessEqual(self.g._computed_happiness, 6) + # All of these shares are existing, so we have no work to do. 
+ self.failUnlessEqual(assignments, {}) + + more_servers = self.make_servers_to(10) + self.g.add_servers(more_servers) + + assignments = self.g.get_share_assignments() + + self.failUnlessEqual(self.g._computed_happiness, 10) + + all_shares = range(10) + + for server in assignments: + for share in assignments[server]: + self.failUnlessIn(server, more_servers) + self.failUnlessIn(share, all_shares) + more_servers.remove(server) + all_shares.remove(share) + + + def test_needs_recomputation(self): + """ + HappinessGraph instances should be smart enough to know when + they need to recompute happiness and when they can return the + value that they've already computed. + """ + self.g = HappinessGraph(3, 5, 10) + servers = self.make_servers_to(6) + + self.g.add_servers(servers) + self.failUnless(self.g.needs_recomputation()) + + self.g.get_share_assignments() + self.failIf(self.g.needs_recomputation()) + + # Now add a server, and verify that it needs to be recomputed. + self.g.add_server_with_share("server7", 7) + self.failUnless(self.g.needs_recomputation()) + self.g.get_share_assignments() + self.failIf(self.g.needs_recomputation()) + + # Since this server has an existing share, we shouldn't need to + # recompute the graph for it if it's full. + self.g.mark_full_server("server7") + self.failIf(self.g.needs_recomputation()) + + self.g.mark_bad_server("server7") + self.failUnless(self.g.needs_recomputation()) + self.g.get_share_assignments() + self.failIf(self.g.needs_recomputation()) + + # Now remove a nonexistent server, and verify that recomputation + # isn't called for + self.g.mark_bad_server("serverfake") + self.failIf(self.g.needs_recomputation()) + + + def test_assignment_freshness(self): + self.g = HappinessGraph(3, 5, 7) + + servers = self.make_servers_to(6) + self.g.add_servers(servers) + self.g.get_share_assignments() + + self.failUnlessEqual(self.g._computed_happiness, 6) + + self.g.add_server_with_share("server7", 6) + self.g.get_share_assignments() + self.failUnlessEqual(self.g._computed_happiness, 7) + + + def test_assignment_server_permutation(self): + """ + If we give the HappinessGraph instance an ordered list of servers + and there are more servers that it needs to make a happy share + assignment, it should distribute shares over the earlier servers + before distributing them to the later servers. + """ + self.g = HappinessGraph(3, 7, 10) + + servers = self.make_servers_to(15) + self.g.add_servers(servers) + + assignments = self.g.get_share_assignments() + self.failUnless(self.g.is_healthy()) + self.failUnlessEqual(self.g._computed_happiness, 10) + + expected_use_servers = list(servers)[:10] + for k in assignments: + self.failUnlessIn(k, expected_use_servers) + + + def test_confirm_share_placement(self): + # The HappinessGraph won't start counting edges associated with + # allocated shares until we've confirmed that those have been + # placed. + self.g = HappinessGraph(3, 7, 10) + servers = self.make_servers_to(10) + self.g.add_servers(servers) + + assignments = self.g.get_share_assignments() + self.failUnlessEqual(len(assignments), 10) + self.failUnless(self.g.is_healthy()) + + # Now do a shallow happiness computation. We shouldn't get any + # assignments back, since we haven't given the HappinessGraph + # any indication that we've successfully allocated anything. + assignments = self.g.get_share_assignments(shallow=True) + self.failUnlessEqual(len(assignments), 0) + self.failIf(self.g.is_healthy()) + + # Now confirm a few shares. 
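# (Editorial note: confirm_share_allocation asserts that the (server, share)
# pair appears in the most recently computed assignment, and the shallow call
# above produced an empty one, so a fresh full get_share_assignments() call is
# needed before any placements can be confirmed.)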
+ assignments = self.g.get_share_assignments() + + for server in servers[:3]: + for share in assignments[server]: + self.g.confirm_share_allocation(server, share) + + + # and check that they show up in subsequent assignments. + assignments = self.g.get_share_assignments(shallow=True) + self.failUnlessEqual(len(assignments), 3) + self.failIf(self.g.is_healthy()) + + + def test_obsolete_allocations_with_confirmed_placments(self): + # The HappinessGraph keeps track of all of the allocations it's + # made over the course of server selection. In some cases, these + # allocations can become obsolete -- for example, if information + # found when allocating shares reveals additional shares on + # additional servers such that fewer shares actually need to be + # allocated to adequately distribute a file. The + # get_obsolete_allocations method tells the uploader about these + # so it can cancel them appropriately. + self.g = HappinessGraph(3, 7, 10) + + servers = self.make_servers_to(10) + self.g.add_servers(servers) + + assignments = self.g.get_share_assignments() + self.failUnless(self.g.is_healthy()) + + # Now we'll confirm all of these placements. + for server in assignments: + for share in assignments[server]: + self.g.confirm_share_allocation(server, share) + + # We'll also pretend that we've found share 1 on a server. + self.g.add_server_with_share("server1", 1) + + # Now we'll attempt to get share assignments again. + assignments = self.g.get_share_assignments() + self.failUnlessEqual(self.g._computed_happiness, 10) + + obsolete = self.g.get_obsolete_allocations() + self.failUnlessEqual(len(obsolete), 2) + + + def test_obsolete_allocations_with_unconfirmed_placements(self): + # The HappinessGraph should not care about shares which have + # been allocated but which have not been placed by an uploader. + # These should not be reported by get_obsolete_allocations. + self.g = HappinessGraph(3, 7, 10) + + servers = self.make_servers_to(10) + self.g.add_servers(servers) + + assignments = self.g.get_share_assignments() + + # Don't confirm any of these placements, but find another share. + self.g.add_server_with_share("server1", 1) + + # then recompute. + assignments = self.g.get_share_assignments() + self.failUnlessEqual(self.g._computed_happiness, 10) + + obsolete = self.g.get_obsolete_allocations() + # We made no allocations, so there shouldn't be any obsolete + # allocations. + self.failUnlessEqual(len(obsolete), 0) + + + def test_allocated_reuse(self): + # If we have to redo share placement after already allocating + # some shares to some servers, we should make sure to re-use + # those allocations if possible in order to reduce the number of + # network round trips. + self.g = HappinessGraph(3, 7, 10) + + servers = self.make_servers_to(10) + self.g.add_servers(servers) + + assignments = self.g.get_share_assignments() + + # Confirm all of the share assignments. + for server in assignments: + for share in assignments[server]: + self.g.confirm_share_allocation(server, share) + + # We need to break the allocation somehow. We'll do that by + # finding a share on the server "server1" that wasn't assigned + # to it. + all_shares = set([i for i in xrange(10)]) + all_shares.difference_update(assignments["server1"]) + share = list(all_shares)[0] + + self.g.add_server_with_share("server1", share) + + # And go again. + assignments = self.g.get_share_assignments() + + # We should have had to allocate one more share to deal with + # that. 
+ self.failUnlessEqual(len(assignments), 1)
+ self.failUnlessEqual(self.g._computed_happiness, 10)
+
+ # And, if we check for obsolete allocations, we should have two
+ # wasted allocations.
+ self.failUnlessEqual(len(self.g.get_obsolete_allocations()), 2)
+
+
+ def test_existing_reuse(self):
+ # The HappinessGraph should attempt to use existing shares
+ # whenever possible to reduce network traffic.
+ self.g = HappinessGraph(3, 7, 10)
+
+ servers = self.make_servers_to(10)
+
+ self.g.add_servers(servers)
+ self.g.add_server_with_share("server0", 0)
+ self.g.add_server_with_share("server1", 1)
+ self.g.add_server_with_share("server2", 2)
+
+ # Now get share assignments. We should have 7 assignments.
+ assignments = self.g.get_share_assignments()
+ self.failUnless(self.g.is_healthy())
+ self.failUnlessEqual(self.g._computed_happiness, 10)
+ self.failUnlessEqual(len(assignments), 7)
+
+
+ def test_homeless_share_efficiency(self):
+ # When we're dealing with an inefficient layout, the
+ # HappinessGraph object should be smart enough to
+ # use the existing shares when deciding where to place
+ # things.
+ self.g = HappinessGraph(1, 2, 10)
+
+ # Add two servers, and say that between the two of them they
+ # have all of the shares. We shouldn't have to allocate any
+ # shares to satisfy the upload.
+ servers = self.make_servers_to(2)
+ self.g.add_servers(servers)
+
+ for i in (1, 3, 4, 8, 9):
+ self.g.add_server_with_share("server1", i)
+
+ for i in (0, 2, 5, 6, 7):
+ self.g.add_server_with_share("server0", i)
+
+ # Now get share assignments. There shouldn't be any share
+ # assignments.
+ assignments = self.g.get_share_assignments()
+ self.failUnless(self.g.is_healthy())
+ self.failUnlessEqual(len(assignments), 0)
+
+
+ def test_conservatism_with_replication(self):
+ # The old happiness computation was too conservative when k = 1.
+ # The new HappinessGraph shouldn't be.
+ self.g = HappinessGraph(1, 10, 10)
+
+ servers = self.make_servers_to(10)
+ self.g.add_servers(servers)
+
+ assignments = self.g.get_share_assignments()
+ self.failUnless(self.g.is_healthy())
+ self.failUnlessEqual(len(assignments), 10)
+
+
+ def test_happinessgraph_homeless_readonly(self):
+ # Servers with shares already on them shouldn't be assigned new
+ # shares if they're not writable.
+ self.g = HappinessGraph(1, 2, 4)
+ servers = self.make_servers_to(1)
+ self.g.add_servers(servers)
+
+ self.g.add_server_with_share("server2", 2)
+ assignments = self.g.get_share_assignments()
+
+ # We should have no homeless shares, but we should have 3
+ # allocations going to server0.
+ self.failUnlessEqual(len(assignments), 1)
+ self.failUnless(self.g.is_healthy())
+ self.failUnlessIn("server0", assignments)
+ self.failUnlessEqual(len(assignments["server0"]), 3)
+
+
+ def test_happinessgraph_homeless_existing_writable(self):
+ # Servers with shares already on them should be assigned
+ # homeless shares if they're writable.
+ self.g = HappinessGraph(1, 2, 4)
+ servers = self.make_servers_to(2)
+ self.g.add_servers(servers)
+
+ self.g.add_server_with_share("server1", 1)
+ assignments = self.g.get_share_assignments()
+
+ # We should have at least one assignment for each of these
+ # servers.
+ self.failUnlessEqual(len(assignments), 2) + self.failUnlessIn("server1", assignments) + self.failUnlessEqual(len(assignments["server1"]), 1) + self.failUnlessIn("server0", assignments) + self.failUnlessEqual(len(assignments["server0"]), 2) + + # TODO: # upload with exactly 75 servers (shares_of_happiness) # have a download fail } Context: [docs: FTP-and-SFTP.rst: fix a minor error and update the information about which version of Twisted fixes #1297 zooko@zooko.com**20110428055232 Ignore-this: b63cfb4ebdbe32fb3b5f885255db4d39 ] [munin tahoe_files plugin: fix incorrect file count francois@ctrlaltdel.ch**20110428055312 Ignore-this: 334ba49a0bbd93b4a7b06a25697aba34 fixes #1391 ] [corrected "k must never be smaller than N" to "k must never be greater than N" secorp@allmydata.org**20110425010308 Ignore-this: 233129505d6c70860087f22541805eac ] [Fix a test failure in test_package_initialization on Python 2.4.x due to exceptions being stringified differently than in later versions of Python. refs #1389 david-sarah@jacaranda.org**20110411190738 Ignore-this: 7847d26bc117c328c679f08a7baee519 ] [tests: add test for including the ImportError message and traceback entry in the summary of errors from importing dependencies. refs #1389 david-sarah@jacaranda.org**20110410155844 Ignore-this: fbecdbeb0d06a0f875fe8d4030aabafa ] [allmydata/__init__.py: preserve the message and last traceback entry (file, line number, function, and source line) of ImportErrors in the package versions string. fixes #1389 david-sarah@jacaranda.org**20110410155705 Ignore-this: 2f87b8b327906cf8bfca9440a0904900 ] [remove unused variable detected by pyflakes zooko@zooko.com**20110407172231 Ignore-this: 7344652d5e0720af822070d91f03daf9 ] [allmydata/__init__.py: Nicer reporting of unparseable version numbers in dependencies. fixes #1388 david-sarah@jacaranda.org**20110401202750 Ignore-this: 9c6bd599259d2405e1caadbb3e0d8c7f ] [update FTP-and-SFTP.rst: the necessary patch is included in Twisted-10.1 Brian Warner **20110325232511 Ignore-this: d5307faa6900f143193bfbe14e0f01a ] [control.py: remove all uses of s.get_serverid() warner@lothar.com**20110227011203 Ignore-this: f80a787953bd7fa3d40e828bde00e855 ] [web: remove some uses of s.get_serverid(), not all warner@lothar.com**20110227011159 Ignore-this: a9347d9cf6436537a47edc6efde9f8be ] [immutable/downloader/fetcher.py: remove all get_serverid() calls warner@lothar.com**20110227011156 Ignore-this: fb5ef018ade1749348b546ec24f7f09a ] [immutable/downloader/fetcher.py: fix diversity bug in server-response handling warner@lothar.com**20110227011153 Ignore-this: bcd62232c9159371ae8a16ff63d22c1b When blocks terminate (either COMPLETE or CORRUPT/DEAD/BADSEGNUM), the _shares_from_server dict was being popped incorrectly (using shnum as the index instead of serverid). I'm still thinking through the consequences of this bug. It was probably benign and really hard to detect. I think it would cause us to incorrectly believe that we're pulling too many shares from a server, and thus prefer a different server rather than asking for a second share from the first server. The diversity code is intended to spread out the number of shares simultaneously being requested from each server, but with this bug, it might be spreading out the total number of shares requested at all, not just simultaneously. (note that SegmentFetcher is scoped to a single segment, so the effect doesn't last very long). 
] [immutable/downloader/share.py: reduce get_serverid(), one left, update ext deps warner@lothar.com**20110227011150 Ignore-this: d8d56dd8e7b280792b40105e13664554 test_download.py: create+check MyShare instances better, make sure they share Server objects, now that finder.py cares ] [immutable/downloader/finder.py: reduce use of get_serverid(), one left warner@lothar.com**20110227011146 Ignore-this: 5785be173b491ae8a78faf5142892020 ] [immutable/offloaded.py: reduce use of get_serverid() a bit more warner@lothar.com**20110227011142 Ignore-this: b48acc1b2ae1b311da7f3ba4ffba38f ] [immutable/upload.py: reduce use of get_serverid() warner@lothar.com**20110227011138 Ignore-this: ffdd7ff32bca890782119a6e9f1495f6 ] [immutable/checker.py: remove some uses of s.get_serverid(), not all warner@lothar.com**20110227011134 Ignore-this: e480a37efa9e94e8016d826c492f626e ] [add remaining get_* methods to storage_client.Server, NoNetworkServer, and warner@lothar.com**20110227011132 Ignore-this: 6078279ddf42b179996a4b53bee8c421 MockIServer stubs ] [upload.py: rearrange _make_trackers a bit, no behavior changes warner@lothar.com**20110227011128 Ignore-this: 296d4819e2af452b107177aef6ebb40f ] [happinessutil.py: finally rename merge_peers to merge_servers warner@lothar.com**20110227011124 Ignore-this: c8cd381fea1dd888899cb71e4f86de6e ] [test_upload.py: factor out FakeServerTracker warner@lothar.com**20110227011120 Ignore-this: 6c182cba90e908221099472cc159325b ] [test_upload.py: server-vs-tracker cleanup warner@lothar.com**20110227011115 Ignore-this: 2915133be1a3ba456e8603885437e03 ] [happinessutil.py: server-vs-tracker cleanup warner@lothar.com**20110227011111 Ignore-this: b856c84033562d7d718cae7cb01085a9 ] [upload.py: more tracker-vs-server cleanup warner@lothar.com**20110227011107 Ignore-this: bb75ed2afef55e47c085b35def2de315 ] [upload.py: fix var names to avoid confusion between 'trackers' and 'servers' warner@lothar.com**20110227011103 Ignore-this: 5d5e3415b7d2732d92f42413c25d205d ] [refactor: s/peer/server/ in immutable/upload, happinessutil.py, test_upload warner@lothar.com**20110227011100 Ignore-this: 7ea858755cbe5896ac212a925840fe68 No behavioral changes, just updating variable/method names and log messages. The effects outside these three files should be minimal: some exception messages changed (to say "server" instead of "peer"), and some internal class names were changed. A few things still use "peer" to minimize external changes, like UploadResults.timings["peer_selection"] and happinessutil.merge_peers, which can be changed later. ] [storage_client.py: clean up test_add_server/test_add_descriptor, remove .test_servers warner@lothar.com**20110227011056 Ignore-this: efad933e78179d3d5fdcd6d1ef2b19cc ] [test_client.py, upload.py:: remove KiB/MiB/etc constants, and other dead code warner@lothar.com**20110227011051 Ignore-this: dc83c5794c2afc4f81e592f689c0dc2d ] [test: increase timeout on a network test because Francois's ARM machine hit that timeout zooko@zooko.com**20110317165909 Ignore-this: 380c345cdcbd196268ca5b65664ac85b I'm skeptical that the test was proceeding correctly but ran out of time. It seems more likely that it had gotten hung. But if we raise the timeout to an even more extravagant number then we can be even more certain that the test was never going to finish. 
] [docs/configuration.rst: add a "Frontend Configuration" section Brian Warner **20110222014323 Ignore-this: 657018aa501fe4f0efef9851628444ca this points to docs/frontends/*.rst, which were previously underlinked ] [web/filenode.py: avoid calling req.finish() on closed HTTP connections. Closes #1366 "Brian Warner "**20110221061544 Ignore-this: 799d4de19933f2309b3c0c19a63bb888 ] [Add unit tests for cross_check_pkg_resources_versus_import, and a regression test for ref #1355. This requires a little refactoring to make it testable. david-sarah@jacaranda.org**20110221015817 Ignore-this: 51d181698f8c20d3aca58b057e9c475a ] [allmydata/__init__.py: .name was used in place of the correct .__name__ when printing an exception. Also, robustify string formatting by using %r instead of %s in some places. fixes #1355. david-sarah@jacaranda.org**20110221020125 Ignore-this: b0744ed58f161bf188e037bad077fc48 ] [Refactor StorageFarmBroker handling of servers Brian Warner **20110221015804 Ignore-this: 842144ed92f5717699b8f580eab32a51 Pass around IServer instance instead of (peerid, rref) tuple. Replace "descriptor" with "server". Other replacements: get_all_servers -> get_connected_servers/get_known_servers get_servers_for_index -> get_servers_for_psi (now returns IServers) This change still needs to be pushed further down: lots of code is now getting the IServer and then distributing (peerid, rref) internally. Instead, it ought to distribute the IServer internally and delay extracting a serverid or rref until the last moment. no_network.py was updated to retain parallelism. ] [TAG allmydata-tahoe-1.8.2 warner@lothar.com**20110131020101] Patch bundle hash: cc3f1072d6970ca2c498ad74043a5f4c6ffa73ac
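Reading aid (not part of the patch bundle above): the tests in test_upload exercise a HappinessGraph surface whose names and (k, happy, n) constructor arguments can be inferred from the test code itself. The following is a minimal, hypothetical sketch of that surface, with stub bodies only; it is an illustration of what the tests appear to assume, not the implementation that the happinessutil.py patch actually adds.

# Illustrative sketch only. Names and signatures are inferred from the
# tests above; behavior described in the docstrings is what the tests
# check for, not a specification of the real util/happinessutil.py code.

class HappinessGraphSketch:
    """Hypothetical outline of the share-placement graph the tests use.
    k is the number of shares needed to reconstruct the file, happy is
    the servers-of-happiness threshold, and n is the total number of
    shares to place."""

    def __init__(self, k, happy, n):
        self.k = k
        self.happy = happy
        self.n = n

    def add_servers(self, servers):
        """Tell the graph about writable servers, in preference order."""
        raise NotImplementedError

    def add_server_with_share(self, server, shnum):
        """Record that 'server' already holds share 'shnum'."""
        raise NotImplementedError

    def mark_full_server(self, server):
        """'server' is out of space: keep its existing shares in the
        matching, but stop assigning new shares to it."""
        raise NotImplementedError

    def mark_bad_server(self, server):
        """'server' is unusable; its edges no longer count."""
        raise NotImplementedError

    def needs_recomputation(self):
        """Return True if the graph has changed in a way that
        invalidates the last computed matching."""
        raise NotImplementedError

    def get_share_assignments(self, shallow=False):
        """Compute a matching and return a dict mapping server to the
        shares the uploader should try to allocate there. With
        shallow=True, only shares known to exist or already confirmed
        are counted (see test_confirm_share_placement)."""
        raise NotImplementedError

    def confirm_share_allocation(self, server, shnum):
        """The uploader successfully allocated 'shnum' on 'server'."""
        raise NotImplementedError

    def get_obsolete_allocations(self):
        """Return confirmed allocations that a later matching no longer
        needs, so the uploader can cancel them."""
        raise NotImplementedError

    def is_healthy(self):
        """Return True if the last matching met the happiness threshold."""
        raise NotImplementedError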