3 patches for repository zooko@dev.allmydata.org:/home/darcs/tahoe-lafs/trunk:

Sun Jul 18 22:47:44 MDT 2010  zooko@zooko.com
  * trivial: rename and add in-line doc to clarify "used_peers" => "upload_servers"

Mon Jul 19 02:20:00 MDT 2010  zooko@zooko.com
  * immutable: use PrefixingLogMixin to organize logging in Tahoe2PeerSelector and add more detailed messages about peer

Mon Aug  2 00:28:09 MDT 2010  zooko@zooko.com
  * upload: tidy up logging messages
  reformat code to be less than 100 chars wide, refactor formatting of logging messages, add log levels to some logging messages, M-x whitespace-cleanup

New patches:

[trivial: rename and add in-line doc to clarify "used_peers" => "upload_servers"
zooko@zooko.com**20100719044744
 Ignore-this: 93c42081676e0dea181e55187cfc506d
] {
hunk ./src/allmydata/immutable/upload.py 178
         """
         @return: (used_peers, already_peers), where used_peers is a set of
                  PeerTracker instances that have agreed to hold some shares
-                 for us (the shnum is stashed inside the PeerTracker),
+                 for us (the shareids are stashed inside the PeerTracker),
                  and already_peers is a dict mapping shnum to a set of peers
                  which claim to already have the share.
         """
hunk ./src/allmydata/immutable/upload.py 913
     def set_shareholders(self, (used_peers, already_peers), encoder):
         """
-        @param used_peers: a sequence of PeerTracker objects
+        @param used_peers: a sequence of PeerTracker objects that have agreed to hold some shares for us (the shareids are stashed inside the PeerTracker)
         @paran already_peers: a dict mapping sharenum to a set of peerids
             that claim to already have this share
         """
replace ./src/allmydata/immutable/upload.py [A-Za-z_0-9] used_peers upload_servers
replace ./src/allmydata/test/test_upload.py [A-Za-z_0-9] used_peers upload_servers
replace ./src/allmydata/util/happinessutil.py [A-Za-z_0-9] used_peers upload_servers
}

