# Reconstructed from a collapsed unified diff; this is draft ("new
# downloader") code, originally Python 2, with XXX markers left by the
# original author where the design was unfinished.

import struct
import binascii
from allmydata.util import hashtree
from allmydata.util.hashtree import IncompleteHashTree, BadHashError, \
     NotEnoughHashesError
# NOTE(review): the draft also references Spans, DataSpans, Observer2,
# eventually, Failure, hashutil, mathutil, log, HASH_SIZE, etc. without
# importing them; expected homes are allmydata.util.spans,
# allmydata.util.observer, foolscap.eventual, twisted.python.failure --
# TODO confirm and add the real imports before landing.

# share-state constants used by Share and SegmentFetcher
(UNUSED, PENDING, OVERDUE, COMPLETE, CORRUPT, DEAD, BADSEGNUM) = \
 ("UNUSED", "PENDING", "OVERDUE", "COMPLETE", "CORRUPT", "DEAD", "BADSEGNUM")

class BadSegmentNumberError(Exception):
    """get_segment() was called with a segnum beyond the end of the file."""

class Share:
    # this is a specific implementation of IShare for tahoe's native storage
    # servers. A different backend would use a different class.
    """I represent a single instance of a single share (e.g. I reference the
    shnum2 for share SI=abcde on server xy12t, not the one on server ab45q).
    I am associated with a CommonShare that remembers data that is held in
    common among e.g. SI=abcde/shnum2 across all servers. I am also
    associated with a CiphertextFileNode for e.g. SI=abcde (all shares, all
    servers).
    """

    def __init__(self, rref, verifycap, commonshare, node, peerid, shnum):
        self._rref = rref
        self._guess_offsets(verifycap, node.guessed_segment_size)
        self.actual_offsets = None
        self.actual_segment_size = None
        self._UEB_length = None
        self._commonshare = commonshare # holds block_hash_tree
        self._node = node # holds share_hash_tree and UEB
        self._peerid = peerid
        self._shnum = shnum

        self._wanted = Spans() # desired metadata
        self._wanted_blocks = Spans() # desired block data
        self._requested = Spans() # we've sent a request for this
        self._received = Spans() # we've received a response for this
        self._received_data = DataSpans() # the response contents, still unused
        self._requested_blocks = [] # (segnum, set(observer2..))
        ver = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"]
        self._overrun_ok = ver["tolerates-immutable-read-overrun"]
        # If _overrun_ok and we guess the offsets correctly, we can get
        # everything in one RTT. If _overrun_ok and we guess wrong, we might
        # need two RTT (but we could get lucky and do it in one). If overrun
        # is *not* ok (tahoe-1.3.0 or earlier), we need four RTT: 1=version,
        # 2=offset table, 3=UEB_length and everything else (hashes, block),
        # 4=UEB.

    def _guess_offsets(self, verifycap, guessed_segment_size):
        self.guessed_segment_size = guessed_segment_size
        size = verifycap.size
        k = verifycap.needed_shares
        N = verifycap.total_shares
        offsets = {}
        # fixed: enumerate() takes one iterable, not varargs
        for i, field in enumerate(('data',
                                   'plaintext_hash_tree', # UNUSED
                                   'crypttext_hash_tree',
                                   'block_hashes',
                                   'share_hashes',
                                   'uri_extension',
                                   )):
            offsets[field] = i # bad guesses are easy :) # XXX stub
        self.guessed_offsets = offsets
        self._fieldsize = 4
        self._fieldstruct = ">L"

    # called by our client, the SegmentFetcher
    def get_block(self, segnum):
        """Add a block number to the list of requests. This will eventually
        result in a fetch of the data necessary to validate the block, then
        the block itself. The fetch order is generally
        first-come-first-served, but requests may be answered out-of-order if
        data becomes available sooner.

        I return an Observer2, which has two uses. The first is to call
        o.subscribe(), which gives me a place to send state changes and
        eventually the data block. The second is o.cancel(), which removes
        the request (if it is still active).
        """
        o = Observer2()
        o.set_canceler(self._cancel_block_request)
        for i, (segnum0, observers) in enumerate(self._requested_blocks):
            if segnum0 == segnum:
                observers.add(o)
                break
        else:
            # fixed: append a (segnum, observers) tuple; the draft passed
            # two arguments to list.append
            self._requested_blocks.append((segnum, set([o])))
        eventually(self.loop)
        return o

    def _cancel_block_request(self, o):
        new_requests = []
        for e in self._requested_blocks:
            (segnum0, observers) = e
            observers.discard(o)
            if observers:
                new_requests.append(e)
        self._requested_blocks = new_requests

    # internal methods
    def _active_segnum(self):
        if self._requested_blocks:
            # fixed: return just the segnum; the draft returned the whole
            # (segnum, observers) tuple
            return self._requested_blocks[0][0]
        return None

    def _active_segnum_and_observers(self):
        if self._requested_blocks:
            # we only retrieve information for one segment at a time, to
            # minimize alacrity (first come, first served)
            return self._requested_blocks[0]
        return None, []

    def loop(self):
        # TODO: if any exceptions occur here, kill the download

        # we are (eventually) called after all state transitions:
        #  new segments added to self._requested_blocks
        #  new data received from servers (responses to our read() calls)
        #  impatience timer fires (server appears slow)

        # First, consume all of the information that we currently have, for
        # all the segments people currently want.
        while self._get_satisfaction():
            pass

        # When we get no satisfaction (from the data we've received so far),
        # we determine what data we desire (to satisfy more requests). The
        # number of segments is finite, so I can't get no satisfaction
        # forever.
        self._desire()

        # finally send out requests for whatever we need (desire minus
        # have). You can't always get what you want, but, sometimes, you get
        # what you need.
        self._request_needed() # express desire

    def _get_satisfaction(self):
        # return True if we retired a data block, and should therefore be
        # called again. Return False if we don't retire a data block (even
        # if we do retire some other data, like hash chains).

        if self.actual_offsets is None:
            if not self._satisfy_offsets():
                # can't even look at anything without the offset table
                return False

        if self._node.UEB is None:
            if not self._satisfy_UEB():
                # can't check any hashes without the UEB
                return False

        segnum, observers = self._active_segnum_and_observers()
        # fixed: guard against segnum=None before the numeric comparison
        # NOTE(review): UEB is used both as a dict and as an object with
        # .num_segments in this draft -- unify before landing
        if segnum is not None and segnum >= self._node.UEB.num_segments:
            for o in observers:
                o.notify(state=BADSEGNUM)
            self._requested_blocks.pop(0)
            return True

        # fixed: self.shnum -> self._shnum (set in __init__)
        if self._node.share_hash_tree.needed_hashes(self._shnum):
            if not self._satisfy_share_hash_tree():
                # can't check block_hash_tree without a root
                return False

        if segnum is None:
            return False # we don't want any particular segment right now

        # block_hash_tree
        # NOTE(review): CommonShare stores the tree as _block_hash_tree;
        # add an accessor or rename -- TODO
        needed_hashes = self._commonshare.block_hash_tree.needed_hashes(segnum)
        if needed_hashes:
            if not self._satisfy_block_hash_tree(needed_hashes):
                # can't check block without block_hash_tree
                return False

        # data blocks
        return self._satisfy_data_block(segnum, observers)

    def _satisfy_offsets(self):
        version_s = self._received_data.get(0, 4)
        if version_s is None:
            return False
        (version,) = struct.unpack(">L", version_s)
        if version == 1:
            table_start = 0x0c
            self._fieldsize = 0x4
            self._fieldstruct = ">L"
        else:
            table_start = 0x14
            self._fieldsize = 0x8
            self._fieldstruct = ">Q"
        offset_table_size = 6 * self._fieldsize
        table_s = self._received_data.pop(table_start, offset_table_size)
        if table_s is None:
            return False
        # fixed: 6*self._fieldstruct would repeat the byte-order character
        # (">L>L...") which struct rejects; build ">LLLLLL"/">QQQQQQ"
        fmt = self._fieldstruct[0] + 6*self._fieldstruct[1:]
        fields = struct.unpack(fmt, table_s)

        offsets = {}
        # fixed: enumerate() takes one iterable, not varargs
        for i, field in enumerate(('data',
                                   'plaintext_hash_tree', # UNUSED
                                   'crypttext_hash_tree',
                                   'block_hashes',
                                   'share_hashes',
                                   'uri_extension',
                                   )):
            offsets[field] = fields[i]
        self.actual_offsets = offsets
        self._received_data.remove(0, 4) # don't need this anymore
        return True

    def _satisfy_UEB(self):
        o = self.actual_offsets
        fsize = self._fieldsize
        rdata = self._received_data
        UEB_length_s = rdata.get(o["uri_extension"], fsize)
        if not UEB_length_s:
            return False
        # fixed: struct.unpack(format, data) -- the draft swapped the
        # arguments and never indexed the result tuple
        (UEB_length,) = struct.unpack(self._fieldstruct, UEB_length_s)
        UEB_s = rdata.pop(o["uri_extension"]+fsize, UEB_length)
        if not UEB_s:
            return False
        rdata.remove(o["uri_extension"], fsize)
        try:
            self._node.validate_UEB(UEB_s) # stores in self._node.UEB # XXX
            self.actual_segment_size = self._node.segment_size
            assert self.actual_segment_size is not None
            return True
        except hashtree.BadHashError:
            # TODO: if this UEB was bad, we'll keep trying to validate it
            # over and over again. Only log.err on the first one, or better
            # yet skip all but the first
            f = Failure()
            self._signal_corruption(f, o["uri_extension"], fsize+UEB_length)
            return False

    def _satisfy_share_hash_tree(self):
        # the share hash chain is stored as (hashnum,hash) tuples, so you
        # can't fetch just the pieces you need, because you don't know
        # exactly where they are. So fetch everything, and parse the
        # results later.
        o = self.actual_offsets
        rdata = self._received_data
        hashlen = o["uri_extension"] - o["share_hashes"]
        assert hashlen % (2+HASH_SIZE) == 0
        hashdata = rdata.get(o["share_hashes"], hashlen)
        if not hashdata:
            return False
        share_hashes = {}
        for i in range(0, hashlen, 2+HASH_SIZE):
            hashnum = struct.unpack(">H", hashdata[i:i+2])[0]
            hashvalue = hashdata[i+2:i+2+HASH_SIZE]
            share_hashes[hashnum] = hashvalue
        try:
            self._node.process_share_hashes(share_hashes)
            # adds to self._node.share_hash_tree
            rdata.remove(o["share_hashes"], hashlen)
            return True
        # fixed: the draft's py2 "except A, B, C:" caught only IndexError
        # and bound BadHashError as the exception variable
        except (IndexError, hashtree.BadHashError,
                hashtree.NotEnoughHashesError):
            f = Failure()
            self._signal_corruption(f, o["share_hashes"], hashlen)
            return False

    def _signal_corruption(self, f, start, offset):
        # there was corruption somewhere in the given range
        # TODO: report to server-selection machinery; the draft just had a
        # bare py2 "print f" placeholder here
        log.err(f) # XXX

    def _satisfy_block_hash_tree(self, needed_hashes):
        o = self.actual_offsets
        rdata = self._received_data
        block_hashes = {}
        for hashnum in needed_hashes:
            hashdata = rdata.get(o["block_hashes"]+hashnum*HASH_SIZE,
                                 HASH_SIZE)
            if hashdata:
                block_hashes[hashnum] = hashdata
            else:
                return False # missing some hashes
        # note that we don't submit any hashes to the block_hash_tree until
        # we've gotten them all, because the hash tree will throw an
        # exception if we only give it a partial set (which it therefore
        # cannot validate)
        # fixed: commonshare was unbound here; use the instance attribute
        ok = self._commonshare.process_block_hashes(block_hashes) # XXX
        if not ok:
            return False
        for hashnum in needed_hashes:
            rdata.remove(o["block_hashes"]+hashnum*HASH_SIZE, HASH_SIZE)
        return True

    def _satisfy_data_block(self, segnum, observers):
        o = self.actual_offsets
        rdata = self._received_data # fixed: rdata was unbound in the draft
        segsize = self._node.UEB["segment_size"]
        needed_shares = self._node.UEB["needed_shares"]
        sharesize = mathutil.div_ceil(self._node.UEB["size"],
                                      needed_shares)
        blocksize = mathutil.div_ceil(segsize, needed_shares) # XXX
        blockstart = o["data"] + segnum * blocksize
        # fixed: the draft tested unbound blocknum/NUM_BLOCKS; the last
        # segment's block may be shorter than blocksize
        if segnum < self._node.UEB.num_segments - 1:
            blocklen = blocksize
        else:
            blocklen = sharesize % blocksize
            if blocklen == 0:
                blocklen = blocksize
        block = rdata.pop(blockstart, blocklen)
        if not block:
            return False
        # this block is being retired, either as COMPLETE or CORRUPT, since
        # no further data reads will help
        assert self._requested_blocks[0][0] == segnum
        ok = self._commonshare.check_block(segnum, block)
        if ok:
            state = COMPLETE
        else:
            state = CORRUPT
        # fixed: loop variable renamed so it no longer shadows the offsets
        # dict 'o'
        for ob in observers:
            # goes to SegmentFetcher._block_request_activity
            ob.notify(state=state, block=block)
        self._requested_blocks.pop(0) # retired
        return True # got satisfaction

    def _desire(self):
        segnum, observers = self._active_segnum_and_observers()
        fsize = self._fieldsize
        rdata = self._received_data
        commonshare = self._commonshare

        if not self.actual_offsets:
            self._desire_offsets()

        # we can use guessed offsets as long as this server tolerates
        # overrun
        if not self.actual_offsets and not self._overrun_ok:
            return # must wait for the offsets to arrive

        o = self.actual_offsets or self.guessed_offsets
        segsize = self.actual_segment_size or self.guessed_segment_size
        if self._node.UEB is None:
            self._desire_UEB(o)

        # fixed: self.shnum -> self._shnum
        if self._node.share_hash_tree.needed_hashes(self._shnum):
            hashlen = o["uri_extension"] - o["share_hashes"]
            self._wanted.add(o["share_hashes"], hashlen)

        if segnum is None:
            return # only need block hashes or blocks for active segments

        # block hash chain
        for hashnum in commonshare.block_hash_tree.needed_hashes(segnum):
            self._wanted.add(o["block_hashes"]+hashnum*HASH_SIZE, HASH_SIZE)

        # data block itself -- draft placeholder; should mirror the span
        # arithmetic in _satisfy_data_block, but based on guessed values
        blockstart, blocklen = COMPUTE(segnum, segsize, etc) # XXX
        self._wanted_blocks.add(blockstart, blocklen)


    def _desire_offsets(self):
        if self._overrun_ok:
            # easy! this includes version number, sizes, and offsets
            self._wanted.add(0, 1024)
            return

        # v1 has an offset table that lives [0x0,0x24). v2 lives [0x0,0x44).
        # To be conservative, only request the data that we know lives
        # there, even if that means more roundtrips.

        self._wanted.add(0, 4) # version number, always safe
        version_s = self._received_data.get(0, 4)
        if not version_s:
            return
        (version,) = struct.unpack(">L", version_s)
        if version == 1:
            table_start = 0x0c
            fieldsize = 0x4
        else:
            table_start = 0x14
            fieldsize = 0x8
        offset_table_size = 6 * fieldsize
        self._wanted.add(table_start, offset_table_size)

    def _desire_UEB(self, o):
        # UEB data is stored as (length,data).
        rdata = self._received_data # fixed: rdata was unbound in the draft
        if self._overrun_ok:
            # We can pre-fetch 2kb, which should probably cover it. If it
            # turns out to be larger, we'll come back here later with a
            # known length and fetch the rest.
            self._wanted.add(o["uri_extension"], 2048)
            # now, while that is probably enough to fetch the whole UEB, it
            # might not be, so we need to do the next few steps as well. In
            # most cases, the following steps will not actually add
            # anything to self._wanted

        self._wanted.add(o["uri_extension"], self._fieldsize)
        # only use a length if we're sure it's correct, otherwise we'll
        # probably fetch a huge number
        if not self.actual_offsets:
            return
        UEB_length_s = rdata.get(o["uri_extension"], self._fieldsize)
        if UEB_length_s:
            # fixed: struct.unpack argument order, plus tuple unpacking
            (UEB_length,) = struct.unpack(self._fieldstruct, UEB_length_s)
            # we know the length, so make sure we grab everything
            self._wanted.add(o["uri_extension"]+self._fieldsize, UEB_length)

    def _request_needed(self):
        # send requests for metadata first, to avoid hanging on to large
        # data blocks any longer than necessary.
        self._send_requests(self._wanted - self._received - self._requested)
        # then send requests for data blocks. All the hashes should arrive
        # before the blocks, so the blocks can be consumed and released in
        # a single turn.
        ask = self._wanted_blocks - self._received - self._requested
        self._send_requests(ask) # fixed: the draft was missing the ')'

    def _send_requests(self, needed):
        for (start, length) in needed:
            # TODO: quantize to reasonably-large blocks
            self._requested.add(start, length)
            d = self._send_request(start, length)
            d.addCallback(self._got_data, start, length)
            d.addErrback(self._got_error)
            d.addErrback(log.err) # XXX add log context; the draft had an
                                  # invalid bare '...' argument here

    def _send_request(self, start, length):
        return self._rref.callRemote("read", start, length)

    def _got_data(self, data, start, length):
        span = (start, length)
        assert span in self._requested
        self._requested.remove(start, length)
        self._received.add(start, length)
        self._received_data.add(start, data)
        eventually(self.loop)

    def _got_error(self, f):
        # TODO: mark this share DEAD and notify its observers; the draft
        # had an invalid bare '...' statement here # XXX
        log.err(f)


