Tue Aug 31 18:35:58 PDT 2010 "Brian Warner " * Share: drop received data after each block finishes. Quick fix for the #1170 spans.py complexity bug. Tue Aug 31 18:37:02 PDT 2010 "Brian Warner " * SegmentFetcher: use new diversity-seeking share-selection algorithm, and deliver all shares at once instead of feeding them out one-at-a-time. Also fix distribution of real-number-of-segments information: now all CommonShares (not just the ones used for the first segment) get a correctly-sized hashtree. Previously, the late ones might not, which would make them crash and get dropped (causing the download to fail if the initial set were insufficient, perhaps because one of their servers went away). Update tests, add some TODO notes, improve variable names and comments. Improve logging: add logparents, set more appropriate levels. Tue Aug 31 20:48:17 PDT 2010 "Brian Warner " * Add Protovis.js-based download-status timeline visualization. Still kinda rough, but illuminating. Also add dl-status test for /download-%s/event_json, remove /download-%s?t=json New patches: [Share: drop received data after each block finishes. Quick fix for the #1170 spans.py complexity bug. "Brian Warner "**20100901013558] { hunk ./src/allmydata/immutable/downloader/share.py 534 for o in observers: # goes to SegmentFetcher._block_request_activity o.notify(state=COMPLETE, block=block) + # now clear our received data, to dodge the #1170 spans.py + # complexity bug + self._received = DataSpans() except (BadHashError, NotEnoughHashesError), e: # rats, we have a corrupt block. Notify our clients that they # need to look elsewhere, and advise the server. Unlike hunk ./src/allmydata/test/test_immutable.py 55 def _after_download(unused=None): after_download_reads = self._count_reads() #print before_download_reads, after_download_reads - self.failIf(after_download_reads-before_download_reads > 27, + self.failIf(after_download_reads-before_download_reads > 36, (after_download_reads, before_download_reads)) d.addCallback(self._download_and_check_plaintext) d.addCallback(_after_download) hunk ./src/allmydata/test/test_immutable.py 73 def _after_download(unused=None): after_download_reads = self._count_reads() #print before_download_reads, after_download_reads - self.failIf(after_download_reads-before_download_reads > 27, (after_download_reads, before_download_reads)) + self.failIf(after_download_reads-before_download_reads > 37, (after_download_reads, before_download_reads)) d.addCallback(self._download_and_check_plaintext) d.addCallback(_after_download) return d } [SegmentFetcher: use new diversity-seeking share-selection algorithm, and "Brian Warner "**20100901013702 deliver all shares at once instead of feeding them out one-at-a-time. Also fix distribution of real-number-of-segments information: now all CommonShares (not just the ones used for the first segment) get a correctly-sized hashtree. Previously, the late ones might not, which would make them crash and get dropped (causing the download to fail if the initial set were insufficient, perhaps because one of their servers went away). Update tests, add some TODO notes, improve variable names and comments. Improve logging: add logparents, set more appropriate levels. 
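In rough terms, the new selection policy is: sort the known shares by DYHB round-trip time, never take more than a fixed number of shares from any single server, and only raise that per-server cap when the fetch would otherwise stall. The following is a simplified, standalone sketch of that policy, not the fetcher.py code itself; the Share and pick_shares names are invented for illustration, and the real logic (SegmentFetcher.loop and _find_and_use_share, below) runs incrementally as DYHB responses arrive rather than over one batch of candidates:

# Illustrative sketch of the diversity-seeking selection policy -- not
# the actual SegmentFetcher implementation.
class Share(object):
    def __init__(self, shnum, serverid, rtt):
        self.shnum = shnum        # which share number this is
        self.serverid = serverid  # which server holds it
        self.rtt = rtt            # DYHB round-trip time for that server

def pick_shares(candidates, k):
    """Choose up to k distinct-shnum shares, preferring low-RTT servers
    and spreading requests across servers before doubling up on one."""
    remaining = sorted(candidates, key=lambda s: (s.rtt, s.shnum))
    chosen = {}          # shnum -> Share we would call get_block() on
    per_server = {}      # serverid -> number of shares chosen from it
    max_per_server = 1   # grows only when we would otherwise stall
    while len(chosen) < k:
        picked = None
        blocked_by_cap = False
        for s in remaining:
            if s.shnum in chosen:
                continue                  # that shnum is already covered
            if per_server.get(s.serverid, 0) >= max_per_server:
                blocked_by_cap = True     # skipped only to preserve diversity
                continue
            picked = s
            break
        if picked is not None:
            chosen[picked.shnum] = picked
            per_server[picked.serverid] = per_server.get(picked.serverid, 0) + 1
            remaining.remove(picked)
        elif blocked_by_cap:
            max_per_server += 1           # accept less diversity over stalling
        else:
            break                         # not enough distinct shnums known
    return chosen

For example, with k=3 and sh0/sh1/sh2 on one fast server plus sh3 and sh4 on two slower ones, this picks sh0, sh3, and sh4 rather than three shares from the fast server; only when no other servers are available does the cap grow and allow sh0/sh1/sh2 from the same server (compare test_avoid_bad_diversity_late and test_suffer_bad_diversity_late in the test_download.py hunks below).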
] { hunk ./src/allmydata/immutable/downloader/fetcher.py 7 from allmydata.interfaces import NotEnoughSharesError, NoSharesError from allmydata.util import log from allmydata.util.dictutil import DictOfSets -from common import AVAILABLE, PENDING, OVERDUE, COMPLETE, CORRUPT, DEAD, \ - BADSEGNUM, BadSegmentNumberError +from common import OVERDUE, COMPLETE, CORRUPT, DEAD, BADSEGNUM, \ + BadSegmentNumberError class SegmentFetcher: """I am responsible for acquiring blocks for a single segment. I will use hunk ./src/allmydata/immutable/downloader/fetcher.py 25 will shut down and do no further work. My parent can also call my stop() method to have me shut down early.""" - def __init__(self, node, segnum, k): + def __init__(self, node, segnum, k, logparent): self._node = node # _Node self.segnum = segnum self._k = k hunk ./src/allmydata/immutable/downloader/fetcher.py 29 - self._shares = {} # maps non-dead Share instance to a state, one of - # (AVAILABLE, PENDING, OVERDUE, COMPLETE, CORRUPT). - # State transition map is: - # AVAILABLE -(send-read)-> PENDING - # PENDING -(timer)-> OVERDUE - # PENDING -(rx)-> COMPLETE, CORRUPT, DEAD, BADSEGNUM - # OVERDUE -(rx)-> COMPLETE, CORRUPT, DEAD, BADSEGNUM - # If a share becomes DEAD, it is removed from the - # dict. If it becomes BADSEGNUM, the whole fetch is - # terminated. + self._shares = [] # unused Share instances, sorted by "goodness" + # (RTT), then shnum. This is populated when DYHB + # responses arrive, or (for later segments) at + # startup. We remove shares from it when we call + # sh.get_block() on them. + self._shares_from_server = DictOfSets() # maps serverid to set of + # Shares on that server for + # which we have outstanding + # get_block() calls. + self._max_shares_per_server = 1 # how many Shares we're allowed to + # pull from each server. This starts + # at 1 and grows if we don't have + # sufficient diversity. + self._active_share_map = {} # maps shnum to outstanding (and not + # OVERDUE) Share that provides it. + self._overdue_share_map = DictOfSets() # shares in the OVERDUE state + self._lp = logparent self._share_observers = {} # maps Share to EventStreamObserver for # active ones hunk ./src/allmydata/immutable/downloader/fetcher.py 48 - self._shnums = DictOfSets() # maps shnum to the shares that provide it self._blocks = {} # maps shnum to validated block data self._no_more_shares = False hunk ./src/allmydata/immutable/downloader/fetcher.py 50 - self._bad_segnum = False self._last_failure = None self._running = True hunk ./src/allmydata/immutable/downloader/fetcher.py 55 def stop(self): log.msg("SegmentFetcher(%s).stop" % self._node._si_prefix, - level=log.NOISY, umid="LWyqpg") + level=log.NOISY, parent=self._lp, umid="LWyqpg") self._cancel_all_requests() self._running = False hunk ./src/allmydata/immutable/downloader/fetcher.py 58 - self._shares.clear() # let GC work # ??? XXX + # help GC ??? 
XXX + del self._shares, self._shares_from_server, self._active_share_map + del self._share_observers # called by our parent _Node hunk ./src/allmydata/immutable/downloader/fetcher.py 69 # called when ShareFinder locates a new share, and when a non-initial # segment fetch is started and we already know about shares from the # previous segment - for s in shares: - self._shares[s] = AVAILABLE - self._shnums.add(s._shnum, s) + self._shares.extend(shares) + self._shares.sort(key=lambda s: (s._dyhb_rtt, s._shnum) ) eventually(self.loop) def no_more_shares(self): hunk ./src/allmydata/immutable/downloader/fetcher.py 80 # internal methods - def _count_shnums(self, *states): - """shnums for which at least one state is in the following list""" - shnums = [] - for shnum,shares in self._shnums.iteritems(): - matches = [s for s in shares if self._shares.get(s) in states] - if matches: - shnums.append(shnum) - return len(shnums) - def loop(self): try: # if any exception occurs here, kill the download hunk ./src/allmydata/immutable/downloader/fetcher.py 92 k = self._k if not self._running: return - if self._bad_segnum: + numsegs, authoritative = self._node.get_num_segments() + if authoritative and self.segnum >= numsegs: # oops, we were asking for a segment number beyond the end of the # file. This is an error. self.stop() hunk ./src/allmydata/immutable/downloader/fetcher.py 103 self._node.fetch_failed(self, f) return + #print "LOOP", self._blocks.keys(), "active:", self._active_share_map, "overdue:", self._overdue_share_map, "unused:", self._shares + # Should we sent out more requests? + while len(set(self._blocks.keys()) + | set(self._active_share_map.keys()) + ) < k: + # we don't have data or active requests for enough shares. Are + # there any unused shares we can start using? + (sent_something, want_more_diversity) = self._find_and_use_share() + if sent_something: + # great. loop back around in case we need to send more. + continue + if want_more_diversity: + # we could have sent something if we'd been allowed to pull + # more shares per server. Increase the limit and try again. + self._max_shares_per_server += 1 + log.msg("SegmentFetcher(%s) increasing diversity limit to %d" + % (self._node._si_prefix, self._max_shares_per_server), + level=log.NOISY, umid="xY2pBA") + # Also ask for more shares, in the hopes of achieving better + # diversity for the next segment. + self._ask_for_more_shares() + continue + # we need more shares than the ones in self._shares to make + # progress + self._ask_for_more_shares() + if self._no_more_shares: + # But there are no more shares to be had. If we're going to + # succeed, it will be with the shares we've already seen. + # Will they be enough? + if len(set(self._blocks.keys()) + | set(self._active_share_map.keys()) + | set(self._overdue_share_map.keys()) + ) < k: + # nope. bail. + self._no_shares_error() # this calls self.stop() + return + # our outstanding or overdue requests may yet work. + # more shares may be coming. Wait until then. + return + # are we done? hunk ./src/allmydata/immutable/downloader/fetcher.py 144 - if self._count_shnums(COMPLETE) >= k: + if len(set(self._blocks.keys())) >= k: # yay! self.stop() self._node.process_blocks(self.segnum, self._blocks) hunk ./src/allmydata/immutable/downloader/fetcher.py 150 return - # we may have exhausted everything - if (self._no_more_shares and - self._count_shnums(AVAILABLE, PENDING, OVERDUE, COMPLETE) < k): - # no more new shares are coming, and the remaining hopeful shares - # aren't going to be enough. boo! 
+ def _no_shares_error(self): + if not (self._shares or self._active_share_map or + self._overdue_share_map or self._blocks): + format = ("no shares (need %(k)d)." + " Last failure: %(last_failure)s") + args = { "k": self._k, + "last_failure": self._last_failure } + error = NoSharesError + else: + format = ("ran out of shares: complete=%(complete)s" + " pending=%(pending)s overdue=%(overdue)s" + " unused=%(unused)s need %(k)d." + " Last failure: %(last_failure)s") + def join(shnums): return ",".join(["sh%d" % shnum + for shnum in sorted(shnums)]) + pending_s = ",".join([str(sh) + for sh in self._active_share_map.values()]) + overdue = set() + for shares in self._overdue_share_map.values(): + overdue |= shares + overdue_s = ",".join([str(sh) for sh in overdue]) + args = {"complete": join(self._blocks.keys()), + "pending": pending_s, + "overdue": overdue_s, + # 'unused' should be zero + "unused": ",".join([str(sh) for sh in self._shares]), + "k": self._k, + "last_failure": self._last_failure, + } + error = NotEnoughSharesError + log.msg(format=format, + level=log.UNUSUAL, parent=self._lp, umid="1DsnTg", + **args) + e = error(format % args) + f = Failure(e) + self.stop() + self._node.fetch_failed(self, f) hunk ./src/allmydata/immutable/downloader/fetcher.py 188 - log.msg("share states: %r" % (self._shares,), - level=log.NOISY, umid="0ThykQ") - if self._count_shnums(AVAILABLE, PENDING, OVERDUE, COMPLETE) == 0: - format = ("no shares (need %(k)d)." - " Last failure: %(last_failure)s") - args = { "k": k, - "last_failure": self._last_failure } - error = NoSharesError - else: - format = ("ran out of shares: %(complete)d complete," - " %(pending)d pending, %(overdue)d overdue," - " %(unused)d unused, need %(k)d." - " Last failure: %(last_failure)s") - args = {"complete": self._count_shnums(COMPLETE), - "pending": self._count_shnums(PENDING), - "overdue": self._count_shnums(OVERDUE), - # 'unused' should be zero - "unused": self._count_shnums(AVAILABLE), - "k": k, - "last_failure": self._last_failure, - } - error = NotEnoughSharesError - log.msg(format=format, level=log.UNUSUAL, umid="1DsnTg", **args) - e = error(format % args) - f = Failure(e) - self.stop() - self._node.fetch_failed(self, f) - return + def _find_and_use_share(self): + sent_something = False + want_more_diversity = False + for sh in self._shares: # find one good share to fetch + shnum = sh._shnum ; serverid = sh._peerid + if shnum in self._blocks: + continue # don't request data we already have + if shnum in self._active_share_map: + # note: OVERDUE shares are removed from _active_share_map + # and added to _overdue_share_map instead. + continue # don't send redundant requests + sfs = self._shares_from_server + if len(sfs.get(serverid,set())) >= self._max_shares_per_server: + # don't pull too much from a single server + want_more_diversity = True + continue + # ok, we can use this share + self._shares.remove(sh) + self._active_share_map[shnum] = sh + self._shares_from_server.add(serverid, sh) + self._start_share(sh, shnum) + sent_something = True + break + return (sent_something, want_more_diversity) hunk ./src/allmydata/immutable/downloader/fetcher.py 213 - # nope, not done. Are we "block-hungry" (i.e. do we want to send out - # more read requests, or do we think we have enough in flight - # already?) - while self._count_shnums(PENDING, COMPLETE) < k: - # we're hungry.. are there any unused shares? 
- sent = self._send_new_request() - if not sent: - break + def _start_share(self, share, shnum): + self._share_observers[share] = o = share.get_block(self.segnum) + o.subscribe(self._block_request_activity, share=share, shnum=shnum) hunk ./src/allmydata/immutable/downloader/fetcher.py 217 - # ok, now are we "share-hungry" (i.e. do we have enough known shares - # to make us happy, or should we ask the ShareFinder to get us more?) - if self._count_shnums(AVAILABLE, PENDING, COMPLETE) < k: - # we're hungry for more shares + def _ask_for_more_shares(self): + if not self._no_more_shares: self._node.want_more_shares() hunk ./src/allmydata/immutable/downloader/fetcher.py 220 - # that will trigger the ShareFinder to keep looking - - def _find_one(self, shares, state): - # TODO could choose fastest, or avoid servers already in use - for s in shares: - if self._shares[s] == state: - return s - # can never get here, caller has assert in case of code bug - - def _send_new_request(self): - # TODO: this is probably O(k^2), and we're called from a range(k) - # loop, so O(k^3) - - # this first loop prefers sh0, then sh1, sh2, etc - for shnum,shares in sorted(self._shnums.iteritems()): - states = [self._shares[s] for s in shares] - if COMPLETE in states or PENDING in states: - # don't send redundant requests - continue - if AVAILABLE not in states: - # no candidates for this shnum, move on - continue - # here's a candidate. Send a request. - s = self._find_one(shares, AVAILABLE) - assert s - self._shares[s] = PENDING - self._share_observers[s] = o = s.get_block(self.segnum) - o.subscribe(self._block_request_activity, share=s, shnum=shnum) - # TODO: build up a list of candidates, then walk through the - # list, sending requests to the most desireable servers, - # re-checking our block-hunger each time. For non-initial segment - # fetches, this would let us stick with faster servers. - return True - # nothing was sent: don't call us again until you have more shares to - # work with, or one of the existing shares has been declared OVERDUE - return False + # that will trigger the ShareFinder to keep looking, and call our + # add_shares() or no_more_shares() later. def _cancel_all_requests(self): for o in self._share_observers.values(): hunk ./src/allmydata/immutable/downloader/fetcher.py 235 log.msg("SegmentFetcher(%s)._block_request_activity:" " Share(sh%d-on-%s) -> %s" % (self._node._si_prefix, shnum, share._peerid_s, state), - level=log.NOISY, umid="vilNWA") - # COMPLETE, CORRUPT, DEAD, BADSEGNUM are terminal. + level=log.NOISY, parent=self._lp, umid="vilNWA") + # COMPLETE, CORRUPT, DEAD, BADSEGNUM are terminal. Remove the share + # from all our tracking lists. 
if state in (COMPLETE, CORRUPT, DEAD, BADSEGNUM): self._share_observers.pop(share, None) hunk ./src/allmydata/immutable/downloader/fetcher.py 240 + self._shares_from_server.discard(shnum, share) + if self._active_share_map.get(shnum) is share: + del self._active_share_map[shnum] + self._overdue_share_map.discard(shnum, share) + if state is COMPLETE: hunk ./src/allmydata/immutable/downloader/fetcher.py 246 - # 'block' is fully validated - self._shares[share] = COMPLETE + # 'block' is fully validated and complete self._blocks[shnum] = block hunk ./src/allmydata/immutable/downloader/fetcher.py 248 - elif state is OVERDUE: - self._shares[share] = OVERDUE + + if state is OVERDUE: + # no longer active, but still might complete + del self._active_share_map[shnum] + self._overdue_share_map.add(shnum, share) # OVERDUE is not terminal: it will eventually transition to # COMPLETE, CORRUPT, or DEAD. hunk ./src/allmydata/immutable/downloader/fetcher.py 255 - elif state is CORRUPT: - self._shares[share] = CORRUPT - elif state is DEAD: - del self._shares[share] - self._shnums[shnum].remove(share) - self._last_failure = f - elif state is BADSEGNUM: - self._shares[share] = BADSEGNUM # ??? - self._bad_segnum = True - eventually(self.loop) hunk ./src/allmydata/immutable/downloader/fetcher.py 256 + if state is DEAD: + self._last_failure = f + if state is BADSEGNUM: + # our main loop will ask the DownloadNode each time for the + # number of segments, so we'll deal with this in the top of + # _do_loop + pass hunk ./src/allmydata/immutable/downloader/fetcher.py 264 + eventually(self.loop) hunk ./src/allmydata/immutable/downloader/finder.py 38 self._storage_broker = storage_broker self.share_consumer = self.node = node self.max_outstanding_requests = max_outstanding_requests - self._hungry = False self._commonshares = {} # shnum to CommonShare instance hunk ./src/allmydata/immutable/downloader/finder.py 41 - self.undelivered_shares = [] self.pending_requests = set() self.overdue_requests = set() # subset of pending_requests self.overdue_timers = {} hunk ./src/allmydata/immutable/downloader/finder.py 53 si=self._si_prefix, level=log.NOISY, parent=logparent, umid="2xjj2A") + def update_num_segments(self): + (numsegs, authoritative) = self.node.get_num_segments() + assert authoritative + for cs in self._commonshares.values(): + cs.set_authoritative_num_segments(numsegs) + def start_finding_servers(self): # don't get servers until somebody uses us: creating the # ImmutableFileNode should not cause work to happen yet. Test case is hunk ./src/allmydata/immutable/downloader/finder.py 90 # internal methods def loop(self): - undelivered_s = ",".join(["sh%d@%s" % - (s._shnum, idlib.shortnodeid_b2a(s._peerid)) - for s in self.undelivered_shares]) pending_s = ",".join([idlib.shortnodeid_b2a(rt.peerid) for rt in self.pending_requests]) # sort? 
self.log(format="ShareFinder loop: running=%(running)s" hunk ./src/allmydata/immutable/downloader/finder.py 93 - " hungry=%(hungry)s, undelivered=%(undelivered)s," - " pending=%(pending)s", - running=self.running, hungry=self._hungry, - undelivered=undelivered_s, pending=pending_s, + " hungry=%(hungry)s, pending=%(pending)s", + running=self.running, hungry=self._hungry, pending=pending_s, level=log.NOISY, umid="kRtS4Q") if not self.running: return hunk ./src/allmydata/immutable/downloader/finder.py 100 if not self._hungry: return - if self.undelivered_shares: - sh = self.undelivered_shares.pop(0) - # they will call hungry() again if they want more - self._hungry = False - self.log(format="delivering Share(shnum=%(shnum)d, server=%(peerid)s)", - shnum=sh._shnum, peerid=sh._peerid_s, - level=log.NOISY, umid="2n1qQw") - eventually(self.share_consumer.got_shares, [sh]) - return non_overdue = self.pending_requests - self.overdue_requests if len(non_overdue) >= self.max_outstanding_requests: hunk ./src/allmydata/immutable/downloader/finder.py 139 lp = self.log(format="sending DYHB to [%(peerid)s]", peerid=idlib.shortnodeid_b2a(peerid), level=log.NOISY, umid="Io7pyg") - d_ev = self._download_status.add_dyhb_sent(peerid, now()) + time_sent = now() + d_ev = self._download_status.add_dyhb_sent(peerid, time_sent) # TODO: get the timer from a Server object, it knows best self.overdue_timers[req] = reactor.callLater(self.OVERDUE_TIMEOUT, self.overdue, req) hunk ./src/allmydata/immutable/downloader/finder.py 147 d = rref.callRemote("get_buckets", self._storage_index) d.addBoth(incidentally, self._request_retired, req) d.addCallbacks(self._got_response, self._got_error, - callbackArgs=(rref.version, peerid, req, d_ev, lp), + callbackArgs=(rref.version, peerid, req, d_ev, + time_sent, lp), errbackArgs=(peerid, req, d_ev, lp)) d.addErrback(log.err, format="error in send_request", level=log.WEIRD, parent=lp, umid="rpdV0w") hunk ./src/allmydata/immutable/downloader/finder.py 167 self.overdue_requests.add(req) eventually(self.loop) - def _got_response(self, buckets, server_version, peerid, req, d_ev, lp): + def _got_response(self, buckets, server_version, peerid, req, d_ev, + time_sent, lp): shnums = sorted([shnum for shnum in buckets]) hunk ./src/allmydata/immutable/downloader/finder.py 170 - d_ev.finished(shnums, now()) - if buckets: - shnums_s = ",".join([str(shnum) for shnum in shnums]) - self.log(format="got shnums [%(shnums)s] from [%(peerid)s]", - shnums=shnums_s, peerid=idlib.shortnodeid_b2a(peerid), - level=log.NOISY, parent=lp, umid="0fcEZw") - else: + time_received = now() + d_ev.finished(shnums, time_received) + dyhb_rtt = time_received - time_sent + if not buckets: self.log(format="no shares from [%(peerid)s]", peerid=idlib.shortnodeid_b2a(peerid), level=log.NOISY, parent=lp, umid="U7d4JA") hunk ./src/allmydata/immutable/downloader/finder.py 177 - if self.node.num_segments is None: - best_numsegs = self.node.guessed_num_segments - else: - best_numsegs = self.node.num_segments + return + shnums_s = ",".join([str(shnum) for shnum in shnums]) + self.log(format="got shnums [%(shnums)s] from [%(peerid)s]", + shnums=shnums_s, peerid=idlib.shortnodeid_b2a(peerid), + level=log.NOISY, parent=lp, umid="0fcEZw") + shares = [] for shnum, bucket in buckets.iteritems(): hunk ./src/allmydata/immutable/downloader/finder.py 184 - self._create_share(best_numsegs, shnum, bucket, server_version, - peerid) + s = self._create_share(shnum, bucket, server_version, peerid, + dyhb_rtt) + shares.append(s) + 
self._deliver_shares(shares) hunk ./src/allmydata/immutable/downloader/finder.py 189 - def _create_share(self, best_numsegs, shnum, bucket, server_version, - peerid): + def _create_share(self, shnum, bucket, server_version, peerid, dyhb_rtt): if shnum in self._commonshares: cs = self._commonshares[shnum] else: hunk ./src/allmydata/immutable/downloader/finder.py 193 - cs = CommonShare(best_numsegs, self._si_prefix, shnum, + numsegs, authoritative = self.node.get_num_segments() + cs = CommonShare(numsegs, self._si_prefix, shnum, self._node_logparent) hunk ./src/allmydata/immutable/downloader/finder.py 196 + if authoritative: + cs.set_authoritative_num_segments(numsegs) # Share._get_satisfaction is responsible for updating # CommonShare.set_numsegs after we know the UEB. Alternatives: # 1: d = self.node.get_num_segments() hunk ./src/allmydata/immutable/downloader/finder.py 213 # Yuck. self._commonshares[shnum] = cs s = Share(bucket, server_version, self.verifycap, cs, self.node, - self._download_status, peerid, shnum, + self._download_status, peerid, shnum, dyhb_rtt, self._node_logparent) hunk ./src/allmydata/immutable/downloader/finder.py 215 - self.undelivered_shares.append(s) + return s + + def _deliver_shares(self, shares): + # they will call hungry() again if they want more + self._hungry = False + shares_s = ",".join([str(sh) for sh in shares]) + self.log(format="delivering shares: %s" % shares_s, + level=log.NOISY, umid="2n1qQw") + eventually(self.share_consumer.got_shares, shares) def _got_error(self, f, peerid, req, d_ev, lp): d_ev.finished("error", now()) hunk ./src/allmydata/immutable/downloader/node.py 75 # things to track callers that want data # _segment_requests can have duplicates - self._segment_requests = [] # (segnum, d, cancel_handle) + self._segment_requests = [] # (segnum, d, cancel_handle, logparent) self._active_segment = None # a SegmentFetcher, with .segnum self._segsize_observers = observer.OneShotObserverList() hunk ./src/allmydata/immutable/downloader/node.py 84 # for each read() call. Segmentation and get_segment() messages are # associated with the read() call, everything else is tied to the # _Node's log entry. - lp = log.msg(format="Immutable _Node(%(si)s) created: size=%(size)d," + lp = log.msg(format="Immutable.DownloadNode(%(si)s) created:" + " size=%(size)d," " guessed_segsize=%(guessed_segsize)d," " guessed_numsegs=%(guessed_numsegs)d", si=self._si_prefix, size=verifycap.size, hunk ./src/allmydata/immutable/downloader/node.py 107 # as with CommonShare, our ciphertext_hash_tree is a stub until we # get the real num_segments self.ciphertext_hash_tree = IncompleteHashTree(self.guessed_num_segments) + self.ciphertext_hash_tree_leaves = self.guessed_num_segments def __repr__(self): hunk ./src/allmydata/immutable/downloader/node.py 110 - return "Imm_Node(%s)" % (self._si_prefix,) + return "ImmutableDownloadNode(%s)" % (self._si_prefix,) def stop(self): # called by the Terminator at shutdown, mostly for tests hunk ./src/allmydata/immutable/downloader/node.py 180 The Deferred can also errback with other fatal problems, such as NotEnoughSharesError, NoSharesError, or BadCiphertextHashError. 
""" - log.msg(format="imm Node(%(si)s).get_segment(%(segnum)d)", - si=base32.b2a(self._verifycap.storage_index)[:8], - segnum=segnum, - level=log.OPERATIONAL, parent=logparent, umid="UKFjDQ") + lp = log.msg(format="imm Node(%(si)s).get_segment(%(segnum)d)", + si=base32.b2a(self._verifycap.storage_index)[:8], + segnum=segnum, + level=log.OPERATIONAL, parent=logparent, umid="UKFjDQ") self._download_status.add_segment_request(segnum, now()) d = defer.Deferred() c = Cancel(self._cancel_request) hunk ./src/allmydata/immutable/downloader/node.py 187 - self._segment_requests.append( (segnum, d, c) ) + self._segment_requests.append( (segnum, d, c, lp) ) self._start_new_segment() return (d, c) hunk ./src/allmydata/immutable/downloader/node.py 213 if self._active_segment is None and self._segment_requests: segnum = self._segment_requests[0][0] k = self._verifycap.needed_shares + lp = self._segment_requests[0][3] log.msg(format="%(node)s._start_new_segment: segnum=%(segnum)d", node=repr(self), segnum=segnum, hunk ./src/allmydata/immutable/downloader/node.py 216 - level=log.NOISY, umid="wAlnHQ") - self._active_segment = fetcher = SegmentFetcher(self, segnum, k) + level=log.NOISY, parent=lp, umid="wAlnHQ") + self._active_segment = fetcher = SegmentFetcher(self, segnum, k, lp) active_shares = [s for s in self._shares if s.is_alive()] fetcher.add_shares(active_shares) # this triggers the loop hunk ./src/allmydata/immutable/downloader/node.py 240 h = hashutil.uri_extension_hash(UEB_s) if h != self._verifycap.uri_extension_hash: raise BadHashError - UEB_dict = uri.unpack_extension(UEB_s) - self._parse_and_store_UEB(UEB_dict) # sets self._stuff + self._parse_and_store_UEB(UEB_s) # sets self._stuff # TODO: a malformed (but authentic) UEB could throw an assertion in # _parse_and_store_UEB, and we should abandon the download. self.have_UEB = True hunk ./src/allmydata/immutable/downloader/node.py 245 - def _parse_and_store_UEB(self, d): + # inform the ShareFinder about our correct number of segments. This + # will update the block-hash-trees in all existing CommonShare + # instances, and will populate new ones with the correct value. + self._sharefinder.update_num_segments() + + def _parse_and_store_UEB(self, UEB_s): # Note: the UEB contains needed_shares and total_shares. These are # redundant and inferior (the filecap contains the authoritative # values). However, because it is possible to encode the same file in hunk ./src/allmydata/immutable/downloader/node.py 262 # therefore, we ignore d['total_shares'] and d['needed_shares']. + d = uri.unpack_extension(UEB_s) + log.msg(format="UEB=%(ueb)s, vcap=%(vcap)s", hunk ./src/allmydata/immutable/downloader/node.py 265 - ueb=repr(d), vcap=self._verifycap.to_string(), + ueb=repr(uri.unpack_extension_readable(UEB_s)), + vcap=self._verifycap.to_string(), level=log.NOISY, parent=self._lp, umid="cVqZnA") k, N = self._verifycap.needed_shares, self._verifycap.total_shares hunk ./src/allmydata/immutable/downloader/node.py 305 # shares of file B. self.ciphertext_hash_tree was a guess before: # this is where we create it for real. 
self.ciphertext_hash_tree = IncompleteHashTree(self.num_segments) + self.ciphertext_hash_tree_leaves = self.num_segments self.ciphertext_hash_tree.set_hashes({0: d['crypttext_root_hash']}) self.share_hash_tree.set_hashes({0: d['share_root_hash']}) hunk ./src/allmydata/immutable/downloader/node.py 358 % (hashnum, len(self.share_hash_tree))) self.share_hash_tree.set_hashes(share_hashes) + def get_desired_ciphertext_hashes(self, segnum): + if segnum < self.ciphertext_hash_tree_leaves: + return self.ciphertext_hash_tree.needed_hashes(segnum, + include_leaf=True) + return [] def get_needed_ciphertext_hashes(self, segnum): cht = self.ciphertext_hash_tree return cht.needed_hashes(segnum, include_leaf=True) hunk ./src/allmydata/immutable/downloader/node.py 366 + def process_ciphertext_hashes(self, hashes): assert self.num_segments is not None # this may raise BadHashError or NotEnoughHashesError hunk ./src/allmydata/immutable/downloader/node.py 477 def _extract_requests(self, segnum): """Remove matching requests and return their (d,c) tuples so that the caller can retire them.""" - retire = [(d,c) for (segnum0, d, c) in self._segment_requests + retire = [(d,c) for (segnum0, d, c, lp) in self._segment_requests if segnum0 == segnum] self._segment_requests = [t for t in self._segment_requests if t[0] != segnum] hunk ./src/allmydata/immutable/downloader/node.py 486 def _cancel_request(self, c): self._segment_requests = [t for t in self._segment_requests if t[2] != c] - segnums = [segnum for (segnum,d,c) in self._segment_requests] + segnums = [segnum for (segnum,d,c,lp) in self._segment_requests] # self._active_segment might be None in rare circumstances, so make # sure we tolerate it if self._active_segment and self._active_segment.segnum not in segnums: hunk ./src/allmydata/immutable/downloader/node.py 493 self._active_segment.stop() self._active_segment = None self._start_new_segment() + + # called by ShareFinder to choose hashtree sizes in CommonShares, and by + # SegmentFetcher to tell if it is still fetching a valid segnum. + def get_num_segments(self): + # returns (best_num_segments, authoritative) + if self.num_segments is None: + return (self.guessed_num_segments, False) + return (self.num_segments, True) hunk ./src/allmydata/immutable/downloader/share.py 36 # servers. A different backend would use a different class. def __init__(self, rref, server_version, verifycap, commonshare, node, - download_status, peerid, shnum, logparent): + download_status, peerid, shnum, dyhb_rtt, logparent): self._rref = rref self._server_version = server_version self._node = node # holds share_hash_tree and UEB hunk ./src/allmydata/immutable/downloader/share.py 54 self._storage_index = verifycap.storage_index self._si_prefix = base32.b2a(verifycap.storage_index)[:8] self._shnum = shnum + self._dyhb_rtt = dyhb_rtt # self._alive becomes False upon fatal corruption or server error self._alive = True self._lp = log.msg(format="%(share)s created", share=repr(self), hunk ./src/allmydata/immutable/downloader/share.py 282 if not self._satisfy_UEB(): # can't check any hashes without the UEB return False + # the call to _satisfy_UEB() will immediately set the + # authoritative num_segments in all our CommonShares. If we + # guessed wrong, we might stil be working on a bogus segnum + # (beyond the real range). We catch this and signal BADSEGNUM + # before invoking any further code that touches hashtrees. 
self.actual_segment_size = self._node.segment_size # might be updated assert self.actual_segment_size is not None hunk ./src/allmydata/immutable/downloader/share.py 290 - # knowing the UEB means knowing num_segments. Despite the redundancy, - # this is the best place to set this. CommonShare.set_numsegs will - # ignore duplicate calls. + # knowing the UEB means knowing num_segments assert self._node.num_segments is not None hunk ./src/allmydata/immutable/downloader/share.py 292 - cs = self._commonshare - cs.set_numsegs(self._node.num_segments) segnum, observers = self._active_segnum_and_observers() # if segnum is None, we don't really need to do anything (we have no hunk ./src/allmydata/immutable/downloader/share.py 309 # can't check block_hash_tree without a root return False - if cs.need_block_hash_root(): + if self._commonshare.need_block_hash_root(): block_hash_root = self._node.share_hash_tree.get_leaf(self._shnum) hunk ./src/allmydata/immutable/downloader/share.py 311 - cs.set_block_hash_root(block_hash_root) + self._commonshare.set_block_hash_root(block_hash_root) if segnum is None: return False # we don't want any particular segment right now hunk ./src/allmydata/immutable/downloader/share.py 365 ] ): offsets[field] = fields[i] self.actual_offsets = offsets - log.msg("actual offsets: data=%d, plaintext_hash_tree=%d, crypttext_hash_tree=%d, block_hashes=%d, share_hashes=%d, uri_extension=%d" % tuple(fields)) + log.msg("actual offsets: data=%d, plaintext_hash_tree=%d, crypttext_hash_tree=%d, block_hashes=%d, share_hashes=%d, uri_extension=%d" % tuple(fields), + level=log.NOISY, parent=self._lp, umid="jedQcw") self._received.remove(0, 4) # don't need this anymore # validate the offsets a bit hunk ./src/allmydata/immutable/downloader/share.py 523 block = self._received.pop(blockstart, blocklen) if not block: log.msg("no data for block %s (want [%d:+%d])" % (repr(self), - blockstart, blocklen)) + blockstart, blocklen), + level=log.NOISY, parent=self._lp, umid="aK0RFw") return False log.msg(format="%(share)s._satisfy_data_block [%(start)d:+%(length)d]", share=repr(self), start=blockstart, length=blocklen, hunk ./src/allmydata/immutable/downloader/share.py 596 if self.actual_offsets or self._overrun_ok: if not self._node.have_UEB: self._desire_UEB(desire, o) - # They might ask for a segment that doesn't look right. - # _satisfy() will catch+reject bad segnums once we know the UEB - # (and therefore segsize and numsegs), so we'll only fail this - # test if we're still guessing. We want to avoid asking the - # hashtrees for needed_hashes() for bad segnums. So don't enter - # _desire_hashes or _desire_data unless the segnum looks - # reasonable. - if segnum < r["num_segments"]: - # XXX somehow we're getting here for sh5. we don't yet know - # the actual_segment_size, we're still working off the guess. - # the ciphertext_hash_tree has been corrected, but the - # commonshare._block_hash_tree is still in the guessed state. - self._desire_share_hashes(desire, o) - if segnum is not None: - self._desire_block_hashes(desire, o, segnum) - self._desire_data(desire, o, r, segnum, segsize) - else: - log.msg("_desire: segnum(%d) looks wrong (numsegs=%d)" - % (segnum, r["num_segments"]), - level=log.UNUSUAL, parent=self._lp, umid="tuYRQQ") + self._desire_share_hashes(desire, o) + if segnum is not None: + # They might be asking for a segment number that is beyond + # what we guess the file contains, but _desire_block_hashes + # and _desire_data will tolerate that. 
+ self._desire_block_hashes(desire, o, segnum) + self._desire_data(desire, o, r, segnum, segsize) log.msg("end _desire: want_it=%s need_it=%s gotta=%s" hunk ./src/allmydata/immutable/downloader/share.py 605 - % (want_it.dump(), need_it.dump(), gotta_gotta_have_it.dump())) + % (want_it.dump(), need_it.dump(), gotta_gotta_have_it.dump()), + level=log.NOISY, parent=self._lp, umid="IG7CgA") if self.actual_offsets: return (want_it, need_it+gotta_gotta_have_it) else: hunk ./src/allmydata/immutable/downloader/share.py 676 (want_it, need_it, gotta_gotta_have_it) = desire # block hash chain - for hashnum in self._commonshare.get_needed_block_hashes(segnum): + for hashnum in self._commonshare.get_desired_block_hashes(segnum): need_it.add(o["block_hashes"]+hashnum*HASH_SIZE, HASH_SIZE) # ciphertext hash chain hunk ./src/allmydata/immutable/downloader/share.py 680 - for hashnum in self._node.get_needed_ciphertext_hashes(segnum): + for hashnum in self._node.get_desired_ciphertext_hashes(segnum): need_it.add(o["crypttext_hash_tree"]+hashnum*HASH_SIZE, HASH_SIZE) def _desire_data(self, desire, o, r, segnum, segsize): hunk ./src/allmydata/immutable/downloader/share.py 684 + if segnum > r["num_segments"]: + # they're asking for a segment that's beyond what we think is the + # end of the file. We won't get here if we've already learned the + # real UEB: _get_satisfaction() will notice the out-of-bounds and + # terminate the loop. So we must still be guessing, which means + # that they might be correct in asking for such a large segnum. + # But if they're right, then our segsize/segnum guess is + # certainly wrong, which means we don't know what data blocks to + # ask for yet. So don't bother adding anything. When the UEB + # comes back and we learn the correct segsize/segnums, we'll + # either reject the request or have enough information to proceed + # normally. This costs one roundtrip. + log.msg("_desire_data: segnum(%d) looks wrong (numsegs=%d)" + % (segnum, r["num_segments"]), + level=log.UNUSUAL, parent=self._lp, umid="tuYRQQ") + return (want_it, need_it, gotta_gotta_have_it) = desire tail = (segnum == r["num_segments"]-1) datastart = o["data"] hunk ./src/allmydata/immutable/downloader/share.py 814 class CommonShare: + # TODO: defer creation of the hashtree until somebody uses us. There will + # be a lot of unused shares, and we shouldn't spend the memory on a large + # hashtree unless necessary. """I hold data that is common across all instances of a single share, like sh2 on both servers A and B. This is just the block hash tree. """ hunk ./src/allmydata/immutable/downloader/share.py 820 - def __init__(self, guessed_numsegs, si_prefix, shnum, logparent): + def __init__(self, best_numsegs, si_prefix, shnum, logparent): self.si_prefix = si_prefix self.shnum = shnum hunk ./src/allmydata/immutable/downloader/share.py 823 + # in the beginning, before we have the real UEB, we can only guess at # the number of segments. But we want to ask for block hashes early. # So if we're asked for which block hashes are needed before we know hunk ./src/allmydata/immutable/downloader/share.py 828 # numsegs for sure, we return a guess. 
- self._block_hash_tree = IncompleteHashTree(guessed_numsegs) - self._know_numsegs = False + self._block_hash_tree = IncompleteHashTree(best_numsegs) + self._block_hash_tree_is_authoritative = False + self._block_hash_tree_leaves = best_numsegs self._logparent = logparent hunk ./src/allmydata/immutable/downloader/share.py 833 - def set_numsegs(self, numsegs): - if self._know_numsegs: - return - self._block_hash_tree = IncompleteHashTree(numsegs) - self._know_numsegs = True + def __repr__(self): + return "CommonShare(%s-sh%d)" % (self.si_prefix, self.shnum) + + def set_authoritative_num_segments(self, numsegs): + if self._block_hash_tree_leaves != numsegs: + self._block_hash_tree = IncompleteHashTree(numsegs) + self._block_hash_tree_leaves = numsegs + self._block_hash_tree_is_authoritative = True def need_block_hash_root(self): return bool(not self._block_hash_tree[0]) hunk ./src/allmydata/immutable/downloader/share.py 846 def set_block_hash_root(self, roothash): - assert self._know_numsegs + assert self._block_hash_tree_is_authoritative self._block_hash_tree.set_hashes({0: roothash}) hunk ./src/allmydata/immutable/downloader/share.py 849 + def get_desired_block_hashes(self, segnum): + if segnum < self._block_hash_tree_leaves: + return self._block_hash_tree.needed_hashes(segnum, + include_leaf=True) + + # the segnum might be out-of-bounds. Originally it was due to a race + # between the receipt of the UEB on one share (from which we learn + # the correct number of segments, update all hash trees to the right + # size, and queue a BADSEGNUM to the SegmentFetcher) and the delivery + # of a new Share to the SegmentFetcher while that BADSEGNUM was + # queued (which sends out requests to the stale segnum, now larger + # than the hash tree). I fixed that (by making SegmentFetcher.loop + # check for a bad segnum at the start of each pass, instead of using + # the queued BADSEGNUM or a flag it sets), but just in case this + # still happens, I'm leaving the < in place. If it gets hit, there's + # a potential lost-progress problem, but I'm pretty sure that it will + # get cleared up on the following turn. + return [] + def get_needed_block_hashes(self, segnum): hunk ./src/allmydata/immutable/downloader/share.py 869 + assert self._block_hash_tree_is_authoritative # XXX: include_leaf=True needs thought: how did the old downloader do # it? I think it grabbed *all* block hashes and set them all at once. # Since we want to fetch less data, we either need to fetch the leaf hunk ./src/allmydata/immutable/downloader/share.py 879 return self._block_hash_tree.needed_hashes(segnum, include_leaf=True) def process_block_hashes(self, block_hashes): - assert self._know_numsegs + assert self._block_hash_tree_is_authoritative # this may raise BadHashError or NotEnoughHashesError self._block_hash_tree.set_hashes(block_hashes) hunk ./src/allmydata/immutable/downloader/share.py 884 def check_block(self, segnum, block): - assert self._know_numsegs + assert self._block_hash_tree_is_authoritative h = hashutil.block_hash(block) # this may raise BadHashError or NotEnoughHashesError self._block_hash_tree.set_hashes(leaves={segnum: h}) hunk ./src/allmydata/immutable/downloader/share.py 888 + +# TODO: maybe stop using EventStreamObserver: instead, use a Deferred and an +# auxilliary OVERDUE callback. Just make sure to get all the messages in the +# right order and on the right turns. + +# TODO: we're asking for too much data. We probably don't need +# include_leaf=True in the block hash tree or ciphertext hash tree. 
+ +# TODO: we ask for ciphertext hash tree nodes from all shares (whenever +# _desire is called while we're missing those nodes), but we only consume it +# from the first response, leaving the rest of the data sitting in _received. +# This was ameliorated by clearing self._received after each block is +# complete. hunk ./src/allmydata/test/test_cli.py 2306 # the download is abandoned as soon as it's clear that we won't get # enough shares. The one remaining share might be in either the # COMPLETE or the PENDING state. - in_complete_msg = "ran out of shares: 1 complete, 0 pending, 0 overdue, 0 unused, need 3" - in_pending_msg = "ran out of shares: 0 complete, 1 pending, 0 overdue, 0 unused, need 3" + in_complete_msg = "ran out of shares: complete=sh0 pending= overdue= unused= need 3" + in_pending_msg = "ran out of shares: complete= pending=Share(sh0-on-fob7v) overdue= unused= need 3" d.addCallback(lambda ign: self.do_cli("get", self.uri_1share)) def _check1((rc, out, err)): hunk ./src/allmydata/test/test_download.py 18 from allmydata.test.common import ShouldFailMixin from allmydata.interfaces import NotEnoughSharesError, NoSharesError from allmydata.immutable.downloader.common import BadSegmentNumberError, \ - BadCiphertextHashError, DownloadStopped + BadCiphertextHashError, DownloadStopped, COMPLETE, OVERDUE, DEAD from allmydata.immutable.downloader.status import DownloadStatus hunk ./src/allmydata/test/test_download.py 20 +from allmydata.immutable.downloader.fetcher import SegmentFetcher from allmydata.codec import CRSDecoder from foolscap.eventual import fireEventually, flushEventualQueue hunk ./src/allmydata/test/test_download.py 299 # shares servers = [] shares = sorted([s._shnum for s in self.n._cnode._node._shares]) - self.failUnlessEqual(shares, [0,1,2]) + self.failUnlessEqual(shares, [0,1,2,3]) # break the RIBucketReader references for s in self.n._cnode._node._shares: s._rref.broken = True hunk ./src/allmydata/test/test_download.py 322 self.failUnlessEqual("".join(c.chunks), plaintext) shares = sorted([s._shnum for s in self.n._cnode._node._shares]) # we should now be using more shares than we were before - self.failIfEqual(shares, [0,1,2]) + self.failIfEqual(shares, [0,1,2,3]) d.addCallback(_check_failover) return d hunk ./src/allmydata/test/test_download.py 543 def _con1_should_not_succeed(res): self.fail("the first read should not have succeeded") def _con1_failed(f): - self.failUnless(f.check(NotEnoughSharesError)) + self.failUnless(f.check(NoSharesError)) con2.producer.stopProducing() return d2 d.addCallbacks(_con1_should_not_succeed, _con1_failed) hunk ./src/allmydata/test/test_download.py 587 def _con1_should_not_succeed(res): self.fail("the first read should not have succeeded") def _con1_failed(f): - self.failUnless(f.check(NotEnoughSharesError)) + self.failUnless(f.check(NoSharesError)) # we *don't* cancel the second one here: this exercises a # lost-progress bug from #1154. We just wait for it to # succeed. hunk ./src/allmydata/test/test_download.py 1125 # All these tests result in a failed download. 
d.addCallback(self._corrupt_flip_all, imm_uri, i) d.addCallback(lambda ign: - self.shouldFail(NotEnoughSharesError, which, + self.shouldFail(NoSharesError, which, substring, _download, imm_uri)) d.addCallback(lambda ign: self.restore_all_shares(self.shares)) hunk ./src/allmydata/test/test_download.py 1261 e2.update(1000, 2.0, 2.0) e2.finished(now+5) self.failUnlessEqual(ds.get_progress(), 1.0) + +class MyShare: + def __init__(self, shnum, peerid, rtt): + self._shnum = shnum + self._peerid = peerid + self._peerid_s = peerid + self._dyhb_rtt = rtt + def __repr__(self): + return "sh%d-on-%s" % (self._shnum, self._peerid) + +class MySegmentFetcher(SegmentFetcher): + def __init__(self, *args, **kwargs): + SegmentFetcher.__init__(self, *args, **kwargs) + self._test_start_shares = [] + def _start_share(self, share, shnum): + self._test_start_shares.append(share) + +class FakeNode: + def __init__(self): + self.want_more = 0 + self.failed = None + self.processed = None + self._si_prefix = "si_prefix" + def want_more_shares(self): + self.want_more += 1 + def fetch_failed(self, fetcher, f): + self.failed = f + def process_blocks(self, segnum, blocks): + self.processed = (segnum, blocks) + def get_num_segments(self): + return 1, True + +class Selection(unittest.TestCase): + def test_no_shares(self): + node = FakeNode() + sf = SegmentFetcher(node, 0, 3, None) + sf.add_shares([]) + d = flushEventualQueue() + def _check1(ign): + self.failUnlessEqual(node.want_more, 1) + self.failUnlessEqual(node.failed, None) + sf.no_more_shares() + return flushEventualQueue() + d.addCallback(_check1) + def _check2(ign): + self.failUnless(node.failed) + self.failUnless(node.failed.check(NoSharesError)) + d.addCallback(_check2) + return d + + def test_only_one_share(self): + node = FakeNode() + sf = MySegmentFetcher(node, 0, 3, None) + shares = [MyShare(0, "peer-A", 0.0)] + sf.add_shares(shares) + d = flushEventualQueue() + def _check1(ign): + self.failUnlessEqual(node.want_more, 1) + self.failUnlessEqual(node.failed, None) + sf.no_more_shares() + return flushEventualQueue() + d.addCallback(_check1) + def _check2(ign): + self.failUnless(node.failed) + self.failUnless(node.failed.check(NotEnoughSharesError)) + self.failUnlessIn("complete= pending=sh0-on-peer-A overdue= unused=", + str(node.failed)) + d.addCallback(_check2) + return d + + def test_good_diversity_early(self): + node = FakeNode() + sf = MySegmentFetcher(node, 0, 3, None) + shares = [MyShare(i, "peer-%d" % i, i) for i in range(10)] + sf.add_shares(shares) + d = flushEventualQueue() + def _check1(ign): + self.failUnlessEqual(node.want_more, 0) + self.failUnlessEqual(sf._test_start_shares, shares[:3]) + for sh in sf._test_start_shares: + sf._block_request_activity(sh, sh._shnum, COMPLETE, + "block-%d" % sh._shnum) + return flushEventualQueue() + d.addCallback(_check1) + def _check2(ign): + self.failIfEqual(node.processed, None) + self.failUnlessEqual(node.processed, (0, {0: "block-0", + 1: "block-1", + 2: "block-2"}) ) + d.addCallback(_check2) + return d + + def test_good_diversity_late(self): + node = FakeNode() + sf = MySegmentFetcher(node, 0, 3, None) + shares = [MyShare(i, "peer-%d" % i, i) for i in range(10)] + sf.add_shares([]) + d = flushEventualQueue() + def _check1(ign): + self.failUnlessEqual(node.want_more, 1) + sf.add_shares(shares) + return flushEventualQueue() + d.addCallback(_check1) + def _check2(ign): + self.failUnlessEqual(sf._test_start_shares, shares[:3]) + for sh in sf._test_start_shares: + sf._block_request_activity(sh, sh._shnum, COMPLETE, 
+ "block-%d" % sh._shnum) + return flushEventualQueue() + d.addCallback(_check2) + def _check3(ign): + self.failIfEqual(node.processed, None) + self.failUnlessEqual(node.processed, (0, {0: "block-0", + 1: "block-1", + 2: "block-2"}) ) + d.addCallback(_check3) + return d + + def test_avoid_bad_diversity_late(self): + node = FakeNode() + sf = MySegmentFetcher(node, 0, 3, None) + # we could satisfy the read entirely from the first server, but we'd + # prefer not to. Instead, we expect to only pull one share from the + # first server + shares = [MyShare(0, "peer-A", 0.0), + MyShare(1, "peer-A", 0.0), + MyShare(2, "peer-A", 0.0), + MyShare(3, "peer-B", 1.0), + MyShare(4, "peer-C", 2.0), + ] + sf.add_shares([]) + d = flushEventualQueue() + def _check1(ign): + self.failUnlessEqual(node.want_more, 1) + sf.add_shares(shares) + return flushEventualQueue() + d.addCallback(_check1) + def _check2(ign): + self.failUnlessEqual(sf._test_start_shares, + [shares[0], shares[3], shares[4]]) + for sh in sf._test_start_shares: + sf._block_request_activity(sh, sh._shnum, COMPLETE, + "block-%d" % sh._shnum) + return flushEventualQueue() + d.addCallback(_check2) + def _check3(ign): + self.failIfEqual(node.processed, None) + self.failUnlessEqual(node.processed, (0, {0: "block-0", + 3: "block-3", + 4: "block-4"}) ) + d.addCallback(_check3) + return d + + def test_suffer_bad_diversity_late(self): + node = FakeNode() + sf = MySegmentFetcher(node, 0, 3, None) + # we satisfy the read entirely from the first server because we don't + # have any other choice. + shares = [MyShare(0, "peer-A", 0.0), + MyShare(1, "peer-A", 0.0), + MyShare(2, "peer-A", 0.0), + MyShare(3, "peer-A", 0.0), + MyShare(4, "peer-A", 0.0), + ] + sf.add_shares([]) + d = flushEventualQueue() + def _check1(ign): + self.failUnlessEqual(node.want_more, 1) + sf.add_shares(shares) + return flushEventualQueue() + d.addCallback(_check1) + def _check2(ign): + self.failUnlessEqual(node.want_more, 3) + self.failUnlessEqual(sf._test_start_shares, + [shares[0], shares[1], shares[2]]) + for sh in sf._test_start_shares: + sf._block_request_activity(sh, sh._shnum, COMPLETE, + "block-%d" % sh._shnum) + return flushEventualQueue() + d.addCallback(_check2) + def _check3(ign): + self.failIfEqual(node.processed, None) + self.failUnlessEqual(node.processed, (0, {0: "block-0", + 1: "block-1", + 2: "block-2"}) ) + d.addCallback(_check3) + return d + + def test_suffer_bad_diversity_early(self): + node = FakeNode() + sf = MySegmentFetcher(node, 0, 3, None) + # we satisfy the read entirely from the first server because we don't + # have any other choice. 
+ shares = [MyShare(0, "peer-A", 0.0), + MyShare(1, "peer-A", 0.0), + MyShare(2, "peer-A", 0.0), + MyShare(3, "peer-A", 0.0), + MyShare(4, "peer-A", 0.0), + ] + sf.add_shares(shares) + d = flushEventualQueue() + def _check1(ign): + self.failUnlessEqual(node.want_more, 2) + self.failUnlessEqual(sf._test_start_shares, + [shares[0], shares[1], shares[2]]) + for sh in sf._test_start_shares: + sf._block_request_activity(sh, sh._shnum, COMPLETE, + "block-%d" % sh._shnum) + return flushEventualQueue() + d.addCallback(_check1) + def _check2(ign): + self.failIfEqual(node.processed, None) + self.failUnlessEqual(node.processed, (0, {0: "block-0", + 1: "block-1", + 2: "block-2"}) ) + d.addCallback(_check2) + return d + + def test_overdue(self): + node = FakeNode() + sf = MySegmentFetcher(node, 0, 3, None) + shares = [MyShare(i, "peer-%d" % i, i) for i in range(10)] + sf.add_shares(shares) + d = flushEventualQueue() + def _check1(ign): + self.failUnlessEqual(node.want_more, 0) + self.failUnlessEqual(sf._test_start_shares, shares[:3]) + for sh in sf._test_start_shares: + sf._block_request_activity(sh, sh._shnum, OVERDUE) + return flushEventualQueue() + d.addCallback(_check1) + def _check2(ign): + self.failUnlessEqual(sf._test_start_shares, shares[:6]) + for sh in sf._test_start_shares[3:]: + sf._block_request_activity(sh, sh._shnum, COMPLETE, + "block-%d" % sh._shnum) + return flushEventualQueue() + d.addCallback(_check2) + def _check3(ign): + self.failIfEqual(node.processed, None) + self.failUnlessEqual(node.processed, (0, {3: "block-3", + 4: "block-4", + 5: "block-5"}) ) + d.addCallback(_check3) + return d + + def test_overdue_fails(self): + node = FakeNode() + sf = MySegmentFetcher(node, 0, 3, None) + shares = [MyShare(i, "peer-%d" % i, i) for i in range(6)] + sf.add_shares(shares) + sf.no_more_shares() + d = flushEventualQueue() + def _check1(ign): + self.failUnlessEqual(node.want_more, 0) + self.failUnlessEqual(sf._test_start_shares, shares[:3]) + for sh in sf._test_start_shares: + sf._block_request_activity(sh, sh._shnum, OVERDUE) + return flushEventualQueue() + d.addCallback(_check1) + def _check2(ign): + self.failUnlessEqual(sf._test_start_shares, shares[:6]) + for sh in sf._test_start_shares[3:]: + sf._block_request_activity(sh, sh._shnum, DEAD) + return flushEventualQueue() + d.addCallback(_check2) + def _check3(ign): + # we're still waiting + self.failUnlessEqual(node.processed, None) + self.failUnlessEqual(node.failed, None) + # now complete one of the overdue ones, and kill one of the other + # ones, leaving one hanging. This should trigger a failure, since + # we cannot succeed. + live = sf._test_start_shares[0] + die = sf._test_start_shares[1] + sf._block_request_activity(live, live._shnum, COMPLETE, "block") + sf._block_request_activity(die, die._shnum, DEAD) + return flushEventualQueue() + d.addCallback(_check3) + def _check4(ign): + self.failUnless(node.failed) + self.failUnless(node.failed.check(NotEnoughSharesError)) + self.failUnlessIn("complete=sh0 pending= overdue=sh2-on-peer-2 unused=", + str(node.failed)) + d.addCallback(_check4) + return d + + def test_avoid_redundancy(self): + node = FakeNode() + sf = MySegmentFetcher(node, 0, 3, None) + # we could satisfy the read entirely from the first server, but we'd + # prefer not to. 
Instead, we expect to only pull one share from the + # first server + shares = [MyShare(0, "peer-A", 0.0), + MyShare(1, "peer-B", 1.0), + MyShare(0, "peer-C", 2.0), # this will be skipped + MyShare(1, "peer-D", 3.0), + MyShare(2, "peer-E", 4.0), + ] + sf.add_shares(shares[:3]) + d = flushEventualQueue() + def _check1(ign): + self.failUnlessEqual(node.want_more, 1) + self.failUnlessEqual(sf._test_start_shares, + [shares[0], shares[1]]) + # allow sh1 to retire + sf._block_request_activity(shares[1], 1, COMPLETE, "block-1") + return flushEventualQueue() + d.addCallback(_check1) + def _check2(ign): + # and then feed in the remaining shares + sf.add_shares(shares[3:]) + sf.no_more_shares() + return flushEventualQueue() + d.addCallback(_check2) + def _check3(ign): + self.failUnlessEqual(sf._test_start_shares, + [shares[0], shares[1], shares[4]]) + sf._block_request_activity(shares[0], 0, COMPLETE, "block-0") + sf._block_request_activity(shares[4], 2, COMPLETE, "block-2") + return flushEventualQueue() + d.addCallback(_check3) + def _check4(ign): + self.failIfEqual(node.processed, None) + self.failUnlessEqual(node.processed, (0, {0: "block-0", + 1: "block-1", + 2: "block-2"}) ) + d.addCallback(_check4) + return d hunk ./src/allmydata/test/test_immutable.py 55 def _after_download(unused=None): after_download_reads = self._count_reads() #print before_download_reads, after_download_reads - self.failIf(after_download_reads-before_download_reads > 36, + self.failIf(after_download_reads-before_download_reads > 41, (after_download_reads, before_download_reads)) d.addCallback(self._download_and_check_plaintext) d.addCallback(_after_download) hunk ./src/allmydata/test/test_web.py 4262 def _check_one_share(body): self.failIf("" in body, body) body = " ".join(body.strip().split()) - msg = ("NotEnoughSharesError: This indicates that some " - "servers were unavailable, or that shares have been " - "lost to server departure, hard drive failure, or disk " - "corruption. You should perform a filecheck on " - "this object to learn more. The full error message is:" - " ran out of shares: %d complete, %d pending, 0 overdue," - " 0 unused, need 3. Last failure: None") - msg1 = msg % (1, 0) - msg2 = msg % (0, 1) + msgbase = ("NotEnoughSharesError: This indicates that some " + "servers were unavailable, or that shares have been " + "lost to server departure, hard drive failure, or disk " + "corruption. You should perform a filecheck on " + "this object to learn more. The full error message is:" + ) + msg1 = msgbase + (" ran out of shares:" + " complete=sh0" + " pending=" + " overdue= unused= need 3. Last failure: None") + msg2 = msgbase + (" ran out of shares:" + " complete=" + " pending=Share(sh0-on-xgru5)" + " overdue= unused= need 3. Last failure: None") self.failUnless(body == msg1 or body == msg2, body) d.addCallback(_check_one_share) } [Add Protovis.js-based download-status timeline visualization. Still kinda "Brian Warner "**20100901034817 rough, but illuminating. 
Also add dl-status test for /download-%s/event_json, remove /download-%s?t=json ] { hunk ./src/allmydata/immutable/downloader/finder.py 140 peerid=idlib.shortnodeid_b2a(peerid), level=log.NOISY, umid="Io7pyg") time_sent = now() - d_ev = self._download_status.add_dyhb_sent(peerid, time_sent) + d_ev = self._download_status.add_dyhb_request(peerid, time_sent) # TODO: get the timer from a Server object, it knows best self.overdue_timers[req] = reactor.callLater(self.OVERDUE_TIMEOUT, self.overdue, req) hunk ./src/allmydata/immutable/downloader/finder.py 226 eventually(self.share_consumer.got_shares, shares) def _got_error(self, f, peerid, req, d_ev, lp): - d_ev.finished("error", now()) + d_ev.error(now()) self.log(format="got error from [%(peerid)s]", peerid=idlib.shortnodeid_b2a(peerid), failure=f, level=log.UNUSUAL, parent=lp, umid="zUKdCw") hunk ./src/allmydata/immutable/downloader/node.py 75 # things to track callers that want data # _segment_requests can have duplicates - self._segment_requests = [] # (segnum, d, cancel_handle, logparent) + self._segment_requests = [] # (segnum, d, cancel_handle, seg_ev, lp) self._active_segment = None # a SegmentFetcher, with .segnum self._segsize_observers = observer.OneShotObserverList() hunk ./src/allmydata/immutable/downloader/node.py 122 # things called by outside callers, via CiphertextFileNode. get_segment() # may also be called by Segmentation. - def read(self, consumer, offset=0, size=None, read_ev=None): + def read(self, consumer, offset, size, read_ev): """I am the main entry point, from which FileNode.read() can get data. I feed the consumer with the desired range of ciphertext. I return a Deferred that fires (with the consumer) when the read is hunk ./src/allmydata/immutable/downloader/node.py 129 finished. Note that there is no notion of a 'file pointer': each call to read() - uses an independent offset= value.""" - # for concurrent operations: each gets its own Segmentation manager - if size is None: - size = self._verifycap.size - # clip size so offset+size does not go past EOF - size = min(size, self._verifycap.size-offset) - if read_ev is None: - read_ev = self._download_status.add_read_event(offset, size, now()) + uses an independent offset= value. + """ + assert size is not None + assert read_ev is not None lp = log.msg(format="imm Node(%(si)s).read(%(offset)d, %(size)d)", si=base32.b2a(self._verifycap.storage_index)[:8], hunk ./src/allmydata/immutable/downloader/node.py 142 sp = self._history.stats_provider sp.count("downloader.files_downloaded", 1) # really read() calls sp.count("downloader.bytes_downloaded", size) + # for concurrent operations, each read() gets its own Segmentation + # manager s = Segmentation(self, offset, size, consumer, read_ev, lp) hunk ./src/allmydata/immutable/downloader/node.py 145 + # this raises an interesting question: what segments to fetch? 
if # offset=0, always fetch the first segment, and then allow # Segmentation to be responsible for pulling the subsequent ones if hunk ./src/allmydata/immutable/downloader/node.py 183 si=base32.b2a(self._verifycap.storage_index)[:8], segnum=segnum, level=log.OPERATIONAL, parent=logparent, umid="UKFjDQ") - self._download_status.add_segment_request(segnum, now()) + seg_ev = self._download_status.add_segment_request(segnum, now()) d = defer.Deferred() c = Cancel(self._cancel_request) hunk ./src/allmydata/immutable/downloader/node.py 186 - self._segment_requests.append( (segnum, d, c, lp) ) + self._segment_requests.append( (segnum, d, c, seg_ev, lp) ) self._start_new_segment() return (d, c) hunk ./src/allmydata/immutable/downloader/node.py 210 def _start_new_segment(self): if self._active_segment is None and self._segment_requests: - segnum = self._segment_requests[0][0] + (segnum, d, c, seg_ev, lp) = self._segment_requests[0] k = self._verifycap.needed_shares hunk ./src/allmydata/immutable/downloader/node.py 212 - lp = self._segment_requests[0][3] log.msg(format="%(node)s._start_new_segment: segnum=%(segnum)d", node=repr(self), segnum=segnum, level=log.NOISY, parent=lp, umid="wAlnHQ") hunk ./src/allmydata/immutable/downloader/node.py 216 self._active_segment = fetcher = SegmentFetcher(self, segnum, k, lp) + seg_ev.activate(now()) active_shares = [s for s in self._shares if s.is_alive()] fetcher.add_shares(active_shares) # this triggers the loop hunk ./src/allmydata/immutable/downloader/node.py 380 def fetch_failed(self, sf, f): assert sf is self._active_segment # deliver error upwards - for (d,c) in self._extract_requests(sf.segnum): + for (d,c,seg_ev) in self._extract_requests(sf.segnum): + seg_ev.error(now()) eventually(self._deliver, d, c, f) self._active_segment = None self._start_new_segment() hunk ./src/allmydata/immutable/downloader/node.py 390 d = defer.maybeDeferred(self._decode_blocks, segnum, blocks) d.addCallback(self._check_ciphertext_hash, segnum) def _deliver(result): - ds = self._download_status - if isinstance(result, Failure): - ds.add_segment_error(segnum, now()) - else: - (offset, segment, decodetime) = result - ds.add_segment_delivery(segnum, now(), - offset, len(segment), decodetime) log.msg(format="delivering segment(%(segnum)d)", segnum=segnum, level=log.OPERATIONAL, parent=self._lp, hunk ./src/allmydata/immutable/downloader/node.py 394 umid="j60Ojg") - for (d,c) in self._extract_requests(segnum): - eventually(self._deliver, d, c, result) + when = now() + if isinstance(result, Failure): + # this catches failures in decode or ciphertext hash + for (d,c,seg_ev) in self._extract_requests(segnum): + seg_ev.error(when) + eventually(self._deliver, d, c, result) + else: + (offset, segment, decodetime) = result + for (d,c,seg_ev) in self._extract_requests(segnum): + # when we have two requests for the same segment, the + # second one will not be "activated" before the data is + # delivered, so to allow the status-reporting code to see + # consistent behavior, we activate them all now. The + # SegmentEvent will ignore duplicate activate() calls. + # Note that this will result in an infinite "receive + # speed" for the second request. 
+ seg_ev.activate(when) + seg_ev.deliver(when, offset, len(segment), decodetime) + eventually(self._deliver, d, c, result) self._active_segment = None self._start_new_segment() d.addBoth(_deliver) hunk ./src/allmydata/immutable/downloader/node.py 416 - d.addErrback(lambda f: - log.err("unhandled error during process_blocks", - failure=f, level=log.WEIRD, - parent=self._lp, umid="MkEsCg")) + d.addErrback(log.err, "unhandled error during process_blocks", + level=log.WEIRD, parent=self._lp, umid="MkEsCg") def _decode_blocks(self, segnum, blocks): tail = (segnum == self.num_segments-1) hunk ./src/allmydata/immutable/downloader/node.py 485 def _extract_requests(self, segnum): """Remove matching requests and return their (d,c) tuples so that the caller can retire them.""" - retire = [(d,c) for (segnum0, d, c, lp) in self._segment_requests + retire = [(d,c,seg_ev) + for (segnum0,d,c,seg_ev,lp) in self._segment_requests if segnum0 == segnum] self._segment_requests = [t for t in self._segment_requests if t[0] != segnum] hunk ./src/allmydata/immutable/downloader/node.py 495 def _cancel_request(self, c): self._segment_requests = [t for t in self._segment_requests if t[2] != c] - segnums = [segnum for (segnum,d,c,lp) in self._segment_requests] + segnums = [segnum for (segnum,d,c,seg_ev,lp) in self._segment_requests] # self._active_segment might be None in rare circumstances, so make # sure we tolerate it if self._active_segment and self._active_segment.segnum not in segnums: hunk ./src/allmydata/immutable/downloader/segmentation.py 126 # the consumer might call our .pauseProducing() inside that write() # call, setting self._hungry=False self._read_ev.update(len(desired_data), 0, 0) + # note: filenode.DecryptingConsumer is responsible for calling + # _read_ev.update with how much decrypt_time was consumed self._maybe_fetch_next() def _retry_bad_segment(self, f): hunk ./src/allmydata/immutable/downloader/share.py 730 share=repr(self), start=start, length=length, level=log.NOISY, parent=self._lp, umid="sgVAyA") - req_ev = ds.add_request_sent(self._peerid, self._shnum, - start, length, now()) + block_ev = ds.add_block_request(self._peerid, self._shnum, + start, length, now()) d = self._send_request(start, length) hunk ./src/allmydata/immutable/downloader/share.py 733 - d.addCallback(self._got_data, start, length, req_ev, lp) - d.addErrback(self._got_error, start, length, req_ev, lp) + d.addCallback(self._got_data, start, length, block_ev, lp) + d.addErrback(self._got_error, start, length, block_ev, lp) d.addCallback(self._trigger_loop) d.addErrback(lambda f: log.err(format="unhandled error during send_request", hunk ./src/allmydata/immutable/downloader/share.py 744 def _send_request(self, start, length): return self._rref.callRemote("read", start, length) - def _got_data(self, data, start, length, req_ev, lp): - req_ev.finished(len(data), now()) + def _got_data(self, data, start, length, block_ev, lp): + block_ev.finished(len(data), now()) if not self._alive: return log.msg(format="%(share)s._got_data [%(start)d:+%(length)d] -> %(datalen)d", hunk ./src/allmydata/immutable/downloader/share.py 787 # the wanted/needed span is only "wanted" for the first pass. Once # the offset table arrives, it's all "needed". 
- def _got_error(self, f, start, length, req_ev, lp): - req_ev.finished("error", now()) + def _got_error(self, f, start, length, block_ev, lp): + block_ev.error(now()) log.msg(format="error requesting %(start)d+%(length)d" " from %(server)s for si %(si)s", start=start, length=length, hunk ./src/allmydata/immutable/downloader/status.py 6 from zope.interface import implements from allmydata.interfaces import IDownloadStatus -class RequestEvent: - def __init__(self, download_status, tag): - self._download_status = download_status - self._tag = tag - def finished(self, received, when): - self._download_status.add_request_finished(self._tag, received, when) +class ReadEvent: + def __init__(self, ev, ds): + self._ev = ev + self._ds = ds + def update(self, bytes, decrypttime, pausetime): + self._ev["bytes_returned"] += bytes + self._ev["decrypt_time"] += decrypttime + self._ev["paused_time"] += pausetime + def finished(self, finishtime): + self._ev["finish_time"] = finishtime + self._ds.update_last_timestamp(finishtime) + +class SegmentEvent: + def __init__(self, ev, ds): + self._ev = ev + self._ds = ds + def activate(self, when): + if self._ev["active_time"] is None: + self._ev["active_time"] = when + def deliver(self, when, start, length, decodetime): + assert self._ev["active_time"] is not None + self._ev["finish_time"] = when + self._ev["success"] = True + self._ev["decode_time"] = decodetime + self._ev["segment_start"] = start + self._ev["segment_length"] = length + self._ds.update_last_timestamp(when) + def error(self, when): + self._ev["finish_time"] = when + self._ev["success"] = False + self._ds.update_last_timestamp(when) class DYHBEvent: hunk ./src/allmydata/immutable/downloader/status.py 39 - def __init__(self, download_status, tag): - self._download_status = download_status - self._tag = tag + def __init__(self, ev, ds): + self._ev = ev + self._ds = ds + def error(self, when): + self._ev["finish_time"] = when + self._ev["success"] = False + self._ds.update_last_timestamp(when) def finished(self, shnums, when): hunk ./src/allmydata/immutable/downloader/status.py 47 - self._download_status.add_dyhb_finished(self._tag, shnums, when) + self._ev["finish_time"] = when + self._ev["success"] = True + self._ev["response_shnums"] = shnums + self._ds.update_last_timestamp(when) + +class BlockRequestEvent: + def __init__(self, ev, ds): + self._ev = ev + self._ds = ds + def finished(self, received, when): + self._ev["finish_time"] = when + self._ev["success"] = True + self._ev["response_length"] = received + self._ds.update_last_timestamp(when) + def error(self, when): + self._ev["finish_time"] = when + self._ev["success"] = False + self._ds.update_last_timestamp(when) hunk ./src/allmydata/immutable/downloader/status.py 66 -class ReadEvent: - def __init__(self, download_status, tag): - self._download_status = download_status - self._tag = tag - def update(self, bytes, decrypttime, pausetime): - self._download_status.update_read_event(self._tag, bytes, - decrypttime, pausetime) - def finished(self, finishtime): - self._download_status.finish_read_event(self._tag, finishtime) class DownloadStatus: # There is one DownloadStatus for each CiphertextFileNode. The status hunk ./src/allmydata/immutable/downloader/status.py 78 self.size = size self.counter = self.statusid_counter.next() self.helper = False - self.started = None - # self.dyhb_requests tracks "do you have a share" requests and - # responses. 
It maps serverid to a tuple of: - # send time - # tuple of response shnums (None if response hasn't arrived, "error") - # response time (None if response hasn't arrived yet) - self.dyhb_requests = {} hunk ./src/allmydata/immutable/downloader/status.py 79 - # self.requests tracks share-data requests and responses. It maps - # serverid to a tuple of: - # shnum, - # start,length, (of data requested) - # send time - # response length (None if reponse hasn't arrived yet, or "error") - # response time (None if response hasn't arrived) - self.requests = {} + self.first_timestamp = None + self.last_timestamp = None hunk ./src/allmydata/immutable/downloader/status.py 82 - # self.segment_events tracks segment requests and delivery. It is a - # list of: - # type ("request", "delivery", "error") - # segment number - # event time - # segment start (file offset of first byte, None except in "delivery") - # segment length (only in "delivery") - # time spent in decode (only in "delivery") - self.segment_events = [] + # all four of these _events lists are sorted by start_time, because + # they are strictly append-only (some elements are later mutated in + # place, but none are removed or inserted in the middle). hunk ./src/allmydata/immutable/downloader/status.py 86 - # self.read_events tracks read() requests. It is a list of: + # self.read_events tracks read() requests. It is a list of dicts, + # each with the following keys: # start,length (of data requested) hunk ./src/allmydata/immutable/downloader/status.py 89 - # request time - # finish time (None until finished) - # bytes returned (starts at 0, grows as segments are delivered) - # time spent in decrypt (None for ciphertext-only reads) - # time spent paused + # start_time + # finish_time (None until finished) + # bytes_returned (starts at 0, grows as segments are delivered) + # decrypt_time (time spent in decrypt, None for ciphertext-only reads) + # paused_time (time spent paused by client via pauseProducing) self.read_events = [] hunk ./src/allmydata/immutable/downloader/status.py 96 - self.known_shares = [] # (serverid, shnum) - self.problems = [] + # self.segment_events tracks segment requests and their resolution. + # It is a list of dicts: + # segment_number + # start_time + # active_time (None until work has begun) + # decode_time (time spent in decode, None until delievered) + # finish_time (None until resolved) + # success (None until resolved, then boolean) + # segment_start (file offset of first byte, None until delivered) + # segment_length (None until delivered) + self.segment_events = [] hunk ./src/allmydata/immutable/downloader/status.py 108 + # self.dyhb_requests tracks "do you have a share" requests and + # responses. It is a list of dicts: + # serverid (binary) + # start_time + # success (None until resolved, then boolean) + # response_shnums (tuple, None until successful) + # finish_time (None until resolved) + self.dyhb_requests = [] hunk ./src/allmydata/immutable/downloader/status.py 117 - def add_dyhb_sent(self, serverid, when): - r = (when, None, None) - if serverid not in self.dyhb_requests: - self.dyhb_requests[serverid] = [] - self.dyhb_requests[serverid].append(r) - tag = (serverid, len(self.dyhb_requests[serverid])-1) - return DYHBEvent(self, tag) + # self.block_requests tracks share-data requests and responses. 
It is + # a list of dicts: + # serverid (binary), + # shnum, + # start,length, (of data requested) + # start_time + # finish_time (None until resolved) + # success (None until resolved, then bool) + # response_length (None until success) + self.block_requests = [] hunk ./src/allmydata/immutable/downloader/status.py 128 - def add_dyhb_finished(self, tag, shnums, when): - # received="error" on error, else tuple(shnums) - (serverid, index) = tag - r = self.dyhb_requests[serverid][index] - (sent, _, _) = r - r = (sent, shnums, when) - self.dyhb_requests[serverid][index] = r + self.known_shares = [] # (serverid, shnum) + self.problems = [] hunk ./src/allmydata/immutable/downloader/status.py 131 - def add_request_sent(self, serverid, shnum, start, length, when): - r = (shnum, start, length, when, None, None) - if serverid not in self.requests: - self.requests[serverid] = [] - self.requests[serverid].append(r) - tag = (serverid, len(self.requests[serverid])-1) - return RequestEvent(self, tag) hunk ./src/allmydata/immutable/downloader/status.py 132 - def add_request_finished(self, tag, received, when): - # received="error" on error, else len(data) - (serverid, index) = tag - r = self.requests[serverid][index] - (shnum, start, length, sent, _, _) = r - r = (shnum, start, length, sent, received, when) - self.requests[serverid][index] = r + def add_read_event(self, start, length, when): + if self.first_timestamp is None: + self.first_timestamp = when + r = { "start": start, + "length": length, + "start_time": when, + "finish_time": None, + "bytes_returned": 0, + "decrypt_time": 0, + "paused_time": 0, + } + self.read_events.append(r) + return ReadEvent(r, self) def add_segment_request(self, segnum, when): hunk ./src/allmydata/immutable/downloader/status.py 147 - if self.started is None: - self.started = when - r = ("request", segnum, when, None, None, None) - self.segment_events.append(r) - def add_segment_delivery(self, segnum, when, start, length, decodetime): - r = ("delivery", segnum, when, start, length, decodetime) - self.segment_events.append(r) - def add_segment_error(self, segnum, when): - r = ("error", segnum, when, None, None, None) + if self.first_timestamp is None: + self.first_timestamp = when + r = { "segment_number": segnum, + "start_time": when, + "active_time": None, + "finish_time": None, + "success": None, + "decode_time": None, + "segment_start": None, + "segment_length": None, + } self.segment_events.append(r) hunk ./src/allmydata/immutable/downloader/status.py 159 + return SegmentEvent(r, self) hunk ./src/allmydata/immutable/downloader/status.py 161 - def add_read_event(self, start, length, when): - if self.started is None: - self.started = when - r = (start, length, when, None, 0, 0, 0) - self.read_events.append(r) - tag = len(self.read_events)-1 - return ReadEvent(self, tag) - def update_read_event(self, tag, bytes_d, decrypt_d, paused_d): - r = self.read_events[tag] - (start, length, requesttime, finishtime, bytes, decrypt, paused) = r - bytes += bytes_d - decrypt += decrypt_d - paused += paused_d - r = (start, length, requesttime, finishtime, bytes, decrypt, paused) - self.read_events[tag] = r - def finish_read_event(self, tag, finishtime): - r = self.read_events[tag] - (start, length, requesttime, _, bytes, decrypt, paused) = r - r = (start, length, requesttime, finishtime, bytes, decrypt, paused) - self.read_events[tag] = r + def add_dyhb_request(self, serverid, when): + r = { "serverid": serverid, + "start_time": when, + "success": None, + "response_shnums": None, + 
"finish_time": None, + } + self.dyhb_requests.append(r) + return DYHBEvent(r, self) + + def add_block_request(self, serverid, shnum, start, length, when): + r = { "serverid": serverid, + "shnum": shnum, + "start": start, + "length": length, + "start_time": when, + "finish_time": None, + "success": None, + "response_length": None, + } + self.block_requests.append(r) + return BlockRequestEvent(r, self) + + def update_last_timestamp(self, when): + if self.last_timestamp is None or when > self.last_timestamp: + self.last_timestamp = when def add_known_share(self, serverid, shnum): self.known_shares.append( (serverid, shnum) ) hunk ./src/allmydata/immutable/downloader/status.py 205 # mention all outstanding segment requests outstanding = set() errorful = set() - for s_ev in self.segment_events: - (etype, segnum, when, segstart, seglen, decodetime) = s_ev - if etype == "request": - outstanding.add(segnum) - elif etype == "delivery": - outstanding.remove(segnum) - else: # "error" - outstanding.remove(segnum) - errorful.add(segnum) + outstanding = set([s_ev["segment_number"] + for s_ev in self.segment_events + if s_ev["finish_time"] is None]) + errorful = set([s_ev["segment_number"] + for s_ev in self.segment_events + if s_ev["success"] is False]) def join(segnums): if len(segnums) == 1: return "segment %s" % list(segnums)[0] hunk ./src/allmydata/immutable/downloader/status.py 233 return 0.0 total_outstanding, total_received = 0, 0 for r_ev in self.read_events: - (start, length, ign1, finishtime, bytes, ign2, ign3) = r_ev - if finishtime is None: - total_outstanding += length - total_received += bytes + if r_ev["finish_time"] is None: + total_outstanding += r_ev["length"] + total_received += r_ev["bytes_returned"] # else ignore completed requests if not total_outstanding: return 1.0 hunk ./src/allmydata/immutable/downloader/status.py 246 def get_active(self): return False # TODO def get_started(self): - return self.started + return self.first_timestamp def get_results(self): return None # TODO hunk ./src/allmydata/immutable/filenode.py 58 return a Deferred that fires (with the consumer) when the read is finished.""" self._maybe_create_download_node() - actual_size = size - if actual_size is None: - actual_size = self._verifycap.size - offset - read_ev = self._download_status.add_read_event(offset, actual_size, - now()) + if size is None: + size = self._verifycap.size + # clip size so offset+size does not go past EOF + size = min(size, self._verifycap.size-offset) + read_ev = self._download_status.add_read_event(offset, size, now()) if IDownloadStatusHandlingConsumer.providedBy(consumer): consumer.set_download_status_read_event(read_ev) return self._node.read(consumer, offset, size, read_ev) hunk ./src/allmydata/immutable/filenode.py 180 def __init__(self, consumer, readkey, offset): self._consumer = consumer - self._read_event = None + self._read_ev = None # TODO: pycryptopp CTR-mode needs random-access operations: I want # either a=AES(readkey, offset) or better yet both of: # a=AES(readkey, offset=0) hunk ./src/allmydata/immutable/filenode.py 193 self._decryptor.process("\x00"*offset_small) def set_download_status_read_event(self, read_ev): - self._read_event = read_ev + self._read_ev = read_ev def registerProducer(self, producer, streaming): # this passes through, so the real consumer can flow-control the real hunk ./src/allmydata/immutable/filenode.py 206 def write(self, ciphertext): started = now() plaintext = self._decryptor.process(ciphertext) - if self._read_event: + if self._read_ev: 
elapsed = now() - started hunk ./src/allmydata/immutable/filenode.py 208 - self._read_event.update(0, elapsed, 0) + self._read_ev.update(0, elapsed, 0) self._consumer.write(plaintext) class ImmutableFileNode: hunk ./src/allmydata/test/test_download.py 1225 now = 12345.1 ds = DownloadStatus("si-1", 123) self.failUnlessEqual(ds.get_status(), "idle") - ds.add_segment_request(0, now) + ev0 = ds.add_segment_request(0, now) self.failUnlessEqual(ds.get_status(), "fetching segment 0") hunk ./src/allmydata/test/test_download.py 1227 - ds.add_segment_delivery(0, now+1, 0, 1000, 2.0) + ev0.activate(now+0.5) + ev0.deliver(now+1, 0, 1000, 2.0) self.failUnlessEqual(ds.get_status(), "idle") hunk ./src/allmydata/test/test_download.py 1230 - ds.add_segment_request(2, now+2) - ds.add_segment_request(1, now+2) + ev2 = ds.add_segment_request(2, now+2) + ev1 = ds.add_segment_request(1, now+2) self.failUnlessEqual(ds.get_status(), "fetching segments 1,2") hunk ./src/allmydata/test/test_download.py 1233 - ds.add_segment_error(1, now+3) + ev1.error(now+3) self.failUnlessEqual(ds.get_status(), "fetching segment 2; errors on segment 1") hunk ./src/allmydata/test/test_download.py 1236 + del ev2 # hush pyflakes def test_progress(self): now = 12345.1 hunk ./src/allmydata/test/test_web.py 22 from allmydata.unknown import UnknownNode from allmydata.web import status, common from allmydata.scripts.debug import CorruptShareOptions, corrupt_share -from allmydata.util import fileutil, base32 +from allmydata.util import fileutil, base32, hashutil from allmydata.util.consumer import download_to_data from allmydata.util.netstring import split_netstring from allmydata.util.encodingutil import to_str hunk ./src/allmydata/test/test_web.py 81 ds = DownloadStatus("storage_index", 1234) now = time.time() - ds.add_segment_request(0, now) - # segnum, when, start,len, decodetime - ds.add_segment_delivery(0, now+1, 0, 100, 0.5) - ds.add_segment_request(1, now+2) - ds.add_segment_error(1, now+3) + serverid_a = hashutil.tagged_hash("foo", "serverid_a")[:20] + serverid_b = hashutil.tagged_hash("foo", "serverid_b")[:20] + storage_index = hashutil.storage_index_hash("SI") + e0 = ds.add_segment_request(0, now) + e0.activate(now+0.5) + e0.deliver(now+1, 0, 100, 0.5) # when, start,len, decodetime + e1 = ds.add_segment_request(1, now+2) + e1.error(now+3) # two outstanding requests hunk ./src/allmydata/test/test_web.py 90 - ds.add_segment_request(2, now+4) - ds.add_segment_request(3, now+5) + e2 = ds.add_segment_request(2, now+4) + e3 = ds.add_segment_request(3, now+5) + del e2,e3 # hush pyflakes hunk ./src/allmydata/test/test_web.py 94 - # simulate a segment which gets delivered faster than a system clock tick (ticket #1166) - ds.add_segment_request(4, now) - ds.add_segment_delivery(4, now, 0, 140, 0.5) + # simulate a segment which gets delivered faster than a system clock tick + # (ticket #1166) + e = ds.add_segment_request(4, now) + e.activate(now) + e.deliver(now, 0, 140, 0.5) hunk ./src/allmydata/test/test_web.py 100 - e = ds.add_dyhb_sent("serverid_a", now) + e = ds.add_dyhb_request(serverid_a, now) e.finished([1,2], now+1) hunk ./src/allmydata/test/test_web.py 102 - e = ds.add_dyhb_sent("serverid_b", now+2) # left unfinished + e = ds.add_dyhb_request(serverid_b, now+2) # left unfinished e = ds.add_read_event(0, 120, now) e.update(60, 0.5, 0.1) # bytes, decrypttime, pausetime hunk ./src/allmydata/test/test_web.py 109 e.finished(now+1) e = ds.add_read_event(120, 30, now+2) # left unfinished - e = ds.add_request_sent("serverid_a", 1, 100, 20, 
now) + e = ds.add_block_request(serverid_a, 1, 100, 20, now) e.finished(20, now+1) hunk ./src/allmydata/test/test_web.py 111 - e = ds.add_request_sent("serverid_a", 1, 120, 30, now+1) # left unfinished + e = ds.add_block_request(serverid_a, 1, 120, 30, now+1) # left unfinished # make sure that add_read_event() can come first too hunk ./src/allmydata/test/test_web.py 114 - ds1 = DownloadStatus("storage_index", 1234) + ds1 = DownloadStatus(storage_index, 1234) e = ds1.add_read_event(0, 120, now) e.update(60, 0.5, 0.1) # bytes, decrypttime, pausetime e.finished(now+1) hunk ./src/allmydata/test/test_web.py 563 def _check_dl(res): self.failUnless("File Download Status" in res, res) d.addCallback(_check_dl) - d.addCallback(lambda res: self.GET("/status/down-%d?t=json" % dl_num)) + d.addCallback(lambda res: self.GET("/status/down-%d/event_json" % dl_num)) def _check_dl_json(res): data = simplejson.loads(res) self.failUnless(isinstance(data, dict)) hunk ./src/allmydata/test/test_web.py 567 + # this data comes from build_one_ds() above + self.failUnlessEqual(set(data["serverids"].values()), + set(["phwr", "cmpu"])) + self.failUnlessEqual(len(data["segment"]), 5) + self.failUnlessEqual(len(data["read"]), 2) d.addCallback(_check_dl_json) d.addCallback(lambda res: self.GET("/status/up-%d" % ul_num)) def _check_ul(res): addfile ./src/allmydata/web/download-status-timeline.xhtml hunk ./src/allmydata/web/download-status-timeline.xhtml 1 + + + AllMyData - Tahoe - File Download Status Timeline + + + + + + + + + +

[The body of the new download-status-timeline.xhtml lost its XHTML markup when this
bundle was captured; only the rendered text survives. The page carries a "File Download
Status" heading, a summary list (Started, Storage Index, Helper?, Total Size, Progress,
Status), "overview" and "Timeline" navigation links, and a "Return to the Welcome Page"
link.]
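A note on the status.py rewrite earlier in this patch: the old add_dyhb_sent /
add_request_sent / add_*_finished calls, which shuttled opaque tags back and forth, are
replaced by small event objects (ReadEvent, SegmentEvent, DYHBEvent, BlockRequestEvent)
that mutate dicts held by DownloadStatus. The following is only a minimal usage sketch,
modeled on the build_one_ds() changes in the test_web.py hunk above; the import path is
inferred from the file location, and the timestamps and server ids are placeholders
(the tests use 20-byte binary serverids):

    import time
    from allmydata.immutable.downloader.status import DownloadStatus

    now = time.time()
    ds = DownloadStatus("storage_index", 1234)    # (storage_index, size)

    # segment lifecycle: request -> activate -> deliver (or error)
    seg = ds.add_segment_request(0, now)
    seg.activate(now + 0.5)
    seg.deliver(now + 1, 0, 100, 0.5)             # when, start, length, decodetime

    # DYHB ("do you have a share?") lifecycle: request -> finished (or error)
    dyhb = ds.add_dyhb_request("a" * 20, now)     # placeholder serverid
    dyhb.finished([1, 2], now + 1)                # shnums, when

    # block-request lifecycle: request -> finished (or error)
    blk = ds.add_block_request("a" * 20, 1, 100, 20, now)  # serverid, shnum, start, length
    blk.finished(20, now + 1)                     # bytes received, when

    # read() lifecycle: update() accumulates bytes/decrypt/pause time, then finished()
    rd = ds.add_read_event(0, 120, now)           # offset, size
    rd.update(60, 0.5, 0.1)                       # bytes, decrypt_time, paused_time
    rd.finished(now + 1)

Each deliver/finished/error call also runs DownloadStatus.update_last_timestamp(), and
add_read_event/add_segment_request record first_timestamp, so the status object knows
the overall time span covered by its four event lists.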
hunk ./src/allmydata/web/download-status.xhtml 19
[Markup stripped here as well. The surviving context lines are the existing summary
items (Total Size, Progress, Status); the hunk adds one line whose content was lost,
presumably the link to the new timeline page.]
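The timeline page pulls its data from the new event_json resource (the test_web.py hunk
above GETs /status/down-%d/event_json and checks its "serverids", "segment" and "read"
keys). The web-side code that builds that JSON is not part of this excerpt, so what
follows is only a sketch of the response shape that download_status_timeline.js (added
below) appears to expect; the field names are taken from that script and from the
DownloadStatus dicts above, and every concrete value, id and color is invented for
illustration:

    # Hypothetical event_json response, inferred from the fields the script reads.
    event_json = {
        "bounds": {"min": 1283304000.0, "max": 1283304002.5},
        "dyhb": [        # one entry per DYHB request, with a precomputed display row
            {"serverid": "xgru5adv", "start_time": 1283304000.0,
             "finish_time": 1283304000.3, "response_shnums": [1, 2], "row": 0},
        ],
        "read": [
            {"start": 0, "length": 120, "bytes_returned": 120, "row": 0,
             "start_time": 1283304000.0, "finish_time": 1283304002.0},
        ],
        "segment": [
            {"segment_number": 0, "segment_start": 0, "segment_length": 100,
             "start_time": 1283304000.1, "finish_time": 1283304001.1,
             "success": True, "row": 0},
        ],
        "block": [
            {"serverid": "xgru5adv", "shnum": 1, "start": 100, "length": 20,
             "response_length": 20, "start_time": 1283304000.2,
             "finish_time": 1283304000.4, "row": [0, 0]},   # [server group, row]
        ],
        "block_rownums": [1],               # rows used by each block group
        "server_info": {"xgru5adv": {"color": "#8888ff"}},
        "serverids": {"0": "xgru5adv"},     # checked by the test; key format not shown here
    }

The precomputed "row" and "block_rownums" fields suggest the server side lays out the
vertical stacking, so the script only has to map rows to y coordinates.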
  • addfile ./src/allmydata/web/download_status_timeline.js hunk ./src/allmydata/web/download_status_timeline.js 1 + +$(function() { + + function onDataReceived(data) { + var bounds = { min: data.bounds.min, + max: data.bounds.max + }; + //bounds.max = data.dyhb[data.dyhb.length-1].finish_time; + var duration = bounds.max - bounds.min; + var WIDTH = 600; + var vis = new pv.Panel().canvas("timeline").margin(30); + + var dyhb_top = 0; + var read_top = dyhb_top + 30*data.dyhb[data.dyhb.length-1].row+60; + var segment_top = read_top + 30*data.read[data.read.length-1].row+60; + var block_top = segment_top + 30*data.segment[data.segment.length-1].row+60; + var block_row_to_y = {}; + var row_y=0; + for (var group=0; group < data.block_rownums.length; group++) { + for (var row=0; row < data.block_rownums[group]; row++) { + block_row_to_y[group+"-"+row] = row_y; + row_y += 10; + } + row_y += 5; + } + + var height = block_top + row_y; + var kx = bounds.min; + var ky = 1; + var x = pv.Scale.linear(bounds.min, bounds.max).range(0, WIDTH-40); + var relx = pv.Scale.linear(0, duration).range(0, WIDTH-40); + //var y = pv.Scale.linear(-ky,ky).range(0, height); + //x.nice(); relx.nice(); + + /* add the invisible panel now, at the bottom of the stack, so that + it won't steal mouseover events and prevent tooltips from + working. */ + vis.add(pv.Panel) + .events("all") + .event("mousedown", pv.Behavior.pan()) + .event("mousewheel", pv.Behavior.zoom()) + .event("pan", transform) + .event("zoom", transform) + ; + + vis.anchor("top").top(-20).add(pv.Label).text("DYHB Requests"); + + vis.add(pv.Bar) + .data(data.dyhb) + .height(20) + .top(function (d) {return 30*d.row;}) + .left(function(d){return x(d.start_time);}) + .width(function(d){return x(d.finish_time)-x(d.start_time);}) + .title(function(d){return "shnums: "+d.response_shnums;}) + .fillStyle(function(d){return data.server_info[d.serverid].color;}) + .strokeStyle("black").lineWidth(1); + + vis.add(pv.Rule) + .data(data.dyhb) + .top(function(d){return 30*d.row + 20/2;}) + .left(0).width(0) + .strokeStyle("#888") + .anchor("left").add(pv.Label) + .text(function(d){return d.serverid.slice(0,4);}); + + /* we use a function for data=relx.ticks() here instead of + simply .data(relx.ticks()) so that it will be recalculated when + the scales change (by pan/zoom) */ + var xaxis = vis.add(pv.Rule) + .data(function() {return relx.ticks();}) + .strokeStyle("#ccc") + .left(relx) + .anchor("bottom").add(pv.Label) + .text(function(d){return relx.tickFormat(d)+"s";}); + + var read = vis.add(pv.Panel).top(read_top); + read.anchor("top").top(-20).add(pv.Label).text("read() requests"); + + read.add(pv.Bar) + .data(data.read) + .height(20) + .top(function (d) {return 30*d.row;}) + .left(function(d){return x(d.start_time);}) + .width(function(d){return x(d.finish_time)-x(d.start_time);}) + .title(function(d){return "read(start="+d.start+", len="+d.length+") -> "+d.bytes_returned+" bytes";}) + .fillStyle("red") + .strokeStyle("black").lineWidth(1); + + var segment = vis.add(pv.Panel).top(segment_top); + segment.anchor("top").top(-20).add(pv.Label).text("segment() requests"); + + segment.add(pv.Bar) + .data(data.segment) + .height(20) + .top(function (d) {return 30*d.row;}) + .left(function(d){return x(d.start_time);}) + .width(function(d){return x(d.finish_time)-x(d.start_time);}) + .title(function(d){return "seg"+d.segment_number+" ["+d.segment_start+":+"+d.segment_length+"] (took "+(d.finish_time-d.start_time)+")";}) + .fillStyle(function(d){if (d.success) return "#c0ffc0"; 
+ else return "#ffc0c0";}) + .strokeStyle("black").lineWidth(1); + + var block = vis.add(pv.Panel).top(block_top); + block.anchor("top").top(-20).add(pv.Label).text("block() requests"); + + var shnum_colors = pv.Colors.category10(); + block.add(pv.Bar) + .data(data.block) + .height(10) + .top(function (d) {return block_row_to_y[d.row[0]+"-"+d.row[1]];}) + .left(function(d){return x(d.start_time);}) + .width(function(d){return x(d.finish_time)-x(d.start_time);}) + .title(function(d){return "sh"+d.shnum+"-on-"+d.serverid.slice(0,4)+" ["+d.start+":+"+d.length+"] -> "+d.response_length;}) + .fillStyle(function(d){return data.server_info[d.serverid].color;}) + .strokeStyle(function(d){return shnum_colors(d.shnum).color;}) + .lineWidth(function(d) + {if (d.response_length > 100) return 3; + else return 1; + }) + ; + + + vis.height(height); + + function transform() { + var t0= this.transform(); + var t = this.transform().invert(); + // when t.x=0 and t.k=1.0, left should be bounds.min + x.domain(bounds.min + (t.x/WIDTH)*duration, + bounds.min + t.k*duration + (t.x/WIDTH)*duration); + relx.domain(0 + t.x/WIDTH*duration, + t.k*duration + (t.x/WIDTH)*duration); + vis.render(); + } + + vis.render(); + } + + $.ajax({url: "event_json", + method: 'GET', + dataType: 'json', + success: onDataReceived }); +}); + addfile ./src/allmydata/web/jquery.js hunk ./src/allmydata/web/jquery.js 1 +/*! + * jQuery JavaScript Library v1.3.2 + * http://jquery.com/ + * + * Copyright (c) 2009 John Resig + * Dual licensed under the MIT and GPL licenses. + * http://docs.jquery.com/License + * + * Date: 2009-02-19 17:34:21 -0500 (Thu, 19 Feb 2009) + * Revision: 6246 + */ +(function(){ + +var + // Will speed up references to window, and allows munging its name. + window = this, + // Will speed up references to undefined, and allows munging its name. + undefined, + // Map over jQuery in case of overwrite + _jQuery = window.jQuery, + // Map over the $ in case of overwrite + _$ = window.$, + + jQuery = window.jQuery = window.$ = function( selector, context ) { + // The jQuery object is actually just the init constructor 'enhanced' + return new jQuery.fn.init( selector, context ); + }, + + // A simple way to check for HTML strings or ID strings + // (both of which we optimize for) + quickExpr = /^[^<]*(<(.|\s)+>)[^>]*$|^#([\w-]+)$/, + // Is it a simple selector + isSimple = /^.[^:#\[\.,]*$/; + +jQuery.fn = jQuery.prototype = { + init: function( selector, context ) { + // Make sure that a selection was provided + selector = selector || document; + + // Handle $(DOMElement) + if ( selector.nodeType ) { + this[0] = selector; + this.length = 1; + this.context = selector; + return this; + } + // Handle HTML strings + if ( typeof selector === "string" ) { + // Are we dealing with HTML string or an ID? 
+ var match = quickExpr.exec( selector ); + + // Verify a match, and that no context was specified for #id + if ( match && (match[1] || !context) ) { + + // HANDLE: $(html) -> $(array) + if ( match[1] ) + selector = jQuery.clean( [ match[1] ], context ); + + // HANDLE: $("#id") + else { + var elem = document.getElementById( match[3] ); + + // Handle the case where IE and Opera return items + // by name instead of ID + if ( elem && elem.id != match[3] ) + return jQuery().find( selector ); + + // Otherwise, we inject the element directly into the jQuery object + var ret = jQuery( elem || [] ); + ret.context = document; + ret.selector = selector; + return ret; + } + + // HANDLE: $(expr, [context]) + // (which is just equivalent to: $(content).find(expr) + } else + return jQuery( context ).find( selector ); + + // HANDLE: $(function) + // Shortcut for document ready + } else if ( jQuery.isFunction( selector ) ) + return jQuery( document ).ready( selector ); + + // Make sure that old selector state is passed along + if ( selector.selector && selector.context ) { + this.selector = selector.selector; + this.context = selector.context; + } + + return this.setArray(jQuery.isArray( selector ) ? + selector : + jQuery.makeArray(selector)); + }, + + // Start with an empty selector + selector: "", + + // The current version of jQuery being used + jquery: "1.3.2", + + // The number of elements contained in the matched element set + size: function() { + return this.length; + }, + + // Get the Nth element in the matched element set OR + // Get the whole matched element set as a clean array + get: function( num ) { + return num === undefined ? + + // Return a 'clean' array + Array.prototype.slice.call( this ) : + + // Return just the object + this[ num ]; + }, + + // Take an array of elements and push it onto the stack + // (returning the new matched element set) + pushStack: function( elems, name, selector ) { + // Build a new jQuery matched element set + var ret = jQuery( elems ); + + // Add the old object onto the stack (as a reference) + ret.prevObject = this; + + ret.context = this.context; + + if ( name === "find" ) + ret.selector = this.selector + (this.selector ? " " : "") + selector; + else if ( name ) + ret.selector = this.selector + "." + name + "(" + selector + ")"; + + // Return the newly-formed element set + return ret; + }, + + // Force the current matched set of elements to become + // the specified array of elements (destroying the stack in the process) + // You should use pushStack() in order to do this, but maintain the stack + setArray: function( elems ) { + // Resetting the length to 0, then using the native Array push + // is a super-fast way to populate an object with array-like properties + this.length = 0; + Array.prototype.push.apply( this, elems ); + + return this; + }, + + // Execute a callback for every element in the matched set. + // (You can seed the arguments with an array of args, but this is + // only used internally.) + each: function( callback, args ) { + return jQuery.each( this, callback, args ); + }, + + // Determine the position of an element within + // the matched set of elements + index: function( elem ) { + // Locate the position of the desired element + return jQuery.inArray( + // If it receives a jQuery object, the first element is used + elem && elem.jquery ? 
elem[0] : elem + , this ); + }, + + attr: function( name, value, type ) { + var options = name; + + // Look for the case where we're accessing a style value + if ( typeof name === "string" ) + if ( value === undefined ) + return this[0] && jQuery[ type || "attr" ]( this[0], name ); + + else { + options = {}; + options[ name ] = value; + } + + // Check to see if we're setting style values + return this.each(function(i){ + // Set all the styles + for ( name in options ) + jQuery.attr( + type ? + this.style : + this, + name, jQuery.prop( this, options[ name ], type, i, name ) + ); + }); + }, + + css: function( key, value ) { + // ignore negative width and height values + if ( (key == 'width' || key == 'height') && parseFloat(value) < 0 ) + value = undefined; + return this.attr( key, value, "curCSS" ); + }, + + text: function( text ) { + if ( typeof text !== "object" && text != null ) + return this.empty().append( (this[0] && this[0].ownerDocument || document).createTextNode( text ) ); + + var ret = ""; + + jQuery.each( text || this, function(){ + jQuery.each( this.childNodes, function(){ + if ( this.nodeType != 8 ) + ret += this.nodeType != 1 ? + this.nodeValue : + jQuery.fn.text( [ this ] ); + }); + }); + + return ret; + }, + + wrapAll: function( html ) { + if ( this[0] ) { + // The elements to wrap the target around + var wrap = jQuery( html, this[0].ownerDocument ).clone(); + + if ( this[0].parentNode ) + wrap.insertBefore( this[0] ); + + wrap.map(function(){ + var elem = this; + + while ( elem.firstChild ) + elem = elem.firstChild; + + return elem; + }).append(this); + } + + return this; + }, + + wrapInner: function( html ) { + return this.each(function(){ + jQuery( this ).contents().wrapAll( html ); + }); + }, + + wrap: function( html ) { + return this.each(function(){ + jQuery( this ).wrapAll( html ); + }); + }, + + append: function() { + return this.domManip(arguments, true, function(elem){ + if (this.nodeType == 1) + this.appendChild( elem ); + }); + }, + + prepend: function() { + return this.domManip(arguments, true, function(elem){ + if (this.nodeType == 1) + this.insertBefore( elem, this.firstChild ); + }); + }, + + before: function() { + return this.domManip(arguments, false, function(elem){ + this.parentNode.insertBefore( elem, this ); + }); + }, + + after: function() { + return this.domManip(arguments, false, function(elem){ + this.parentNode.insertBefore( elem, this.nextSibling ); + }); + }, + + end: function() { + return this.prevObject || jQuery( [] ); + }, + + // For internal use only. + // Behaves like an Array's method, not like a jQuery method. + push: [].push, + sort: [].sort, + splice: [].splice, + + find: function( selector ) { + if ( this.length === 1 ) { + var ret = this.pushStack( [], "find", selector ); + ret.length = 0; + jQuery.find( selector, this[0], ret ); + return ret; + } else { + return this.pushStack( jQuery.unique(jQuery.map(this, function(elem){ + return jQuery.find( selector, elem ); + })), "find", selector ); + } + }, + + clone: function( events ) { + // Do the clone + var ret = this.map(function(){ + if ( !jQuery.support.noCloneEvent && !jQuery.isXMLDoc(this) ) { + // IE copies events bound via attachEvent when + // using cloneNode. Calling detachEvent on the + // clone will also remove the events from the orignal + // In order to get around this, we use innerHTML. 
+ // Unfortunately, this means some modifications to + // attributes in IE that are actually only stored + // as properties will not be copied (such as the + // the name attribute on an input). + var html = this.outerHTML; + if ( !html ) { + var div = this.ownerDocument.createElement("div"); + div.appendChild( this.cloneNode(true) ); + html = div.innerHTML; + } + + return jQuery.clean([html.replace(/ jQuery\d+="(?:\d+|null)"/g, "").replace(/^\s*/, "")])[0]; + } else + return this.cloneNode(true); + }); + + // Copy the events from the original to the clone + if ( events === true ) { + var orig = this.find("*").andSelf(), i = 0; + + ret.find("*").andSelf().each(function(){ + if ( this.nodeName !== orig[i].nodeName ) + return; + + var events = jQuery.data( orig[i], "events" ); + + for ( var type in events ) { + for ( var handler in events[ type ] ) { + jQuery.event.add( this, type, events[ type ][ handler ], events[ type ][ handler ].data ); + } + } + + i++; + }); + } + + // Return the cloned set + return ret; + }, + + filter: function( selector ) { + return this.pushStack( + jQuery.isFunction( selector ) && + jQuery.grep(this, function(elem, i){ + return selector.call( elem, i ); + }) || + + jQuery.multiFilter( selector, jQuery.grep(this, function(elem){ + return elem.nodeType === 1; + }) ), "filter", selector ); + }, + + closest: function( selector ) { + var pos = jQuery.expr.match.POS.test( selector ) ? jQuery(selector) : null, + closer = 0; + + return this.map(function(){ + var cur = this; + while ( cur && cur.ownerDocument ) { + if ( pos ? pos.index(cur) > -1 : jQuery(cur).is(selector) ) { + jQuery.data(cur, "closest", closer); + return cur; + } + cur = cur.parentNode; + closer++; + } + }); + }, + + not: function( selector ) { + if ( typeof selector === "string" ) + // test special case where just one selector is passed in + if ( isSimple.test( selector ) ) + return this.pushStack( jQuery.multiFilter( selector, this, true ), "not", selector ); + else + selector = jQuery.multiFilter( selector, this ); + + var isArrayLike = selector.length && selector[selector.length - 1] !== undefined && !selector.nodeType; + return this.filter(function() { + return isArrayLike ? jQuery.inArray( this, selector ) < 0 : this != selector; + }); + }, + + add: function( selector ) { + return this.pushStack( jQuery.unique( jQuery.merge( + this.get(), + typeof selector === "string" ? + jQuery( selector ) : + jQuery.makeArray( selector ) + ))); + }, + + is: function( selector ) { + return !!selector && jQuery.multiFilter( selector, this ).length > 0; + }, + + hasClass: function( selector ) { + return !!selector && this.is( "." + selector ); + }, + + val: function( value ) { + if ( value === undefined ) { + var elem = this[0]; + + if ( elem ) { + if( jQuery.nodeName( elem, 'option' ) ) + return (elem.attributes.value || {}).specified ? elem.value : elem.text; + + // We need to handle select boxes special + if ( jQuery.nodeName( elem, "select" ) ) { + var index = elem.selectedIndex, + values = [], + options = elem.options, + one = elem.type == "select-one"; + + // Nothing was selected + if ( index < 0 ) + return null; + + // Loop through all the selected options + for ( var i = one ? index : 0, max = one ? 
index + 1 : options.length; i < max; i++ ) { + var option = options[ i ]; + + if ( option.selected ) { + // Get the specifc value for the option + value = jQuery(option).val(); + + // We don't need an array for one selects + if ( one ) + return value; + + // Multi-Selects return an array + values.push( value ); + } + } + + return values; + } + + // Everything else, we just grab the value + return (elem.value || "").replace(/\r/g, ""); + + } + + return undefined; + } + + if ( typeof value === "number" ) + value += ''; + + return this.each(function(){ + if ( this.nodeType != 1 ) + return; + + if ( jQuery.isArray(value) && /radio|checkbox/.test( this.type ) ) + this.checked = (jQuery.inArray(this.value, value) >= 0 || + jQuery.inArray(this.name, value) >= 0); + + else if ( jQuery.nodeName( this, "select" ) ) { + var values = jQuery.makeArray(value); + + jQuery( "option", this ).each(function(){ + this.selected = (jQuery.inArray( this.value, values ) >= 0 || + jQuery.inArray( this.text, values ) >= 0); + }); + + if ( !values.length ) + this.selectedIndex = -1; + + } else + this.value = value; + }); + }, + + html: function( value ) { + return value === undefined ? + (this[0] ? + this[0].innerHTML.replace(/ jQuery\d+="(?:\d+|null)"/g, "") : + null) : + this.empty().append( value ); + }, + + replaceWith: function( value ) { + return this.after( value ).remove(); + }, + + eq: function( i ) { + return this.slice( i, +i + 1 ); + }, + + slice: function() { + return this.pushStack( Array.prototype.slice.apply( this, arguments ), + "slice", Array.prototype.slice.call(arguments).join(",") ); + }, + + map: function( callback ) { + return this.pushStack( jQuery.map(this, function(elem, i){ + return callback.call( elem, i, elem ); + })); + }, + + andSelf: function() { + return this.add( this.prevObject ); + }, + + domManip: function( args, table, callback ) { + if ( this[0] ) { + var fragment = (this[0].ownerDocument || this[0]).createDocumentFragment(), + scripts = jQuery.clean( args, (this[0].ownerDocument || this[0]), fragment ), + first = fragment.firstChild; + + if ( first ) + for ( var i = 0, l = this.length; i < l; i++ ) + callback.call( root(this[i], first), this.length > 1 || i > 0 ? + fragment.cloneNode(true) : fragment ); + + if ( scripts ) + jQuery.each( scripts, evalScript ); + } + + return this; + + function root( elem, cur ) { + return table && jQuery.nodeName(elem, "table") && jQuery.nodeName(cur, "tr") ? 
+ (elem.getElementsByTagName("tbody")[0] || + elem.appendChild(elem.ownerDocument.createElement("tbody"))) : + elem; + } + } +}; + +// Give the init function the jQuery prototype for later instantiation +jQuery.fn.init.prototype = jQuery.fn; + +function evalScript( i, elem ) { + if ( elem.src ) + jQuery.ajax({ + url: elem.src, + async: false, + dataType: "script" + }); + + else + jQuery.globalEval( elem.text || elem.textContent || elem.innerHTML || "" ); + + if ( elem.parentNode ) + elem.parentNode.removeChild( elem ); +} + +function now(){ + return +new Date; +} + +jQuery.extend = jQuery.fn.extend = function() { + // copy reference to target object + var target = arguments[0] || {}, i = 1, length = arguments.length, deep = false, options; + + // Handle a deep copy situation + if ( typeof target === "boolean" ) { + deep = target; + target = arguments[1] || {}; + // skip the boolean and the target + i = 2; + } + + // Handle case when target is a string or something (possible in deep copy) + if ( typeof target !== "object" && !jQuery.isFunction(target) ) + target = {}; + + // extend jQuery itself if only one argument is passed + if ( length == i ) { + target = this; + --i; + } + + for ( ; i < length; i++ ) + // Only deal with non-null/undefined values + if ( (options = arguments[ i ]) != null ) + // Extend the base object + for ( var name in options ) { + var src = target[ name ], copy = options[ name ]; + + // Prevent never-ending loop + if ( target === copy ) + continue; + + // Recurse if we're merging object values + if ( deep && copy && typeof copy === "object" && !copy.nodeType ) + target[ name ] = jQuery.extend( deep, + // Never move original objects, clone them + src || ( copy.length != null ? [ ] : { } ) + , copy ); + + // Don't bring in undefined values + else if ( copy !== undefined ) + target[ name ] = copy; + + } + + // Return the modified object + return target; +}; + +// exclude the following css properties to add px +var exclude = /z-?index|font-?weight|opacity|zoom|line-?height/i, + // cache defaultView + defaultView = document.defaultView || {}, + toString = Object.prototype.toString; + +jQuery.extend({ + noConflict: function( deep ) { + window.$ = _$; + + if ( deep ) + window.jQuery = _jQuery; + + return jQuery; + }, + + // See test/unit/core.js for details concerning isFunction. + // Since version 1.3, DOM methods and functions like alert + // aren't supported. They return false on IE (#2968). + isFunction: function( obj ) { + return toString.call(obj) === "[object Function]"; + }, + + isArray: function( obj ) { + return toString.call(obj) === "[object Array]"; + }, + + // check if an element is in a (or is an) XML document + isXMLDoc: function( elem ) { + return elem.nodeType === 9 && elem.documentElement.nodeName !== "HTML" || + !!elem.ownerDocument && jQuery.isXMLDoc( elem.ownerDocument ); + }, + + // Evalulates a script in a global context + globalEval: function( data ) { + if ( data && /\S/.test(data) ) { + // Inspired by code by Andrea Giammarchi + // http://webreflection.blogspot.com/2007/08/global-scope-evaluation-and-dom.html + var head = document.getElementsByTagName("head")[0] || document.documentElement, + script = document.createElement("script"); + + script.type = "text/javascript"; + if ( jQuery.support.scriptEval ) + script.appendChild( document.createTextNode( data ) ); + else + script.text = data; + + // Use insertBefore instead of appendChild to circumvent an IE6 bug. + // This arises when a base node is used (#2709). 
+ head.insertBefore( script, head.firstChild ); + head.removeChild( script ); + } + }, + + nodeName: function( elem, name ) { + return elem.nodeName && elem.nodeName.toUpperCase() == name.toUpperCase(); + }, + + // args is for internal usage only + each: function( object, callback, args ) { + var name, i = 0, length = object.length; + + if ( args ) { + if ( length === undefined ) { + for ( name in object ) + if ( callback.apply( object[ name ], args ) === false ) + break; + } else + for ( ; i < length; ) + if ( callback.apply( object[ i++ ], args ) === false ) + break; + + // A special, fast, case for the most common use of each + } else { + if ( length === undefined ) { + for ( name in object ) + if ( callback.call( object[ name ], name, object[ name ] ) === false ) + break; + } else + for ( var value = object[0]; + i < length && callback.call( value, i, value ) !== false; value = object[++i] ){} + } + + return object; + }, + + prop: function( elem, value, type, i, name ) { + // Handle executable functions + if ( jQuery.isFunction( value ) ) + value = value.call( elem, i ); + + // Handle passing in a number to a CSS property + return typeof value === "number" && type == "curCSS" && !exclude.test( name ) ? + value + "px" : + value; + }, + + className: { + // internal only, use addClass("class") + add: function( elem, classNames ) { + jQuery.each((classNames || "").split(/\s+/), function(i, className){ + if ( elem.nodeType == 1 && !jQuery.className.has( elem.className, className ) ) + elem.className += (elem.className ? " " : "") + className; + }); + }, + + // internal only, use removeClass("class") + remove: function( elem, classNames ) { + if (elem.nodeType == 1) + elem.className = classNames !== undefined ? + jQuery.grep(elem.className.split(/\s+/), function(className){ + return !jQuery.className.has( classNames, className ); + }).join(" ") : + ""; + }, + + // internal only, use hasClass("class") + has: function( elem, className ) { + return elem && jQuery.inArray( className, (elem.className || elem).toString().split(/\s+/) ) > -1; + } + }, + + // A method for quickly swapping in/out CSS properties to get correct calculations + swap: function( elem, options, callback ) { + var old = {}; + // Remember the old values, and insert the new ones + for ( var name in options ) { + old[ name ] = elem.style[ name ]; + elem.style[ name ] = options[ name ]; + } + + callback.call( elem ); + + // Revert the old values + for ( var name in options ) + elem.style[ name ] = old[ name ]; + }, + + css: function( elem, name, force, extra ) { + if ( name == "width" || name == "height" ) { + var val, props = { position: "absolute", visibility: "hidden", display:"block" }, which = name == "width" ? [ "Left", "Right" ] : [ "Top", "Bottom" ]; + + function getWH() { + val = name == "width" ? 
elem.offsetWidth : elem.offsetHeight; + + if ( extra === "border" ) + return; + + jQuery.each( which, function() { + if ( !extra ) + val -= parseFloat(jQuery.curCSS( elem, "padding" + this, true)) || 0; + if ( extra === "margin" ) + val += parseFloat(jQuery.curCSS( elem, "margin" + this, true)) || 0; + else + val -= parseFloat(jQuery.curCSS( elem, "border" + this + "Width", true)) || 0; + }); + } + + if ( elem.offsetWidth !== 0 ) + getWH(); + else + jQuery.swap( elem, props, getWH ); + + return Math.max(0, Math.round(val)); + } + + return jQuery.curCSS( elem, name, force ); + }, + + curCSS: function( elem, name, force ) { + var ret, style = elem.style; + + // We need to handle opacity special in IE + if ( name == "opacity" && !jQuery.support.opacity ) { + ret = jQuery.attr( style, "opacity" ); + + return ret == "" ? + "1" : + ret; + } + + // Make sure we're using the right name for getting the float value + if ( name.match( /float/i ) ) + name = styleFloat; + + if ( !force && style && style[ name ] ) + ret = style[ name ]; + + else if ( defaultView.getComputedStyle ) { + + // Only "float" is needed here + if ( name.match( /float/i ) ) + name = "float"; + + name = name.replace( /([A-Z])/g, "-$1" ).toLowerCase(); + + var computedStyle = defaultView.getComputedStyle( elem, null ); + + if ( computedStyle ) + ret = computedStyle.getPropertyValue( name ); + + // We should always get a number back from opacity + if ( name == "opacity" && ret == "" ) + ret = "1"; + + } else if ( elem.currentStyle ) { + var camelCase = name.replace(/\-(\w)/g, function(all, letter){ + return letter.toUpperCase(); + }); + + ret = elem.currentStyle[ name ] || elem.currentStyle[ camelCase ]; + + // From the awesome hack by Dean Edwards + // http://erik.eae.net/archives/2007/07/27/18.54.15/#comment-102291 + + // If we're not dealing with a regular pixel number + // but a number that has a weird ending, we need to convert it to pixels + if ( !/^\d+(px)?$/i.test( ret ) && /^\d/.test( ret ) ) { + // Remember the original values + var left = style.left, rsLeft = elem.runtimeStyle.left; + + // Put in the new values to get a computed value out + elem.runtimeStyle.left = elem.currentStyle.left; + style.left = ret || 0; + ret = style.pixelLeft + "px"; + + // Revert the changed values + style.left = left; + elem.runtimeStyle.left = rsLeft; + } + } + + return ret; + }, + + clean: function( elems, context, fragment ) { + context = context || document; + + // !context.createElement fails in IE with an error but returns typeof 'object' + if ( typeof context.createElement === "undefined" ) + context = context.ownerDocument || context[0] && context[0].ownerDocument || document; + + // If a single string is passed in and it's a single tag + // just do a createElement and skip the rest + if ( !fragment && elems.length === 1 && typeof elems[0] === "string" ) { + var match = /^<(\w+)\s*\/?>$/.exec(elems[0]); + if ( match ) + return [ context.createElement( match[1] ) ]; + } + + var ret = [], scripts = [], div = context.createElement("div"); + + jQuery.each(elems, function(i, elem){ + if ( typeof elem === "number" ) + elem += ''; + + if ( !elem ) + return; + + // Convert html string into DOM nodes + if ( typeof elem === "string" ) { + // Fix "XHTML"-style tags in all browsers + elem = elem.replace(/(<(\w+)[^>]*?)\/>/g, function(all, front, tag){ + return tag.match(/^(abbr|br|col|img|input|link|meta|param|hr|area|embed)$/i) ? 
+ all : + front + ">"; + }); + + // Trim whitespace, otherwise indexOf won't work as expected + var tags = elem.replace(/^\s+/, "").substring(0, 10).toLowerCase(); + + var wrap = + // option or optgroup + !tags.indexOf("", "" ] || + + !tags.indexOf("", "" ] || + + tags.match(/^<(thead|tbody|tfoot|colg|cap)/) && + [ 1, "", "
    " ] || + + !tags.indexOf("", "" ] || + + // matched above + (!tags.indexOf("", "" ] || + + !tags.indexOf("", "" ] || + + // IE can't serialize and