[immutable: use PrefixingLogMixin to organize logging in Tahoe2PeerSelector and add more detailed messages about peer
zooko@zooko.com**20100719082000
 Ignore-this: e034c4988b327f7e138a106d913a3082
] {
hunk ./src/allmydata/immutable/upload.py 77
 # TODO: actual extensions are closer to 419 bytes, so we can probably lower
 # this.
+def pretty_print_shnum_to_servers(s):
+    return ', '.join([ "sh%s: %s" % (k, '+'.join([idlib.shortnodeid_b2a(x) for x in v])) for k, v in s.iteritems() ])
+
 class PeerTracker:
     def __init__(self, peerid, storage_server,
                  sharesize, blocksize, num_segments, num_share_hashes,
hunk ./src/allmydata/immutable/upload.py 158
                 del self.buckets[sharenum]
-class Tahoe2PeerSelector:
+class Tahoe2PeerSelector(log.PrefixingLogMixin):
     def __init__(self, upload_id, logparent=None, upload_status=None):
         self.upload_id = upload_id
hunk ./src/allmydata/immutable/upload.py 169
         self.num_peers_contacted = 0
         self.last_failure_msg = None
         self._status = IUploadStatus(upload_status)
-        self._log_parent = log.msg("%s starting" % self, parent=logparent)
+        log.PrefixingLogMixin.__init__(self, 'tahoe.immutable.upload', logparent, prefix=upload_id)
+        self.log("starting", level=log.OPERATIONAL)
     def __repr__(self):
         return "<Tahoe2PeerSelector for upload %s>" % self.upload_id
hunk ./src/allmydata/immutable/upload.py 275
             ds.append(d)
             self.num_peers_contacted += 1
             self.query_count += 1
-            log.msg("asking peer %s for any existing shares for "
-                    "upload id %s"
-                    % (idlib.shortnodeid_b2a(peer.peerid), self.upload_id),
-                    level=log.NOISY, parent=self._log_parent)
+            self.log("asking peer %s for any existing shares" %
+                     (idlib.shortnodeid_b2a(peer.peerid),),
+                     level=log.NOISY)
         dl = defer.DeferredList(ds)
         dl.addCallback(lambda ign: self._loop())
         return dl
hunk ./src/allmydata/immutable/upload.py 289
         Tahoe2PeerSelector._existing_shares.
         """
         if isinstance(res, failure.Failure):
-            log.msg("%s got error during existing shares check: %s"
+            self.log("%s got error during existing shares check: %s"
                     % (idlib.shortnodeid_b2a(peer), res),
hunk ./src/allmydata/immutable/upload.py 291
-                    level=log.UNUSUAL, parent=self._log_parent)
+                    level=log.UNUSUAL)
             self.error_count += 1
             self.bad_query_count += 1
         else:
hunk ./src/allmydata/immutable/upload.py 298
             buckets = res
             if buckets:
                 self.peers_with_shares.add(peer)
-            log.msg("response from peer %s: alreadygot=%s"
+            self.log("response to get_buckets() from peer %s: alreadygot=%s"
                     % (idlib.shortnodeid_b2a(peer), tuple(sorted(buckets))),
hunk ./src/allmydata/immutable/upload.py 300
-                    level=log.NOISY, parent=self._log_parent)
+                    level=log.NOISY)
             for bucket in buckets:
                 self.preexisting_shares.setdefault(bucket, set()).add(peer)
                 if self.homeless_shares and bucket in self.homeless_shares:
hunk ./src/allmydata/immutable/upload.py 334
         merged = merge_peers(self.preexisting_shares, self.use_peers)
         effective_happiness = servers_of_happiness(merged)
         if self.servers_of_happiness <= effective_happiness:
-            msg = ("peer selection successful for %s: %s" % (self,
-                    self._get_progress_message()))
-            log.msg(msg, parent=self._log_parent)
+            msg = ("server selection successful for %s: %s: pretty_print_merged: %s, self.use_peers: %s, self.preexisting_shares: %s" % (self, self._get_progress_message(), pretty_print_shnum_to_servers(merged), [', '.join(["%s: %s" % (k, idlib.shortnodeid_b2a(v._nodeid),) for k,v in p.buckets.iteritems()]) for p in self.use_peers], pretty_print_shnum_to_servers(self.preexisting_shares)))
+            self.log(msg, level=log.OPERATIONAL)
             return (self.use_peers, self.preexisting_shares)
         else:
             # We're not okay right now, but maybe we can fix it by
hunk ./src/allmydata/immutable/upload.py 379
                                       self.needed_shares,
                                       self.servers_of_happiness,
                                       effective_happiness)
-                log.msg("server selection unsuccessful for %r: %s (%s), merged=%r"
-                        % (self, msg, self._get_progress_message(), merged), level=log.INFREQUENT)
+                self.log("server selection unsuccessful for %r: %s (%s), merged=%s" % (self, msg, self._get_progress_message(), pretty_print_shnum_to_servers(merged)), level=log.INFREQUENT)
                 return self._failed("%s (%s)" % (msg, self._get_progress_message()))
         if self.uncontacted_peers:
hunk ./src/allmydata/immutable/upload.py 402
         elif self.contacted_peers:
             # ask a peer that we've already asked.
             if not self._started_second_pass:
-                log.msg("starting second pass", parent=self._log_parent,
+                self.log("starting second pass",
                         level=log.NOISY)
                 self._started_second_pass = True
             num_shares = mathutil.div_ceil(len(self.homeless_shares),
hunk ./src/allmydata/immutable/upload.py 440
                                 self._get_progress_message()))
                 if self.last_failure_msg:
                     msg += " (%s)" % (self.last_failure_msg,)
-                log.msg(msg, level=log.UNUSUAL, parent=self._log_parent)
+                self.log(msg, level=log.UNUSUAL)
                 return self._failed(msg)
             else:
                 # we placed enough to be happy, so we're done
hunk ./src/allmydata/immutable/upload.py 446
                 if self._status:
                     self._status.set_status("Placed all shares")
+                msg = ("server selection successful (no more servers) for %s: %s: %s" % (self,
+                    self._get_progress_message(), pretty_print_shnum_to_servers(merged)))
+                self.log(msg, level=log.OPERATIONAL)
                 return (self.use_peers, self.preexisting_shares)
     def _got_response(self, res, peer, shares_to_ask, put_peer_here):
hunk ./src/allmydata/immutable/upload.py 455
         if isinstance(res, failure.Failure):
             # This is unusual, and probably indicates a bug or a network
             # problem.
-            log.msg("%s got error during peer selection: %s" % (peer, res),
-                    level=log.UNUSUAL, parent=self._log_parent)
+            self.log("%s got error during peer selection: %s" % (peer, res),
+                     level=log.UNUSUAL)
             self.error_count += 1
             self.bad_query_count += 1
             self.homeless_shares = list(shares_to_ask) + self.homeless_shares
hunk ./src/allmydata/immutable/upload.py 475
             self.last_failure_msg = msg
         else:
             (alreadygot, allocated) = res
-            log.msg("response from peer %s: alreadygot=%s, allocated=%s"
+            self.log("response to allocate_buckets() from peer %s: alreadygot=%s, allocated=%s"
                     % (idlib.shortnodeid_b2a(peer.peerid),
                        tuple(sorted(alreadygot)), tuple(sorted(allocated))),
hunk ./src/allmydata/immutable/upload.py 478
-                    level=log.NOISY, parent=self._log_parent)
+                    level=log.NOISY)
             progress = False
             for s in alreadygot:
                 self.preexisting_shares.setdefault(s, set()).add(peer.peerid)
hunk ./src/allmydata/immutable/upload.py 921
         @paran already_peers: a dict mapping sharenum to a set of peerids
             that claim to already have this share
         """
-        self.log("_send_shares, upload_servers is %s" % (upload_servers,))
+        self.log("set_shareholders; upload_servers is %s, already_peers is %s" % ([', '.join(["%s: %s" % (k, idlib.shortnodeid_b2a(v._nodeid),) for k,v in p.buckets.iteritems()]) for p in upload_servers], already_peers))
         # record already-present shares in self._results
         self._results.preexisting_shares = len(already_peers)
hunk ./src/allmydata/immutable/upload.py 935
             for shnum in peer.buckets:
                 self._peer_trackers[shnum] = peer
                 servermap.setdefault(shnum, set()).add(peer.peerid)
+        self.log("set_shareholders; %s (%s) == %s (%s)" % (len(buckets), buckets, sum([len(peer.buckets) for peer in upload_servers]), [(p.buckets, p.peerid) for p in upload_servers]))
         assert len(buckets) == sum([len(peer.buckets) for peer in upload_servers]), "%s (%s) != %s (%s)" % (len(buckets), buckets, sum([len(peer.buckets) for peer in upload_servers]), [(p.buckets, p.peerid) for p in upload_servers])
         encoder.set_shareholders(buckets, servermap)
hunk ./src/allmydata/storage/server.py 8
 from zope.interface import implements
 from allmydata.interfaces import RIStorageServer, IStatsProducer
-from allmydata.util import fileutil, log, time_format
+from allmydata.util import fileutil, idlib, log, time_format
 import allmydata # for __full_version__
 from allmydata.storage.common import si_b2a, si_a2b, storage_index_to_dir
hunk ./src/allmydata/storage/server.py 109
                                    expiration_sharetypes)
         self.lease_checker.setServiceParent(self)
+    def __repr__(self):
+        return "<StorageServer %s>" % (idlib.shortnodeid_b2a(self.my_nodeid),)
+
     def add_bucket_counter(self):
         statefile = os.path.join(self.storedir, "bucket_counter.state")
         self.bucket_counter = BucketCountingCrawler(self, statefile)
hunk ./src/allmydata/test/test_upload.py 14
 from allmydata import uri, monitor, client
 from allmydata.immutable import upload, encode
 from allmydata.interfaces import FileTooLargeError, UploadUnhappinessError
+from allmydata.util import log
 from allmydata.util.assertutil import precondition
 from allmydata.util.deferredutil import DeferredListShouldSucceed
 from allmydata.test.no_network import GridTestMixin
hunk ./src/allmydata/test/test_upload.py 714
 def is_happy_enough(servertoshnums, h, k):
     """ I calculate whether servertoshnums achieves happiness level h. I do this with a naïve "brute force search" approach. (See src/allmydata/util/happinessutil.py for a better algorithm.) """
+    print "servertoshnums: ", servertoshnums, "h: ", h, "k: ", k
     if len(servertoshnums) < h:
         return False
     # print "servertoshnums: ", servertoshnums, h, k
hunk ./src/allmydata/test/test_upload.py 803
     def _add_server(self, server_number, readonly=False):
         assert self.g, "I tried to find a grid at self.g, but failed"
         ss = self.g.make_server(server_number, readonly)
+        log.msg("just created a server, number: %s => %s" % (server_number, ss,))
         self.g.add_server(server_number, ss)
hunk ./src/allmydata/test/test_upload.py 806
-
     def _add_server_with_share(self, server_number, share_number=None, readonly=False):
         self._add_server(server_number, readonly)
hunk ./src/allmydata/test/test_upload.py 866
         d.addCallback(_store_shares)
         return d
-
     def test_configure_parameters(self):
         self.basedir = self.mktemp()
         hooks = {0: self._set_up_nodes_extra_config}
}