class CommonShare:
    """I hold data that is common across all instances of a single share,
    like sh2 on both servers A and B. This is just the block hash tree.
    """
    def __init__(self, numsegs):
        # until the UEB is parsed we may not know the number of segments;
        # got_numsegs() fills the tree in later in that case (fixed: the
        # draft left the attribute undefined when numsegs was None)
        self._block_hash_tree = None
        if numsegs is not None:
            self._block_hash_tree = IncompleteHashTree(numsegs)

    def got_numsegs(self, numsegs):
        self._block_hash_tree = IncompleteHashTree(numsegs)

    def process_block_hashes(self, block_hashes):
        self._block_hash_tree.add_hashes(block_hashes)
        return True

    def check_block(self, segnum, block):
        h = hashutil.block_hash(block)
        try:
            self._block_hash_tree.set_hashes(leaves={segnum: h})
        except (hashtree.BadHashError, hashtree.NotEnoughHashesError):
            # the draft had a bare LOG(...) placeholder here
            log.msg("CommonShare: hash failure in block %d" % segnum) # XXX
            return False
        return True

# all classes are also Services, and the rule is that you don't initiate more
# work unless self.running

# GC: decide whether each service is restartable or not. For non-restartable
# services, stopService() should delete a lot of attributes to kill reference
# cycles. The primary goal is to decref remote storage BucketReaders when a
# download is complete.
class SegmentFetcher:
    """I am responsible for acquiring blocks for a single segment. I will
    use the Share instances passed to my add_shares() method to locate,
    retrieve, and validate those blocks. I expect my parent node to call my
    no_more_shares() method when there are no more shares available. I will
    call my parent's want_more_shares() method when I want more: I expect
    to see at least one call to add_shares or no_more_shares afterwards.

    When I have enough validated blocks, I will call my parent's
    process_blocks() method with a dictionary that maps shnum to blockdata.
    If I am unable to provide enough blocks, I will call my parent's
    fetch_failed() method with (self, f). After either of these events, I
    will shut down and do no further work. My parent can also call my
    stop() method to have me shut down early."""

    def __init__(self, node, segnum, k):
        self._node = node # CiphertextFileNode
        self.segnum = segnum
        self._k = k # number of blocks needed to reconstruct the segment
        self._shares = {} # maps non-dead Share instance to a state, one of
                          # (UNUSED, PENDING, OVERDUE, COMPLETE, CORRUPT).
                          # State transition map is:
                          #  UNUSED -(send-read)-> PENDING
                          #  PENDING -(timer)-> OVERDUE
                          #  PENDING -(rx)-> COMPLETE, CORRUPT, DEAD, BADSEGNUM
                          #  OVERDUE -(rx)-> COMPLETE, CORRUPT, DEAD, BADSEGNUM
                          # If a share becomes DEAD, it is removed from the
                          # dict. If it becomes BADSEGNUM, the whole fetch is
                          # terminated.
        self._share_observers = {} # maps Share to Observer2 for active ones
        self._shnums = DictOfSets() # maps shnum to the shares that provide it
        self._blocks = {} # maps shnum to validated block data
        self._no_more_shares = False
        self._bad_segnum = False
        self._running = True

    def stop(self):
        self._cancel_all_requests()
        self._running = False
        del self._shares # let GC work # ???

    # called by our parent CiphertextFileNode

    def add_shares(self, shares):
        # called when ShareFinder locates a new share, and when a
        # non-initial segment fetch is started and we already know about
        # shares from the previous segment
        for s in shares:
            self._shares[s] = UNUSED
            # fixed: Share stores its number in s._shnum, and DictOfSets is
            # populated via .add(key, value); the draft used
            # self._shnums[s.shnum].add(s)
            self._shnums.add(s._shnum, s)
        eventually(self._loop)

    def no_more_shares(self):
        # ShareFinder tells us it's reached the end of its list
        self._no_more_shares = True
        # fixed: wake the loop so the exhaustion condition is noticed;
        # without this the fetch could hang with no pending events
        eventually(self._loop)

    # internal methods

    def _count_shnums(self, *states):
        """Return the number of shnums for which at least one share is in
        one of the given states."""
        # fixed: share states live in self._shares[s]; the draft consulted
        # a nonexistent s.state attribute
        count = 0
        for shnum, shares in self._shnums.items():
            if any([self._shares[s] in states for s in shares]):
                count += 1
        return count

    def _loop(self):
        if not self._running:
            return
        if self._bad_segnum:
            # oops, we were asking for a segment number beyond the end of
            # the file. This is an error.
            self.stop()
            e = BadSegmentNumberError("%d > %d" % (self.segnum,
                                                   self._node.num_segments))
            f = Failure(e)
            self._node.fetch_failed(self, f)
            return

        # are we done?
        if self._count_shnums(COMPLETE) >= self._k:
            # yay!
            self.stop()
            self._node.process_blocks(self.segnum, self._blocks)
            return

        # we may have exhausted everything
        if (self._no_more_shares and
            self._count_shnums(UNUSED, PENDING, OVERDUE, COMPLETE) < self._k):
            # no more new shares are coming, and the remaining hopeful
            # shares aren't going to be enough. boo!
            self.stop()
            e = NotEnoughShares("...") # XXX undefined in this draft
            f = Failure(e)
            self._node.fetch_failed(self, f)
            return

        # nope, not done. Are we "block-hungry" (i.e. do we want to send
        # out more read requests, or do we think we have enough in flight
        # already?)
        while self._count_shnums(PENDING, COMPLETE) < self._k:
            # we're hungry.. are there any unused shares?
            sent = self._send_new_request()
            if not sent:
                break

        # ok, now are we "share-hungry" (i.e. do we have enough known
        # shares to make us happy, or should we ask the ShareFinder to get
        # us more?)
        if self._count_shnums(UNUSED, PENDING, COMPLETE) < self._k:
            # we're hungry for more shares
            self._node.want_more_shares()
            # that will trigger the ShareFinder to keep looking

    def _send_new_request(self):
        for shnum, shares in self._shnums.items():
            states = [self._shares[s] for s in shares]
            if COMPLETE in states or PENDING in states:
                # don't send redundant requests
                continue
            if UNUSED not in states:
                # no candidates for this shnum, move on
                continue
            # here's a candidate. Send a request.
            # fixed: the draft called an undefined find_one() helper
            s = [s for s in shares if self._shares[s] == UNUSED][0]
            # XXX could choose fastest
            self._shares[s] = PENDING
            # fixed: 'segnum' was unbound here; use self.segnum
            self._share_observers[s] = o = s.get_block(self.segnum)
            o.subscribe(self._block_request_activity, share=s, shnum=shnum)
            # TODO: build up a list of candidates, then walk through the
            # list, sending requests to the most desireable servers,
            # re-checking our block-hunger each time. For non-initial
            # segment fetches, this would let us stick with faster servers.
            return True
        # nothing was sent: don't call us again until you have more shares
        # to work with, or one of the existing shares has been declared
        # OVERDUE
        return False

    def _cancel_all_requests(self):
        for o in self._share_observers.values():
            o.cancel()
        self._share_observers = {}

    def _block_request_activity(self, share, shnum, state, block=None):
        # called by Shares, in response to our s.get_block() calls.
        # COMPLETE, CORRUPT, DEAD, BADSEGNUM are terminal.
        if state in (COMPLETE, CORRUPT, DEAD, BADSEGNUM):
            del self._share_observers[share]
        if state == COMPLETE:
            # 'block' is fully validated
            self._shares[share] = COMPLETE
            self._blocks[shnum] = block
        elif state == OVERDUE:
            self._shares[share] = OVERDUE
            # OVERDUE is not terminal: it will eventually transition to
            # COMPLETE, CORRUPT, or DEAD.
        elif state == CORRUPT:
            self._shares[share] = CORRUPT
        elif state == DEAD:
            del self._shares[share]
            self._shnums[shnum].remove(share)
        elif state == BADSEGNUM:
            self._shares[share] = BADSEGNUM # ???
            self._bad_segnum = True
        eventually(self._loop)


