diff --git a/src/allmydata/immutable/downloader/fetcher.py b/src/allmydata/immutable/downloader/fetcher.py index e30ced8..e78d37e 100644 --- a/src/allmydata/immutable/downloader/fetcher.py +++ b/src/allmydata/immutable/downloader/fetcher.py @@ -4,8 +4,8 @@ from foolscap.api import eventually from allmydata.interfaces import NotEnoughSharesError, NoSharesError from allmydata.util import log from allmydata.util.dictutil import DictOfSets -from common import AVAILABLE, PENDING, OVERDUE, COMPLETE, CORRUPT, DEAD, \ - BADSEGNUM, BadSegmentNumberError +from common import OVERDUE, COMPLETE, CORRUPT, DEAD, BADSEGNUM, \ + BadSegmentNumberError class SegmentFetcher: """I am responsible for acquiring blocks for a single segment. I will use @@ -22,35 +22,42 @@ class SegmentFetcher: will shut down and do no further work. My parent can also call my stop() method to have me shut down early.""" - def __init__(self, node, segnum, k): + def __init__(self, node, segnum, k, logparent): self._node = node # _Node self.segnum = segnum self._k = k - self._shares = {} # maps non-dead Share instance to a state, one of - # (AVAILABLE, PENDING, OVERDUE, COMPLETE, CORRUPT). - # State transition map is: - # AVAILABLE -(send-read)-> PENDING - # PENDING -(timer)-> OVERDUE - # PENDING -(rx)-> COMPLETE, CORRUPT, DEAD, BADSEGNUM - # OVERDUE -(rx)-> COMPLETE, CORRUPT, DEAD, BADSEGNUM - # If a share becomes DEAD, it is removed from the - # dict. If it becomes BADSEGNUM, the whole fetch is - # terminated. + self._shares = [] # unused Share instances, sorted by "goodness" + # (RTT), then shnum. This is populated when DYHB + # responses arrive, or (for later segments) at + # startup. We remove shares from it when we call + # sh.get_block() on them. + self._shares_from_server = DictOfSets() # maps serverid to set of + # Shares on that server for + # which we have outstanding + # get_block() calls. + self._max_shares_per_server = 1 # how many Shares we're allowed to + # pull from each server. 
This starts + # at 1 and grows if we don't have + # sufficient diversity. + self._active_share_map = {} # maps shnum to outstanding (and not + # OVERDUE) Share that provides it. + self._overdue_share_map = DictOfSets() # shares in the OVERDUE state + self._lp = logparent self._share_observers = {} # maps Share to EventStreamObserver for # active ones - self._shnums = DictOfSets() # maps shnum to the shares that provide it self._blocks = {} # maps shnum to validated block data self._no_more_shares = False - self._bad_segnum = False self._last_failure = None self._running = True def stop(self): log.msg("SegmentFetcher(%s).stop" % self._node._si_prefix, - level=log.NOISY, umid="LWyqpg") + level=log.NOISY, parent=self._lp, umid="LWyqpg") self._cancel_all_requests() self._running = False - self._shares.clear() # let GC work # ??? XXX + # help GC ??? XXX + del self._shares, self._shares_from_server, self._active_share_map + del self._share_observers # called by our parent _Node @@ -59,9 +66,8 @@ class SegmentFetcher: # called when ShareFinder locates a new share, and when a non-initial # segment fetch is started and we already know about shares from the # previous segment - for s in shares: - self._shares[s] = AVAILABLE - self._shnums.add(s._shnum, s) + self._shares.extend(shares) + self._shares.sort(key=lambda s: (s._dyhb_rtt, s._shnum) ) eventually(self.loop) def no_more_shares(self): @@ -71,15 +77,6 @@ class SegmentFetcher: # internal methods - def _count_shnums(self, *states): - """shnums for which at least one state is in the following list""" - shnums = [] - for shnum,shares in self._shnums.iteritems(): - matches = [s for s in shares if self._shares.get(s) in states] - if matches: - shnums.append(shnum) - return len(shnums) - def loop(self): try: # if any exception occurs here, kill the download @@ -92,7 +89,8 @@ class SegmentFetcher: k = self._k if not self._running: return - if self._bad_segnum: + numsegs, authoritative = self._node.get_num_segments() + if 
authoritative and self.segnum >= numsegs: # oops, we were asking for a segment number beyond the end of the # file. This is an error. self.stop() @@ -102,98 +100,125 @@ class SegmentFetcher: self._node.fetch_failed(self, f) return + #print "LOOP", self._blocks.keys(), "active:", self._active_share_map, "overdue:", self._overdue_share_map, "unused:", self._shares + # Should we send out more requests? + while len(set(self._blocks.keys()) + | set(self._active_share_map.keys()) + ) < k: + # we don't have data or active requests for enough shares. Are + # there any unused shares we can start using? + (sent_something, want_more_diversity) = self._find_and_use_share() + if sent_something: + # great. loop back around in case we need to send more. + continue + if want_more_diversity: + # we could have sent something if we'd been allowed to pull + # more shares per server. Increase the limit and try again. + self._max_shares_per_server += 1 + log.msg("SegmentFetcher(%s) increasing diversity limit to %d" + % (self._node._si_prefix, self._max_shares_per_server), + level=log.NOISY, parent=self._lp, umid="xY2pBA") + # Also ask for more shares, in the hopes of achieving better + # diversity for the next segment. + self._ask_for_more_shares() + continue + # we need more shares than the ones in self._shares to make + # progress + self._ask_for_more_shares() + if self._no_more_shares: + # But there are no more shares to be had. If we're going to + # succeed, it will be with the shares we've already seen. + # Will they be enough? + if len(set(self._blocks.keys()) + | set(self._active_share_map.keys()) + | set(self._overdue_share_map.keys()) + ) < k: + # nope. bail. + self._no_shares_error() # this calls self.stop() + return + # our outstanding or overdue requests may yet work. + # more shares may be coming. Wait until then. + return + # are we done? - if self._count_shnums(COMPLETE) >= k: + if len(set(self._blocks.keys())) >= k: # yay! 
self.stop() self._node.process_blocks(self.segnum, self._blocks) return - # we may have exhausted everything - if (self._no_more_shares and - self._count_shnums(AVAILABLE, PENDING, OVERDUE, COMPLETE) < k): - # no more new shares are coming, and the remaining hopeful shares - # aren't going to be enough. boo! - - log.msg("share states: %r" % (self._shares,), - level=log.NOISY, umid="0ThykQ") - if self._count_shnums(AVAILABLE, PENDING, OVERDUE, COMPLETE) == 0: - format = ("no shares (need %(k)d)." - " Last failure: %(last_failure)s") - args = { "k": k, - "last_failure": self._last_failure } - error = NoSharesError - else: - format = ("ran out of shares: %(complete)d complete," - " %(pending)d pending, %(overdue)d overdue," - " %(unused)d unused, need %(k)d." - " Last failure: %(last_failure)s") - args = {"complete": self._count_shnums(COMPLETE), - "pending": self._count_shnums(PENDING), - "overdue": self._count_shnums(OVERDUE), - # 'unused' should be zero - "unused": self._count_shnums(AVAILABLE), - "k": k, - "last_failure": self._last_failure, - } - error = NotEnoughSharesError - log.msg(format=format, level=log.UNUSUAL, umid="1DsnTg", **args) - e = error(format % args) - f = Failure(e) - self.stop() - self._node.fetch_failed(self, f) - return + def _no_shares_error(self): + if not (self._shares or self._active_share_map or + self._overdue_share_map or self._blocks): + format = ("no shares (need %(k)d)." + " Last failure: %(last_failure)s") + args = { "k": self._k, + "last_failure": self._last_failure } + error = NoSharesError + else: + format = ("ran out of shares: complete=%(complete)s" + " pending=%(pending)s overdue=%(overdue)s" + " unused=%(unused)s need %(k)d." 
+ " Last failure: %(last_failure)s") + def join(shnums): return ",".join(["sh%d" % shnum + for shnum in sorted(shnums)]) + pending_s = ",".join([str(sh) + for sh in self._active_share_map.values()]) + overdue = set() + for shares in self._overdue_share_map.values(): + overdue |= shares + overdue_s = ",".join([str(sh) for sh in overdue]) + args = {"complete": join(self._blocks.keys()), + "pending": pending_s, + "overdue": overdue_s, + # 'unused' should be zero + "unused": ",".join([str(sh) for sh in self._shares]), + "k": self._k, + "last_failure": self._last_failure, + } + error = NotEnoughSharesError + log.msg(format=format, + level=log.UNUSUAL, parent=self._lp, umid="1DsnTg", + **args) + e = error(format % args) + f = Failure(e) + self.stop() + self._node.fetch_failed(self, f) - # nope, not done. Are we "block-hungry" (i.e. do we want to send out - # more read requests, or do we think we have enough in flight - # already?) - while self._count_shnums(PENDING, COMPLETE) < k: - # we're hungry.. are there any unused shares? - sent = self._send_new_request() - if not sent: - break - - # ok, now are we "share-hungry" (i.e. do we have enough known shares - # to make us happy, or should we ask the ShareFinder to get us more?) 
- if self._count_shnums(AVAILABLE, PENDING, COMPLETE) < k: - # we're hungry for more shares - self._node.want_more_shares() - # that will trigger the ShareFinder to keep looking - - def _find_one(self, shares, state): - # TODO could choose fastest, or avoid servers already in use - for s in shares: - if self._shares[s] == state: - return s - # can never get here, caller has assert in case of code bug - - def _send_new_request(self): - # TODO: this is probably O(k^2), and we're called from a range(k) - # loop, so O(k^3) - - # this first loop prefers sh0, then sh1, sh2, etc - for shnum,shares in sorted(self._shnums.iteritems()): - states = [self._shares[s] for s in shares] - if COMPLETE in states or PENDING in states: - # don't send redundant requests + def _find_and_use_share(self): + sent_something = False + want_more_diversity = False + for sh in self._shares: # find one good share to fetch + shnum = sh._shnum ; serverid = sh._peerid + if shnum in self._blocks: + continue # don't request data we already have + if shnum in self._active_share_map: + # note: OVERDUE shares are removed from _active_share_map + # and added to _overdue_share_map instead. + continue # don't send redundant requests + sfs = self._shares_from_server + if len(sfs.get(serverid,set())) >= self._max_shares_per_server: + # don't pull too much from a single server + want_more_diversity = True continue - if AVAILABLE not in states: - # no candidates for this shnum, move on - continue - # here's a candidate. Send a request. - s = self._find_one(shares, AVAILABLE) - assert s - self._shares[s] = PENDING - self._share_observers[s] = o = s.get_block(self.segnum) - o.subscribe(self._block_request_activity, share=s, shnum=shnum) - # TODO: build up a list of candidates, then walk through the - # list, sending requests to the most desireable servers, - # re-checking our block-hunger each time. For non-initial segment - # fetches, this would let us stick with faster servers. 
- return True - # nothing was sent: don't call us again until you have more shares to - # work with, or one of the existing shares has been declared OVERDUE - return False + # ok, we can use this share + self._shares.remove(sh) + self._active_share_map[shnum] = sh + self._shares_from_server.add(serverid, sh) + self._start_share(sh, shnum) + sent_something = True + break + return (sent_something, want_more_diversity) + + def _start_share(self, share, shnum): + self._share_observers[share] = o = share.get_block(self.segnum) + o.subscribe(self._block_request_activity, share=share, shnum=shnum) + + def _ask_for_more_shares(self): + if not self._no_more_shares: + self._node.want_more_shares() + # that will trigger the ShareFinder to keep looking, and call our + # add_shares() or no_more_shares() later. def _cancel_all_requests(self): for o in self._share_observers.values(): @@ -207,27 +232,33 @@ class SegmentFetcher: log.msg("SegmentFetcher(%s)._block_request_activity:" " Share(sh%d-on-%s) -> %s" % (self._node._si_prefix, shnum, share._peerid_s, state), - level=log.NOISY, umid="vilNWA") - # COMPLETE, CORRUPT, DEAD, BADSEGNUM are terminal. + level=log.NOISY, parent=self._lp, umid="vilNWA") + # COMPLETE, CORRUPT, DEAD, BADSEGNUM are terminal. Remove the share + # from all our tracking lists. 
if state in (COMPLETE, CORRUPT, DEAD, BADSEGNUM): self._share_observers.pop(share, None) + self._shares_from_server.discard(share._peerid, share) + if self._active_share_map.get(shnum) is share: + del self._active_share_map[shnum] + self._overdue_share_map.discard(shnum, share) + if state is COMPLETE: - # 'block' is fully validated - self._shares[share] = COMPLETE + # 'block' is fully validated and complete self._blocks[shnum] = block - elif state is OVERDUE: - self._shares[share] = OVERDUE + + if state is OVERDUE: + # no longer active, but still might complete + del self._active_share_map[shnum] + self._overdue_share_map.add(shnum, share) # OVERDUE is not terminal: it will eventually transition to # COMPLETE, CORRUPT, or DEAD. - elif state is CORRUPT: - self._shares[share] = CORRUPT - elif state is DEAD: - del self._shares[share] - self._shnums[shnum].remove(share) - self._last_failure = f - elif state is BADSEGNUM: - self._shares[share] = BADSEGNUM # ??? - self._bad_segnum = True - eventually(self.loop) + if state is DEAD: + self._last_failure = f + if state is BADSEGNUM: + # our main loop will ask the DownloadNode each time for the + # number of segments, so we'll deal with this in the top of + # _do_loop + pass + eventually(self.loop) diff --git a/src/allmydata/immutable/downloader/finder.py b/src/allmydata/immutable/downloader/finder.py index 9adee99..fa6204c 100644 --- a/src/allmydata/immutable/downloader/finder.py +++ b/src/allmydata/immutable/downloader/finder.py @@ -35,11 +35,9 @@ class ShareFinder: self._storage_broker = storage_broker self.share_consumer = self.node = node self.max_outstanding_requests = max_outstanding_requests - self._hungry = False self._commonshares = {} # shnum to CommonShare instance - self.undelivered_shares = [] self.pending_requests = set() self.overdue_requests = set() # subset of pending_requests self.overdue_timers = {} @@ -52,6 +50,12 @@ class ShareFinder: si=self._si_prefix, level=log.NOISY, parent=logparent, umid="2xjj2A") + def 
update_num_segments(self): + (numsegs, authoritative) = self.node.get_num_segments() + assert authoritative + for cs in self._commonshares.values(): + cs.set_authoritative_num_segments(numsegs) + def start_finding_servers(self): # don't get servers until somebody uses us: creating the # ImmutableFileNode should not cause work to happen yet. Test case is @@ -83,30 +87,16 @@ class ShareFinder: # internal methods def loop(self): - undelivered_s = ",".join(["sh%d@%s" % - (s._shnum, idlib.shortnodeid_b2a(s._peerid)) - for s in self.undelivered_shares]) pending_s = ",".join([idlib.shortnodeid_b2a(rt.peerid) for rt in self.pending_requests]) # sort? self.log(format="ShareFinder loop: running=%(running)s" - " hungry=%(hungry)s, undelivered=%(undelivered)s," - " pending=%(pending)s", - running=self.running, hungry=self._hungry, - undelivered=undelivered_s, pending=pending_s, + " hungry=%(hungry)s, pending=%(pending)s", + running=self.running, hungry=self._hungry, pending=pending_s, level=log.NOISY, umid="kRtS4Q") if not self.running: return if not self._hungry: return - if self.undelivered_shares: - sh = self.undelivered_shares.pop(0) - # they will call hungry() again if they want more - self._hungry = False - self.log(format="delivering Share(shnum=%(shnum)d, server=%(peerid)s)", - shnum=sh._shnum, peerid=sh._peerid_s, - level=log.NOISY, umid="2n1qQw") - eventually(self.share_consumer.got_shares, [sh]) - return non_overdue = self.pending_requests - self.overdue_requests if len(non_overdue) >= self.max_outstanding_requests: @@ -146,14 +136,16 @@ class ShareFinder: lp = self.log(format="sending DYHB to [%(peerid)s]", peerid=idlib.shortnodeid_b2a(peerid), level=log.NOISY, umid="Io7pyg") - d_ev = self._download_status.add_dyhb_sent(peerid, now()) + time_sent = now() + d_ev = self._download_status.add_dyhb_sent(peerid, time_sent) # TODO: get the timer from a Server object, it knows best self.overdue_timers[req] = reactor.callLater(self.OVERDUE_TIMEOUT, self.overdue, req) d = 
rref.callRemote("get_buckets", self._storage_index) d.addBoth(incidentally, self._request_retired, req) d.addCallbacks(self._got_response, self._got_error, - callbackArgs=(rref.version, peerid, req, d_ev, lp), + callbackArgs=(rref.version, peerid, req, d_ev, + time_sent, lp), errbackArgs=(peerid, req, d_ev, lp)) d.addErrback(log.err, format="error in send_request", level=log.WEIRD, parent=lp, umid="rpdV0w") @@ -172,33 +164,37 @@ class ShareFinder: self.overdue_requests.add(req) eventually(self.loop) - def _got_response(self, buckets, server_version, peerid, req, d_ev, lp): + def _got_response(self, buckets, server_version, peerid, req, d_ev, + time_sent, lp): shnums = sorted([shnum for shnum in buckets]) - d_ev.finished(shnums, now()) - if buckets: - shnums_s = ",".join([str(shnum) for shnum in shnums]) - self.log(format="got shnums [%(shnums)s] from [%(peerid)s]", - shnums=shnums_s, peerid=idlib.shortnodeid_b2a(peerid), - level=log.NOISY, parent=lp, umid="0fcEZw") - else: + time_received = now() + d_ev.finished(shnums, time_received) + dyhb_rtt = time_received - time_sent + if not buckets: self.log(format="no shares from [%(peerid)s]", peerid=idlib.shortnodeid_b2a(peerid), level=log.NOISY, parent=lp, umid="U7d4JA") - if self.node.num_segments is None: - best_numsegs = self.node.guessed_num_segments - else: - best_numsegs = self.node.num_segments + return + shnums_s = ",".join([str(shnum) for shnum in shnums]) + self.log(format="got shnums [%(shnums)s] from [%(peerid)s]", + shnums=shnums_s, peerid=idlib.shortnodeid_b2a(peerid), + level=log.NOISY, parent=lp, umid="0fcEZw") + shares = [] for shnum, bucket in buckets.iteritems(): - self._create_share(best_numsegs, shnum, bucket, server_version, - peerid) + s = self._create_share(shnum, bucket, server_version, peerid, + dyhb_rtt) + shares.append(s) + self._deliver_shares(shares) - def _create_share(self, best_numsegs, shnum, bucket, server_version, - peerid): + def _create_share(self, shnum, bucket, server_version, 
peerid, dyhb_rtt): if shnum in self._commonshares: cs = self._commonshares[shnum] else: - cs = CommonShare(best_numsegs, self._si_prefix, shnum, + numsegs, authoritative = self.node.get_num_segments() + cs = CommonShare(numsegs, self._si_prefix, shnum, self._node_logparent) + if authoritative: + cs.set_authoritative_num_segments(numsegs) # Share._get_satisfaction is responsible for updating # CommonShare.set_numsegs after we know the UEB. Alternatives: # 1: d = self.node.get_num_segments() @@ -214,9 +210,17 @@ class ShareFinder: # Yuck. self._commonshares[shnum] = cs s = Share(bucket, server_version, self.verifycap, cs, self.node, - self._download_status, peerid, shnum, + self._download_status, peerid, shnum, dyhb_rtt, self._node_logparent) - self.undelivered_shares.append(s) + return s + + def _deliver_shares(self, shares): + # they will call hungry() again if they want more + self._hungry = False + shares_s = ",".join([str(sh) for sh in shares]) + self.log(format="delivering shares: %s" % shares_s, + level=log.NOISY, umid="2n1qQw") + eventually(self.share_consumer.got_shares, shares) def _got_error(self, f, peerid, req, d_ev, lp): d_ev.finished("error", now()) diff --git a/src/allmydata/immutable/downloader/node.py b/src/allmydata/immutable/downloader/node.py index 4c92dd8..33c16cf 100644 --- a/src/allmydata/immutable/downloader/node.py +++ b/src/allmydata/immutable/downloader/node.py @@ -72,7 +72,7 @@ class DownloadNode: # things to track callers that want data # _segment_requests can have duplicates - self._segment_requests = [] # (segnum, d, cancel_handle) + self._segment_requests = [] # (segnum, d, cancel_handle, logparent) self._active_segment = None # a SegmentFetcher, with .segnum self._segsize_observers = observer.OneShotObserverList() @@ -81,7 +81,8 @@ class DownloadNode: # for each read() call. Segmentation and get_segment() messages are # associated with the read() call, everything else is tied to the # _Node's log entry. 
- lp = log.msg(format="Immutable _Node(%(si)s) created: size=%(size)d," + lp = log.msg(format="Immutable.DownloadNode(%(si)s) created:" + " size=%(size)d," " guessed_segsize=%(guessed_segsize)d," " guessed_numsegs=%(guessed_numsegs)d", si=self._si_prefix, size=verifycap.size, @@ -103,9 +104,10 @@ class DownloadNode: # as with CommonShare, our ciphertext_hash_tree is a stub until we # get the real num_segments self.ciphertext_hash_tree = IncompleteHashTree(self.guessed_num_segments) + self.ciphertext_hash_tree_leaves = self.guessed_num_segments def __repr__(self): - return "Imm_Node(%s)" % (self._si_prefix,) + return "ImmutableDownloadNode(%s)" % (self._si_prefix,) def stop(self): # called by the Terminator at shutdown, mostly for tests @@ -175,14 +177,14 @@ class DownloadNode: The Deferred can also errback with other fatal problems, such as NotEnoughSharesError, NoSharesError, or BadCiphertextHashError. """ - log.msg(format="imm Node(%(si)s).get_segment(%(segnum)d)", - si=base32.b2a(self._verifycap.storage_index)[:8], - segnum=segnum, - level=log.OPERATIONAL, parent=logparent, umid="UKFjDQ") + lp = log.msg(format="imm Node(%(si)s).get_segment(%(segnum)d)", + si=base32.b2a(self._verifycap.storage_index)[:8], + segnum=segnum, + level=log.OPERATIONAL, parent=logparent, umid="UKFjDQ") self._download_status.add_segment_request(segnum, now()) d = defer.Deferred() c = Cancel(self._cancel_request) - self._segment_requests.append( (segnum, d, c) ) + self._segment_requests.append( (segnum, d, c, lp) ) self._start_new_segment() return (d, c) @@ -208,10 +210,11 @@ class DownloadNode: if self._active_segment is None and self._segment_requests: segnum = self._segment_requests[0][0] k = self._verifycap.needed_shares + lp = self._segment_requests[0][3] log.msg(format="%(node)s._start_new_segment: segnum=%(segnum)d", node=repr(self), segnum=segnum, - level=log.NOISY, umid="wAlnHQ") - self._active_segment = fetcher = SegmentFetcher(self, segnum, k) + level=log.NOISY, parent=lp, 
umid="wAlnHQ") + self._active_segment = fetcher = SegmentFetcher(self, segnum, k, lp) active_shares = [s for s in self._shares if s.is_alive()] fetcher.add_shares(active_shares) # this triggers the loop @@ -234,13 +237,17 @@ class DownloadNode: h = hashutil.uri_extension_hash(UEB_s) if h != self._verifycap.uri_extension_hash: raise BadHashError - UEB_dict = uri.unpack_extension(UEB_s) - self._parse_and_store_UEB(UEB_dict) # sets self._stuff + self._parse_and_store_UEB(UEB_s) # sets self._stuff # TODO: a malformed (but authentic) UEB could throw an assertion in # _parse_and_store_UEB, and we should abandon the download. self.have_UEB = True - def _parse_and_store_UEB(self, d): + # inform the ShareFinder about our correct number of segments. This + # will update the block-hash-trees in all existing CommonShare + # instances, and will populate new ones with the correct value. + self._sharefinder.update_num_segments() + + def _parse_and_store_UEB(self, UEB_s): # Note: the UEB contains needed_shares and total_shares. These are # redundant and inferior (the filecap contains the authoritative # values). However, because it is possible to encode the same file in @@ -252,8 +259,11 @@ class DownloadNode: # therefore, we ignore d['total_shares'] and d['needed_shares']. + d = uri.unpack_extension(UEB_s) + log.msg(format="UEB=%(ueb)s, vcap=%(vcap)s", - ueb=repr(d), vcap=self._verifycap.to_string(), + ueb=repr(uri.unpack_extension_readable(UEB_s)), + vcap=self._verifycap.to_string(), level=log.NOISY, parent=self._lp, umid="cVqZnA") k, N = self._verifycap.needed_shares, self._verifycap.total_shares @@ -292,6 +302,7 @@ class DownloadNode: # shares of file B. self.ciphertext_hash_tree was a guess before: # this is where we create it for real. 
self.ciphertext_hash_tree = IncompleteHashTree(self.num_segments) + self.ciphertext_hash_tree_leaves = self.num_segments self.ciphertext_hash_tree.set_hashes({0: d['crypttext_root_hash']}) self.share_hash_tree.set_hashes({0: d['share_root_hash']}) @@ -344,9 +355,15 @@ class DownloadNode: % (hashnum, len(self.share_hash_tree))) self.share_hash_tree.set_hashes(share_hashes) + def get_desired_ciphertext_hashes(self, segnum): + if segnum < self.ciphertext_hash_tree_leaves: + return self.ciphertext_hash_tree.needed_hashes(segnum, + include_leaf=True) + return [] def get_needed_ciphertext_hashes(self, segnum): cht = self.ciphertext_hash_tree return cht.needed_hashes(segnum, include_leaf=True) + def process_ciphertext_hashes(self, hashes): assert self.num_segments is not None # this may raise BadHashError or NotEnoughHashesError @@ -457,7 +474,7 @@ class DownloadNode: def _extract_requests(self, segnum): """Remove matching requests and return their (d,c) tuples so that the caller can retire them.""" - retire = [(d,c) for (segnum0, d, c) in self._segment_requests + retire = [(d,c) for (segnum0, d, c, lp) in self._segment_requests if segnum0 == segnum] self._segment_requests = [t for t in self._segment_requests if t[0] != segnum] @@ -466,10 +483,18 @@ class DownloadNode: def _cancel_request(self, c): self._segment_requests = [t for t in self._segment_requests if t[2] != c] - segnums = [segnum for (segnum,d,c) in self._segment_requests] + segnums = [segnum for (segnum,d,c,lp) in self._segment_requests] # self._active_segment might be None in rare circumstances, so make # sure we tolerate it if self._active_segment and self._active_segment.segnum not in segnums: self._active_segment.stop() self._active_segment = None self._start_new_segment() + + # called by ShareFinder to choose hashtree sizes in CommonShares, and by + # SegmentFetcher to tell if it is still fetching a valid segnum. 
+ def get_num_segments(self): + # returns (best_num_segments, authoritative) + if self.num_segments is None: + return (self.guessed_num_segments, False) + return (self.num_segments, True) diff --git a/src/allmydata/immutable/downloader/share.py b/src/allmydata/immutable/downloader/share.py index 413f907..78cce8e 100644 --- a/src/allmydata/immutable/downloader/share.py +++ b/src/allmydata/immutable/downloader/share.py @@ -33,7 +33,7 @@ class Share: # servers. A different backend would use a different class. def __init__(self, rref, server_version, verifycap, commonshare, node, - download_status, peerid, shnum, logparent): + download_status, peerid, shnum, dyhb_rtt, logparent): self._rref = rref self._server_version = server_version self._node = node # holds share_hash_tree and UEB @@ -51,6 +51,7 @@ class Share: self._storage_index = verifycap.storage_index self._si_prefix = base32.b2a(verifycap.storage_index)[:8] self._shnum = shnum + self._dyhb_rtt = dyhb_rtt # self._alive becomes False upon fatal corruption or server error self._alive = True self._lp = log.msg(format="%(share)s created", share=repr(self), @@ -278,15 +279,16 @@ class Share: if not self._satisfy_UEB(): # can't check any hashes without the UEB return False + # the call to _satisfy_UEB() will immediately set the + # authoritative num_segments in all our CommonShares. If we + # guessed wrong, we might still be working on a bogus segnum + # (beyond the real range). We catch this and signal BADSEGNUM + # before invoking any further code that touches hashtrees. self.actual_segment_size = self._node.segment_size # might be updated assert self.actual_segment_size is not None - # knowing the UEB means knowing num_segments. Despite the redundancy, - # this is the best place to set this. CommonShare.set_numsegs will - # ignore duplicate calls. 
+ # knowing the UEB means knowing num_segments assert self._node.num_segments is not None - cs = self._commonshare - cs.set_numsegs(self._node.num_segments) segnum, observers = self._active_segnum_and_observers() # if segnum is None, we don't really need to do anything (we have no @@ -304,9 +306,9 @@ class Share: # can't check block_hash_tree without a root return False - if cs.need_block_hash_root(): + if self._commonshare.need_block_hash_root(): block_hash_root = self._node.share_hash_tree.get_leaf(self._shnum) - cs.set_block_hash_root(block_hash_root) + self._commonshare.set_block_hash_root(block_hash_root) if segnum is None: return False # we don't want any particular segment right now @@ -360,7 +362,8 @@ class Share: ] ): offsets[field] = fields[i] self.actual_offsets = offsets - log.msg("actual offsets: data=%d, plaintext_hash_tree=%d, crypttext_hash_tree=%d, block_hashes=%d, share_hashes=%d, uri_extension=%d" % tuple(fields)) + log.msg("actual offsets: data=%d, plaintext_hash_tree=%d, crypttext_hash_tree=%d, block_hashes=%d, share_hashes=%d, uri_extension=%d" % tuple(fields), + level=log.NOISY, parent=self._lp, umid="jedQcw") self._received.remove(0, 4) # don't need this anymore # validate the offsets a bit @@ -517,7 +520,8 @@ class Share: block = self._received.pop(blockstart, blocklen) if not block: log.msg("no data for block %s (want [%d:+%d])" % (repr(self), - blockstart, blocklen)) + blockstart, blocklen), + level=log.NOISY, parent=self._lp, umid="aK0RFw") return False log.msg(format="%(share)s._satisfy_data_block [%(start)d:+%(length)d]", share=repr(self), start=blockstart, length=blocklen, @@ -589,29 +593,17 @@ class Share: if self.actual_offsets or self._overrun_ok: if not self._node.have_UEB: self._desire_UEB(desire, o) - # They might ask for a segment that doesn't look right. - # _satisfy() will catch+reject bad segnums once we know the UEB - # (and therefore segsize and numsegs), so we'll only fail this - # test if we're still guessing. 
We want to avoid asking the - # hashtrees for needed_hashes() for bad segnums. So don't enter - # _desire_hashes or _desire_data unless the segnum looks - # reasonable. - if segnum < r["num_segments"]: - # XXX somehow we're getting here for sh5. we don't yet know - # the actual_segment_size, we're still working off the guess. - # the ciphertext_hash_tree has been corrected, but the - # commonshare._block_hash_tree is still in the guessed state. - self._desire_share_hashes(desire, o) - if segnum is not None: - self._desire_block_hashes(desire, o, segnum) - self._desire_data(desire, o, r, segnum, segsize) - else: - log.msg("_desire: segnum(%d) looks wrong (numsegs=%d)" - % (segnum, r["num_segments"]), - level=log.UNUSUAL, parent=self._lp, umid="tuYRQQ") + self._desire_share_hashes(desire, o) + if segnum is not None: + # They might be asking for a segment number that is beyond + # what we guess the file contains, but _desire_block_hashes + # and _desire_data will tolerate that. + self._desire_block_hashes(desire, o, segnum) + self._desire_data(desire, o, r, segnum, segsize) log.msg("end _desire: want_it=%s need_it=%s gotta=%s" - % (want_it.dump(), need_it.dump(), gotta_gotta_have_it.dump())) + % (want_it.dump(), need_it.dump(), gotta_gotta_have_it.dump()), + level=log.NOISY, parent=self._lp, umid="IG7CgA") if self.actual_offsets: return (want_it, need_it+gotta_gotta_have_it) else: @@ -681,14 +673,30 @@ class Share: (want_it, need_it, gotta_gotta_have_it) = desire # block hash chain - for hashnum in self._commonshare.get_needed_block_hashes(segnum): + for hashnum in self._commonshare.get_desired_block_hashes(segnum): need_it.add(o["block_hashes"]+hashnum*HASH_SIZE, HASH_SIZE) # ciphertext hash chain - for hashnum in self._node.get_needed_ciphertext_hashes(segnum): + for hashnum in self._node.get_desired_ciphertext_hashes(segnum): need_it.add(o["crypttext_hash_tree"]+hashnum*HASH_SIZE, HASH_SIZE) def _desire_data(self, desire, o, r, segnum, segsize): + if segnum > 
r["num_segments"]: + # they're asking for a segment that's beyond what we think is the + # end of the file. We won't get here if we've already learned the + # real UEB: _get_satisfaction() will notice the out-of-bounds and + # terminate the loop. So we must still be guessing, which means + # that they might be correct in asking for such a large segnum. + # But if they're right, then our segsize/segnum guess is + # certainly wrong, which means we don't know what data blocks to + # ask for yet. So don't bother adding anything. When the UEB + # comes back and we learn the correct segsize/segnums, we'll + # either reject the request or have enough information to proceed + # normally. This costs one roundtrip. + log.msg("_desire_data: segnum(%d) looks wrong (numsegs=%d)" + % (segnum, r["num_segments"]), + level=log.UNUSUAL, parent=self._lp, umid="tuYRQQ") + return (want_it, need_it, gotta_gotta_have_it) = desire tail = (segnum == r["num_segments"]-1) datastart = o["data"] @@ -803,34 +811,62 @@ class Share: class CommonShare: + # TODO: defer creation of the hashtree until somebody uses us. There will + # be a lot of unused shares, and we shouldn't spend the memory on a large + # hashtree unless necessary. """I hold data that is common across all instances of a single share, like sh2 on both servers A and B. This is just the block hash tree. """ - def __init__(self, guessed_numsegs, si_prefix, shnum, logparent): + def __init__(self, best_numsegs, si_prefix, shnum, logparent): self.si_prefix = si_prefix self.shnum = shnum + # in the beginning, before we have the real UEB, we can only guess at # the number of segments. But we want to ask for block hashes early. # So if we're asked for which block hashes are needed before we know # numsegs for sure, we return a guess. 
- self._block_hash_tree = IncompleteHashTree(guessed_numsegs) - self._know_numsegs = False + self._block_hash_tree = IncompleteHashTree(best_numsegs) + self._block_hash_tree_is_authoritative = False + self._block_hash_tree_leaves = best_numsegs self._logparent = logparent - def set_numsegs(self, numsegs): - if self._know_numsegs: - return - self._block_hash_tree = IncompleteHashTree(numsegs) - self._know_numsegs = True + def __repr__(self): + return "CommonShare(%s-sh%d)" % (self.si_prefix, self.shnum) + + def set_authoritative_num_segments(self, numsegs): + if self._block_hash_tree_leaves != numsegs: + self._block_hash_tree = IncompleteHashTree(numsegs) + self._block_hash_tree_leaves = numsegs + self._block_hash_tree_is_authoritative = True def need_block_hash_root(self): return bool(not self._block_hash_tree[0]) def set_block_hash_root(self, roothash): - assert self._know_numsegs + assert self._block_hash_tree_is_authoritative self._block_hash_tree.set_hashes({0: roothash}) + def get_desired_block_hashes(self, segnum): + if segnum < self._block_hash_tree_leaves: + return self._block_hash_tree.needed_hashes(segnum, + include_leaf=True) + + # the segnum might be out-of-bounds. Originally it was due to a race + # between the receipt of the UEB on one share (from which we learn + # the correct number of segments, update all hash trees to the right + # size, and queue a BADSEGNUM to the SegmentFetcher) and the delivery + # of a new Share to the SegmentFetcher while that BADSEGNUM was + # queued (which sends out requests to the stale segnum, now larger + # than the hash tree). I fixed that (by making SegmentFetcher.loop + # check for a bad segnum at the start of each pass, instead of using + # the queued BADSEGNUM or a flag it sets), but just in case this + # still happens, I'm leaving the < in place. If it gets hit, there's + # a potential lost-progress problem, but I'm pretty sure that it will + # get cleared up on the following turn. 
+ return [] + def get_needed_block_hashes(self, segnum): + assert self._block_hash_tree_is_authoritative # XXX: include_leaf=True needs thought: how did the old downloader do # it? I think it grabbed *all* block hashes and set them all at once. # Since we want to fetch less data, we either need to fetch the leaf @@ -840,12 +876,25 @@ class CommonShare: return self._block_hash_tree.needed_hashes(segnum, include_leaf=True) def process_block_hashes(self, block_hashes): - assert self._know_numsegs + assert self._block_hash_tree_is_authoritative # this may raise BadHashError or NotEnoughHashesError self._block_hash_tree.set_hashes(block_hashes) def check_block(self, segnum, block): - assert self._know_numsegs + assert self._block_hash_tree_is_authoritative h = hashutil.block_hash(block) # this may raise BadHashError or NotEnoughHashesError self._block_hash_tree.set_hashes(leaves={segnum: h}) + +# TODO: maybe stop using EventStreamObserver: instead, use a Deferred and an +# auxiliary OVERDUE callback. Just make sure to get all the messages in the +# right order and on the right turns. + +# TODO: we're asking for too much data. We probably don't need +# include_leaf=True in the block hash tree or ciphertext hash tree. + +# TODO: we ask for ciphertext hash tree nodes from all shares (whenever +# _desire is called while we're missing those nodes), but we only consume it +# from the first response, leaving the rest of the data sitting in _received. +# This was ameliorated by clearing self._received after each block is +# complete. diff --git a/src/allmydata/test/test_cli.py b/src/allmydata/test/test_cli.py index db5bf5f..2453126 100644 --- a/src/allmydata/test/test_cli.py +++ b/src/allmydata/test/test_cli.py @@ -2303,8 +2303,8 @@ class Errors(GridTestMixin, CLITestMixin, unittest.TestCase): # the download is abandoned as soon as it's clear that we won't get # enough shares. The one remaining share might be in either the # COMPLETE or the PENDING state. 
- in_complete_msg = "ran out of shares: 1 complete, 0 pending, 0 overdue, 0 unused, need 3" - in_pending_msg = "ran out of shares: 0 complete, 1 pending, 0 overdue, 0 unused, need 3" + in_complete_msg = "ran out of shares: complete=sh0 pending= overdue= unused= need 3" + in_pending_msg = "ran out of shares: complete= pending=Share(sh0-on-fob7v) overdue= unused= need 3" d.addCallback(lambda ign: self.do_cli("get", self.uri_1share)) def _check1((rc, out, err)): diff --git a/src/allmydata/test/test_download.py b/src/allmydata/test/test_download.py index 71a556b..40f0d62 100644 --- a/src/allmydata/test/test_download.py +++ b/src/allmydata/test/test_download.py @@ -15,8 +15,9 @@ from allmydata.test.no_network import GridTestMixin from allmydata.test.common import ShouldFailMixin from allmydata.interfaces import NotEnoughSharesError, NoSharesError from allmydata.immutable.downloader.common import BadSegmentNumberError, \ - BadCiphertextHashError, DownloadStopped + BadCiphertextHashError, DownloadStopped, COMPLETE, OVERDUE, DEAD from allmydata.immutable.downloader.status import DownloadStatus +from allmydata.immutable.downloader.fetcher import SegmentFetcher from allmydata.codec import CRSDecoder from foolscap.eventual import fireEventually, flushEventualQueue @@ -295,7 +296,7 @@ class DownloadTest(_Base, unittest.TestCase): # shares servers = [] shares = sorted([s._shnum for s in self.n._cnode._node._shares]) - self.failUnlessEqual(shares, [0,1,2]) + self.failUnlessEqual(shares, [0,1,2,3]) # break the RIBucketReader references for s in self.n._cnode._node._shares: s._rref.broken = True @@ -318,7 +319,7 @@ class DownloadTest(_Base, unittest.TestCase): self.failUnlessEqual("".join(c.chunks), plaintext) shares = sorted([s._shnum for s in self.n._cnode._node._shares]) # we should now be using more shares than we were before - self.failIfEqual(shares, [0,1,2]) + self.failIfEqual(shares, [0,1,2,3]) d.addCallback(_check_failover) return d @@ -539,7 +540,7 @@ class 
DownloadTest(_Base, unittest.TestCase): def _con1_should_not_succeed(res): self.fail("the first read should not have succeeded") def _con1_failed(f): - self.failUnless(f.check(NotEnoughSharesError)) + self.failUnless(f.check(NoSharesError)) con2.producer.stopProducing() return d2 d.addCallbacks(_con1_should_not_succeed, _con1_failed) @@ -583,7 +584,7 @@ class DownloadTest(_Base, unittest.TestCase): def _con1_should_not_succeed(res): self.fail("the first read should not have succeeded") def _con1_failed(f): - self.failUnless(f.check(NotEnoughSharesError)) + self.failUnless(f.check(NoSharesError)) # we *don't* cancel the second one here: this exercises a # lost-progress bug from #1154. We just wait for it to # succeed. @@ -1121,7 +1122,7 @@ class Corruption(_Base, unittest.TestCase): # All these tests result in a failed download. d.addCallback(self._corrupt_flip_all, imm_uri, i) d.addCallback(lambda ign: - self.shouldFail(NotEnoughSharesError, which, + self.shouldFail(NoSharesError, which, substring, _download, imm_uri)) d.addCallback(lambda ign: self.restore_all_shares(self.shares)) @@ -1257,3 +1258,332 @@ class Status(unittest.TestCase): e2.update(1000, 2.0, 2.0) e2.finished(now+5) self.failUnlessEqual(ds.get_progress(), 1.0) + +class MyShare: + def __init__(self, shnum, peerid, rtt): + self._shnum = shnum + self._peerid = peerid + self._peerid_s = peerid + self._dyhb_rtt = rtt + def __repr__(self): + return "sh%d-on-%s" % (self._shnum, self._peerid) + +class MySegmentFetcher(SegmentFetcher): + def __init__(self, *args, **kwargs): + SegmentFetcher.__init__(self, *args, **kwargs) + self._test_start_shares = [] + def _start_share(self, share, shnum): + self._test_start_shares.append(share) + +class FakeNode: + def __init__(self): + self.want_more = 0 + self.failed = None + self.processed = None + self._si_prefix = "si_prefix" + def want_more_shares(self): + self.want_more += 1 + def fetch_failed(self, fetcher, f): + self.failed = f + def process_blocks(self, segnum, 
blocks): + self.processed = (segnum, blocks) + def get_num_segments(self): + return 1, True + +class Selection(unittest.TestCase): + def test_no_shares(self): + node = FakeNode() + sf = SegmentFetcher(node, 0, 3, None) + sf.add_shares([]) + d = flushEventualQueue() + def _check1(ign): + self.failUnlessEqual(node.want_more, 1) + self.failUnlessEqual(node.failed, None) + sf.no_more_shares() + return flushEventualQueue() + d.addCallback(_check1) + def _check2(ign): + self.failUnless(node.failed) + self.failUnless(node.failed.check(NoSharesError)) + d.addCallback(_check2) + return d + + def test_only_one_share(self): + node = FakeNode() + sf = MySegmentFetcher(node, 0, 3, None) + shares = [MyShare(0, "peer-A", 0.0)] + sf.add_shares(shares) + d = flushEventualQueue() + def _check1(ign): + self.failUnlessEqual(node.want_more, 1) + self.failUnlessEqual(node.failed, None) + sf.no_more_shares() + return flushEventualQueue() + d.addCallback(_check1) + def _check2(ign): + self.failUnless(node.failed) + self.failUnless(node.failed.check(NotEnoughSharesError)) + self.failUnlessIn("complete= pending=sh0-on-peer-A overdue= unused=", + str(node.failed)) + d.addCallback(_check2) + return d + + def test_good_diversity_early(self): + node = FakeNode() + sf = MySegmentFetcher(node, 0, 3, None) + shares = [MyShare(i, "peer-%d" % i, i) for i in range(10)] + sf.add_shares(shares) + d = flushEventualQueue() + def _check1(ign): + self.failUnlessEqual(node.want_more, 0) + self.failUnlessEqual(sf._test_start_shares, shares[:3]) + for sh in sf._test_start_shares: + sf._block_request_activity(sh, sh._shnum, COMPLETE, + "block-%d" % sh._shnum) + return flushEventualQueue() + d.addCallback(_check1) + def _check2(ign): + self.failIfEqual(node.processed, None) + self.failUnlessEqual(node.processed, (0, {0: "block-0", + 1: "block-1", + 2: "block-2"}) ) + d.addCallback(_check2) + return d + + def test_good_diversity_late(self): + node = FakeNode() + sf = MySegmentFetcher(node, 0, 3, None) + shares = 
[MyShare(i, "peer-%d" % i, i) for i in range(10)] + sf.add_shares([]) + d = flushEventualQueue() + def _check1(ign): + self.failUnlessEqual(node.want_more, 1) + sf.add_shares(shares) + return flushEventualQueue() + d.addCallback(_check1) + def _check2(ign): + self.failUnlessEqual(sf._test_start_shares, shares[:3]) + for sh in sf._test_start_shares: + sf._block_request_activity(sh, sh._shnum, COMPLETE, + "block-%d" % sh._shnum) + return flushEventualQueue() + d.addCallback(_check2) + def _check3(ign): + self.failIfEqual(node.processed, None) + self.failUnlessEqual(node.processed, (0, {0: "block-0", + 1: "block-1", + 2: "block-2"}) ) + d.addCallback(_check3) + return d + + def test_avoid_bad_diversity_late(self): + node = FakeNode() + sf = MySegmentFetcher(node, 0, 3, None) + # we could satisfy the read entirely from the first server, but we'd + # prefer not to. Instead, we expect to only pull one share from the + # first server + shares = [MyShare(0, "peer-A", 0.0), + MyShare(1, "peer-A", 0.0), + MyShare(2, "peer-A", 0.0), + MyShare(3, "peer-B", 1.0), + MyShare(4, "peer-C", 2.0), + ] + sf.add_shares([]) + d = flushEventualQueue() + def _check1(ign): + self.failUnlessEqual(node.want_more, 1) + sf.add_shares(shares) + return flushEventualQueue() + d.addCallback(_check1) + def _check2(ign): + self.failUnlessEqual(sf._test_start_shares, + [shares[0], shares[3], shares[4]]) + for sh in sf._test_start_shares: + sf._block_request_activity(sh, sh._shnum, COMPLETE, + "block-%d" % sh._shnum) + return flushEventualQueue() + d.addCallback(_check2) + def _check3(ign): + self.failIfEqual(node.processed, None) + self.failUnlessEqual(node.processed, (0, {0: "block-0", + 3: "block-3", + 4: "block-4"}) ) + d.addCallback(_check3) + return d + + def test_suffer_bad_diversity_late(self): + node = FakeNode() + sf = MySegmentFetcher(node, 0, 3, None) + # we satisfy the read entirely from the first server because we don't + # have any other choice. 
+ shares = [MyShare(0, "peer-A", 0.0), + MyShare(1, "peer-A", 0.0), + MyShare(2, "peer-A", 0.0), + MyShare(3, "peer-A", 0.0), + MyShare(4, "peer-A", 0.0), + ] + sf.add_shares([]) + d = flushEventualQueue() + def _check1(ign): + self.failUnlessEqual(node.want_more, 1) + sf.add_shares(shares) + return flushEventualQueue() + d.addCallback(_check1) + def _check2(ign): + self.failUnlessEqual(node.want_more, 3) + self.failUnlessEqual(sf._test_start_shares, + [shares[0], shares[1], shares[2]]) + for sh in sf._test_start_shares: + sf._block_request_activity(sh, sh._shnum, COMPLETE, + "block-%d" % sh._shnum) + return flushEventualQueue() + d.addCallback(_check2) + def _check3(ign): + self.failIfEqual(node.processed, None) + self.failUnlessEqual(node.processed, (0, {0: "block-0", + 1: "block-1", + 2: "block-2"}) ) + d.addCallback(_check3) + return d + + def test_suffer_bad_diversity_early(self): + node = FakeNode() + sf = MySegmentFetcher(node, 0, 3, None) + # we satisfy the read entirely from the first server because we don't + # have any other choice. 
+ shares = [MyShare(0, "peer-A", 0.0), + MyShare(1, "peer-A", 0.0), + MyShare(2, "peer-A", 0.0), + MyShare(3, "peer-A", 0.0), + MyShare(4, "peer-A", 0.0), + ] + sf.add_shares(shares) + d = flushEventualQueue() + def _check1(ign): + self.failUnlessEqual(node.want_more, 2) + self.failUnlessEqual(sf._test_start_shares, + [shares[0], shares[1], shares[2]]) + for sh in sf._test_start_shares: + sf._block_request_activity(sh, sh._shnum, COMPLETE, + "block-%d" % sh._shnum) + return flushEventualQueue() + d.addCallback(_check1) + def _check2(ign): + self.failIfEqual(node.processed, None) + self.failUnlessEqual(node.processed, (0, {0: "block-0", + 1: "block-1", + 2: "block-2"}) ) + d.addCallback(_check2) + return d + + def test_overdue(self): + node = FakeNode() + sf = MySegmentFetcher(node, 0, 3, None) + shares = [MyShare(i, "peer-%d" % i, i) for i in range(10)] + sf.add_shares(shares) + d = flushEventualQueue() + def _check1(ign): + self.failUnlessEqual(node.want_more, 0) + self.failUnlessEqual(sf._test_start_shares, shares[:3]) + for sh in sf._test_start_shares: + sf._block_request_activity(sh, sh._shnum, OVERDUE) + return flushEventualQueue() + d.addCallback(_check1) + def _check2(ign): + self.failUnlessEqual(sf._test_start_shares, shares[:6]) + for sh in sf._test_start_shares[3:]: + sf._block_request_activity(sh, sh._shnum, COMPLETE, + "block-%d" % sh._shnum) + return flushEventualQueue() + d.addCallback(_check2) + def _check3(ign): + self.failIfEqual(node.processed, None) + self.failUnlessEqual(node.processed, (0, {3: "block-3", + 4: "block-4", + 5: "block-5"}) ) + d.addCallback(_check3) + return d + + def test_overdue_fails(self): + node = FakeNode() + sf = MySegmentFetcher(node, 0, 3, None) + shares = [MyShare(i, "peer-%d" % i, i) for i in range(6)] + sf.add_shares(shares) + sf.no_more_shares() + d = flushEventualQueue() + def _check1(ign): + self.failUnlessEqual(node.want_more, 0) + self.failUnlessEqual(sf._test_start_shares, shares[:3]) + for sh in 
sf._test_start_shares: + sf._block_request_activity(sh, sh._shnum, OVERDUE) + return flushEventualQueue() + d.addCallback(_check1) + def _check2(ign): + self.failUnlessEqual(sf._test_start_shares, shares[:6]) + for sh in sf._test_start_shares[3:]: + sf._block_request_activity(sh, sh._shnum, DEAD) + return flushEventualQueue() + d.addCallback(_check2) + def _check3(ign): + # we're still waiting + self.failUnlessEqual(node.processed, None) + self.failUnlessEqual(node.failed, None) + # now complete one of the overdue ones, and kill one of the other + # ones, leaving one hanging. This should trigger a failure, since + # we cannot succeed. + live = sf._test_start_shares[0] + die = sf._test_start_shares[1] + sf._block_request_activity(live, live._shnum, COMPLETE, "block") + sf._block_request_activity(die, die._shnum, DEAD) + return flushEventualQueue() + d.addCallback(_check3) + def _check4(ign): + self.failUnless(node.failed) + self.failUnless(node.failed.check(NotEnoughSharesError)) + self.failUnlessIn("complete=sh0 pending= overdue=sh2-on-peer-2 unused=", + str(node.failed)) + d.addCallback(_check4) + return d + + def test_avoid_redundancy(self): + node = FakeNode() + sf = MySegmentFetcher(node, 0, 3, None) + # we could satisfy the read entirely from the first server, but we'd + # prefer not to. 
Instead, we expect to only pull one share from the + # first server + shares = [MyShare(0, "peer-A", 0.0), + MyShare(1, "peer-B", 1.0), + MyShare(0, "peer-C", 2.0), # this will be skipped + MyShare(1, "peer-D", 3.0), + MyShare(2, "peer-E", 4.0), + ] + sf.add_shares(shares[:3]) + d = flushEventualQueue() + def _check1(ign): + self.failUnlessEqual(node.want_more, 1) + self.failUnlessEqual(sf._test_start_shares, + [shares[0], shares[1]]) + # allow sh1 to retire + sf._block_request_activity(shares[1], 1, COMPLETE, "block-1") + return flushEventualQueue() + d.addCallback(_check1) + def _check2(ign): + # and then feed in the remaining shares + sf.add_shares(shares[3:]) + sf.no_more_shares() + return flushEventualQueue() + d.addCallback(_check2) + def _check3(ign): + self.failUnlessEqual(sf._test_start_shares, + [shares[0], shares[1], shares[4]]) + sf._block_request_activity(shares[0], 0, COMPLETE, "block-0") + sf._block_request_activity(shares[4], 2, COMPLETE, "block-2") + return flushEventualQueue() + d.addCallback(_check3) + def _check4(ign): + self.failIfEqual(node.processed, None) + self.failUnlessEqual(node.processed, (0, {0: "block-0", + 1: "block-1", + 2: "block-2"}) ) + d.addCallback(_check4) + return d diff --git a/src/allmydata/test/test_immutable.py b/src/allmydata/test/test_immutable.py index 288332d..511a865 100644 --- a/src/allmydata/test/test_immutable.py +++ b/src/allmydata/test/test_immutable.py @@ -52,7 +52,7 @@ class Test(common.ShareManglingMixin, common.ShouldFailMixin, unittest.TestCase) def _after_download(unused=None): after_download_reads = self._count_reads() #print before_download_reads, after_download_reads - self.failIf(after_download_reads-before_download_reads > 36, + self.failIf(after_download_reads-before_download_reads > 41, (after_download_reads, before_download_reads)) d.addCallback(self._download_and_check_plaintext) d.addCallback(_after_download) diff --git a/src/allmydata/test/test_web.py b/src/allmydata/test/test_web.py index 
3008046..f68e98d 100644 --- a/src/allmydata/test/test_web.py +++ b/src/allmydata/test/test_web.py @@ -4259,15 +4259,20 @@ class Grid(GridTestMixin, WebErrorMixin, ShouldFailMixin, testutil.ReallyEqualMi def _check_one_share(body): self.failIf("" in body, body) body = " ".join(body.strip().split()) - msg = ("NotEnoughSharesError: This indicates that some " - "servers were unavailable, or that shares have been " - "lost to server departure, hard drive failure, or disk " - "corruption. You should perform a filecheck on " - "this object to learn more. The full error message is:" - " ran out of shares: %d complete, %d pending, 0 overdue," - " 0 unused, need 3. Last failure: None") - msg1 = msg % (1, 0) - msg2 = msg % (0, 1) + msgbase = ("NotEnoughSharesError: This indicates that some " + "servers were unavailable, or that shares have been " + "lost to server departure, hard drive failure, or disk " + "corruption. You should perform a filecheck on " + "this object to learn more. The full error message is:" + ) + msg1 = msgbase + (" ran out of shares:" + " complete=sh0" + " pending=" + " overdue= unused= need 3. Last failure: None") + msg2 = msgbase + (" ran out of shares:" + " complete=" + " pending=Share(sh0-on-xgru5)" + " overdue= unused= need 3. Last failure: None") self.failUnless(body == msg1 or body == msg2, body) d.addCallback(_check_one_share)