diff --git a/src/allmydata/immutable/downloader/fetcher.py b/src/allmydata/immutable/downloader/fetcher.py index e30ced8..83c9beb 100644 --- a/src/allmydata/immutable/downloader/fetcher.py +++ b/src/allmydata/immutable/downloader/fetcher.py @@ -4,8 +4,8 @@ from foolscap.api import eventually from allmydata.interfaces import NotEnoughSharesError, NoSharesError from allmydata.util import log from allmydata.util.dictutil import DictOfSets -from common import AVAILABLE, PENDING, OVERDUE, COMPLETE, CORRUPT, DEAD, \ - BADSEGNUM, BadSegmentNumberError +from common import OVERDUE, COMPLETE, CORRUPT, DEAD, BADSEGNUM, \ + BadSegmentNumberError class SegmentFetcher: """I am responsible for acquiring blocks for a single segment. I will use @@ -26,22 +26,26 @@ class SegmentFetcher: self._node = node # _Node self.segnum = segnum self._k = k - self._shares = {} # maps non-dead Share instance to a state, one of - # (AVAILABLE, PENDING, OVERDUE, COMPLETE, CORRUPT). - # State transition map is: - # AVAILABLE -(send-read)-> PENDING - # PENDING -(timer)-> OVERDUE - # PENDING -(rx)-> COMPLETE, CORRUPT, DEAD, BADSEGNUM - # OVERDUE -(rx)-> COMPLETE, CORRUPT, DEAD, BADSEGNUM - # If a share becomes DEAD, it is removed from the - # dict. If it becomes BADSEGNUM, the whole fetch is - # terminated. + self._shares = [] # unused Share instances, sorted by "goodness" + # (RTT), then shnum. This is populated when DYHB + # responses arrive, or (for later segments) at + # startup. We remove shares from it when we call + # sh.get_block() on them. + self._shares_from_server = DictOfSets() # maps serverid to set of + # Shares on that server for + # which we have outstanding + # get_block() calls. + self._max_shares_per_server = 1 # how many Shares we're allowed to + # pull from each server. This starts + # at 1 and grows if we don't have + # sufficient diversity. + self._active_share_map = {} # maps shnum to outstanding (and not + # OVERDUE) Share that provides it. + self._overdue_share_map = DictOfSets() # shares in the OVERDUE state self._share_observers = {} # maps Share to EventStreamObserver for # active ones - self._shnums = DictOfSets() # maps shnum to the shares that provide it self._blocks = {} # maps shnum to validated block data self._no_more_shares = False - self._bad_segnum = False self._last_failure = None self._running = True @@ -50,7 +54,9 @@ class SegmentFetcher: level=log.NOISY, umid="LWyqpg") self._cancel_all_requests() self._running = False - self._shares.clear() # let GC work # ??? XXX + # help GC ??? 
XXX + del self._shares, self._shares_from_server, self._active_share_map + del self._share_observers # called by our parent _Node @@ -59,9 +65,8 @@ class SegmentFetcher: # called when ShareFinder locates a new share, and when a non-initial # segment fetch is started and we already know about shares from the # previous segment - for s in shares: - self._shares[s] = AVAILABLE - self._shnums.add(s._shnum, s) + self._shares.extend(shares) + self._shares.sort(key=lambda s: (s._dyhb_rtt, s._shnum) ) eventually(self.loop) def no_more_shares(self): @@ -71,15 +76,6 @@ class SegmentFetcher: # internal methods - def _count_shnums(self, *states): - """shnums for which at least one state is in the following list""" - shnums = [] - for shnum,shares in self._shnums.iteritems(): - matches = [s for s in shares if self._shares.get(s) in states] - if matches: - shnums.append(shnum) - return len(shnums) - def loop(self): try: # if any exception occurs here, kill the download @@ -92,7 +88,8 @@ class SegmentFetcher: k = self._k if not self._running: return - if self._bad_segnum: + numsegs, authoritative = self._node.get_num_segments() + if authoritative and self.segnum >= numsegs: # oops, we were asking for a segment number beyond the end of the # file. This is an error. self.stop() @@ -102,98 +99,121 @@ class SegmentFetcher: self._node.fetch_failed(self, f) return + #print "LOOP", self._blocks.keys(), "active:", self._active_share_map, "overdue:", self._overdue_share_map, "unused:", self._shares + # Should we send out more requests? + while len(set(self._blocks.keys()) + | set(self._active_share_map.keys()) + ) < k: + # we don't have data or active requests for enough shares. Are + # there any unused shares we can start using? + (sent_something, want_diversity) = self._find_and_use_share() + if sent_something: + # great. loop back around in case we need to send more. + continue + if want_diversity: + # we could have sent something if we'd been allowed to pull + # more shares per server. Increase the limit and try again. + self._max_shares_per_server += 1 + log.msg("SegmentFetcher(%s) increasing diversity limit to %d" + % (self._node._si_prefix, self._max_shares_per_server), + level=log.NOISY, umid="xY2pBA") + # Also ask for more shares, in the hopes of achieving better + # diversity for the next segment. + self._ask_for_more_shares() + continue + # we need more shares than the ones in self._shares to make + # progress + self._ask_for_more_shares() + if self._no_more_shares: + # But there are no more shares to be had. If we're going to + # succeed, it will be with the shares we've already seen. + # Will they be enough? + if len(set(self._blocks.keys()) + | set(self._active_share_map.keys()) + | set(self._overdue_share_map.keys()) + ) < k: + # nope. bail. + self._no_shares_error() # this calls self.stop() + return + # our outstanding or overdue requests may yet work. + # more shares may be coming. Wait until then. + return + # are we done? - if self._count_shnums(COMPLETE) >= k: + if len(set(self._blocks.keys())) >= k: # yay! self.stop() self._node.process_blocks(self.segnum, self._blocks) return - # we may have exhausted everything - if (self._no_more_shares and - self._count_shnums(AVAILABLE, PENDING, OVERDUE, COMPLETE) < k): - # no more new shares are coming, and the remaining hopeful shares - # aren't going to be enough. boo!
- - log.msg("share states: %r" % (self._shares,), - level=log.NOISY, umid="0ThykQ") - if self._count_shnums(AVAILABLE, PENDING, OVERDUE, COMPLETE) == 0: - format = ("no shares (need %(k)d)." - " Last failure: %(last_failure)s") - args = { "k": k, - "last_failure": self._last_failure } - error = NoSharesError - else: - format = ("ran out of shares: %(complete)d complete," - " %(pending)d pending, %(overdue)d overdue," - " %(unused)d unused, need %(k)d." - " Last failure: %(last_failure)s") - args = {"complete": self._count_shnums(COMPLETE), - "pending": self._count_shnums(PENDING), - "overdue": self._count_shnums(OVERDUE), - # 'unused' should be zero - "unused": self._count_shnums(AVAILABLE), - "k": k, - "last_failure": self._last_failure, - } - error = NotEnoughSharesError - log.msg(format=format, level=log.UNUSUAL, umid="1DsnTg", **args) - e = error(format % args) - f = Failure(e) - self.stop() - self._node.fetch_failed(self, f) - return + def _no_shares_error(self): + if not (self._shares or self._active_share_map or + self._overdue_share_map or self._blocks): + format = ("no shares (need %(k)d)." + " Last failure: %(last_failure)s") + args = { "k": self._k, + "last_failure": self._last_failure } + error = NoSharesError + else: + format = ("ran out of shares: complete=%(complete)s" + " pending=%(pending)s overdue=%(overdue)s" + " unused=%(unused)s need %(k)d." + " Last failure: %(last_failure)s") + def join(shnums): return ",".join(["sh%d" % shnum + for shnum in sorted(shnums)]) + pending_s = ",".join([str(sh) + for sh in self._active_share_map.values()]) + overdue = set() + for shares in self._overdue_share_map.values(): + overdue |= shares + overdue_s = ",".join([str(sh) for sh in overdue]) + args = {"complete": join(self._blocks.keys()), + "pending": pending_s, + "overdue": overdue_s, + # 'unused' should be zero + "unused": ",".join([str(sh) for sh in self._shares]), + "k": self._k, + "last_failure": self._last_failure, + } + error = NotEnoughSharesError + log.msg(format=format, level=log.UNUSUAL, umid="1DsnTg", **args) + e = error(format % args) + f = Failure(e) + self.stop() + self._node.fetch_failed(self, f) - # nope, not done. Are we "block-hungry" (i.e. do we want to send out - # more read requests, or do we think we have enough in flight - # already?) - while self._count_shnums(PENDING, COMPLETE) < k: - # we're hungry.. are there any unused shares? - sent = self._send_new_request() - if not sent: - break - - # ok, now are we "share-hungry" (i.e. do we have enough known shares - # to make us happy, or should we ask the ShareFinder to get us more?) 
- if self._count_shnums(AVAILABLE, PENDING, COMPLETE) < k: - # we're hungry for more shares - self._node.want_more_shares() - # that will trigger the ShareFinder to keep looking - - def _find_one(self, shares, state): - # TODO could choose fastest, or avoid servers already in use - for s in shares: - if self._shares[s] == state: - return s - # can never get here, caller has assert in case of code bug - - def _send_new_request(self): - # TODO: this is probably O(k^2), and we're called from a range(k) - # loop, so O(k^3) - - # this first loop prefers sh0, then sh1, sh2, etc - for shnum,shares in sorted(self._shnums.iteritems()): - states = [self._shares[s] for s in shares] - if COMPLETE in states or PENDING in states: - # don't send redundant requests + def _find_and_use_share(self): + sent_something = False + want_diversity = False + for sh in self._shares: # find one good share to fetch + shnum = sh._shnum ; serverid = sh._peerid + if shnum in self._blocks: + continue # don't request data we already have + if shnum in self._active_share_map: + continue # don't send redundant requests + sfs = self._shares_from_server + if len(sfs.get(serverid,set())) >= self._max_shares_per_server: + # don't pull too much from a single server + want_diversity = True continue - if AVAILABLE not in states: - # no candidates for this shnum, move on - continue - # here's a candidate. Send a request. - s = self._find_one(shares, AVAILABLE) - assert s - self._shares[s] = PENDING - self._share_observers[s] = o = s.get_block(self.segnum) - o.subscribe(self._block_request_activity, share=s, shnum=shnum) - # TODO: build up a list of candidates, then walk through the - # list, sending requests to the most desireable servers, - # re-checking our block-hunger each time. For non-initial segment - # fetches, this would let us stick with faster servers. - return True - # nothing was sent: don't call us again until you have more shares to - # work with, or one of the existing shares has been declared OVERDUE - return False + # ok, we can use this share + self._shares.remove(sh) + self._active_share_map[shnum] = sh + self._shares_from_server.add(serverid, sh) + self._start_share(sh, shnum) + sent_something = True + break + return (sent_something, want_diversity) + + def _start_share(self, share, shnum): + self._share_observers[share] = o = share.get_block(self.segnum) + o.subscribe(self._block_request_activity, share=share, shnum=shnum) + + def _ask_for_more_shares(self): + if not self._no_more_shares: + self._node.want_more_shares() + # that will trigger the ShareFinder to keep looking, and call our + # add_shares() or no_more_shares() later. def _cancel_all_requests(self): for o in self._share_observers.values(): @@ -208,26 +228,32 @@ class SegmentFetcher: " Share(sh%d-on-%s) -> %s" % (self._node._si_prefix, shnum, share._peerid_s, state), level=log.NOISY, umid="vilNWA") - # COMPLETE, CORRUPT, DEAD, BADSEGNUM are terminal. + # COMPLETE, CORRUPT, DEAD, BADSEGNUM are terminal. Remove the share + # from all our tracking lists. 
if state in (COMPLETE, CORRUPT, DEAD, BADSEGNUM): self._share_observers.pop(share, None) + self._shares_from_server.discard(shnum, share) + if self._active_share_map.get(shnum) is share: + del self._active_share_map[shnum] + self._overdue_share_map.discard(shnum, share) + if state is COMPLETE: - # 'block' is fully validated - self._shares[share] = COMPLETE + # 'block' is fully validated and complete self._blocks[shnum] = block - elif state is OVERDUE: - self._shares[share] = OVERDUE + + if state is OVERDUE: + # no longer active, but still might complete + del self._active_share_map[shnum] + self._overdue_share_map.add(shnum, share) # OVERDUE is not terminal: it will eventually transition to # COMPLETE, CORRUPT, or DEAD. - elif state is CORRUPT: - self._shares[share] = CORRUPT - elif state is DEAD: - del self._shares[share] - self._shnums[shnum].remove(share) - self._last_failure = f - elif state is BADSEGNUM: - self._shares[share] = BADSEGNUM # ??? - self._bad_segnum = True - eventually(self.loop) + if state is DEAD: + self._last_failure = f + if state is BADSEGNUM: + # our main loop will ask the DownloadNode each time for the + # number of segments, so we'll deal with this in the top of + # _do_loop + pass + eventually(self.loop) diff --git a/src/allmydata/immutable/downloader/finder.py b/src/allmydata/immutable/downloader/finder.py index 9adee99..f2843b1 100644 --- a/src/allmydata/immutable/downloader/finder.py +++ b/src/allmydata/immutable/downloader/finder.py @@ -35,11 +35,13 @@ class ShareFinder: self._storage_broker = storage_broker self.share_consumer = self.node = node self.max_outstanding_requests = max_outstanding_requests - self._hungry = False self._commonshares = {} # shnum to CommonShare instance self.undelivered_shares = [] + # TODO: change this to deliver all shares as soon as possible. And + # figure out how self.hungry() should interact with the + # SegmentFetcher caller. self.pending_requests = set() self.overdue_requests = set() # subset of pending_requests self.overdue_timers = {} @@ -52,6 +54,12 @@ class ShareFinder: si=self._si_prefix, level=log.NOISY, parent=logparent, umid="2xjj2A") + def update_num_segments(self): + (numsegs, authoritative) = self.node.get_num_segments() + assert authoritative + for cs in self._commonshares.values(): + cs.set_authoritative_num_segments(numsegs) + def start_finding_servers(self): # don't get servers until somebody uses us: creating the # ImmutableFileNode should not cause work to happen yet. 
Test case is @@ -146,14 +154,16 @@ class ShareFinder: lp = self.log(format="sending DYHB to [%(peerid)s]", peerid=idlib.shortnodeid_b2a(peerid), level=log.NOISY, umid="Io7pyg") - d_ev = self._download_status.add_dyhb_sent(peerid, now()) + time_sent = now() + d_ev = self._download_status.add_dyhb_sent(peerid, time_sent) # TODO: get the timer from a Server object, it knows best self.overdue_timers[req] = reactor.callLater(self.OVERDUE_TIMEOUT, self.overdue, req) d = rref.callRemote("get_buckets", self._storage_index) d.addBoth(incidentally, self._request_retired, req) d.addCallbacks(self._got_response, self._got_error, - callbackArgs=(rref.version, peerid, req, d_ev, lp), + callbackArgs=(rref.version, peerid, req, d_ev, + time_sent, lp), errbackArgs=(peerid, req, d_ev, lp)) d.addErrback(log.err, format="error in send_request", level=log.WEIRD, parent=lp, umid="rpdV0w") @@ -172,9 +182,12 @@ class ShareFinder: self.overdue_requests.add(req) eventually(self.loop) - def _got_response(self, buckets, server_version, peerid, req, d_ev, lp): + def _got_response(self, buckets, server_version, peerid, req, d_ev, + time_sent, lp): shnums = sorted([shnum for shnum in buckets]) - d_ev.finished(shnums, now()) + time_received = now() + d_ev.finished(shnums, time_received) + dyhb_rtt = time_received - time_sent if buckets: shnums_s = ",".join([str(shnum) for shnum in shnums]) self.log(format="got shnums [%(shnums)s] from [%(peerid)s]", @@ -184,21 +197,18 @@ class ShareFinder: self.log(format="no shares from [%(peerid)s]", peerid=idlib.shortnodeid_b2a(peerid), level=log.NOISY, parent=lp, umid="U7d4JA") - if self.node.num_segments is None: - best_numsegs = self.node.guessed_num_segments - else: - best_numsegs = self.node.num_segments for shnum, bucket in buckets.iteritems(): - self._create_share(best_numsegs, shnum, bucket, server_version, - peerid) + self._create_share(shnum, bucket, server_version, peerid, dyhb_rtt) - def _create_share(self, best_numsegs, shnum, bucket, server_version, - peerid): + def _create_share(self, shnum, bucket, server_version, peerid, dyhb_rtt): if shnum in self._commonshares: cs = self._commonshares[shnum] else: - cs = CommonShare(best_numsegs, self._si_prefix, shnum, + numsegs, authoritative = self.node.get_num_segments() + cs = CommonShare(numsegs, self._si_prefix, shnum, self._node_logparent) + if authoritative: + cs.set_authoritative_num_segments(numsegs) # Share._get_satisfaction is responsible for updating # CommonShare.set_numsegs after we know the UEB. Alternatives: # 1: d = self.node.get_num_segments() @@ -214,7 +224,7 @@ class ShareFinder: # Yuck. self._commonshares[shnum] = cs s = Share(bucket, server_version, self.verifycap, cs, self.node, - self._download_status, peerid, shnum, + self._download_status, peerid, shnum, dyhb_rtt, self._node_logparent) self.undelivered_shares.append(s) diff --git a/src/allmydata/immutable/downloader/node.py b/src/allmydata/immutable/downloader/node.py index 4c92dd8..292e767 100644 --- a/src/allmydata/immutable/downloader/node.py +++ b/src/allmydata/immutable/downloader/node.py @@ -103,6 +103,7 @@ class DownloadNode: # as with CommonShare, our ciphertext_hash_tree is a stub until we # get the real num_segments self.ciphertext_hash_tree = IncompleteHashTree(self.guessed_num_segments) + self.ciphertext_hash_tree_leaves = self.guessed_num_segments def __repr__(self): return "Imm_Node(%s)" % (self._si_prefix,) @@ -240,6 +241,11 @@ class DownloadNode: # _parse_and_store_UEB, and we should abandon the download. 
self.have_UEB = True + # inform the ShareFinder about our correct number of segments. This + # will update the block-hash-trees in all existing CommonShare + # instances, and will populate new ones with the correct value. + self._sharefinder.update_num_segments() + def _parse_and_store_UEB(self, d): # Note: the UEB contains needed_shares and total_shares. These are # redundant and inferior (the filecap contains the authoritative @@ -292,6 +298,7 @@ class DownloadNode: # shares of file B. self.ciphertext_hash_tree was a guess before: # this is where we create it for real. self.ciphertext_hash_tree = IncompleteHashTree(self.num_segments) + self.ciphertext_hash_tree_leaves = self.num_segments self.ciphertext_hash_tree.set_hashes({0: d['crypttext_root_hash']}) self.share_hash_tree.set_hashes({0: d['share_root_hash']}) @@ -344,9 +351,15 @@ class DownloadNode: % (hashnum, len(self.share_hash_tree))) self.share_hash_tree.set_hashes(share_hashes) + def get_desired_ciphertext_hashes(self, segnum): + if segnum < self.ciphertext_hash_tree_leaves: + return self.ciphertext_hash_tree.needed_hashes(segnum, + include_leaf=True) + return [] def get_needed_ciphertext_hashes(self, segnum): cht = self.ciphertext_hash_tree return cht.needed_hashes(segnum, include_leaf=True) + def process_ciphertext_hashes(self, hashes): assert self.num_segments is not None # this may raise BadHashError or NotEnoughHashesError @@ -473,3 +486,11 @@ class DownloadNode: self._active_segment.stop() self._active_segment = None self._start_new_segment() + + # called by ShareFinder to choose hashtree sizes in CommonShares, and by + # SegmentFetcher to tell if it is still fetching a valid segnum. + def get_num_segments(self): + # returns (best_num_segments, authoritative) + if self.num_segments is None: + return (self.guessed_num_segments, False) + return (self.num_segments, True) diff --git a/src/allmydata/immutable/downloader/share.py b/src/allmydata/immutable/downloader/share.py index f7ed4e8..0034845 100644 --- a/src/allmydata/immutable/downloader/share.py +++ b/src/allmydata/immutable/downloader/share.py @@ -33,7 +33,7 @@ class Share: # servers. A different backend would use a different class. def __init__(self, rref, server_version, verifycap, commonshare, node, - download_status, peerid, shnum, logparent): + download_status, peerid, shnum, dyhb_rtt, logparent): self._rref = rref self._server_version = server_version self._node = node # holds share_hash_tree and UEB @@ -51,6 +51,7 @@ class Share: self._storage_index = verifycap.storage_index self._si_prefix = base32.b2a(verifycap.storage_index)[:8] self._shnum = shnum + self._dyhb_rtt = dyhb_rtt # self._alive becomes False upon fatal corruption or server error self._alive = True self._lp = log.msg(format="%(share)s created", share=repr(self), @@ -278,15 +279,16 @@ class Share: if not self._satisfy_UEB(): # can't check any hashes without the UEB return False + # the call to _satisfy_UEB() will immediately set the + # authoritative num_segments in all our CommonShares. If we + # guessed wrong, we might still be working on a bogus segnum + # (beyond the real range). We catch this and signal BADSEGNUM + # before invoking any further code that touches hashtrees. self.actual_segment_size = self._node.segment_size # might be updated assert self.actual_segment_size is not None - # knowing the UEB means knowing num_segments. Despite the redundancy, - # this is the best place to set this. CommonShare.set_numsegs will - # ignore duplicate calls. 
+ # knowing the UEB means knowing num_segments assert self._node.num_segments is not None - cs = self._commonshare - cs.set_numsegs(self._node.num_segments) segnum, observers = self._active_segnum_and_observers() # if segnum is None, we don't really need to do anything (we have no @@ -304,9 +306,9 @@ class Share: # can't check block_hash_tree without a root return False - if cs.need_block_hash_root(): + if self._commonshare.need_block_hash_root(): block_hash_root = self._node.share_hash_tree.get_leaf(self._shnum) - cs.set_block_hash_root(block_hash_root) + self._commonshare.set_block_hash_root(block_hash_root) if segnum is None: return False # we don't want any particular segment right now @@ -531,6 +533,9 @@ class Share: for o in observers: # goes to SegmentFetcher._block_request_activity o.notify(state=COMPLETE, block=block) + # now clear our received data, to dodge the #1170 spans.py + # complexity bug + self._received = DataSpans() except (BadHashError, NotEnoughHashesError), e: # rats, we have a corrupt block. Notify our clients that they # need to look elsewhere, and advise the server. Unlike @@ -586,26 +591,13 @@ class Share: if self.actual_offsets or self._overrun_ok: if not self._node.have_UEB: self._desire_UEB(desire, o) - # They might ask for a segment that doesn't look right. - # _satisfy() will catch+reject bad segnums once we know the UEB - # (and therefore segsize and numsegs), so we'll only fail this - # test if we're still guessing. We want to avoid asking the - # hashtrees for needed_hashes() for bad segnums. So don't enter - # _desire_hashes or _desire_data unless the segnum looks - # reasonable. - if segnum < r["num_segments"]: - # XXX somehow we're getting here for sh5. we don't yet know - # the actual_segment_size, we're still working off the guess. - # the ciphertext_hash_tree has been corrected, but the - # commonshare._block_hash_tree is still in the guessed state. - self._desire_share_hashes(desire, o) - if segnum is not None: - self._desire_block_hashes(desire, o, segnum) - self._desire_data(desire, o, r, segnum, segsize) - else: - log.msg("_desire: segnum(%d) looks wrong (numsegs=%d)" - % (segnum, r["num_segments"]), - level=log.UNUSUAL, parent=self._lp, umid="tuYRQQ") + self._desire_share_hashes(desire, o) + if segnum is not None: + # They might be asking for a segment number that is beyond + # what we guess the file contains, but _desire_block_hashes + # and _desire_data will tolerate that. + self._desire_block_hashes(desire, o, segnum) + self._desire_data(desire, o, r, segnum, segsize) log.msg("end _desire: want_it=%s need_it=%s gotta=%s" % (want_it.dump(), need_it.dump(), gotta_gotta_have_it.dump())) @@ -678,14 +670,30 @@ class Share: (want_it, need_it, gotta_gotta_have_it) = desire # block hash chain - for hashnum in self._commonshare.get_needed_block_hashes(segnum): + for hashnum in self._commonshare.get_desired_block_hashes(segnum): need_it.add(o["block_hashes"]+hashnum*HASH_SIZE, HASH_SIZE) # ciphertext hash chain - for hashnum in self._node.get_needed_ciphertext_hashes(segnum): + for hashnum in self._node.get_desired_ciphertext_hashes(segnum): need_it.add(o["crypttext_hash_tree"]+hashnum*HASH_SIZE, HASH_SIZE) def _desire_data(self, desire, o, r, segnum, segsize): + if segnum > r["num_segments"]: + # they're asking for a segment that's beyond what we think is the + # end of the file. We won't get here if we've already learned the + # real UEB: _get_satisfaction() will notice the out-of-bounds and + # terminate the loop. 
So we must still be guessing, which means + # that they might be correct in asking for such a large segnum. + # But if they're right, then our segsize/segnum guess is + # certainly wrong, which means we don't know what data blocks to + # ask for yet. So don't bother adding anything. When the UEB + # comes back and we learn the correct segsize/segnums, we'll + # either reject the request or have enough information to proceed + # normally. This costs one roundtrip. + log.msg("_desire_data: segnum(%d) looks wrong (numsegs=%d)" + % (segnum, r["num_segments"]), + level=log.UNUSUAL, parent=self._lp, umid="tuYRQQ") + return (want_it, need_it, gotta_gotta_have_it) = desire tail = (segnum == r["num_segments"]-1) datastart = o["data"] @@ -800,34 +808,62 @@ class Share: class CommonShare: + # TODO: defer creation of the hashtree until somebody uses us. There will + # be a lot of unused shares, and we shouldn't spend the memory on a large + # hashtree unless necessary. """I hold data that is common across all instances of a single share, like sh2 on both servers A and B. This is just the block hash tree. """ - def __init__(self, guessed_numsegs, si_prefix, shnum, logparent): + def __init__(self, best_numsegs, si_prefix, shnum, logparent): self.si_prefix = si_prefix self.shnum = shnum + # in the beginning, before we have the real UEB, we can only guess at # the number of segments. But we want to ask for block hashes early. # So if we're asked for which block hashes are needed before we know # numsegs for sure, we return a guess. - self._block_hash_tree = IncompleteHashTree(guessed_numsegs) - self._know_numsegs = False + self._block_hash_tree = IncompleteHashTree(best_numsegs) + self._block_hash_tree_is_authoritative = False + self._block_hash_tree_leaves = best_numsegs self._logparent = logparent - def set_numsegs(self, numsegs): - if self._know_numsegs: - return - self._block_hash_tree = IncompleteHashTree(numsegs) - self._know_numsegs = True + def __repr__(self): + return "CommonShare(%s-sh%d)" % (self.si_prefix, self.shnum) + + def set_authoritative_num_segments(self, numsegs): + if self._block_hash_tree_leaves != numsegs: + self._block_hash_tree = IncompleteHashTree(numsegs) + self._block_hash_tree_leaves = numsegs + self._block_hash_tree_is_authoritative = True def need_block_hash_root(self): return bool(not self._block_hash_tree[0]) def set_block_hash_root(self, roothash): - assert self._know_numsegs + assert self._block_hash_tree_is_authoritative self._block_hash_tree.set_hashes({0: roothash}) + def get_desired_block_hashes(self, segnum): + if segnum < self._block_hash_tree_leaves: + return self._block_hash_tree.needed_hashes(segnum, + include_leaf=True) + + # the segnum might be out-of-bounds. Originally it was due to a race + # between the receipt of the UEB on one share (from which we learn + # the correct number of segments, update all hash trees to the right + # size, and queue a BADSEGNUM to the SegmentFetcher) and the delivery + # of a new Share to the SegmentFetcher while that BADSEGNUM was + # queued (which sends out requests to the stale segnum, now larger + # than the hash tree). I fixed that (by making SegmentFetcher.loop + # check for a bad segnum at the start of each pass, instead of using + # the queued BADSEGNUM or a flag it sets), but just in case this + # still happens, I'm leaving the < in place. If it gets hit, there's + # a potential lost-progress problem, but I'm pretty sure that it will + # get cleared up on the following turn. 
+ return [] + def get_needed_block_hashes(self, segnum): + assert self._block_hash_tree_is_authoritative # XXX: include_leaf=True needs thought: how did the old downloader do # it? I think it grabbed *all* block hashes and set them all at once. # Since we want to fetch less data, we either need to fetch the leaf @@ -837,12 +873,23 @@ class CommonShare: return self._block_hash_tree.needed_hashes(segnum, include_leaf=True) def process_block_hashes(self, block_hashes): - assert self._know_numsegs + assert self._block_hash_tree_is_authoritative # this may raise BadHashError or NotEnoughHashesError self._block_hash_tree.set_hashes(block_hashes) def check_block(self, segnum, block): - assert self._know_numsegs + assert self._block_hash_tree_is_authoritative h = hashutil.block_hash(block) # this may raise BadHashError or NotEnoughHashesError self._block_hash_tree.set_hashes(leaves={segnum: h}) + +# TODO: maybe stop using EventStreamObserver: instead, use a Deferred and an +# auxiliary OVERDUE callback. Just make sure to get all the messages in the +# right order and on the right turns. + +# TODO: we're asking for too much data. We probably don't need +# include_leaf=True in the block hash tree or ciphertext hash tree. + +# TODO: we ask for ciphertext hash tree nodes from all shares (whenever +# _desire is called while we're missing those nodes), but we only consume it +# from the first response, leaving the rest of the data sitting in _received. diff --git a/src/allmydata/test/test_cli.py b/src/allmydata/test/test_cli.py index db5bf5f..2453126 100644 --- a/src/allmydata/test/test_cli.py +++ b/src/allmydata/test/test_cli.py @@ -2303,8 +2303,8 @@ class Errors(GridTestMixin, CLITestMixin, unittest.TestCase): # the download is abandoned as soon as it's clear that we won't get # enough shares. The one remaining share might be in either the # COMPLETE or the PENDING state. 
- in_complete_msg = "ran out of shares: 1 complete, 0 pending, 0 overdue, 0 unused, need 3" - in_pending_msg = "ran out of shares: 0 complete, 1 pending, 0 overdue, 0 unused, need 3" + in_complete_msg = "ran out of shares: complete=sh0 pending= overdue= unused= need 3" + in_pending_msg = "ran out of shares: complete= pending=Share(sh0-on-fob7v) overdue= unused= need 3" d.addCallback(lambda ign: self.do_cli("get", self.uri_1share)) def _check1((rc, out, err)): diff --git a/src/allmydata/test/test_download.py b/src/allmydata/test/test_download.py index 71a556b..80ce713 100644 --- a/src/allmydata/test/test_download.py +++ b/src/allmydata/test/test_download.py @@ -15,8 +15,9 @@ from allmydata.test.no_network import GridTestMixin from allmydata.test.common import ShouldFailMixin from allmydata.interfaces import NotEnoughSharesError, NoSharesError from allmydata.immutable.downloader.common import BadSegmentNumberError, \ - BadCiphertextHashError, DownloadStopped + BadCiphertextHashError, DownloadStopped, COMPLETE, OVERDUE, DEAD from allmydata.immutable.downloader.status import DownloadStatus +from allmydata.immutable.downloader.fetcher import SegmentFetcher from allmydata.codec import CRSDecoder from foolscap.eventual import fireEventually, flushEventualQueue @@ -1257,3 +1258,332 @@ class Status(unittest.TestCase): e2.update(1000, 2.0, 2.0) e2.finished(now+5) self.failUnlessEqual(ds.get_progress(), 1.0) + +class MyShare: + def __init__(self, shnum, peerid, rtt): + self._shnum = shnum + self._peerid = peerid + self._peerid_s = peerid + self._dyhb_rtt = rtt + def __repr__(self): + return "sh%d-on-%s" % (self._shnum, self._peerid) + +class MySegmentFetcher(SegmentFetcher): + def __init__(self, *args, **kwargs): + SegmentFetcher.__init__(self, *args, **kwargs) + self._test_start_shares = [] + def _start_share(self, share, shnum): + self._test_start_shares.append(share) + +class FakeNode: + def __init__(self): + self.want_more = 0 + self.failed = None + self.processed = None + self._si_prefix = "si_prefix" + def want_more_shares(self): + self.want_more += 1 + def fetch_failed(self, fetcher, f): + self.failed = f + def process_blocks(self, segnum, blocks): + self.processed = (segnum, blocks) + def get_num_segments(self): + return 1, True + +class Selection(unittest.TestCase): + def test_no_shares(self): + node = FakeNode() + sf = SegmentFetcher(node, 0, 3) + sf.add_shares([]) + d = flushEventualQueue() + def _check1(ign): + self.failUnlessEqual(node.want_more, 1) + self.failUnlessEqual(node.failed, None) + sf.no_more_shares() + return flushEventualQueue() + d.addCallback(_check1) + def _check2(ign): + self.failUnless(node.failed) + self.failUnless(node.failed.check(NoSharesError)) + d.addCallback(_check2) + return d + + def test_only_one_share(self): + node = FakeNode() + sf = MySegmentFetcher(node, 0, 3) + shares = [MyShare(0, "peer-A", 0.0)] + sf.add_shares(shares) + d = flushEventualQueue() + def _check1(ign): + self.failUnlessEqual(node.want_more, 1) + self.failUnlessEqual(node.failed, None) + sf.no_more_shares() + return flushEventualQueue() + d.addCallback(_check1) + def _check2(ign): + self.failUnless(node.failed) + self.failUnless(node.failed.check(NotEnoughSharesError)) + self.failUnlessIn("complete= pending=sh0-on-peer-A overdue= unused=", + str(node.failed)) + d.addCallback(_check2) + return d + + def test_good_diversity_early(self): + node = FakeNode() + sf = MySegmentFetcher(node, 0, 3) + shares = [MyShare(i, "peer-%d" % i, i) for i in range(10)] + sf.add_shares(shares) + d = 
flushEventualQueue() + def _check1(ign): + self.failUnlessEqual(node.want_more, 0) + self.failUnlessEqual(sf._test_start_shares, shares[:3]) + for sh in sf._test_start_shares: + sf._block_request_activity(sh, sh._shnum, COMPLETE, + "block-%d" % sh._shnum) + return flushEventualQueue() + d.addCallback(_check1) + def _check2(ign): + self.failIfEqual(node.processed, None) + self.failUnlessEqual(node.processed, (0, {0: "block-0", + 1: "block-1", + 2: "block-2"}) ) + d.addCallback(_check2) + return d + + def test_good_diversity_late(self): + node = FakeNode() + sf = MySegmentFetcher(node, 0, 3) + shares = [MyShare(i, "peer-%d" % i, i) for i in range(10)] + sf.add_shares([]) + d = flushEventualQueue() + def _check1(ign): + self.failUnlessEqual(node.want_more, 1) + sf.add_shares(shares) + return flushEventualQueue() + d.addCallback(_check1) + def _check2(ign): + self.failUnlessEqual(sf._test_start_shares, shares[:3]) + for sh in sf._test_start_shares: + sf._block_request_activity(sh, sh._shnum, COMPLETE, + "block-%d" % sh._shnum) + return flushEventualQueue() + d.addCallback(_check2) + def _check3(ign): + self.failIfEqual(node.processed, None) + self.failUnlessEqual(node.processed, (0, {0: "block-0", + 1: "block-1", + 2: "block-2"}) ) + d.addCallback(_check3) + return d + + def test_avoid_bad_diversity_late(self): + node = FakeNode() + sf = MySegmentFetcher(node, 0, 3) + # we could satisfy the read entirely from the first server, but we'd + # prefer not to. Instead, we expect to only pull one share from the + # first server + shares = [MyShare(0, "peer-A", 0.0), + MyShare(1, "peer-A", 0.0), + MyShare(2, "peer-A", 0.0), + MyShare(3, "peer-B", 1.0), + MyShare(4, "peer-C", 2.0), + ] + sf.add_shares([]) + d = flushEventualQueue() + def _check1(ign): + self.failUnlessEqual(node.want_more, 1) + sf.add_shares(shares) + return flushEventualQueue() + d.addCallback(_check1) + def _check2(ign): + self.failUnlessEqual(sf._test_start_shares, + [shares[0], shares[3], shares[4]]) + for sh in sf._test_start_shares: + sf._block_request_activity(sh, sh._shnum, COMPLETE, + "block-%d" % sh._shnum) + return flushEventualQueue() + d.addCallback(_check2) + def _check3(ign): + self.failIfEqual(node.processed, None) + self.failUnlessEqual(node.processed, (0, {0: "block-0", + 3: "block-3", + 4: "block-4"}) ) + d.addCallback(_check3) + return d + + def test_suffer_bad_diversity_late(self): + node = FakeNode() + sf = MySegmentFetcher(node, 0, 3) + # we satisfy the read entirely from the first server because we don't + # have any other choice. 
+ shares = [MyShare(0, "peer-A", 0.0), + MyShare(1, "peer-A", 0.0), + MyShare(2, "peer-A", 0.0), + MyShare(3, "peer-A", 0.0), + MyShare(4, "peer-A", 0.0), + ] + sf.add_shares([]) + d = flushEventualQueue() + def _check1(ign): + self.failUnlessEqual(node.want_more, 1) + sf.add_shares(shares) + return flushEventualQueue() + d.addCallback(_check1) + def _check2(ign): + self.failUnlessEqual(node.want_more, 3) + self.failUnlessEqual(sf._test_start_shares, + [shares[0], shares[1], shares[2]]) + for sh in sf._test_start_shares: + sf._block_request_activity(sh, sh._shnum, COMPLETE, + "block-%d" % sh._shnum) + return flushEventualQueue() + d.addCallback(_check2) + def _check3(ign): + self.failIfEqual(node.processed, None) + self.failUnlessEqual(node.processed, (0, {0: "block-0", + 1: "block-1", + 2: "block-2"}) ) + d.addCallback(_check3) + return d + + def test_suffer_bad_diversity_early(self): + node = FakeNode() + sf = MySegmentFetcher(node, 0, 3) + # we satisfy the read entirely from the first server because we don't + # have any other choice. + shares = [MyShare(0, "peer-A", 0.0), + MyShare(1, "peer-A", 0.0), + MyShare(2, "peer-A", 0.0), + MyShare(3, "peer-A", 0.0), + MyShare(4, "peer-A", 0.0), + ] + sf.add_shares(shares) + d = flushEventualQueue() + def _check1(ign): + self.failUnlessEqual(node.want_more, 2) + self.failUnlessEqual(sf._test_start_shares, + [shares[0], shares[1], shares[2]]) + for sh in sf._test_start_shares: + sf._block_request_activity(sh, sh._shnum, COMPLETE, + "block-%d" % sh._shnum) + return flushEventualQueue() + d.addCallback(_check1) + def _check2(ign): + self.failIfEqual(node.processed, None) + self.failUnlessEqual(node.processed, (0, {0: "block-0", + 1: "block-1", + 2: "block-2"}) ) + d.addCallback(_check2) + return d + + def test_overdue(self): + node = FakeNode() + sf = MySegmentFetcher(node, 0, 3) + shares = [MyShare(i, "peer-%d" % i, i) for i in range(10)] + sf.add_shares(shares) + d = flushEventualQueue() + def _check1(ign): + self.failUnlessEqual(node.want_more, 0) + self.failUnlessEqual(sf._test_start_shares, shares[:3]) + for sh in sf._test_start_shares: + sf._block_request_activity(sh, sh._shnum, OVERDUE) + return flushEventualQueue() + d.addCallback(_check1) + def _check2(ign): + self.failUnlessEqual(sf._test_start_shares, shares[:6]) + for sh in sf._test_start_shares[3:]: + sf._block_request_activity(sh, sh._shnum, COMPLETE, + "block-%d" % sh._shnum) + return flushEventualQueue() + d.addCallback(_check2) + def _check3(ign): + self.failIfEqual(node.processed, None) + self.failUnlessEqual(node.processed, (0, {3: "block-3", + 4: "block-4", + 5: "block-5"}) ) + d.addCallback(_check3) + return d + + def test_overdue_fails(self): + node = FakeNode() + sf = MySegmentFetcher(node, 0, 3) + shares = [MyShare(i, "peer-%d" % i, i) for i in range(6)] + sf.add_shares(shares) + sf.no_more_shares() + d = flushEventualQueue() + def _check1(ign): + self.failUnlessEqual(node.want_more, 0) + self.failUnlessEqual(sf._test_start_shares, shares[:3]) + for sh in sf._test_start_shares: + sf._block_request_activity(sh, sh._shnum, OVERDUE) + return flushEventualQueue() + d.addCallback(_check1) + def _check2(ign): + self.failUnlessEqual(sf._test_start_shares, shares[:6]) + for sh in sf._test_start_shares[3:]: + sf._block_request_activity(sh, sh._shnum, DEAD) + return flushEventualQueue() + d.addCallback(_check2) + def _check3(ign): + # we're still waiting + self.failUnlessEqual(node.processed, None) + self.failUnlessEqual(node.failed, None) + # now complete one of the overdue ones, and 
kill one of the other + # ones, leaving one hanging. This should trigger a failure, since + # we cannot succeed. + live = sf._test_start_shares[0] + die = sf._test_start_shares[1] + sf._block_request_activity(live, live._shnum, COMPLETE, "block") + sf._block_request_activity(die, die._shnum, DEAD) + return flushEventualQueue() + d.addCallback(_check3) + def _check4(ign): + self.failUnless(node.failed) + self.failUnless(node.failed.check(NotEnoughSharesError)) + self.failUnlessIn("complete=sh0 pending= overdue=sh2-on-peer-2 unused=", + str(node.failed)) + d.addCallback(_check4) + return d + + def test_avoid_redundancy(self): + node = FakeNode() + sf = MySegmentFetcher(node, 0, 3) + # we could satisfy the read entirely from the first server, but we'd + # prefer not to. Instead, we expect to only pull one share from the + # first server + shares = [MyShare(0, "peer-A", 0.0), + MyShare(1, "peer-B", 1.0), + MyShare(0, "peer-C", 2.0), # this will be skipped + MyShare(1, "peer-D", 3.0), + MyShare(2, "peer-E", 4.0), + ] + sf.add_shares(shares[:3]) + d = flushEventualQueue() + def _check1(ign): + self.failUnlessEqual(node.want_more, 1) + self.failUnlessEqual(sf._test_start_shares, + [shares[0], shares[1]]) + # allow sh1 to retire + sf._block_request_activity(shares[1], 1, COMPLETE, "block-1") + return flushEventualQueue() + d.addCallback(_check1) + def _check2(ign): + # and then feed in the remaining shares + sf.add_shares(shares[3:]) + sf.no_more_shares() + return flushEventualQueue() + d.addCallback(_check2) + def _check3(ign): + self.failUnlessEqual(sf._test_start_shares, + [shares[0], shares[1], shares[4]]) + sf._block_request_activity(shares[0], 0, COMPLETE, "block-0") + sf._block_request_activity(shares[4], 2, COMPLETE, "block-2") + return flushEventualQueue() + d.addCallback(_check3) + def _check4(ign): + self.failIfEqual(node.processed, None) + self.failUnlessEqual(node.processed, (0, {0: "block-0", + 1: "block-1", + 2: "block-2"}) ) + d.addCallback(_check4) + return d diff --git a/src/allmydata/test/test_immutable.py b/src/allmydata/test/test_immutable.py index 813c5be..17d8472 100644 --- a/src/allmydata/test/test_immutable.py +++ b/src/allmydata/test/test_immutable.py @@ -52,7 +52,7 @@ class Test(common.ShareManglingMixin, common.ShouldFailMixin, unittest.TestCase) def _after_download(unused=None): after_download_reads = self._count_reads() #print before_download_reads, after_download_reads - self.failIf(after_download_reads-before_download_reads > 27, + self.failIf(after_download_reads-before_download_reads > 40, (after_download_reads, before_download_reads)) d.addCallback(self._download_and_check_plaintext) d.addCallback(_after_download) @@ -70,7 +70,7 @@ class Test(common.ShareManglingMixin, common.ShouldFailMixin, unittest.TestCase) def _after_download(unused=None): after_download_reads = self._count_reads() #print before_download_reads, after_download_reads - self.failIf(after_download_reads-before_download_reads > 27, (after_download_reads, before_download_reads)) + self.failIf(after_download_reads-before_download_reads > 37, (after_download_reads, before_download_reads)) d.addCallback(self._download_and_check_plaintext) d.addCallback(_after_download) return d diff --git a/src/allmydata/test/test_web.py b/src/allmydata/test/test_web.py index 78b9902..c05db3b 100644 --- a/src/allmydata/test/test_web.py +++ b/src/allmydata/test/test_web.py @@ -4239,15 +4239,20 @@ class Grid(GridTestMixin, WebErrorMixin, ShouldFailMixin, testutil.ReallyEqualMi def _check_one_share(body): self.failIf("" 
in body, body) body = " ".join(body.strip().split()) - msg = ("NotEnoughSharesError: This indicates that some " - "servers were unavailable, or that shares have been " - "lost to server departure, hard drive failure, or disk " - "corruption. You should perform a filecheck on " - "this object to learn more. The full error message is:" - " ran out of shares: %d complete, %d pending, 0 overdue," - " 0 unused, need 3. Last failure: None") - msg1 = msg % (1, 0) - msg2 = msg % (0, 1) + msgbase = ("NotEnoughSharesError: This indicates that some " + "servers were unavailable, or that shares have been " + "lost to server departure, hard drive failure, or disk " + "corruption. You should perform a filecheck on " + "this object to learn more. The full error message is:" + ) + msg1 = msgbase + (" ran out of shares:" + " complete=sh0" + " pending=" + " overdue= unused= need 3. Last failure: None") + msg2 = msgbase + (" ran out of shares:" + " complete=" + " pending=Share(sh0-on-xgru5)" + " overdue= unused= need 3. Last failure: None") self.failUnless(body == msg1 or body == msg2, body) d.addCallback(_check_one_share)
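
Appendix (not part of the patch): the share-selection policy that the new SegmentFetcher._find_and_use_share()/loop() code above implements, reduced to a standalone sketch. The Candidate class and select_shares() helper are hypothetical names invented for illustration; the real fetcher issues requests incrementally via sh.get_block() and reacts to COMPLETE/OVERDUE/DEAD events rather than planning a whole batch up front.

# Standalone sketch (illustration only, not part of the patch). It models the
# policy described above: prefer shares with low DYHB RTT, never plan two
# requests for the same shnum, and pull at most max_per_server shares from
# any one server, raising that cap only when the cap itself is what blocks
# progress. 'Candidate' and 'select_shares' are hypothetical names.

from collections import defaultdict

class Candidate(object):
    def __init__(self, shnum, serverid, rtt):
        self.shnum = shnum
        self.serverid = serverid
        self.rtt = rtt

def select_shares(candidates, k, max_per_server=1):
    # sort by (RTT, shnum), like SegmentFetcher.add_shares() does
    candidates = sorted(candidates, key=lambda c: (c.rtt, c.shnum))
    while True:
        chosen = []
        used_shnums = set()
        per_server = defaultdict(int)
        want_diversity = False
        for c in candidates:
            if c.shnum in used_shnums:
                continue  # don't request data we already plan to get
            if per_server[c.serverid] >= max_per_server:
                want_diversity = True
                continue  # don't pull too much from a single server yet
            chosen.append(c)
            used_shnums.add(c.shnum)
            per_server[c.serverid] += 1
            if len(chosen) >= k:
                return chosen, max_per_server
        if not want_diversity:
            # fewer than k distinct shnums exist at any diversity level
            return chosen, max_per_server
        max_per_server += 1  # accept worse diversity rather than stall

For example, with k=3 and five candidates all on the same server, this sketch only returns three of them after raising the cap to 3, mirroring the behaviour exercised by test_suffer_bad_diversity_early/late above: redundancy (two requests for the same shnum) is never allowed, while poor server diversity is tolerated only once the per-server cap has been raised step by step.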