class RequestToken:
    """Opaque token for one outstanding DYHB request; remembers which
    server it was sent to."""
    def __init__(self, peerid):
        self.peerid = peerid
class ShareFinder:
    """I locate shares by sending DYHB ('do you have buckets') queries to
    storage servers, and deliver the resulting Share instances to my
    share_consumer (the CiphertextFileNode)."""

    def __init__(self, storage_broker, storage_index,
                 share_consumer, max_outstanding_requests=10):
        self.running = True
        # fixed: send_request() reads self._storage_index, but the draft
        # never stored it
        self._storage_index = storage_index
        s = storage_broker.get_servers_for_index(storage_index)
        self._servers = iter(s)
        self.share_consumer = share_consumer
        # fixed: loop() consults self.max_outstanding_requests, but the
        # draft stored the limit under self.max_outstanding
        self.max_outstanding_requests = max_outstanding_requests

        self._hungry = False

        self._commonshares = {} # shnum to CommonShare instance
        self.undelivered_shares = []
        self.pending_requests = set()

        self._si_prefix = base32.b2a_l(storage_index[:8], 60)
        self._lp = log.msg(format="ShareFinder[si=%(si)s] starting",
                           si=self._si_prefix, level=log.NOISY, umid="2xjj2A")

        self._num_segments = None
        d = share_consumer.get_num_segments()
        d.addCallback(self._got_numsegs)
        d.addErrback(log.err) # XXX add log context; the draft passed an
                              # invalid bare '...' argument

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._lp
        return log.msg(*args, **kwargs)

    def stop(self):
        self.running = False

    def _got_numsegs(self, numsegs):
        # tell any CommonShares created before the UEB arrived how big
        # their block hash trees should be
        for cs in self._commonshares.values():
            cs.got_numsegs(numsegs)
        self._num_segments = numsegs

    # called by our parent CiphertextDownloader
    def hungry(self):
        log.msg(format="ShareFinder[si=%(si)s] hungry",
                si=self._si_prefix, level=log.NOISY, umid="NywYaQ")
        self._hungry = True
        eventually(self.loop)

    # internal methods
    def loop(self):
        log.msg(format="ShareFinder[si=%(si)s] loop: running=%(running)s"
                " hungry=%(hungry)s, undelivered=%(undelivered)s,"
                " pending=%(pending)s",
                si=self._si_prefix,
                running=self.running, # fixed: was nonexistent self._running
                hungry=self._hungry,
                undelivered=",".join(["sh%d@%s" %
                                      (s._shnum,
                                       idlib.shortnodeid_b2a(s._peerid))
                                      for s in self.undelivered_shares]),
                pending=",".join([idlib.shortnodeid_b2a(rt.peerid)
                                  for rt in self.pending_requests]), # sort?
                level=log.NOISY, umid="kRtS4Q")
        if not self.running:
            return
        if not self._hungry:
            return
        if self.undelivered_shares:
            sh = self.undelivered_shares.pop(0)
            # they will call hungry() again if they want more
            self._hungry = False
            eventually(self.share_consumer.got_shares, [sh])
            return
        if len(self.pending_requests) >= self.max_outstanding_requests:
            # cannot send more requests, must wait for some to retire
            return

        server = None
        try:
            if self._servers:
                server = next(self._servers)
        except StopIteration:
            self._servers = None

        if server:
            self.send_request(server)
            return

        if self.pending_requests:
            # no server, but there are still requests in flight: maybe one
            # of them will make progress
            return

        # we've run out of servers (so we can't send any more requests),
        # and we have nothing in flight. No further progress can be made.
        # They are destined to remain hungry.
        self.share_consumer.no_more_shares()
        self.stop()


    def send_request(self, server):
        (peerid, rref) = server
        req = RequestToken(peerid)
        self.pending_requests.add(req)
        lp = self.log(format="sending DYHB to [%(peerid)s]",
                      peerid=idlib.shortnodeid_b2a(peerid),
                      level=log.NOISY, umid="Io7pyg")
        d = rref.callRemote("get_buckets", self._storage_index)
        d.addBoth(incidentally, self.pending_requests.discard, req)
        # fixed: the errback needs (peerid, req, lp) too; the draft passed
        # only callbackArgs, so _got_error would have raised TypeError
        d.addCallbacks(self._got_response, self._got_error,
                       callbackArgs=(peerid, req, lp),
                       errbackArgs=(peerid, req, lp))
        d.addErrback(log.err, format="error in send_request",
                     level=log.WEIRD, parent=lp, umid="rpdV0w")
        d.addCallback(incidentally, eventually, self.loop)

    def _got_response(self, buckets, peerid, req, lp):
        if buckets:
            shnums_s = ",".join([str(shnum) for shnum in buckets])
            # fixed: the draft %-interpolated shnums_s into the format
            # string while also using named log parameters
            self.log(format="got shnums [%(shnums)s] from [%(peerid)s]",
                     shnums=shnums_s,
                     peerid=idlib.shortnodeid_b2a(peerid),
                     level=log.NOISY, parent=lp, umid="0fcEZw")
        else:
            self.log(format="no shares from [%(peerid)s]",
                     peerid=idlib.shortnodeid_b2a(peerid),
                     level=log.NOISY, parent=lp, umid="U7d4JA")
        for shnum, bucket in buckets.items():
            if shnum not in self._commonshares:
                self._commonshares[shnum] = CommonShare(self._num_segments)
            cs = self._commonshares[shnum]
            # NOTE(review): the draft referenced undefined self.verifycap
            # and self.node; share_consumer is the CiphertextFileNode and
            # stores its verifycap as .u -- confirm before landing
            s = Share(bucket, self.share_consumer.u, cs,
                      self.share_consumer, peerid, shnum)
            self.undelivered_shares.append(s)

    def _got_error(self, f, peerid, req, lp):
        # fixed: 'lp' was unbound in the draft; it now arrives via
        # errbackArgs from send_request
        self.log(format="got error from [%(peerid)s]",
                 peerid=idlib.shortnodeid_b2a(peerid), failure=f,
                 level=log.UNUSUAL, parent=lp, umid="zUKdCw")
class Segmentation:
    """I am responsible for a single offset+size read of the file. I handle
    segmentation: I figure out which segments are necessary, request them
    (from my CiphertextDownloader) in order, and trim the segments down to
    match the offset+size span. I use the Producer/Consumer interface to
    only request one segment at a time.
    """
    implements(IPushProducer)
    def __init__(self, node, offset, size, consumer):
        self._node = node
        self._hungry = True
        self._active_segnum = None
        self._cancel_segment_request = None
        # these are updated as we deliver data. At any given time, we still
        # want to download file[offset:offset+size]
        self._offset = offset
        self._size = size
        self._consumer = consumer

    def start(self):
        self._alive = True
        self._deferred = defer.Deferred()
        self._consumer.registerProducer(self) # XXX ???
        self._maybe_fetch_next()
        return self._deferred

    def _maybe_fetch_next(self):
        if not self._alive or not self._hungry:
            return
        if self._active_segnum is not None:
            return
        self._fetch_next()

    def _fetch_next(self):
        if self._size == 0:
            # done!
            self._alive = False
            self._hungry = False
            self._consumer.unregisterProducer()
            self._deferred.callback(self._consumer)
            return
        n = self._node
        have_actual_segment_size = n.actual_segment_size is not None
        segment_size = n.actual_segment_size or n.guessed_segment_size
        if self._offset == 0:
            # great! we want segment0 for sure
            wanted_segnum = 0
        else:
            # this might be a guess
            wanted_segnum = self._offset // segment_size
        self._active_segnum = wanted_segnum
        d, c = self._node.get_segment(wanted_segnum)
        self._cancel_segment_request = c
        d.addBoth(self._request_retired)
        d.addCallback(self._got_segment, have_actual_segment_size)
        d.addErrback(self._retry_bad_segment, have_actual_segment_size)
        d.addErrback(self._error)

    def _request_retired(self, res):
        self._active_segnum = None
        self._cancel_segment_request = None
        return res

    def _got_segment(self, res, had_actual_segment_size):
        # fixed: py2-only tuple-unpacking parameter replaced with an
        # explicit unpack, and have_/had_ variable names unified
        (segment_start, segment) = res
        self._active_segnum = None
        self._cancel_segment_request = None
        # we got file[segment_start:segment_start+len(segment)]
        # we want file[self._offset:self._offset+self._size]
        o = overlap(segment_start, len(segment), self._offset, self._size)
        # the overlap is file[o[0]:o[0]+o[1]]
        if not o or o[0] != self._offset:
            # we didn't get the first byte, so we can't use this segment
            if had_actual_segment_size:
                # and we should have gotten it right. This is a big
                # problem. (the draft had a 'raise SOMETHING' placeholder)
                raise AssertionError("got wrong segment despite known "
                                     "segment size") # XXX better exception
            # we've wasted some bandwidth, but now we can grab the right
            # one, because we should know the segsize by now.
            assert self._node.actual_segment_size is not None
            self._maybe_fetch_next()
            return
        offset_in_segment = self._offset - segment_start
        desired_data = segment[offset_in_segment:offset_in_segment+o[1]]

        self._offset += len(desired_data)
        self._size -= len(desired_data)
        self._consumer.write(desired_data)
        self._maybe_fetch_next()

    def _retry_bad_segment(self, f, had_actual_segment_size):
        # fixed: the draft had a stray ':' after f.trap(...), a syntax
        # error
        f.trap(BadSegmentNumberError) # guessed way wrong, off the end
        if had_actual_segment_size:
            # but we should have known better, so this is a real error
            return f
        # we didn't know better: try again with more information
        return self._maybe_fetch_next()

    def _error(self, f):
        self._alive = False
        self._hungry = False
        self._consumer.unregisterProducer()
        self._deferred.errback(f)

    def stopProducing(self):
        self._hungry = False
        self._alive = False
        # cancel any outstanding segment request
        if self._cancel_segment_request:
            self._cancel_segment_request()
            self._cancel_segment_request = None

    def pauseProducing(self):
        self._hungry = False

    def resumeProducing(self):
        self._hungry = True
        eventually(self._maybe_fetch_next)