[upload: tidy up logging messages
zooko@zooko.com**20100802062809
 Ignore-this: b7d8b8360cfe4bebd8b8493ce72d15fa
 reformat code to be less than 100 chars wide, refactor formatting of logging messages, add log levels to some logging messages, M-x whitespace-cleanup
] {
hunk ./src/allmydata/immutable/upload.py 158
                 del self.buckets[sharenum]
+def str_shareloc(shnum, bucketwriter):
+    return "%s: %s" % (shnum, idlib.shortnodeid_b2a(bucketwriter._nodeid),)
+
 class Tahoe2PeerSelector(log.PrefixingLogMixin):
     def __init__(self, upload_id, logparent=None, upload_status=None):
hunk ./src/allmydata/immutable/upload.py 337
         merged = merge_peers(self.preexisting_shares, self.use_peers)
         effective_happiness = servers_of_happiness(merged)
         if self.servers_of_happiness <= effective_happiness:
-            msg = ("server selection successful for %s: %s: pretty_print_merged: %s, self.use_peers: %s, self.preexisting_shares: %s" % (self, self._get_progress_message(), pretty_print_shnum_to_servers(merged), [', '.join(["%s: %s" % (k, idlib.shortnodeid_b2a(v._nodeid),) for k,v in p.buckets.iteritems()]) for p in self.use_peers], pretty_print_shnum_to_servers(self.preexisting_shares)))
+            msg = ("server selection successful for %s: %s: pretty_print_merged: %s, "
+                   "self.use_peers: %s, self.preexisting_shares: %s")
+                   % (self, self._get_progress_message(),
+                      pretty_print_shnum_to_servers(merged),
+                      [', '.join([str_shareloc(k,v) for k,v in p.buckets.iteritems()])
+                       for p in self.use_peers],
+                      pretty_print_shnum_to_servers(self.preexisting_shares))
             self.log(msg, level=log.OPERATIONAL)
             return (self.use_peers, self.preexisting_shares)
         else:
hunk ./src/allmydata/immutable/upload.py 351
             # redistributing some shares. In cases where one or two
             # servers has, before the upload, all or most of the
             # shares for a given SI, this can work by allowing _loop
-            # a chance to spread those out over the other peers, 
+            # a chance to spread those out over the other peers,
             delta = self.servers_of_happiness - effective_happiness
             shares = shares_by_server(self.preexisting_shares)
             # Each server in shares maps to a set of shares stored on it.
hunk ./src/allmydata/immutable/upload.py 355
-            # Since we want to keep at least one share on each server 
+            # Since we want to keep at least one share on each server
             # that has one (otherwise we'd only be making
             # the situation worse by removing distinct servers),
             # each server has len(its shares) - 1 to spread around.
hunk ./src/allmydata/immutable/upload.py 384
             else:
                 # Redistribution won't help us; fail.
                 peer_count = len(self.peers_with_shares)
-                msg = failure_message(peer_count,
+                failmsg = failure_message(peer_count,
                                       self.needed_shares,
                                       self.servers_of_happiness,
                                       effective_happiness)
hunk ./src/allmydata/immutable/upload.py 388
-                self.log("server selection unsuccessful for %r: %s (%s), merged=%s" % (self, msg, self._get_progress_message(), pretty_print_shnum_to_servers(merged)), level=log.INFREQUENT)
-                return self._failed("%s (%s)" % (msg, self._get_progress_message()))
+                servmsgtempl = "server selection unsuccessful for %r: %s (%s), merged=%s"
+                servmsg = servmsgtempl % (
+                    self,
+                    failmsg,
+                    self._get_progress_message(),
+                    pretty_print_shnum_to_servers(merged)
+                    )
+                self.log(servmsg, level=log.INFREQUENT)
+                return self._failed("%s (%s)" % (failmsg, self._get_progress_message()))
         if self.uncontacted_peers:
             peer = self.uncontacted_peers.pop(0)
hunk ./src/allmydata/immutable/upload.py 933
     def set_shareholders(self, (upload_servers, already_peers), encoder):
         """
-        @param upload_servers: a sequence of PeerTracker objects that have agreed to hold some shares for us (the shareids are stashed inside the PeerTracker)
+        @param upload_servers: a sequence of PeerTracker objects that have agreed to hold some
+            shares for us (the shareids are stashed inside the PeerTracker)
         @paran already_peers: a dict mapping sharenum to a set of peerids
             that claim to already have this share
         """
hunk ./src/allmydata/immutable/upload.py 938
-        self.log("set_shareholders; upload_servers is %s, already_peers is %s" % ([', '.join(["%s: %s" % (k, idlib.shortnodeid_b2a(v._nodeid),) for k,v in p.buckets.iteritems()]) for p in upload_servers], already_peers))
+        msgtempl = "set_shareholders; upload_servers is %s, already_peers is %s"
+        values = ([', '.join([str_shareloc(k,v) for k,v in p.buckets.iteritems()])
+                   for p in upload_servers], already_peers)
+        self.log(msgtempl % values, level=log.OPERATIONAL)
         # record already-present shares in self._results
         self._results.preexisting_shares = len(already_peers)
hunk ./src/allmydata/immutable/upload.py 955
             for shnum in peer.buckets:
                 self._peer_trackers[shnum] = peer
                 servermap.setdefault(shnum, set()).add(peer.peerid)
-        self.log("set_shareholders; %s (%s) == %s (%s)" % (len(buckets), buckets, sum([len(peer.buckets) for peer in upload_servers]), [(p.buckets, p.peerid) for p in upload_servers]))
-        assert len(buckets) == sum([len(peer.buckets) for peer in upload_servers]), "%s (%s) != %s (%s)" % (len(buckets), buckets, sum([len(peer.buckets) for peer in upload_servers]), [(p.buckets, p.peerid) for p in upload_servers])
+        assert len(buckets) == sum([len(peer.buckets) for peer in upload_servers]),
+            "%s (%s) != %s (%s)" % (
+                len(buckets),
+                buckets,
+                sum([len(peer.buckets) for peer in upload_servers]),
+                [(p.buckets, p.peerid) for p in upload_servers]
+                )
         encoder.set_shareholders(buckets, servermap)
hunk ./src/allmydata/immutable/upload.py 1165
         now = self._time_contacting_helper_start = time.time()
         self._storage_index_elapsed = now - self._started
         self.log(format="contacting helper for SI %(si)s..",
-                 si=si_b2a(self._storage_index))
+                 si=si_b2a(self._storage_index), level=log.NOISY)
         self._upload_status.set_status("Contacting Helper")
         d = self._helper.callRemote("upload_chk", self._storage_index)
         d.addCallback(self._contacted_helper)
hunk ./src/allmydata/immutable/upload.py 1176
         elapsed = now - self._time_contacting_helper_start
         self._elapsed_time_contacting_helper = elapsed
         if upload_helper:
-            self.log("helper says we need to upload")
+            self.log("helper says we need to upload", level=log.NOISY)
             self._upload_status.set_status("Uploading Ciphertext")
             # we need to upload the file
             reu = RemoteEncryptedUploadable(self._encuploadable,
hunk ./src/allmydata/immutable/upload.py 1187
                            upload_helper.callRemote("upload", reu))
             # this Deferred will fire with the upload results
             return d
-        self.log("helper says file is already uploaded")
+        self.log("helper says file is already uploaded", level=log.OPERATIONAL)
         self._upload_status.set_progress(1, 1.0)
         self._upload_status.set_results(upload_results)
         return upload_results
hunk ./src/allmydata/immutable/upload.py 1210
             upload_results.sharemap = None
     def _build_verifycap(self, upload_results):
-        self.log("upload finished, building readcap")
+        self.log("upload finished, building readcap", level=log.OPERATIONAL)
         self._convert_old_upload_results(upload_results)
         self._upload_status.set_status("Building Readcap")
         r = upload_results
}

