20 patches for repository /Users/warner2/stuff/tahoe/trunk:

Sat Feb 26 17:10:51 PST 2011  warner@lothar.com
  * test_client.py, upload.py:: remove KiB/MiB/etc constants, and other dead code

Sat Feb 26 17:10:56 PST 2011  warner@lothar.com
  * storage_client.py: clean up test_add_server/test_add_descriptor, remove .test_servers

Sat Feb 26 17:11:00 PST 2011  warner@lothar.com
  * refactor: s/peer/server/ in immutable/upload, happinessutil.py, test_upload
  No behavioral changes, just updating variable/method names and log messages.
  The effects outside these three files should be minimal: some exception
  messages changed (to say "server" instead of "peer"), and some internal
  class names were changed. A few things still use "peer" to minimize
  external changes, like UploadResults.timings["peer_selection"] and
  happinessutil.merge_peers, which can be changed later.

Sat Feb 26 17:11:03 PST 2011  warner@lothar.com
  * upload.py: fix var names to avoid confusion between 'trackers' and 'servers'

Sat Feb 26 17:11:07 PST 2011  warner@lothar.com
  * upload.py: more tracker-vs-server cleanup

Sat Feb 26 17:11:11 PST 2011  warner@lothar.com
  * happinessutil.py: server-vs-tracker cleanup

Sat Feb 26 17:11:15 PST 2011  warner@lothar.com
  * test_upload.py: server-vs-tracker cleanup

Sat Feb 26 17:11:20 PST 2011  warner@lothar.com
  * test_upload.py: factor out FakeServerTracker

Sat Feb 26 17:11:24 PST 2011  warner@lothar.com
  * happinessutil.py: finally rename merge_peers to merge_servers

Sat Feb 26 17:11:28 PST 2011  warner@lothar.com
  * upload.py: rearrange _make_trackers a bit, no behavior changes

Sat Feb 26 17:11:32 PST 2011  warner@lothar.com
  * add remaining get_* methods to storage_client.Server, NoNetworkServer, and MockIServer stubs

Sat Feb 26 17:11:34 PST 2011  warner@lothar.com
  * immutable/checker.py: remove some uses of s.get_serverid(), not all

Sat Feb 26 17:11:38 PST 2011  warner@lothar.com
  * immutable/upload.py: reduce use of get_serverid()

Sat Feb 26 17:11:42 PST 2011  warner@lothar.com
  * immutable/offloaded.py: reduce use of get_serverid() a bit more

Sat Feb 26 17:11:46 PST 2011  warner@lothar.com
  * immutable/downloader/finder.py: reduce use of get_serverid(), one left

Sat Feb 26 17:11:50 PST 2011  warner@lothar.com
  * immutable/downloader/share.py: reduce get_serverid(), one left, update ext deps
  test_download.py: create+check MyShare instances better, make sure they
  share Server objects, now that finder.py cares

Sat Feb 26 17:11:53 PST 2011  warner@lothar.com
  * immutable/downloader/fetcher.py: fix diversity bug in server-response handling
  When blocks terminate (either COMPLETE or CORRUPT/DEAD/BADSEGNUM), the
  _shares_from_server dict was being popped incorrectly (using shnum as the
  index instead of serverid). I'm still thinking through the consequences of
  this bug. It was probably benign and really hard to detect. I think it
  would cause us to incorrectly believe that we're pulling too many shares
  from a server, and thus prefer a different server rather than asking for a
  second share from the first server. The diversity code is intended to
  spread out the number of shares simultaneously being requested from each
  server, but with this bug, it might have been spreading out the total
  number of shares requested at all, not just simultaneously. (Note that
  SegmentFetcher is scoped to a single segment, so the effect doesn't last
  very long.)
Sat Feb 26 17:11:56 PST 2011  warner@lothar.com
  * immutable/downloader/fetcher.py: remove all get_serverid() calls

Sat Feb 26 17:11:59 PST 2011  warner@lothar.com
  * web: remove some uses of s.get_serverid(), not all

Sat Feb 26 17:12:03 PST 2011  warner@lothar.com
  * control.py: remove all uses of s.get_serverid()
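The fetcher.py diversity bug described above (the 17:11:53 patch) is easiest
to see in miniature. The following is an illustrative sketch, not the actual
SegmentFetcher code: DiversityTracker and its method names are invented for
this example. The point is that _shares_from_server is keyed by serverid, so
a terminated block must be removed under its server's key; popping with the
share number as the key usually misses, leaving the server looking busier
than it is.

    # illustrative sketch only; the real logic lives in SegmentFetcher
    COMPLETE, CORRUPT, DEAD, BADSEGNUM = range(4)

    class DiversityTracker:
        def __init__(self):
            # serverid -> set of shnums currently being requested
            self._shares_from_server = {}

        def request_sent(self, serverid, shnum):
            self._shares_from_server.setdefault(serverid, set()).add(shnum)

        def block_terminated(self, serverid, shnum, state):
            if state in (COMPLETE, CORRUPT, DEAD, BADSEGNUM):
                # the bug: self._shares_from_server.pop(shnum, None),
                # which indexes the dict with a share number instead of
                # a serverid. The fix indexes by serverid:
                active = self._shares_from_server.get(serverid, set())
                active.discard(shnum)
                if not active:
                    self._shares_from_server.pop(serverid, None)

        def load(self, serverid):
            # used to prefer less-busy servers when picking the next share
            return len(self._shares_from_server.get(serverid, ()))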
New patches:

[test_client.py, upload.py:: remove KiB/MiB/etc constants, and other dead code
warner@lothar.com**20110227011051
 Ignore-this: dc83c5794c2afc4f81e592f689c0dc2d
] {
hunk ./src/allmydata/immutable/upload.py 31
 from cStringIO import StringIO

-KiB=1024
-MiB=1024*KiB
-GiB=1024*MiB
-TiB=1024*GiB
-PiB=1024*TiB
-
-class HaveAllPeersError(Exception):
-    # we use this to jump out of the loop
-    pass
-
 # this wants to live in storage, not here
 class TooFullError(Exception):
     pass
hunk ./src/allmydata/test/test_client.py 10
 import allmydata
 from allmydata import client
 from allmydata.storage_client import StorageFarmBroker
-from allmydata.introducer.client import IntroducerClient
 from allmydata.util import base32, fileutil
 from allmydata.interfaces import IFilesystemNode, IFileNode, \
      IImmutableFileNode, IMutableFileNode, IDirectoryNode
hunk ./src/allmydata/test/test_client.py 16
 from foolscap.api import flushEventualQueue
 import allmydata.test.common_util as testutil

-class FakeIntroducerClient(IntroducerClient):
-    def __init__(self):
-        self._connections = set()
-    def add_peer(self, nodeid):
-        entry = (nodeid, "storage", "rref")
-        self._connections.add(entry)
-    def remove_all_peers(self):
-        self._connections.clear()
-
 BASECONFIG = ("[client]\n"
               "introducer.furl = \n"
               )
}

[storage_client.py: clean up test_add_server/test_add_descriptor, remove .test_servers
warner@lothar.com**20110227011056
 Ignore-this: efad933e78179d3d5fdcd6d1ef2b19cc
] {
hunk ./src/allmydata/storage_client.py 74
         # own Reconnector, and will give us a RemoteReference when we ask
         # them for it.
         self.servers = {}
-        # self.test_servers are statically configured from unit tests
-        self.test_servers = {} # serverid -> rref
         self.introducer_client = None

     # these two are used in unit tests
hunk ./src/allmydata/storage_client.py 77
-    def test_add_server(self, serverid, rref):
-        self.test_servers[serverid] = rref
-    def test_add_descriptor(self, serverid, dsc):
-        self.servers[serverid] = dsc
+    def test_add_rref(self, serverid, rref):
+        s = NativeStorageServer(serverid, {})
+        s.rref = rref
+        self.servers[serverid] = s
+
+    def test_add_server(self, serverid, s):
+        self.servers[serverid] = s

     def use_introducer(self, introducer_client):
         self.introducer_client = ic = introducer_client
hunk ./src/allmydata/storage_client.py 128
     def get_all_serverids(self):
         serverids = set()
-        serverids.update(self.test_servers.keys())
         serverids.update(self.servers.keys())
         return frozenset(serverids)
hunk ./src/allmydata/storage_client.py 136
                          if s.get_rref()])

     def get_known_servers(self):
-        servers = []
-        for serverid,rref in self.test_servers.items():
-            s = NativeStorageServer(serverid, {})
-            s.rref = rref
-            servers.append(s)
-        servers.extend(self.servers.values())
-        return sorted(servers, key=lambda s: s.get_serverid())
+        return sorted(self.servers.values(), key=lambda s: s.get_serverid())

     def get_nickname_for_serverid(self, serverid):
         if serverid in self.servers:
hunk ./src/allmydata/test/test_checker.py 31
                  "my-version": "ver",
                  "oldest-supported": "oldest",
                  }
-        dsc = NativeStorageServer(peerid, ann_d)
-        sb.test_add_descriptor(peerid, dsc)
+        s = NativeStorageServer(peerid, ann_d)
+        sb.test_add_server(peerid, s)
     c = FakeClient()
     c.storage_broker = sb
     return c
hunk ./src/allmydata/test/test_client.py 132
     def test_permute(self):
         sb = StorageFarmBroker(None, True)
         for k in ["%d" % i for i in range(5)]:
-            sb.test_add_server(k, "rref")
+            sb.test_add_rref(k, "rref")
         self.failUnlessReallyEqual(self._permute(sb, "one"), ['3','1','0','4','2'])
         self.failUnlessReallyEqual(self._permute(sb, "two"), ['0','4','2','1','3'])
hunk ./src/allmydata/test/test_client.py 136
-        sb.test_servers.clear()
+        sb.servers.clear()
         self.failUnlessReallyEqual(self._permute(sb, "one"), [])

     def test_versions(self):
hunk ./src/allmydata/test/test_mutable.py 191
     storage_broker = StorageFarmBroker(None, True)
     for peerid in peerids:
         fss = FakeStorageServer(peerid, s)
-        storage_broker.test_add_server(peerid, fss)
+        storage_broker.test_add_rref(peerid, fss)
     return storage_broker

 def make_nodemaker(s=None, num_peers=10):
hunk ./src/allmydata/test/test_upload.py 199
         peers = [ ("%20d"%fakeid, FakeStorageServer(mode[fakeid]))
                   for fakeid in range(self.num_servers) ]
         self.storage_broker = StorageFarmBroker(None, permute_peers=True)
-        for (serverid, server) in peers:
-            self.storage_broker.test_add_server(serverid, server)
+        for (serverid, rref) in peers:
+            self.storage_broker.test_add_rref(serverid, rref)
         self.last_peers = [p[1] for p in peers]

     def log(self, *args, **kwargs):
}
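For readers following along, here is how the two surviving helpers divide the
work after this patch. This is a hypothetical usage sketch based on the hunks
above (the "rref" string stands in for a real remote reference, just as it
does in test_client.py), and it assumes a Tahoe source tree on the import
path:

    from allmydata.storage_client import StorageFarmBroker, NativeStorageServer

    sb = StorageFarmBroker(None, True)  # second argument is permute_peers

    # hand the broker a bare remote reference; it wraps it for us
    sb.test_add_rref("a" * 20, "rref")

    # or hand it a fully-formed server descriptor directly
    s = NativeStorageServer("b" * 20, {})
    sb.test_add_server("b" * 20, s)

    # both land in the single .servers dict, so production paths like
    # get_known_servers() see test servers without any special-casing
    assert len(sb.get_known_servers()) == 2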
[refactor: s/peer/server/ in immutable/upload, happinessutil.py, test_upload
warner@lothar.com**20110227011100
 Ignore-this: 7ea858755cbe5896ac212a925840fe68
 
 No behavioral changes, just updating variable/method names and log messages.
 The effects outside these three files should be minimal: some exception
 messages changed (to say "server" instead of "peer"), and some internal
 class names were changed. A few things still use "peer" to minimize external
 changes, like UploadResults.timings["peer_selection"] and
 happinessutil.merge_peers, which can be changed later.
] {
hunk ./src/allmydata/immutable/upload.py 71
 def pretty_print_shnum_to_servers(s):
     return ', '.join([ "sh%s: %s" % (k, '+'.join([idlib.shortnodeid_b2a(x) for x in v])) for k, v in s.iteritems() ])

-class PeerTracker:
-    def __init__(self, peerid, storage_server,
+class ServerTracker:
+    def __init__(self, serverid, storage_server,
                  sharesize, blocksize, num_segments,
                  num_share_hashes,
                  storage_index,
                  bucket_renewal_secret, bucket_cancel_secret):
hunk ./src/allmydata/immutable/upload.py 76
-        precondition(isinstance(peerid, str), peerid)
-        precondition(len(peerid) == 20, peerid)
-        self.peerid = peerid
+        precondition(isinstance(serverid, str), serverid)
+        precondition(len(serverid) == 20, serverid)
+        self.serverid = serverid
         self._storageserver = storage_server # to an RIStorageServer
         self.buckets = {} # k: shareid, v: IRemoteBucketWriter
         self.sharesize = sharesize
hunk ./src/allmydata/immutable/upload.py 86
         wbp = layout.make_write_bucket_proxy(None, sharesize,
                                              blocksize, num_segments,
                                              num_share_hashes,
-                                             EXTENSION_SIZE, peerid)
+                                             EXTENSION_SIZE, serverid)
         self.wbp_class = wbp.__class__ # to create more of them
         self.allocated_size = wbp.get_allocated_size()
         self.blocksize = blocksize
hunk ./src/allmydata/immutable/upload.py 98
         self.cancel_secret = bucket_cancel_secret

     def __repr__(self):
-        return ("<PeerTracker for peer %s and SI %s>"
-                % (idlib.shortnodeid_b2a(self.peerid),
+        return ("<ServerTracker for server %s and SI %s>"
+                % (idlib.shortnodeid_b2a(self.serverid),
                    si_b2a(self.storage_index)[:5]))

     def query(self, sharenums):
hunk ./src/allmydata/immutable/upload.py 126
                                          self.num_segments,
                                          self.num_share_hashes,
                                          EXTENSION_SIZE,
-                                         self.peerid)
+                                         self.serverid)
                 b[sharenum] = bp
         self.buckets.update(b)
         return (alreadygot, set(b.keys()))
hunk ./src/allmydata/immutable/upload.py 152
 def str_shareloc(shnum, bucketwriter):
     return "%s: %s" % (shnum, idlib.shortnodeid_b2a(bucketwriter._nodeid),)

-class Tahoe2PeerSelector(log.PrefixingLogMixin):
+class Tahoe2ServerSelector(log.PrefixingLogMixin):

     def __init__(self, upload_id, logparent=None, upload_status=None):
         self.upload_id = upload_id
hunk ./src/allmydata/immutable/upload.py 157
         self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
-        # Peers that are working normally, but full.
+        # Servers that are working normally, but full.
         self.full_count = 0
         self.error_count = 0
hunk ./src/allmydata/immutable/upload.py 160
-        self.num_peers_contacted = 0
+        self.num_servers_contacted = 0
         self.last_failure_msg = None
         self._status = IUploadStatus(upload_status)
         log.PrefixingLogMixin.__init__(self, 'tahoe.immutable.upload', logparent, prefix=upload_id)
hunk ./src/allmydata/immutable/upload.py 167
         self.log("starting", level=log.OPERATIONAL)

     def __repr__(self):
-        return "<Tahoe2PeerSelector for upload %s>" % self.upload_id
+        return "<Tahoe2ServerSelector for upload %s>" % self.upload_id

     def get_shareholders(self, storage_broker, secret_holder,
                          storage_index, share_size, block_size,
hunk ./src/allmydata/immutable/upload.py 174
                          num_segments, total_shares, needed_shares,
                          servers_of_happiness):
         """
-        @return: (upload_servers, already_peers), where upload_servers is a set of
-                 PeerTracker instances that have agreed to hold some shares
-                 for us (the shareids are stashed inside the PeerTracker),
-                 and already_peers is a dict mapping shnum to a set of peers
-                 which claim to already have the share.
+        @return: (upload_servers, already_servers), where upload_servers is
+                 a set of ServerTracker instances that have agreed to hold
+                 some shares for us (the shareids are stashed inside the
+                 ServerTracker), and already_servers is a dict mapping shnum
+                 to a set of servers which claim to already have the share.
""" if self._status: hunk ./src/allmydata/immutable/upload.py 182 - self._status.set_status("Contacting Peers..") + self._status.set_status("Contacting Servers..") self.total_shares = total_shares self.servers_of_happiness = servers_of_happiness hunk ./src/allmydata/immutable/upload.py 189 self.needed_shares = needed_shares self.homeless_shares = set(range(total_shares)) - self.contacted_peers = [] # peers worth asking again - self.contacted_peers2 = [] # peers that we have asked again + self.contacted_servers = [] # servers worth asking again + self.contacted_servers2 = [] # servers that we have asked again self._started_second_pass = False hunk ./src/allmydata/immutable/upload.py 192 - self.use_peers = set() # PeerTrackers that have shares assigned to them - self.preexisting_shares = {} # shareid => set(peerids) holding shareid + self.use_servers = set() # ServerTrackers that have shares assigned + # to them + self.preexisting_shares = {} # shareid => set(serverids) holding shareid # We don't try to allocate shares to these servers, since they've said # that they're incapable of storing shares of the size that we'd want # to store. We keep them around because they may have existing shares hunk ./src/allmydata/immutable/upload.py 201 # for this storage index, which we want to know about for accurate # servers_of_happiness accounting # (this is eventually a list, but it is initialized later) - self.readonly_peers = None - # These peers have shares -- any shares -- for our SI. We keep + self.readonly_servers = None + # These servers have shares -- any shares -- for our SI. We keep # track of these to write an error message with them later. hunk ./src/allmydata/immutable/upload.py 204 - self.peers_with_shares = set() + self.servers_with_shares = set() # this needed_hashes computation should mirror # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree hunk ./src/allmydata/immutable/upload.py 218 num_share_hashes, EXTENSION_SIZE, None) allocated_size = wbp.get_allocated_size() - all_peers = [(s.get_serverid(), s.get_rref()) - for s in storage_broker.get_servers_for_psi(storage_index)] - if not all_peers: - raise NoServersError("client gave us zero peers") + all_servers = [(s.get_serverid(), s.get_rref()) + for s in storage_broker.get_servers_for_psi(storage_index)] + if not all_servers: + raise NoServersError("client gave us zero servers") hunk ./src/allmydata/immutable/upload.py 223 - # filter the list of peers according to which ones can accomodate - # this request. This excludes older peers (which used a 4-byte size + # filter the list of servers according to which ones can accomodate + # this request. This excludes older servers (which used a 4-byte size # field) from getting large shares (for files larger than about # 12GiB). See #439 for details. hunk ./src/allmydata/immutable/upload.py 227 - def _get_maxsize(peer): - (peerid, conn) = peer + def _get_maxsize(server): + (serverid, conn) = server v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"] return v1["maximum-immutable-share-size"] hunk ./src/allmydata/immutable/upload.py 231 - writable_peers = [peer for peer in all_peers - if _get_maxsize(peer) >= allocated_size] - readonly_peers = set(all_peers[:2*total_shares]) - set(writable_peers) + writable_servers = [server for server in all_servers + if _get_maxsize(server) >= allocated_size] + readonly_servers = set(all_servers[:2*total_shares]) - set(writable_servers) # decide upon the renewal/cancel secrets, to include them in the # allocate_buckets query. 
hunk ./src/allmydata/immutable/upload.py 244
                                                      storage_index)
         file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
                                                      storage_index)
-        def _make_trackers(peers):
-            return [PeerTracker(peerid, conn,
-                                share_size, block_size,
-                                num_segments, num_share_hashes,
-                                storage_index,
-                                bucket_renewal_secret_hash(file_renewal_secret,
-                                                           peerid),
-                                bucket_cancel_secret_hash(file_cancel_secret,
-                                                          peerid))
-                    for (peerid, conn) in peers]
-        self.uncontacted_peers = _make_trackers(writable_peers)
-        self.readonly_peers = _make_trackers(readonly_peers)
-        # We now ask peers that can't hold any new shares about existing
+        def _make_trackers(servers):
+            return [ServerTracker(serverid, conn,
+                                  share_size, block_size,
+                                  num_segments, num_share_hashes,
+                                  storage_index,
+                                  bucket_renewal_secret_hash(file_renewal_secret,
+                                                             serverid),
+                                  bucket_cancel_secret_hash(file_cancel_secret,
+                                                            serverid))
+                    for (serverid, conn) in servers]
+        self.uncontacted_servers = _make_trackers(writable_servers)
+        self.readonly_servers = _make_trackers(readonly_servers)
+        # We now ask servers that can't hold any new shares about existing
         # shares that they might have for our SI. Once this is done, we
         # start placing the shares that we haven't already accounted
         # for.
hunk ./src/allmydata/immutable/upload.py 261
         ds = []
-        if self._status and self.readonly_peers:
-            self._status.set_status("Contacting readonly peers to find "
+        if self._status and self.readonly_servers:
+            self._status.set_status("Contacting readonly servers to find "
                                     "any existing shares")
hunk ./src/allmydata/immutable/upload.py 264
-        for peer in self.readonly_peers:
-            assert isinstance(peer, PeerTracker)
-            d = peer.ask_about_existing_shares()
-            d.addBoth(self._handle_existing_response, peer.peerid)
+        for server in self.readonly_servers:
+            assert isinstance(server, ServerTracker)
+            d = server.ask_about_existing_shares()
+            d.addBoth(self._handle_existing_response, server.serverid)
             ds.append(d)
hunk ./src/allmydata/immutable/upload.py 269
-            self.num_peers_contacted += 1
+            self.num_servers_contacted += 1
             self.query_count += 1
hunk ./src/allmydata/immutable/upload.py 271
-            self.log("asking peer %s for any existing shares" %
-                     (idlib.shortnodeid_b2a(peer.peerid),),
+            self.log("asking server %s for any existing shares" %
+                     (idlib.shortnodeid_b2a(server.serverid),),
                      level=log.NOISY)
         dl = defer.DeferredList(ds)
         dl.addCallback(lambda ign: self._loop())
hunk ./src/allmydata/immutable/upload.py 279
         return dl

-    def _handle_existing_response(self, res, peer):
+    def _handle_existing_response(self, res, server):
         """
         I handle responses to the queries sent by
hunk ./src/allmydata/immutable/upload.py 282
-        Tahoe2PeerSelector._existing_shares.
+        Tahoe2ServerSelector._existing_shares.
""" if isinstance(res, failure.Failure): self.log("%s got error during existing shares check: %s" hunk ./src/allmydata/immutable/upload.py 286 - % (idlib.shortnodeid_b2a(peer), res), + % (idlib.shortnodeid_b2a(server), res), level=log.UNUSUAL) self.error_count += 1 self.bad_query_count += 1 hunk ./src/allmydata/immutable/upload.py 293 else: buckets = res if buckets: - self.peers_with_shares.add(peer) - self.log("response to get_buckets() from peer %s: alreadygot=%s" - % (idlib.shortnodeid_b2a(peer), tuple(sorted(buckets))), + self.servers_with_shares.add(server) + self.log("response to get_buckets() from server %s: alreadygot=%s" + % (idlib.shortnodeid_b2a(server), tuple(sorted(buckets))), level=log.NOISY) for bucket in buckets: hunk ./src/allmydata/immutable/upload.py 298 - self.preexisting_shares.setdefault(bucket, set()).add(peer) + self.preexisting_shares.setdefault(bucket, set()).add(server) self.homeless_shares.discard(bucket) self.full_count += 1 self.bad_query_count += 1 hunk ./src/allmydata/immutable/upload.py 314 len(self.homeless_shares))) return (msg + "want to place shares on at least %d servers such that " "any %d of them have enough shares to recover the file, " - "sent %d queries to %d peers, " + "sent %d queries to %d servers, " "%d queries placed some shares, %d placed none " "(of which %d placed none due to the server being" " full and %d placed none due to an error)" % hunk ./src/allmydata/immutable/upload.py 319 (self.servers_of_happiness, self.needed_shares, - self.query_count, self.num_peers_contacted, + self.query_count, self.num_servers_contacted, self.good_query_count, self.bad_query_count, self.full_count, self.error_count)) hunk ./src/allmydata/immutable/upload.py 326 def _loop(self): if not self.homeless_shares: - merged = merge_peers(self.preexisting_shares, self.use_peers) + merged = merge_peers(self.preexisting_shares, self.use_servers) effective_happiness = servers_of_happiness(merged) if self.servers_of_happiness <= effective_happiness: msg = ("server selection successful for %s: %s: pretty_print_merged: %s, " hunk ./src/allmydata/immutable/upload.py 330 - "self.use_peers: %s, self.preexisting_shares: %s") \ - % (self, self._get_progress_message(), - pretty_print_shnum_to_servers(merged), - [', '.join([str_shareloc(k,v) for k,v in p.buckets.iteritems()]) - for p in self.use_peers], - pretty_print_shnum_to_servers(self.preexisting_shares)) + "self.use_servers: %s, self.preexisting_shares: %s") \ + % (self, self._get_progress_message(), + pretty_print_shnum_to_servers(merged), + [', '.join([str_shareloc(k,v) + for k,v in s.buckets.iteritems()]) + for s in self.use_servers], + pretty_print_shnum_to_servers(self.preexisting_shares)) self.log(msg, level=log.OPERATIONAL) hunk ./src/allmydata/immutable/upload.py 338 - return (self.use_peers, self.preexisting_shares) + return (self.use_servers, self.preexisting_shares) else: # We're not okay right now, but maybe we can fix it by # redistributing some shares. In cases where one or two hunk ./src/allmydata/immutable/upload.py 344 # servers has, before the upload, all or most of the # shares for a given SI, this can work by allowing _loop - # a chance to spread those out over the other peers, + # a chance to spread those out over the other servers, delta = self.servers_of_happiness - effective_happiness shares = shares_by_server(self.preexisting_shares) # Each server in shares maps to a set of shares stored on it. 
hunk ./src/allmydata/immutable/upload.py 355
                 shares_to_spread = sum([len(list(sharelist)) - 1
                                         for (server, sharelist)
                                         in shares.items()])
-                if delta <= len(self.uncontacted_peers) and \
+                if delta <= len(self.uncontacted_servers) and \
                    shares_to_spread >= delta:
                     items = shares.items()
                     while len(self.homeless_shares) < delta:
hunk ./src/allmydata/immutable/upload.py 371
                             if not self.preexisting_shares[share]:
                                 del self.preexisting_shares[share]
                             items.append((server, sharelist))
-                    for writer in self.use_peers:
+                    for writer in self.use_servers:
                         writer.abort_some_buckets(self.homeless_shares)
                     return self._loop()
                 else:
hunk ./src/allmydata/immutable/upload.py 376
                     # Redistribution won't help us; fail.
-                    peer_count = len(self.peers_with_shares)
-                    failmsg = failure_message(peer_count,
-                                              self.needed_shares,
-                                              self.servers_of_happiness,
-                                              effective_happiness)
+                    server_count = len(self.servers_with_shares)
+                    failmsg = failure_message(server_count,
+                                              self.needed_shares,
+                                              self.servers_of_happiness,
+                                              effective_happiness)
                     servmsgtempl = "server selection unsuccessful for %r: %s (%s), merged=%s"
                     servmsg = servmsgtempl % (
                         self,
hunk ./src/allmydata/immutable/upload.py 391
                     self.log(servmsg, level=log.INFREQUENT)
                     return self._failed("%s (%s)" % (failmsg, self._get_progress_message()))

-        if self.uncontacted_peers:
-            peer = self.uncontacted_peers.pop(0)
-            # TODO: don't pre-convert all peerids to PeerTrackers
-            assert isinstance(peer, PeerTracker)
+        if self.uncontacted_servers:
+            server = self.uncontacted_servers.pop(0)
+            # TODO: don't pre-convert all serverids to ServerTrackers
+            assert isinstance(server, ServerTracker)

             shares_to_ask = set(sorted(self.homeless_shares)[:1])
             self.homeless_shares -= shares_to_ask
hunk ./src/allmydata/immutable/upload.py 399
             self.query_count += 1
-            self.num_peers_contacted += 1
+            self.num_servers_contacted += 1
             if self._status:
hunk ./src/allmydata/immutable/upload.py 401
-                self._status.set_status("Contacting Peers [%s] (first query),"
+                self._status.set_status("Contacting Servers [%s] (first query),"
                                         " %d shares left.."
hunk ./src/allmydata/immutable/upload.py 403
-                                        % (idlib.shortnodeid_b2a(peer.peerid),
+                                        % (idlib.shortnodeid_b2a(server.serverid),
                                            len(self.homeless_shares)))
hunk ./src/allmydata/immutable/upload.py 405
-            d = peer.query(shares_to_ask)
-            d.addBoth(self._got_response, peer, shares_to_ask,
-                      self.contacted_peers)
+            d = server.query(shares_to_ask)
+            d.addBoth(self._got_response, server, shares_to_ask,
+                      self.contacted_servers)
             return d
hunk ./src/allmydata/immutable/upload.py 409
-        elif self.contacted_peers:
-            # ask a peer that we've already asked.
+        elif self.contacted_servers:
+            # ask a server that we've already asked.
             if not self._started_second_pass:
                 self.log("starting second pass",
                          level=log.NOISY)
hunk ./src/allmydata/immutable/upload.py 416
                 self._started_second_pass = True
             num_shares = mathutil.div_ceil(len(self.homeless_shares),
-                                           len(self.contacted_peers))
-            peer = self.contacted_peers.pop(0)
+                                           len(self.contacted_servers))
+            server = self.contacted_servers.pop(0)
             shares_to_ask = set(sorted(self.homeless_shares)[:num_shares])
             self.homeless_shares -= shares_to_ask
             self.query_count += 1
hunk ./src/allmydata/immutable/upload.py 422
             if self._status:
-                self._status.set_status("Contacting Peers [%s] (second query),"
+                self._status.set_status("Contacting Servers [%s] (second query),"
                                         " %d shares left.."
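The hunks around this point pass through the selector's whole query schedule,
which is worth restating outside diff form. The sketch below is a condensed,
illustrative model of that strategy (plan_queries is an invented name; the
real _loop interleaves this with happiness bookkeeping): the first pass
offers each not-yet-asked server a single share, and the second pass spreads
whatever is still homeless evenly across the servers that answered.

    from allmydata.util import mathutil

    def plan_queries(homeless_shares, uncontacted, contacted):
        # illustrative model of Tahoe2ServerSelector._loop's scheduling
        if uncontacted:
            # first pass: ask each new server for just one share
            return uncontacted[0], set(sorted(homeless_shares)[:1])
        if contacted:
            # second pass: divide what is left evenly among willing servers
            num = mathutil.div_ceil(len(homeless_shares), len(contacted))
            return contacted[0], set(sorted(homeless_shares)[:num])
        return None, set()  # nobody left: succeed or fail on happiness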
hunk ./src/allmydata/immutable/upload.py 424
-                                        % (idlib.shortnodeid_b2a(peer.peerid),
+                                        % (idlib.shortnodeid_b2a(server.serverid),
                                            len(self.homeless_shares)))
hunk ./src/allmydata/immutable/upload.py 426
-            d = peer.query(shares_to_ask)
-            d.addBoth(self._got_response, peer, shares_to_ask,
-                      self.contacted_peers2)
+            d = server.query(shares_to_ask)
+            d.addBoth(self._got_response, server, shares_to_ask,
+                      self.contacted_servers2)
             return d
hunk ./src/allmydata/immutable/upload.py 430
-        elif self.contacted_peers2:
+        elif self.contacted_servers2:
             # we've finished the second-or-later pass. Move all the remaining
hunk ./src/allmydata/immutable/upload.py 432
-            # peers back into self.contacted_peers for the next pass.
-            self.contacted_peers.extend(self.contacted_peers2)
-            self.contacted_peers2[:] = []
+            # servers back into self.contacted_servers for the next pass.
+            self.contacted_servers.extend(self.contacted_servers2)
+            self.contacted_servers2[:] = []
             return self._loop()
         else:
hunk ./src/allmydata/immutable/upload.py 437
-            # no more peers. If we haven't placed enough shares, we fail.
-            merged = merge_peers(self.preexisting_shares, self.use_peers)
+            # no more servers. If we haven't placed enough shares, we fail.
+            merged = merge_peers(self.preexisting_shares, self.use_servers)
             effective_happiness = servers_of_happiness(merged)
             if effective_happiness < self.servers_of_happiness:
hunk ./src/allmydata/immutable/upload.py 441
-                msg = failure_message(len(self.peers_with_shares),
+                msg = failure_message(len(self.servers_with_shares),
                                       self.needed_shares,
                                       self.servers_of_happiness,
                                       effective_happiness)
hunk ./src/allmydata/immutable/upload.py 445
-                msg = ("peer selection failed for %s: %s (%s)" % (self,
-                                                                  msg,
-                                                                  self._get_progress_message()))
+                msg = ("server selection failed for %s: %s (%s)" %
+                       (self, msg, self._get_progress_message()))
                 if self.last_failure_msg:
                     msg += " (%s)" % (self.last_failure_msg,)
                 self.log(msg, level=log.UNUSUAL)
hunk ./src/allmydata/immutable/upload.py 458
                 msg = ("server selection successful (no more servers) for %s: %s: %s"
                        % (self, self._get_progress_message(),
                           pretty_print_shnum_to_servers(merged)))
                 self.log(msg, level=log.OPERATIONAL)
-                return (self.use_peers, self.preexisting_shares)
+                return (self.use_servers, self.preexisting_shares)
hunk ./src/allmydata/immutable/upload.py 460

-    def _got_response(self, res, peer, shares_to_ask, put_peer_here):
+    def _got_response(self, res, server, shares_to_ask, put_server_here):
         if isinstance(res, failure.Failure):
             # This is unusual, and probably indicates a bug or a network
             # problem.
hunk ./src/allmydata/immutable/upload.py 464
-            self.log("%s got error during peer selection: %s" % (peer, res),
+            self.log("%s got error during server selection: %s" % (server, res),
                      level=log.UNUSUAL)
             self.error_count += 1
             self.bad_query_count += 1
hunk ./src/allmydata/immutable/upload.py 469
             self.homeless_shares |= shares_to_ask
-            if (self.uncontacted_peers
-                or self.contacted_peers
-                or self.contacted_peers2):
+            if (self.uncontacted_servers
+                or self.contacted_servers
+                or self.contacted_servers2):
                 # there is still hope, so just loop
                 pass
             else:
hunk ./src/allmydata/immutable/upload.py 475
-                # No more peers, so this upload might fail (it depends upon
+                # No more servers, so this upload might fail (it depends upon
                 # whether we've hit servers_of_happiness or not). Log the last
hunk ./src/allmydata/immutable/upload.py 477
-                # failure we got: if a coding error causes all peers to fail
+                # failure we got: if a coding error causes all servers to fail
                 # in the same way, this allows the common failure to be seen
                 # by the uploader and should help with debugging
hunk ./src/allmydata/immutable/upload.py 480
-                msg = ("last failure (from %s) was: %s" % (peer, res))
+                msg = ("last failure (from %s) was: %s" % (server, res))
                 self.last_failure_msg = msg
         else:
             (alreadygot, allocated) = res
hunk ./src/allmydata/immutable/upload.py 484
-            self.log("response to allocate_buckets() from peer %s: alreadygot=%s, allocated=%s"
-                    % (idlib.shortnodeid_b2a(peer.peerid),
+            self.log("response to allocate_buckets() from server %s: alreadygot=%s, allocated=%s"
+                    % (idlib.shortnodeid_b2a(server.serverid),
                        tuple(sorted(alreadygot)), tuple(sorted(allocated))),
                     level=log.NOISY)
             progress = False
hunk ./src/allmydata/immutable/upload.py 490
             for s in alreadygot:
-                self.preexisting_shares.setdefault(s, set()).add(peer.peerid)
+                self.preexisting_shares.setdefault(s, set()).add(server.serverid)
                 if s in self.homeless_shares:
                     self.homeless_shares.remove(s)
                     progress = True
hunk ./src/allmydata/immutable/upload.py 497
                 elif s in shares_to_ask:
                     progress = True

-            # the PeerTracker will remember which shares were allocated on
+            # the ServerTracker will remember which shares were allocated on
             # that peer. We just have to remember to use them.
             if allocated:
hunk ./src/allmydata/immutable/upload.py 500
-                self.use_peers.add(peer)
+                self.use_servers.add(server)
                 progress = True

             if allocated or alreadygot:
hunk ./src/allmydata/immutable/upload.py 504
-                self.peers_with_shares.add(peer.peerid)
+                self.servers_with_shares.add(server.serverid)

             not_yet_present = set(shares_to_ask) - set(alreadygot)
             still_homeless = not_yet_present - set(allocated)
hunk ./src/allmydata/immutable/upload.py 521

             if still_homeless:
                 # In networks with lots of space, this is very unusual and
-                # probably indicates an error. In networks with peers that
+                # probably indicates an error. In networks with servers that
                 # are full, it is merely unusual. In networks that are very
                 # full, it is common, and many uploads will fail. In most
                 # cases, this is obviously not fatal, and we'll just use some
hunk ./src/allmydata/immutable/upload.py 525
-                # other peers.
+                # other servers.

                 # some shares are still homeless, keep trying to find them a
                 # home. The ones that were rejected get first priority.
hunk ./src/allmydata/immutable/upload.py 535
             else:
                 # if they *were* able to accept everything, they might be
                 # willing to accept even more.
-                put_peer_here.append(peer)
+                put_server_here.append(server)

         # now loop
         return self._loop()
hunk ./src/allmydata/immutable/upload.py 543

     def _failed(self, msg):
         """
-        I am called when peer selection fails. I first abort all of the
+        I am called when server selection fails. I first abort all of the
         remote buckets that I allocated during my unsuccessful attempt to
         place shares for this file. I then raise an
         UploadUnhappinessError with my msg argument.
hunk ./src/allmydata/immutable/upload.py 548
         """
-        for peer in self.use_peers:
-            assert isinstance(peer, PeerTracker)
+        for server in self.use_servers:
+            assert isinstance(server, ServerTracker)
hunk ./src/allmydata/immutable/upload.py 551
-            peer.abort()
+            server.abort()
         raise UploadUnhappinessError(msg)
hunk ./src/allmydata/immutable/upload.py 829
         self.results = value

 class CHKUploader:
-    peer_selector_class = Tahoe2PeerSelector
+    server_selector_class = Tahoe2ServerSelector

     def __init__(self, storage_broker, secret_holder):
hunk ./src/allmydata/immutable/upload.py 832
-        # peer_selector needs storage_broker and secret_holder
+        # server_selector needs storage_broker and secret_holder
         self._storage_broker = storage_broker
         self._secret_holder = secret_holder
         self._log_number = self.log("CHKUploader starting", parent=None)
hunk ./src/allmydata/immutable/upload.py 845
         self._upload_status.set_results(self._results)

         # locate_all_shareholders() will create the following attribute:
-        # self._peer_trackers = {} # k: shnum, v: instance of PeerTracker
+        # self._server_trackers = {} # k: shnum, v: instance of ServerTracker

     def log(self, *args, **kwargs):
         if "parent" not in kwargs:
hunk ./src/allmydata/immutable/upload.py 896
         return d

     def locate_all_shareholders(self, encoder, started):
-        peer_selection_started = now = time.time()
+        server_selection_started = now = time.time()
         self._storage_index_elapsed = now - started
         storage_broker = self._storage_broker
         secret_holder = self._secret_holder
hunk ./src/allmydata/immutable/upload.py 904
         self._storage_index = storage_index
         upload_id = si_b2a(storage_index)[:5]
         self.log("using storage index %s" % upload_id)
-        peer_selector = self.peer_selector_class(upload_id, self._log_number,
-                                                 self._upload_status)
+        server_selector = self.server_selector_class(upload_id,
+                                                     self._log_number,
+                                                     self._upload_status)

         share_size = encoder.get_param("share_size")
         block_size = encoder.get_param("block_size")
hunk ./src/allmydata/immutable/upload.py 913
         num_segments = encoder.get_param("num_segments")
         k,desired,n = encoder.get_param("share_counts")

-        self._peer_selection_started = time.time()
-        d = peer_selector.get_shareholders(storage_broker, secret_holder,
-                                           storage_index,
-                                           share_size, block_size,
-                                           num_segments, n, k, desired)
+        self._server_selection_started = time.time()
+        d = server_selector.get_shareholders(storage_broker, secret_holder,
+                                             storage_index,
+                                             share_size, block_size,
+                                             num_segments, n, k, desired)
         def _done(res):
hunk ./src/allmydata/immutable/upload.py 919
-            self._peer_selection_elapsed = time.time() - peer_selection_started
+            self._server_selection_elapsed = time.time() - server_selection_started
             return res
         d.addCallback(_done)
         return d
hunk ./src/allmydata/immutable/upload.py 924

-    def set_shareholders(self, (upload_servers, already_peers), encoder):
+    def set_shareholders(self, (upload_servers, already_servers), encoder):
         """
hunk ./src/allmydata/immutable/upload.py 926
-        @param upload_servers: a sequence of PeerTracker objects that have agreed to hold some
-            shares for us (the shareids are stashed inside the PeerTracker)
-        @paran already_peers: a dict mapping sharenum to a set of peerids
-            that claim to already have this share
+        @param upload_servers: a sequence of ServerTracker objects that
+                               have agreed to hold some shares for us (the
+                               shareids are stashed inside the ServerTracker)
+        @paran already_servers: a dict mapping sharenum to a set of serverids
+                                that claim to already have this share
         """
hunk ./src/allmydata/immutable/upload.py 932
-        msgtempl = "set_shareholders; upload_servers is %s, already_peers is %s"
-        values = ([', '.join([str_shareloc(k,v) for k,v in p.buckets.iteritems()])
-                   for p in upload_servers], already_peers)
+        msgtempl = "set_shareholders; upload_servers is %s, already_servers is %s"
+        values = ([', '.join([str_shareloc(k,v) for k,v in s.buckets.iteritems()])
+                   for s in upload_servers], already_servers)
         self.log(msgtempl % values, level=log.OPERATIONAL)

         # record already-present shares in self._results
hunk ./src/allmydata/immutable/upload.py 937
-        self._results.preexisting_shares = len(already_peers)
+        self._results.preexisting_shares = len(already_servers)
hunk ./src/allmydata/immutable/upload.py 939
-        self._peer_trackers = {} # k: shnum, v: instance of PeerTracker
-        for peer in upload_servers:
-            assert isinstance(peer, PeerTracker)
+        self._server_trackers = {} # k: shnum, v: instance of ServerTracker
+        for server in upload_servers:
+            assert isinstance(server, ServerTracker)
         buckets = {}
hunk ./src/allmydata/immutable/upload.py 943
-        servermap = already_peers.copy()
-        for peer in upload_servers:
-            buckets.update(peer.buckets)
-            for shnum in peer.buckets:
-                self._peer_trackers[shnum] = peer
-                servermap.setdefault(shnum, set()).add(peer.peerid)
-        assert len(buckets) == sum([len(peer.buckets) for peer in upload_servers]), \
+        servermap = already_servers.copy()
+        for server in upload_servers:
+            buckets.update(server.buckets)
+            for shnum in server.buckets:
+                self._server_trackers[shnum] = server
+                servermap.setdefault(shnum, set()).add(server.serverid)
+        assert len(buckets) == sum([len(server.buckets)
+                                    for server in upload_servers]), \
             "%s (%s) != %s (%s)" % (
                 len(buckets),
                 buckets,
hunk ./src/allmydata/immutable/upload.py 954
-                sum([len(peer.buckets) for peer in upload_servers]),
-                [(p.buckets, p.peerid) for p in upload_servers]
+                sum([len(server.buckets) for server in upload_servers]),
+                [(s.buckets, s.serverid) for s in upload_servers]
                 )
         encoder.set_shareholders(buckets, servermap)
hunk ./src/allmydata/immutable/upload.py 963
         """
         Returns a Deferred that will fire with the UploadResults instance.
""" r = self._results for shnum in self._encoder.get_shares_placed(): - peer_tracker = self._peer_trackers[shnum] - peerid = peer_tracker.peerid - r.sharemap.add(shnum, peerid) - r.servermap.add(peerid, shnum) + server_tracker = self._server_trackers[shnum] + serverid = server_tracker.serverid + r.sharemap.add(shnum, serverid) + r.servermap.add(serverid, shnum) r.pushed_shares = len(self._encoder.get_shares_placed()) now = time.time() r.file_size = self._encoder.file_size hunk ./src/allmydata/immutable/upload.py 972 r.timings["total"] = now - self._started r.timings["storage_index"] = self._storage_index_elapsed - r.timings["peer_selection"] = self._peer_selection_elapsed + r.timings["peer_selection"] = self._server_selection_elapsed r.timings.update(self._encoder.get_times()) r.uri_extension_data = self._encoder.get_uri_extension_data() r.verifycapstr = verifycap.to_string() hunk ./src/allmydata/test/test_upload.py 196 self.num_servers = num_servers if type(mode) is str: mode = dict([i,mode] for i in range(num_servers)) - peers = [ ("%20d"%fakeid, FakeStorageServer(mode[fakeid])) - for fakeid in range(self.num_servers) ] + servers = [ ("%20d"%fakeid, FakeStorageServer(mode[fakeid])) + for fakeid in range(self.num_servers) ] self.storage_broker = StorageFarmBroker(None, permute_peers=True) hunk ./src/allmydata/test/test_upload.py 199 - for (serverid, rref) in peers: + for (serverid, rref) in servers: self.storage_broker.test_add_rref(serverid, rref) hunk ./src/allmydata/test/test_upload.py 201 - self.last_peers = [p[1] for p in peers] + self.last_servers = [s[1] for s in servers] def log(self, *args, **kwargs): pass hunk ./src/allmydata/test/test_upload.py 414 def test_first_error_all(self): self.make_node("first-fail") d = self.shouldFail(UploadUnhappinessError, "first_error_all", - "peer selection failed", + "server selection failed", upload_data, self.u, DATA) def _check((f,)): self.failUnlessIn("placed 0 shares out of 100 total", str(f.value)) hunk ./src/allmydata/test/test_upload.py 446 def test_second_error_all(self): self.make_node("second-fail") d = self.shouldFail(UploadUnhappinessError, "second_error_all", - "peer selection failed", + "server selection failed", upload_data, self.u, DATA) def _check((f,)): self.failUnlessIn("placed 10 shares out of 100 total", str(f.value)) hunk ./src/allmydata/test/test_upload.py 471 d.addBoth(self._should_fail) return d -class PeerSelection(unittest.TestCase): +class ServerSelection(unittest.TestCase): def make_client(self, num_servers=50): self.node = FakeClient(mode="good", num_servers=num_servers) hunk ./src/allmydata/test/test_upload.py 500 self.node.DEFAULT_ENCODING_PARAMETERS = p def test_one_each(self): - # if we have 50 shares, and there are 50 peers, and they all accept a - # share, we should get exactly one share per peer + # if we have 50 shares, and there are 50 servers, and they all accept + # a share, we should get exactly one share per server self.make_client() data = self.get_data(SIZE_LARGE) hunk ./src/allmydata/test/test_upload.py 510 d.addCallback(extract_uri) d.addCallback(self._check_large, SIZE_LARGE) def _check(res): - for p in self.node.last_peers: - allocated = p.allocated + for s in self.node.last_servers: + allocated = s.allocated self.failUnlessEqual(len(allocated), 1) hunk ./src/allmydata/test/test_upload.py 513 - self.failUnlessEqual(p.queries, 1) + self.failUnlessEqual(s.queries, 1) d.addCallback(_check) return d hunk ./src/allmydata/test/test_upload.py 518 def test_two_each(self): - # if we have 100 shares, and 
there are 50 peers, and they all accept - # all shares, we should get exactly two shares per peer + # if we have 100 shares, and there are 50 servers, and they all + # accept all shares, we should get exactly two shares per server self.make_client() data = self.get_data(SIZE_LARGE) hunk ./src/allmydata/test/test_upload.py 523 - # if there are 50 peers, then happy needs to be <= 50 + # if there are 50 servers, then happy needs to be <= 50 self.set_encoding_parameters(50, 50, 100) d = upload_data(self.u, data) d.addCallback(extract_uri) hunk ./src/allmydata/test/test_upload.py 529 d.addCallback(self._check_large, SIZE_LARGE) def _check(res): - for p in self.node.last_peers: - allocated = p.allocated + for s in self.node.last_servers: + allocated = s.allocated self.failUnlessEqual(len(allocated), 2) hunk ./src/allmydata/test/test_upload.py 532 - self.failUnlessEqual(p.queries, 2) + self.failUnlessEqual(s.queries, 2) d.addCallback(_check) return d hunk ./src/allmydata/test/test_upload.py 537 def test_one_each_plus_one_extra(self): - # if we have 51 shares, and there are 50 peers, then one peer gets - # two shares and the rest get just one + # if we have 51 shares, and there are 50 servers, then one server + # gets two shares and the rest get just one self.make_client() data = self.get_data(SIZE_LARGE) hunk ./src/allmydata/test/test_upload.py 549 def _check(res): got_one = [] got_two = [] - for p in self.node.last_peers: - allocated = p.allocated + for s in self.node.last_servers: + allocated = s.allocated self.failUnless(len(allocated) in (1,2), len(allocated)) if len(allocated) == 1: hunk ./src/allmydata/test/test_upload.py 553 - self.failUnlessEqual(p.queries, 1) - got_one.append(p) + self.failUnlessEqual(s.queries, 1) + got_one.append(s) else: hunk ./src/allmydata/test/test_upload.py 556 - self.failUnlessEqual(p.queries, 2) - got_two.append(p) + self.failUnlessEqual(s.queries, 2) + got_two.append(s) self.failUnlessEqual(len(got_one), 49) self.failUnlessEqual(len(got_two), 1) d.addCallback(_check) hunk ./src/allmydata/test/test_upload.py 564 return d def test_four_each(self): - # if we have 200 shares, and there are 50 peers, then each peer gets - # 4 shares. The design goal is to accomplish this with only two - # queries per peer. + # if we have 200 shares, and there are 50 servers, then each server + # gets 4 shares. The design goal is to accomplish this with only two + # queries per server. self.make_client() data = self.get_data(SIZE_LARGE) hunk ./src/allmydata/test/test_upload.py 570 - # if there are 50 peers, then happy should be no more than 50 if - # we want this to work. + # if there are 50 servers, then happy should be no more than 50 if we + # want this to work. 
         self.set_encoding_parameters(100, 50, 200)
         d = upload_data(self.u, data)
         d.addCallback(extract_uri)
hunk ./src/allmydata/test/test_upload.py 577
         d.addCallback(self._check_large, SIZE_LARGE)
         def _check(res):
-            for p in self.node.last_peers:
-                allocated = p.allocated
+            for s in self.node.last_servers:
+                allocated = s.allocated
                 self.failUnlessEqual(len(allocated), 4)
hunk ./src/allmydata/test/test_upload.py 580
-                self.failUnlessEqual(p.queries, 2)
+                self.failUnlessEqual(s.queries, 2)
         d.addCallback(_check)
         return d
hunk ./src/allmydata/test/test_upload.py 596
         d.addCallback(self._check_large, SIZE_LARGE)
         def _check(res):
             counts = {}
-            for p in self.node.last_peers:
-                allocated = p.allocated
+            for s in self.node.last_servers:
+                allocated = s.allocated
                 counts[len(allocated)] = counts.get(len(allocated), 0) + 1
             histogram = [counts.get(i, 0) for i in range(5)]
             self.failUnlessEqual(histogram, [0,0,0,2,1])
hunk ./src/allmydata/test/test_upload.py 619
         d.addCallback(extract_uri)
         d.addCallback(self._check_large, SIZE_LARGE)
         def _check(res):
-            # we should have put one share each on the big peers, and zero
-            # shares on the small peers
+            # we should have put one share each on the big servers, and zero
+            # shares on the small servers
             total_allocated = 0
hunk ./src/allmydata/test/test_upload.py 622
-            for p in self.node.last_peers:
+            for p in self.node.last_servers:
                 if p.mode == "good":
                     self.failUnlessEqual(len(p.allocated), 1)
                 elif p.mode == "small":
hunk ./src/allmydata/test/test_upload.py 753
     def _do_upload_with_broken_servers(self, servers_to_break):
         """
         I act like a normal upload, but before I send the results of
-        Tahoe2PeerSelector to the Encoder, I break the first servers_to_break
-        PeerTrackers in the upload_servers part of the return result.
+        Tahoe2ServerSelector to the Encoder, I break the first
+        servers_to_break ServerTrackers in the upload_servers part of the
+        return result.
         """
         assert self.g, "I tried to find a grid at self.g, but failed"
         broker = self.g.clients[0].storage_broker
hunk ./src/allmydata/test/test_upload.py 768
         encoder = encode.Encoder()
         encoder.set_encrypted_uploadable(uploadable)
         status = upload.UploadStatus()
-        selector = upload.Tahoe2PeerSelector("dglev", "test", status)
+        selector = upload.Tahoe2ServerSelector("dglev", "test", status)
         storage_index = encoder.get_param("storage_index")
         share_size = encoder.get_param("share_size")
         block_size = encoder.get_param("block_size")
hunk ./src/allmydata/test/test_upload.py 776
         d = selector.get_shareholders(broker, sh, storage_index,
                                       share_size, block_size, num_segments,
                                       10, 3, 4)
-        def _have_shareholders((upload_servers, already_peers)):
+        def _have_shareholders((upload_servers, already_servers)):
             assert servers_to_break <= len(upload_servers)
             for index in xrange(servers_to_break):
                 server = list(upload_servers)[index]
hunk ./src/allmydata/test/test_upload.py 783
                 for share in server.buckets.keys():
                     server.buckets[share].abort()
             buckets = {}
-            servermap = already_peers.copy()
-            for peer in upload_servers:
-                buckets.update(peer.buckets)
-                for bucket in peer.buckets:
-                    servermap.setdefault(bucket, set()).add(peer.peerid)
+            servermap = already_servers.copy()
+            for server in upload_servers:
+                buckets.update(server.buckets)
+                for bucket in server.buckets:
+                    servermap.setdefault(bucket, set()).add(server.serverid)
             encoder.set_shareholders(buckets, servermap)
             d = encoder.start()
             return d
hunk ./src/allmydata/test/test_upload.py 1058
         # one share from our initial upload to each of these.
         # The counterintuitive ordering of the share numbers is to deal with
         # the permuting of these servers -- distributing the shares this
-        # way ensures that the Tahoe2PeerSelector sees them in the order
+        # way ensures that the Tahoe2ServerSelector sees them in the order
         # described below.
         d = self._setup_and_upload()
         d.addCallback(lambda ign:
hunk ./src/allmydata/test/test_upload.py 1073
         #  server 2: share 0
         #  server 3: share 1
         # We change the 'happy' parameter in the client to 4.
-        # The Tahoe2PeerSelector will see the peers permuted as:
+        # The Tahoe2ServerSelector will see the servers permuted as:
         #  2, 3, 1, 0
         # Ideally, a reupload of our original data should work.
         def _reset_encoding_parameters(ign, happy=4):
hunk ./src/allmydata/test/test_upload.py 1088

         # This scenario is basically comment:53, but changed so that the
-        # Tahoe2PeerSelector sees the server with all of the shares before
+        # Tahoe2ServerSelector sees the server with all of the shares before
         # any of the other servers.
         # The layout is:
         #  server 2: shares 0 - 9
hunk ./src/allmydata/test/test_upload.py 1095
         #  server 3: share 0
         #  server 1: share 1
         #  server 4: share 2
-        # The Tahoe2PeerSelector sees the peers permuted as:
+        # The Tahoe2ServerSelector sees the servers permuted as:
         #  2, 3, 1, 4
         # Note that server 0 has been replaced by server 4; this makes it
hunk ./src/allmydata/test/test_upload.py 1098
-        # easier to ensure that the last server seen by Tahoe2PeerSelector
+        # easier to ensure that the last server seen by Tahoe2ServerSelector
         # has only one share.
         d.addCallback(_change_basedir)
         d.addCallback(lambda ign:
hunk ./src/allmydata/test/test_upload.py 1128

         # Try the same thing, but with empty servers after the first one
-        # We want to make sure that Tahoe2PeerSelector will redistribute
+        # We want to make sure that Tahoe2ServerSelector will redistribute
         # shares as necessary, not simply discover an existing layout.
         # The layout is:
         #  server 2: shares 0 - 9
hunk ./src/allmydata/test/test_upload.py 1188
         return d
     test_problem_layout_ticket_1124.todo = "Fix this after 1.7.1 release."

-    def test_happiness_with_some_readonly_peers(self):
+    def test_happiness_with_some_readonly_servers(self):
         # Try the following layout
         #  server 2: shares 0-9
         #  server 4: share 0, read-only
hunk ./src/allmydata/test/test_upload.py 1227
         return d

-    def test_happiness_with_all_readonly_peers(self):
+    def test_happiness_with_all_readonly_servers(self):
         #  server 3: share 1, read-only
         #  server 1: share 2, read-only
         #  server 2: shares 0-9, read-only
hunk ./src/allmydata/test/test_upload.py 1233
         #  server 4: share 0, read-only
         # The idea with this test is to make sure that the survey of
-        # read-only peers doesn't undercount servers of happiness
+        # read-only servers doesn't undercount servers of happiness
         self.basedir = self.mktemp()
         d = self._setup_and_upload()
         d.addCallback(lambda ign:
hunk ./src/allmydata/test/test_upload.py 1272
         # the layout presented to it satisfies "servers_of_happiness"
         # until a failure occurs)
         #
-        # This test simulates an upload where servers break after peer
+        # This test simulates an upload where servers break after server
         # selection, but before they are written to.
         def _set_basedir(ign=None):
             self.basedir = self.mktemp()
hunk ./src/allmydata/test/test_upload.py 1287
             self._add_server(server_number=5)
         d.addCallback(_do_server_setup)
         # remove the original server
-        # (necessary to ensure that the Tahoe2PeerSelector will distribute
+        # (necessary to ensure that the Tahoe2ServerSelector will distribute
         #  all the shares)
         def _remove_server(ign):
             server = self.g.servers_by_number[0]
hunk ./src/allmydata/test/test_upload.py 1347

     def test_merge_peers(self):
         # merge_peers merges a list of upload_servers and a dict of
-        # shareid -> peerid mappings.
+        # shareid -> serverid mappings.
         shares = {
                     1 : set(["server1"]),
                     2 : set(["server2"]),
hunk ./src/allmydata/test/test_upload.py 1358
         # if not provided with a upload_servers argument, it should just
         # return the first argument unchanged.
         self.failUnlessEqual(shares, merge_peers(shares, set([])))
-        class FakePeerTracker:
+        class FakeServerTracker:
             pass
         trackers = []
         for (i, server) in [(i, "server%d" % i) for i in xrange(5, 9)]:
hunk ./src/allmydata/test/test_upload.py 1362
-            t = FakePeerTracker()
-            t.peerid = server
+            t = FakeServerTracker()
+            t.serverid = server
             t.buckets = [i]
             trackers.append(t)
         expected = {
hunk ./src/allmydata/test/test_upload.py 1390
         expected = {}
         for (i, server) in [(i, "server%d" % i) for i in xrange(10)]:
             shares3[i] = set([server])
-            t = FakePeerTracker()
-            t.peerid = server
+            t = FakeServerTracker()
+            t.serverid = server
             t.buckets = [i]
             trackers.append(t)
             expected[i] = set([server])
hunk ./src/allmydata/test/test_upload.py 1407
         # value for given inputs.

         # servers_of_happiness expects a dict of
-        # shnum => set(peerids) as a preexisting shares argument.
+        # shnum => set(serverids) as a preexisting shares argument.
         test1 = {
                  1 : set(["server1"]),
                  2 : set(["server2"]),
hunk ./src/allmydata/test/test_upload.py 1421
         # should be 3 instead of 4.
         happy = servers_of_happiness(test1)
         self.failUnlessEqual(3, happy)
-        # The second argument of merge_peers should be a set of
-        # objects with peerid and buckets as attributes. In actual use,
-        # these will be PeerTracker instances, but for testing it is fine
-        # to make a FakePeerTracker whose job is to hold those instance
-        # variables to test that part.
-        class FakePeerTracker:
+        # The second argument of merge_peers should be a set of objects with
+        # serverid and buckets as attributes. In actual use, these will be
+        # ServerTracker instances, but for testing it is fine to make a
+        # FakeServerTracker whose job is to hold those instance variables to
+        # test that part.
+        class FakeServerTracker:
             pass
         trackers = []
         for (i, server) in [(i, "server%d" % i) for i in xrange(5, 9)]:
hunk ./src/allmydata/test/test_upload.py 1430
-            t = FakePeerTracker()
-            t.peerid = server
+            t = FakeServerTracker()
+            t.serverid = server
             t.buckets = [i]
             trackers.append(t)
         # Recall that test1 is a server layout with servers_of_happiness
hunk ./src/allmydata/test/test_upload.py 1436
         # = 3.  Since there isn't any overlap between the shnum ->
-        # set([peerid]) correspondences in test1 and those in trackers,
+        # set([serverid]) correspondences in test1 and those in trackers,
         # the result here should be 7.
         test2 = merge_peers(test1, set(trackers))
         happy = servers_of_happiness(test2)
hunk ./src/allmydata/test/test_upload.py 1444
         # Now add an overlapping server to trackers. This is redundant,
         # so it should not cause the previously reported happiness value
         # to change.
-        t = FakePeerTracker()
-        t.peerid = "server1"
+        t = FakeServerTracker()
+        t.serverid = "server1"
         t.buckets = [1]
         trackers.append(t)
         test2 = merge_peers(test1, set(trackers))
hunk ./src/allmydata/test/test_upload.py 1463
                  4 : set(['server4']),
                 }
         trackers = []
-        t = FakePeerTracker()
-        t.peerid = 'server5'
+        t = FakeServerTracker()
+        t.serverid = 'server5'
         t.buckets = [4]
         trackers.append(t)
hunk ./src/allmydata/test/test_upload.py 1467
-        t = FakePeerTracker()
-        t.peerid = 'server6'
+        t = FakeServerTracker()
+        t.serverid = 'server6'
         t.buckets = [3, 5]
         trackers.append(t)
         # The value returned by servers_of_happiness is the size
hunk ./src/allmydata/test/test_upload.py 1473
         # of a maximum matching in the bipartite graph that
-        # servers_of_happiness() makes between peerids and share
+        # servers_of_happiness() makes between serverids and share
         # numbers. It should find something like this:
         # (server 1, share 1)
         # (server 2, share 2)
hunk ./src/allmydata/test/test_upload.py 1531
         sbs = shares_by_server(test1)
         self.failUnlessEqual(set([1, 2, 3]), sbs["server1"])
         self.failUnlessEqual(set([4, 5]), sbs["server2"])
-        # This should fail unless the peerid part of the mapping is a set
+        # This should fail unless the serverid part of the mapping is a set
         test2 = {1: "server1"}
         self.shouldFail(AssertionError,
                        "test_shares_by_server",
hunk ./src/allmydata/test/test_upload.py 1547
         #  server 2: empty
         #  server 3: empty
         #  server 4: empty
-        # The purpose of this test is to make sure that the peer selector
+        # The purpose of this test is to make sure that the server selector
         # knows about the shares on server 1, even though it is read-only.
         # It used to simply filter these out, which would cause the test
         # to fail when servers_of_happiness = 4.
hunk ./src/allmydata/test/test_upload.py 1578

     def test_query_counting(self):
-        # If peer selection fails, Tahoe2PeerSelector prints out a lot
+        # If server selection fails, Tahoe2ServerSelector prints out a lot
         # of helpful diagnostic information, including query stats.
         # This test helps make sure that that information is accurate.
         self.basedir = self.mktemp()
hunk ./src/allmydata/test/test_upload.py 1601
                             c.upload, upload.Data("data" * 10000,
                                                   convergence="")))

         # Now try with some readonly servers. We want to make sure that
-        # the readonly peer share discovery phase is counted correctly.
+        # the readonly server share discovery phase is counted correctly.
         def _reset(ign):
             self.basedir = self.mktemp()
             self.g = None
hunk ./src/allmydata/test/test_upload.py 1672
         d.addCallback(lambda client:
             self.shouldFail(UploadUnhappinessError,
                             "test_upper_limit_on_readonly_queries",
-                            "sent 8 queries to 8 peers",
+                            "sent 8 queries to 8 servers",
                             client.upload,
                             upload.Data('data' * 10000, convergence="")))
         return d
hunk ./src/allmydata/test/test_upload.py 1678

-    def test_exception_messages_during_peer_selection(self):
+    def test_exception_messages_during_server_selection(self):
         #  server 1: read-only, no shares
         #  server 2: read-only, no shares
         #  server 3: read-only, no shares
hunk ./src/allmydata/test/test_upload.py 1711
                             "total (10 homeless), want to place shares on at "
                             "least 4 servers such that any 3 of them have "
                             "enough shares to recover the file, "
-                            "sent 5 queries to 5 peers, 0 queries placed "
+                            "sent 5 queries to 5 servers, 0 queries placed "
                             "some shares, 5 placed none "
                             "(of which 5 placed none due to the server being "
                             "full and 0 placed none due to an error)",
hunk ./src/allmydata/test/test_upload.py 1752
                             "total (10 homeless), want to place shares on at "
                             "least 4 servers such that any 3 of them have "
                             "enough shares to recover the file, "
-                            "sent 5 queries to 5 peers, 0 queries placed "
+                            "sent 5 queries to 5 servers, 0 queries placed "
                             "some shares, 5 placed none "
                             "(of which 4 placed none due to the server being "
                             "full and 1 placed none due to an error)",
hunk ./src/allmydata/test/test_upload.py 2013
         return d

-    def test_peer_selector_bucket_abort(self):
-        # If peer selection for an upload fails due to an unhappy
-        # layout, the peer selection process should abort the buckets it
+    def test_server_selector_bucket_abort(self):
+        # If server selection for an upload fails due to an unhappy
+        # layout, the server selection process should abort the buckets it
         # allocates before failing, so that the space can be re-used.
self.basedir = self.mktemp() self.set_up_grid(num_servers=5) hunk ./src/allmydata/test/test_upload.py 2028 d = defer.succeed(None) d.addCallback(lambda ignored: self.shouldFail(UploadUnhappinessError, - "test_peer_selection_bucket_abort", + "test_server_selection_bucket_abort", "", client.upload, upload.Data("data" * 10000, convergence=""))) hunk ./src/allmydata/test/test_upload.py 2083 return None # TODO: -# upload with exactly 75 peers (shares_of_happiness) +# upload with exactly 75 servers (shares_of_happiness) # have a download fail # cancel a download (need to implement more cancel stuff) hunk ./src/allmydata/util/happinessutil.py 77 for peer in upload_servers: for shnum in peer.buckets: - servermap.setdefault(shnum, set()).add(peer.peerid) + servermap.setdefault(shnum, set()).add(peer.serverid) return servermap def servers_of_happiness(sharemap): } [upload.py: fix var names to avoid confusion between 'trackers' and 'servers' warner@lothar.com**20110227011103 Ignore-this: 5d5e3415b7d2732d92f42413c25d205d ] { hunk ./src/allmydata/immutable/upload.py 189 self.needed_shares = needed_shares self.homeless_shares = set(range(total_shares)) - self.contacted_servers = [] # servers worth asking again - self.contacted_servers2 = [] # servers that we have asked again + self.contacted_trackers = [] # servers worth asking again + self.contacted_trackers2 = [] # servers that we have asked again self._started_second_pass = False hunk ./src/allmydata/immutable/upload.py 192 - self.use_servers = set() # ServerTrackers that have shares assigned - # to them + self.use_trackers = set() # ServerTrackers that have shares assigned + # to them self.preexisting_shares = {} # shareid => set(serverids) holding shareid hunk ./src/allmydata/immutable/upload.py 195 - # We don't try to allocate shares to these servers, since they've said - # that they're incapable of storing shares of the size that we'd want - # to store. We keep them around because they may have existing shares - # for this storage index, which we want to know about for accurate - # servers_of_happiness accounting - # (this is eventually a list, but it is initialized later) - self.readonly_servers = None + # These servers have shares -- any shares -- for our SI. We keep # track of these to write an error message with them later. self.servers_with_shares = set() hunk ./src/allmydata/immutable/upload.py 248 bucket_cancel_secret_hash(file_cancel_secret, serverid)) for (serverid, conn) in servers] - self.uncontacted_servers = _make_trackers(writable_servers) - self.readonly_servers = _make_trackers(readonly_servers) + self.uncontacted_trackers = _make_trackers(writable_servers) + + # We don't try to allocate shares to these servers, since they've + # said that they're incapable of storing shares of the size that we'd + # want to store. We ask them about existing shares for this storage + # index, which we want to know about for accurate + # servers_of_happiness accounting, then we forget about them. + readonly_trackers = _make_trackers(readonly_servers) + # We now ask servers that can't hold any new shares about existing # shares that they might have for our SI. Once this is done, we # start placing the shares that we haven't already accounted hunk ./src/allmydata/immutable/upload.py 262 # for. 
ds = [] - if self._status and self.readonly_servers: + if self._status and readonly_trackers: self._status.set_status("Contacting readonly servers to find " "any existing shares") hunk ./src/allmydata/immutable/upload.py 265 - for server in self.readonly_servers: - assert isinstance(server, ServerTracker) - d = server.ask_about_existing_shares() - d.addBoth(self._handle_existing_response, server.serverid) + for tracker in readonly_trackers: + assert isinstance(tracker, ServerTracker) + d = tracker.ask_about_existing_shares() + d.addBoth(self._handle_existing_response, tracker.serverid) ds.append(d) self.num_servers_contacted += 1 self.query_count += 1 hunk ./src/allmydata/immutable/upload.py 273 self.log("asking server %s for any existing shares" % - (idlib.shortnodeid_b2a(server.serverid),), + (idlib.shortnodeid_b2a(tracker.serverid),), level=log.NOISY) dl = defer.DeferredList(ds) dl.addCallback(lambda ign: self._loop()) hunk ./src/allmydata/immutable/upload.py 327 def _loop(self): if not self.homeless_shares: - merged = merge_peers(self.preexisting_shares, self.use_servers) + merged = merge_peers(self.preexisting_shares, self.use_trackers) effective_happiness = servers_of_happiness(merged) if self.servers_of_happiness <= effective_happiness: msg = ("server selection successful for %s: %s: pretty_print_merged: %s, " hunk ./src/allmydata/immutable/upload.py 331 - "self.use_servers: %s, self.preexisting_shares: %s") \ + "self.use_trackers: %s, self.preexisting_shares: %s") \ % (self, self._get_progress_message(), pretty_print_shnum_to_servers(merged), [', '.join([str_shareloc(k,v) hunk ./src/allmydata/immutable/upload.py 335 - for k,v in s.buckets.iteritems()]) - for s in self.use_servers], + for k,v in st.buckets.iteritems()]) + for st in self.use_trackers], pretty_print_shnum_to_servers(self.preexisting_shares)) self.log(msg, level=log.OPERATIONAL) hunk ./src/allmydata/immutable/upload.py 339 - return (self.use_servers, self.preexisting_shares) + return (self.use_trackers, self.preexisting_shares) else: # We're not okay right now, but maybe we can fix it by # redistributing some shares. In cases where one or two hunk ./src/allmydata/immutable/upload.py 356 shares_to_spread = sum([len(list(sharelist)) - 1 for (server, sharelist) in shares.items()]) - if delta <= len(self.uncontacted_servers) and \ + if delta <= len(self.uncontacted_trackers) and \ shares_to_spread >= delta: items = shares.items() while len(self.homeless_shares) < delta: hunk ./src/allmydata/immutable/upload.py 372 if not self.preexisting_shares[share]: del self.preexisting_shares[share] items.append((server, sharelist)) - for writer in self.use_servers: + for writer in self.use_trackers: writer.abort_some_buckets(self.homeless_shares) return self._loop() else: hunk ./src/allmydata/immutable/upload.py 392 self.log(servmsg, level=log.INFREQUENT) return self._failed("%s (%s)" % (failmsg, self._get_progress_message())) - if self.uncontacted_servers: - server = self.uncontacted_servers.pop(0) + if self.uncontacted_trackers: + tracker = self.uncontacted_trackers.pop(0) # TODO: don't pre-convert all serverids to ServerTrackers hunk ./src/allmydata/immutable/upload.py 395 - assert isinstance(server, ServerTracker) + assert isinstance(tracker, ServerTracker) shares_to_ask = set(sorted(self.homeless_shares)[:1]) self.homeless_shares -= shares_to_ask hunk ./src/allmydata/immutable/upload.py 404 if self._status: self._status.set_status("Contacting Servers [%s] (first query)," " %d shares left.." 
- % (idlib.shortnodeid_b2a(server.serverid), + % (idlib.shortnodeid_b2a(tracker.serverid), len(self.homeless_shares))) hunk ./src/allmydata/immutable/upload.py 406 - d = server.query(shares_to_ask) - d.addBoth(self._got_response, server, shares_to_ask, - self.contacted_servers) + d = tracker.query(shares_to_ask) + d.addBoth(self._got_response, tracker, shares_to_ask, + self.contacted_trackers) return d hunk ./src/allmydata/immutable/upload.py 410 - elif self.contacted_servers: + elif self.contacted_trackers: # ask a server that we've already asked. if not self._started_second_pass: self.log("starting second pass", hunk ./src/allmydata/immutable/upload.py 417 level=log.NOISY) self._started_second_pass = True num_shares = mathutil.div_ceil(len(self.homeless_shares), - len(self.contacted_servers)) - server = self.contacted_servers.pop(0) + len(self.contacted_trackers)) + tracker = self.contacted_trackers.pop(0) shares_to_ask = set(sorted(self.homeless_shares)[:num_shares]) self.homeless_shares -= shares_to_ask self.query_count += 1 hunk ./src/allmydata/immutable/upload.py 425 if self._status: self._status.set_status("Contacting Servers [%s] (second query)," " %d shares left.." - % (idlib.shortnodeid_b2a(server.serverid), + % (idlib.shortnodeid_b2a(tracker.serverid), len(self.homeless_shares))) hunk ./src/allmydata/immutable/upload.py 427 - d = server.query(shares_to_ask) - d.addBoth(self._got_response, server, shares_to_ask, - self.contacted_servers2) + d = tracker.query(shares_to_ask) + d.addBoth(self._got_response, tracker, shares_to_ask, + self.contacted_trackers2) return d hunk ./src/allmydata/immutable/upload.py 431 - elif self.contacted_servers2: + elif self.contacted_trackers2: # we've finished the second-or-later pass. Move all the remaining hunk ./src/allmydata/immutable/upload.py 433 - # servers back into self.contacted_servers for the next pass. - self.contacted_servers.extend(self.contacted_servers2) - self.contacted_servers2[:] = [] + # servers back into self.contacted_trackers for the next pass. + self.contacted_trackers.extend(self.contacted_trackers2) + self.contacted_trackers2[:] = [] return self._loop() else: # no more servers. If we haven't placed enough shares, we fail. hunk ./src/allmydata/immutable/upload.py 439 - merged = merge_peers(self.preexisting_shares, self.use_servers) + merged = merge_peers(self.preexisting_shares, self.use_trackers) effective_happiness = servers_of_happiness(merged) if effective_happiness < self.servers_of_happiness: msg = failure_message(len(self.servers_with_shares), hunk ./src/allmydata/immutable/upload.py 459 msg = ("server selection successful (no more servers) for %s: %s: %s" % (self, self._get_progress_message(), pretty_print_shnum_to_servers(merged))) self.log(msg, level=log.OPERATIONAL) - return (self.use_servers, self.preexisting_shares) + return (self.use_trackers, self.preexisting_shares) hunk ./src/allmydata/immutable/upload.py 461 - def _got_response(self, res, server, shares_to_ask, put_server_here): + def _got_response(self, res, tracker, shares_to_ask, put_tracker_here): if isinstance(res, failure.Failure): # This is unusual, and probably indicates a bug or a network # problem. 
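
Condensed, the control flow that these three tracker lists implement (a
synchronous sketch with Deferreds and logging elided; 'state' is a
hypothetical holder for the selector's attributes):

    def div_ceil(n, d):  # mathutil.div_ceil in the real code
        return (n + d - 1) // d

    def choose_next_query(state):
        if state.uncontacted_trackers:
            # first pass: ask each fresh server for a single share
            tracker = state.uncontacted_trackers.pop(0)
            shares_to_ask = set(sorted(state.homeless_shares)[:1])
        elif state.contacted_trackers:
            # later passes: spread the leftovers evenly across servers
            # that accepted everything we asked of them before
            num = div_ceil(len(state.homeless_shares),
                           len(state.contacted_trackers))
            tracker = state.contacted_trackers.pop(0)
            shares_to_ask = set(sorted(state.homeless_shares)[:num])
        elif state.contacted_trackers2:
            # a pass just ended: recycle everyone for another round
            state.contacted_trackers.extend(state.contacted_trackers2)
            state.contacted_trackers2[:] = []
            return choose_next_query(state)
        else:
            return None  # no servers left; caller checks happiness, fails
        state.homeless_shares -= shares_to_ask
        return (tracker, shares_to_ask)

After each response, _got_response() re-files a tracker that accepted
everything onto either contacted_trackers (first pass) or
contacted_trackers2 (later passes) via its put_tracker_here argument, which
is what keeps the rotation going.
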
hunk ./src/allmydata/immutable/upload.py 465 - self.log("%s got error during server selection: %s" % (server, res), + self.log("%s got error during server selection: %s" % (tracker, res), level=log.UNUSUAL) self.error_count += 1 self.bad_query_count += 1 hunk ./src/allmydata/immutable/upload.py 470 self.homeless_shares |= shares_to_ask - if (self.uncontacted_servers - or self.contacted_servers - or self.contacted_servers2): + if (self.uncontacted_trackers + or self.contacted_trackers + or self.contacted_trackers2): # there is still hope, so just loop pass else: hunk ./src/allmydata/immutable/upload.py 481 # failure we got: if a coding error causes all servers to fail # in the same way, this allows the common failure to be seen # by the uploader and should help with debugging - msg = ("last failure (from %s) was: %s" % (server, res)) + msg = ("last failure (from %s) was: %s" % (tracker, res)) self.last_failure_msg = msg else: (alreadygot, allocated) = res hunk ./src/allmydata/immutable/upload.py 486 self.log("response to allocate_buckets() from server %s: alreadygot=%s, allocated=%s" - % (idlib.shortnodeid_b2a(server.serverid), + % (idlib.shortnodeid_b2a(tracker.serverid), tuple(sorted(alreadygot)), tuple(sorted(allocated))), level=log.NOISY) progress = False hunk ./src/allmydata/immutable/upload.py 491 for s in alreadygot: - self.preexisting_shares.setdefault(s, set()).add(server.serverid) + self.preexisting_shares.setdefault(s, set()).add(tracker.serverid) if s in self.homeless_shares: self.homeless_shares.remove(s) progress = True hunk ./src/allmydata/immutable/upload.py 501 # the ServerTracker will remember which shares were allocated on # that peer. We just have to remember to use them. if allocated: - self.use_servers.add(server) + self.use_trackers.add(tracker) progress = True if allocated or alreadygot: hunk ./src/allmydata/immutable/upload.py 505 - self.servers_with_shares.add(server.serverid) + self.servers_with_shares.add(tracker.serverid) not_yet_present = set(shares_to_ask) - set(alreadygot) still_homeless = not_yet_present - set(allocated) hunk ./src/allmydata/immutable/upload.py 536 else: # if they *were* able to accept everything, they might be # willing to accept even more. - put_server_here.append(server) + put_tracker_here.append(tracker) # now loop return self._loop() hunk ./src/allmydata/immutable/upload.py 549 place shares for this file. I then raise an UploadUnhappinessError with my msg argument. """ - for server in self.use_servers: - assert isinstance(server, ServerTracker) - - server.abort() - + for tracker in self.use_trackers: + assert isinstance(tracker, ServerTracker) + tracker.abort() raise UploadUnhappinessError(msg) } [upload.py: more tracker-vs-server cleanup warner@lothar.com**20110227011107 Ignore-this: bb75ed2afef55e47c085b35def2de315 ] { hunk ./src/allmydata/immutable/upload.py 174 num_segments, total_shares, needed_shares, servers_of_happiness): """ - @return: (upload_servers, already_servers), where upload_servers is + @return: (upload_trackers, already_servers), where upload_trackers is a set of ServerTracker instances that have agreed to hold some shares for us (the shareids are stashed inside the ServerTracker), and already_servers is a dict mapping shnum hunk ./src/allmydata/immutable/upload.py 178 - to a set of servers which claim to already have the share. + to a set of serverids which claim to already have the share. """ if self._status: hunk ./src/allmydata/immutable/upload.py 198 # These servers have shares -- any shares -- for our SI. 
We keep # track of these to write an error message with them later. - self.servers_with_shares = set() + self.serverids_with_shares = set() # this needed_hashes computation should mirror # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree hunk ./src/allmydata/immutable/upload.py 280 return dl - def _handle_existing_response(self, res, server): + def _handle_existing_response(self, res, serverid): """ I handle responses to the queries sent by Tahoe2ServerSelector._existing_shares. hunk ./src/allmydata/immutable/upload.py 287 """ if isinstance(res, failure.Failure): self.log("%s got error during existing shares check: %s" - % (idlib.shortnodeid_b2a(server), res), + % (idlib.shortnodeid_b2a(serverid), res), level=log.UNUSUAL) self.error_count += 1 self.bad_query_count += 1 hunk ./src/allmydata/immutable/upload.py 294 else: buckets = res if buckets: - self.servers_with_shares.add(server) + self.serverids_with_shares.add(serverid) self.log("response to get_buckets() from server %s: alreadygot=%s" hunk ./src/allmydata/immutable/upload.py 296 - % (idlib.shortnodeid_b2a(server), tuple(sorted(buckets))), + % (idlib.shortnodeid_b2a(serverid), tuple(sorted(buckets))), level=log.NOISY) for bucket in buckets: hunk ./src/allmydata/immutable/upload.py 299 - self.preexisting_shares.setdefault(bucket, set()).add(server) + self.preexisting_shares.setdefault(bucket, set()).add(serverid) self.homeless_shares.discard(bucket) self.full_count += 1 self.bad_query_count += 1 hunk ./src/allmydata/immutable/upload.py 377 return self._loop() else: # Redistribution won't help us; fail. - server_count = len(self.servers_with_shares) + server_count = len(self.serverids_with_shares) failmsg = failure_message(server_count, self.needed_shares, self.servers_of_happiness, hunk ./src/allmydata/immutable/upload.py 442 merged = merge_peers(self.preexisting_shares, self.use_trackers) effective_happiness = servers_of_happiness(merged) if effective_happiness < self.servers_of_happiness: - msg = failure_message(len(self.servers_with_shares), + msg = failure_message(len(self.serverids_with_shares), self.needed_shares, self.servers_of_happiness, effective_happiness) hunk ./src/allmydata/immutable/upload.py 505 progress = True if allocated or alreadygot: - self.servers_with_shares.add(tracker.serverid) + self.serverids_with_shares.add(tracker.serverid) not_yet_present = set(shares_to_ask) - set(alreadygot) still_homeless = not_yet_present - set(allocated) hunk ./src/allmydata/immutable/upload.py 923 d.addCallback(_done) return d - def set_shareholders(self, (upload_servers, already_servers), encoder): + def set_shareholders(self, (upload_trackers, already_servers), encoder): """ hunk ./src/allmydata/immutable/upload.py 925 - @param upload_servers: a sequence of ServerTracker objects that - have agreed to hold some shares for us (the - shareids are stashed inside the ServerTracker) + @param upload_trackers: a sequence of ServerTracker objects that + have agreed to hold some shares for us (the + shareids are stashed inside the ServerTracker) @paran already_servers: a dict mapping sharenum to a set of serverids that claim to already have this share """ hunk ./src/allmydata/immutable/upload.py 931 - msgtempl = "set_shareholders; upload_servers is %s, already_servers is %s" - values = ([', '.join([str_shareloc(k,v) for k,v in s.buckets.iteritems()]) - for s in upload_servers], already_servers) + msgtempl = "set_shareholders; upload_trackers is %s, already_servers is %s" + values = ([', '.join([str_shareloc(k,v) + for k,v in 
st.buckets.iteritems()]) + for st in upload_trackers], already_servers) self.log(msgtempl % values, level=log.OPERATIONAL) # record already-present shares in self._results self._results.preexisting_shares = len(already_servers) hunk ./src/allmydata/immutable/upload.py 940 self._server_trackers = {} # k: shnum, v: instance of ServerTracker - for server in upload_servers: - assert isinstance(server, ServerTracker) + for tracker in upload_trackers: + assert isinstance(tracker, ServerTracker) buckets = {} servermap = already_servers.copy() hunk ./src/allmydata/immutable/upload.py 944 - for server in upload_servers: - buckets.update(server.buckets) - for shnum in server.buckets: - self._server_trackers[shnum] = server - servermap.setdefault(shnum, set()).add(server.serverid) - assert len(buckets) == sum([len(server.buckets) - for server in upload_servers]), \ + for tracker in upload_trackers: + buckets.update(tracker.buckets) + for shnum in tracker.buckets: + self._server_trackers[shnum] = tracker + servermap.setdefault(shnum, set()).add(tracker.serverid) + assert len(buckets) == sum([len(tracker.buckets) + for tracker in upload_trackers]), \ "%s (%s) != %s (%s)" % ( len(buckets), buckets, hunk ./src/allmydata/immutable/upload.py 954 - sum([len(server.buckets) for server in upload_servers]), - [(s.buckets, s.serverid) for s in upload_servers] + sum([len(tracker.buckets) for tracker in upload_trackers]), + [(t.buckets, t.serverid) for t in upload_trackers] ) encoder.set_shareholders(buckets, servermap) } [happinessutil.py: server-vs-tracker cleanup warner@lothar.com**20110227011111 Ignore-this: b856c84033562d7d718cae7cb01085a9 ] { hunk ./src/allmydata/util/happinessutil.py 57 ret.setdefault(peerid, set()).add(shareid) return ret -def merge_peers(servermap, upload_servers=None): +def merge_peers(servermap, upload_trackers=None): """ I accept a dict of shareid -> set(peerid) mappings, and optionally a set of PeerTrackers. If no set of PeerTrackers is provided, I return hunk ./src/allmydata/util/happinessutil.py 69 # context where it is okay to do that, make a copy of servermap and # work with it. 
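
Pulled together from the hunks in this patch, the function (still named
merge_peers here; renamed two patches later) now reads, in essence --
trackers are assumed to expose .buckets and .serverid, as FakeServerTracker
does:

    from copy import deepcopy

    def merge_servers_sketch(servermap, upload_trackers=None):
        # servermap: dict of shareid -> set(serverid)
        servermap = deepcopy(servermap)  # callers keep their dict intact
        if not upload_trackers:
            return servermap
        for tracker in upload_trackers:
            for shnum in tracker.buckets:
                servermap.setdefault(shnum, set()).add(tracker.serverid)
        return servermap
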
servermap = deepcopy(servermap) - if not upload_servers: + if not upload_trackers: return servermap assert(isinstance(servermap, dict)) hunk ./src/allmydata/util/happinessutil.py 73 - assert(isinstance(upload_servers, set)) + assert(isinstance(upload_trackers, set)) hunk ./src/allmydata/util/happinessutil.py 75 - for peer in upload_servers: - for shnum in peer.buckets: - servermap.setdefault(shnum, set()).add(peer.serverid) + for tracker in upload_trackers: + for shnum in tracker.buckets: + servermap.setdefault(shnum, set()).add(tracker.serverid) return servermap def servers_of_happiness(sharemap): } [test_upload.py: server-vs-tracker cleanup warner@lothar.com**20110227011115 Ignore-this: 2915133be1a3ba456e8603885437e03 ] { hunk ./src/allmydata/test/test_upload.py 776 d = selector.get_shareholders(broker, sh, storage_index, share_size, block_size, num_segments, 10, 3, 4) - def _have_shareholders((upload_servers, already_servers)): - assert servers_to_break <= len(upload_servers) + def _have_shareholders((upload_trackers, already_servers)): + assert servers_to_break <= len(upload_trackers) for index in xrange(servers_to_break): hunk ./src/allmydata/test/test_upload.py 779 - server = list(upload_servers)[index] - for share in server.buckets.keys(): - server.buckets[share].abort() + tracker = list(upload_trackers)[index] + for share in tracker.buckets.keys(): + tracker.buckets[share].abort() buckets = {} servermap = already_servers.copy() hunk ./src/allmydata/test/test_upload.py 784 - for server in upload_servers: - buckets.update(server.buckets) - for bucket in server.buckets: - servermap.setdefault(bucket, set()).add(server.serverid) + for tracker in upload_trackers: + buckets.update(tracker.buckets) + for bucket in tracker.buckets: + servermap.setdefault(bucket, set()).add(tracker.serverid) encoder.set_shareholders(buckets, servermap) d = encoder.start() return d } [test_upload.py: factor out FakeServerTracker warner@lothar.com**20110227011120 Ignore-this: 6c182cba90e908221099472cc159325b ] { hunk ./src/allmydata/test/test_upload.py 728 # print "HAAPP{Y" return True +class FakeServerTracker: + def __init__(self, serverid, buckets): + self.serverid = serverid + self.buckets = buckets + class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin, ShouldFailMixin): def find_all_shares(self, unused=None): hunk ./src/allmydata/test/test_upload.py 1363 # if not provided with a upload_servers argument, it should just # return the first argument unchanged. self.failUnlessEqual(shares, merge_peers(shares, set([]))) - class FakeServerTracker: - pass trackers = [] for (i, server) in [(i, "server%d" % i) for i in xrange(5, 9)]: hunk ./src/allmydata/test/test_upload.py 1365 - t = FakeServerTracker() - t.serverid = server - t.buckets = [i] + t = FakeServerTracker(server, [i]) trackers.append(t) expected = { 1 : set(["server1"]), hunk ./src/allmydata/test/test_upload.py 1391 expected = {} for (i, server) in [(i, "server%d" % i) for i in xrange(10)]: shares3[i] = set([server]) - t = FakeServerTracker() - t.serverid = server - t.buckets = [i] + t = FakeServerTracker(server, [i]) trackers.append(t) expected[i] = set([server]) self.failUnlessEqual(expected, merge_peers(shares3, set(trackers))) hunk ./src/allmydata/test/test_upload.py 1425 # ServerTracker instances, but for testing it is fine to make a # FakeServerTracker whose job is to hold those instance variables to # test that part. 
- class FakeServerTracker: - pass trackers = [] for (i, server) in [(i, "server%d" % i) for i in xrange(5, 9)]: hunk ./src/allmydata/test/test_upload.py 1427 - t = FakeServerTracker() - t.serverid = server - t.buckets = [i] + t = FakeServerTracker(server, [i]) trackers.append(t) # Recall that test1 is a server layout with servers_of_happiness # = 3. Since there isn't any overlap between the shnum -> hunk ./src/allmydata/test/test_upload.py 1439 # Now add an overlapping server to trackers. This is redundant, # so it should not cause the previously reported happiness value # to change. - t = FakeServerTracker() - t.serverid = "server1" - t.buckets = [1] + t = FakeServerTracker("server1", [1]) trackers.append(t) test2 = merge_peers(test1, set(trackers)) happy = servers_of_happiness(test2) hunk ./src/allmydata/test/test_upload.py 1456 4 : set(['server4']), } trackers = [] - t = FakeServerTracker() - t.serverid = 'server5' - t.buckets = [4] + t = FakeServerTracker('server5', [4]) trackers.append(t) hunk ./src/allmydata/test/test_upload.py 1458 - t = FakeServerTracker() - t.serverid = 'server6' - t.buckets = [3, 5] + t = FakeServerTracker('server6', [3, 5]) trackers.append(t) # The value returned by servers_of_happiness is the size # of a maximum matching in the bipartite graph that } [happinessutil.py: finally rename merge_peers to merge_servers warner@lothar.com**20110227011124 Ignore-this: c8cd381fea1dd888899cb71e4f86de6e ] { hunk ./src/allmydata/immutable/upload.py 17 from allmydata.immutable import encode from allmydata.util import base32, dictutil, idlib, log, mathutil from allmydata.util.happinessutil import servers_of_happiness, \ - shares_by_server, merge_peers, \ + shares_by_server, merge_servers, \ failure_message from allmydata.util.assertutil import precondition from allmydata.util.rrefutil import add_version_to_remote_reference hunk ./src/allmydata/immutable/upload.py 327 def _loop(self): if not self.homeless_shares: - merged = merge_peers(self.preexisting_shares, self.use_trackers) + merged = merge_servers(self.preexisting_shares, self.use_trackers) effective_happiness = servers_of_happiness(merged) if self.servers_of_happiness <= effective_happiness: msg = ("server selection successful for %s: %s: pretty_print_merged: %s, " hunk ./src/allmydata/immutable/upload.py 439 return self._loop() else: # no more servers. If we haven't placed enough shares, we fail. - merged = merge_peers(self.preexisting_shares, self.use_trackers) + merged = merge_servers(self.preexisting_shares, self.use_trackers) effective_happiness = servers_of_happiness(merged) if effective_happiness < self.servers_of_happiness: msg = failure_message(len(self.serverids_with_shares), hunk ./src/allmydata/test/test_upload.py 20 from allmydata.test.no_network import GridTestMixin from allmydata.test.common_util import ShouldFailMixin from allmydata.util.happinessutil import servers_of_happiness, \ - shares_by_server, merge_peers + shares_by_server, merge_servers from allmydata.storage_client import StorageFarmBroker from allmydata.storage.server import storage_index_to_dir hunk ./src/allmydata/test/test_upload.py 1350 return d - def test_merge_peers(self): - # merge_peers merges a list of upload_servers and a dict of + def test_merge_servers(self): + # merge_servers merges a list of upload_servers and a dict of # shareid -> serverid mappings. 
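
The contract under test, in one short example (using the FakeServerTracker
factored out in the previous patch):

    shares = { 1 : set(["server1"]) }
    trackers = set([FakeServerTracker("server5", [5])])
    merged = merge_servers(shares, trackers)
    # merged == { 1 : set(["server1"]), 5 : set(["server5"]) }
    # 'shares' itself is unchanged, and with no trackers the call is an
    # identity (modulo copying):
    assert merge_servers(shares, set([])) == shares
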
shares = { 1 : set(["server1"]), hunk ./src/allmydata/test/test_upload.py 1362 } # if not provided with a upload_servers argument, it should just # return the first argument unchanged. - self.failUnlessEqual(shares, merge_peers(shares, set([]))) + self.failUnlessEqual(shares, merge_servers(shares, set([]))) trackers = [] for (i, server) in [(i, "server%d" % i) for i in xrange(5, 9)]: t = FakeServerTracker(server, [i]) hunk ./src/allmydata/test/test_upload.py 1377 7 : set(["server7"]), 8 : set(["server8"]), } - self.failUnlessEqual(expected, merge_peers(shares, set(trackers))) + self.failUnlessEqual(expected, merge_servers(shares, set(trackers))) shares2 = {} expected = { 5 : set(["server5"]), hunk ./src/allmydata/test/test_upload.py 1385 7 : set(["server7"]), 8 : set(["server8"]), } - self.failUnlessEqual(expected, merge_peers(shares2, set(trackers))) + self.failUnlessEqual(expected, merge_servers(shares2, set(trackers))) shares3 = {} trackers = [] expected = {} hunk ./src/allmydata/test/test_upload.py 1394 t = FakeServerTracker(server, [i]) trackers.append(t) expected[i] = set([server]) - self.failUnlessEqual(expected, merge_peers(shares3, set(trackers))) + self.failUnlessEqual(expected, merge_servers(shares3, set(trackers))) def test_servers_of_happiness_utility_function(self): hunk ./src/allmydata/test/test_upload.py 1420 # should be 3 instead of 4. happy = servers_of_happiness(test1) self.failUnlessEqual(3, happy) - # The second argument of merge_peers should be a set of objects with + # The second argument of merge_servers should be a set of objects with # serverid and buckets as attributes. In actual use, these will be # ServerTracker instances, but for testing it is fine to make a # FakeServerTracker whose job is to hold those instance variables to hunk ./src/allmydata/test/test_upload.py 1433 # = 3. Since there isn't any overlap between the shnum -> # set([serverid]) correspondences in test1 and those in trackers, # the result here should be 7. - test2 = merge_peers(test1, set(trackers)) + test2 = merge_servers(test1, set(trackers)) happy = servers_of_happiness(test2) self.failUnlessEqual(7, happy) # Now add an overlapping server to trackers. This is redundant, hunk ./src/allmydata/test/test_upload.py 1441 # to change. t = FakeServerTracker("server1", [1]) trackers.append(t) - test2 = merge_peers(test1, set(trackers)) + test2 = merge_servers(test1, set(trackers)) happy = servers_of_happiness(test2) self.failUnlessEqual(7, happy) test = {} hunk ./src/allmydata/test/test_upload.py 1472 # # and, since there are 5 edges in this matching, it should # return 5. - test2 = merge_peers(test, set(trackers)) + test2 = merge_servers(test, set(trackers)) happy = servers_of_happiness(test2) self.failUnlessEqual(5, happy) # Zooko's first puzzle: hunk ./src/allmydata/util/happinessutil.py 57 ret.setdefault(peerid, set()).add(shareid) return ret -def merge_peers(servermap, upload_trackers=None): +def merge_servers(servermap, upload_trackers=None): """ hunk ./src/allmydata/util/happinessutil.py 59 - I accept a dict of shareid -> set(peerid) mappings, and optionally a - set of PeerTrackers. If no set of PeerTrackers is provided, I return + I accept a dict of shareid -> set(serverid) mappings, and optionally a + set of ServerTrackers. If no set of ServerTrackers is provided, I return my first argument unmodified. 
Otherwise, I update a copy of my first hunk ./src/allmydata/util/happinessutil.py 62 - argument to include the shareid -> peerid mappings implied in the - set of PeerTrackers, returning the resulting dict. + argument to include the shareid -> serverid mappings implied in the + set of ServerTrackers, returning the resulting dict. """ # Since we mutate servermap, and are called outside of a # context where it is okay to do that, make a copy of servermap and } [upload.py: rearrange _make_trackers a bit, no behavior changes warner@lothar.com**20110227011128 Ignore-this: 296d4819e2af452b107177aef6ebb40f ] hunk ./src/allmydata/immutable/upload.py 239 file_cancel_secret = file_cancel_secret_hash(client_cancel_secret, storage_index) def _make_trackers(servers): - return [ServerTracker(serverid, conn, - share_size, block_size, - num_segments, num_share_hashes, - storage_index, - bucket_renewal_secret_hash(file_renewal_secret, - serverid), - bucket_cancel_secret_hash(file_cancel_secret, - serverid)) - for (serverid, conn) in servers] + trackers = [] + for (serverid, conn) in servers: + seed = serverid + renew = bucket_renewal_secret_hash(file_renewal_secret, seed) + cancel = bucket_cancel_secret_hash(file_cancel_secret, seed) + st = ServerTracker(serverid, conn, + share_size, block_size, + num_segments, num_share_hashes, + storage_index, + renew, cancel) + trackers.append(st) + return trackers self.uncontacted_trackers = _make_trackers(writable_servers) # We don't try to allocate shares to these servers, since they've [add remaining get_* methods to storage_client.Server, NoNetworkServer, and warner@lothar.com**20110227011132 Ignore-this: 6078279ddf42b179996a4b53bee8c421 MockIServer stubs ] { hunk ./src/allmydata/storage_client.py 182 def __init__(self, serverid, ann_d, min_shares=1): self.serverid = serverid + self._tubid = serverid self.announcement = ann_d self.min_shares = min_shares hunk ./src/allmydata/storage_client.py 195 self._reconnector = None self._trigger_cb = None + def __repr__(self): + return "" % self.name() def get_serverid(self): hunk ./src/allmydata/storage_client.py 198 - return self.serverid + return self._tubid def get_permutation_seed(self): hunk ./src/allmydata/storage_client.py 200 - return self.serverid + return self._tubid + def get_version(self): + if self.rref: + return self.rref.version + return None + def name(self): # keep methodname short + return self.serverid_s + def longname(self): + return idlib.nodeid_b2a(self._tubid) + def get_lease_seed(self): + return self._tubid + def get_foolscap_write_enabler_seed(self): + return self._tubid def get_nickname(self): return self.announcement["nickname"].decode("utf-8") hunk ./src/allmydata/storage_client.py 233 self._reconnector = tub.connectTo(furl, self._got_connection) def _got_connection(self, rref): - lp = log.msg(format="got connection to %(serverid)s, getting versions", - serverid=self.serverid_s, + lp = log.msg(format="got connection to %(name)s, getting versions", + name=self.name(), facility="tahoe.storage_broker", umid="coUECQ") if self._trigger_cb: eventually(self._trigger_cb) hunk ./src/allmydata/storage_client.py 242 d = add_version_to_remote_reference(rref, default) d.addCallback(self._got_versioned_service, lp) d.addErrback(log.err, format="storageclient._got_connection", - serverid=self.serverid_s, umid="Sdq3pg") + name=self.name(), umid="Sdq3pg") def _got_versioned_service(self, rref, lp): hunk ./src/allmydata/storage_client.py 245 - log.msg(format="%(serverid)s provided version info %(version)s", - 
serverid=self.serverid_s, version=rref.version, + log.msg(format="%(name)s provided version info %(version)s", + name=self.name(), version=rref.version, facility="tahoe.storage_broker", umid="SWmJYg", level=log.NOISY, parent=lp) hunk ./src/allmydata/storage_client.py 259 return self.rref def _lost(self): - log.msg(format="lost connection to %(serverid)s", - serverid=self.serverid_s, + log.msg(format="lost connection to %(name)s", name=self.name(), facility="tahoe.storage_broker", umid="zbRllw") self.last_loss_time = time.time() self.rref = None hunk ./src/allmydata/test/no_network.py 124 def __init__(self, serverid, rref): self.serverid = serverid self.rref = rref + def __repr__(self): + return "" % self.name() def get_serverid(self): return self.serverid def get_permutation_seed(self): hunk ./src/allmydata/test/no_network.py 130 return self.serverid + def get_lease_seed(self): + return self.serverid + def name(self): + return idlib.shortnodeid_b2a(self.serverid) + def longname(self): + return idlib.nodeid_b2a(self.serverid) + def get_nickname(self): + return "nickname" def get_rref(self): return self.rref hunk ./src/allmydata/test/no_network.py 140 + def get_version(self): + return self.rref.version class NoNetworkStorageBroker: implements(IStorageBroker) hunk ./src/allmydata/test/test_immutable.py 106 return self.serverid def get_rref(self): return self.rref + def name(self): + return "name-%s" % self.serverid + def get_version(self): + return self.rref.version mockserver1 = MockServer({1: mock.Mock(), 2: mock.Mock()}) mockserver2 = MockServer({}) } [immutable/checker.py: remove some uses of s.get_serverid(), not all warner@lothar.com**20110227011134 Ignore-this: e480a37efa9e94e8016d826c492f626e ] { hunk ./src/allmydata/immutable/checker.py 10 from allmydata.check_results import CheckResults from allmydata.uri import CHKFileVerifierURI from allmydata.util.assertutil import precondition -from allmydata.util import base32, idlib, deferredutil, dictutil, log, mathutil +from allmydata.util import base32, deferredutil, dictutil, log, mathutil from allmydata.util.hashutil import file_renewal_secret_hash, \ file_cancel_secret_hash, bucket_renewal_secret_hash, \ bucket_cancel_secret_hash, uri_extension_hash, CRYPTO_VAL_SIZE, \ hunk ./src/allmydata/immutable/checker.py 484 self._verifycap.get_storage_index()) self.file_cancel_secret = fcs - def _get_renewal_secret(self, peerid): - return bucket_renewal_secret_hash(self.file_renewal_secret, peerid) - def _get_cancel_secret(self, peerid): - return bucket_cancel_secret_hash(self.file_cancel_secret, peerid) + def _get_renewal_secret(self, seed): + return bucket_renewal_secret_hash(self.file_renewal_secret, seed) + def _get_cancel_secret(self, seed): + return bucket_cancel_secret_hash(self.file_cancel_secret, seed) def _get_buckets(self, s, storageindex): """Return a deferred that eventually fires with ({sharenum: bucket}, hunk ./src/allmydata/immutable/checker.py 499 responded.)""" rref = s.get_rref() + lease_seed = s.get_lease_seed() serverid = s.get_serverid() if self._add_lease: hunk ./src/allmydata/immutable/checker.py 502 - renew_secret = self._get_renewal_secret(serverid) - cancel_secret = self._get_cancel_secret(serverid) + renew_secret = self._get_renewal_secret(lease_seed) + cancel_secret = self._get_cancel_secret(lease_seed) d2 = rref.callRemote("add_lease", storageindex, renew_secret, cancel_secret) hunk ./src/allmydata/immutable/checker.py 506 - d2.addErrback(self._add_lease_failed, serverid, storageindex) + 
d2.addErrback(self._add_lease_failed, s.name(), storageindex) d = rref.callRemote("get_buckets", storageindex) def _wrap_results(res): hunk ./src/allmydata/immutable/checker.py 524 d.addCallbacks(_wrap_results, _trap_errs) return d - def _add_lease_failed(self, f, peerid, storage_index): + def _add_lease_failed(self, f, server_name, storage_index): # Older versions of Tahoe didn't handle the add-lease message very # well: <=1.1.0 throws a NameError because it doesn't implement # remote_add_lease(), 1.2.0/1.3.0 throw IndexError on unknown buckets hunk ./src/allmydata/immutable/checker.py 544 # this may ignore a bit too much, but that only hurts us # during debugging return - self.log(format="error in add_lease from [%(peerid)s]: %(f_value)s", - peerid=idlib.shortnodeid_b2a(peerid), + self.log(format="error in add_lease from [%(name)s]: %(f_value)s", + name=server_name, f_value=str(f.value), failure=f, level=log.WEIRD, umid="atbAxw") hunk ./src/allmydata/immutable/checker.py 552 return # local errors are cause for alarm log.err(f, - format="local error in add_lease to [%(peerid)s]: %(f_value)s", - peerid=idlib.shortnodeid_b2a(peerid), + format="local error in add_lease to [%(name)s]: %(f_value)s", + name=server_name, f_value=str(f.value), level=log.WEIRD, umid="hEGuQg") } [immutable/upload.py: reduce use of get_serverid() warner@lothar.com**20110227011138 Ignore-this: ffdd7ff32bca890782119a6e9f1495f6 ] { hunk ./src/allmydata/immutable/upload.py 72 return ', '.join([ "sh%s: %s" % (k, '+'.join([idlib.shortnodeid_b2a(x) for x in v])) for k, v in s.iteritems() ]) class ServerTracker: - def __init__(self, serverid, storage_server, + def __init__(self, server, sharesize, blocksize, num_segments, num_share_hashes, storage_index, bucket_renewal_secret, bucket_cancel_secret): hunk ./src/allmydata/immutable/upload.py 76 - precondition(isinstance(serverid, str), serverid) - precondition(len(serverid) == 20, serverid) - self.serverid = serverid - self._storageserver = storage_server # to an RIStorageServer + self._server = server self.buckets = {} # k: shareid, v: IRemoteBucketWriter self.sharesize = sharesize hunk ./src/allmydata/immutable/upload.py 83 wbp = layout.make_write_bucket_proxy(None, sharesize, blocksize, num_segments, num_share_hashes, - EXTENSION_SIZE, serverid) + EXTENSION_SIZE, server.get_serverid()) self.wbp_class = wbp.__class__ # to create more of them self.allocated_size = wbp.get_allocated_size() self.blocksize = blocksize hunk ./src/allmydata/immutable/upload.py 96 def __repr__(self): return ("" - % (idlib.shortnodeid_b2a(self.serverid), - si_b2a(self.storage_index)[:5])) + % (self._server.name(), si_b2a(self.storage_index)[:5])) + + def get_serverid(self): + return self._server.get_serverid() + def name(self): + return self._server.name() def query(self, sharenums): hunk ./src/allmydata/immutable/upload.py 104 - d = self._storageserver.callRemote("allocate_buckets", - self.storage_index, - self.renew_secret, - self.cancel_secret, - sharenums, - self.allocated_size, - canary=Referenceable()) + rref = self._server.get_rref() + d = rref.callRemote("allocate_buckets", + self.storage_index, + self.renew_secret, + self.cancel_secret, + sharenums, + self.allocated_size, + canary=Referenceable()) d.addCallback(self._got_reply) return d hunk ./src/allmydata/immutable/upload.py 116 def ask_about_existing_shares(self): - return self._storageserver.callRemote("get_buckets", - self.storage_index) + rref = self._server.get_rref() + return rref.callRemote("get_buckets", self.storage_index) def 
_got_reply(self, (alreadygot, buckets)): #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets))) hunk ./src/allmydata/immutable/upload.py 128 self.num_segments, self.num_share_hashes, EXTENSION_SIZE, - self.serverid) + self._server.get_serverid()) b[sharenum] = bp self.buckets.update(b) return (alreadygot, set(b.keys())) hunk ./src/allmydata/immutable/upload.py 214 num_share_hashes, EXTENSION_SIZE, None) allocated_size = wbp.get_allocated_size() - all_servers = [(s.get_serverid(), s.get_rref()) - for s in storage_broker.get_servers_for_psi(storage_index)] + all_servers = storage_broker.get_servers_for_psi(storage_index) if not all_servers: raise NoServersError("client gave us zero servers") hunk ./src/allmydata/immutable/upload.py 223 # field) from getting large shares (for files larger than about # 12GiB). See #439 for details. def _get_maxsize(server): - (serverid, conn) = server - v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"] + v0 = server.get_rref().version + v1 = v0["http://allmydata.org/tahoe/protocols/storage/v1"] return v1["maximum-immutable-share-size"] writable_servers = [server for server in all_servers if _get_maxsize(server) >= allocated_size] hunk ./src/allmydata/immutable/upload.py 241 storage_index) def _make_trackers(servers): trackers = [] - for (serverid, conn) in servers: - seed = serverid + for s in servers: + seed = s.get_lease_seed() renew = bucket_renewal_secret_hash(file_renewal_secret, seed) cancel = bucket_cancel_secret_hash(file_cancel_secret, seed) hunk ./src/allmydata/immutable/upload.py 245 - st = ServerTracker(serverid, conn, + st = ServerTracker(s, share_size, block_size, num_segments, num_share_hashes, storage_index, hunk ./src/allmydata/immutable/upload.py 272 for tracker in readonly_trackers: assert isinstance(tracker, ServerTracker) d = tracker.ask_about_existing_shares() - d.addBoth(self._handle_existing_response, tracker.serverid) + d.addBoth(self._handle_existing_response, tracker) ds.append(d) self.num_servers_contacted += 1 self.query_count += 1 hunk ./src/allmydata/immutable/upload.py 277 self.log("asking server %s for any existing shares" % - (idlib.shortnodeid_b2a(tracker.serverid),), - level=log.NOISY) + (tracker.name(),), level=log.NOISY) dl = defer.DeferredList(ds) dl.addCallback(lambda ign: self._loop()) return dl hunk ./src/allmydata/immutable/upload.py 283 - def _handle_existing_response(self, res, serverid): + def _handle_existing_response(self, res, tracker): """ I handle responses to the queries sent by Tahoe2ServerSelector._existing_shares. hunk ./src/allmydata/immutable/upload.py 288 """ + serverid = tracker.get_serverid() if isinstance(res, failure.Failure): self.log("%s got error during existing shares check: %s" hunk ./src/allmydata/immutable/upload.py 291 - % (idlib.shortnodeid_b2a(serverid), res), - level=log.UNUSUAL) + % (tracker.name(), res), level=log.UNUSUAL) self.error_count += 1 self.bad_query_count += 1 else: hunk ./src/allmydata/immutable/upload.py 299 if buckets: self.serverids_with_shares.add(serverid) self.log("response to get_buckets() from server %s: alreadygot=%s" - % (idlib.shortnodeid_b2a(serverid), tuple(sorted(buckets))), + % (tracker.name(), tuple(sorted(buckets))), level=log.NOISY) for bucket in buckets: self.preexisting_shares.setdefault(bucket, set()).add(serverid) hunk ./src/allmydata/immutable/upload.py 407 if self._status: self._status.set_status("Contacting Servers [%s] (first query)," " %d shares left.." 
- % (idlib.shortnodeid_b2a(tracker.serverid), + % (tracker.name(), len(self.homeless_shares))) d = tracker.query(shares_to_ask) d.addBoth(self._got_response, tracker, shares_to_ask, hunk ./src/allmydata/immutable/upload.py 428 if self._status: self._status.set_status("Contacting Servers [%s] (second query)," " %d shares left.." - % (idlib.shortnodeid_b2a(tracker.serverid), + % (tracker.name(), len(self.homeless_shares))) d = tracker.query(shares_to_ask) d.addBoth(self._got_response, tracker, shares_to_ask, hunk ./src/allmydata/immutable/upload.py 489 else: (alreadygot, allocated) = res self.log("response to allocate_buckets() from server %s: alreadygot=%s, allocated=%s" - % (idlib.shortnodeid_b2a(tracker.serverid), + % (tracker.name(), tuple(sorted(alreadygot)), tuple(sorted(allocated))), level=log.NOISY) progress = False hunk ./src/allmydata/immutable/upload.py 494 for s in alreadygot: - self.preexisting_shares.setdefault(s, set()).add(tracker.serverid) + self.preexisting_shares.setdefault(s, set()).add(tracker.get_serverid()) if s in self.homeless_shares: self.homeless_shares.remove(s) progress = True hunk ./src/allmydata/immutable/upload.py 508 progress = True if allocated or alreadygot: - self.serverids_with_shares.add(tracker.serverid) + self.serverids_with_shares.add(tracker.get_serverid()) not_yet_present = set(shares_to_ask) - set(alreadygot) still_homeless = not_yet_present - set(allocated) hunk ./src/allmydata/immutable/upload.py 951 buckets.update(tracker.buckets) for shnum in tracker.buckets: self._server_trackers[shnum] = tracker - servermap.setdefault(shnum, set()).add(tracker.serverid) + servermap.setdefault(shnum, set()).add(tracker.get_serverid()) assert len(buckets) == sum([len(tracker.buckets) for tracker in upload_trackers]), \ "%s (%s) != %s (%s)" % ( hunk ./src/allmydata/immutable/upload.py 958 len(buckets), buckets, sum([len(tracker.buckets) for tracker in upload_trackers]), - [(t.buckets, t.serverid) for t in upload_trackers] + [(t.buckets, t.get_serverid()) for t in upload_trackers] ) encoder.set_shareholders(buckets, servermap) hunk ./src/allmydata/immutable/upload.py 967 r = self._results for shnum in self._encoder.get_shares_placed(): server_tracker = self._server_trackers[shnum] - serverid = server_tracker.serverid + serverid = server_tracker.get_serverid() r.sharemap.add(shnum, serverid) r.servermap.add(serverid, shnum) r.pushed_shares = len(self._encoder.get_shares_placed()) hunk ./src/allmydata/test/test_upload.py 730 class FakeServerTracker: def __init__(self, serverid, buckets): - self.serverid = serverid + self._serverid = serverid self.buckets = buckets hunk ./src/allmydata/test/test_upload.py 732 + def get_serverid(self): + return self._serverid class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin, ShouldFailMixin): hunk ./src/allmydata/test/test_upload.py 794 for tracker in upload_trackers: buckets.update(tracker.buckets) for bucket in tracker.buckets: - servermap.setdefault(bucket, set()).add(tracker.serverid) + servermap.setdefault(bucket, set()).add(tracker.get_serverid()) encoder.set_shareholders(buckets, servermap) d = encoder.start() return d hunk ./src/allmydata/util/happinessutil.py 77 for tracker in upload_trackers: for shnum in tracker.buckets: - servermap.setdefault(shnum, set()).add(tracker.serverid) + servermap.setdefault(shnum, set()).add(tracker.get_serverid()) return servermap def servers_of_happiness(sharemap): } [immutable/offloaded.py: reduce use of get_serverid() a bit more warner@lothar.com**20110227011142 
Ignore-this: b48acc1b2ae1b311da7f3ba4ffba38f ] { hunk ./src/allmydata/immutable/offloaded.py 12 from allmydata.immutable import upload from allmydata.immutable.layout import ReadBucketProxy from allmydata.util.assertutil import precondition -from allmydata.util import idlib, log, observer, fileutil, hashutil, dictutil +from allmydata.util import log, observer, fileutil, hashutil, dictutil class NotEnoughWritersError(Exception): hunk ./src/allmydata/immutable/offloaded.py 59 for s in self._peer_getter(storage_index): d = s.get_rref().callRemote("get_buckets", storage_index) d.addCallbacks(self._got_response, self._got_error, - callbackArgs=(s.get_serverid(),)) + callbackArgs=(s,)) dl.append(d) return defer.DeferredList(dl) hunk ./src/allmydata/immutable/offloaded.py 63 - def _got_response(self, buckets, peerid): + def _got_response(self, buckets, server): # buckets is a dict: maps shum to an rref of the server who holds it shnums_s = ",".join([str(shnum) for shnum in buckets]) self.log("got_response: [%s] has %d shares (%s)" % hunk ./src/allmydata/immutable/offloaded.py 67 - (idlib.shortnodeid_b2a(peerid), len(buckets), shnums_s), + (server.name(), len(buckets), shnums_s), level=log.NOISY) self._found_shares.update(buckets.keys()) for k in buckets: hunk ./src/allmydata/immutable/offloaded.py 71 - self._sharemap.add(k, peerid) - self._readers.update( [ (bucket, peerid) + self._sharemap.add(k, server.get_serverid()) + self._readers.update( [ (bucket, server) for bucket in buckets.values() ] ) def _got_error(self, f): hunk ./src/allmydata/immutable/offloaded.py 87 if not self._readers: self.log("no readers, so no UEB", level=log.NOISY) return - b,peerid = self._readers.pop() - rbp = ReadBucketProxy(b, peerid, si_b2a(self._storage_index)) + b,server = self._readers.pop() + rbp = ReadBucketProxy(b, server.get_serverid(), si_b2a(self._storage_index)) d = rbp.get_uri_extension() d.addCallback(self._got_uri_extension) d.addErrback(self._ueb_error) } [immutable/downloader/finder.py: reduce use of get_serverid(), one left warner@lothar.com**20110227011146 Ignore-this: 5785be173b491ae8a78faf5142892020 ] { hunk ./src/allmydata/immutable/downloader/finder.py 5 import time now = time.time from foolscap.api import eventually -from allmydata.util import base32, log, idlib +from allmydata.util import base32, log from twisted.internet import reactor from share import Share, CommonShare hunk ./src/allmydata/immutable/downloader/finder.py 24 return res class RequestToken: - def __init__(self, peerid): - self.peerid = peerid + def __init__(self, server): + self.server = server class ShareFinder: OVERDUE_TIMEOUT = 10.0 hunk ./src/allmydata/immutable/downloader/finder.py 65 # test_dirnode, which creates us with storage_broker=None if not self._started: si = self.verifycap.storage_index - servers = [(s.get_serverid(), s.get_rref()) - for s in self._storage_broker.get_servers_for_psi(si)] + servers = self._storage_broker.get_servers_for_psi(si) self._servers = iter(servers) self._started = True hunk ./src/allmydata/immutable/downloader/finder.py 90 # internal methods def loop(self): - pending_s = ",".join([idlib.shortnodeid_b2a(rt.peerid) + pending_s = ",".join([rt.server.name() for rt in self.pending_requests]) # sort? 
self.log(format="ShareFinder loop: running=%(running)s" " hungry=%(hungry)s, pending=%(pending)s", hunk ./src/allmydata/immutable/downloader/finder.py 133 eventually(self.share_consumer.no_more_shares) def send_request(self, server): - peerid, rref = server - req = RequestToken(peerid) + req = RequestToken(server) self.pending_requests.add(req) hunk ./src/allmydata/immutable/downloader/finder.py 135 - lp = self.log(format="sending DYHB to [%(peerid)s]", - peerid=idlib.shortnodeid_b2a(peerid), + lp = self.log(format="sending DYHB to [%(name)s]", name=server.name(), level=log.NOISY, umid="Io7pyg") time_sent = now() hunk ./src/allmydata/immutable/downloader/finder.py 138 - d_ev = self._download_status.add_dyhb_sent(peerid, time_sent) + d_ev = self._download_status.add_dyhb_sent(server.get_serverid(), + time_sent) # TODO: get the timer from a Server object, it knows best self.overdue_timers[req] = reactor.callLater(self.OVERDUE_TIMEOUT, self.overdue, req) hunk ./src/allmydata/immutable/downloader/finder.py 143 - d = rref.callRemote("get_buckets", self._storage_index) + d = server.get_rref().callRemote("get_buckets", self._storage_index) d.addBoth(incidentally, self._request_retired, req) d.addCallbacks(self._got_response, self._got_error, hunk ./src/allmydata/immutable/downloader/finder.py 146 - callbackArgs=(rref.version, peerid, req, d_ev, - time_sent, lp), - errbackArgs=(peerid, req, d_ev, lp)) + callbackArgs=(server, req, d_ev, time_sent, lp), + errbackArgs=(server, req, d_ev, lp)) d.addErrback(log.err, format="error in send_request", level=log.WEIRD, parent=lp, umid="rpdV0w") d.addCallback(incidentally, eventually, self.loop) hunk ./src/allmydata/immutable/downloader/finder.py 165 self.overdue_requests.add(req) eventually(self.loop) - def _got_response(self, buckets, server_version, peerid, req, d_ev, - time_sent, lp): + def _got_response(self, buckets, server, req, d_ev, time_sent, lp): shnums = sorted([shnum for shnum in buckets]) time_received = now() d_ev.finished(shnums, time_received) hunk ./src/allmydata/immutable/downloader/finder.py 171 dyhb_rtt = time_received - time_sent if not buckets: - self.log(format="no shares from [%(peerid)s]", - peerid=idlib.shortnodeid_b2a(peerid), + self.log(format="no shares from [%(name)s]", name=server.name(), level=log.NOISY, parent=lp, umid="U7d4JA") return shnums_s = ",".join([str(shnum) for shnum in shnums]) hunk ./src/allmydata/immutable/downloader/finder.py 175 - self.log(format="got shnums [%(shnums)s] from [%(peerid)s]", - shnums=shnums_s, peerid=idlib.shortnodeid_b2a(peerid), + self.log(format="got shnums [%(shnums)s] from [%(name)s]", + shnums=shnums_s, name=server.name(), level=log.NOISY, parent=lp, umid="0fcEZw") shares = [] for shnum, bucket in buckets.iteritems(): hunk ./src/allmydata/immutable/downloader/finder.py 180 - s = self._create_share(shnum, bucket, server_version, peerid, - dyhb_rtt) + s = self._create_share(shnum, bucket, server, dyhb_rtt) shares.append(s) self._deliver_shares(shares) hunk ./src/allmydata/immutable/downloader/finder.py 184 - def _create_share(self, shnum, bucket, server_version, peerid, dyhb_rtt): + def _create_share(self, shnum, bucket, server, dyhb_rtt): if shnum in self._commonshares: cs = self._commonshares[shnum] else: hunk ./src/allmydata/immutable/downloader/finder.py 207 # 2: break _get_satisfaction into Deferred-attached pieces. # Yuck. 
self._commonshares[shnum] = cs - s = Share(bucket, server_version, self.verifycap, cs, self.node, - self._download_status, peerid, shnum, dyhb_rtt, + s = Share(bucket, server.get_version(), self.verifycap, cs, self.node, + self._download_status, server.get_serverid(), shnum, dyhb_rtt, self._node_logparent) return s hunk ./src/allmydata/immutable/downloader/finder.py 220 level=log.NOISY, umid="2n1qQw") eventually(self.share_consumer.got_shares, shares) - def _got_error(self, f, peerid, req, d_ev, lp): + def _got_error(self, f, server, req, d_ev, lp): d_ev.finished("error", now()) hunk ./src/allmydata/immutable/downloader/finder.py 222 - self.log(format="got error from [%(peerid)s]", - peerid=idlib.shortnodeid_b2a(peerid), failure=f, + self.log(format="got error from [%(name)s]", + name=server.name(), failure=f, level=log.UNUSUAL, parent=lp, umid="zUKdCw") } [immutable/downloader/share.py: reduce get_serverid(), one left, update ext deps warner@lothar.com**20110227011150 Ignore-this: d8d56dd8e7b280792b40105e13664554 test_download.py: create+check MyShare instances better, make sure they share Server objects, now that finder.py cares ] { hunk ./src/allmydata/immutable/downloader/fetcher.py 192 sent_something = False want_more_diversity = False for sh in self._shares: # find one good share to fetch - shnum = sh._shnum ; serverid = sh._peerid + shnum = sh._shnum ; serverid = sh._server.get_serverid() if shnum in self._blocks: continue # don't request data we already have if shnum in self._active_share_map: hunk ./src/allmydata/immutable/downloader/fetcher.py 232 # called by Shares, in response to our s.send_request() calls. if not self._running: return - log.msg("SegmentFetcher(%s)._block_request_activity:" - " Share(sh%d-on-%s) -> %s" % - (self._node._si_prefix, shnum, share._peerid_s, state), + log.msg("SegmentFetcher(%s)._block_request_activity: %s -> %s" % + (self._node._si_prefix, repr(share), state), level=log.NOISY, parent=self._lp, umid="vilNWA") # COMPLETE, CORRUPT, DEAD, BADSEGNUM are terminal. Remove the share # from all our tracking lists. hunk ./src/allmydata/immutable/downloader/finder.py 207 # 2: break _get_satisfaction into Deferred-attached pieces. # Yuck. self._commonshares[shnum] = cs - s = Share(bucket, server.get_version(), self.verifycap, cs, self.node, - self._download_status, server.get_serverid(), shnum, dyhb_rtt, + s = Share(bucket, server, self.verifycap, cs, self.node, + self._download_status, shnum, dyhb_rtt, self._node_logparent) return s hunk ./src/allmydata/immutable/downloader/share.py 35 # this is a specific implementation of IShare for tahoe's native storage # servers. A different backend would use a different class. 
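
The net effect on this class: a Share now identifies itself through the
Server abstraction instead of carrying a bare peerid. A stub with the same
shape (this is essentially what test_download.py's MyShare becomes, below):

    class StubShare:
        def __init__(self, shnum, server, rtt):
            self._shnum = shnum
            self._server = server   # a Server/NoNetworkServer handle
            self._dyhb_rtt = rtt
        def __repr__(self):
            # server.name() returns the short printable nodeid, which is
            # why the expected test_cli string grows from "sh0-on-fob7v"
            # to "sh0-on-fob7vqgd"
            return "sh%d-on-%s" % (self._shnum, self._server.name())
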
- def __init__(self, rref, server_version, verifycap, commonshare, node, - download_status, peerid, shnum, dyhb_rtt, logparent): + def __init__(self, rref, server, verifycap, commonshare, node, + download_status, shnum, dyhb_rtt, logparent): self._rref = rref hunk ./src/allmydata/immutable/downloader/share.py 38 - self._server_version = server_version + self._server = server self._node = node # holds share_hash_tree and UEB self.actual_segment_size = node.segment_size # might still be None # XXX change node.guessed_segment_size to hunk ./src/allmydata/immutable/downloader/share.py 49 self._UEB_length = None self._commonshare = commonshare # holds block_hash_tree self._download_status = download_status - self._peerid = peerid - self._peerid_s = base32.b2a(peerid)[:5] self._storage_index = verifycap.storage_index self._si_prefix = base32.b2a(verifycap.storage_index)[:8] self._shnum = shnum hunk ./src/allmydata/immutable/downloader/share.py 83 # download can re-fetch it. self._requested_blocks = [] # (segnum, set(observer2..)) - ver = server_version["http://allmydata.org/tahoe/protocols/storage/v1"] + v = server.get_version() + ver = v["http://allmydata.org/tahoe/protocols/storage/v1"] self._overrun_ok = ver["tolerates-immutable-read-overrun"] # If _overrun_ok and we guess the offsets correctly, we can get # everything in one RTT. If _overrun_ok and we guess wrong, we might hunk ./src/allmydata/immutable/downloader/share.py 96 self.had_corruption = False # for unit tests def __repr__(self): - return "Share(sh%d-on-%s)" % (self._shnum, self._peerid_s) + return "Share(sh%d-on-%s)" % (self._shnum, self._server.name()) def is_alive(self): # XXX: reconsider. If the share sees a single error, should it remain hunk ./src/allmydata/immutable/downloader/share.py 729 share=repr(self), start=start, length=length, level=log.NOISY, parent=self._lp, umid="sgVAyA") - req_ev = ds.add_request_sent(self._peerid, self._shnum, + req_ev = ds.add_request_sent(self._server.get_serverid(), + self._shnum, start, length, now()) d = self._send_request(start, length) d.addCallback(self._got_data, start, length, req_ev, lp) hunk ./src/allmydata/immutable/downloader/share.py 792 log.msg(format="error requesting %(start)d+%(length)d" " from %(server)s for si %(si)s", start=start, length=length, - server=self._peerid_s, si=self._si_prefix, + server=self._server.name(), si=self._si_prefix, failure=f, parent=lp, level=log.UNUSUAL, umid="BZgAJw") # retire our observers, assuming we won't be able to make any # further progress hunk ./src/allmydata/test/test_cli.py 2415 # enough shares. The one remaining share might be in either the # COMPLETE or the PENDING state. 
in_complete_msg = "ran out of shares: complete=sh0 pending= overdue= unused= need 3" - in_pending_msg = "ran out of shares: complete= pending=Share(sh0-on-fob7v) overdue= unused= need 3" + in_pending_msg = "ran out of shares: complete= pending=Share(sh0-on-fob7vqgd) overdue= unused= need 3" d.addCallback(lambda ign: self.do_cli("get", self.uri_1share)) def _check1((rc, out, err)): hunk ./src/allmydata/test/test_download.py 11 from twisted.internet import defer, reactor from allmydata import uri from allmydata.storage.server import storage_index_to_dir -from allmydata.util import base32, fileutil, spans, log +from allmydata.util import base32, fileutil, spans, log, hashutil from allmydata.util.consumer import download_to_data, MemoryConsumer from allmydata.immutable import upload, layout hunk ./src/allmydata/test/test_download.py 14 -from allmydata.test.no_network import GridTestMixin +from allmydata.test.no_network import GridTestMixin, NoNetworkServer from allmydata.test.common import ShouldFailMixin from allmydata.interfaces import NotEnoughSharesError, NoSharesError from allmydata.immutable.downloader.common import BadSegmentNumberError, \ hunk ./src/allmydata/test/test_download.py 1270 e2.finished(now+3) self.failUnlessEqual(ds.get_active(), False) +def make_server(clientid): + tubid = hashutil.tagged_hash("clientid", clientid)[:20] + return NoNetworkServer(tubid, None) +def make_servers(clientids): + servers = {} + for clientid in clientids: + servers[clientid] = make_server(clientid) + return servers + class MyShare: hunk ./src/allmydata/test/test_download.py 1280 - def __init__(self, shnum, peerid, rtt): + def __init__(self, shnum, server, rtt): self._shnum = shnum hunk ./src/allmydata/test/test_download.py 1282 - self._peerid = peerid - self._peerid_s = peerid + self._server = server self._dyhb_rtt = rtt def __repr__(self): hunk ./src/allmydata/test/test_download.py 1285 - return "sh%d-on-%s" % (self._shnum, self._peerid) + return "sh%d-on-%s" % (self._shnum, self._server.name()) class MySegmentFetcher(SegmentFetcher): def __init__(self, *args, **kwargs): hunk ./src/allmydata/test/test_download.py 1330 def test_only_one_share(self): node = FakeNode() sf = MySegmentFetcher(node, 0, 3, None) - shares = [MyShare(0, "peer-A", 0.0)] + serverA = make_server("peer-A") + shares = [MyShare(0, serverA, 0.0)] sf.add_shares(shares) d = flushEventualQueue() def _check1(ign): hunk ./src/allmydata/test/test_download.py 1343 def _check2(ign): self.failUnless(node.failed) self.failUnless(node.failed.check(NotEnoughSharesError)) - self.failUnlessIn("complete= pending=sh0-on-peer-A overdue= unused=", + sname = serverA.name() + self.failUnlessIn("complete= pending=sh0-on-%s overdue= unused=" % sname, str(node.failed)) d.addCallback(_check2) return d hunk ./src/allmydata/test/test_download.py 1352 def test_good_diversity_early(self): node = FakeNode() sf = MySegmentFetcher(node, 0, 3, None) - shares = [MyShare(i, "peer-%d" % i, i) for i in range(10)] + shares = [MyShare(i, make_server("peer-%d" % i), i) for i in range(10)] sf.add_shares(shares) d = flushEventualQueue() def _check1(ign): hunk ./src/allmydata/test/test_download.py 1374 def test_good_diversity_late(self): node = FakeNode() sf = MySegmentFetcher(node, 0, 3, None) - shares = [MyShare(i, "peer-%d" % i, i) for i in range(10)] + shares = [MyShare(i, make_server("peer-%d" % i), i) for i in range(10)] sf.add_shares([]) d = flushEventualQueue() def _check1(ign): hunk ./src/allmydata/test/test_download.py 1403 # we could satisfy the read entirely 
         # prefer not to. Instead, we expect to only pull one share from the
         # first server
-        shares = [MyShare(0, "peer-A", 0.0),
-                  MyShare(1, "peer-A", 0.0),
-                  MyShare(2, "peer-A", 0.0),
-                  MyShare(3, "peer-B", 1.0),
-                  MyShare(4, "peer-C", 2.0),
+        servers = make_servers(["peer-A", "peer-B", "peer-C"])
+        shares = [MyShare(0, servers["peer-A"], 0.0),
+                  MyShare(1, servers["peer-A"], 0.0),
+                  MyShare(2, servers["peer-A"], 0.0),
+                  MyShare(3, servers["peer-B"], 1.0),
+                  MyShare(4, servers["peer-C"], 2.0),
                   ]
         sf.add_shares([])
         d = flushEventualQueue()
hunk ./src/allmydata/test/test_download.py 1438
         sf = MySegmentFetcher(node, 0, 3, None)
         # we satisfy the read entirely from the first server because we don't
         # have any other choice.
-        shares = [MyShare(0, "peer-A", 0.0),
-                  MyShare(1, "peer-A", 0.0),
-                  MyShare(2, "peer-A", 0.0),
-                  MyShare(3, "peer-A", 0.0),
-                  MyShare(4, "peer-A", 0.0),
+        serverA = make_server("peer-A")
+        shares = [MyShare(0, serverA, 0.0),
+                  MyShare(1, serverA, 0.0),
+                  MyShare(2, serverA, 0.0),
+                  MyShare(3, serverA, 0.0),
+                  MyShare(4, serverA, 0.0),
                   ]
         sf.add_shares([])
         d = flushEventualQueue()
hunk ./src/allmydata/test/test_download.py 1474
         sf = MySegmentFetcher(node, 0, 3, None)
         # we satisfy the read entirely from the first server because we don't
         # have any other choice.
-        shares = [MyShare(0, "peer-A", 0.0),
-                  MyShare(1, "peer-A", 0.0),
-                  MyShare(2, "peer-A", 0.0),
-                  MyShare(3, "peer-A", 0.0),
-                  MyShare(4, "peer-A", 0.0),
+        serverA = make_server("peer-A")
+        shares = [MyShare(0, serverA, 0.0),
+                  MyShare(1, serverA, 0.0),
+                  MyShare(2, serverA, 0.0),
+                  MyShare(3, serverA, 0.0),
+                  MyShare(4, serverA, 0.0),
                   ]
         sf.add_shares(shares)
         d = flushEventualQueue()
hunk ./src/allmydata/test/test_download.py 1503
     def test_overdue(self):
         node = FakeNode()
         sf = MySegmentFetcher(node, 0, 3, None)
-        shares = [MyShare(i, "peer-%d" % i, i) for i in range(10)]
+        shares = [MyShare(i, make_server("peer-%d" % i), i) for i in range(10)]
         sf.add_shares(shares)
         d = flushEventualQueue()
         def _check1(ign):
hunk ./src/allmydata/test/test_download.py 1531
     def test_overdue_fails(self):
         node = FakeNode()
         sf = MySegmentFetcher(node, 0, 3, None)
-        shares = [MyShare(i, "peer-%d" % i, i) for i in range(6)]
+        servers = make_servers(["peer-%d" % i for i in range(6)])
+        shares = [MyShare(i, servers["peer-%d" % i], i) for i in range(6)]
         sf.add_shares(shares)
         sf.no_more_shares()
         d = flushEventualQueue()
hunk ./src/allmydata/test/test_download.py 1565
         def _check4(ign):
             self.failUnless(node.failed)
             self.failUnless(node.failed.check(NotEnoughSharesError))
-            self.failUnlessIn("complete=sh0 pending= overdue=sh2-on-peer-2 unused=",
+            sname = servers["peer-2"].name()
+            self.failUnlessIn("complete=sh0 pending= overdue=sh2-on-%s unused=" % sname,
                               str(node.failed))
         d.addCallback(_check4)
         return d
hunk ./src/allmydata/test/test_download.py 1577
         # we could satisfy the read entirely from the first server, but we'd
         # prefer not to. Instead, we expect to only pull one share from the
         # first server
-        shares = [MyShare(0, "peer-A", 0.0),
-                  MyShare(1, "peer-B", 1.0),
-                  MyShare(0, "peer-C", 2.0), # this will be skipped
-                  MyShare(1, "peer-D", 3.0),
-                  MyShare(2, "peer-E", 4.0),
+        servers = make_servers(["peer-A", "peer-B", "peer-C", "peer-D",
+                                "peer-E"])
+        shares = [MyShare(0, servers["peer-A"], 0.0),
+                  MyShare(1, servers["peer-B"], 1.0),
+                  MyShare(0, servers["peer-C"], 2.0), # this will be skipped
+                  MyShare(1, servers["peer-D"], 3.0),
+                  MyShare(2, servers["peer-E"], 4.0),
                   ]
         sf.add_shares(shares[:3])
         d = flushEventualQueue()
}
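The patch comment says the tests must now share Server objects "now that
finder.py cares". A short illustration of why, assuming NoNetworkServer
compares by identity (the Python default): two stubs minted from the same
tubid carry equal serverids but are distinct objects, so bookkeeping keyed by
the server object would count them as two different servers.

    # Illustration only, not part of the patch.
    from allmydata.test.no_network import NoNetworkServer
    from allmydata.util import hashutil

    tubid = hashutil.tagged_hash("clientid", "peer-A")[:20]
    s1 = NoNetworkServer(tubid, None)
    s2 = NoNetworkServer(tubid, None)
    assert s1.get_serverid() == s2.get_serverid() # same id on the wire...
    assert s1 is not s2 # ...but distinct dict keys, which is why
                        # make_server() builds exactly one Server per peer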
[immutable/downloader/fetcher.py: fix diversity bug in server-response handling
warner@lothar.com**20110227011153
 Ignore-this: bcd62232c9159371ae8a16ff63d22c1b

 When blocks terminate (either COMPLETE or CORRUPT/DEAD/BADSEGNUM), the
 _shares_from_server dict was being popped incorrectly (using shnum as the
 index instead of serverid). I'm still thinking through the consequences of
 this bug. It was probably benign and really hard to detect. I think it would
 cause us to incorrectly believe that we're pulling too many shares from a
 server, and thus prefer a different server rather than asking for a second
 share from the first server. The diversity code is intended to spread out the
 number of shares simultaneously being requested from each server, but with
 this bug, it might be spreading out the total number of shares requested at
 all, not just simultaneously. (note that SegmentFetcher is scoped to a single
 segment, so the effect doesn't last very long).
]
hunk ./src/allmydata/immutable/downloader/fetcher.py 239
         # from all our tracking lists.
         if state in (COMPLETE, CORRUPT, DEAD, BADSEGNUM):
             self._share_observers.pop(share, None)
-            self._shares_from_server.discard(shnum, share)
+            self._shares_from_server.discard(share._server.get_serverid(), share)
             if self._active_share_map.get(shnum) is share:
                 del self._active_share_map[shnum]
             self._overdue_share_map.discard(shnum, share)
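A sketch of the failure mode described above, using a simplified stand-in for
the DictOfSets class (the real one lives in allmydata.util; this
reimplementation is assumed for illustration). Because discard() on a missing
key is a quiet no-op, indexing by shnum (an int) instead of serverid (a
20-byte string) never removed anything, so the per-server count of
outstanding shares could only grow:

    # Simplified stand-in, assumed for illustration.
    class DictOfSets(dict):
        def add(self, key, value):
            self.setdefault(key, set()).add(value)
        def discard(self, key, value):
            if key in self:              # a missing key is silently ignored:
                self[key].discard(value) # this is why the bug was so quiet
                if not self[key]:
                    del self[key]

    shares_from_server = DictOfSets()
    shares_from_server.add("\x00"*20, "sh0") # keyed by 20-byte serverid
    shares_from_server.discard(0, "sh0")     # the bug: shnum as key, a no-op
    assert "sh0" in shares_from_server["\x00"*20]
    # the share still looks outstanding, so the diversity check keeps
    # steering requests away from this server for the rest of the segment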
[immutable/downloader/fetcher.py: remove all get_serverid() calls
warner@lothar.com**20110227011156
 Ignore-this: fb5ef018ade1749348b546ec24f7f09a
]
{
hunk ./src/allmydata/immutable/downloader/fetcher.py 34
                                        # responses arrive, or (for later segments) at
                                        # startup. We remove shares from it when we call
                                        # sh.get_block() on them.
-        self._shares_from_server = DictOfSets() # maps serverid to set of
+        self._shares_from_server = DictOfSets() # maps server to set of
                                        # Shares on that server for
                                        # which we have outstanding
                                        # get_block() calls.
hunk ./src/allmydata/immutable/downloader/fetcher.py 192
         sent_something = False
         want_more_diversity = False
         for sh in self._shares: # find one good share to fetch
-            shnum = sh._shnum ; serverid = sh._server.get_serverid()
+            shnum = sh._shnum ; server = sh._server # XXX
             if shnum in self._blocks:
                 continue # don't request data we already have
             if shnum in self._active_share_map:
hunk ./src/allmydata/immutable/downloader/fetcher.py 200
                 # and added to _overdue_share_map instead.
                 continue # don't send redundant requests
             sfs = self._shares_from_server
-            if len(sfs.get(serverid,set())) >= self._max_shares_per_server:
+            if len(sfs.get(server,set())) >= self._max_shares_per_server:
                 # don't pull too much from a single server
                 want_more_diversity = True
                 continue
hunk ./src/allmydata/immutable/downloader/fetcher.py 207
             # ok, we can use this share
             self._shares.remove(sh)
             self._active_share_map[shnum] = sh
-            self._shares_from_server.add(serverid, sh)
+            self._shares_from_server.add(server, sh)
             self._start_share(sh, shnum)
             sent_something = True
             break
hunk ./src/allmydata/immutable/downloader/fetcher.py 239
         # from all our tracking lists.
         if state in (COMPLETE, CORRUPT, DEAD, BADSEGNUM):
             self._share_observers.pop(share, None)
-            self._shares_from_server.discard(share._server.get_serverid(), share)
+            server = share._server # XXX
+            self._shares_from_server.discard(server, share)
             if self._active_share_map.get(shnum) is share:
                 del self._active_share_map[shnum]
             self._overdue_share_map.discard(shnum, share)
}
[web: remove some uses of s.get_serverid(), not all
warner@lothar.com**20110227011159
 Ignore-this: a9347d9cf6436537a47edc6efde9f8be
]
{
hunk ./src/allmydata/web/check_results.py 142
         # this table is sorted by permuted order
         sb = c.get_storage_broker()
-        permuted_peer_ids = [s.get_serverid()
-                             for s
-                             in sb.get_servers_for_psi(cr.get_storage_index())]
+        permuted_servers = [s
+                            for s
+                            in sb.get_servers_for_psi(cr.get_storage_index())]

         num_shares_left = sum([len(shares) for shares in servers.values()])
         servermap = []
hunk ./src/allmydata/web/check_results.py 148
-        for serverid in permuted_peer_ids:
-            nickname = sb.get_nickname_for_serverid(serverid)
-            shareids = servers.get(serverid, [])
+        for s in permuted_servers:
+            nickname = s.get_nickname()
+            shareids = servers.get(s.get_serverid(), [])
             shareids.reverse()
             shareids_s = [ T.tt[shareid, " "]
                            for shareid in sorted(shareids) ]
             servermap.append(T.tr[T.td[T.div(class_="nickname")[nickname],
hunk ./src/allmydata/web/check_results.py 154
-                                       T.div(class_="nodeid")[T.tt[base32.b2a(serverid)]]],
+                                       T.div(class_="nodeid")[T.tt[s.name()]]],
                                   T.td[shareids_s],
                                   ])
             num_shares_left -= len(shareids)
hunk ./src/allmydata/web/root.py 259
     def render_service_row(self, ctx, server):
         nodeid = server.get_serverid()
-        ctx.fillSlots("peerid", idlib.nodeid_b2a(nodeid))
+        ctx.fillSlots("peerid", server.longname())
         ctx.fillSlots("nickname", server.get_nickname())
         rhost = server.get_remote_host()
         if rhost:
}
[control.py: remove all uses of s.get_serverid()
warner@lothar.com**20110227011203
 Ignore-this: f80a787953bd7fa3d40e828bde00e855
]
{
hunk ./src/allmydata/control.py 103
         if not everyone_left:
             return results
         server = everyone_left.pop(0)
-        peerid = server.get_serverid()
+        server_name = server.longname()
         connection = server.get_rref()
         start = time.time()
         d = connection.callRemote("get_buckets", "\x00"*16)
hunk ./src/allmydata/control.py 110
         def _done(ignored):
             stop = time.time()
             elapsed = stop - start
-            if peerid in results:
-                results[peerid].append(elapsed)
+            if server_name in results:
+                results[server_name].append(elapsed)
             else:
hunk ./src/allmydata/control.py 113
-                results[peerid] = [elapsed]
+                results[server_name] = [elapsed]
         d.addCallback(_done)
         d.addCallback(self._do_one_ping, everyone_left, results)
         def _average(res):
hunk ./src/allmydata/control.py 118
             averaged = {}
-            for peerid,times in results.iteritems():
-                averaged[peerid] = sum(times) / len(times)
+            for server_name,times in results.iteritems():
+                averaged[server_name] = sum(times) / len(times)
             return averaged
         d.addCallback(_average)
         return d
hunk ./src/allmydata/interfaces.py 2324
         @return: a dictionary mapping peerid to a float (RTT time in seconds)
         """
-        return DictOf(Nodeid, float)
+        return DictOf(str, float)

     UploadResults = Any() #DictOf(str, str)
}
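The control.py hunks above switch the ping-results dict from binary peerids to
server.longname() strings, and the interfaces.py hunk relaxes the foolscap
return schema to match (DictOf(str, float) instead of DictOf(Nodeid, float)).
A sketch of the resulting averaging step (the function name is assumed; it
mirrors the _average callback above):

    # Sketch only; 'results' maps printable server names (str, matching the
    # new DictOf(str, float) schema) to lists of RTT samples in seconds.
    def average_rtts(results):
        averaged = {}
        for server_name, times in results.iteritems(): # Python 2, as upstream
            averaged[server_name] = sum(times) / len(times)
        return averaged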
Context:

[docs/configuration.rst: add a "Frontend Configuration" section
Brian Warner <warner@lothar.com>**20110222014323
 Ignore-this: 657018aa501fe4f0efef9851628444ca

 this points to docs/frontends/*.rst, which were previously underlinked
]
[web/filenode.py: avoid calling req.finish() on closed HTTP connections. Closes #1366
"Brian Warner <warner@lothar.com>"**20110221061544
 Ignore-this: 799d4de19933f2309b3c0c19a63bb888
]
[Add unit tests for cross_check_pkg_resources_versus_import, and a regression test for ref #1355. This requires a little refactoring to make it testable.
david-sarah@jacaranda.org**20110221015817
 Ignore-this: 51d181698f8c20d3aca58b057e9c475a
]
[allmydata/__init__.py: .name was used in place of the correct .__name__ when printing an exception. Also, robustify string formatting by using %r instead of %s in some places. fixes #1355.
david-sarah@jacaranda.org**20110221020125
 Ignore-this: b0744ed58f161bf188e037bad077fc48
]
[Refactor StorageFarmBroker handling of servers
Brian Warner <warner@lothar.com>**20110221015804
 Ignore-this: 842144ed92f5717699b8f580eab32a51

 Pass around IServer instance instead of (peerid, rref) tuple. Replace
 "descriptor" with "server". Other replacements:

  get_all_servers -> get_connected_servers/get_known_servers
  get_servers_for_index -> get_servers_for_psi (now returns IServers)

 This change still needs to be pushed further down: lots of code is now
 getting the IServer and then distributing (peerid, rref) internally.
 Instead, it ought to distribute the IServer internally and delay
 extracting a serverid or rref until the last moment.

 no_network.py was updated to retain parallelism.
]
[TAG allmydata-tahoe-1.8.2
warner@lothar.com**20110131020101]

Patch bundle hash:
16ec08c4c679b07a1836ca90086a01711f1cc6ae