class Cancel:
    """One-shot cancellation handle: cancel() invokes the wrapped canceler
    exactly once, passing this handle to it."""
    def __init__(self, f):
        self._f = f
        self.cancelled = False
    def cancel(self):
        if not self.cancelled:
            self.cancelled = True
            self._f(self)


class CiphertextFileNode:
    # Share._node points to me
    # NOTE(review): this class continues past the end of the reviewed
    # chunk; only the visible prefix of __init__ is reproduced here,
    # unchanged.
    def __init__(self, verifycap, storage_broker, secret_holder,
                 terminator, history):
        assert isinstance(verifycap, CHKFileVerifierURI)
        self.u = verifycap
        storage_index = verifycap.storage_index
        self._needed_shares = verifycap.needed_shares
        self._total_shares = verifycap.total_shares
        self.running = True
        terminator.register(self) # calls self.stop() at stopService()
        # the rule is: only send network requests if you're active
        # (self.running is True). You can do eventual-sends any time.
This + # rule should mean that once stopService()+flushEventualQueue() + # fires, everything will be done. + self._secret_holder = secret_holder + self._history = history + + self.share_hash_tree = IncompleteHashTree(self.u.total_shares) + + # we guess the segment size, so Segmentation can pull non-initial + # segments in a single roundtrip + k = verifycap.needed_shares + max_segment_size = 128*KiB # TODO: pull from elsewhere, maybe the + # same place as upload.BaseUploadable + s = mathutil.next_multiple(min(verifycap.size, max_segment_size), k) + self.guessed_segment_size = s + + # filled in when we parse a valid UEB + self.have_UEB = False + self.num_segments = None + self.segment_size = None + self.tail_data_size = None + self.tail_segment_size = None + self.block_size = None + self.share_size = None + self.ciphertext_hash_tree = None # size depends on num_segments + self.ciphertext_hash = None # flat hash, optional + + # things to track callers that want data + self._segsize_observers = OneShotObserverList() + self._numsegs_observers = OneShotObserverList() + # _segment_requests can have duplicates + self._segment_requests = [] # (segnum, d, cancel_handle) + self._active_segment = None # a SegmentFetcher, with .segnum + + self._sharefinder = ShareFinder(storage_broker, storage_index, self) + self._shares = set() + + def stop(self): + # called by the Terminator at shutdown, mostly for tests + if self._active_segment: + self._active_segment.stop() + self._active_segment = None + self._sharefinder.stop() + + # things called by our client, either a filenode user or an + # ImmutableFileNode wrapper + + def read(self, consumer, offset=0, size=None): + """I am the main entry point, from which FileNode.read() can get + data. I feed the consumer with the desired range of ciphertext. 
I + return a Deferred that fires (with the consumer) when the read is + finished.""" + # for concurrent operations: each gets its own Segmentation manager + if size is None: + size = self._size - offset + s = Segmentation(self, offset, size, consumer) + # this raises an interesting question: what segments to fetch? if + # offset=0, always fetch the first segment, and then allow + # Segmentation to be responsible for pulling the subsequent ones if + # the first wasn't large enough. If offset>0, we're going to need an + # extra roundtrip to get the UEB (and therefore the segment size) + # before we can figure out which segment to get. TODO: allow the + # offset-table-guessing code (which starts by guessing the segsize) + # to assist the offset>0 process. + d = s.start() + return d + + def get_segment(self, segnum): + """Begin downloading a segment. I return a tuple (d, c): 'd' is a + Deferred that fires with (offset,data) when the desired segment is + available, and c is an object on which c.cancel() can be called to + disavow interest in the segment (after which 'd' will never fire). + + You probably need to know the segment size before calling this, + unless you want the first few bytes of the file. If you ask for a + segment number which turns out to be too large, the Deferred will + errback with BadSegmentNumberError. + + The Deferred fires with the offset of the first byte of the data + segment, so that you can call get_segment() before knowing the + segment size, and still know which data you received. 
+ """ + d = defer.Deferred() + c = Cancel(self._cancel_request) + self._segment_requests.append( (segnum, d, c) ) + self._start_new_segment() + eventually(self._loop) + return (d, c) + + # things called by the Segmentation object used to transform + # arbitrary-sized read() calls into quantized segment fetches + + def get_segment_size(self): + """I return a Deferred that fires with the segment_size used by this + file.""" + return self._segsize_observers.when_fired() + def get_num_segments(self): + """I return a Deferred that fires with the number of segments used by + this file.""" + return self._numsegs_observers.when_fired() + + def _start_new_segment(self): + if self._active_segment is None and self._segment_requests: + segnum = self._segment_requests[0][0] + self._active_segment = fetcher = SegmentFetcher(self, segnum, + self._needed_shares) + active_shares = [s for s in self._shares if s.not_dead()] + fetcher.add_shares(active_shares) # this triggers the loop + + + # called by our child ShareFinder + def got_shares(self, shares): + self._shares.update(shares) + if self._active_segment + self._active_segment.add_shares(shares) + def no_more_shares(self): + self._no_more_shares = True + if self._active_segment: + self._active_segment.no_more_shares() + + # things called by our Share instances + + def validate_UEB(self, UEB_s): + h = hashutil.uri_extension_hash(UEB_s) + if h != self._verifycap.uri_extension_hash: + raise hashutil.BadHashError + UEB_dict = uri.unpack_extension(data) + self._parse_UEB(self, UEB_dict) # sets self._stuff + # TODO: a malformed (but authentic) UEB could throw an assertion in + # _parse_UEB, and we should abandon the download. + self.have_UEB = True + self._segsize_observers.fire(self.segment_size) + self._numsegs_observers.fire(self.num_segments) + + + def _parse_UEB(self, d): + # Note: the UEB contains needed_shares and total_shares. These are + # redundant and inferior (the filecap contains the authoritative + # values). 
However, because it is possible to encode the same file in + # multiple ways, and the encoders might choose (poorly) to use the + # same key for both (therefore getting the same SI), we might + # encounter shares for both types. The UEB hashes will be different, + # however, and we'll disregard the "other" encoding's shares as + # corrupted. + + # therefore, we ignore d['total_shares'] and d['needed_shares']. + + self.share_size = mathutil.div_ceil(self._verifycap.size, + self._needed_shares) + + self.segment_size = d['segment_size'] + for r in self._readers: + r.set_segment_size(self.segment_size) + + self.block_size = mathutil.div_ceil(self._segsize, self._needed_shares) + self.num_segments = mathutil.div_ceil(self._size, self.segment_size) + + self.tail_data_size = self._size % self.segment_size + if self.tail_data_size == 0: + self.tail_data_size = self.segment_size + # padding for erasure code + self.tail_segment_size = mathutil.next_multiple(self.tail_data_size, + self._needed_shares) + + # zfec.Decode() instantiation is fast, but still, let's use the same + # codec for anything we can. 3-of-10 takes 15us on my laptop, + # 25-of-100 is 900us, 3-of-255 is 97us, 25-of-255 is 2.5ms, + # worst-case 254-of-255 is 9.3ms + self._codec = codec.CRSDecoder() + self._codec.set_params(self.segment_size, + self._needed_shares, self._total_shares) + + + # Ciphertext hash tree root is mandatory, so that there is at most + # one ciphertext that matches this read-cap or verify-cap. The + # integrity check on the shares is not sufficient to prevent the + # original encoder from creating some shares of file A and other + # shares of file B. + self.ciphertext_hash_tree = IncompleteHashTree(self.num_segments) + self.ciphertext_hash_tree.set_hashes({0: d['crypttext_root_hash']}) + + self.share_hash_tree.set_hashes({0: d['share_root_hash']}) + + # crypttext_hash is optional. We only pull this from the first UEB + # that we see. 
+ if 'crypttext_hash' in d: + if len(d["crypttext_hash"]) == hashutil.CRYPTO_VAL_SIZE: + self.ciphertext_hash = d['crypttext_hash'] + else: + log.msg("ignoring bad-length UEB[crypttext_hash], " + "got %d bytes, want %d" % (len(d['crypttext_hash']), + hashutil.CRYPTO_VAL_SIZE), + umid="oZkGLA", level=log.WEIRD) + + # Our job is a fast download, not verification, so we ignore any + # redundant fields. The Verifier uses a different code path which + # does not ignore them. + + + def process_share_hashes(self, share_hashes): + self.share_hash_tree.set_hashes(share_hashes) + + # called by our child SegmentFetcher + + def want_more_shares(self): + self._sharefinder.hungry() + + def fetch_failed(self, sf, f): + assert sf is self._active_segment + sf.disownServiceParent() + self._active_segment = None + # deliver error upwards + for (d,c) in self._extract_requests(sf.segnum): + eventually(self._deliver_error, d, c, f) + + def _deliver_error(self, d, c, f): + # this method exists to handle cancel() that occurs between + # _got_segment and _deliver_error + if not c.cancelled: + d.errback(f) + + def process_blocks(self, segnum, blocks): + codec = self._codec + if segnum == self.num_segments-1: + codec = codec.CRSDecoder() + k, N = self._needed_shares, self._total_shares + codec.set_params(self.tail_segment_size, k, N) + + shares = [] + shareids = [] + for (shareid, share) in blocks.iteritems(): + shareids.append(shareid) + shares.append(share) + del blocks + segment = codec.decode(shares, shareids) + del shares + self._process_segment(segnum, segment) + + def _process_segment(self, segnum, segment): + h = hashutil.crypttext_hash(segment) + try: + self.ciphertext_hash_tree.set_hashes(leaves={segnum, h}) + except SOMETHING: + SOMETHING + assert self._active_segment.segnum == segnum + assert self.segment_size is not None + offset = segnum * self.segment_size + for (d,c) in self._extract_requests(segnum): + eventually(self._deliver, d, c, offset, segment) + self._active_segment = 
None + self._start_new_segment() + + def _deliver(self, d, c, offset, segment): + # this method exists to handle cancel() that occurs between + # _got_segment and _deliver + if not c.cancelled: + d.callback((offset,segment)) + + def _extract_requests(self, segnum): + """Remove matching requests and return their (d,c) tuples so that the + caller can retire them.""" + retire = [(d,c) for (segnum0, d, c) in self._segment_requests + if segnum0 == segnum] + self._segment_requests = [t for t in self._segment_requests + if t[0] != segnum] + return retire + + def _cancel_request(self, c): + self._segment_requests = [t for t in self._segment_requests + if t[2] != c] + segnums = [segnum for (segnum,d,c) in self._segment_requests] + if self._active_segment.segnum not in segnums: + self._active_segment.stop() + self._active_segment = None + self._start_new_segment() + +class DecryptingConsumer: + """I sit between a CiphertextDownloader (which acts as a Producer) and + the real Consumer, decrypting everything that passes by. The real + Consumer sees the real Producer, but the Producer sees us instead of the + real consumer.""" + implements(IConsumer) + + def __init__(self, consumer, readkey, offset): + self._consumer = consumer + # TODO: pycryptopp CTR-mode needs random-access operations: I want + # either a=AES(readkey, offset) or better yet both of: + # a=AES(readkey, offset=0) + # a.process(ciphertext, offset=xyz) + # For now, we fake it with the existing iv= argument. + offset_big = offset // 16 + offset_small = offset % 16 + iv = binascii.unhexlify("%032x" % offset_big) + self._decryptor = AES(readkey, iv=iv) + self._decryptor.process("\x00"*offset_small) + + def registerProducer(self, producer): + # this passes through, so the real consumer can flow-control the real + # producer. Therefore we don't need to provide any IPushProducer + # methods. We implement all the IConsumer methods as pass-throughs, + # and only intercept write() to perform decryption. 
+ self._consumer.registerProducer(producer) + def unregisterProducer(self): + self._consumer.unregisterProducer() + def write(self, ciphertext): + plaintext = self._decryptor.process(ciphertext) + self._consumer.write(plaintext) + +class ImmutableFileNode: + # I wrap a CiphertextFileNode with a decryption key + def __init__(self, filecap, storage_broker, secret_holder, downloader, + history): + assert isinstance(filecap, CHKFileURI) + verifycap = filecap.get_verify_cap() + self._cnode = CiphertextFileNode(verifycap, storage_broker, + secret_holder, downloader, history) + assert isinstance(filecap, CHKFileURI) + self.u = filecap + + def read(self, consumer, offset=0, size=None): + decryptor = DecryptingConsumer(consumer, self._readkey, offset) + return self._cnode.read(decryptor, offset, size) + + +# TODO: if server1 has all shares, and server2-10 have one each, make the +# loop stall slightly before requesting all shares from the first server, to +# give it a chance to learn about the other shares and get some diversity. +# Or, don't bother, let the first block all come from one server, and take +# comfort in the fact that we'll learn about the other servers by the time we +# fetch the second block. +# +# davidsarah points out that we could use sequential (instead of parallel) +# fetching of multiple block from a single server: by the time the first +# block arrives, we'll hopefully have heard about other shares. This would +# induce some RTT delays (i.e. lose pipelining) in the case that this server +# has the only shares, but that seems tolerable. We could rig it to only use +# sequential requests on the first segment. + +# as a query gets later, we're more willing to duplicate work. + +# should change server read protocol to allow small shares to be fetched in a +# single RTT. Instead of get_buckets-then-read, just use read(shnums, readv), +# where shnums=[] means all shares, and the return value is a dict of +# # shnum->ta (like with mutable files). 
The DYHB query should also fetch the
# offset table, since everything else can be located once we have that.


# ImmutableFileNode
# DecryptingConsumer
# CiphertextFileNode
# Segmentation
# ShareFinder
# SegmentFetcher[segnum] (one at a time)
# CommonShare[shnum]
# Share[shnum,server]

# TODO: when we learn numsegs, any get_segment() calls for bad blocknumbers
# should be failed with BadSegmentNumberError. But should this be the
# responsibility of CiphertextFileNode, or SegmentFetcher? The knowledge will
# first appear when a Share receives a valid UEB and calls
# CiphertextFileNode.validate_UEB, then _parse_UEB. The SegmentFetcher is
# expecting to hear from the Share, via the _block_request_activity observer.

# make it the responsibility of the SegmentFetcher. Each Share that gets a
# valid UEB will tell the SegmentFetcher BADSEGNUM (instead of COMPLETE or
# CORRUPT). The SegmentFetcher is then responsible for shutting down, and
# informing its parent (the CiphertextFileNode) of the BadSegmentNumberError,
# which is then passed to the client of get_segment().


# TODO: if offset table is corrupt, attacker could cause us to fetch whole
# (large) share

diff --git a/src/allmydata/immutable/download2_off.py b/src/allmydata/immutable/download2_off.py
new file mode 100755
index 0000000..d2b8b99
--- /dev/null
+++ b/src/allmydata/immutable/download2_off.py
@@ -0,0 +1,634 @@
#! /usr/bin/python

# known (shnum,Server) pairs are sorted into a list according to
# desirability. This sort is picking a winding path through a matrix of
# [shnum][server]. The goal is to get diversity of both shnum and server.

# The initial order is:
# find the lowest shnum on the first server, add it
# look at the next server, find the lowest shnum that we don't already have
# if any
# next server, etc, until all known servers are checked
# now look at servers that we skipped (because ...
+ +# Keep track of which block requests are outstanding by (shnum,Server). Don't +# bother prioritizing "validated" shares: the overhead to pull the share hash +# chain is tiny (4 hashes = 128 bytes), and the overhead to pull a new block +# hash chain is also tiny (1GB file, 8192 segments of 128KiB each, 13 hashes, +# 832 bytes). Each time a block request is sent, also request any necessary +# hashes. Don't bother with a "ValidatedShare" class (as distinct from some +# other sort of Share). Don't bother avoiding duplicate hash-chain requests. + +# For each outstanding segread, walk the list and send requests (skipping +# outstanding shnums) until requests for k distinct shnums are in flight. If +# we can't do that, ask for more. If we get impatient on a request, find the +# first non-outstanding + +# start with the first Share in the list, and send a request. Then look at +# the next one. If we already have a pending request for the same shnum or +# server, push that Share down onto the fallback list and try the next one, +# etc. If we run out of non-fallback shares, use the fallback ones, +# preferring shnums that we don't have outstanding requests for (i.e. assume +# that all requests will complete). Do this by having a second fallback list. + +# hell, I'm reviving the Herder. But remember, we're still talking 3 objects +# per file, not thousands. + +# actually, don't bother sorting the initial list. Append Shares as the +# responses come back, that will put the fastest servers at the front of the +# list, and give a tiny preference to servers that are earlier in the +# permuted order. + +# more ideas: +# sort shares by: +# 1: number of roundtrips needed to get some data +# 2: share number +# 3: ms of RTT delay +# maybe measure average time-to-completion of requests, compare completion +# time against that, much larger indicates congestion on the server side +# or the server's upstream speed is less than our downstream. 
Minimum +# time-to-completion indicates min(our-downstream,their-upstream). Could +# fetch shares one-at-a-time to measure that better. + +# when should we risk duplicate work and send a new request? + +def walk(self): + shares = sorted(list) + oldshares = copy(shares) + outstanding = list() + fallbacks = list() + second_fallbacks = list() + while len(outstanding.nonlate.shnums) < k: # need more requests + while oldshares: + s = shares.pop(0) + if s.server in outstanding.servers or s.shnum in outstanding.shnums: + fallbacks.append(s) + continue + outstanding.append(s) + send_request(s) + break #'while need_more_requests' + # must use fallback list. Ask for more servers while we're at it. + ask_for_more_servers() + while fallbacks: + s = fallbacks.pop(0) + if s.shnum in outstanding.shnums: + # assume that the outstanding requests will complete, but + # send new requests for other shnums to existing servers + second_fallbacks.append(s) + continue + outstanding.append(s) + send_request(s) + break #'while need_more_requests' + # if we get here, we're being forced to send out multiple queries per + # share. We've already asked for more servers, which might help. If + # there are no late outstanding queries, then duplicate shares won't + # help. Don't send queries for duplicate shares until some of the + # queries are late. + if outstanding.late: + # we're allowed to try any non-outstanding share + while second_fallbacks: + pass + newshares = outstanding + fallbacks + second_fallbacks + oldshares + + +class Server: + """I represent an abstract Storage Server. One day, the StorageBroker + will return instances of me. For now, the StorageBroker returns (peerid, + RemoteReference) tuples, and this code wraps a Server instance around + them. + """ + def __init__(self, peerid, ss): + self.peerid = peerid + self.remote = ss + self._remote_buckets = {} # maps shnum to RIBucketReader + # TODO: release the bucket references on shares that we no longer + # want. 
OTOH, why would we not want them? Corruption? + + def send_query(self, storage_index): + """I return a Deferred that fires with a set of shnums. If the server + had shares available, I will retain the RemoteReferences to its + buckets, so that get_data(shnum, range) can be called later.""" + d = self.remote.callRemote("get_buckets", self.storage_index) + d.addCallback(self._got_response) + return d + + def _got_response(self, r): + self._remote_buckets = r + return set(r.keys()) + +class ShareOnAServer: + """I represent one instance of a share, known to live on a specific + server. I am created every time a server responds affirmatively to a + do-you-have-block query.""" + + def __init__(self, shnum, server): + self._shnum = shnum + self._server = server + self._block_hash_tree = None + + def cost(self, segnum): + """I return a tuple of (roundtrips, bytes, rtt), indicating how + expensive I think it would be to fetch the given segment. Roundtrips + indicates how many roundtrips it is likely to take (one to get the + data and hashes, plus one to get the offset table and UEB if this is + the first segment we've ever fetched). 'bytes' is how many bytes we + must fetch (estimated). 'rtt' is estimated round-trip time (float) in + seconds for a trivial request. The downloading algorithm will compare + costs to decide which shares should be used.""" + # the most significant factor here is roundtrips: a Share for which + # we already have the offset table is better to than a brand new one + + def max_bandwidth(self): + """Return a float, indicating the highest plausible bytes-per-second + that I've observed coming from this share. This will be based upon + the minimum (bytes-per-fetch / time-per-fetch) ever observed. This + can we used to estimate the server's upstream bandwidth. 
Clearly this + is only accurate if a share is retrieved with no contention for + either the upstream, downstream, or middle of the connection, but it + may still serve as a useful metric for deciding which servers to pull + from.""" + + def get_segment(self, segnum): + """I return a Deferred that will fire with the segment data, or + errback.""" + +class NativeShareOnAServer(ShareOnAServer): + """For tahoe native (foolscap) servers, I contain a RemoteReference to + the RIBucketReader instance.""" + def __init__(self, shnum, server, rref): + ShareOnAServer.__init__(self, shnum, server) + self._rref = rref # RIBucketReader + +class Share: + def __init__(self, shnum): + self._shnum = shnum + # _servers are the Server instances which appear to hold a copy of + # this share. It is populated when the ValidShare is first created, + # or when we receive a get_buckets() response for a shnum that + # already has a ValidShare instance. When we lose the connection to a + # server, we remove it. + self._servers = set() + # offsets, UEB, and share_hash_tree all live in the parent. + # block_hash_tree lives here. + self._block_hash_tree = None + + self._want + + def get_servers(self): + return self._servers + + + def get_block(self, segnum): + # read enough data to obtain a single validated block + if not self.have_offsets: + # we get the offsets in their own read, since they tell us where + # everything else lives. We must fetch offsets for each share + # separately, since they aren't directly covered by the UEB. + pass + if not self.parent.have_ueb: + # use _guessed_segsize to make a guess about the layout, so we + # can fetch both the offset table and the UEB in the same read. + # This also requires making a guess about the presence or absence + # of the plaintext_hash_tree. Oh, and also the version number. Oh + # well. + pass + +class CiphertextDownloader: + """I manage all downloads for a single file. 
I operate a state machine + with input events that are local read() requests, responses to my remote + 'get_bucket' and 'read_bucket' messages, and connection establishment and + loss. My outbound events are connection establishment requests and bucket + read requests messages. + """ + # eventually this will merge into the FileNode + ServerClass = Server # for tests to override + + def __init__(self, storage_index, ueb_hash, size, k, N, storage_broker, + shutdowner): + # values we get from the filecap + self._storage_index = si = storage_index + self._ueb_hash = ueb_hash + self._size = size + self._needed_shares = k + self._total_shares = N + self._share_hash_tree = IncompleteHashTree(self._total_shares) + # values we discover when we first fetch the UEB + self._ueb = None # is dict after UEB fetch+validate + self._segsize = None + self._numsegs = None + self._blocksize = None + self._tail_segsize = None + self._ciphertext_hash = None # optional + # structures we create when we fetch the UEB, then continue to fill + # as we download the file + self._share_hash_tree = None # is IncompleteHashTree after UEB fetch + self._ciphertext_hash_tree = None + + # values we learn as we download the file + self._offsets = {} # (shnum,Server) to offset table (dict) + self._block_hash_tree = {} # shnum to IncompleteHashTree + # other things which help us + self._guessed_segsize = min(128*1024, size) + self._active_share_readers = {} # maps shnum to Reader instance + self._share_readers = [] # sorted by preference, best first + self._readers = set() # set of Reader instances + self._recent_horizon = 10 # seconds + + # 'shutdowner' is a MultiService parent used to cancel all downloads + # when the node is shutting down, to let tests have a clean reactor. + + self._init_available_servers() + self._init_find_enough_shares() + + # _available_servers is an iterator that provides us with Server + # instances. 
Each time we pull out a Server, we immediately send it a + # query, so we don't need to keep track of who we've sent queries to. + + def _init_available_servers(self): + self._available_servers = self._get_available_servers() + self._no_more_available_servers = False + + def _get_available_servers(self): + """I am a generator of servers to use, sorted by the order in which + we should query them. I make sure there are no duplicates in this + list.""" + # TODO: make StorageBroker responsible for this non-duplication, and + # replace this method with a simple iter(get_servers_for_index()), + # plus a self._no_more_available_servers=True + seen = set() + sb = self._storage_broker + for (peerid, ss) in sb.get_servers_for_index(self._storage_index): + if peerid not in seen: + yield self.ServerClass(peerid, ss) # Server(peerid, ss) + seen.add(peerid) + self._no_more_available_servers = True + + # this block of code is responsible for having enough non-problematic + # distinct shares/servers available and ready for download, and for + # limiting the number of queries that are outstanding. The idea is that + # we'll use the k fastest/best shares, and have the other ones in reserve + # in case those servers stop responding or respond too slowly. We keep + # track of all known shares, but we also keep track of problematic shares + # (ones with hash failures or lost connections), so we can put them at + # the bottom of the list. + + def _init_find_enough_shares(self): + # _unvalidated_sharemap maps shnum to set of Servers, and remembers + # where viable (but not yet validated) shares are located. Each + # get_bucket() response adds to this map, each act of validation + # removes from it. + self._sharemap = DictOfSets() + + # _sharemap maps shnum to set of Servers, and remembers where viable + # shares are located. Each get_bucket() response adds to this map, + # each hash failure or disconnect removes from it. 
(TODO: if we + # disconnect but reconnect later, we should be allowed to re-query). + self._sharemap = DictOfSets() + + # _problem_shares is a set of (shnum, Server) tuples, and + + # _queries_in_flight maps a Server to a timestamp, which remembers + # which servers we've sent queries to (and when) but have not yet + # heard a response. This lets us put a limit on the number of + # outstanding queries, to limit the size of the work window (how much + # extra work we ask servers to do in the hopes of keeping our own + # pipeline filled). We remove a Server from _queries_in_flight when + # we get an answer/error or we finally give up. If we ever switch to + # a non-connection-oriented protocol (like UDP, or forwarded Chord + # queries), we can use this information to retransmit any query that + # has gone unanswered for too long. + self._queries_in_flight = dict() + + def _count_recent_queries_in_flight(self): + now = time.time() + recent = now - self._recent_horizon + return len([s for (s,when) in self._queries_in_flight.items() + if when > recent]) + + def _find_enough_shares(self): + # goal: have 2*k distinct not-invalid shares available for reading, + # from 2*k distinct servers. Do not have more than 4*k "recent" + # queries in flight at a time. 
+ if (len(self._sharemap) >= 2*self._needed_shares + and len(self._sharemap.values) >= 2*self._needed_shares): + return + num = self._count_recent_queries_in_flight() + while num < 4*self._needed_shares: + try: + s = self._available_servers.next() + except StopIteration: + return # no more progress can be made + self._queries_in_flight[s] = time.time() + d = s.send_query(self._storage_index) + d.addBoth(incidentally, self._queries_in_flight.discard, s) + d.addCallbacks(lambda shnums: [self._sharemap.add(shnum, s) + for shnum in shnums], + lambda f: self._query_error(f, s)) + d.addErrback(self._error) + d.addCallback(self._reschedule) + num += 1 + + def _query_error(self, f, s): + # a server returned an error, log it gently and ignore + level = log.WEIRD + if f.check(DeadReferenceError): + level = log.UNUSUAL + log.msg("Error during get_buckets to server=%(server)s", server=str(s), + failure=f, level=level, umid="3uuBUQ") + + # this block is responsible for turning known shares into usable shares, + # by fetching enough data to validate their contents. + + # UEB (from any share) + # share hash chain, validated (from any share, for given shnum) + # block hash (any share, given shnum) + + def _got_ueb(self, ueb_data, share): + if self._ueb is not None: + return + if hashutil.uri_extension_hash(ueb_data) != self._ueb_hash: + share.error("UEB hash does not match") + return + d = uri.unpack_extension(ueb_data) + self.share_size = mathutil.div_ceil(self._size, self._needed_shares) + + + # There are several kinds of things that can be found in a UEB. + # First, things that we really need to learn from the UEB in order to + # do this download. Next: things which are optional but not redundant + # -- if they are present in the UEB they will get used. Next, things + # that are optional and redundant. 
These things are required to be + # consistent: they don't have to be in the UEB, but if they are in + # the UEB then they will be checked for consistency with the + # already-known facts, and if they are inconsistent then an exception + # will be raised. These things aren't actually used -- they are just + # tested for consistency and ignored. Finally: things which are + # deprecated -- they ought not be in the UEB at all, and if they are + # present then a warning will be logged but they are otherwise + # ignored. + + # First, things that we really need to learn from the UEB: + # segment_size, crypttext_root_hash, and share_root_hash. + self._segsize = d['segment_size'] + + self._blocksize = mathutil.div_ceil(self._segsize, self._needed_shares) + self._numsegs = mathutil.div_ceil(self._size, self._segsize) + + self._tail_segsize = self._size % self._segsize + if self._tail_segsize == 0: + self._tail_segsize = self._segsize + # padding for erasure code + self._tail_segsize = mathutil.next_multiple(self._tail_segsize, + self._needed_shares) + + # Ciphertext hash tree root is mandatory, so that there is at most + # one ciphertext that matches this read-cap or verify-cap. The + # integrity check on the shares is not sufficient to prevent the + # original encoder from creating some shares of file A and other + # shares of file B. + self._ciphertext_hash_tree = IncompleteHashTree(self._numsegs) + self._ciphertext_hash_tree.set_hashes({0: d['crypttext_root_hash']}) + + self._share_hash_tree.set_hashes({0: d['share_root_hash']}) + + + # Next: things that are optional and not redundant: crypttext_hash + if 'crypttext_hash' in d: + if len(self._ciphertext_hash) == hashutil.CRYPTO_VAL_SIZE: + self._ciphertext_hash = d['crypttext_hash'] + else: + log.msg("ignoring bad-length UEB[crypttext_hash], " + "got %d bytes, want %d" % (len(d['crypttext_hash']), + hashutil.CRYPTO_VAL_SIZE), + umid="oZkGLA", level=log.WEIRD) + + # we ignore all of the redundant fields when downloading. 
The + # Verifier uses a different code path which does not ignore them. + + # finally, set self._ueb as a marker that we don't need to request it + # anymore + self._ueb = d + + def _got_share_hashes(self, hashes, share): + assert isinstance(hashes, dict) + try: + self._share_hash_tree.set_hashes(hashes) + except (IndexError, BadHashError, NotEnoughHashesError), le: + share.error("Bad or missing hashes") + return + + #def _got_block_hashes( + + def _init_validate_enough_shares(self): + # _valid_shares maps shnum to ValidatedShare instances, and is + # populated once the block hash root has been fetched and validated + # (which requires any valid copy of the UEB, and a valid copy of the + # share hash chain for each shnum) + self._valid_shares = {} + + # _target_shares is an ordered list of ReadyShare instances, each of + # which is a (shnum, server) tuple. It is sorted in order of + # preference: we expect to get the fastest response from the + # ReadyShares at the front of the list. It is also sorted to + # distribute the shnums, so that fetching shares from + # _target_shares[:k] is likely (but not guaranteed) to give us k + # distinct shares. The rule is that we skip over entries for blocks + # that we've already received, limit the number of recent queries for + # the same block, + self._target_shares = [] + + def _validate_enough_shares(self): + # my goal is to have at least 2*k distinct validated shares from at + # least 2*k distinct servers + valid_share_servers = set() + for vs in self._valid_shares.values(): + valid_share_servers.update(vs.get_servers()) + if (len(self._valid_shares) >= 2*self._needed_shares + and len(self._valid_share_servers) >= 2*self._needed_shares): + return + #for + + def _reschedule(self, _ign): + # fire the loop again + if not self._scheduled: + self._scheduled = True + eventually(self._loop) + + def _loop(self): + self._scheduled = False + # what do we need? 
+ + self._find_enough_shares() + self._validate_enough_shares() + + if not self._ueb: + # we always need a copy of the UEB + pass + + def _error(self, f): + # this is an unexpected error: a coding bug + log.err(f, level=log.UNUSUAL) + + + +# using a single packed string (and an offset table) may be an artifact of +# our native storage server: other backends might allow cheap multi-part +# files (think S3, several buckets per share, one for each section). + +# find new names for: +# data_holder +# Share / Share2 (ShareInstance / Share? but the first is more useful) + +class IShare(Interface): + """I represent a single instance of a single share (e.g. I reference the + shnum2 for share SI=abcde on server xy12t, not the one on server ab45q). + This interface is used by SegmentFetcher to retrieve validated blocks. + """ + def get_block(segnum): + """Return an Observer2, which will be notified with the following + events: + state=COMPLETE, block=data (terminal): validated block data + state=OVERDUE (non-terminal): we have reason to believe that the + request might have stalled, or we + might just be impatient + state=CORRUPT (terminal): the data we received was corrupt + state=DEAD (terminal): the connection has failed + """ + + +# it'd be nice if we receive the hashes before the block, or just +# afterwards, so we aren't stuck holding on to unvalidated blocks +# that we can't process. 
If we guess the offsets right, we can +# accomplish this by sending the block request after the metadata +# requests (by keeping two separate requestlists), and have a one RTT +# pipeline like: +# 1a=metadata, 1b=block +# 1b->process+deliver : one RTT + +# But if we guess wrong, and fetch the wrong part of the block, we'll +# have a pipeline that looks like: +# 1a=wrong metadata, 1b=wrong block +# 1a->2a=right metadata,2b=right block +# 2b->process+deliver +# which means two RTT and buffering one block (which, since we'll +# guess the segsize wrong for everything, means buffering one +# segment) + +# if we start asking for multiple segments, we could get something +# worse: +# 1a=wrong metadata, 1b=wrong block0, 1c=wrong block1, .. +# 1a->2a=right metadata,2b=right block0,2c=right block1, . +# 2b->process+deliver + +# which means two RTT but fetching and buffering the whole file +# before delivering anything. However, since we don't know when the +# other shares are going to arrive, we need to avoid having more than +# one block in the pipeline anyways. So we shouldn't be able to get +# into this state. + +# it also means that, instead of handling all of +# self._requested_blocks at once, we should only be handling one +# block at a time: one of the requested block should be special +# (probably FIFO). But retire all we can. + + # this might be better with a Deferred, using COMPLETE as the success + # case and CORRUPT/DEAD in an errback, because that would let us hold the + # 'share' and 'shnum' arguments locally (instead of roundtripping them + # through Share.send_request). But that OVERDUE is not terminal. So I + # want a new sort of callback mechanism, with the extra-argument-passing + # aspects of Deferred, but without being so one-shot. Is this a job for + # Observer? No, it doesn't take extra arguments. So this uses Observer2. + + +class Reader: + """I am responsible for a single offset+size read of the file. 
I handle + segmentation: I figure out which segments are necessary, request them + (from my CiphertextDownloader) in order, and trim the segments down to + match the offset+size span. I use the Producer/Consumer interface to only + request one segment at a time. + """ + implements(IPushProducer) + def __init__(self, consumer, offset, size): + self._needed = [] + self._consumer = consumer + self._hungry = False + self._offset = offset + self._size = size + self._segsize = None + def start(self): + self._alive = True + self._deferred = defer.Deferred() + # the process doesn't actually start until set_segment_size() + return self._deferred + + def set_segment_size(self, segsize): + if self._segsize is not None: + return + self._segsize = segsize + self._compute_segnums() + + def _compute_segnums(self, segsize): + # now that we know the file's segsize, what segments (and which + # ranges of each) will we need? + size = self._size + offset = self._offset + while size: + assert size >= 0 + this_seg_num = int(offset / self._segsize) + this_seg_offset = offset - (seg_num*self._segsize) + this_seg_size = min(size, self._segsize-seg_offset) + size -= this_seg_size + if size: + offset += this_seg_size + yield (this_seg_num, this_seg_offset, this_seg_size) + + def get_needed_segments(self): + return set([segnum for (segnum, off, size) in self._needed]) + + + def stopProducing(self): + self._hungry = False + self._alive = False + # TODO: cancel the segment requests + def pauseProducing(self): + self._hungry = False + def resumeProducing(self): + self._hungry = True + def add_segment(self, segnum, offset, size): + self._needed.append( (segnum, offset, size) ) + def got_segment(self, segnum, segdata): + """Return True if this schedule has more to go, or False if it is + done.""" + assert self._needed[0][segnum] == segnum + (_ign, offset, size) = self._needed.pop(0) + data = segdata[offset:offset+size] + self._consumer.write(data) + if not self._needed: + # we're done + 
self._alive = False + self._hungry = False + self._consumer.unregisterProducer() + self._deferred.callback(self._consumer) + def error(self, f): + self._alive = False + self._hungry = False + self._consumer.unregisterProducer() + self._deferred.errback(f) + + + +class x: + def OFFread(self, consumer, offset=0, size=None): + """I am the main entry point, from which FileNode.read() can get + data.""" + # tolerate concurrent operations: each gets its own Reader + if size is None: + size = self._size - offset + r = Reader(consumer, offset, size) + self._readers.add(r) + d = r.start() + if self.segment_size is not None: + r.set_segment_size(self.segment_size) + # TODO: if we can't find any segments, and thus never get a + # segsize, tell the Readers to give up + return d diff --git a/src/allmydata/immutable/download2_util.py b/src/allmydata/immutable/download2_util.py new file mode 100755 index 0000000..48f2f0a --- /dev/null +++ b/src/allmydata/immutable/download2_util.py @@ -0,0 +1,73 @@ + +import weakref + +class Observer2: + """A simple class to distribute multiple events to a single subscriber. + It accepts arbitrary kwargs, but no posargs.""" + def __init__(self): + self._watcher = None + self._undelivered_results = [] + self._canceler = None + + def set_canceler(self, f): + # we use a weakref to avoid creating a cycle between us and the thing + # we're observing: they'll be holding a reference to us to compare + # against the value we pass to their canceler function. 
self._canceler = weakref.ref(f)
+
+    def subscribe(self, observer, **watcher_kwargs):
+        self._watcher = (observer, watcher_kwargs)
+        while self._undelivered_results:
+            self._notify(self._undelivered_results.pop(0))
+
+    def notify(self, **result_kwargs):
+        if self._watcher:
+            self._notify(result_kwargs)
+        else:
+            self._undelivered_results.append(result_kwargs)
+
+    def _notify(self, result_kwargs):
+        o, watcher_kwargs = self._watcher
+        kwargs = dict(result_kwargs)
+        kwargs.update(watcher_kwargs)
+        eventually(o, **kwargs)
+
+    def cancel(self):
+        f = self._canceler()
+        if f:
+            f(self)
+
+class DictOfSets(dict):
+    def add(self, key, value): self.setdefault(key, set()).add(value)
+ """ + f(*args, **kwargs) + return res + + +import weakref +class Terminator(service.Service): + def __init__(self): + service.Service.__init__(self) + self._clients = weakref.WeakKeyDictionary() + def register(self, c): + self._clients[c] = None + def stopService(self): + for c in self._clients: + c.stop() + return service.Service.stopService(self) diff --git a/src/allmydata/test/test_util.py b/src/allmydata/test/test_util.py index 6874655..b7537d7 100644 --- a/src/allmydata/test/test_util.py +++ b/src/allmydata/test/test_util.py @@ -7,12 +7,14 @@ from twisted.trial import unittest from twisted.internet import defer, reactor from twisted.python.failure import Failure from twisted.python import log +from hashlib import md5 from allmydata.util import base32, idlib, humanreadable, mathutil, hashutil from allmydata.util import assertutil, fileutil, deferredutil, abbreviate from allmydata.util import limiter, time_format, pollmixin, cachedir from allmydata.util import statistics, dictutil, pipeline from allmydata.util import log as tahoe_log +from allmydata.util.spans import Spans, overlap, DataSpans class Base32(unittest.TestCase): def test_b2a_matches_Pythons(self): @@ -1511,3 +1513,528 @@ class Log(unittest.TestCase): tahoe_log.err(format="intentional sample error", failure=f, level=tahoe_log.OPERATIONAL, umid="wO9UoQ") self.flushLoggedErrors(SampleError) + + +class SimpleSpans: + # this is a simple+inefficient form of util.spans.Spans . We compare the + # behavior of this reference model against the real (efficient) form. 
+ + def __init__(self, _span_or_start=None, length=None): + self._have = set() + if length is not None: + for i in range(_span_or_start, _span_or_start+length): + self._have.add(i) + elif _span_or_start: + for (start,length) in _span_or_start: + self.add(start, length) + + def add(self, start, length): + for i in range(start, start+length): + self._have.add(i) + return self + + def remove(self, start, length): + for i in range(start, start+length): + self._have.discard(i) + return self + + def each(self): + return sorted(self._have) + + def __iter__(self): + items = sorted(self._have) + prevstart = None + prevend = None + for i in items: + if prevstart is None: + prevstart = prevend = i + continue + if i == prevend+1: + prevend = i + continue + yield (prevstart, prevend-prevstart+1) + prevstart = prevend = i + if prevstart is not None: + yield (prevstart, prevend-prevstart+1) + + def __len__(self): + # this also gets us bool(s) + return len(self._have) + + def __add__(self, other): + s = self.__class__(self) + for (start, length) in other: + s.add(start, length) + return s + + def __sub__(self, other): + s = self.__class__(self) + for (start, length) in other: + s.remove(start, length) + return s + + def __iadd__(self, other): + for (start, length) in other: + self.add(start, length) + return self + + def __isub__(self, other): + for (start, length) in other: + self.remove(start, length) + return self + + def __contains__(self, (start,length)): + for i in range(start, start+length): + if i not in self._have: + return False + return True + +class ByteSpans(unittest.TestCase): + def test_basic(self): + s = Spans() + self.failUnlessEqual(list(s), []) + self.failIf(s) + self.failIf((0,1) in s) + self.failUnlessEqual(len(s), 0) + + s1 = Spans(3, 4) # 3,4,5,6 + self._check1(s1) + + s2 = Spans(s1) + self._check1(s2) + + s2.add(10,2) # 10,11 + self._check1(s1) + self.failUnless((10,1) in s2) + self.failIf((10,1) in s1) + self.failUnlessEqual(list(s2.each()), 
[3,4,5,6,10,11]) + self.failUnlessEqual(len(s2), 6) + + s2.add(15,2).add(20,2) + self.failUnlessEqual(list(s2.each()), [3,4,5,6,10,11,15,16,20,21]) + self.failUnlessEqual(len(s2), 10) + + s2.remove(4,3).remove(15,1) + self.failUnlessEqual(list(s2.each()), [3,10,11,16,20,21]) + self.failUnlessEqual(len(s2), 6) + + def _check1(self, s): + self.failUnlessEqual(list(s), [(3,4)]) + self.failUnless(s) + self.failUnlessEqual(len(s), 4) + self.failIf((0,1) in s) + self.failUnless((3,4) in s) + self.failUnless((3,1) in s) + self.failUnless((5,2) in s) + self.failUnless((6,1) in s) + self.failIf((6,2) in s) + self.failIf((7,1) in s) + self.failUnlessEqual(list(s.each()), [3,4,5,6]) + + def test_math(self): + s1 = Spans(0, 10) # 0,1,2,3,4,5,6,7,8,9 + s2 = Spans(5, 3) # 5,6,7 + s3 = Spans(8, 4) # 8,9,10,11 + + s = s1 - s2 + self.failUnlessEqual(list(s.each()), [0,1,2,3,4,8,9]) + s = s1 - s3 + self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7]) + s = s2 - s3 + self.failUnlessEqual(list(s.each()), [5,6,7]) + + s = s1 + s2 + self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9]) + s = s1 + s3 + self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9,10,11]) + s = s2 + s3 + self.failUnlessEqual(list(s.each()), [5,6,7,8,9,10,11]) + + s = Spans(s1) + s -= s2 + self.failUnlessEqual(list(s.each()), [0,1,2,3,4,8,9]) + s = Spans(s1) + s -= s3 + self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7]) + s = Spans(s2) + s -= s3 + self.failUnlessEqual(list(s.each()), [5,6,7]) + + s = Spans(s1) + s += s2 + self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9]) + s = Spans(s1) + s += s3 + self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9,10,11]) + s = Spans(s2) + s += s3 + self.failUnlessEqual(list(s.each()), [5,6,7,8,9,10,11]) + + def test_random(self): + # attempt to increase coverage of corner cases by comparing behavior + # of a simple-but-slow model implementation against the + # complex-but-fast actual implementation, in a large number of random + # operations + 
S1 = SimpleSpans + S2 = Spans + s1 = S1(); s2 = S2() + seed = "" + def _create(subseed): + ns1 = S1(); ns2 = S2() + for i in range(10): + what = md5(subseed+str(i)).hexdigest() + start = int(what[2:4], 16) + length = max(1,int(what[5:6], 16)) + ns1.add(start, length); ns2.add(start, length) + return ns1, ns2 + + #print + for i in range(1000): + what = md5(seed+str(i)).hexdigest() + op = what[0] + subop = what[1] + start = int(what[2:4], 16) + length = max(1,int(what[5:6], 16)) + #print what + if op in "0": + if subop in "01234": + s1 = S1(); s2 = S2() + elif subop in "5678": + s1 = S1(start, length); s2 = S2(start, length) + else: + s1 = S1(s1); s2 = S2(s2) + #print "s2 = %s" % s2.dump() + elif op in "123": + #print "s2.add(%d,%d)" % (start, length) + s1.add(start, length); s2.add(start, length) + elif op in "456": + #print "s2.remove(%d,%d)" % (start, length) + s1.remove(start, length); s2.remove(start, length) + elif op in "78": + ns1, ns2 = _create(what[7:11]) + #print "s2 + %s" % ns2.dump() + s1 = s1 + ns1; s2 = s2 + ns2 + elif op in "9a": + ns1, ns2 = _create(what[7:11]) + #print "%s - %s" % (s2.dump(), ns2.dump()) + s1 = s1 - ns1; s2 = s2 - ns2 + elif op in "bc": + ns1, ns2 = _create(what[7:11]) + #print "s2 += %s" % ns2.dump() + s1 += ns1; s2 += ns2 + else: + ns1, ns2 = _create(what[7:11]) + #print "%s -= %s" % (s2.dump(), ns2.dump()) + s1 -= ns1; s2 -= ns2 + #print "s2 now %s" % s2.dump() + self.failUnlessEqual(list(s1.each()), list(s2.each())) + self.failUnlessEqual(len(s1), len(s2)) + self.failUnlessEqual(bool(s1), bool(s2)) + self.failUnlessEqual(list(s1), list(s2)) + for j in range(10): + what = md5(what[12:14]+str(j)).hexdigest() + start = int(what[2:4], 16) + length = max(1, int(what[5:6], 16)) + span = (start, length) + self.failUnlessEqual(bool(span in s1), bool(span in s2)) + + + # s() + # s(start,length) + # s(s0) + # s.add(start,length) : returns s + # s.remove(start,length) + # s.each() -> list of byte offsets, mostly for testing + # list(s) -> 
list of (start,length) tuples, one per span + # (start,length) in s -> True if (start..start+length-1) are all members + # NOT equivalent to x in list(s) + # len(s) -> number of bytes, for testing, bool(), and accounting/limiting + # bool(s) (__len__) + # s = s1+s2, s1-s2, +=s1, -=s1 + + def test_overlap(self): + for a in range(20): + for b in range(10): + for c in range(20): + for d in range(10): + self._test_overlap(a,b,c,d) + + def _test_overlap(self, a, b, c, d): + s1 = set(range(a,a+b)) + s2 = set(range(c,c+d)) + #print "---" + #self._show_overlap(s1, "1") + #self._show_overlap(s2, "2") + o = overlap(a,b,c,d) + expected = s1.intersection(s2) + if not expected: + self.failUnlessEqual(o, None) + else: + start,length = o + so = set(range(start,start+length)) + #self._show(so, "o") + self.failUnlessEqual(so, expected) + + def _show_overlap(self, s, c): + import sys + out = sys.stdout + if s: + for i in range(max(s)): + if i in s: + out.write(c) + else: + out.write(" ") + out.write("\n") + +def extend(s, start, length, fill): + if len(s) >= start+length: + return s + assert len(fill) == 1 + return s + fill*(start+length-len(s)) + +def replace(s, start, data): + assert len(s) >= start+len(data) + return s[:start] + data + s[start+len(data):] + +class SimpleDataSpans: + def __init__(self, other=None): + self.missing = "" # "1" where missing, "0" where found + self.data = "" + if other: + for (start, data) in other.get_spans(): + self.add(start, data) + + def __len__(self): + return len(self.missing.translate(None, "1")) + def _dump(self): + return [i for (i,c) in enumerate(self.missing) if c == "0"] + def _have(self, start, length): + m = self.missing[start:start+length] + if not m or len(m) prev_end + prev_end = start+length + except AssertionError: + print "BAD:", self.dump() + raise + + def add(self, start, length): + assert start >= 0 + assert length > 0 + #print " ADD [%d+%d -%d) to %s" % (start, length, start+length, self.dump()) + first_overlap = last_overlap 
= None + for i,(s_start,s_length) in enumerate(self._spans): + #print " (%d+%d)-> overlap=%s adjacent=%s" % (s_start,s_length, overlap(s_start, s_length, start, length), adjacent(s_start, s_length, start, length)) + if (overlap(s_start, s_length, start, length) + or adjacent(s_start, s_length, start, length)): + last_overlap = i + if first_overlap is None: + first_overlap = i + continue + # no overlap + if first_overlap is not None: + break + #print " first_overlap", first_overlap, last_overlap + if first_overlap is None: + # no overlap, so just insert the span and sort by starting + # position. + self._spans.insert(0, (start,length)) + self._spans.sort() + else: + # everything from [first_overlap] to [last_overlap] overlapped + first_start,first_length = self._spans[first_overlap] + last_start,last_length = self._spans[last_overlap] + newspan_start = min(start, first_start) + newspan_end = max(start+length, last_start+last_length) + newspan_length = newspan_end - newspan_start + newspan = (newspan_start, newspan_length) + self._spans[first_overlap:last_overlap+1] = [newspan] + #print " ADD done: %s" % self.dump() + self._check() + + return self + + def remove(self, start, length): + assert start >= 0 + assert length > 0 + #print " REMOVE [%d+%d -%d) from %s" % (start, length, start+length, self.dump()) + first_complete_overlap = last_complete_overlap = None + for i,(s_start,s_length) in enumerate(self._spans): + s_end = s_start + s_length + o = overlap(s_start, s_length, start, length) + if o: + o_start, o_length = o + o_end = o_start+o_length + if o_start == s_start and o_end == s_end: + # delete this span altogether + if first_complete_overlap is None: + first_complete_overlap = i + last_complete_overlap = i + elif o_start == s_start: + # we only overlap the left side, so trim the start + # 1111 + # rrrr + # oo + # -> 11 + new_start = o_end + new_end = s_end + assert new_start > s_start + new_length = new_end - new_start + self._spans[i] = (new_start, 
new_length) + elif o_end == s_end: + # we only overlap the right side + # 1111 + # rrrr + # oo + # -> 11 + new_start = s_start + new_end = o_start + assert new_end < s_end + new_length = new_end - new_start + self._spans[i] = (new_start, new_length) + else: + # we overlap the middle, so create a new span. No need to + # examine any other spans. + # 111111 + # rr + # LL RR + left_start = s_start + left_end = o_start + left_length = left_end - left_start + right_start = o_end + right_end = s_end + right_length = right_end - right_start + self._spans[i] = (left_start, left_length) + self._spans.append( (right_start, right_length) ) + self._spans.sort() + break + if first_complete_overlap is not None: + del self._spans[first_complete_overlap:last_complete_overlap+1] + #print " REMOVE done: %s" % self.dump() + self._check() + return self + + def dump(self): + return "len=%d: %s" % (len(self), + ",".join(["[%d-%d]" % (start,start+l-1) + for (start,l) in self._spans]) ) + + def each(self): + for start, length in self._spans: + for i in range(start, start+length): + yield i + + def __iter__(self): + for s in self._spans: + yield s + + def __len__(self): + # this also gets us bool(s) + return sum([length for start,length in self._spans]) + + def __add__(self, other): + s = self.__class__(self) + for (start, length) in other: + s.add(start, length) + return s + + def __sub__(self, other): + s = self.__class__(self) + for (start, length) in other: + s.remove(start, length) + return s + + def __iadd__(self, other): + for (start, length) in other: + self.add(start, length) + return self + + def __isub__(self, other): + for (start, length) in other: + self.remove(start, length) + return self + + def __contains__(self, (start,length)): + for span_start,span_length in self._spans: + o = overlap(start, length, span_start, span_length) + if o: + o_start,o_length = o + if o_start == start and o_length == length: + return True + return False + +def overlap(start0, length0, start1, 
length1): + # return start2,length2 of the overlapping region, or None + # 00 00 000 0000 00 00 000 00 00 00 00 + # 11 11 11 11 111 11 11 1111 111 11 11 + left = max(start0, start1) + right = min(start0+length0, start1+length1) + # if there is overlap, 'left' will be its start, and right-1 will + # be the end' + if left < right: + return (left, right-left) + return None + +def adjacent(start0, length0, start1, length1): + if (start0 < start1) and start0+length0 == start1: + return True + elif (start1 < start0) and start1+length1 == start0: + return True + return False + +class DataSpans: + """I represent portions of a large string. Equivalently, I can be said to + maintain a large array of characters (with gaps of empty elements). I can + be used to manage access to a remote share, where some pieces have been + retrieved, some have been requested, and others have not been read. + """ + + def __init__(self, other=None): + self.spans = [] # (start, data) tuples, non-overlapping, merged + if other: + for (start, data) in other.get_spans(): + self.add(start, data) + + def __len__(self): + # return number of bytes we're holding + return sum([len(data) for (start,data) in self.spans]) + + def _dump(self): + # return iterator of sorted list of offsets, one per byte + for (start,data) in self.spans: + for i in range(start, start+len(data)): + yield i + + def get_spans(self): + return list(self.spans) + + def assert_invariants(self): + if not self.spans: + return + prev_start = self.spans[0][0] + prev_end = prev_start + len(self.spans[0][1]) + for start, data in self.spans[1:]: + if not start > prev_end: + # adjacent or overlapping: bad + print "ASSERTION FAILED", self.spans + raise AssertionError + + def get(self, start, length): + # returns a string of LENGTH, or None + #print "get", start, length, self.spans + end = start+length + for (s_start,s_data) in self.spans: + s_end = s_start+len(s_data) + #print " ",s_start,s_end + if s_start <= start < s_end: + # we want some 
data from this span. Because we maintain + # strictly merged and non-overlapping spans, everything we + # want must be in this span. + offset = start - s_start + if offset + length > len(s_data): + #print " None, span falls short" + return None # span falls short + #print " some", s_data[offset:offset+length] + return s_data[offset:offset+length] + if s_start >= end: + # we've gone too far: no further spans will overlap + #print " None, gone too far" + return None + #print " None, ran out of spans" + return None + + def add(self, start, data): + # first: walk through existing spans, find overlap, modify-in-place + # create list of new spans + # add new spans + # sort + # merge adjacent spans + #print "add", start, data, self.spans + end = start + len(data) + i = 0 + while len(data): + #print " loop", start, data, i, len(self.spans), self.spans + if i >= len(self.spans): + #print " append and done" + # append a last span + self.spans.append( (start, data) ) + break + (s_start,s_data) = self.spans[i] + # five basic cases: + # a: OLD b:OLDD c1:OLD c2:OLD d1:OLDD d2:OLD e: OLLDD + # NEW NEW NEW NEWW NEW NEW NEW + # + # we handle A by inserting a new segment (with "N") and looping, + # turning it into B or C. We handle B by replacing a prefix and + # terminating. We handle C (both c1 and c2) by replacing the + # segment (and, for c2, looping, turning it into A). We handle D + # by replacing a suffix (and, for d2, looping, turning it into + # A). We handle E by replacing the middle and terminating. 
+ if start < s_start: + # case A: insert a new span, then loop with the remainder + #print " insert new psan" + s_len = s_start-start + self.spans.insert(i, (start, data[:s_len])) + i += 1 + start = s_start + data = data[s_len:] + continue + s_len = len(s_data) + s_end = s_start+s_len + if s_start <= start < s_end: + #print " modify this span", s_start, start, s_end + # we want to modify some data in this span: a prefix, a + # suffix, or the whole thing + if s_start == start: + if s_end <= end: + #print " replace whole segment" + # case C: replace this segment + self.spans[i] = (s_start, data[:s_len]) + i += 1 + start += s_len + data = data[s_len:] + # C2 is where len(data)>0 + continue + # case B: modify the prefix, retain the suffix + #print " modify prefix" + self.spans[i] = (s_start, data + s_data[len(data):]) + break + if start > s_start and end < s_end: + # case E: modify the middle + #print " modify middle" + prefix_len = start - s_start # we retain this much + suffix_len = s_end - end # and retain this much + newdata = s_data[:prefix_len] + data + s_data[-suffix_len:] + self.spans[i] = (s_start, newdata) + break + # case D: retain the prefix, modify the suffix + #print " modify suffix" + prefix_len = start - s_start # we retain this much + suffix_len = s_len - prefix_len # we replace this much + #print " ", s_data, prefix_len, suffix_len, s_len, data + self.spans[i] = (s_start, + s_data[:prefix_len] + data[:suffix_len]) + i += 1 + start += suffix_len + data = data[suffix_len:] + #print " now", start, data + # D2 is where len(data)>0 + continue + # else we're not there yet + #print " still looking" + i += 1 + continue + # now merge adjacent spans + #print " merging", self.spans + newspans = [] + for (s_start,s_data) in self.spans: + if newspans and adjacent(newspans[-1][0], len(newspans[-1][1]), + s_start, len(s_data)): + newspans[-1] = (newspans[-1][0], newspans[-1][1] + s_data) + else: + newspans.append( (s_start, s_data) ) + self.spans = newspans + 
self.assert_invariants() + #print " done", self.spans + + def remove(self, start, length): + i = 0 + end = start + length + #print "remove", start, length, self.spans + while i < len(self.spans): + (s_start,s_data) = self.spans[i] + if s_start >= end: + # this segment is entirely right of the removed region, and + # all further segments are even further right. We're done. + break + s_len = len(s_data) + s_end = s_start + s_len + o = overlap(start, length, s_start, s_len) + if not o: + i += 1 + continue + o_start, o_len = o + o_end = o_start + o_len + if o_len == s_len: + # remove the whole segment + del self.spans[i] + continue + if o_start == s_start: + # remove a prefix, leaving the suffix from o_end to s_end + prefix_len = o_end - o_start + self.spans[i] = (o_end, s_data[prefix_len:]) + i += 1 + continue + elif o_end == s_end: + # remove a suffix, leaving the prefix from s_start to o_start + prefix_len = o_start - s_start + self.spans[i] = (s_start, s_data[:prefix_len]) + i += 1 + continue + # remove the middle, creating a new segment + # left is s_start:o_start, right is o_end:s_end + left_len = o_start - s_start + left = s_data[:left_len] + right_len = s_end - o_end + right = s_data[-right_len:] + self.spans[i] = (s_start, left) + self.spans.insert(i+1, (o_end, right)) + break + #print " done", self.spans + + def pop(self, start, length): + data = self.get(start, length) + if data: + self.remove(start, length) + return data