Context:

[docs: add Jacob Lyles to CREDITS
zooko@zooko.com**20100730230500
 Ignore-this: 9dbbd6a591b4b1a5a8dcb69b7b757792
]
[web: don't use %d formatting on a potentially large negative float -- there is a bug in Python 2.5 in that case
jacob.lyles@gmail.com**20100730220550
 Ignore-this: 7080eb4bddbcce29cba5447f8f4872ee
 fixes #1055
]
[docs: fix licensing typo that was earlier fixed in [20090921164651-92b7f-7f97b58101d93dc588445c52a9aaa56a2c7ae336]
zooko@zooko.com**20100729052923
 Ignore-this: a975d79115911688e5469d4d869e1664
 I wish we didn't copies of this licensing text in several different files so that changes can be accidentally omitted from some of them.
]
[misc/build_helpers/run-with-pythonpath.py: fix stale comment, and remove 'trial' example that is not the right way to run trial.
david-sarah@jacaranda.org**20100726225729
 Ignore-this: a61f55557ad69a1633bfb2b8172cce97
]
[docs/specifications/dirnodes.txt: 'mesh'->'grid'.
david-sarah@jacaranda.org**20100723061616
 Ignore-this: 887bcf921ef00afba8e05e9239035bca
]
[docs/specifications/dirnodes.txt: bring layer terminology up-to-date with architecture.txt, and a few other updates (e.g. note that the MAC is no longer verified, and that URIs can be unknown). Also 'Tahoe'->'Tahoe-LAFS'.
david-sarah@jacaranda.org**20100723054703
 Ignore-this: f3b98183e7d0a0f391225b8b93ac6c37
]
[__init__.py: silence DeprecationWarning about BaseException.message globally. fixes #1129
david-sarah@jacaranda.org**20100720011939
 Ignore-this: 38808986ba79cb2786b010504a22f89
]
[test_runner: test that 'tahoe --version' outputs no noise (e.g. DeprecationWarnings).
david-sarah@jacaranda.org**20100720011345
 Ignore-this: dd358b7b2e5d57282cbe133e8069702e
]
[docs: use current cap to Zooko's wiki page in example text
zooko@zooko.com**20100721010543
 Ignore-this: 4f36f36758f9fdbaf9eb73eac23b6652
 fixes #1134
]
[TAG allmydata-tahoe-1.7.1
zooko@zooko.com**20100719131352
 Ignore-this: 6942056548433dc653a746703819ad8c
]

Patch bundle hash:
b0908cb6006dc648a50f57ed437eab671279abf2