Sun Aug 1 05:43:09 GMT Daylight Time 2010 david-sarah@jacaranda.org * Brian's New Downloader, for testing in 1.8beta. New patches: [Brian's New Downloader, for testing in 1.8beta. david-sarah@jacaranda.org**20100801044309 Ignore-this: d9d3637d2fbdd007d857f49cffa688d0 ] { hunk ./Makefile 128 # quicktest-coverage" to do a unit test run with coverage-gathering enabled, # then use "make coverate-output-text" for a brief report, or "make # coverage-output" for a pretty HTML report. Also see "make .coverage.el" and -# misc/coding_helpers/coverage.el for emacs integration. +# misc/coding_tools/coverage.el for emacs integration. quicktest-coverage: rm -f .coverage hunk ./Makefile 137 coverage-output: rm -rf coverage-html - coverage html -d coverage-html + coverage html -i -d coverage-html $(COVERAGE_OMIT) cp .coverage coverage-html/coverage.data @echo "now point your browser at coverage-html/index.html" hunk ./Makefile 157 .PHONY: repl test-darcs-boringfile test-clean clean find-trailing-spaces .coverage.el: .coverage - $(PYTHON) misc/coding_helpers/coverage2el.py + $(PYTHON) misc/coding_tools/coverage2el.py # 'upload-coverage' is meant to be run with an UPLOAD_TARGET=host:/dir setting ifdef UPLOAD_TARGET hunk ./Makefile 181 pyflakes: $(PYTHON) -OOu `which pyflakes` src/allmydata |sort |uniq +check-umids: + $(PYTHON) misc/coding_tools/check-umids.py `find src/allmydata -name '*.py'` count-lines: @echo -n "files: " hunk ./misc/coding_tools/coverage.el 87 'face '(:box "red") ) ) - (message "Added annotations") + (message (format "Added annotations: %d uncovered lines" + (safe-length uncovered-code-lines))) ) ) (message "unable to find coverage for this file")) hunk ./misc/simulators/sizes.py 63 self.block_arity = 0 self.block_tree_depth = 0 self.block_overhead = 0 - self.bytes_until_some_data = 20 + share_size + self.bytes_until_some_data = 32 + share_size self.share_storage_overhead = 0 self.share_transmission_overhead = 0 hunk ./misc/simulators/sizes.py 69 elif mode == 
"beta": # k=num_blocks, d=1 - # each block has a 20-byte hash + # each block has a 32-byte hash self.block_arity = num_blocks self.block_tree_depth = 1 hunk ./misc/simulators/sizes.py 72 - self.block_overhead = 20 + self.block_overhead = 32 # the share has a list of hashes, one for each block self.share_storage_overhead = (self.block_overhead * num_blocks) hunk ./misc/simulators/sizes.py 78 # we can get away with not sending the hash of the share that # we're sending in full, once - self.share_transmission_overhead = self.share_storage_overhead - 20 + self.share_transmission_overhead = self.share_storage_overhead - 32 # we must get the whole list (so it can be validated) before # any data can be validated self.bytes_until_some_data = (self.share_transmission_overhead + hunk ./misc/simulators/sizes.py 92 # to make things easier, we make the pessimistic assumption that # we have to store hashes for all the empty places in the tree # (when the number of shares is not an exact exponent of k) - self.block_overhead = 20 + self.block_overhead = 32 # the block hashes are organized into a k-ary tree, which # means storing (and eventually transmitting) more hashes. This # count includes all the low-level share hashes and the root. 
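The "gamma" mode of sizes.py models a k-ary block hash tree padded out to a full power of k. The sketch below reproduces its overhead formulas with the new 32-byte hash size. `gamma_overhead` is a hypothetical helper, and the way it derives `num_leaves` and `hash_nodes` (padding to k**d leaves, where d is the smallest depth with k**d >= num_blocks) is an assumption based on the simulator's "pessimistic" comment, not code copied from sizes.py.

```python
HASH_SIZE = 32  # bytes per hash, matching the 20 -> 32 change in this patch


def gamma_overhead(num_blocks, k, block_size):
    """Estimate k-ary block-hash-tree overheads in the style of sizes.py.

    Hypothetical helper: assumes the tree is padded out to k**d leaves,
    where d is the smallest depth with k**d >= num_blocks.
    """
    if k < 2:
        raise ValueError("tree arity k must be at least 2")
    depth = 0
    while k ** depth < num_blocks:
        depth += 1
    num_leaves = k ** depth
    # nodes in a complete k-ary tree with num_leaves leaves, root included
    hash_nodes = (k ** (depth + 1) - 1) // (k - 1)
    # the root travels with the share-level hash tree, so it is not stored here
    storage = HASH_SIZE * (hash_nodes - 1)
    # per the simulator's comment: when every block is sent, 1/k of the
    # lowest-level block hashes (and the root) need not be transmitted
    transmission = HASH_SIZE * (hash_nodes - 1 - num_leaves // k)
    # a full sibling chain (k-1 hashes per level) must arrive before any block
    sibling_length = depth * (k - 1)
    bytes_until_some_data = HASH_SIZE * sibling_length + block_size
    return {
        "depth": depth,
        "num_leaves": num_leaves,
        "hash_nodes": hash_nodes,
        "storage": storage,
        "transmission": transmission,
        "bytes_until_some_data": bytes_until_some_data,
    }
```

For example, 10 blocks in a binary tree (k=2) pad out to 16 leaves and 31 hash nodes, giving 960 bytes of stored hashes and 704 bytes transmitted. A 1000-byte block then needs 4 sibling hashes (128 bytes) in front of it before it can be validated.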
hunk ./misc/simulators/sizes.py 101 #print "num_leaves", num_leaves #print "hash_nodes", hash_nodes # the storage overhead is this - self.share_storage_overhead = 20 * (hash_nodes - 1) + self.share_storage_overhead = 32 * (hash_nodes - 1) # the transmission overhead is smaller: if we actually transmit # every block, we don't have to transmit 1/k of the # lowest-level block hashes, and we don't have to transmit the hunk ./misc/simulators/sizes.py 106 # root because it was already sent with the share-level hash tree - self.share_transmission_overhead = 20 * (hash_nodes + self.share_transmission_overhead = 32 * (hash_nodes - 1 # the root - num_leaves / k) # we must get a full sibling hash chain before we can validate hunk ./misc/simulators/sizes.py 112 # any data sibling_length = d * (k-1) - self.bytes_until_some_data = 20 * sibling_length + block_size + self.bytes_until_some_data = 32 * sibling_length + block_size hunk ./misc/simulators/storage-overhead.py 4 #!/usr/bin/env python import sys, math -from allmydata import upload, uri, encode, storage +from allmydata import uri, storage +from allmydata.immutable import upload +from allmydata.interfaces import DEFAULT_MAX_SEGMENT_SIZE from allmydata.util import mathutil def roundup(size, blocksize=4096): hunk ./misc/simulators/storage-overhead.py 27 def tell(self): return self.fp -def calc(filesize, params=(3,7,10), segsize=encode.Encoder.MAX_SEGMENT_SIZE): +def calc(filesize, params=(3,7,10), segsize=DEFAULT_MAX_SEGMENT_SIZE): num_shares = params[2] if filesize <= upload.Uploader.URI_LIT_SIZE_THRESHOLD: hunk ./misc/simulators/storage-overhead.py 30 - urisize = len(uri.pack_lit("A"*filesize)) + urisize = len(uri.LiteralFileURI("A"*filesize).to_string()) sharesize = 0 sharespace = 0 else: hunk ./misc/simulators/storage-overhead.py 34 - u = upload.FileUploader(None) + u = upload.FileUploader(None) # XXX changed u.set_params(params) # unfortunately, Encoder doesn't currently lend itself to answering # this question without 
measuring a filesize, so we have to give it a hunk ./src/allmydata/client.py 1 -import os, stat, time +import os, stat, time, weakref from allmydata.interfaces import RIStorageServer from allmydata import node hunk ./src/allmydata/client.py 7 from zope.interface import implements from twisted.internet import reactor, defer +from twisted.application import service from twisted.application.internet import TimerService from foolscap.api import Referenceable from pycryptopp.publickey import rsa hunk ./src/allmydata/client.py 16 from allmydata.storage.server import StorageServer from allmydata import storage_client from allmydata.immutable.upload import Uploader -from allmydata.immutable.download import Downloader from allmydata.immutable.offloaded import Helper from allmydata.control import ControlServer from allmydata.introducer.client import IntroducerClient hunk ./src/allmydata/client.py 19 -from allmydata.util import hashutil, base32, pollmixin, cachedir, log +from allmydata.util import hashutil, base32, pollmixin, log from allmydata.util.encodingutil import get_filesystem_encoding from allmydata.util.abbreviate import parse_abbreviated_size from allmydata.util.time_format import parse_duration, parse_date hunk ./src/allmydata/client.py 98 verifier = signer.get_verifying_key() return defer.succeed( (verifier, signer) ) +class Terminator(service.Service): + def __init__(self): + self._clients = weakref.WeakKeyDictionary() + def register(self, c): + self._clients[c] = None + def stopService(self): + for c in self._clients: + c.stop() + return service.Service.stopService(self) + class Client(node.Node, pollmixin.PollMixin): implements(IStatsProducer) hunk ./src/allmydata/client.py 292 self.init_client_storage_broker() self.history = History(self.stats_provider) + self.terminator = Terminator() + self.terminator.setServiceParent(self) self.add_service(Uploader(helper_furl, self.stats_provider)) hunk ./src/allmydata/client.py 295 - download_cachedir = 
os.path.join(self.basedir, - "private", "cache", "download") - self.download_cache_dirman = cachedir.CacheDirectoryManager(download_cachedir) - self.download_cache_dirman.setServiceParent(self) - self.downloader = Downloader(self.storage_broker, self.stats_provider) self.init_stub_client() self.init_nodemaker() hunk ./src/allmydata/client.py 353 self._secret_holder, self.get_history(), self.getServiceNamed("uploader"), - self.downloader, - self.download_cache_dirman, + self.terminator, self.get_encoding_parameters(), self._key_generator) hunk ./src/allmydata/immutable/checker.py 1 +from zope.interface import implements +from twisted.internet import defer from foolscap.api import DeadReferenceError, RemoteException hunk ./src/allmydata/immutable/checker.py 4 +from allmydata import hashtree, codec, uri +from allmydata.interfaces import IValidatedThingProxy, IVerifierURI from allmydata.hashtree import IncompleteHashTree from allmydata.check_results import CheckResults hunk ./src/allmydata/immutable/checker.py 8 -from allmydata.immutable import download from allmydata.uri import CHKFileVerifierURI from allmydata.util.assertutil import precondition hunk ./src/allmydata/immutable/checker.py 10 -from allmydata.util import base32, idlib, deferredutil, dictutil, log +from allmydata.util import base32, idlib, deferredutil, dictutil, log, mathutil from allmydata.util.hashutil import file_renewal_secret_hash, \ file_cancel_secret_hash, bucket_renewal_secret_hash, \ hunk ./src/allmydata/immutable/checker.py 13 - bucket_cancel_secret_hash + bucket_cancel_secret_hash, uri_extension_hash, CRYPTO_VAL_SIZE, \ + block_hash from allmydata.immutable import layout hunk ./src/allmydata/immutable/checker.py 18 +class IntegrityCheckReject(Exception): + pass +class BadURIExtension(IntegrityCheckReject): + pass +class BadURIExtensionHashValue(IntegrityCheckReject): + pass +class BadOrMissingHash(IntegrityCheckReject): + pass +class UnsupportedErasureCodec(BadURIExtension): + pass + +class 
ValidatedExtendedURIProxy: + implements(IValidatedThingProxy) + """ I am a front-end for a remote UEB (using a local ReadBucketProxy), + responsible for retrieving and validating the elements from the UEB.""" + + def __init__(self, readbucketproxy, verifycap, fetch_failures=None): + # fetch_failures is for debugging -- see test_encode.py + self._fetch_failures = fetch_failures + self._readbucketproxy = readbucketproxy + precondition(IVerifierURI.providedBy(verifycap), verifycap) + self._verifycap = verifycap + + # required + self.segment_size = None + self.crypttext_root_hash = None + self.share_root_hash = None + + # computed + self.block_size = None + self.share_size = None + self.num_segments = None + self.tail_data_size = None + self.tail_segment_size = None + + # optional + self.crypttext_hash = None + + def __str__(self): + return "<%s %s>" % (self.__class__.__name__, self._verifycap.to_string()) + + def _check_integrity(self, data): + h = uri_extension_hash(data) + if h != self._verifycap.uri_extension_hash: + msg = ("The copy of uri_extension we received from %s was bad: wanted %s, got %s" % + (self._readbucketproxy, + base32.b2a(self._verifycap.uri_extension_hash), + base32.b2a(h))) + if self._fetch_failures is not None: + self._fetch_failures["uri_extension"] += 1 + raise BadURIExtensionHashValue(msg) + else: + return data + + def _parse_and_validate(self, data): + self.share_size = mathutil.div_ceil(self._verifycap.size, + self._verifycap.needed_shares) + + d = uri.unpack_extension(data) + + # There are several kinds of things that can be found in a UEB. + # First, things that we really need to learn from the UEB in order to + # do this download. Next: things which are optional but not redundant + # -- if they are present in the UEB they will get used. Next, things + # that are optional and redundant. 
These things are required to be + # consistent: they don't have to be in the UEB, but if they are in + # the UEB then they will be checked for consistency with the + # already-known facts, and if they are inconsistent then an exception + # will be raised. These things aren't actually used -- they are just + # tested for consistency and ignored. Finally: things which are + # deprecated -- they ought not be in the UEB at all, and if they are + # present then a warning will be logged but they are otherwise + # ignored. + + # First, things that we really need to learn from the UEB: + # segment_size, crypttext_root_hash, and share_root_hash. + self.segment_size = d['segment_size'] + + self.block_size = mathutil.div_ceil(self.segment_size, + self._verifycap.needed_shares) + self.num_segments = mathutil.div_ceil(self._verifycap.size, + self.segment_size) + + self.tail_data_size = self._verifycap.size % self.segment_size + if not self.tail_data_size: + self.tail_data_size = self.segment_size + # padding for erasure code + self.tail_segment_size = mathutil.next_multiple(self.tail_data_size, + self._verifycap.needed_shares) + + # Ciphertext hash tree root is mandatory, so that there is at most + # one ciphertext that matches this read-cap or verify-cap. The + # integrity check on the shares is not sufficient to prevent the + # original encoder from creating some shares of file A and other + # shares of file B. 
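The sizes that `ValidatedExtendedURIProxy._parse_and_validate` derives from the verify-cap and the UEB's `segment_size` come down to a few integer operations. The sketch below restates them standalone. `div_ceil` and `next_multiple` are local stand-ins, assumed to behave like the `mathutil` functions of the same names for positive inputs, and `ueb_derived_sizes` is a hypothetical wrapper.

```python
def div_ceil(n, d):
    """Integer ceiling division (stand-in for mathutil.div_ceil)."""
    return (n + d - 1) // d


def next_multiple(n, k):
    """Smallest multiple of k that is >= n (stand-in for mathutil.next_multiple)."""
    return div_ceil(n, k) * k


def ueb_derived_sizes(size, segment_size, needed_shares):
    """Hypothetical helper mirroring the computed fields of the UEB proxy."""
    share_size = div_ceil(size, needed_shares)
    block_size = div_ceil(segment_size, needed_shares)
    num_segments = div_ceil(size, segment_size)
    tail_data_size = size % segment_size
    if not tail_data_size:
        # size is an exact multiple of segment_size: the tail is a full segment
        tail_data_size = segment_size
    # pad the tail so the erasure coder can split it into needed_shares pieces
    tail_segment_size = next_multiple(tail_data_size, needed_shares)
    return {
        "share_size": share_size,
        "block_size": block_size,
        "num_segments": num_segments,
        "tail_data_size": tail_data_size,
        "tail_segment_size": tail_segment_size,
    }
```

With a 1,000,000-byte file, 131072-byte segments and k=3, this yields 8 segments of 43691-byte blocks. The last segment holds 82496 bytes of data, padded to 82497 so that it divides evenly by 3.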
+ self.crypttext_root_hash = d['crypttext_root_hash'] + + self.share_root_hash = d['share_root_hash'] + + + # Next: things that are optional and not redundant: crypttext_hash + if d.has_key('crypttext_hash'): + self.crypttext_hash = d['crypttext_hash'] + if len(self.crypttext_hash) != CRYPTO_VAL_SIZE: + raise BadURIExtension('crypttext_hash is required to be hashutil.CRYPTO_VAL_SIZE bytes, not %s bytes' % (len(self.crypttext_hash),)) + + + # Next: things that are optional, redundant, and required to be + # consistent: codec_name, codec_params, tail_codec_params, + # num_segments, size, needed_shares, total_shares + if d.has_key('codec_name'): + if d['codec_name'] != "crs": + raise UnsupportedErasureCodec(d['codec_name']) + + if d.has_key('codec_params'): + ucpss, ucpns, ucpts = codec.parse_params(d['codec_params']) + if ucpss != self.segment_size: + raise BadURIExtension("inconsistent erasure code params: " + "ucpss: %s != self.segment_size: %s" % + (ucpss, self.segment_size)) + if ucpns != self._verifycap.needed_shares: + raise BadURIExtension("inconsistent erasure code params: ucpns: %s != " + "self._verifycap.needed_shares: %s" % + (ucpns, self._verifycap.needed_shares)) + if ucpts != self._verifycap.total_shares: + raise BadURIExtension("inconsistent erasure code params: ucpts: %s != " + "self._verifycap.total_shares: %s" % + (ucpts, self._verifycap.total_shares)) + + if d.has_key('tail_codec_params'): + utcpss, utcpns, utcpts = codec.parse_params(d['tail_codec_params']) + if utcpss != self.tail_segment_size: + raise BadURIExtension("inconsistent erasure code params: utcpss: %s != " + "self.tail_segment_size: %s, self._verifycap.size: %s, " + "self.segment_size: %s, self._verifycap.needed_shares: %s" + % (utcpss, self.tail_segment_size, self._verifycap.size, + self.segment_size, self._verifycap.needed_shares)) + if utcpns != self._verifycap.needed_shares: + raise BadURIExtension("inconsistent erasure code params: utcpns: %s != " + 
"self._verifycap.needed_shares: %s" % (utcpns, + self._verifycap.needed_shares)) + if utcpts != self._verifycap.total_shares: + raise BadURIExtension("inconsistent erasure code params: utcpts: %s != " + "self._verifycap.total_shares: %s" % (utcpts, + self._verifycap.total_shares)) + + if d.has_key('num_segments'): + if d['num_segments'] != self.num_segments: + raise BadURIExtension("inconsistent num_segments: size: %s, " + "segment_size: %s, computed_num_segments: %s, " + "ueb_num_segments: %s" % (self._verifycap.size, + self.segment_size, + self.num_segments, d['num_segments'])) + + if d.has_key('size'): + if d['size'] != self._verifycap.size: + raise BadURIExtension("inconsistent size: URI size: %s, UEB size: %s" % + (self._verifycap.size, d['size'])) + + if d.has_key('needed_shares'): + if d['needed_shares'] != self._verifycap.needed_shares: + raise BadURIExtension("inconsistent needed shares: URI needed shares: %s, UEB " + "needed shares: %s" % (self._verifycap.needed_shares, + d['needed_shares'])) + + if d.has_key('total_shares'): + if d['total_shares'] != self._verifycap.total_shares: + raise BadURIExtension("inconsistent total shares: URI total shares: %s, UEB " + "total shares: %s" % (self._verifycap.total_shares, + d['total_shares'])) + + # Finally, things that are deprecated and ignored: plaintext_hash, + # plaintext_root_hash + if d.get('plaintext_hash'): + log.msg("Found plaintext_hash in UEB. This field is deprecated for security reasons " + "and is no longer used. Ignoring. %s" % (self,)) + if d.get('plaintext_root_hash'): + log.msg("Found plaintext_root_hash in UEB. This field is deprecated for security " + "reasons and is no longer used. Ignoring. %s" % (self,)) + + return self + + def start(self): + """Fetch the UEB from bucket, compare its hash to the hash from + verifycap, then parse it.
Returns a deferred which is called back + with self once the fetch is successful, or is erred back if it + fails.""" + d = self._readbucketproxy.get_uri_extension() + d.addCallback(self._check_integrity) + d.addCallback(self._parse_and_validate) + return d + +class ValidatedReadBucketProxy(log.PrefixingLogMixin): + """I am a front-end for a remote storage bucket, responsible for + retrieving and validating data from that bucket. + + My get_block() method is used by BlockDownloaders. + """ + + def __init__(self, sharenum, bucket, share_hash_tree, num_blocks, + block_size, share_size): + """ share_hash_tree is required to have already been initialized with + the root hash (the number-0 hash), using the share_root_hash from the + UEB""" + precondition(share_hash_tree[0] is not None, share_hash_tree) + prefix = "%d-%s-%s" % (sharenum, bucket, + base32.b2a_l(share_hash_tree[0][:8], 60)) + log.PrefixingLogMixin.__init__(self, + facility="tahoe.immutable.download", + prefix=prefix) + self.sharenum = sharenum + self.bucket = bucket + self.share_hash_tree = share_hash_tree + self.num_blocks = num_blocks + self.block_size = block_size + self.share_size = share_size + self.block_hash_tree = hashtree.IncompleteHashTree(self.num_blocks) + + def get_all_sharehashes(self): + """Retrieve and validate all the share-hash-tree nodes that are + included in this share, regardless of whether we need them to + validate the share or not. Each share contains a minimal Merkle tree + chain, but there is lots of overlap, so usually we'll be using hashes + from other shares and not reading every single hash from this share. + The Verifier uses this function to read and validate every single + hash from this share. + + Call this (and wait for the Deferred it returns to fire) before + calling get_block() for the first time: this lets us check that the + share contains enough hashes to validate its own data, and + avoids downloading any share hash twice.
+ + I return a Deferred which errbacks upon failure, probably with + BadOrMissingHash.""" + + d = self.bucket.get_share_hashes() + def _got_share_hashes(sh): + sharehashes = dict(sh) + try: + self.share_hash_tree.set_hashes(sharehashes) + except IndexError, le: + raise BadOrMissingHash(le) + except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le: + raise BadOrMissingHash(le) + d.addCallback(_got_share_hashes) + return d + + def get_all_blockhashes(self): + """Retrieve and validate all the block-hash-tree nodes that are + included in this share. Each share contains a full Merkle tree, but + we usually only fetch the minimal subset necessary for any particular + block. This function fetches everything at once. The Verifier uses + this function to validate the block hash tree. + + Call this (and wait for the Deferred it returns to fire) after + calling get_all_sharehashes() and before calling get_block() for the + first time: this lets us check that the share contains all block + hashes and avoids downloading them multiple times. + + I return a Deferred which errbacks upon failure, probably with + BadOrMissingHash. + """ + + # get_block_hashes(anything) currently always returns everything + needed = list(range(len(self.block_hash_tree))) + d = self.bucket.get_block_hashes(needed) + def _got_block_hashes(blockhashes): + if len(blockhashes) < len(self.block_hash_tree): + raise BadOrMissingHash() + bh = dict(enumerate(blockhashes)) + + try: + self.block_hash_tree.set_hashes(bh) + except IndexError, le: + raise BadOrMissingHash(le) + except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le: + raise BadOrMissingHash(le) + d.addCallback(_got_block_hashes) + return d + + def get_all_crypttext_hashes(self, crypttext_hash_tree): + """Retrieve and validate all the crypttext-hash-tree nodes that are + in this share. Normally we don't look at these at all: the download + process fetches them incrementally as needed to validate each segment + of ciphertext. 
But this is a convenient place to give the Verifier a + function to validate all of these at once. + + Call this with a new hashtree object for each share, initialized with + the crypttext hash tree root. I return a Deferred which errbacks upon + failure, probably with BadOrMissingHash. + """ + + # get_crypttext_hashes() always returns everything + d = self.bucket.get_crypttext_hashes() + def _got_crypttext_hashes(hashes): + if len(hashes) < len(crypttext_hash_tree): + raise BadOrMissingHash() + ct_hashes = dict(enumerate(hashes)) + try: + crypttext_hash_tree.set_hashes(ct_hashes) + except IndexError, le: + raise BadOrMissingHash(le) + except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le: + raise BadOrMissingHash(le) + d.addCallback(_got_crypttext_hashes) + return d + + def get_block(self, blocknum): + # the first time we use this bucket, we need to fetch enough elements + # of the share hash tree to validate it from our share hash up to the + # hashroot. + if self.share_hash_tree.needed_hashes(self.sharenum): + d1 = self.bucket.get_share_hashes() + else: + d1 = defer.succeed([]) + + # We might need to grab some elements of our block hash tree, to + # validate the requested block up to the share hash. + blockhashesneeded = self.block_hash_tree.needed_hashes(blocknum, include_leaf=True) + # We don't need the root of the block hash tree, as that comes in the + # share tree. 
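`get_block()` asks the block hash tree which nodes it is missing for one leaf, then drops index 0 because the block tree's root arrives via the share hash tree. The sketch below illustrates that sibling-chain ("uncle") idea on a binary tree. It assumes a heap-style array layout (root at index 0, children of node i at 2i+1 and 2i+2, leaves padded to a power of two). It uses plain SHA-256 where Tahoe uses tagged SHA-256d hashes, and it does not skip hashes that are already known. It is not the `allmydata.hashtree` implementation, and all names in it are hypothetical.

```python
import hashlib


def _pair_hash(left, right):
    # Plain SHA-256 over the concatenation; real Tahoe trees use tagged
    # SHA-256d, so these values will not match hashes found in real shares.
    return hashlib.sha256(left + right).digest()


def padded_size(num_leaves):
    n = 1
    while n < num_leaves:
        n *= 2
    return n


def needed_hashes(leafnum, num_leaves, include_leaf=False):
    """Indices needed to check leaf `leafnum` all the way up to the root (0)."""
    i = padded_size(num_leaves) - 1 + leafnum
    needed = {i} if include_leaf else set()
    while i > 0:
        needed.add(i + 1 if i % 2 == 1 else i - 1)  # sibling at this level
        i = (i - 1) // 2  # move up to the parent
    needed.add(0)  # the root, which get_block() discards
    return needed


def build_tree(leaf_hashes):
    n = padded_size(len(leaf_hashes))
    empty = hashlib.sha256(b"").digest()  # assumed padding value
    nodes = [None] * (n - 1) + list(leaf_hashes) + [empty] * (n - len(leaf_hashes))
    for i in range(n - 2, -1, -1):
        nodes[i] = _pair_hash(nodes[2 * i + 1], nodes[2 * i + 2])
    return nodes


def verify_leaf(leafnum, num_leaves, leaf_hash, hashes):
    """Recompute the root from one leaf plus its sibling chain."""
    i = padded_size(num_leaves) - 1 + leafnum
    current = leaf_hash
    while i > 0:
        if i % 2 == 1:  # left child: sibling is on the right
            current = _pair_hash(current, hashes[i + 1])
        else:  # right child: sibling is on the left
            current = _pair_hash(hashes[i - 1], current)
        i = (i - 1) // 2
    return current == hashes[0]
```

For four leaves, validating leaf 2 (array index 5) needs only its sibling 6, its parent's sibling 1, and the root 0. This mirrors why each share carries a short hash chain rather than the full tree.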
+ blockhashesneeded.discard(0) + d2 = self.bucket.get_block_hashes(blockhashesneeded) + + if blocknum < self.num_blocks-1: + thisblocksize = self.block_size + else: + thisblocksize = self.share_size % self.block_size + if thisblocksize == 0: + thisblocksize = self.block_size + d3 = self.bucket.get_block_data(blocknum, + self.block_size, thisblocksize) + + dl = deferredutil.gatherResults([d1, d2, d3]) + dl.addCallback(self._got_data, blocknum) + return dl + + def _got_data(self, results, blocknum): + precondition(blocknum < self.num_blocks, + self, blocknum, self.num_blocks) + sharehashes, blockhashes, blockdata = results + try: + sharehashes = dict(sharehashes) + except ValueError, le: + le.args = tuple(le.args + (sharehashes,)) + raise + blockhashes = dict(enumerate(blockhashes)) + + candidate_share_hash = None # in case we log it in the except block below + blockhash = None # in case we log it in the except block below + + try: + if self.share_hash_tree.needed_hashes(self.sharenum): + # This will raise an exception if the values being passed do not + # match the root node of self.share_hash_tree. + try: + self.share_hash_tree.set_hashes(sharehashes) + except IndexError, le: + # Weird -- sharehashes contained index numbers outside of + # the range that fit into this hash tree. + raise BadOrMissingHash(le) + + # To validate a block we need the root of the block hash tree, + # which is also one of the leaves of the share hash tree, and is + # called "the share hash". + if not self.block_hash_tree[0]: # empty -- no root node yet + # Get the share hash from the share hash tree. + share_hash = self.share_hash_tree.get_leaf(self.sharenum) + if not share_hash: + # No root node in block_hash_tree and also the share hash + # wasn't sent by the server.
+ raise hashtree.NotEnoughHashesError + self.block_hash_tree.set_hashes({0: share_hash}) + + if self.block_hash_tree.needed_hashes(blocknum): + self.block_hash_tree.set_hashes(blockhashes) + + blockhash = block_hash(blockdata) + self.block_hash_tree.set_hashes(leaves={blocknum: blockhash}) + #self.log("checking block_hash(shareid=%d, blocknum=%d) len=%d " + # "%r .. %r: %s" % + # (self.sharenum, blocknum, len(blockdata), + # blockdata[:50], blockdata[-50:], base32.b2a(blockhash))) + + except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le: + # log.WEIRD: indicates undetected disk/network error, or more + # likely a programming error + self.log("hash failure in block=%d, shnum=%d on %s" % + (blocknum, self.sharenum, self.bucket)) + if self.block_hash_tree.needed_hashes(blocknum): + self.log(""" failure occurred when checking the block_hash_tree. + This suggests that either the block data was bad, or that the + block hashes we received along with it were bad.""") + else: + self.log(""" the failure probably occurred when checking the + share_hash_tree, which suggests that the share hashes we + received from the remote peer were bad.""") + self.log(" have candidate_share_hash: %s" % bool(candidate_share_hash)) + self.log(" block length: %d" % len(blockdata)) + self.log(" block hash: %s" % base32.b2a_or_none(blockhash)) + if len(blockdata) < 100: + self.log(" block data: %r" % (blockdata,)) + else: + self.log(" block data start/end: %r .. 
%r" % + (blockdata[:50], blockdata[-50:])) + self.log(" share hash tree:\n" + self.share_hash_tree.dump()) + self.log(" block hash tree:\n" + self.block_hash_tree.dump()) + lines = [] + for i,h in sorted(sharehashes.items()): + lines.append("%3d: %s" % (i, base32.b2a_or_none(h))) + self.log(" sharehashes:\n" + "\n".join(lines) + "\n") + lines = [] + for i,h in blockhashes.items(): + lines.append("%3d: %s" % (i, base32.b2a_or_none(h))) + log.msg(" blockhashes:\n" + "\n".join(lines) + "\n") + raise BadOrMissingHash(le) + + # If we made it here, the block is good. If the hash trees didn't + # like what they saw, they would have raised a BadHashError, causing + # our caller to see a Failure and thus ignore this block (as well as + # dropping this bucket). + return blockdata + + class Checker(log.PrefixingLogMixin): """I query all servers to see if M uniquely-numbered shares are available. hunk ./src/allmydata/immutable/checker.py 516 level = log.WEIRD if f.check(DeadReferenceError): level = log.UNUSUAL - self.log("failure from server on 'get_buckets' the REMOTE failure was:", facility="tahoe.immutable.checker", failure=f, level=level, umid="3uuBUQ") + self.log("failure from server on 'get_buckets' the REMOTE failure was:", + facility="tahoe.immutable.checker", + failure=f, level=level, umid="AX7wZQ") return ({}, serverid, False) d.addCallbacks(_wrap_results, _trap_errs) hunk ./src/allmydata/immutable/checker.py 579 vcap = self._verifycap b = layout.ReadBucketProxy(bucket, serverid, vcap.get_storage_index()) - veup = download.ValidatedExtendedURIProxy(b, vcap) + veup = ValidatedExtendedURIProxy(b, vcap) d = veup.start() def _got_ueb(vup): hunk ./src/allmydata/immutable/checker.py 586 share_hash_tree = IncompleteHashTree(vcap.total_shares) share_hash_tree.set_hashes({0: vup.share_root_hash}) - vrbp = download.ValidatedReadBucketProxy(sharenum, b, - share_hash_tree, - vup.num_segments, - vup.block_size, - vup.share_size) + vrbp = ValidatedReadBucketProxy(sharenum, b, + 
share_hash_tree, + vup.num_segments, + vup.block_size, + vup.share_size) # note: normal download doesn't use get_all_sharehashes(), # because it gets more data than necessary. We've discussed the hunk ./src/allmydata/immutable/checker.py 649 return (False, sharenum, 'incompatible') elif f.check(layout.LayoutInvalid, layout.RidiculouslyLargeURIExtensionBlock, - download.BadOrMissingHash, - download.BadURIExtensionHashValue): + BadOrMissingHash, + BadURIExtensionHashValue): return (False, sharenum, 'corrupt') # if it wasn't one of those reasons, re-raise the error hunk ./src/allmydata/immutable/download.py 1 -import random, weakref, itertools, time -from zope.interface import implements -from twisted.internet import defer, reactor -from twisted.internet.interfaces import IPushProducer, IConsumer -from foolscap.api import DeadReferenceError, RemoteException, eventually - -from allmydata.util import base32, deferredutil, hashutil, log, mathutil, idlib -from allmydata.util.assertutil import _assert, precondition -from allmydata import codec, hashtree, uri -from allmydata.interfaces import IDownloadTarget, IDownloader, IVerifierURI, \ - IDownloadStatus, IDownloadResults, IValidatedThingProxy, \ - IStorageBroker, NotEnoughSharesError, NoSharesError, NoServersError, \ - UnableToFetchCriticalDownloadDataError -from allmydata.immutable import layout -from allmydata.monitor import Monitor -from pycryptopp.cipher.aes import AES - -class IntegrityCheckReject(Exception): - pass - -class BadURIExtensionHashValue(IntegrityCheckReject): - pass -class BadURIExtension(IntegrityCheckReject): - pass -class UnsupportedErasureCodec(BadURIExtension): - pass -class BadCrypttextHashValue(IntegrityCheckReject): - pass -class BadOrMissingHash(IntegrityCheckReject): - pass - -class DownloadStopped(Exception): - pass - -class DownloadResults: - implements(IDownloadResults) - - def __init__(self): - self.servers_used = set() - self.server_problems = {} - self.servermap = {} - self.timings = {} - 
self.file_size = None - -class DecryptingTarget(log.PrefixingLogMixin): - implements(IDownloadTarget, IConsumer) - def __init__(self, target, key, _log_msg_id=None): - precondition(IDownloadTarget.providedBy(target), target) - self.target = target - self._decryptor = AES(key) - prefix = str(target) - log.PrefixingLogMixin.__init__(self, "allmydata.immutable.download", _log_msg_id, prefix=prefix) - # methods to satisfy the IConsumer interface - def registerProducer(self, producer, streaming): - if IConsumer.providedBy(self.target): - self.target.registerProducer(producer, streaming) - def unregisterProducer(self): - if IConsumer.providedBy(self.target): - self.target.unregisterProducer() - def write(self, ciphertext): - plaintext = self._decryptor.process(ciphertext) - self.target.write(plaintext) - def open(self, size): - self.target.open(size) - def close(self): - self.target.close() - def finish(self): - return self.target.finish() - # The following methods is just to pass through to the next target, and - # just because that target might be a repairer.DownUpConnector, and just - # because the current CHKUpload object expects to find the storage index - # in its Uploadable. 
- def set_storageindex(self, storageindex): - self.target.set_storageindex(storageindex) - def set_encodingparams(self, encodingparams): - self.target.set_encodingparams(encodingparams) - -class ValidatedThingObtainer: - def __init__(self, validatedthingproxies, debugname, log_id): - self._validatedthingproxies = validatedthingproxies - self._debugname = debugname - self._log_id = log_id - - def _bad(self, f, validatedthingproxy): - f.trap(RemoteException, DeadReferenceError, - IntegrityCheckReject, layout.LayoutInvalid, - layout.ShareVersionIncompatible) - level = log.WEIRD - if f.check(DeadReferenceError): - level = log.UNUSUAL - elif f.check(RemoteException): - level = log.WEIRD - else: - level = log.SCARY - log.msg(parent=self._log_id, facility="tahoe.immutable.download", - format="operation %(op)s from validatedthingproxy %(validatedthingproxy)s failed", - op=self._debugname, validatedthingproxy=str(validatedthingproxy), - failure=f, level=level, umid="JGXxBA") - if not self._validatedthingproxies: - raise UnableToFetchCriticalDownloadDataError("ran out of peers, last error was %s" % (f,)) - # try again with a different one - d = self._try_the_next_one() - return d - - def _try_the_next_one(self): - vtp = self._validatedthingproxies.pop(0) - # start() obtains, validates, and callsback-with the thing or else - # errbacks - d = vtp.start() - d.addErrback(self._bad, vtp) - return d - - def start(self): - return self._try_the_next_one() - -class ValidatedCrypttextHashTreeProxy: - implements(IValidatedThingProxy) - """ I am a front-end for a remote crypttext hash tree using a local - ReadBucketProxy -- I use its get_crypttext_hashes() method and offer the - Validated Thing protocol (i.e., I have a start() method that fires with - self once I get a valid one).""" - def __init__(self, readbucketproxy, crypttext_hash_tree, num_segments, - fetch_failures=None): - # fetch_failures is for debugging -- see test_encode.py - self._readbucketproxy = readbucketproxy - 
self._num_segments = num_segments - self._fetch_failures = fetch_failures - self._crypttext_hash_tree = crypttext_hash_tree - - def _validate(self, proposal): - ct_hashes = dict(list(enumerate(proposal))) - try: - self._crypttext_hash_tree.set_hashes(ct_hashes) - except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le: - if self._fetch_failures is not None: - self._fetch_failures["crypttext_hash_tree"] += 1 - raise BadOrMissingHash(le) - # If we now have enough of the crypttext hash tree to integrity-check - # *any* segment of ciphertext, then we are done. TODO: It would have - # better alacrity if we downloaded only part of the crypttext hash - # tree at a time. - for segnum in range(self._num_segments): - if self._crypttext_hash_tree.needed_hashes(segnum): - raise BadOrMissingHash("not enough hashes to validate segment number %d" % (segnum,)) - return self - - def start(self): - d = self._readbucketproxy.get_crypttext_hashes() - d.addCallback(self._validate) - return d - -class ValidatedExtendedURIProxy: - implements(IValidatedThingProxy) - """ I am a front-end for a remote UEB (using a local ReadBucketProxy), - responsible for retrieving and validating the elements from the UEB.""" - - def __init__(self, readbucketproxy, verifycap, fetch_failures=None): - # fetch_failures is for debugging -- see test_encode.py - self._fetch_failures = fetch_failures - self._readbucketproxy = readbucketproxy - precondition(IVerifierURI.providedBy(verifycap), verifycap) - self._verifycap = verifycap - - # required - self.segment_size = None - self.crypttext_root_hash = None - self.share_root_hash = None - - # computed - self.block_size = None - self.share_size = None - self.num_segments = None - self.tail_data_size = None - self.tail_segment_size = None - - # optional - self.crypttext_hash = None - - def __str__(self): - return "<%s %s>" % (self.__class__.__name__, self._verifycap.to_string()) - - def _check_integrity(self, data): - h = hashutil.uri_extension_hash(data) 
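ValidatedThingObtainer above implements a simple fallback: pop the next proxy, ask it to fetch and validate its thing, and on a trapped failure move on to the next proxy, giving up with UnableToFetchCriticalDownloadDataError once the list is empty. A minimal synchronous sketch of that pattern follows. It drops the Deferreds and logging, and the names obtain_first_valid and OutOfCandidatesError are hypothetical.

```python
class OutOfCandidatesError(Exception):
    """Hypothetical stand-in for UnableToFetchCriticalDownloadDataError."""


def obtain_first_valid(candidates, fetch_and_validate, retryable=(Exception,)):
    """Return the result of the first candidate that validates.

    fetch_and_validate(candidate) must either return a validated value or
    raise. Only exceptions listed in `retryable` move on to the next
    candidate; anything else propagates, much as f.trap() re-raises
    failures it does not expect.
    """
    remaining = list(candidates)
    last_error = None
    while remaining:
        candidate = remaining.pop(0)  # same order as _try_the_next_one()
        try:
            return fetch_and_validate(candidate)
        except retryable as e:
            last_error = e  # remembered for the final error message
    raise OutOfCandidatesError(
        "ran out of peers, last error was %r" % (last_error,))


# Usage: two servers return bad data, the third returns a valid copy.
def fetch(peer):
    if peer != "good":
        raise ValueError("bad data from %s" % peer)
    return "ueb-from-" + peer


print(obtain_first_valid(["bad1", "bad2", "good"], fetch))  # ueb-from-good
```

Passing the retryable exception types explicitly keeps programming errors from being silently treated as "bad server" failures.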
- if h != self._verifycap.uri_extension_hash: - msg = ("The copy of uri_extension we received from %s was bad: wanted %s, got %s" % - (self._readbucketproxy, - base32.b2a(self._verifycap.uri_extension_hash), - base32.b2a(h))) - if self._fetch_failures is not None: - self._fetch_failures["uri_extension"] += 1 - raise BadURIExtensionHashValue(msg) - else: - return data - - def _parse_and_validate(self, data): - self.share_size = mathutil.div_ceil(self._verifycap.size, - self._verifycap.needed_shares) - - d = uri.unpack_extension(data) - - # There are several kinds of things that can be found in a UEB. - # First, things that we really need to learn from the UEB in order to - # do this download. Next: things which are optional but not redundant - # -- if they are present in the UEB they will get used. Next, things - # that are optional and redundant. These things are required to be - # consistent: they don't have to be in the UEB, but if they are in - # the UEB then they will be checked for consistency with the - # already-known facts, and if they are inconsistent then an exception - # will be raised. These things aren't actually used -- they are just - # tested for consistency and ignored. Finally: things which are - # deprecated -- they ought not be in the UEB at all, and if they are - # present then a warning will be logged but they are otherwise - # ignored. - - # First, things that we really need to learn from the UEB: - # segment_size, crypttext_root_hash, and share_root_hash. 
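The values that the UEB must supply feed a few derived sizes computed just below in _parse_and_validate(). The following sketch reproduces that arithmetic in plain Python 3. div_ceil and next_multiple are local stand-ins for allmydata.util.mathutil, assuming it uses the usual ceiling-division and round-up-to-a-multiple semantics, and segment_geometry is a hypothetical helper.

```python
def div_ceil(n, d):
    """Ceiling of n / d for non-negative integers."""
    return (n + d - 1) // d


def next_multiple(n, k):
    """Smallest multiple of k that is >= n."""
    return div_ceil(n, k) * k


def segment_geometry(size, segment_size, needed_shares):
    """Derive block and segment sizes the way _parse_and_validate() does."""
    tail_data_size = size % segment_size
    if not tail_data_size:
        # the file is an exact multiple of segment_size: the tail is full-sized
        tail_data_size = segment_size
    return {
        "share_size": div_ceil(size, needed_shares),
        "block_size": div_ceil(segment_size, needed_shares),
        "num_segments": div_ceil(size, segment_size),
        "tail_data_size": tail_data_size,
        # the erasure coder needs the tail padded to a multiple of k
        "tail_segment_size": next_multiple(tail_data_size, needed_shares),
    }


print(segment_geometry(size=1000, segment_size=300, needed_shares=3))
```

For a 1000-byte file with 300-byte segments and k=3, this yields 4 segments, 100-byte blocks, a 100-byte tail padded to 102 bytes for the erasure coder, and 334-byte shares.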
- self.segment_size = d['segment_size'] - - self.block_size = mathutil.div_ceil(self.segment_size, - self._verifycap.needed_shares) - self.num_segments = mathutil.div_ceil(self._verifycap.size, - self.segment_size) - - self.tail_data_size = self._verifycap.size % self.segment_size - if not self.tail_data_size: - self.tail_data_size = self.segment_size - # padding for erasure code - self.tail_segment_size = mathutil.next_multiple(self.tail_data_size, - self._verifycap.needed_shares) - - # Ciphertext hash tree root is mandatory, so that there is at most - # one ciphertext that matches this read-cap or verify-cap. The - # integrity check on the shares is not sufficient to prevent the - # original encoder from creating some shares of file A and other - # shares of file B. - self.crypttext_root_hash = d['crypttext_root_hash'] - - self.share_root_hash = d['share_root_hash'] - - - # Next: things that are optional and not redundant: crypttext_hash - if d.has_key('crypttext_hash'): - self.crypttext_hash = d['crypttext_hash'] - if len(self.crypttext_hash) != hashutil.CRYPTO_VAL_SIZE: - raise BadURIExtension('crypttext_hash is required to be hashutil.CRYPTO_VAL_SIZE bytes, not %s bytes' % (len(self.crypttext_hash),)) - - - # Next: things that are optional, redundant, and required to be - # consistent: codec_name, codec_params, tail_codec_params, - # num_segments, size, needed_shares, total_shares - if d.has_key('codec_name'): - if d['codec_name'] != "crs": - raise UnsupportedErasureCodec(d['codec_name']) - - if d.has_key('codec_params'): - ucpss, ucpns, ucpts = codec.parse_params(d['codec_params']) - if ucpss != self.segment_size: - raise BadURIExtension("inconsistent erasure code params: " - "ucpss: %s != self.segment_size: %s" % - (ucpss, self.segment_size)) - if ucpns != self._verifycap.needed_shares: - raise BadURIExtension("inconsistent erasure code params: ucpns: %s != " - "self._verifycap.needed_shares: %s" % - (ucpns, self._verifycap.needed_shares)) - if ucpts != 
self._verifycap.total_shares: - raise BadURIExtension("inconsistent erasure code params: ucpts: %s != " - "self._verifycap.total_shares: %s" % - (ucpts, self._verifycap.total_shares)) - - if d.has_key('tail_codec_params'): - utcpss, utcpns, utcpts = codec.parse_params(d['tail_codec_params']) - if utcpss != self.tail_segment_size: - raise BadURIExtension("inconsistent erasure code params: utcpss: %s != " - "self.tail_segment_size: %s, self._verifycap.size: %s, " - "self.segment_size: %s, self._verifycap.needed_shares: %s" - % (utcpss, self.tail_segment_size, self._verifycap.size, - self.segment_size, self._verifycap.needed_shares)) - if utcpns != self._verifycap.needed_shares: - raise BadURIExtension("inconsistent erasure code params: utcpns: %s != " - "self._verifycap.needed_shares: %s" % (utcpns, - self._verifycap.needed_shares)) - if utcpts != self._verifycap.total_shares: - raise BadURIExtension("inconsistent erasure code params: utcpts: %s != " - "self._verifycap.total_shares: %s" % (utcpts, - self._verifycap.total_shares)) - - if d.has_key('num_segments'): - if d['num_segments'] != self.num_segments: - raise BadURIExtension("inconsistent num_segments: size: %s, " - "segment_size: %s, computed_num_segments: %s, " - "ueb_num_segments: %s" % (self._verifycap.size, - self.segment_size, - self.num_segments, d['num_segments'])) - - if d.has_key('size'): - if d['size'] != self._verifycap.size: - raise BadURIExtension("inconsistent size: URI size: %s, UEB size: %s" % - (self._verifycap.size, d['size'])) - - if d.has_key('needed_shares'): - if d['needed_shares'] != self._verifycap.needed_shares: - raise BadURIExtension("inconsistent needed shares: URI needed shares: %s, UEB " - "needed shares: %s" % (self._verifycap.needed_shares, - d['needed_shares'])) - - if d.has_key('total_shares'): - if d['total_shares'] != self._verifycap.total_shares: - raise BadURIExtension("inconsistent total shares: URI total shares: %s, UEB " - "total shares: %s" % 
(self._verifycap.total_shares, - d['total_shares'])) - - # Finally, things that are deprecated and ignored: plaintext_hash, - # plaintext_root_hash - if d.get('plaintext_hash'): - log.msg("Found plaintext_hash in UEB. This field is deprecated for security reasons " - "and is no longer used. Ignoring. %s" % (self,)) - if d.get('plaintext_root_hash'): - log.msg("Found plaintext_root_hash in UEB. This field is deprecated for security " - "reasons and is no longer used. Ignoring. %s" % (self,)) - - return self - - def start(self): - """Fetch the UEB from bucket, compare its hash to the hash from - verifycap, then parse it. Returns a deferred which is called back - with self once the fetch is successful, or is erred back if it - fails.""" - d = self._readbucketproxy.get_uri_extension() - d.addCallback(self._check_integrity) - d.addCallback(self._parse_and_validate) - return d - -class ValidatedReadBucketProxy(log.PrefixingLogMixin): - """I am a front-end for a remote storage bucket, responsible for - retrieving and validating data from that bucket. - - My get_block() method is used by BlockDownloaders. 
- """ - - def __init__(self, sharenum, bucket, share_hash_tree, num_blocks, - block_size, share_size): - """ share_hash_tree is required to have already been initialized with - the root hash (the number-0 hash), using the share_root_hash from the - UEB""" - precondition(share_hash_tree[0] is not None, share_hash_tree) - prefix = "%d-%s-%s" % (sharenum, bucket, - base32.b2a_l(share_hash_tree[0][:8], 60)) - log.PrefixingLogMixin.__init__(self, - facility="tahoe.immutable.download", - prefix=prefix) - self.sharenum = sharenum - self.bucket = bucket - self.share_hash_tree = share_hash_tree - self.num_blocks = num_blocks - self.block_size = block_size - self.share_size = share_size - self.block_hash_tree = hashtree.IncompleteHashTree(self.num_blocks) - - def get_all_sharehashes(self): - """Retrieve and validate all the share-hash-tree nodes that are - included in this share, regardless of whether we need them to - validate the share or not. Each share contains a minimal Merkle tree - chain, but there is lots of overlap, so usually we'll be using hashes - from other shares and not reading every single hash from this share. - The Verifier uses this function to read and validate every single - hash from this share. - - Call this (and wait for the Deferred it returns to fire) before - calling get_block() for the first time: this lets us check that the - share contains enough hashes to validate its own data, and - avoids downloading any share hash twice.
- - I return a Deferred which errbacks upon failure, probably with - BadOrMissingHash.""" - - d = self.bucket.get_share_hashes() - def _got_share_hashes(sh): - sharehashes = dict(sh) - try: - self.share_hash_tree.set_hashes(sharehashes) - except IndexError, le: - raise BadOrMissingHash(le) - except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le: - raise BadOrMissingHash(le) - d.addCallback(_got_share_hashes) - return d - - def get_all_blockhashes(self): - """Retrieve and validate all the block-hash-tree nodes that are - included in this share. Each share contains a full Merkle tree, but - we usually only fetch the minimal subset necessary for any particular - block. This function fetches everything at once. The Verifier uses - this function to validate the block hash tree. - - Call this (and wait for the Deferred it returns to fire) after - calling get_all_sharehashes() and before calling get_block() for the - first time: this lets us check that the share contains all block - hashes and avoids downloading them multiple times. - - I return a Deferred which errbacks upon failure, probably with - BadOrMissingHash. - """ - - # get_block_hashes(anything) currently always returns everything - needed = list(range(len(self.block_hash_tree))) - d = self.bucket.get_block_hashes(needed) - def _got_block_hashes(blockhashes): - if len(blockhashes) < len(self.block_hash_tree): - raise BadOrMissingHash() - bh = dict(enumerate(blockhashes)) - - try: - self.block_hash_tree.set_hashes(bh) - except IndexError, le: - raise BadOrMissingHash(le) - except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le: - raise BadOrMissingHash(le) - d.addCallback(_got_block_hashes) - return d - - def get_all_crypttext_hashes(self, crypttext_hash_tree): - """Retrieve and validate all the crypttext-hash-tree nodes that are - in this share. Normally we don't look at these at all: the download - process fetches them incrementally as needed to validate each segment - of ciphertext. 
But this is a convenient place to give the Verifier a - function to validate all of these at once. - - Call this with a new hashtree object for each share, initialized with - the crypttext hash tree root. I return a Deferred which errbacks upon - failure, probably with BadOrMissingHash. - """ - - # get_crypttext_hashes() always returns everything - d = self.bucket.get_crypttext_hashes() - def _got_crypttext_hashes(hashes): - if len(hashes) < len(crypttext_hash_tree): - raise BadOrMissingHash() - ct_hashes = dict(enumerate(hashes)) - try: - crypttext_hash_tree.set_hashes(ct_hashes) - except IndexError, le: - raise BadOrMissingHash(le) - except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le: - raise BadOrMissingHash(le) - d.addCallback(_got_crypttext_hashes) - return d - - def get_block(self, blocknum): - # the first time we use this bucket, we need to fetch enough elements - # of the share hash tree to validate it from our share hash up to the - # hashroot. - if self.share_hash_tree.needed_hashes(self.sharenum): - d1 = self.bucket.get_share_hashes() - else: - d1 = defer.succeed([]) - - # We might need to grab some elements of our block hash tree, to - # validate the requested block up to the share hash. - blockhashesneeded = self.block_hash_tree.needed_hashes(blocknum, include_leaf=True) - # We don't need the root of the block hash tree, as that comes in the - # share tree. 
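get_block() asks block_hash_tree.needed_hashes() which nodes it still lacks to tie the requested block to the share hash. The sketch below shows one way to compute such a set, assuming a complete binary tree stored as an array with the root at index 0, the children of node p at 2p+1 and 2p+2, and the leaf count padded up to a power of two. That layout is an assumption made for illustration rather than a description of allmydata.hashtree internals, and the helper names are hypothetical.

```python
def padded_leaf_count(num_leaves):
    """Round the leaf count up to a power of two (extra leaves are padding)."""
    n = 1
    while n < num_leaves:
        n *= 2
    return n


def sibling(i):
    # Left children have odd indices (2p+1), right children even ones (2p+2).
    return i + 1 if i % 2 == 1 else i - 1


def parent(i):
    return (i - 1) // 2


def needed_hashes(leafnum, num_leaves, known=(), include_leaf=False):
    """Node indices required to check leaf `leafnum` up to the root (node 0).

    Returns the sibling of every node on the leaf-to-root path, plus the
    root itself, minus anything already present in `known`.
    """
    i = padded_leaf_count(num_leaves) - 1 + leafnum  # array index of the leaf
    needed = {i} if include_leaf else set()
    while i > 0:
        needed.add(sibling(i))
        i = parent(i)
    needed.add(0)
    return needed - set(known)


# The block hash tree's root arrives via the share hash tree, so, like the
# discard(0) in get_block(), we treat node 0 as already known here.
print(sorted(needed_hashes(0, 4, known={0}, include_leaf=True)))  # [2, 3, 4]
```

Only about log2(num_blocks) hashes are needed per block, which is why the downloader fetches them incrementally instead of pulling the whole tree.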
- blockhashesneeded.discard(0) - d2 = self.bucket.get_block_hashes(blockhashesneeded) - - if blocknum < self.num_blocks-1: - thisblocksize = self.block_size - else: - thisblocksize = self.share_size % self.block_size - if thisblocksize == 0: - thisblocksize = self.block_size - d3 = self.bucket.get_block_data(blocknum, - self.block_size, thisblocksize) - - dl = deferredutil.gatherResults([d1, d2, d3]) - dl.addCallback(self._got_data, blocknum) - return dl - - def _got_data(self, results, blocknum): - precondition(blocknum < self.num_blocks, - self, blocknum, self.num_blocks) - sharehashes, blockhashes, blockdata = results - try: - sharehashes = dict(sharehashes) - except ValueError, le: - le.args = tuple(le.args + (sharehashes,)) - raise - blockhashes = dict(enumerate(blockhashes)) - - candidate_share_hash = None # in case we log it in the except block below - blockhash = None # in case we log it in the except block below - - try: - if self.share_hash_tree.needed_hashes(self.sharenum): - # This will raise an exception if the values being passed do not - # match the root node of self.share_hash_tree. - try: - self.share_hash_tree.set_hashes(sharehashes) - except IndexError, le: - # Weird -- sharehashes contained index numbers outside of - # the range that fits into this hash tree. - raise BadOrMissingHash(le) - - # To validate a block we need the root of the block hash tree, - # which is also one of the leaves of the share hash tree, and is - # called "the share hash". - if not self.block_hash_tree[0]: # empty -- no root node yet - # Get the share hash from the share hash tree. - share_hash = self.share_hash_tree.get_leaf(self.sharenum) - if not share_hash: - # No root node in block_hash_tree and also the share hash - # wasn't sent by the server.
- raise hashtree.NotEnoughHashesError - self.block_hash_tree.set_hashes({0: share_hash}) - - if self.block_hash_tree.needed_hashes(blocknum): - self.block_hash_tree.set_hashes(blockhashes) - - blockhash = hashutil.block_hash(blockdata) - self.block_hash_tree.set_hashes(leaves={blocknum: blockhash}) - #self.log("checking block_hash(shareid=%d, blocknum=%d) len=%d " - # "%r .. %r: %s" % - # (self.sharenum, blocknum, len(blockdata), - # blockdata[:50], blockdata[-50:], base32.b2a(blockhash))) - - except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le: - # log.WEIRD: indicates undetected disk/network error, or more - # likely a programming error - self.log("hash failure in block=%d, shnum=%d on %s" % - (blocknum, self.sharenum, self.bucket)) - if self.block_hash_tree.needed_hashes(blocknum): - self.log(""" failure occurred when checking the block_hash_tree. - This suggests that either the block data was bad, or that the - block hashes we received along with it were bad.""") - else: - self.log(""" the failure probably occurred when checking the - share_hash_tree, which suggests that the share hashes we - received from the remote peer were bad.""") - self.log(" have candidate_share_hash: %s" % bool(candidate_share_hash)) - self.log(" block length: %d" % len(blockdata)) - self.log(" block hash: %s" % base32.b2a_or_none(blockhash)) - if len(blockdata) < 100: - self.log(" block data: %r" % (blockdata,)) - else: - self.log(" block data start/end: %r .. 
%r" % - (blockdata[:50], blockdata[-50:])) - self.log(" share hash tree:\n" + self.share_hash_tree.dump()) - self.log(" block hash tree:\n" + self.block_hash_tree.dump()) - lines = [] - for i,h in sorted(sharehashes.items()): - lines.append("%3d: %s" % (i, base32.b2a_or_none(h))) - self.log(" sharehashes:\n" + "\n".join(lines) + "\n") - lines = [] - for i,h in blockhashes.items(): - lines.append("%3d: %s" % (i, base32.b2a_or_none(h))) - log.msg(" blockhashes:\n" + "\n".join(lines) + "\n") - raise BadOrMissingHash(le) - - # If we made it here, the block is good. If the hash trees didn't - # like what they saw, they would have raised a BadHashError, causing - # our caller to see a Failure and thus ignore this block (as well as - # dropping this bucket). - return blockdata - - - -class BlockDownloader(log.PrefixingLogMixin): - """I am responsible for downloading a single block (from a single bucket) - for a single segment. - - I am a child of the SegmentDownloader. - """ - - def __init__(self, vbucket, blocknum, parent, results): - precondition(isinstance(vbucket, ValidatedReadBucketProxy), vbucket) - prefix = "%s-%d" % (vbucket, blocknum) - log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix) - self.vbucket = vbucket - self.blocknum = blocknum - self.parent = parent - self.results = results - - def start(self, segnum): - self.log("get_block(segnum=%d)" % segnum) - started = time.time() - d = self.vbucket.get_block(segnum) - d.addCallbacks(self._hold_block, self._got_block_error, - callbackArgs=(started,)) - return d - - def _hold_block(self, data, started): - if self.results: - elapsed = time.time() - started - peerid = self.vbucket.bucket.get_peerid() - if peerid not in self.results.timings["fetch_per_server"]: - self.results.timings["fetch_per_server"][peerid] = [] - self.results.timings["fetch_per_server"][peerid].append(elapsed) - self.log("got block") - self.parent.hold_block(self.blocknum, data) - - def _got_block_error(self, 
f): - f.trap(RemoteException, DeadReferenceError, - IntegrityCheckReject, layout.LayoutInvalid, - layout.ShareVersionIncompatible) - if f.check(RemoteException, DeadReferenceError): - level = log.UNUSUAL - else: - level = log.WEIRD - self.log("failure to get block", level=level, umid="5Z4uHQ") - if self.results: - peerid = self.vbucket.bucket.get_peerid() - self.results.server_problems[peerid] = str(f) - self.parent.bucket_failed(self.vbucket) - -class SegmentDownloader: - """I am responsible for downloading all the blocks for a single segment - of data. - - I am a child of the CiphertextDownloader. - """ - - def __init__(self, parent, segmentnumber, needed_shares, results): - self.parent = parent - self.segmentnumber = segmentnumber - self.needed_blocks = needed_shares - self.blocks = {} # k: blocknum, v: data - self.results = results - self._log_number = self.parent.log("starting segment %d" % - segmentnumber) - - def log(self, *args, **kwargs): - if "parent" not in kwargs: - kwargs["parent"] = self._log_number - return self.parent.log(*args, **kwargs) - - def start(self): - return self._download() - - def _download(self): - d = self._try() - def _done(res): - if len(self.blocks) >= self.needed_blocks: - # we only need self.needed_blocks blocks - # we want to get the smallest blockids, because they are - # more likely to be fast "primary blocks" - blockids = sorted(self.blocks.keys())[:self.needed_blocks] - blocks = [] - for blocknum in blockids: - blocks.append(self.blocks[blocknum]) - return (blocks, blockids) - else: - return self._download() - d.addCallback(_done) - return d - - def _try(self): - # fill our set of active buckets, maybe raising NotEnoughSharesError - active_buckets = self.parent._activate_enough_buckets() - # Now we have enough buckets, in self.parent.active_buckets. - - # in test cases, bd.start might mutate active_buckets right away, so - # we need to put off calling start() until we've iterated all the way - # through it. 
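SegmentDownloader._done() keeps only needed_blocks of the blocks that have arrived, choosing the lowest block numbers because, per the comment, those are more likely to be fast "primary blocks". A small sketch of that selection step, using the hypothetical name pick_lowest_blocks:

```python
def pick_lowest_blocks(blocks, needed):
    """Choose `needed` blocks, preferring the smallest block numbers.

    `blocks` maps blocknum -> data. Returns (list_of_data, list_of_blocknums),
    or None if not enough blocks have arrived yet (in the original,
    _done() then calls _download() again to activate more buckets).
    """
    if len(blocks) < needed:
        return None
    blockids = sorted(blocks)[:needed]
    return [blocks[b] for b in blockids], blockids


print(pick_lowest_blocks({7: "h", 0: "a", 3: "d", 1: "b"}, 3))
# (['a', 'b', 'd'], [0, 1, 3])
```

Returning the block numbers alongside the data matters because the erasure decoder needs to know which share each block came from.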
- downloaders = [] - for blocknum, vbucket in active_buckets.iteritems(): - assert isinstance(vbucket, ValidatedReadBucketProxy), vbucket - bd = BlockDownloader(vbucket, blocknum, self, self.results) - downloaders.append(bd) - if self.results: - self.results.servers_used.add(vbucket.bucket.get_peerid()) - l = [bd.start(self.segmentnumber) for bd in downloaders] - return defer.DeferredList(l, fireOnOneErrback=True) - - def hold_block(self, blocknum, data): - self.blocks[blocknum] = data - - def bucket_failed(self, vbucket): - self.parent.bucket_failed(vbucket) - -class DownloadStatus: - implements(IDownloadStatus) - statusid_counter = itertools.count(0) - - def __init__(self): - self.storage_index = None - self.size = None - self.helper = False - self.status = "Not started" - self.progress = 0.0 - self.paused = False - self.stopped = False - self.active = True - self.results = None - self.counter = self.statusid_counter.next() - self.started = time.time() - - def get_started(self): - return self.started - def get_storage_index(self): - return self.storage_index - def get_size(self): - return self.size - def using_helper(self): - return self.helper - def get_status(self): - status = self.status - if self.paused: - status += " (output paused)" - if self.stopped: - status += " (output stopped)" - return status - def get_progress(self): - return self.progress - def get_active(self): - return self.active - def get_results(self): - return self.results - def get_counter(self): - return self.counter - - def set_storage_index(self, si): - self.storage_index = si - def set_size(self, size): - self.size = size - def set_helper(self, helper): - self.helper = helper - def set_status(self, status): - self.status = status - def set_paused(self, paused): - self.paused = paused - def set_stopped(self, stopped): - self.stopped = stopped - def set_progress(self, value): - self.progress = value - def set_active(self, value): - self.active = value - def set_results(self, value): - 
self.results = value - -class CiphertextDownloader(log.PrefixingLogMixin): - """ I download shares, check their integrity, then decode them, check the - integrity of the resulting ciphertext, and then write it to my target. - Before I send any new request to a server, I always ask the 'monitor' - object that was passed into my constructor whether this task has been - cancelled (by invoking its raise_if_cancelled() method).""" - implements(IPushProducer) - _status = None - - def __init__(self, storage_broker, v, target, monitor): - - precondition(IStorageBroker.providedBy(storage_broker), storage_broker) - precondition(IVerifierURI.providedBy(v), v) - precondition(IDownloadTarget.providedBy(target), target) - - self._storage_broker = storage_broker - self._verifycap = v - self._storage_index = v.get_storage_index() - self._uri_extension_hash = v.uri_extension_hash - - prefix=base32.b2a_l(self._storage_index[:8], 60) - log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix) - - self._started = time.time() - self._status = s = DownloadStatus() - s.set_status("Starting") - s.set_storage_index(self._storage_index) - s.set_size(self._verifycap.size) - s.set_helper(False) - s.set_active(True) - - self._results = DownloadResults() - s.set_results(self._results) - self._results.file_size = self._verifycap.size - self._results.timings["servers_peer_selection"] = {} - self._results.timings["fetch_per_server"] = {} - self._results.timings["cumulative_fetch"] = 0.0 - self._results.timings["cumulative_decode"] = 0.0 - self._results.timings["cumulative_decrypt"] = 0.0 - self._results.timings["paused"] = 0.0 - - self._paused = False - self._stopped = False - if IConsumer.providedBy(target): - target.registerProducer(self, True) - self._target = target - # Repairer (uploader) needs the storageindex.
- self._target.set_storageindex(self._storage_index) - self._monitor = monitor - self._opened = False - - self.active_buckets = {} # k: shnum, v: bucket - self._share_buckets = {} # k: sharenum, v: list of buckets - - # _download_all_segments() will set this to: - # self._share_vbuckets = {} # k: shnum, v: set of ValidatedBuckets - self._share_vbuckets = None - - self._fetch_failures = {"uri_extension": 0, "crypttext_hash_tree": 0, } - - self._ciphertext_hasher = hashutil.crypttext_hasher() - - self._bytes_done = 0 - self._status.set_progress(float(self._bytes_done)/self._verifycap.size) - - # _got_uri_extension() will create the following: - # self._crypttext_hash_tree - # self._share_hash_tree - # self._current_segnum = 0 - # self._vup # ValidatedExtendedURIProxy - - # _get_all_shareholders() will create the following: - # self._total_queries - # self._responses_received = 0 - # self._queries_failed = 0 - - # This is solely for the use of unit tests. It will be triggered when - # we start downloading shares. - self._stage_4_d = defer.Deferred() - - def pauseProducing(self): - if self._paused: - return - self._paused = defer.Deferred() - self._paused_at = time.time() - if self._status: - self._status.set_paused(True) - - def resumeProducing(self): - if self._paused: - paused_for = time.time() - self._paused_at - self._results.timings['paused'] += paused_for - p = self._paused - self._paused = None - eventually(p.callback, None) - if self._status: - self._status.set_paused(False) - - def stopProducing(self): - self.log("Download.stopProducing") - self._stopped = True - self.resumeProducing() - if self._status: - self._status.set_stopped(True) - self._status.set_active(False) - - def start(self): - self.log("starting download") - - # first step: who should we download from? 
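pauseProducing() and resumeProducing() record when output was paused and add the elapsed interval to timings['paused']; a repeated pause is ignored. This sketch isolates that bookkeeping behind an injectable clock so it can be exercised without a reactor. PauseAccounting is a hypothetical name, and the Deferred that the original uses to hold back writers is deliberately left out.

```python
class PauseAccounting:
    """Track the total time spent paused, using an injectable clock."""

    def __init__(self, clock):
        self._clock = clock      # zero-argument callable returning seconds
        self._paused_at = None   # None means "not currently paused"
        self.total_paused = 0.0

    def pause(self):
        if self._paused_at is not None:
            return  # already paused: a second pause changes nothing
        self._paused_at = self._clock()

    def resume(self):
        if self._paused_at is None:
            return  # resuming while running is a no-op
        self.total_paused += self._clock() - self._paused_at
        self._paused_at = None


# Usage with a fake clock that hands out scripted timestamps.
ticks = iter([10.0, 12.5, 20.0, 21.0])
acct = PauseAccounting(lambda: next(ticks))
acct.pause()
acct.pause()    # ignored, and does not read the clock
acct.resume()   # paused from 10.0 to 12.5
acct.pause()
acct.resume()   # paused from 20.0 to 21.0
print(acct.total_paused)  # 3.5
```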
- d = defer.maybeDeferred(self._get_all_shareholders) - d.addBoth(self._got_all_shareholders) - # now get the uri_extension block from somebody and integrity check - # it and parse and validate its contents - d.addCallback(self._obtain_uri_extension) - d.addCallback(self._get_crypttext_hash_tree) - # once we know that, we can download blocks from everybody - d.addCallback(self._download_all_segments) - def _finished(res): - if self._status: - self._status.set_status("Finished") - self._status.set_active(False) - self._status.set_paused(False) - if IConsumer.providedBy(self._target): - self._target.unregisterProducer() - return res - d.addBoth(_finished) - def _failed(why): - if self._status: - self._status.set_status("Failed") - self._status.set_active(False) - if why.check(DownloadStopped): - # DownloadStopped just means the consumer aborted the - # download; not so scary. - self.log("download stopped", level=log.UNUSUAL) - else: - # This is really unusual, and deserves maximum forensics. - self.log("download failed!", failure=why, level=log.SCARY, - umid="lp1vaQ") - return why - d.addErrback(_failed) - d.addCallback(self._done) - return d - - def _get_all_shareholders(self): - """ Once the number of buckets that I know about is >= K then I - callback the Deferred that I return. - - If all of the get_buckets deferreds have fired (whether callback - or errback) and I still don't have enough buckets then I'll also - callback -- not errback -- the Deferred that I return. 
- """ - wait_for_enough_buckets_d = defer.Deferred() - self._wait_for_enough_buckets_d = wait_for_enough_buckets_d - - sb = self._storage_broker - servers = sb.get_servers_for_index(self._storage_index) - if not servers: - raise NoServersError("broker gave us no servers!") - - self._total_queries = len(servers) - self._responses_received = 0 - self._queries_failed = 0 - for (peerid,ss) in servers: - self.log(format="sending DYHB to [%(peerid)s]", - peerid=idlib.shortnodeid_b2a(peerid), - level=log.NOISY, umid="rT03hg") - d = ss.callRemote("get_buckets", self._storage_index) - d.addCallbacks(self._got_response, self._got_error, - callbackArgs=(peerid,)) - d.addBoth(self._check_got_all_responses) - - if self._status: - self._status.set_status("Locating Shares (%d/%d)" % - (self._responses_received, - self._total_queries)) - return wait_for_enough_buckets_d - - def _check_got_all_responses(self, ignored=None): - assert (self._responses_received+self._queries_failed) <= self._total_queries - if self._wait_for_enough_buckets_d and (self._responses_received+self._queries_failed) == self._total_queries: - reactor.callLater(0, self._wait_for_enough_buckets_d.callback, False) - self._wait_for_enough_buckets_d = None - - def _got_response(self, buckets, peerid): - # Note that this can continue to receive responses after _wait_for_enough_buckets_d - # has fired. 
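_get_all_shareholders() returns a Deferred that fires exactly once: with True as soon as at least needed_shares distinct shares are known, or with False once every get_buckets query has either answered or failed without reaching that number. A synchronous sketch of that decision, using a hypothetical ShareLocator class in place of the Deferred and reactor.callLater():

```python
class ShareLocator:
    """Decide once whether enough distinct shares were located.

    `outcome` stays None until decided; then it is True (at least
    needed_shares distinct shares found) or False (every query answered
    or failed without reaching that number).
    """

    def __init__(self, total_queries, needed_shares):
        self.total_queries = total_queries
        self.needed_shares = needed_shares
        self.answered = 0
        self.shnums = set()
        self.outcome = None

    def got_response(self, shnums):
        self.answered += 1
        self.shnums.update(shnums)
        if self.outcome is None and len(self.shnums) >= self.needed_shares:
            self.outcome = True  # first time we have enough: decide now
        self._check_all_answered()

    def got_error(self):
        self.answered += 1
        self._check_all_answered()

    def _check_all_answered(self):
        assert self.answered <= self.total_queries
        if self.outcome is None and self.answered == self.total_queries:
            self.outcome = False  # everyone answered, still not enough
```

Like the original, later responses still add shares after the decision is made; they just cannot change the outcome.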
- self._responses_received += 1 - self.log(format="got results from [%(peerid)s]: shnums %(shnums)s", - peerid=idlib.shortnodeid_b2a(peerid), - shnums=sorted(buckets.keys()), - level=log.NOISY, umid="o4uwFg") - if self._results: - elapsed = time.time() - self._started - self._results.timings["servers_peer_selection"][peerid] = elapsed - if self._status: - self._status.set_status("Locating Shares (%d/%d)" % - (self._responses_received, - self._total_queries)) - for sharenum, bucket in buckets.iteritems(): - b = layout.ReadBucketProxy(bucket, peerid, self._storage_index) - self.add_share_bucket(sharenum, b) - # If we just got enough buckets for the first time, then fire the - # deferred. Then remove it from self so that we don't fire it - # again. - if self._wait_for_enough_buckets_d and len(self._share_buckets) >= self._verifycap.needed_shares: - reactor.callLater(0, self._wait_for_enough_buckets_d.callback, True) - self._wait_for_enough_buckets_d = None - - if self._share_vbuckets is not None: - vbucket = ValidatedReadBucketProxy(sharenum, b, self._share_hash_tree, self._vup.num_segments, self._vup.block_size, self._vup.share_size) - self._share_vbuckets.setdefault(sharenum, set()).add(vbucket) - - if self._results: - if peerid not in self._results.servermap: - self._results.servermap[peerid] = set() - self._results.servermap[peerid].add(sharenum) - - def add_share_bucket(self, sharenum, bucket): - # this is split out for the benefit of test_encode.py - self._share_buckets.setdefault(sharenum, []).append(bucket) - - def _got_error(self, f): - self._queries_failed += 1 - level = log.WEIRD - if f.check(DeadReferenceError): - level = log.UNUSUAL - self.log("Error during get_buckets", failure=f, level=level, - umid="3uuBUQ") - - def bucket_failed(self, vbucket): - shnum = vbucket.sharenum - del self.active_buckets[shnum] - s = self._share_vbuckets[shnum] - # s is a set of ValidatedReadBucketProxy instances - s.remove(vbucket) - # ... 
which might now be empty - if not s: - # there are no more buckets which can provide this share, so - # remove the key. This may prompt us to use a different share. - del self._share_vbuckets[shnum] - - def _got_all_shareholders(self, res): - if self._results: - now = time.time() - self._results.timings["peer_selection"] = now - self._started - - if len(self._share_buckets) < self._verifycap.needed_shares: - msg = "Failed to get enough shareholders: have %d, need %d" \ - % (len(self._share_buckets), self._verifycap.needed_shares) - if self._share_buckets: - raise NotEnoughSharesError(msg) - else: - raise NoSharesError(msg) - - #for s in self._share_vbuckets.values(): - # for vb in s: - # assert isinstance(vb, ValidatedReadBucketProxy), \ - # "vb is %s but should be a ValidatedReadBucketProxy" % (vb,) - - def _obtain_uri_extension(self, ignored): - # all shareholders are supposed to have a copy of uri_extension, and - # all are supposed to be identical. We compute the hash of the data - # that comes back, and compare it against the version in our URI. If - # they don't match, ignore their data and try someone else. 
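_obtain_uri_extension() relies on every shareholder holding an identical UEB, so any copy whose hash matches the one in the verify-cap is acceptable and mismatching copies are skipped. The sketch below shows that check. The tagged SHA-256 used here is a hypothetical placeholder, not Tahoe's actual uri_extension_hash construction, and first_matching_copy is an illustrative name.

```python
import hashlib
import hmac

TAG = b"hypothetical-uri-extension-tag:"  # placeholder, NOT Tahoe's real tag


def ueb_hash(data):
    """Hypothetical tagged hash; Tahoe's real uri_extension_hash differs."""
    return hashlib.sha256(TAG + data).digest()


def first_matching_copy(copies, expected_hash):
    """Return the first (source, data) pair whose hash matches, else None."""
    for source, data in copies:
        # The original compares with !=; compare_digest is a choice made here.
        if hmac.compare_digest(ueb_hash(data), expected_hash):
            return source, data
    return None


expected = ueb_hash(b"good ueb")
copies = [("peerA", b"tampered"), ("peerB", b"good ueb")]
print(first_matching_copy(copies, expected))  # ('peerB', b'good ueb')
```

Because the expected hash comes from the cap rather than from any server, a single honest shareholder is enough to obtain a trustworthy UEB.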
- if self._status: - self._status.set_status("Obtaining URI Extension") - - uri_extension_fetch_started = time.time() - - vups = [] - for sharenum, buckets in self._share_buckets.iteritems(): - for bucket in buckets: - vups.append(ValidatedExtendedURIProxy(bucket, self._verifycap, self._fetch_failures)) - vto = ValidatedThingObtainer(vups, debugname="vups", log_id=self._parentmsgid) - d = vto.start() - - def _got_uri_extension(vup): - precondition(isinstance(vup, ValidatedExtendedURIProxy), vup) - if self._results: - elapsed = time.time() - uri_extension_fetch_started - self._results.timings["uri_extension"] = elapsed - - self._vup = vup - self._codec = codec.CRSDecoder() - self._codec.set_params(self._vup.segment_size, self._verifycap.needed_shares, self._verifycap.total_shares) - self._tail_codec = codec.CRSDecoder() - self._tail_codec.set_params(self._vup.tail_segment_size, self._verifycap.needed_shares, self._verifycap.total_shares) - - self._current_segnum = 0 - - self._share_hash_tree = hashtree.IncompleteHashTree(self._verifycap.total_shares) - self._share_hash_tree.set_hashes({0: vup.share_root_hash}) - - self._crypttext_hash_tree = hashtree.IncompleteHashTree(self._vup.num_segments) - self._crypttext_hash_tree.set_hashes({0: self._vup.crypttext_root_hash}) - - # Repairer (uploader) needs the encodingparams. 
- self._target.set_encodingparams(( - self._verifycap.needed_shares, - 0, # see ticket #778 for why this is - self._verifycap.total_shares, - self._vup.segment_size - )) - d.addCallback(_got_uri_extension) - return d - - def _get_crypttext_hash_tree(self, res): - vchtps = [] - for sharenum, buckets in self._share_buckets.iteritems(): - for bucket in buckets: - vchtp = ValidatedCrypttextHashTreeProxy(bucket, self._crypttext_hash_tree, self._vup.num_segments, self._fetch_failures) - vchtps.append(vchtp) - - _get_crypttext_hash_tree_started = time.time() - if self._status: - self._status.set_status("Retrieving crypttext hash tree") - - vto = ValidatedThingObtainer(vchtps, debugname="vchtps", - log_id=self._parentmsgid) - d = vto.start() - - def _got_crypttext_hash_tree(res): - # Good -- the self._crypttext_hash_tree that we passed to vchtp - # is now populated with hashes. - if self._results: - elapsed = time.time() - _get_crypttext_hash_tree_started - self._results.timings["hashtrees"] = elapsed - d.addCallback(_got_crypttext_hash_tree) - return d - - def _activate_enough_buckets(self): - """either return a mapping from shnum to a ValidatedReadBucketProxy - that can provide data for that share, or raise NotEnoughSharesError""" - - while len(self.active_buckets) < self._verifycap.needed_shares: - # need some more - handled_shnums = set(self.active_buckets.keys()) - available_shnums = set(self._share_vbuckets.keys()) - potential_shnums = list(available_shnums - handled_shnums) - if len(potential_shnums) < (self._verifycap.needed_shares - - len(self.active_buckets)): - have = len(potential_shnums) + len(self.active_buckets) - msg = "Unable to activate enough shares: have %d, need %d" \ - % (have, self._verifycap.needed_shares) - if have: - raise NotEnoughSharesError(msg) - else: - raise NoSharesError(msg) - # For the next share, choose a primary share if available, else a - # randomly chosen secondary share. 
- potential_shnums.sort() - if potential_shnums[0] < self._verifycap.needed_shares: - shnum = potential_shnums[0] - else: - shnum = random.choice(potential_shnums) - # and a random bucket that will provide it - validated_bucket = random.choice(list(self._share_vbuckets[shnum])) - self.active_buckets[shnum] = validated_bucket - return self.active_buckets - - - def _download_all_segments(self, res): - # From now on if new buckets are received then I will notice that - # self._share_vbuckets is not None and generate a vbucket for that new - # bucket and add it in to _share_vbuckets. (We had to wait because we - # didn't have self._vup and self._share_hash_tree earlier. We didn't - # need validated buckets until now -- now that we are ready to download - # shares.) - self._share_vbuckets = {} - for sharenum, buckets in self._share_buckets.iteritems(): - for bucket in buckets: - vbucket = ValidatedReadBucketProxy(sharenum, bucket, self._share_hash_tree, self._vup.num_segments, self._vup.block_size, self._vup.share_size) - self._share_vbuckets.setdefault(sharenum, set()).add(vbucket) - - # after the above code, self._share_vbuckets contains enough - # buckets to complete the download, and some extra ones to - # tolerate some buckets dropping out or having - # errors. self._share_vbuckets is a dictionary that maps from - # shnum to a set of ValidatedBuckets, which themselves are - # wrappers around RIBucketReader references. - self.active_buckets = {} # k: shnum, v: ValidatedReadBucketProxy instance - - self._started_fetching = time.time() - - d = defer.succeed(None) - for segnum in range(self._vup.num_segments): - d.addCallback(self._download_segment, segnum) - # this pause, at the end of write, prevents pre-fetch from - # happening until the consumer is ready for more data. 
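The selection rule in `_activate_enough_buckets` can be sketched without buckets or servers. It prefers the lowest-numbered primary share (`shnum < needed_shares`) and otherwise picks a secondary share at random. The sketch rests on some assumptions: shares are plain integers, and `activate_enough_shares` and the local `NotEnoughSharesError` are hypothetical stand-ins. The preference for primaries is likely because the erasure code is systematic, so primary shares carry data blocks verbatim.

```python
import random


class NotEnoughSharesError(Exception):
    """Hypothetical stand-in for allmydata's exception of the same name."""


def activate_enough_shares(available, k, rng=random):
    """Return a set of k share numbers drawn from `available`, preferring
    primary shares (shnum < k) and picking secondaries at random."""
    active = set()
    while len(active) < k:
        potential = sorted(set(available) - active)
        if len(potential) < k - len(active):
            have = len(potential) + len(active)
            raise NotEnoughSharesError("have %d, need %d" % (have, k))
        if potential[0] < k:
            shnum = potential[0]           # lowest primary share first
        else:
            shnum = rng.choice(potential)  # only secondaries remain
        active.add(shnum)
    return active
```

The primary branch always takes the smallest candidate, so all reachable primaries are used before any random choice is made.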
- d.addCallback(self._check_for_pause) - - self._stage_4_d.callback(None) - return d - - def _check_for_pause(self, res): - if self._paused: - d = defer.Deferred() - self._paused.addCallback(lambda ignored: d.callback(res)) - return d - if self._stopped: - raise DownloadStopped("our Consumer called stopProducing()") - self._monitor.raise_if_cancelled() - return res - - def _download_segment(self, res, segnum): - if self._status: - self._status.set_status("Downloading segment %d of %d" % - (segnum+1, self._vup.num_segments)) - self.log("downloading seg#%d of %d (%d%%)" - % (segnum, self._vup.num_segments, - 100.0 * segnum / self._vup.num_segments)) - # memory footprint: when the SegmentDownloader finishes pulling down - # all shares, we have 1*segment_size of usage. - segmentdler = SegmentDownloader(self, segnum, - self._verifycap.needed_shares, - self._results) - started = time.time() - d = segmentdler.start() - def _finished_fetching(res): - elapsed = time.time() - started - self._results.timings["cumulative_fetch"] += elapsed - return res - if self._results: - d.addCallback(_finished_fetching) - # pause before using more memory - d.addCallback(self._check_for_pause) - # while the codec does its job, we hit 2*segment_size - def _started_decode(res): - self._started_decode = time.time() - return res - if self._results: - d.addCallback(_started_decode) - if segnum + 1 == self._vup.num_segments: - codec = self._tail_codec - else: - codec = self._codec - d.addCallback(lambda (shares, shareids): codec.decode(shares, shareids)) - # once the codec is done, we drop back to 1*segment_size, because - # 'shares' goes out of scope. The memory usage is all in the - # plaintext now, spread out into a bunch of tiny buffers. 
- def _finished_decode(res): - elapsed = time.time() - self._started_decode - self._results.timings["cumulative_decode"] += elapsed - return res - if self._results: - d.addCallback(_finished_decode) - - # pause/check-for-stop just before writing, to honor stopProducing - d.addCallback(self._check_for_pause) - d.addCallback(self._got_segment) - return d - - def _got_segment(self, buffers): - precondition(self._crypttext_hash_tree) - started_decrypt = time.time() - self._status.set_progress(float(self._current_segnum)/self._verifycap.size) - - if self._current_segnum + 1 == self._vup.num_segments: - # This is the last segment. - # Trim off any padding added by the upload side. We never send - # empty segments. If the data was an exact multiple of the - # segment size, the last segment will be full. - tail_buf_size = mathutil.div_ceil(self._vup.tail_segment_size, self._verifycap.needed_shares) - num_buffers_used = mathutil.div_ceil(self._vup.tail_data_size, tail_buf_size) - # Remove buffers which don't contain any part of the tail. - del buffers[num_buffers_used:] - # Remove the past-the-tail-part of the last buffer. - tail_in_last_buf = self._vup.tail_data_size % tail_buf_size - if tail_in_last_buf == 0: - tail_in_last_buf = tail_buf_size - buffers[-1] = buffers[-1][:tail_in_last_buf] - - # First compute the hash of this segment and check that it fits. - ch = hashutil.crypttext_segment_hasher() - for buffer in buffers: - self._ciphertext_hasher.update(buffer) - ch.update(buffer) - self._crypttext_hash_tree.set_hashes(leaves={self._current_segnum: ch.digest()}) - - # Then write this segment to the target. 
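The tail-trimming arithmetic in `_got_segment` is easiest to follow with numbers. The sketch below reproduces it as a standalone function. `div_ceil` is a local helper assumed to match what `mathutil.div_ceil` computes (ceiling division). Unlike the original, the function returns a trimmed copy instead of mutating `buffers` in place. The worked case assumes k=3 and a 10-byte tail padded to a 12-byte tail segment. Each decoded buffer then holds 4 bytes, and only 2 bytes of the third buffer are real data.

```python
def div_ceil(n, d):
    # ceiling division for non-negative integers
    return (n + d - 1) // d


def trim_tail(buffers, tail_segment_size, tail_data_size, k):
    """Drop padding from the k decoded buffers of the last segment."""
    tail_buf_size = div_ceil(tail_segment_size, k)
    num_buffers_used = div_ceil(tail_data_size, tail_buf_size)
    # keep only buffers that contain some part of the tail
    trimmed = list(buffers[:num_buffers_used])
    # cut the padding off the end of the last buffer we kept
    tail_in_last_buf = tail_data_size % tail_buf_size
    if tail_in_last_buf == 0:
        tail_in_last_buf = tail_buf_size  # data fills the last buffer
    trimmed[-1] = trimmed[-1][:tail_in_last_buf]
    return trimmed


# k=3, 10 data bytes padded to 12: buffers of 4 bytes, last one half padding
out = trim_tail([b"abcd", b"efgh", b"ij\x00\x00"], 12, 10, 3)
```

Here `b"".join(out)` is `b"abcdefghij"`. Buffers that hold nothing but padding are dropped entirely.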
- if not self._opened: - self._opened = True - self._target.open(self._verifycap.size) - - for buffer in buffers: - self._target.write(buffer) - self._bytes_done += len(buffer) - - self._status.set_progress(float(self._bytes_done)/self._verifycap.size) - self._current_segnum += 1 - - if self._results: - elapsed = time.time() - started_decrypt - self._results.timings["cumulative_decrypt"] += elapsed - - def _done(self, res): - self.log("download done") - if self._results: - now = time.time() - self._results.timings["total"] = now - self._started - self._results.timings["segments"] = now - self._started_fetching - if self._vup.crypttext_hash: - _assert(self._vup.crypttext_hash == self._ciphertext_hasher.digest(), - "bad crypttext_hash: computed=%s, expected=%s" % - (base32.b2a(self._ciphertext_hasher.digest()), - base32.b2a(self._vup.crypttext_hash))) - _assert(self._bytes_done == self._verifycap.size, self._bytes_done, self._verifycap.size) - self._status.set_progress(1) - self._target.close() - return self._target.finish() - def get_download_status(self): - return self._status - - -class ConsumerAdapter: - implements(IDownloadTarget, IConsumer) - def __init__(self, consumer): - self._consumer = consumer - - def registerProducer(self, producer, streaming): - self._consumer.registerProducer(producer, streaming) - def unregisterProducer(self): - self._consumer.unregisterProducer() - - def open(self, size): - pass - def write(self, data): - self._consumer.write(data) - def close(self): - pass - - def fail(self, why): - pass - def register_canceller(self, cb): - pass - def finish(self): - return self._consumer - # The following methods are just because the target might be a - # repairer.DownUpConnector, and just because the current CHKUpload object - # expects to find the storage index and encoding parameters in its - # Uploadable. 
- def set_storageindex(self, storageindex): - pass - def set_encodingparams(self, encodingparams): - pass - - -class Downloader: - """I am a service that allows file downloading. - """ - # TODO: in fact, this service only downloads immutable files (URI:CHK:). - # It is scheduled to go away, to be replaced by filenode.download() - implements(IDownloader) - - def __init__(self, storage_broker, stats_provider): - self.storage_broker = storage_broker - self.stats_provider = stats_provider - self._all_downloads = weakref.WeakKeyDictionary() # for debugging - - def download(self, u, t, _log_msg_id=None, monitor=None, history=None): - assert isinstance(u, uri.CHKFileURI) - t = IDownloadTarget(t) - assert t.write - assert t.close - - if self.stats_provider: - # these counters are meant for network traffic, and don't - # include LIT files - self.stats_provider.count('downloader.files_downloaded', 1) - self.stats_provider.count('downloader.bytes_downloaded', u.get_size()) - - target = DecryptingTarget(t, u.key, _log_msg_id=_log_msg_id) - if not monitor: - monitor=Monitor() - dl = CiphertextDownloader(self.storage_broker, - u.get_verify_cap(), target, - monitor=monitor) - self._all_downloads[dl] = None - if history: - history.add_download(dl.get_download_status()) - d = dl.start() - return d rmfile ./src/allmydata/immutable/download.py hunk ./src/allmydata/immutable/filenode.py 1 -import copy, os.path, stat -from cStringIO import StringIO + +import binascii +import copy +import time +now = time.time from zope.interface import implements from twisted.internet import defer hunk ./src/allmydata/immutable/filenode.py 8 -from twisted.internet.interfaces import IPushProducer -from twisted.protocols import basic -from foolscap.api import eventually -from allmydata.interfaces import IImmutableFileNode, ICheckable, \ - IDownloadTarget, IUploadResults -from allmydata.util import dictutil, log, base32 -from allmydata.uri import CHKFileURI, LiteralFileURI -from 
allmydata.immutable.checker import Checker +from twisted.internet.interfaces import IConsumer + +from allmydata.interfaces import IImmutableFileNode, IUploadResults +from allmydata import uri from allmydata.check_results import CheckResults, CheckAndRepairResults hunk ./src/allmydata/immutable/filenode.py 13 +from allmydata.util.dictutil import DictOfSets +from pycryptopp.cipher.aes import AES + +# local imports +from allmydata.immutable.checker import Checker from allmydata.immutable.repairer import Repairer hunk ./src/allmydata/immutable/filenode.py 19 -from allmydata.immutable import download +from allmydata.immutable.downloader.node import DownloadNode +from allmydata.immutable.downloader.status import DownloadStatus hunk ./src/allmydata/immutable/filenode.py 22 -class _ImmutableFileNodeBase(object): - implements(IImmutableFileNode, ICheckable) +class CiphertextFileNode: + def __init__(self, verifycap, storage_broker, secret_holder, + terminator, history, download_status=None): + assert isinstance(verifycap, uri.CHKFileVerifierURI) + self._verifycap = verifycap + self._storage_broker = storage_broker + self._secret_holder = secret_holder + if download_status is None: + ds = DownloadStatus(verifycap.storage_index, verifycap.size) + if history: + history.add_download(ds) + download_status = ds + self._node = DownloadNode(verifycap, storage_broker, secret_holder, + terminator, history, download_status) hunk ./src/allmydata/immutable/filenode.py 37 - def get_write_uri(self): - return None + def read(self, consumer, offset=0, size=None, read_ev=None): + """I am the main entry point, from which FileNode.read() can get + data. I feed the consumer with the desired range of ciphertext. 
I + return a Deferred that fires (with the consumer) when the read is + finished.""" + return self._node.read(consumer, offset, size, read_ev) hunk ./src/allmydata/immutable/filenode.py 44 - def get_readonly_uri(self): - return self.get_uri() + def get_segment(self, segnum): + """Begin downloading a segment. I return a tuple (d, c): 'd' is a + Deferred that fires with (offset,data) when the desired segment is + available, and c is an object on which c.cancel() can be called to + disavow interest in the segment (after which 'd' will never fire). hunk ./src/allmydata/immutable/filenode.py 50 - def is_mutable(self): - return False + You probably need to know the segment size before calling this, + unless you want the first few bytes of the file. If you ask for a + segment number which turns out to be too large, the Deferred will + errback with BadSegmentNumberError. hunk ./src/allmydata/immutable/filenode.py 55 - def is_readonly(self): - return True + The Deferred fires with the offset of the first byte of the data + segment, so that you can call get_segment() before knowing the + segment size, and still know which data you received. 
+ """ + return self._node.get_segment(segnum) hunk ./src/allmydata/immutable/filenode.py 61 - def is_unknown(self): - return False + def get_segment_size(self): + # return a Deferred that fires with the file's real segment size + return self._node.get_segsize() hunk ./src/allmydata/immutable/filenode.py 65 - def is_allowed_in_immutable_directory(self): - return True + def get_storage_index(self): + return self._verifycap.storage_index + def get_verify_cap(self): + return self._verifycap + def get_size(self): + return self._verifycap.size def raise_error(self): pass hunk ./src/allmydata/immutable/filenode.py 75 - def __hash__(self): - return self.u.__hash__() - def __eq__(self, other): - if isinstance(other, _ImmutableFileNodeBase): - return self.u.__eq__(other.u) - else: - return False - def __ne__(self, other): - if isinstance(other, _ImmutableFileNodeBase): - return self.u.__eq__(other.u) - else: - return True - -class PortionOfFile: - # like a list slice (things[2:14]), but for a file on disk - def __init__(self, fn, offset=0, size=None): - self.f = open(fn, "rb") - self.f.seek(offset) - self.bytes_left = size - - def read(self, size=None): - # bytes_to_read = min(size, self.bytes_left), but None>anything - if size is None: - bytes_to_read = self.bytes_left - elif self.bytes_left is None: - bytes_to_read = size - else: - bytes_to_read = min(size, self.bytes_left) - data = self.f.read(bytes_to_read) - if self.bytes_left is not None: - self.bytes_left -= len(data) - return data - -class DownloadCache: - implements(IDownloadTarget) - - def __init__(self, filecap, storage_index, downloader, - cachedirectorymanager): - self._downloader = downloader - self._uri = filecap - self._storage_index = storage_index - self.milestones = set() # of (offset,size,Deferred) - self.cachedirectorymanager = cachedirectorymanager - self.cachefile = None - self.download_in_progress = False - # five states: - # new ImmutableFileNode, no downloads ever performed - # new 
ImmutableFileNode, leftover file (partial) - # new ImmutableFileNode, leftover file (whole) - # download in progress, not yet complete - # download complete - - def when_range_available(self, offset, size): - assert isinstance(offset, (int,long)) - assert isinstance(size, (int,long)) - - d = defer.Deferred() - self.milestones.add( (offset,size,d) ) - self._check_milestones() - if self.milestones and not self.download_in_progress: - self.download_in_progress = True - log.msg(format=("immutable filenode read [%(si)s]: " + - "starting download"), - si=base32.b2a(self._storage_index), - umid="h26Heg", level=log.OPERATIONAL) - d2 = self._downloader.download(self._uri, self) - d2.addBoth(self._download_done) - d2.addErrback(self._download_failed) - d2.addErrback(log.err, umid="cQaM9g") - return d - - def read(self, consumer, offset, size): - assert offset+size <= self.get_filesize() - if not self.cachefile: - self.cachefile = self.cachedirectorymanager.get_file(base32.b2a(self._storage_index)) - f = PortionOfFile(self.cachefile.get_filename(), offset, size) - d = basic.FileSender().beginFileTransfer(f, consumer) - d.addCallback(lambda lastSent: consumer) - return d - - def _download_done(self, res): - # clear download_in_progress, so failed downloads can be re-tried - self.download_in_progress = False - return res - - def _download_failed(self, f): - # tell anyone who's waiting that we failed - for m in self.milestones: - (offset,size,d) = m - eventually(d.errback, f) - self.milestones.clear() - - def _check_milestones(self): - current_size = self.get_filesize() - for m in list(self.milestones): - (offset,size,d) = m - if offset+size <= current_size: - log.msg(format=("immutable filenode read [%(si)s] " + - "%(offset)d+%(size)d vs %(filesize)d: " + - "done"), - si=base32.b2a(self._storage_index), - offset=offset, size=size, filesize=current_size, - umid="nuedUg", level=log.NOISY) - self.milestones.discard(m) - eventually(d.callback, None) - else: - 
log.msg(format=("immutable filenode read [%(si)s] " + - "%(offset)d+%(size)d vs %(filesize)d: " + - "still waiting"), - si=base32.b2a(self._storage_index), - offset=offset, size=size, filesize=current_size, - umid="8PKOhg", level=log.NOISY) - - def get_filesize(self): - if not self.cachefile: - self.cachefile = self.cachedirectorymanager.get_file(base32.b2a(self._storage_index)) - try: - filesize = os.stat(self.cachefile.get_filename())[stat.ST_SIZE] - except OSError: - filesize = 0 - return filesize - - - def open(self, size): - if not self.cachefile: - self.cachefile = self.cachedirectorymanager.get_file(base32.b2a(self._storage_index)) - self.f = open(self.cachefile.get_filename(), "wb") - - def write(self, data): - self.f.write(data) - self._check_milestones() - - def close(self): - self.f.close() - self._check_milestones() - - def fail(self, why): - pass - def register_canceller(self, cb): - pass - def finish(self): - return None - # The following methods are just because the target might be a - # repairer.DownUpConnector, and just because the current CHKUpload object - # expects to find the storage index and encoding parameters in its - # Uploadable. 
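The removed `DownloadCache` keeps each `(offset, size, Deferred)` milestone until the cache file has grown past `offset+size`. Below is a dependency-free sketch of the same bookkeeping. It makes some substitutions: plain callbacks stand in for Deferreds, and a byte counter stands in for `os.stat` on the cache file. `RangeWaiter` is a hypothetical name.

```python
class RangeWaiter:
    """Fire a callback once bytes [offset, offset+size) have been written."""

    def __init__(self):
        self.current_size = 0
        self._milestones = []  # list of (offset, size, callback)

    def when_range_available(self, offset, size, callback):
        self._milestones.append((offset, size, callback))
        self._check()

    def wrote(self, nbytes):
        # called as data is appended, like DownloadCache.write()
        self.current_size += nbytes
        self._check()

    def _check(self):
        ready = [m for m in self._milestones
                 if m[0] + m[1] <= self.current_size]
        self._milestones = [m for m in self._milestones
                            if m[0] + m[1] > self.current_size]
        # fire only after the list is updated, so re-entrant calls are safe
        for offset, size, callback in ready:
            callback(offset, size)
```

Ready milestones are removed before their callbacks run, so a callback that registers a new range is not lost. The original avoids re-entrancy by firing through `eventually()` instead.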
- def set_storageindex(self, storageindex): - pass - def set_encodingparams(self, encodingparams): - pass - - -class ImmutableFileNode(_ImmutableFileNodeBase, log.PrefixingLogMixin): - def __init__(self, filecap, storage_broker, secret_holder, - downloader, history, cachedirectorymanager): - assert isinstance(filecap, CHKFileURI) - self.u = filecap - self._storage_broker = storage_broker - self._secret_holder = secret_holder - self._downloader = downloader - self._history = history - storage_index = self.get_storage_index() - self.download_cache = DownloadCache(filecap, storage_index, downloader, - cachedirectorymanager) - prefix = self.u.get_verify_cap().to_string() - log.PrefixingLogMixin.__init__(self, "allmydata.immutable.filenode", prefix=prefix) - self.log("starting", level=log.OPERATIONAL) - - def get_size(self): - return self.u.get_size() - def get_current_size(self): - return defer.succeed(self.get_size()) - - def get_cap(self): - return self.u - def get_readcap(self): - return self.u.get_readonly() - def get_verify_cap(self): - return self.u.get_verify_cap() - def get_repair_cap(self): - # CHK files can be repaired with just the verifycap - return self.u.get_verify_cap() - - def get_uri(self): - return self.u.to_string() - - def get_storage_index(self): - return self.u.get_storage_index() def check_and_repair(self, monitor, verify=False, add_lease=False): hunk ./src/allmydata/immutable/filenode.py 77 - verifycap = self.get_verify_cap() + verifycap = self._verifycap + storage_index = verifycap.storage_index sb = self._storage_broker servers = sb.get_all_servers() sh = self._secret_holder hunk ./src/allmydata/immutable/filenode.py 88 monitor=monitor) d = c.start() def _maybe_repair(cr): - crr = CheckAndRepairResults(self.u.get_storage_index()) + crr = CheckAndRepairResults(storage_index) crr.pre_repair_results = cr if cr.is_healthy(): crr.post_repair_results = cr hunk ./src/allmydata/immutable/filenode.py 98 crr.repair_successful = False # until proven 
successful def _gather_repair_results(ur): assert IUploadResults.providedBy(ur), ur - # clone the cr -- check results to form the basic of the prr -- post-repair results + # clone the cr (check results) to form the basis of the + # prr (post-repair results) prr = CheckResults(cr.uri, cr.storage_index) prr.data = copy.deepcopy(cr.data) hunk ./src/allmydata/immutable/filenode.py 104 sm = prr.data['sharemap'] - assert isinstance(sm, dictutil.DictOfSets), sm + assert isinstance(sm, DictOfSets), sm sm.update(ur.sharemap) servers_responding = set(prr.data['servers-responding']) servers_responding.union(ur.sharemap.iterkeys()) hunk ./src/allmydata/immutable/filenode.py 111 prr.data['servers-responding'] = list(servers_responding) prr.data['count-shares-good'] = len(sm) prr.data['count-good-share-hosts'] = len(sm) - is_healthy = bool(len(sm) >= self.u.total_shares) - is_recoverable = bool(len(sm) >= self.u.needed_shares) + is_healthy = bool(len(sm) >= verifycap.total_shares) + is_recoverable = bool(len(sm) >= verifycap.needed_shares) prr.set_healthy(is_healthy) prr.set_recoverable(is_recoverable) crr.repair_successful = is_healthy hunk ./src/allmydata/immutable/filenode.py 116 - prr.set_needs_rebalancing(len(sm) >= self.u.total_shares) + prr.set_needs_rebalancing(len(sm) >= verifycap.total_shares) crr.post_repair_results = prr return crr hunk ./src/allmydata/immutable/filenode.py 126 crr.repair_successful = False crr.repair_failure = f return f - r = Repairer(storage_broker=sb, secret_holder=sh, - verifycap=verifycap, monitor=monitor) + r = Repairer(self, storage_broker=sb, secret_holder=sh, + monitor=monitor) d = r.start() d.addCallbacks(_gather_repair_results, _repair_error) return d hunk ./src/allmydata/immutable/filenode.py 136 return d def check(self, monitor, verify=False, add_lease=False): - verifycap = self.get_verify_cap() + verifycap = self._verifycap sb = self._storage_broker servers = sb.get_all_servers() sh = self._secret_holder hunk 
./src/allmydata/immutable/filenode.py 146 monitor=monitor) return v.start() - def read(self, consumer, offset=0, size=None): - self.log("read", offset=offset, size=size, - umid="UPP8FA", level=log.OPERATIONAL) - if size is None: - size = self.get_size() - offset - size = min(size, self.get_size() - offset) hunk ./src/allmydata/immutable/filenode.py 147 - if offset == 0 and size == self.get_size(): - # don't use the cache, just do a normal streaming download - self.log("doing normal full download", umid="VRSBwg", level=log.OPERATIONAL) - target = download.ConsumerAdapter(consumer) - return self._downloader.download(self.get_cap(), target, - self._parentmsgid, - history=self._history) +class DecryptingConsumer: + """I sit between a CiphertextDownloader (which acts as a Producer) and + the real Consumer, decrypting everything that passes by. The real + Consumer sees the real Producer, but the Producer sees us instead of the + real consumer.""" + implements(IConsumer) hunk ./src/allmydata/immutable/filenode.py 154 - d = self.download_cache.when_range_available(offset, size) - d.addCallback(lambda res: - self.download_cache.read(consumer, offset, size)) - return d + def __init__(self, consumer, readkey, offset, read_event): + self._consumer = consumer + self._read_event = read_event + # TODO: pycryptopp CTR-mode needs random-access operations: I want + # either a=AES(readkey, offset) or better yet both of: + # a=AES(readkey, offset=0) + # a.process(ciphertext, offset=xyz) + # For now, we fake it with the existing iv= argument. 
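The `iv=` workaround in the TODO relies on CTR mode being seekable. The keystream for byte `offset` starts in counter block `offset // 16`, after skipping `offset % 16` bytes. The sketch below checks that arithmetic end to end. It is a CTR construction with SHA-256 as a stand-in block function, not AES, and is for illustration only. `ToyCTR` and `decryptor_at` are hypothetical names, and the counter encoding copies the patch's `"%032x"` trick.

```python
import binascii
import hashlib

BLOCK = 16


def counter_block(n):
    # 128-bit big-endian counter, encoded the same way as in the patch
    return binascii.unhexlify("%032x" % n)


def keystream_block(key, n):
    # stand-in PRF for illustration only; the real code uses AES(readkey)
    return hashlib.sha256(key + counter_block(n)).digest()[:BLOCK]


class ToyCTR:
    def __init__(self, key, iv_counter=0):
        self._key = key
        self._counter = iv_counter
        self._pending = b""  # unused keystream bytes from the current block

    def process(self, data):
        out = bytearray()
        for byte in data:
            if not self._pending:
                self._pending = keystream_block(self._key, self._counter)
                self._counter += 1
            out.append(byte ^ self._pending[0])
            self._pending = self._pending[1:]
        return bytes(out)


def decryptor_at(key, offset):
    # mirror of DecryptingConsumer.__init__: start at the block holding
    # 'offset', then throw away offset % 16 bytes of keystream
    offset_big, offset_small = divmod(offset, BLOCK)
    d = ToyCTR(key, iv_counter=offset_big)
    d.process(b"\x00" * offset_small)
    return d
```

CTR encryption and decryption are the same XOR. A decryptor positioned at any offset therefore recovers the plaintext suffix from the matching ciphertext suffix.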
+ offset_big = offset // 16 + offset_small = offset % 16 + iv = binascii.unhexlify("%032x" % offset_big) + self._decryptor = AES(readkey, iv=iv) + self._decryptor.process("\x00"*offset_small) hunk ./src/allmydata/immutable/filenode.py 168 -class LiteralProducer: - implements(IPushProducer) - def resumeProducing(self): - pass - def stopProducing(self): - pass + def registerProducer(self, producer, streaming): + # this passes through, so the real consumer can flow-control the real + # producer. Therefore we don't need to provide any IPushProducer + # methods. We implement all the IConsumer methods as pass-throughs, + # and only intercept write() to perform decryption. + self._consumer.registerProducer(producer, streaming) + def unregisterProducer(self): + self._consumer.unregisterProducer() + def write(self, ciphertext): + started = now() + plaintext = self._decryptor.process(ciphertext) + elapsed = now() - started + self._read_event.update(0, elapsed, 0) + self._consumer.write(plaintext) hunk ./src/allmydata/immutable/filenode.py 183 +class ImmutableFileNode: + implements(IImmutableFileNode) hunk ./src/allmydata/immutable/filenode.py 186 -class LiteralFileNode(_ImmutableFileNodeBase): - - def __init__(self, filecap): - assert isinstance(filecap, LiteralFileURI) + # I wrap a CiphertextFileNode with a decryption key + def __init__(self, filecap, storage_broker, secret_holder, terminator, + history): + assert isinstance(filecap, uri.CHKFileURI) + verifycap = filecap.get_verify_cap() + ds = DownloadStatus(verifycap.storage_index, verifycap.size) + if history: + history.add_download(ds) + self._download_status = ds + self._cnode = CiphertextFileNode(verifycap, storage_broker, + secret_holder, terminator, history, ds) + assert isinstance(filecap, uri.CHKFileURI) self.u = filecap hunk ./src/allmydata/immutable/filenode.py 199 + self._readkey = filecap.key hunk ./src/allmydata/immutable/filenode.py 201 - def get_size(self): - return len(self.u.data) - def 
get_current_size(self): - return defer.succeed(self.get_size()) + # TODO: I'm not sure about this.. what's the use case for node==node? If + # we keep it here, we should also put this on CiphertextFileNode + def __hash__(self): + return self.u.__hash__() + def __eq__(self, other): + if isinstance(other, ImmutableFileNode): + return self.u.__eq__(other.u) + else: + return False + def __ne__(self, other): + if isinstance(other, ImmutableFileNode): + return self.u.__eq__(other.u) + else: + return True + + def read(self, consumer, offset=0, size=None): + actual_size = size + if actual_size == None: + actual_size = self.u.size + actual_size = actual_size - offset + read_ev = self._download_status.add_read_event(offset,actual_size, + now()) + decryptor = DecryptingConsumer(consumer, self._readkey, offset, read_ev) + d = self._cnode.read(decryptor, offset, size, read_ev) + d.addCallback(lambda dc: consumer) + return d + + def raise_error(self): + pass hunk ./src/allmydata/immutable/filenode.py 231 + def get_write_uri(self): + return None + + def get_readonly_uri(self): + return self.get_uri() + + def get_uri(self): + return self.u.to_string() def get_cap(self): return self.u def get_readcap(self): hunk ./src/allmydata/immutable/filenode.py 242 - return self.u + return self.u.get_readonly() def get_verify_cap(self): hunk ./src/allmydata/immutable/filenode.py 244 - return None + return self.u.get_verify_cap() def get_repair_cap(self): hunk ./src/allmydata/immutable/filenode.py 246 - return None - - def get_uri(self): - return self.u.to_string() + # CHK files can be repaired with just the verifycap + return self.u.get_verify_cap() def get_storage_index(self): hunk ./src/allmydata/immutable/filenode.py 250 - return None + return self.u.get_storage_index() hunk ./src/allmydata/immutable/filenode.py 252 - def check(self, monitor, verify=False, add_lease=False): - return defer.succeed(None) + def get_size(self): + return self.u.get_size() + def get_current_size(self): + return 
defer.succeed(self.get_size()) hunk ./src/allmydata/immutable/filenode.py 257 - def check_and_repair(self, monitor, verify=False, add_lease=False): - return defer.succeed(None) + def is_mutable(self): + return False hunk ./src/allmydata/immutable/filenode.py 260 - def read(self, consumer, offset=0, size=None): - if size is None: - data = self.u.data[offset:] - else: - data = self.u.data[offset:offset+size] + def is_readonly(self): + return True hunk ./src/allmydata/immutable/filenode.py 263 - # We use twisted.protocols.basic.FileSender, which only does - # non-streaming, i.e. PullProducer, where the receiver/consumer must - # ask explicitly for each chunk of data. There are only two places in - # the Twisted codebase that can't handle streaming=False, both of - # which are in the upload path for an FTP/SFTP server - # (protocols.ftp.FileConsumer and - # vfs.adapters.ftp._FileToConsumerAdapter), neither of which is - # likely to be used as the target for a Tahoe download. + def is_unknown(self): + return False hunk ./src/allmydata/immutable/filenode.py 266 - d = basic.FileSender().beginFileTransfer(StringIO(data), consumer) - d.addCallback(lambda lastSent: consumer) - return d + def is_allowed_in_immutable_directory(self): + return True + + def check_and_repair(self, monitor, verify=False, add_lease=False): + return self._cnode.check_and_repair(monitor, verify, add_lease) + def check(self, monitor, verify=False, add_lease=False): + return self._cnode.check(monitor, verify, add_lease) hunk ./src/allmydata/immutable/layout.py 77 # they are still provided when writing so that older versions of Tahoe can # read them. 
+FORCE_V2 = False # set briefly by unit tests to make small-sized V2 shares + def make_write_bucket_proxy(rref, data_size, block_size, num_segments, num_share_hashes, uri_extension_size_max, nodeid): # Use layout v1 for small files, so they'll be readable by older versions hunk ./src/allmydata/immutable/layout.py 85 # (= self.next_read_lens[0]) - or self._closed_to_pusher): - nrd = self.next_read_ds.popleft() - nrl = self.next_read_lens.popleft() - - # Pick out the requested number of bytes from self.bufs, turn it - # into a string, and callback the deferred with that. - res = [] - ressize = 0 - while ressize < nrl and self.bufs: - nextbuf = self.bufs.popleft() - res.append(nextbuf) - ressize += len(nextbuf) - if ressize > nrl: - extra = ressize - nrl - self.bufs.appendleft(nextbuf[:-extra]) - res[-1] = nextbuf[:-extra] - assert _assert(sum(len(x) for x in res) <= nrl, [len(x) for x in res], nrl) - assert _assert(sum(len(x) for x in res) == nrl or self._closed_to_pusher, [len(x) for x in res], nrl) - self.bufsiz -= nrl - if self.bufsiz < self.buflim and self.producer: - self.producer.resumeProducing() - nrd.callback(res) - - # methods to satisfy the IConsumer and IDownloadTarget interfaces. (From - # the perspective of a downloader I am an IDownloadTarget and an - # IConsumer.) - def registerProducer(self, producer, streaming): - assert streaming # We know how to handle only streaming producers. 
- self.producer = producer # the downloader - def unregisterProducer(self): - self.producer = None - def open(self, size): - self.size = size - self._size_osol.fire(self.size) - def set_encodingparams(self, encodingparams): - self.encodingparams = encodingparams - self._encodingparams_osol.fire(self.encodingparams) - def set_storageindex(self, storageindex): - self.storageindex = storageindex - self._storageindex_osol.fire(self.storageindex) - def write(self, data): - precondition(data) # please don't write empty strings - self.bufs.append(data) - self.bufsiz += len(data) - self._satisfy_reads_if_possible() - if self.bufsiz >= self.buflim and self.producer: - self.producer.pauseProducing() - def finish(self): - pass - def close(self): - self._closed_to_pusher = True - # Any reads which haven't been satisfied by now are going to - # have to be satisfied with short reads. - self._satisfy_reads_if_possible() # methods to satisfy the IEncryptedUploader interface # (From the perspective of an uploader I am an IEncryptedUploadable.) hunk ./src/allmydata/immutable/repairer.py 73 def set_upload_status(self, upload_status): self.upload_status = upload_status def get_size(self): - if hasattr(self, 'size'): # attribute created by self.open() - return defer.succeed(self.size) - else: - return self._size_osol.when_fired() + size = self._filenode.get_size() + assert size is not None + return defer.succeed(size) def get_all_encoding_parameters(self): hunk ./src/allmydata/immutable/repairer.py 77 - # We have to learn the encoding params from pusher. 
- if hasattr(self, 'encodingparams'): - # attribute created by self.set_encodingparams() - return defer.succeed(self.encodingparams) - else: - return self._encodingparams_osol.when_fired() + return defer.succeed(self._encodingparams) def read_encrypted(self, length, hash_only): hunk ./src/allmydata/immutable/repairer.py 79 - """Returns a deferred which eventually fired with the requested - ciphertext.""" + """Returns a deferred which eventually fires with the requested + ciphertext, as a list of strings.""" precondition(length) # please don't ask to read 0 bytes hunk ./src/allmydata/immutable/repairer.py 82 - d = defer.Deferred() - self.next_read_ds.append(d) - self.next_read_lens.append(length) - self._satisfy_reads_if_possible() + mc = consumer.MemoryConsumer() + d = self._filenode.read(mc, self._offset, length) + self._offset += length + d.addCallback(lambda ign: mc.chunks) return d def get_storage_index(self): hunk ./src/allmydata/immutable/repairer.py 88 - # We have to learn the storage index from pusher. 
- if hasattr(self, 'storageindex'): - # attribute created by self.set_storageindex() - return defer.succeed(self.storageindex) - else: - return self._storageindex.when_fired() + return self._filenode.get_storage_index() + def close(self): + pass hunk ./src/allmydata/immutable/upload.py 23 from allmydata.util.rrefutil import add_version_to_remote_reference from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \ IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \ - NoServersError, InsufficientVersionError, UploadUnhappinessError + NoServersError, InsufficientVersionError, UploadUnhappinessError, \ + DEFAULT_MAX_SEGMENT_SIZE from allmydata.immutable import layout from pycryptopp.cipher.aes import AES hunk ./src/allmydata/immutable/upload.py 1209 return self._upload_status class BaseUploadable: - default_max_segment_size = 128*KiB # overridden by max_segment_size + # this is overridden by max_segment_size + default_max_segment_size = DEFAULT_MAX_SEGMENT_SIZE default_encoding_param_k = 3 # overridden by encoding_parameters default_encoding_param_happy = 7 default_encoding_param_n = 10 hunk ./src/allmydata/interfaces.py 18 MAX_BUCKETS = 256 # per peer -- zfec offers at most 256 shares per file +DEFAULT_MAX_SEGMENT_SIZE = 128*1024 + ShareData = StringConstraint(None) URIExtensionData = StringConstraint(1000) Number = IntegerConstraint(8) # 2**(8*8) == 16EiB ~= 18e18 ~= 18 exabytes hunk ./src/allmydata/nodemaker.py 4 import weakref from zope.interface import implements from allmydata.interfaces import INodeMaker -from allmydata.immutable.filenode import ImmutableFileNode, LiteralFileNode +from allmydata.immutable.literal import LiteralFileNode +from allmydata.immutable.filenode import ImmutableFileNode, CiphertextFileNode from allmydata.immutable.upload import Data from allmydata.mutable.filenode import MutableFileNode from allmydata.dirnode import DirectoryNode, pack_children hunk ./src/allmydata/nodemaker.py 16 implements(INodeMaker) def 
__init__(self, storage_broker, secret_holder, history, - uploader, downloader, download_cache_dirman, + uploader, terminator, default_encoding_parameters, key_generator): self.storage_broker = storage_broker self.secret_holder = secret_holder hunk ./src/allmydata/nodemaker.py 22 self.history = history self.uploader = uploader - self.downloader = downloader - self.download_cache_dirman = download_cache_dirman + self.terminator = terminator self.default_encoding_parameters = default_encoding_parameters self.key_generator = key_generator hunk ./src/allmydata/nodemaker.py 32 return LiteralFileNode(cap) def _create_immutable(self, cap): return ImmutableFileNode(cap, self.storage_broker, self.secret_holder, - self.downloader, self.history, - self.download_cache_dirman) + self.terminator, self.history) + def _create_immutable_verifier(self, cap): + return CiphertextFileNode(cap, self.storage_broker, self.secret_holder, + self.terminator, self.history) def _create_mutable(self, cap): n = MutableFileNode(self.storage_broker, self.secret_holder, self.default_encoding_parameters, hunk ./src/allmydata/nodemaker.py 78 return self._create_lit(cap) if isinstance(cap, uri.CHKFileURI): return self._create_immutable(cap) + if isinstance(cap, uri.CHKFileVerifierURI): + return self._create_immutable_verifier(cap) if isinstance(cap, (uri.ReadonlySSKFileURI, uri.WriteableSSKFileURI)): return self._create_mutable(cap) if isinstance(cap, (uri.DirectoryURI, hunk ./src/allmydata/test/no_network.py 226 fileutil.make_dirs(serverdir) ss = StorageServer(serverdir, serverid, stats_provider=SimpleStats(), readonly_storage=readonly) + ss._no_network_server_number = i return ss def add_server(self, i, ss): hunk ./src/allmydata/test/no_network.py 323 pass return sorted(shares) + def copy_shares(self, uri): + shares = {} + for (shnum, serverid, sharefile) in self.find_uri_shares(uri): + shares[sharefile] = open(sharefile, "rb").read() + return shares + + def restore_all_shares(self, shares): + for 
sharefile, data in shares.items(): + open(sharefile, "wb").write(data) + def delete_share(self, (shnum, serverid, sharefile)): os.unlink(sharefile) hunk ./src/allmydata/test/no_network.py 353 corruptdata = corruptor(sharedata, debug=debug) open(i_sharefile, "wb").write(corruptdata) + def corrupt_all_shares(self, uri, corruptor, debug=False): + for (i_shnum, i_serverid, i_sharefile) in self.find_uri_shares(uri): + sharedata = open(i_sharefile, "rb").read() + corruptdata = corruptor(sharedata, debug=debug) + open(i_sharefile, "wb").write(corruptdata) + def GET(self, urlpath, followRedirect=False, return_response=False, method="GET", clientnum=0, **kwargs): # if return_response=True, this fires with (data, statuscode, hunk ./src/allmydata/test/test_cli.py 2304 self.delete_shares_numbered(ur.uri, range(1,10)) d.addCallback(_stash_bad) + # the download is abandoned as soon as it's clear that we won't get + # enough shares. The one remaining share might be in either the + # COMPLETE or the PENDING state. 
+ in_complete_msg = "ran out of shares: 1 complete, 0 pending, 0 overdue, 0 unused, need 3" + in_pending_msg = "ran out of shares: 0 complete, 1 pending, 0 overdue, 0 unused, need 3" + d.addCallback(lambda ign: self.do_cli("get", self.uri_1share)) def _check1((rc, out, err)): self.failIfEqual(rc, 0) hunk ./src/allmydata/test/test_cli.py 2315 self.failUnless("410 Gone" in err, err) self.failUnlessIn("NotEnoughSharesError: ", err) - self.failUnlessIn("Failed to get enough shareholders: have 1, need 3", err) + self.failUnless(in_complete_msg in err or in_pending_msg in err, + err) d.addCallback(_check1) targetf = os.path.join(self.basedir, "output") hunk ./src/allmydata/test/test_cli.py 2325 self.failIfEqual(rc, 0) self.failUnless("410 Gone" in err, err) self.failUnlessIn("NotEnoughSharesError: ", err) - self.failUnlessIn("Failed to get enough shareholders: have 1, need 3", err) + self.failUnless(in_complete_msg in err or in_pending_msg in err, + err) self.failIf(os.path.exists(targetf)) d.addCallback(_check2) hunk ./src/allmydata/test/test_dirnode.py 1205 def test_unpack_and_pack_behavior(self): known_tree = b32decode(self.known_tree) nodemaker = NodeMaker(None, None, None, - None, None, None, + None, None, {"k": 3, "n": 10}, None) write_uri = "URI:SSK-RO:e3mdrzfwhoq42hy5ubcz6rp3o4:ybyibhnp3vvwuq2vaw2ckjmesgkklfs6ghxleztqidihjyofgw7q" filenode = nodemaker.create_from_cap(write_uri) hunk ./src/allmydata/test/test_dirnode.py 1267 return kids def test_deep_immutable(self): - nm = NodeMaker(None, None, None, None, None, None, {"k": 3, "n": 10}, - None) + nm = NodeMaker(None, None, None, None, None, {"k": 3, "n": 10}, None) fn = MinimalFakeMutableFile() kids = self._make_kids(nm, ["imm", "lit", "write", "read", hunk ./src/allmydata/test/test_dirnode.py 1361 class FakeClient2(Client): def __init__(self): self.nodemaker = FakeNodeMaker(None, None, None, - None, None, None, + None, None, {"k":3,"n":10}, None) def create_node_from_uri(self, rwcap, rocap): return 
self.nodemaker.create_from_cap(rwcap, rocap) hunk ./src/allmydata/test/test_dirnode.py 1645 def _do_delete(ignored): nm = UCWEingNodeMaker(c0.storage_broker, c0._secret_holder, c0.get_history(), c0.getServiceNamed("uploader"), - c0.downloader, - c0.download_cache_dirman, + c0.terminator, c0.get_encoding_parameters(), c0._key_generator) n = nm.create_from_cap(self.root_uri) hunk ./src/allmydata/test/test_download.py 8 import os from twisted.trial import unittest +from twisted.internet import defer, reactor from allmydata import uri from allmydata.storage.server import storage_index_to_dir hunk ./src/allmydata/test/test_download.py 11 -from allmydata.util import base32, fileutil -from allmydata.util.consumer import download_to_data -from allmydata.immutable import upload +from allmydata.util import base32, fileutil, spans, log +from allmydata.util.consumer import download_to_data, MemoryConsumer +from allmydata.immutable import upload, layout from allmydata.test.no_network import GridTestMixin hunk ./src/allmydata/test/test_download.py 15 +from allmydata.test.common import ShouldFailMixin +from allmydata.interfaces import NotEnoughSharesError, NoSharesError +from allmydata.immutable.downloader.common import BadSegmentNumberError, \ + BadCiphertextHashError, DownloadStopped +from allmydata.codec import CRSDecoder +from foolscap.eventual import fireEventually, flushEventualQueue plaintext = "This is a moderate-sized file.\n" * 10 mutable_plaintext = "This is a moderate-sized mutable file.\n" * 10 hunk ./src/allmydata/test/test_download.py 78 } #--------- END stored_shares.py ---------------- -class DownloadTest(GridTestMixin, unittest.TestCase): - timeout = 2400 # It takes longer than 240 seconds on Zandr's ARM box. 
- def test_download(self): - self.basedir = self.mktemp() - self.set_up_grid() - self.c0 = self.g.clients[0] - - # do this to create the shares - #return self.create_shares() - - self.load_shares() - d = self.download_immutable() - d.addCallback(self.download_mutable) - return d +class _Base(GridTestMixin, ShouldFailMixin): def create_shares(self, ignored=None): u = upload.Data(plaintext, None) hunk ./src/allmydata/test/test_download.py 175 def _got_data(data): self.failUnlessEqual(data, plaintext) d.addCallback(_got_data) + # make sure we can use the same node twice + d.addCallback(lambda ign: download_to_data(n)) + d.addCallback(_got_data) return d def download_mutable(self, ignored=None): hunk ./src/allmydata/test/test_download.py 188 d.addCallback(_got_data) return d +class DownloadTest(_Base, unittest.TestCase): + timeout = 2400 # It takes longer than 240 seconds on Zandr's ARM box. + def test_download(self): + self.basedir = self.mktemp() + self.set_up_grid() + self.c0 = self.g.clients[0] + + # do this to create the shares + #return self.create_shares() + + self.load_shares() + d = self.download_immutable() + d.addCallback(self.download_mutable) + return d + + def test_download_failover(self): + self.basedir = self.mktemp() + self.set_up_grid() + self.c0 = self.g.clients[0] + + self.load_shares() + si = uri.from_string(immutable_uri).get_storage_index() + si_dir = storage_index_to_dir(si) + + n = self.c0.create_node_from_uri(immutable_uri) + d = download_to_data(n) + def _got_data(data): + self.failUnlessEqual(data, plaintext) + d.addCallback(_got_data) + + def _clobber_some_shares(ign): + # find the three shares that were used, and delete them. 
Then + # download again, forcing the downloader to fail over to other + # shares + for s in n._cnode._node._shares: + for clientnum in immutable_shares: + for shnum in immutable_shares[clientnum]: + if s._shnum == shnum: + fn = os.path.join(self.get_serverdir(clientnum), + "shares", si_dir, str(shnum)) + os.unlink(fn) + d.addCallback(_clobber_some_shares) + d.addCallback(lambda ign: download_to_data(n)) + d.addCallback(_got_data) + + def _clobber_most_shares(ign): + # delete all but one of the shares that are still alive + live_shares = [s for s in n._cnode._node._shares if s.is_alive()] + save_me = live_shares[0]._shnum + for clientnum in immutable_shares: + for shnum in immutable_shares[clientnum]: + if shnum == save_me: + continue + fn = os.path.join(self.get_serverdir(clientnum), + "shares", si_dir, str(shnum)) + if os.path.exists(fn): + os.unlink(fn) + # now the download should fail with NotEnoughSharesError + return self.shouldFail(NotEnoughSharesError, "1shares", None, + download_to_data, n) + d.addCallback(_clobber_most_shares) + + def _clobber_all_shares(ign): + # delete the last remaining share + for clientnum in immutable_shares: + for shnum in immutable_shares[clientnum]: + fn = os.path.join(self.get_serverdir(clientnum), + "shares", si_dir, str(shnum)) + if os.path.exists(fn): + os.unlink(fn) + # now a new download should fail with NoSharesError. We want a + # new ImmutableFileNode so it will forget about the old shares. + # If we merely called create_node_from_uri() without first + # dereferencing the original node, the NodeMaker's _node_cache + # would give us back the old one. + n = None + n = self.c0.create_node_from_uri(immutable_uri) + return self.shouldFail(NoSharesError, "0shares", None, + download_to_data, n) + d.addCallback(_clobber_all_shares) + return d + + def test_lost_servers(self): + # while downloading a file (after seg[0], before seg[1]), lose the + # three servers that we were using. 
The download should switch over + # to other servers. + self.basedir = self.mktemp() + self.set_up_grid() + self.c0 = self.g.clients[0] + + # upload a file with multiple segments, so we can catch the download + # in the middle. + u = upload.Data(plaintext, None) + u.max_segment_size = 70 # 5 segs + d = self.c0.upload(u) + def _uploaded(ur): + self.uri = ur.uri + self.n = self.c0.create_node_from_uri(self.uri) + return download_to_data(self.n) + d.addCallback(_uploaded) + def _got_data(data): + self.failUnlessEqual(data, plaintext) + d.addCallback(_got_data) + def _kill_some_servers(): + # find the three shares that were used, and break our connections + # to their servers, forcing the in-progress download to fail over + # to other shares + servers = [] + shares = sorted([s._shnum for s in self.n._cnode._node._shares]) + self.failUnlessEqual(shares, [0,1,2]) + # break the RIBucketReader references + for s in self.n._cnode._node._shares: + s._rref.broken = True + for servernum in immutable_shares: + for shnum in immutable_shares[servernum]: + if s._shnum == shnum: + ss = self.g.servers_by_number[servernum] + servers.append(ss) + # and, for good measure, break the RIStorageServer references + # too, just in case the downloader gets more aggressive in the + # future and tries to re-fetch the same share.
+ for ss in servers: + wrapper = self.g.servers_by_id[ss.my_nodeid] + wrapper.broken = True + def _download_again(ign): + c = StallingConsumer(_kill_some_servers) + return self.n.read(c) + d.addCallback(_download_again) + def _check_failover(c): + self.failUnlessEqual("".join(c.chunks), plaintext) + shares = sorted([s._shnum for s in self.n._cnode._node._shares]) + # we should now be using more shares than we were before + self.failIfEqual(shares, [0,1,2]) + d.addCallback(_check_failover) + return d + + def test_badguess(self): + self.basedir = self.mktemp() + self.set_up_grid() + self.c0 = self.g.clients[0] + self.load_shares() + n = self.c0.create_node_from_uri(immutable_uri) + + # Cause the downloader to guess a segsize that's too low, so it will + # ask for a segment number that's too high (beyond the end of the + # real list, causing BadSegmentNumberError), to exercise + # Segmentation._retry_bad_segment + + con1 = MemoryConsumer() + n._cnode._node._build_guessed_tables(90) + # plaintext size of 310 bytes, wrong-segsize of 90 bytes, will make + # us think that file[180:200] is in the third segment (segnum=2), but + # really there's only one segment + d = n.read(con1, 180, 20) + def _done(res): + self.failUnlessEqual("".join(con1.chunks), plaintext[180:200]) + d.addCallback(_done) + return d + + def test_simultaneous_badguess(self): + self.basedir = self.mktemp() + self.set_up_grid() + self.c0 = self.g.clients[0] + + # upload a file with multiple segments, and a non-default segsize, to + # exercise the offset-guessing code. Because we don't tell the + # downloader about the unusual segsize, it will guess wrong, and have + # to do extra roundtrips to get the correct data. 
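The segment arithmetic that test_badguess relies on (and the "5 segs", "6 segs", "3 segs" comments elsewhere in these tests) can be checked with a minimal sketch. It assumes the guessed segment number is simply offset // segsize and the segment count is a ceiling division of the file size; segnum_for_offset and num_segments are hypothetical helpers, not downloader APIs.

```python
# Hypothetical helpers illustrating the offset-to-segment guess exercised by
# test_badguess; they are not part of allmydata's downloader.

def segnum_for_offset(offset, segsize):
    """Segment that would hold byte `offset` if segments were `segsize` long."""
    return offset // segsize

def num_segments(filesize, segsize):
    """Number of segments needed to hold `filesize` bytes (ceiling division)."""
    return (filesize + segsize - 1) // segsize

plaintext = "This is a moderate-sized file.\n" * 10   # 310 bytes, as in the tests
MAX_SEGSIZE = 128 * 1024   # mirrors DEFAULT_MAX_SEGMENT_SIZE from interfaces.py

guessed = segnum_for_offset(180, 90)                     # wrong guess: third segment
actual_count = num_segments(len(plaintext), MAX_SEGSIZE)  # the file fits in one

# segnum=2 lies past the end of the real (one-entry) segment list, which is
# the case the test expects the downloader to detect and retry.
print(len(plaintext), guessed, actual_count)
```

The same ceiling division gives 5 segments for max_segment_size=70, 6 for 60, and 3 for 120, matching the comments in the tests.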
+ u = upload.Data(plaintext, None) + u.max_segment_size = 70 # 5 segs, 8-wide hashtree + con1 = MemoryConsumer() + con2 = MemoryConsumer() + d = self.c0.upload(u) + def _uploaded(ur): + n = self.c0.create_node_from_uri(ur.uri) + d1 = n.read(con1, 70, 20) + d2 = n.read(con2, 140, 20) + return defer.gatherResults([d1,d2]) + d.addCallback(_uploaded) + def _done(res): + self.failUnlessEqual("".join(con1.chunks), plaintext[70:90]) + self.failUnlessEqual("".join(con2.chunks), plaintext[140:160]) + d.addCallback(_done) + return d + + def test_simultaneous_goodguess(self): + self.basedir = self.mktemp() + self.set_up_grid() + self.c0 = self.g.clients[0] + + # upload a file with multiple segments, and a non-default segsize, to + # exercise the offset-guessing code. This time we *do* tell the + # downloader about the unusual segsize, so it can guess right. + u = upload.Data(plaintext, None) + u.max_segment_size = 70 # 5 segs, 8-wide hashtree + con1 = MemoryConsumer() + con2 = MemoryConsumer() + d = self.c0.upload(u) + def _uploaded(ur): + n = self.c0.create_node_from_uri(ur.uri) + n._cnode._node._build_guessed_tables(u.max_segment_size) + d1 = n.read(con1, 70, 20) + #d2 = n.read(con2, 140, 20) # XXX + d2 = defer.succeed(None) + return defer.gatherResults([d1,d2]) + d.addCallback(_uploaded) + def _done(res): + self.failUnlessEqual("".join(con1.chunks), plaintext[70:90]) + self.failUnlessEqual("".join(con2.chunks), plaintext[140:160]) + #d.addCallback(_done) + return d + + def test_sequential_goodguess(self): + self.basedir = self.mktemp() + self.set_up_grid() + self.c0 = self.g.clients[0] + data = (plaintext*100)[:30000] # multiple of k + + # upload a file with multiple segments, and a non-default segsize, to + # exercise the offset-guessing code. This time we *do* tell the + # downloader about the unusual segsize, so it can guess right. 
+ u = upload.Data(data, None) + u.max_segment_size = 6000 # 5 segs, 8-wide hashtree + con1 = MemoryConsumer() + con2 = MemoryConsumer() + d = self.c0.upload(u) + def _uploaded(ur): + n = self.c0.create_node_from_uri(ur.uri) + n._cnode._node._build_guessed_tables(u.max_segment_size) + d = n.read(con1, 12000, 20) + def _read1(ign): + self.failUnlessEqual("".join(con1.chunks), data[12000:12020]) + return n.read(con2, 24000, 20) + d.addCallback(_read1) + def _read2(ign): + self.failUnlessEqual("".join(con2.chunks), data[24000:24020]) + d.addCallback(_read2) + return d + d.addCallback(_uploaded) + return d + + + def test_simultaneous_get_blocks(self): + self.basedir = self.mktemp() + self.set_up_grid() + self.c0 = self.g.clients[0] + + self.load_shares() + stay_empty = [] + + n = self.c0.create_node_from_uri(immutable_uri) + d = download_to_data(n) + def _use_shares(ign): + shares = list(n._cnode._node._shares) + s0 = shares[0] + # make sure .cancel works too + o0 = s0.get_block(0) + o0.subscribe(lambda **kwargs: stay_empty.append(kwargs)) + o1 = s0.get_block(0) + o2 = s0.get_block(0) + o0.cancel() + o3 = s0.get_block(1) # state=BADSEGNUM + d1 = defer.Deferred() + d2 = defer.Deferred() + d3 = defer.Deferred() + o1.subscribe(lambda **kwargs: d1.callback(kwargs)) + o2.subscribe(lambda **kwargs: d2.callback(kwargs)) + o3.subscribe(lambda **kwargs: d3.callback(kwargs)) + return defer.gatherResults([d1,d2,d3]) + d.addCallback(_use_shares) + def _done(res): + r1,r2,r3 = res + self.failUnlessEqual(r1["state"], "COMPLETE") + self.failUnlessEqual(r2["state"], "COMPLETE") + self.failUnlessEqual(r3["state"], "BADSEGNUM") + self.failUnless("block" in r1) + self.failUnless("block" in r2) + self.failIf(stay_empty) + d.addCallback(_done) + return d + + def test_download_no_overrun(self): + self.basedir = self.mktemp() + self.set_up_grid() + self.c0 = self.g.clients[0] + + self.load_shares() + + # tweak the client's copies of server-version data, so it believes + # that they're old and 
can't handle reads that overrun the length of + # the share. This exercises a different code path. + for (peerid, rref) in self.c0.storage_broker.get_all_servers(): + v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"] + v1["tolerates-immutable-read-overrun"] = False + + n = self.c0.create_node_from_uri(immutable_uri) + d = download_to_data(n) + def _got_data(data): + self.failUnlessEqual(data, plaintext) + d.addCallback(_got_data) + return d + + def test_download_segment(self): + self.basedir = self.mktemp() + self.set_up_grid() + self.c0 = self.g.clients[0] + self.load_shares() + n = self.c0.create_node_from_uri(immutable_uri) + cn = n._cnode + (d,c) = cn.get_segment(0) + def _got_segment((offset,data,decodetime)): + self.failUnlessEqual(offset, 0) + self.failUnlessEqual(len(data), len(plaintext)) + d.addCallback(_got_segment) + return d + + def test_download_segment_cancel(self): + self.basedir = self.mktemp() + self.set_up_grid() + self.c0 = self.g.clients[0] + self.load_shares() + n = self.c0.create_node_from_uri(immutable_uri) + cn = n._cnode + (d,c) = cn.get_segment(0) + fired = [] + d.addCallback(fired.append) + c.cancel() + d = fireEventually() + d.addCallback(flushEventualQueue) + def _check(ign): + self.failUnlessEqual(fired, []) + d.addCallback(_check) + return d + + def test_download_bad_segment(self): + self.basedir = self.mktemp() + self.set_up_grid() + self.c0 = self.g.clients[0] + self.load_shares() + n = self.c0.create_node_from_uri(immutable_uri) + cn = n._cnode + def _try_download(): + (d,c) = cn.get_segment(1) + return d + d = self.shouldFail(BadSegmentNumberError, "badseg", + "segnum=1, numsegs=1", + _try_download) + return d + + def test_download_segment_terminate(self): + self.basedir = self.mktemp() + self.set_up_grid() + self.c0 = self.g.clients[0] + self.load_shares() + n = self.c0.create_node_from_uri(immutable_uri) + cn = n._cnode + (d,c) = cn.get_segment(0) + fired = [] + d.addCallback(fired.append) + 
self.c0.terminator.disownServiceParent() + d = fireEventually() + d.addCallback(flushEventualQueue) + def _check(ign): + self.failUnlessEqual(fired, []) + d.addCallback(_check) + return d + + def test_pause(self): + self.basedir = self.mktemp() + self.set_up_grid() + self.c0 = self.g.clients[0] + self.load_shares() + n = self.c0.create_node_from_uri(immutable_uri) + c = PausingConsumer() + d = n.read(c) + def _downloaded(mc): + newdata = "".join(mc.chunks) + self.failUnlessEqual(newdata, plaintext) + d.addCallback(_downloaded) + return d + + def test_pause_then_stop(self): + self.basedir = self.mktemp() + self.set_up_grid() + self.c0 = self.g.clients[0] + self.load_shares() + n = self.c0.create_node_from_uri(immutable_uri) + c = PausingAndStoppingConsumer() + d = self.shouldFail(DownloadStopped, "test_pause_then_stop", + "our Consumer called stopProducing()", + n.read, c) + return d + + def test_stop(self): + # use a download target that does an immediate stop (ticket #473) + self.basedir = self.mktemp() + self.set_up_grid() + self.c0 = self.g.clients[0] + self.load_shares() + n = self.c0.create_node_from_uri(immutable_uri) + c = StoppingConsumer() + d = self.shouldFail(DownloadStopped, "test_stop", + "our Consumer called stopProducing()", + n.read, c) + return d + + def test_download_segment_bad_ciphertext_hash(self): + # The crypttext_hash_tree asserts the integrity of the decoded + # ciphertext, and exists to detect two sorts of problems. The first + # is a bug in zfec decode.
The second is the "two-sided t-shirt" + # attack (found by Christian Grothoff), in which a malicious uploader + # creates two sets of shares (one for file A, second for file B), + # uploads a combination of them (shares 0-4 of A, 5-9 of B), and then + # builds an otherwise normal UEB around those shares: their goal is + # to give their victim a filecap which sometimes downloads the good A + # contents, and sometimes the bad B contents, depending upon which + # servers/shares they can get to. Having a hash of the ciphertext + # forces them to commit to exactly one version. (Christian's prize + # for finding this problem was a t-shirt with two sides: the shares + # of file A on the front, B on the back). + + # creating a set of shares with this property is too hard, although + # it'd be nice to do so and confirm our fix. (it requires a lot of + # tampering with the uploader). So instead, we just damage the + # decoder. The tail decoder is rebuilt each time, so we need to use a + # file with multiple segments. + self.basedir = self.mktemp() + self.set_up_grid() + self.c0 = self.g.clients[0] + + u = upload.Data(plaintext, None) + u.max_segment_size = 60 # 6 segs + d = self.c0.upload(u) + def _uploaded(ur): + n = self.c0.create_node_from_uri(ur.uri) + n._cnode._node._build_guessed_tables(u.max_segment_size) + + d = download_to_data(n) + def _break_codec(data): + # the codec isn't created until the UEB is retrieved + node = n._cnode._node + vcap = node._verifycap + k, N = vcap.needed_shares, vcap.total_shares + bad_codec = BrokenDecoder() + bad_codec.set_params(node.segment_size, k, N) + node._codec = bad_codec + d.addCallback(_break_codec) + # now try to download it again. The broken codec will provide + # ciphertext that fails the hash test. 
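A minimal sketch can make the "commit to exactly one version" argument concrete. It uses simplifying assumptions: one plain SHA-256 digest per segment stands in for Tahoe's crypttext_hash_tree, and CiphertextHashMismatch, segment_hashes and check_segment are hypothetical names, not downloader APIs.

```python
import hashlib


class CiphertextHashMismatch(Exception):
    """Hypothetical stand-in for the downloader's BadCiphertextHashError."""


def segment_hashes(ciphertext, segsize):
    # Simplification: one plain SHA-256 digest per segment, instead of
    # Tahoe's tagged-hash Merkle tree over the segments.
    return [hashlib.sha256(ciphertext[i:i + segsize]).digest()
            for i in range(0, len(ciphertext), segsize)]


def check_segment(segnum, decoded, committed):
    """Accept a decoded segment only if it matches the committed hash."""
    if hashlib.sha256(decoded).digest() != committed[segnum]:
        raise CiphertextHashMismatch("hash failure: segnum=%d" % segnum)
    return decoded


SEGSIZE = 8
file_a = b"AAAAAAAAaaaaaaaa"   # what shares 0-4 decode to
file_b = b"BBBBBBBBbbbbbbbb"   # what shares 5-9 decode to
committed = segment_hashes(file_a, SEGSIZE)  # the uploader must pick one file

accepted = check_segment(0, file_a[:SEGSIZE], committed)

# Decoding from the "B" shares, or a single flipped bit (the kind of damage
# BrokenDecoder injects), no longer matches the commitment.
tampered = bytes([file_a[0] ^ 0x01]) + file_a[1:SEGSIZE]
rejected = []
for candidate in (file_b[:SEGSIZE], tampered):
    try:
        check_segment(0, candidate, committed)
    except CiphertextHashMismatch:
        rejected.append(candidate)
```

Because the uploader publishes only one set of hashes, at most one of the two share sets can ever pass the check. A mixed upload therefore fails loudly instead of silently returning different contents to different downloaders.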
+ d.addCallback(lambda ign: + self.shouldFail(BadCiphertextHashError, "badhash", + "hash failure in " + "ciphertext_hash_tree: segnum=0", + download_to_data, n)) + return d + d.addCallback(_uploaded) + return d + + def OFFtest_download_segment_XXX(self): + self.basedir = self.mktemp() + self.set_up_grid() + self.c0 = self.g.clients[0] + + # upload a file with multiple segments, and a non-default segsize, to + # exercise the offset-guessing code. This time we *do* tell the + # downloader about the unusual segsize, so it can guess right. + u = upload.Data(plaintext, None) + u.max_segment_size = 70 # 5 segs, 8-wide hashtree + con1 = MemoryConsumer() + con2 = MemoryConsumer() + d = self.c0.upload(u) + def _uploaded(ur): + n = self.c0.create_node_from_uri(ur.uri) + n._cnode._node._build_guessed_tables(u.max_segment_size) + d1 = n.read(con1, 70, 20) + #d2 = n.read(con2, 140, 20) + d2 = defer.succeed(None) + return defer.gatherResults([d1,d2]) + d.addCallback(_uploaded) + def _done(res): + self.failUnlessEqual("".join(con1.chunks), plaintext[70:90]) + self.failUnlessEqual("".join(con2.chunks), plaintext[140:160]) + #d.addCallback(_done) + return d + + def test_duplicate_shares(self): + self.basedir = self.mktemp() + self.set_up_grid() + self.c0 = self.g.clients[0] + + self.load_shares() + # make sure everybody has a copy of sh0. The second server contacted + # will report two shares, and the ShareFinder will handle the + # duplicate by attaching both to the same CommonShare instance. 
+ si = uri.from_string(immutable_uri).get_storage_index() + si_dir = storage_index_to_dir(si) + sh0_file = [sharefile + for (shnum, serverid, sharefile) + in self.find_uri_shares(immutable_uri) + if shnum == 0][0] + sh0_data = open(sh0_file, "rb").read() + for clientnum in immutable_shares: + if 0 in immutable_shares[clientnum]: + continue + cdir = self.get_serverdir(clientnum) + target = os.path.join(cdir, "shares", si_dir, "0") + outf = open(target, "wb") + outf.write(sh0_data) + outf.close() + + d = self.download_immutable() + return d + + def test_verifycap(self): + self.basedir = self.mktemp() + self.set_up_grid() + self.c0 = self.g.clients[0] + self.load_shares() + + n = self.c0.create_node_from_uri(immutable_uri) + vcap = n.get_verify_cap().to_string() + vn = self.c0.create_node_from_uri(vcap) + d = download_to_data(vn) + def _got_ciphertext(ciphertext): + self.failUnlessEqual(len(ciphertext), len(plaintext)) + self.failIfEqual(ciphertext, plaintext) + d.addCallback(_got_ciphertext) + return d + +class BrokenDecoder(CRSDecoder): + def decode(self, shares, shareids): + d = CRSDecoder.decode(self, shares, shareids) + def _decoded(buffers): + def _corruptor(s, which): + return s[:which] + chr(ord(s[which])^0x01) + s[which+1:] + buffers[0] = _corruptor(buffers[0], 0) # flip lsb of first byte + return buffers + d.addCallback(_decoded) + return d + + +class PausingConsumer(MemoryConsumer): + def __init__(self): + MemoryConsumer.__init__(self) + self.size = 0 + self.writes = 0 + def write(self, data): + self.size += len(data) + self.writes += 1 + if self.writes <= 2: + # we happen to use 4 segments, and want to avoid pausing on the + # last one (since then the _unpause timer will still be running) + self.producer.pauseProducing() + reactor.callLater(0.1, self._unpause) + return MemoryConsumer.write(self, data) + def _unpause(self): + self.producer.resumeProducing() + +class PausingAndStoppingConsumer(PausingConsumer): + def write(self, data): + 
self.producer.pauseProducing() + reactor.callLater(0.5, self._stop) + def _stop(self): + self.producer.stopProducing() + +class StoppingConsumer(PausingConsumer): + def write(self, data): + self.producer.stopProducing() + +class StallingConsumer(MemoryConsumer): + def __init__(self, halfway_cb): + MemoryConsumer.__init__(self) + self.halfway_cb = halfway_cb + self.writes = 0 + def write(self, data): + self.writes += 1 + if self.writes == 1: + self.halfway_cb() + return MemoryConsumer.write(self, data) + +class Corruption(_Base, unittest.TestCase): + + def _corrupt_flip(self, ign, imm_uri, which): + log.msg("corrupt %d" % which) + def _corruptor(s, debug=False): + return s[:which] + chr(ord(s[which])^0x01) + s[which+1:] + self.corrupt_shares_numbered(imm_uri, [0], _corruptor) + + def _corrupt_set(self, ign, imm_uri, which, newvalue): + log.msg("corrupt %d" % which) + def _corruptor(s, debug=False): + return s[:which] + chr(newvalue) + s[which+1:] + self.corrupt_shares_numbered(imm_uri, [0], _corruptor) + + def test_each_byte(self): + # Setting catalog_detection=True performs an exhaustive test of the + # Downloader's response to corruption in the lsb of each byte of the + # 2070-byte share, with two goals: make sure we tolerate all forms of + # corruption (i.e. don't hang or return bad data), and make a list of + # which bytes can be corrupted without influencing the download + # (since we don't need every byte of the share). That takes 50s to + # run on my laptop and doesn't have any actual asserts, so we don't + # normally do that. + self.catalog_detection = False + + self.basedir = "download/Corruption/each_byte" + self.set_up_grid() + self.c0 = self.g.clients[0] + + # to exercise the block-hash-tree code properly, we need to have + # multiple segments. We don't tell the downloader about the different + # segsize, so it guesses wrong and must do extra roundtrips. 
+ u = upload.Data(plaintext, None) + u.max_segment_size = 120 # 3 segs, 4-wide hashtree + + if self.catalog_detection: + undetected = spans.Spans() + + def _download(ign, imm_uri, which, expected): + n = self.c0.create_node_from_uri(imm_uri) + # for this test to work, we need to have a new Node each time. + # Make sure the NodeMaker's weakcache hasn't interfered. + assert not n._cnode._node._shares + d = download_to_data(n) + def _got_data(data): + self.failUnlessEqual(data, plaintext) + shnums = sorted([s._shnum for s in n._cnode._node._shares]) + no_sh0 = bool(0 not in shnums) + sh0 = [s for s in n._cnode._node._shares if s._shnum == 0] + sh0_had_corruption = False + if sh0 and sh0[0].had_corruption: + sh0_had_corruption = True + num_needed = len(n._cnode._node._shares) + if self.catalog_detection: + detected = no_sh0 or sh0_had_corruption or (num_needed!=3) + if not detected: + undetected.add(which, 1) + if expected == "no-sh0": + self.failIfIn(0, shnums) + elif expected == "0bad-need-3": + self.failIf(no_sh0) + self.failUnless(sh0[0].had_corruption) + self.failUnlessEqual(num_needed, 3) + elif expected == "need-4th": + self.failIf(no_sh0) + self.failUnless(sh0[0].had_corruption) + self.failIfEqual(num_needed, 3) + d.addCallback(_got_data) + return d + + + d = self.c0.upload(u) + def _uploaded(ur): + imm_uri = ur.uri + self.shares = self.copy_shares(imm_uri) + d = defer.succeed(None) + # 'victims' is a list of corruption tests to run. Each one flips + # the low-order bit of the specified offset in the share file (so + # offset=0 is the MSB of the container version, offset=15 is the + # LSB of the share version, offset=24 is the MSB of the + # data-block-offset, and offset=48 is the first byte of the first + # data-block). Each one also specifies what sort of corruption + # we're expecting to see. 
+ no_sh0_victims = [0,1,2,3] # container version + need3_victims = [ ] # none currently in this category + # when the offsets are corrupted, the Share will be unable to + # retrieve the data it wants (because it thinks that data lives + # off in the weeds somewhere), and Share treats DataUnavailable + # as abandon-this-share, so in general we'll be forced to look + # for a 4th share. + need_4th_victims = [12,13,14,15, # share version + 24,25,26,27, # offset[data] + 32,33,34,35, # offset[crypttext_hash_tree] + 36,37,38,39, # offset[block_hashes] + 44,45,46,47, # offset[UEB] + ] + need_4th_victims.append(48) # block data + # when corrupting hash trees, we must corrupt a value that isn't + # directly set from somewhere else. Since we download data from + # seg0, corrupt something on its hash chain, like [2] (the + # right-hand child of the root) + need_4th_victims.append(600+2*32) # block_hashes[2] + # Share.loop is pretty conservative: it abandons the share at the + # first sign of corruption. It doesn't strictly need to be this + # way: if the UEB were corrupt, we could still get good block + # data from that share, as long as there was a good copy of the + # UEB elsewhere. If this behavior is relaxed, then corruption in + # the following fields (which are present in multiple shares) + # should fall into the "need3_victims" case instead of the + # "need_4th_victims" case. + need_4th_victims.append(376+2*32) # crypttext_hash_tree[2] + need_4th_victims.append(824) # share_hashes + need_4th_victims.append(994) # UEB length + need_4th_victims.append(998) # UEB + corrupt_me = ([(i,"no-sh0") for i in no_sh0_victims] + + [(i, "0bad-need-3") for i in need3_victims] + + [(i, "need-4th") for i in need_4th_victims]) + if self.catalog_detection: + corrupt_me = [(i, "") for i in range(len(self.sh0_orig))] + for i,expected in corrupt_me: + # All these tests result in a successful download. What we're + # measuring is how many shares the downloader had to use. 
+ d.addCallback(self._corrupt_flip, imm_uri, i) + d.addCallback(_download, imm_uri, i, expected) + d.addCallback(lambda ign: self.restore_all_shares(self.shares)) + d.addCallback(fireEventually) + corrupt_values = [(3, 2, "no-sh0"), + (15, 2, "need-4th"), # share looks v2 + ] + for i,newvalue,expected in corrupt_values: + d.addCallback(self._corrupt_set, imm_uri, i, newvalue) + d.addCallback(_download, imm_uri, i, expected) + d.addCallback(lambda ign: self.restore_all_shares(self.shares)) + d.addCallback(fireEventually) + return d + d.addCallback(_uploaded) + def _show_results(ign): + print + print ("of [0:%d], corruption ignored in %s" % + (len(self.sh0_orig), undetected.dump())) + if self.catalog_detection: + d.addCallback(_show_results) + # of [0:2070], corruption ignored in len=1133: + # [4-11],[16-23],[28-31],[152-439],[600-663],[1309-2069] + # [4-11]: container sizes + # [16-23]: share block/data sizes + # [152-375]: plaintext hash tree + # [376-408]: crypttext_hash_tree[0] (root) + # [408-439]: crypttext_hash_tree[1] (computed) + # [600-631]: block hash tree[0] (root) + # [632-663]: block hash tree[1] (computed) + # [1309-]: reserved+unused UEB space + return d + + def test_failure(self): + # this test corrupts all shares in the same way, and asserts that the + # download fails. + + self.basedir = "download/Corruption/failure" + self.set_up_grid() + self.c0 = self.g.clients[0] + + # to exercise the block-hash-tree code properly, we need to have + # multiple segments. We don't tell the downloader about the different + # segsize, so it guesses wrong and must do extra roundtrips. 
+ u = upload.Data(plaintext, None) + u.max_segment_size = 120 # 3 segs, 4-wide hashtree + + d = self.c0.upload(u) + def _uploaded(ur): + imm_uri = ur.uri + self.shares = self.copy_shares(imm_uri) + + corrupt_me = [(48, "block data", "Last failure: None"), + (600+2*32, "block_hashes[2]", "BadHashError"), + (376+2*32, "crypttext_hash_tree[2]", "BadHashError"), + (824, "share_hashes", "BadHashError"), + ] + def _download(imm_uri): + n = self.c0.create_node_from_uri(imm_uri) + # for this test to work, we need to have a new Node each time. + # Make sure the NodeMaker's weakcache hasn't interfered. + assert not n._cnode._node._shares + return download_to_data(n) + + d = defer.succeed(None) + for i,which,substring in corrupt_me: + # All these tests result in a failed download. + d.addCallback(self._corrupt_flip_all, imm_uri, i) + d.addCallback(lambda ign: + self.shouldFail(NotEnoughSharesError, which, + substring, + _download, imm_uri)) + d.addCallback(lambda ign: self.restore_all_shares(self.shares)) + d.addCallback(fireEventually) + return d + d.addCallback(_uploaded) + + return d + + def _corrupt_flip_all(self, ign, imm_uri, which): + def _corruptor(s, debug=False): + return s[:which] + chr(ord(s[which])^0x01) + s[which+1:] + self.corrupt_all_shares(imm_uri, _corruptor) + +class DownloadV2(_Base, unittest.TestCase): + # tests which exercise v2-share code. They first upload a file with + # FORCE_V2 set. 
+ + def setUp(self): + d = defer.maybeDeferred(_Base.setUp, self) + def _set_force_v2(ign): + self.old_force_v2 = layout.FORCE_V2 + layout.FORCE_V2 = True + d.addCallback(_set_force_v2) + return d + def tearDown(self): + layout.FORCE_V2 = self.old_force_v2 + return _Base.tearDown(self) + + def test_download(self): + self.basedir = self.mktemp() + self.set_up_grid() + self.c0 = self.g.clients[0] + + # upload a file + u = upload.Data(plaintext, None) + d = self.c0.upload(u) + def _uploaded(ur): + imm_uri = ur.uri + n = self.c0.create_node_from_uri(imm_uri) + return download_to_data(n) + d.addCallback(_uploaded) + return d + + def test_download_no_overrun(self): + self.basedir = self.mktemp() + self.set_up_grid() + self.c0 = self.g.clients[0] + + # tweak the client's copies of server-version data, so it believes + # that they're old and can't handle reads that overrun the length of + # the share. This exercises a different code path. + for (peerid, rref) in self.c0.storage_broker.get_all_servers(): + v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"] + v1["tolerates-immutable-read-overrun"] = False + + # upload a file + u = upload.Data(plaintext, None) + d = self.c0.upload(u) + def _uploaded(ur): + imm_uri = ur.uri + n = self.c0.create_node_from_uri(imm_uri) + return download_to_data(n) + d.addCallback(_uploaded) + return d + + def OFF_test_no_overrun_corrupt_shver(self): # unnecessary + self.basedir = self.mktemp() + self.set_up_grid() + self.c0 = self.g.clients[0] + + for (peerid, rref) in self.c0.storage_broker.get_all_servers(): + v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"] + v1["tolerates-immutable-read-overrun"] = False + + # upload a file + u = upload.Data(plaintext, None) + d = self.c0.upload(u) + def _uploaded(ur): + imm_uri = ur.uri + def _do_corrupt(which, newvalue): + def _corruptor(s, debug=False): + return s[:which] + chr(newvalue) + s[which+1:] + self.corrupt_shares_numbered(imm_uri, [0], _corruptor) + 
_do_corrupt(12+3, 0x00) + n = self.c0.create_node_from_uri(imm_uri) + d = download_to_data(n) + def _got_data(data): + self.failUnlessEqual(data, plaintext) + d.addCallback(_got_data) + return d + d.addCallback(_uploaded) + return d hunk ./src/allmydata/test/test_encode.py 3 from zope.interface import implements from twisted.trial import unittest -from twisted.internet import defer, reactor +from twisted.internet import defer from twisted.python.failure import Failure from foolscap.api import fireEventually hunk ./src/allmydata/test/test_encode.py 6 -from allmydata import hashtree, uri -from allmydata.immutable import encode, upload, download +from allmydata import uri +from allmydata.immutable import encode, upload, checker from allmydata.util import hashutil from allmydata.util.assertutil import _assert hunk ./src/allmydata/test/test_encode.py 10 -from allmydata.util.consumer import MemoryConsumer -from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, \ - NotEnoughSharesError, IStorageBroker, UploadUnhappinessError -from allmydata.monitor import Monitor -import allmydata.test.common_util as testutil +from allmydata.util.consumer import download_to_data +from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader +from allmydata.test.no_network import GridTestMixin class LostPeerError(Exception): pass hunk ./src/allmydata/test/test_encode.py 20 def flip_bit(good): # flips the last bit return good[:-1] + chr(ord(good[-1]) ^ 0x01) -class FakeStorageBroker: - implements(IStorageBroker) - class FakeBucketReaderWriterProxy: implements(IStorageBucketWriter, IStorageBucketReader) # these are used for both reading and writing hunk ./src/allmydata/test/test_encode.py 57 self.blocks[segmentnum] = data return defer.maybeDeferred(_try) - def put_plaintext_hashes(self, hashes): - def _try(): - assert not self.closed - assert not self.plaintext_hashes - self.plaintext_hashes = hashes - return defer.maybeDeferred(_try) - def 
put_crypttext_hashes(self, hashes): def _try(): assert not self.closed hunk ./src/allmydata/test/test_encode.py 214 fb = FakeBucketReaderWriterProxy() fb.put_uri_extension(uebstring) verifycap = uri.CHKFileVerifierURI(storage_index='x'*16, uri_extension_hash=uebhash, needed_shares=self.K, total_shares=self.M, size=self.SIZE) - vup = download.ValidatedExtendedURIProxy(fb, verifycap) + vup = checker.ValidatedExtendedURIProxy(fb, verifycap) return vup.start() def _test_accept(self, uebdict): hunk ./src/allmydata/test/test_encode.py 228 def _test_reject(self, uebdict): d = self._test(uebdict) - d.addBoth(self._should_fail, (KeyError, download.BadURIExtension)) + d.addBoth(self._should_fail, (KeyError, checker.BadURIExtension)) return d def test_accept_minimal(self): hunk ./src/allmydata/test/test_encode.py 324 return d - # a series of 3*3 tests to check out edge conditions. One axis is how the - # plaintext is divided into segments: kn+(-1,0,1). Another way to express - # that is that n%k == -1 or 0 or 1. For example, for 25-byte segments, we - # might test 74 bytes, 75 bytes, and 76 bytes. - - # on the other axis is how many leaves in the block hash tree we wind up - # with, relative to a power of 2, so 2^a+(-1,0,1). Each segment turns - # into a single leaf. So we'd like to check out, e.g., 3 segments, 4 - # segments, and 5 segments. - - # that results in the following series of data lengths: - # 3 segs: 74, 75, 51 - # 4 segs: 99, 100, 76 - # 5 segs: 124, 125, 101 - - # all tests encode to 100 shares, which means the share hash tree will - # have 128 leaves, which means that buckets will be given an 8-long share - # hash chain - - # all 3-segment files will have a 4-leaf blockhashtree, and thus expect - # to get 7 blockhashes. 4-segment files will also get 4-leaf block hash - # trees and 7 blockhashes. 5-segment files will get 8-leaf block hash - # trees, which get 15 blockhashes. 
- def test_send_74(self): # 3 segments (25, 25, 24) return self.do_encode(25, 74, 100, 3, 7, 8) hunk ./src/allmydata/test/test_encode.py 354 # 5 segments: 25, 25, 25, 25, 1 return self.do_encode(25, 101, 100, 5, 15, 8) -class PausingConsumer(MemoryConsumer): - def __init__(self): - MemoryConsumer.__init__(self) - self.size = 0 - self.writes = 0 - def write(self, data): - self.size += len(data) - self.writes += 1 - if self.writes <= 2: - # we happen to use 4 segments, and want to avoid pausing on the - # last one (since then the _unpause timer will still be running) - self.producer.pauseProducing() - reactor.callLater(0.1, self._unpause) - return MemoryConsumer.write(self, data) - def _unpause(self): - self.producer.resumeProducing() - -class PausingAndStoppingConsumer(PausingConsumer): - def write(self, data): - self.producer.pauseProducing() - reactor.callLater(0.5, self._stop) - def _stop(self): - self.producer.stopProducing() - -class StoppingConsumer(PausingConsumer): - def write(self, data): - self.producer.stopProducing() - -class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin): - timeout = 2400 # It takes longer than 240 seconds on Zandr's ARM box. 
- def send_and_recover(self, k_and_happy_and_n=(25,75,100), - AVAILABLE_SHARES=None, - datalen=76, - max_segment_size=25, - bucket_modes={}, - recover_mode="recover", - consumer=None, - ): - if AVAILABLE_SHARES is None: - AVAILABLE_SHARES = k_and_happy_and_n[2] - data = make_data(datalen) - d = self.send(k_and_happy_and_n, AVAILABLE_SHARES, - max_segment_size, bucket_modes, data) - # that fires with (uri_extension_hash, e, shareholders) - d.addCallback(self.recover, AVAILABLE_SHARES, recover_mode, - consumer=consumer) - # that fires with newdata - def _downloaded((newdata, fd)): - self.failUnless(newdata == data, str((len(newdata), len(data)))) - return fd - d.addCallback(_downloaded) - return d - - def send(self, k_and_happy_and_n, AVAILABLE_SHARES, max_segment_size, - bucket_modes, data): - k, happy, n = k_and_happy_and_n - NUM_SHARES = k_and_happy_and_n[2] - if AVAILABLE_SHARES is None: - AVAILABLE_SHARES = NUM_SHARES - e = encode.Encoder() - u = upload.Data(data, convergence="some convergence string") - # force use of multiple segments by using a low max_segment_size - u.max_segment_size = max_segment_size - u.encoding_param_k = k - u.encoding_param_happy = happy - u.encoding_param_n = n - eu = upload.EncryptAnUploadable(u) - d = e.set_encrypted_uploadable(eu) - - shareholders = {} - def _ready(res): - k,happy,n = e.get_param("share_counts") - assert n == NUM_SHARES # else we'll be completely confused - servermap = {} - for shnum in range(NUM_SHARES): - mode = bucket_modes.get(shnum, "good") - peer = FakeBucketReaderWriterProxy(mode, "peer%d" % shnum) - shareholders[shnum] = peer - servermap.setdefault(shnum, set()).add(peer.get_peerid()) - e.set_shareholders(shareholders, servermap) - return e.start() - d.addCallback(_ready) - def _sent(res): - d1 = u.get_encryption_key() - d1.addCallback(lambda key: (res, key, shareholders)) - return d1 - d.addCallback(_sent) - return d - - def recover(self, (res, key, shareholders), AVAILABLE_SHARES, - recover_mode, 
consumer=None): - verifycap = res - - if "corrupt_key" in recover_mode: - # we corrupt the key, so that the decrypted data is corrupted and - # will fail the plaintext hash check. Since we're manually - # attaching shareholders, the fact that the storage index is also - # corrupted doesn't matter. - key = flip_bit(key) - - u = uri.CHKFileURI(key=key, - uri_extension_hash=verifycap.uri_extension_hash, - needed_shares=verifycap.needed_shares, - total_shares=verifycap.total_shares, - size=verifycap.size) - - sb = FakeStorageBroker() - if not consumer: - consumer = MemoryConsumer() - innertarget = download.ConsumerAdapter(consumer) - target = download.DecryptingTarget(innertarget, u.key) - fd = download.CiphertextDownloader(sb, u.get_verify_cap(), target, monitor=Monitor()) - - # we manually cycle the CiphertextDownloader through a number of steps that - # would normally be sequenced by a Deferred chain in - # CiphertextDownloader.start(), to give us more control over the process. - # In particular, by bypassing _get_all_shareholders, we skip - # permuted-peerlist selection. - for shnum, bucket in shareholders.items(): - if shnum < AVAILABLE_SHARES and bucket.closed: - fd.add_share_bucket(shnum, bucket) - fd._got_all_shareholders(None) - - # Make it possible to obtain uri_extension from the shareholders. - # Arrange for shareholders[0] to be the first, so we can selectively - # corrupt the data it returns. 
- uri_extension_sources = shareholders.values() - uri_extension_sources.remove(shareholders[0]) - uri_extension_sources.insert(0, shareholders[0]) - - d = defer.succeed(None) - - # have the CiphertextDownloader retrieve a copy of uri_extension itself - d.addCallback(fd._obtain_uri_extension) hunk ./src/allmydata/test/test_encode.py 355 - if "corrupt_crypttext_hashes" in recover_mode: - # replace everybody's crypttext hash trees with a different one - # (computed over a different file), then modify our uri_extension - # to reflect the new crypttext hash tree root - def _corrupt_crypttext_hashes(unused): - assert isinstance(fd._vup, download.ValidatedExtendedURIProxy), fd._vup - assert fd._vup.crypttext_root_hash, fd._vup - badhash = hashutil.tagged_hash("bogus", "data") - bad_crypttext_hashes = [badhash] * fd._vup.num_segments - badtree = hashtree.HashTree(bad_crypttext_hashes) - for bucket in shareholders.values(): - bucket.crypttext_hashes = list(badtree) - fd._crypttext_hash_tree = hashtree.IncompleteHashTree(fd._vup.num_segments) - fd._crypttext_hash_tree.set_hashes({0: badtree[0]}) - return fd._vup - d.addCallback(_corrupt_crypttext_hashes) +class Roundtrip(GridTestMixin, unittest.TestCase): hunk ./src/allmydata/test/test_encode.py 357 - # also have the CiphertextDownloader ask for hash trees - d.addCallback(fd._get_crypttext_hash_tree) - - d.addCallback(fd._download_all_segments) - d.addCallback(fd._done) - def _done(t): - newdata = "".join(consumer.chunks) - return (newdata, fd) - d.addCallback(_done) - return d - - def test_not_enough_shares(self): - d = self.send_and_recover((4,8,10), AVAILABLE_SHARES=2) - def _done(res): - self.failUnless(isinstance(res, Failure)) - self.failUnless(res.check(NotEnoughSharesError)) - d.addBoth(_done) - return d - - def test_one_share_per_peer(self): - return self.send_and_recover() - - def test_74(self): - return self.send_and_recover(datalen=74) - def test_75(self): - return self.send_and_recover(datalen=75) - def 
test_51(self): - return self.send_and_recover(datalen=51) - - def test_99(self): - return self.send_and_recover(datalen=99) - def test_100(self): - return self.send_and_recover(datalen=100) - def test_76(self): - return self.send_and_recover(datalen=76) - - def test_124(self): - return self.send_and_recover(datalen=124) - def test_125(self): - return self.send_and_recover(datalen=125) - def test_101(self): - return self.send_and_recover(datalen=101) - - def test_pause(self): - # use a download target that does pauseProducing/resumeProducing a - # few times, then finishes - c = PausingConsumer() - d = self.send_and_recover(consumer=c) - return d - - def test_pause_then_stop(self): - # use a download target that pauses, then stops. - c = PausingAndStoppingConsumer() - d = self.shouldFail(download.DownloadStopped, "test_pause_then_stop", - "our Consumer called stopProducing()", - self.send_and_recover, consumer=c) - return d - - def test_stop(self): - # use a download targetthat does an immediate stop (ticket #473) - c = StoppingConsumer() - d = self.shouldFail(download.DownloadStopped, "test_stop", - "our Consumer called stopProducing()", - self.send_and_recover, consumer=c) - return d - - # the following tests all use 4-out-of-10 encoding - - def test_bad_blocks(self): - # the first 6 servers have bad blocks, which will be caught by the - # blockhashes - modemap = dict([(i, "bad block") - for i in range(6)] - + [(i, "good") - for i in range(6, 10)]) - return self.send_and_recover((4,8,10), bucket_modes=modemap) - - def test_bad_blocks_failure(self): - # the first 7 servers have bad blocks, which will be caught by the - # blockhashes, and the download will fail - modemap = dict([(i, "bad block") - for i in range(7)] - + [(i, "good") - for i in range(7, 10)]) - d = self.send_and_recover((4,8,10), bucket_modes=modemap) - def _done(res): - self.failUnless(isinstance(res, Failure), res) - self.failUnless(res.check(NotEnoughSharesError), res) - d.addBoth(_done) - return d 
- - def test_bad_blockhashes(self): - # the first 6 servers have bad block hashes, so the blockhash tree - # will not validate - modemap = dict([(i, "bad blockhash") - for i in range(6)] - + [(i, "good") - for i in range(6, 10)]) - return self.send_and_recover((4,8,10), bucket_modes=modemap) - - def test_bad_blockhashes_failure(self): - # the first 7 servers have bad block hashes, so the blockhash tree - # will not validate, and the download will fail - modemap = dict([(i, "bad blockhash") - for i in range(7)] - + [(i, "good") - for i in range(7, 10)]) - d = self.send_and_recover((4,8,10), bucket_modes=modemap) - def _done(res): - self.failUnless(isinstance(res, Failure)) - self.failUnless(res.check(NotEnoughSharesError), res) - d.addBoth(_done) - return d - - def test_bad_sharehashes(self): - # the first 6 servers have bad block hashes, so the sharehash tree - # will not validate - modemap = dict([(i, "bad sharehash") - for i in range(6)] - + [(i, "good") - for i in range(6, 10)]) - return self.send_and_recover((4,8,10), bucket_modes=modemap) - - def assertFetchFailureIn(self, fd, where): - expected = {"uri_extension": 0, - "crypttext_hash_tree": 0, - } - if where is not None: - expected[where] += 1 - self.failUnlessEqual(fd._fetch_failures, expected) - - def test_good(self): - # just to make sure the test harness works when we aren't - # intentionally causing failures - modemap = dict([(i, "good") for i in range(0, 10)]) - d = self.send_and_recover((4,8,10), bucket_modes=modemap) - d.addCallback(self.assertFetchFailureIn, None) - return d - - def test_bad_uri_extension(self): - # the first server has a bad uri_extension block, so we will fail - # over to a different server. 
- modemap = dict([(i, "bad uri_extension") for i in range(1)] + - [(i, "good") for i in range(1, 10)]) - d = self.send_and_recover((4,8,10), bucket_modes=modemap) - d.addCallback(self.assertFetchFailureIn, "uri_extension") - return d - - def test_bad_crypttext_hashroot(self): - # the first server has a bad crypttext hashroot, so we will fail - # over to a different server. - modemap = dict([(i, "bad crypttext hashroot") for i in range(1)] + - [(i, "good") for i in range(1, 10)]) - d = self.send_and_recover((4,8,10), bucket_modes=modemap) - d.addCallback(self.assertFetchFailureIn, "crypttext_hash_tree") - return d - - def test_bad_crypttext_hashes(self): - # the first server has a bad crypttext hash block, so we will fail - # over to a different server. - modemap = dict([(i, "bad crypttext hash") for i in range(1)] + - [(i, "good") for i in range(1, 10)]) - d = self.send_and_recover((4,8,10), bucket_modes=modemap) - d.addCallback(self.assertFetchFailureIn, "crypttext_hash_tree") - return d - - def test_bad_crypttext_hashes_failure(self): - # to test that the crypttext merkle tree is really being applied, we - # sneak into the download process and corrupt two things: we replace - # everybody's crypttext hashtree with a bad version (computed over - # bogus data), and we modify the supposedly-validated uri_extension - # block to match the new crypttext hashtree root. The download - # process should notice that the crypttext coming out of FEC doesn't - # match the tree, and fail. 
- - modemap = dict([(i, "good") for i in range(0, 10)]) - d = self.send_and_recover((4,8,10), bucket_modes=modemap, - recover_mode=("corrupt_crypttext_hashes")) - def _done(res): - self.failUnless(isinstance(res, Failure)) - self.failUnless(res.check(hashtree.BadHashError), res) - d.addBoth(_done) - return d - - def OFF_test_bad_plaintext(self): - # faking a decryption failure is easier: just corrupt the key - modemap = dict([(i, "good") for i in range(0, 10)]) - d = self.send_and_recover((4,8,10), bucket_modes=modemap, - recover_mode=("corrupt_key")) - def _done(res): - self.failUnless(isinstance(res, Failure)) - self.failUnless(res.check(hashtree.BadHashError), res) - d.addBoth(_done) - return d + # a series of 3*3 tests to check out edge conditions. One axis is how the + # plaintext is divided into segments: kn+(-1,0,1). Another way to express + # this is n%k == -1 or 0 or 1. For example, for 25-byte segments, we + # might test 74 bytes, 75 bytes, and 76 bytes. hunk ./src/allmydata/test/test_encode.py 362 - def test_bad_sharehashes_failure(self): - # all ten servers have bad share hashes, so the sharehash tree - # will not validate, and the download will fail - modemap = dict([(i, "bad sharehash") - for i in range(10)]) - d = self.send_and_recover((4,8,10), bucket_modes=modemap) - def _done(res): - self.failUnless(isinstance(res, Failure)) - self.failUnless(res.check(NotEnoughSharesError)) - d.addBoth(_done) - return d + # on the other axis is how many leaves in the block hash tree we wind up + # with, relative to a power of 2, so 2^a+(-1,0,1). Each segment turns + # into a single leaf. So we'd like to check out, e.g., 3 segments, 4 + # segments, and 5 segments. 
hunk ./src/allmydata/test/test_encode.py 367 - def test_missing_sharehashes(self): - # the first 6 servers are missing their sharehashes, so the - # sharehash tree will not validate - modemap = dict([(i, "missing sharehash") - for i in range(6)] - + [(i, "good") - for i in range(6, 10)]) - return self.send_and_recover((4,8,10), bucket_modes=modemap) + # that results in the following series of data lengths: + # 3 segs: 74, 75, 51 + # 4 segs: 99, 100, 76 + # 5 segs: 124, 125, 101 hunk ./src/allmydata/test/test_encode.py 372 - def test_missing_sharehashes_failure(self): - # all servers are missing their sharehashes, so the sharehash tree will not validate, - # and the download will fail - modemap = dict([(i, "missing sharehash") - for i in range(10)]) - d = self.send_and_recover((4,8,10), bucket_modes=modemap) - def _done(res): - self.failUnless(isinstance(res, Failure), res) - self.failUnless(res.check(NotEnoughSharesError), res) - d.addBoth(_done) - return d + # all tests encode to 100 shares, which means the share hash tree will + # have 128 leaves, which means that buckets will be given an 8-long share + # hash chain hunk ./src/allmydata/test/test_encode.py 376 - def test_lost_one_shareholder(self): - # we have enough shareholders when we start, but one segment in we - # lose one of them. The upload should still succeed, as long as we - # still have 'servers_of_happiness' peers left. - modemap = dict([(i, "good") for i in range(9)] + - [(i, "lost") for i in range(9, 10)]) - return self.send_and_recover((4,8,10), bucket_modes=modemap) + # all 3-segment files will have a 4-leaf blockhashtree, and thus expect + # to get 7 blockhashes. 4-segment files will also get 4-leaf block hash + # trees and 7 blockhashes. 5-segment files will get 8-leaf block hash + # trees, which get 15 blockhashes.
hunk ./src/allmydata/test/test_encode.py 381 - def test_lost_one_shareholder_early(self): - # we have enough shareholders when we choose peers, but just before - # we send the 'start' message, we lose one of them. The upload should - # still succeed, as long as we still have 'servers_of_happiness' peers - # left. - modemap = dict([(i, "good") for i in range(9)] + - [(i, "lost-early") for i in range(9, 10)]) - return self.send_and_recover((4,8,10), bucket_modes=modemap) + def test_74(self): return self.do_test_size(74) + def test_75(self): return self.do_test_size(75) + def test_51(self): return self.do_test_size(51) + def test_99(self): return self.do_test_size(99) + def test_100(self): return self.do_test_size(100) + def test_76(self): return self.do_test_size(76) + def test_124(self): return self.do_test_size(124) + def test_125(self): return self.do_test_size(125) + def test_101(self): return self.do_test_size(101) hunk ./src/allmydata/test/test_encode.py 391 - def test_lost_many_shareholders(self): - # we have enough shareholders when we start, but one segment in we - # lose all but one of them. The upload should fail. - modemap = dict([(i, "good") for i in range(1)] + - [(i, "lost") for i in range(1, 10)]) - d = self.send_and_recover((4,8,10), bucket_modes=modemap) - def _done(res): - self.failUnless(isinstance(res, Failure)) - self.failUnless(res.check(UploadUnhappinessError), res) - d.addBoth(_done) + def upload(self, data): + u = upload.Data(data, None) + u.max_segment_size = 25 + u.encoding_param_k = 25 + u.encoding_param_happy = 1 + u.encoding_param_n = 100 + d = self.c0.upload(u) + d.addCallback(lambda ur: self.c0.create_node_from_uri(ur.uri)) + # returns a FileNode return d hunk ./src/allmydata/test/test_encode.py 402 - def test_lost_all_shareholders(self): - # we have enough shareholders when we start, but one segment in we - # lose all of them. The upload should fail. 
- modemap = dict([(i, "lost") for i in range(10)]) - d = self.send_and_recover((4,8,10), bucket_modes=modemap) - def _done(res): - self.failUnless(isinstance(res, Failure)) - self.failUnless(res.check(UploadUnhappinessError)) - d.addBoth(_done) + def do_test_size(self, size): + self.basedir = self.mktemp() + self.set_up_grid() + self.c0 = self.g.clients[0] + DATA = "p"*size + d = self.upload(DATA) + d.addCallback(lambda n: download_to_data(n)) + def _downloaded(newdata): + self.failUnlessEqual(newdata, DATA) + d.addCallback(_downloaded) return d hunk ./src/allmydata/test/test_filenode.py 5 from twisted.trial import unittest from allmydata import uri, client from allmydata.monitor import Monitor -from allmydata.immutable.filenode import ImmutableFileNode, LiteralFileNode +from allmydata.immutable.literal import LiteralFileNode +from allmydata.immutable.filenode import ImmutableFileNode from allmydata.mutable.filenode import MutableFileNode hunk ./src/allmydata/test/test_filenode.py 8 -from allmydata.util import hashutil, cachedir +from allmydata.util import hashutil from allmydata.util.consumer import download_to_data class NotANode: hunk ./src/allmydata/test/test_filenode.py 34 needed_shares=3, total_shares=10, size=1000) - cf = cachedir.CacheFile("none") - fn1 = ImmutableFileNode(u, None, None, None, None, cf) - fn2 = ImmutableFileNode(u, None, None, None, None, cf) + fn1 = ImmutableFileNode(u, None, None, None, None) + fn2 = ImmutableFileNode(u, None, None, None, None) self.failUnlessEqual(fn1, fn2) self.failIfEqual(fn1, "I am not a filenode") self.failIfEqual(fn1, NotANode()) hunk ./src/allmydata/test/test_hung_server.py 12 from allmydata.mutable.common import UnrecoverableFileError from allmydata.storage.common import storage_index_to_dir from allmydata.test.no_network import GridTestMixin -from allmydata.test.common import ShouldFailMixin, _corrupt_share_data +from allmydata.test.common import ShouldFailMixin +from allmydata.util.pollmixin import PollMixin 
from allmydata.interfaces import NotEnoughSharesError immutable_plaintext = "data" * 10000 hunk ./src/allmydata/test/test_hung_server.py 19 mutable_plaintext = "muta" * 10000 -class HungServerDownloadTest(GridTestMixin, ShouldFailMixin, unittest.TestCase): +class HungServerDownloadTest(GridTestMixin, ShouldFailMixin, PollMixin, + unittest.TestCase): # Many of these tests take around 60 seconds on François's ARM buildslave: # http://tahoe-lafs.org/buildbot/builders/FranXois%20lenny-armv5tel hunk ./src/allmydata/test/test_hung_server.py 23 - # allmydata.test.test_hung_server.HungServerDownloadTest.test_2_good_8_broken_duplicate_share_fail once ERRORed after 197 seconds on Midnight Magic's NetBSD buildslave: + # allmydata.test.test_hung_server.HungServerDownloadTest.test_2_good_8_broken_duplicate_share_fail + # once ERRORed after 197 seconds on Midnight Magic's NetBSD buildslave: # http://tahoe-lafs.org/buildbot/builders/MM%20netbsd4%20i386%20warp # MM's buildslave varies a lot in how long it takes to run tests. hunk ./src/allmydata/test/test_hung_server.py 42 for (id, ss) in servers: self.g.unhang_server(id, **kwargs) + def _hang_shares(self, shnums, **kwargs): + # hang all servers who are holding the given shares + hung_serverids = set() + for (i_shnum, i_serverid, i_sharefile) in self.shares: + if i_shnum in shnums: + if i_serverid not in hung_serverids: + self.g.hang_server(i_serverid, **kwargs) + hung_serverids.add(i_serverid) + def _delete_all_shares_from(self, servers): serverids = [id for (id, ss) in servers] for (i_shnum, i_serverid, i_sharefile) in self.shares: hunk ./src/allmydata/test/test_hung_server.py 128 stage_4_d = None # currently we aren't doing any tests which require this for mutable files else: d = download_to_data(n) - stage_4_d = n._downloader._all_downloads.keys()[0]._stage_4_d # too ugly! FIXME + #stage_4_d = n._downloader._all_downloads.keys()[0]._stage_4_d # too ugly! 
FIXME + stage_4_d = None return (d, stage_4_d,) def _wait_for_data(self, n): hunk ./src/allmydata/test/test_hung_server.py 157 self._download_and_check) else: return self.shouldFail(NotEnoughSharesError, self.basedir, - "Failed to get enough shareholders", + "ran out of shares", self._download_and_check) hunk ./src/allmydata/test/test_hung_server.py 220 # The tests below do not currently pass for mutable files. - def test_3_good_7_hung(self): + def test_3_good_7_hung_immutable(self): d = defer.succeed(None) hunk ./src/allmydata/test/test_hung_server.py 222 - for mutable in [False]: - d.addCallback(lambda ign: self._set_up(mutable, "test_3_good_7_hung")) - d.addCallback(lambda ign: self._hang(self.servers[3:])) - d.addCallback(lambda ign: self._download_and_check()) + d.addCallback(lambda ign: self._set_up(False, "test_3_good_7_hung")) + d.addCallback(lambda ign: self._hang(self.servers[3:])) + d.addCallback(lambda ign: self._download_and_check()) return d hunk ./src/allmydata/test/test_hung_server.py 227 - def test_2_good_8_hung_then_1_recovers(self): + def test_5_overdue_immutable(self): + # restrict the ShareFinder to only allow 5 outstanding requests, and + # arrange for the first 5 servers to hang. Then trigger the OVERDUE + # timers (simulating 10 seconds passed), at which point the + # ShareFinder should send additional queries and finish the download + # quickly. If we didn't have OVERDUE timers, this test would fail by + # timing out. 
+ done = [] + d = self._set_up(False, "test_5_overdue_immutable") + def _reduce_max_outstanding_requests_and_download(ign): + self._hang_shares(range(5)) + n = self.c0.create_node_from_uri(self.uri) + self._sf = n._cnode._node._sharefinder + self._sf.max_outstanding_requests = 5 + self._sf.OVERDUE_TIMEOUT = 1000.0 + d2 = download_to_data(n) + # start download, but don't wait for it to complete yet + def _done(res): + done.append(res) # we will poll for this later + d2.addBoth(_done) + d.addCallback(_reduce_max_outstanding_requests_and_download) + from foolscap.eventual import fireEventually, flushEventualQueue + # wait here a while + d.addCallback(lambda res: fireEventually(res)) + d.addCallback(lambda res: flushEventualQueue()) + d.addCallback(lambda ign: self.failIf(done)) + def _check_waiting(ign): + # all the share requests should now be stuck waiting + self.failUnlessEqual(len(self._sf.pending_requests), 5) + # but none should be marked as OVERDUE until the timers expire + self.failUnlessEqual(len(self._sf.overdue_requests), 0) + d.addCallback(_check_waiting) + def _mark_overdue(ign): + # declare four requests overdue, allowing new requests to take + # their place, and leaving one stuck. The finder will keep + # sending requests until there are 5 non-overdue ones + # outstanding, at which point we'll have 4 OVERDUE, 1 + # stuck-but-not-overdue, and 4 live requests. All 4 live requests + # will retire before the download is complete and the ShareFinder + # is shut off. 
That will leave 4 OVERDUE and 1 + # stuck-but-not-overdue, for a total of 5 requests in + # _sf.pending_requests + for t in self._sf.overdue_timers.values()[:4]: + t.reset(-1.0) + # the timers ought to fire before the eventual-send does + return fireEventually() + d.addCallback(_mark_overdue) + def _we_are_done(): + return bool(done) + d.addCallback(lambda ign: self.poll(_we_are_done)) + def _check_done(ign): + self.failUnlessEqual(done, [immutable_plaintext]) + self.failUnlessEqual(len(self._sf.pending_requests), 5) + self.failUnlessEqual(len(self._sf.overdue_requests), 4) + d.addCallback(_check_done) + return d + + def test_3_good_7_hung_mutable(self): + raise unittest.SkipTest("still broken") d = defer.succeed(None) hunk ./src/allmydata/test/test_hung_server.py 287 - for mutable in [False]: - d.addCallback(lambda ign: self._set_up(mutable, "test_2_good_8_hung_then_1_recovers")) - d.addCallback(lambda ign: self._hang(self.servers[2:3])) - d.addCallback(lambda ign: self._hang(self.servers[3:])) - d.addCallback(lambda ign: self._unhang(self.servers[2:3])) - d.addCallback(lambda ign: self._download_and_check()) + d.addCallback(lambda ign: self._set_up(True, "test_3_good_7_hung")) + d.addCallback(lambda ign: self._hang(self.servers[3:])) + d.addCallback(lambda ign: self._download_and_check()) return d hunk ./src/allmydata/test/test_hung_server.py 292 - def test_2_good_8_hung_then_1_recovers(self): + def test_2_good_8_hung_then_1_recovers_immutable(self): d = defer.succeed(None) hunk ./src/allmydata/test/test_hung_server.py 294 - for mutable in [False]: - d.addCallback(lambda ign: self._set_up(mutable, "test_2_good_8_hung_then_1_recovers_with_2_shares")) - d.addCallback(lambda ign: self._copy_all_shares_from(self.servers[0:1], self.servers[2])) - d.addCallback(lambda ign: self._hang(self.servers[2:3])) - d.addCallback(lambda ign: self._hang(self.servers[3:])) - d.addCallback(lambda ign: self._unhang(self.servers[2:3])) - d.addCallback(lambda ign:
self._download_and_check()) + d.addCallback(lambda ign: self._set_up(False, "test_2_good_8_hung_then_1_recovers")) + d.addCallback(lambda ign: self._hang(self.servers[2:3])) + d.addCallback(lambda ign: self._hang(self.servers[3:])) + d.addCallback(lambda ign: self._unhang(self.servers[2:3])) + d.addCallback(lambda ign: self._download_and_check()) + return d + + def test_2_good_8_hung_then_1_recovers_mutable(self): + raise unittest.SkipTest("still broken") + d = defer.succeed(None) + d.addCallback(lambda ign: self._set_up(True, "test_2_good_8_hung_then_1_recovers")) + d.addCallback(lambda ign: self._hang(self.servers[2:3])) + d.addCallback(lambda ign: self._hang(self.servers[3:])) + d.addCallback(lambda ign: self._unhang(self.servers[2:3])) + d.addCallback(lambda ign: self._download_and_check()) return d hunk ./src/allmydata/test/test_hung_server.py 311 - def test_failover_during_stage_4(self): - # See #287 + def test_2_good_8_hung_then_1_recovers_with_2_shares_immutable(self): d = defer.succeed(None) hunk ./src/allmydata/test/test_hung_server.py 313 - for mutable in [False]: - d.addCallback(lambda ign: self._set_up(mutable, "test_failover_during_stage_4")) - d.addCallback(lambda ign: self._corrupt_all_shares_in(self.servers[2:3], _corrupt_share_data)) - d.addCallback(lambda ign: self._set_up(mutable, "test_failover_during_stage_4")) - d.addCallback(lambda ign: self._hang(self.servers[3:])) - d.addCallback(lambda ign: self._start_download()) - def _after_starting_download((doned, started4d)): - started4d.addCallback(lambda ign: self._unhang(self.servers[3:4])) - doned.addCallback(self._check) - return doned - d.addCallback(_after_starting_download) + d.addCallback(lambda ign: self._set_up(False, "test_2_good_8_hung_then_1_recovers_with_2_shares")) + d.addCallback(lambda ign: self._copy_all_shares_from(self.servers[0:1], self.servers[2])) + d.addCallback(lambda ign: self._hang(self.servers[2:3])) + d.addCallback(lambda ign: self._hang(self.servers[3:])) + 
d.addCallback(lambda ign: self._unhang(self.servers[2:3])) + d.addCallback(lambda ign: self._download_and_check()) + return d hunk ./src/allmydata/test/test_hung_server.py 321 + def test_2_good_8_hung_then_1_recovers_with_2_shares_mutable(self): + raise unittest.SkipTest("still broken") + d = defer.succeed(None) + d.addCallback(lambda ign: self._set_up(True, "test_2_good_8_hung_then_1_recovers_with_2_shares")) + d.addCallback(lambda ign: self._copy_all_shares_from(self.servers[0:1], self.servers[2])) + d.addCallback(lambda ign: self._hang(self.servers[2:3])) + d.addCallback(lambda ign: self._hang(self.servers[3:])) + d.addCallback(lambda ign: self._unhang(self.servers[2:3])) + d.addCallback(lambda ign: self._download_and_check()) return d hunk ./src/allmydata/test/test_immutable.py 8 from twisted.trial import unittest import random -class Test(common.ShareManglingMixin, unittest.TestCase): +class Test(common.ShareManglingMixin, common.ShouldFailMixin, unittest.TestCase): def test_test_code(self): # The following process of stashing the shares, running # replace_shares, and asserting that the new set of shares equals the hunk ./src/allmydata/test/test_immutable.py 21 return res d.addCallback(_stash_it) - # The following process of deleting 8 of the shares and asserting that you can't - # download it is more to test this test code than to test the Tahoe code... + # The following process of deleting 8 of the shares and asserting + # that you can't download it is more to test this test code than to + # test the Tahoe code... def _then_delete_8(unused=None): self.replace_shares(stash[0], storage_index=self.uri.get_storage_index()) for i in range(8): hunk ./src/allmydata/test/test_immutable.py 46 return d def test_download(self): - """ Basic download. (This functionality is more or less already tested by test code in - other modules, but this module is also going to test some more specific things about - immutable download.) + """ Basic download. 
(This functionality is more or less already + tested by test code in other modules, but this module is also going + to test some more specific things about immutable download.) """ d = defer.succeed(None) before_download_reads = self._count_reads() hunk ./src/allmydata/test/test_immutable.py 54 def _after_download(unused=None): after_download_reads = self._count_reads() - self.failIf(after_download_reads-before_download_reads > 27, (after_download_reads, before_download_reads)) + #print before_download_reads, after_download_reads + self.failIf(after_download_reads-before_download_reads > 27, + (after_download_reads, before_download_reads)) d.addCallback(self._download_and_check_plaintext) d.addCallback(_after_download) return d hunk ./src/allmydata/test/test_immutable.py 62 def test_download_from_only_3_remaining_shares(self): - """ Test download after 7 random shares (of the 10) have been removed. """ + """ Test download after 7 random shares (of the 10) have been + removed.""" d = defer.succeed(None) def _then_delete_7(unused=None): for i in range(7): hunk ./src/allmydata/test/test_immutable.py 72 d.addCallback(_then_delete_7) def _after_download(unused=None): after_download_reads = self._count_reads() + #print before_download_reads, after_download_reads self.failIf(after_download_reads-before_download_reads > 27, (after_download_reads, before_download_reads)) d.addCallback(self._download_and_check_plaintext) d.addCallback(_after_download) hunk ./src/allmydata/test/test_immutable.py 79 return d def test_download_from_only_3_shares_with_good_crypttext_hash(self): - """ Test download after 7 random shares (of the 10) have had their crypttext hash tree corrupted. 
""" + """ Test download after 7 random shares (of the 10) have had their + crypttext hash tree corrupted.""" d = defer.succeed(None) def _then_corrupt_7(unused=None): shnums = range(10) hunk ./src/allmydata/test/test_immutable.py 93 return d def test_download_abort_if_too_many_missing_shares(self): - """ Test that download gives up quickly when it realizes there aren't enough shares out - there.""" - d = defer.succeed(None) - def _then_delete_8(unused=None): - for i in range(8): - self._delete_a_share() - d.addCallback(_then_delete_8) - - before_download_reads = self._count_reads() - def _attempt_to_download(unused=None): - d2 = download_to_data(self.n) - - def _callb(res): - self.fail("Should have gotten an error from attempt to download, not %r" % (res,)) - def _errb(f): - self.failUnless(f.check(NotEnoughSharesError)) - d2.addCallbacks(_callb, _errb) - return d2 - - d.addCallback(_attempt_to_download) - - def _after_attempt(unused=None): - after_download_reads = self._count_reads() - # To pass this test, you are required to give up before actually trying to read any - # share data. - self.failIf(after_download_reads-before_download_reads > 0, (after_download_reads, before_download_reads)) - d.addCallback(_after_attempt) + """ Test that download gives up quickly when it realizes there aren't + enough shares out there.""" + for i in range(8): + self._delete_a_share() + d = self.shouldFail(NotEnoughSharesError, "delete 8", None, + download_to_data, self.n) + # the new downloader pipelines a bunch of read requests in parallel, + # so don't bother asserting anything about the number of reads return d def test_download_abort_if_too_many_corrupted_shares(self): hunk ./src/allmydata/test/test_immutable.py 104 - """ Test that download gives up quickly when it realizes there aren't enough uncorrupted - shares out there. 
It should be able to tell because the corruption occurs in the - sharedata version number, which it checks first.""" + """Test that download gives up quickly when it realizes there aren't + enough uncorrupted shares out there. It should be able to tell + because the corruption occurs in the sharedata version number, which + it checks first.""" d = defer.succeed(None) def _then_corrupt_8(unused=None): shnums = range(10) hunk ./src/allmydata/test/test_immutable.py 131 def _after_attempt(unused=None): after_download_reads = self._count_reads() - # To pass this test, you are required to give up before reading all of the share - # data. Actually, we could give up sooner than 45 reads, but currently our download - # code does 45 reads. This test then serves as a "performance regression detector" - # -- if you change download code so that it takes *more* reads, then this test will - # fail. - self.failIf(after_download_reads-before_download_reads > 45, (after_download_reads, before_download_reads)) + #print before_download_reads, after_download_reads + # To pass this test, you are required to give up before reading + # all of the share data. Actually, we could give up sooner than + # 45 reads, but currently our download code does 45 reads. This + # test then serves as a "performance regression detector" -- if + # you change download code so that it takes *more* reads, then + # this test will fail. 
+ self.failIf(after_download_reads-before_download_reads > 45, + (after_download_reads, before_download_reads)) d.addCallback(_after_attempt) return d hunk ./src/allmydata/test/test_immutable.py 144 -# XXX extend these tests to show bad behavior of various kinds from servers: raising exception from each remove_foo() method, for example +# XXX extend these tests to show bad behavior of various kinds from servers: +# raising exception from each remove_foo() method, for example # XXX test disconnect DeadReferenceError from get_buckets and get_block_whatsit hunk ./src/allmydata/test/test_immutable.py 149 +# TODO: delete this whole file hunk ./src/allmydata/test/test_mutable.py 200 keygen = client.KeyGenerator() keygen.set_default_keysize(522) nodemaker = NodeMaker(storage_broker, sh, None, - None, None, None, + None, None, {"k": 3, "n": 10}, keygen) return nodemaker hunk ./src/allmydata/test/test_repairer.py 6 from allmydata.monitor import Monitor from allmydata import check_results from allmydata.interfaces import NotEnoughSharesError -from allmydata.immutable import repairer, upload +from allmydata.immutable import upload from allmydata.util.consumer import download_to_data from twisted.internet import defer from twisted.trial import unittest hunk ./src/allmydata/test/test_repairer.py 366 # Optimally, you could repair one of these (small) files in a single write. 
DELTA_WRITES_PER_SHARE = 1 * WRITE_LEEWAY -class DownUpConnector(unittest.TestCase): - def test_deferred_satisfaction(self): - duc = repairer.DownUpConnector() - duc.registerProducer(None, True) # just because you have to call registerProducer first - # case 1: total data in buf is < requested data at time of request - duc.write('\x01') - d = duc.read_encrypted(2, False) - def _then(data): - self.failUnlessEqual(len(data), 2) - self.failUnlessEqual(data[0], '\x01') - self.failUnlessEqual(data[1], '\x02') - d.addCallback(_then) - duc.write('\x02') - return d - - def test_extra(self): - duc = repairer.DownUpConnector() - duc.registerProducer(None, True) # just because you have to call registerProducer first - # case 1: total data in buf is < requested data at time of request - duc.write('\x01') - d = duc.read_encrypted(2, False) - def _then(data): - self.failUnlessEqual(len(data), 2) - self.failUnlessEqual(data[0], '\x01') - self.failUnlessEqual(data[1], '\x02') - d.addCallback(_then) - duc.write('\x02\0x03') - return d - - def test_short_reads_1(self): - # You don't get fewer bytes than you requested -- instead you get no callback at all. - duc = repairer.DownUpConnector() - duc.registerProducer(None, True) # just because you have to call registerProducer first - - d = duc.read_encrypted(2, False) - duc.write('\x04') - - def _callb(res): - self.fail("Shouldn't have gotten this callback res: %s" % (res,)) - d.addCallback(_callb) - - # Also in the other order of read-vs-write: - duc2 = repairer.DownUpConnector() - duc2.registerProducer(None, True) # just because you have to call registerProducer first - duc2.write('\x04') - d = duc2.read_encrypted(2, False) - - def _callb2(res): - self.fail("Shouldn't have gotten this callback res: %s" % (res,)) - d.addCallback(_callb2) - - # But once the DUC is closed then you *do* get short reads. 
- duc3 = repairer.DownUpConnector() - duc3.registerProducer(None, True) # just because you have to call registerProducer first - - d = duc3.read_encrypted(2, False) - duc3.write('\x04') - duc3.close() - def _callb3(res): - self.failUnlessEqual(len(res), 1) - self.failUnlessEqual(res[0], '\x04') - d.addCallback(_callb3) - return d - - def test_short_reads_2(self): - # Also in the other order of read-vs-write. - duc = repairer.DownUpConnector() - duc.registerProducer(None, True) # just because you have to call registerProducer first - - duc.write('\x04') - d = duc.read_encrypted(2, False) - duc.close() - - def _callb(res): - self.failUnlessEqual(len(res), 1) - self.failUnlessEqual(res[0], '\x04') - d.addCallback(_callb) - return d - - def test_short_reads_3(self): - # Also if it is closed before the read. - duc = repairer.DownUpConnector() - duc.registerProducer(None, True) # just because you have to call registerProducer first - - duc.write('\x04') - duc.close() - d = duc.read_encrypted(2, False) - def _callb(res): - self.failUnlessEqual(len(res), 1) - self.failUnlessEqual(res[0], '\x04') - d.addCallback(_callb) - return d - class Repairer(GridTestMixin, unittest.TestCase, RepairTestMixin, common.ShouldFailMixin): hunk ./src/allmydata/test/test_system.py 12 from allmydata.storage.mutable import MutableShareFile from allmydata.storage.server import si_a2b from allmydata.immutable import offloaded, upload -from allmydata.immutable.filenode import ImmutableFileNode, LiteralFileNode +from allmydata.immutable.literal import LiteralFileNode +from allmydata.immutable.filenode import ImmutableFileNode from allmydata.util import idlib, mathutil from allmydata.util import log, base32 from allmydata.util.encodingutil import quote_output, unicode_to_argv, get_filesystem_encoding hunk ./src/allmydata/test/test_upload.py 2090 # have a download fail # cancel a download (need to implement more cancel stuff) +# from test_encode: +# NoNetworkGrid, upload part of ciphertext, kill 
server, continue upload +# check with Kevan: these belong in test_upload, and existing tests might already cover them +# def test_lost_one_shareholder(self): # these are upload-side tests +# def test_lost_one_shareholder_early(self): +# def test_lost_many_shareholders(self): +# def test_lost_all_shareholders(self): + hunk ./src/allmydata/test/test_util.py 10 from twisted.internet import defer, reactor from twisted.python.failure import Failure from twisted.python import log +from hashlib import md5 from allmydata.util import base32, idlib, humanreadable, mathutil, hashutil from allmydata.util import assertutil, fileutil, deferredutil, abbreviate hunk ./src/allmydata/test/test_util.py 17 from allmydata.util import limiter, time_format, pollmixin, cachedir from allmydata.util import statistics, dictutil, pipeline from allmydata.util import log as tahoe_log +from allmydata.util.spans import Spans, overlap, DataSpans class Base32(unittest.TestCase): def test_b2a_matches_Pythons(self): hunk ./src/allmydata/test/test_util.py 1573 tahoe_log.err(format="intentional sample error", failure=f, level=tahoe_log.OPERATIONAL, umid="wO9UoQ") self.flushLoggedErrors(SampleError) + + +class SimpleSpans: + # this is a simple+inefficient form of util.spans.Spans. We compare the + # behavior of this reference model against the real (efficient) form.
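SimpleSpans follows a common testing pattern: a slow, obviously correct model (a set of integers) runs side by side with the efficient implementation, and their outputs are compared. The sketch below applies that pattern to a hypothetical coalescing span list. `add_span`, `runs_from_set`, and this `overlap` are illustrative stand-ins written for this note, not the code in `allmydata.util.spans`; `overlap` is only meant to follow the None-or-`(start, length)` contract that `_test_overlap` checks.

```python
def add_span(spans, start, length):
    """Return a new sorted list of (start, length) runs with the range
    [start, start+length) merged in. Overlapping or touching runs are
    coalesced, so the list stays canonical (assumes the input is canonical)."""
    if length <= 0:
        return list(spans)
    new_start, new_end = start, start + length
    out = []
    for (s, l) in spans:
        e = s + l
        if e < new_start or s > new_end:
            out.append((s, l))              # disjoint and not adjacent: keep
        else:
            new_start = min(new_start, s)   # overlapping or adjacent: absorb
            new_end = max(new_end, e)
    out.append((new_start, new_end - new_start))
    out.sort()
    return out


def overlap(start0, length0, start1, length1):
    """(start, length) of the intersection of two ranges, or None if empty."""
    start = max(start0, start1)
    end = min(start0 + length0, start1 + length1)
    if end <= start:
        return None
    return (start, end - start)


def runs_from_set(members):
    """Reference model: turn a set of byte offsets into maximal runs."""
    runs = []
    for i in sorted(members):
        if runs and runs[-1][0] + runs[-1][1] == i:
            runs[-1] = (runs[-1][0], runs[-1][1] + 1)
        else:
            runs.append((i, 1))
    return runs
```

Because both functions produce maximal, sorted runs, plain list equality is a fair comparison between the fast form and the model, which is the same property `test_random` relies on when it compares `list(s1)` with `list(s2)`.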
+ + def __init__(self, _span_or_start=None, length=None): + self._have = set() + if length is not None: + for i in range(_span_or_start, _span_or_start+length): + self._have.add(i) + elif _span_or_start: + for (start,length) in _span_or_start: + self.add(start, length) + + def add(self, start, length): + for i in range(start, start+length): + self._have.add(i) + return self + + def remove(self, start, length): + for i in range(start, start+length): + self._have.discard(i) + return self + + def each(self): + return sorted(self._have) + + def __iter__(self): + items = sorted(self._have) + prevstart = None + prevend = None + for i in items: + if prevstart is None: + prevstart = prevend = i + continue + if i == prevend+1: + prevend = i + continue + yield (prevstart, prevend-prevstart+1) + prevstart = prevend = i + if prevstart is not None: + yield (prevstart, prevend-prevstart+1) + + def __len__(self): + # this also gets us bool(s) + return len(self._have) + + def __add__(self, other): + s = self.__class__(self) + for (start, length) in other: + s.add(start, length) + return s + + def __sub__(self, other): + s = self.__class__(self) + for (start, length) in other: + s.remove(start, length) + return s + + def __iadd__(self, other): + for (start, length) in other: + self.add(start, length) + return self + + def __isub__(self, other): + for (start, length) in other: + self.remove(start, length) + return self + + def __and__(self, other): + s = self.__class__() + for i in other.each(): + if i in self._have: + s.add(i, 1) + return s + + def __contains__(self, (start,length)): + for i in range(start, start+length): + if i not in self._have: + return False + return True + +class ByteSpans(unittest.TestCase): + def test_basic(self): + s = Spans() + self.failUnlessEqual(list(s), []) + self.failIf(s) + self.failIf((0,1) in s) + self.failUnlessEqual(len(s), 0) + + s1 = Spans(3, 4) # 3,4,5,6 + self._check1(s1) + + s2 = Spans(s1) + self._check1(s2) + + s2.add(10,2) # 10,11 + 
self._check1(s1) + self.failUnless((10,1) in s2) + self.failIf((10,1) in s1) + self.failUnlessEqual(list(s2.each()), [3,4,5,6,10,11]) + self.failUnlessEqual(len(s2), 6) + + s2.add(15,2).add(20,2) + self.failUnlessEqual(list(s2.each()), [3,4,5,6,10,11,15,16,20,21]) + self.failUnlessEqual(len(s2), 10) + + s2.remove(4,3).remove(15,1) + self.failUnlessEqual(list(s2.each()), [3,10,11,16,20,21]) + self.failUnlessEqual(len(s2), 6) + + s1 = SimpleSpans(3, 4) # 3 4 5 6 + s2 = SimpleSpans(5, 4) # 5 6 7 8 + i = s1 & s2 + self.failUnlessEqual(list(i.each()), [5, 6]) + + def _check1(self, s): + self.failUnlessEqual(list(s), [(3,4)]) + self.failUnless(s) + self.failUnlessEqual(len(s), 4) + self.failIf((0,1) in s) + self.failUnless((3,4) in s) + self.failUnless((3,1) in s) + self.failUnless((5,2) in s) + self.failUnless((6,1) in s) + self.failIf((6,2) in s) + self.failIf((7,1) in s) + self.failUnlessEqual(list(s.each()), [3,4,5,6]) + + def test_math(self): + s1 = Spans(0, 10) # 0,1,2,3,4,5,6,7,8,9 + s2 = Spans(5, 3) # 5,6,7 + s3 = Spans(8, 4) # 8,9,10,11 + + s = s1 - s2 + self.failUnlessEqual(list(s.each()), [0,1,2,3,4,8,9]) + s = s1 - s3 + self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7]) + s = s2 - s3 + self.failUnlessEqual(list(s.each()), [5,6,7]) + s = s1 & s2 + self.failUnlessEqual(list(s.each()), [5,6,7]) + s = s2 & s1 + self.failUnlessEqual(list(s.each()), [5,6,7]) + s = s1 & s3 + self.failUnlessEqual(list(s.each()), [8,9]) + s = s3 & s1 + self.failUnlessEqual(list(s.each()), [8,9]) + s = s2 & s3 + self.failUnlessEqual(list(s.each()), []) + s = s3 & s2 + self.failUnlessEqual(list(s.each()), []) + s = Spans() & s3 + self.failUnlessEqual(list(s.each()), []) + s = s3 & Spans() + self.failUnlessEqual(list(s.each()), []) + + s = s1 + s2 + self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9]) + s = s1 + s3 + self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9,10,11]) + s = s2 + s3 + self.failUnlessEqual(list(s.each()), [5,6,7,8,9,10,11]) + + s = Spans(s1) + s 
-= s2 + self.failUnlessEqual(list(s.each()), [0,1,2,3,4,8,9]) + s = Spans(s1) + s -= s3 + self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7]) + s = Spans(s2) + s -= s3 + self.failUnlessEqual(list(s.each()), [5,6,7]) + + s = Spans(s1) + s += s2 + self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9]) + s = Spans(s1) + s += s3 + self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9,10,11]) + s = Spans(s2) + s += s3 + self.failUnlessEqual(list(s.each()), [5,6,7,8,9,10,11]) + + def test_random(self): + # attempt to increase coverage of corner cases by comparing behavior + # of a simple-but-slow model implementation against the + # complex-but-fast actual implementation, in a large number of random + # operations + S1 = SimpleSpans + S2 = Spans + s1 = S1(); s2 = S2() + seed = "" + def _create(subseed): + ns1 = S1(); ns2 = S2() + for i in range(10): + what = md5(subseed+str(i)).hexdigest() + start = int(what[2:4], 16) + length = max(1,int(what[5:6], 16)) + ns1.add(start, length); ns2.add(start, length) + return ns1, ns2 + + #print + for i in range(1000): + what = md5(seed+str(i)).hexdigest() + op = what[0] + subop = what[1] + start = int(what[2:4], 16) + length = max(1,int(what[5:6], 16)) + #print what + if op in "0": + if subop in "01234": + s1 = S1(); s2 = S2() + elif subop in "5678": + s1 = S1(start, length); s2 = S2(start, length) + else: + s1 = S1(s1); s2 = S2(s2) + #print "s2 = %s" % s2.dump() + elif op in "123": + #print "s2.add(%d,%d)" % (start, length) + s1.add(start, length); s2.add(start, length) + elif op in "456": + #print "s2.remove(%d,%d)" % (start, length) + s1.remove(start, length); s2.remove(start, length) + elif op in "78": + ns1, ns2 = _create(what[7:11]) + #print "s2 + %s" % ns2.dump() + s1 = s1 + ns1; s2 = s2 + ns2 + elif op in "9a": + ns1, ns2 = _create(what[7:11]) + #print "%s - %s" % (s2.dump(), ns2.dump()) + s1 = s1 - ns1; s2 = s2 - ns2 + elif op in "bc": + ns1, ns2 = _create(what[7:11]) + #print "s2 += %s" % ns2.dump() + s1 += ns1; 
s2 += ns2 + elif op in "de": + ns1, ns2 = _create(what[7:11]) + #print "%s -= %s" % (s2.dump(), ns2.dump()) + s1 -= ns1; s2 -= ns2 + else: + ns1, ns2 = _create(what[7:11]) + #print "%s &= %s" % (s2.dump(), ns2.dump()) + s1 = s1 & ns1; s2 = s2 & ns2 + #print "s2 now %s" % s2.dump() + self.failUnlessEqual(list(s1.each()), list(s2.each())) + self.failUnlessEqual(len(s1), len(s2)) + self.failUnlessEqual(bool(s1), bool(s2)) + self.failUnlessEqual(list(s1), list(s2)) + for j in range(10): + what = md5(what[12:14]+str(j)).hexdigest() + start = int(what[2:4], 16) + length = max(1, int(what[5:6], 16)) + span = (start, length) + self.failUnlessEqual(bool(span in s1), bool(span in s2)) + + + # s() + # s(start,length) + # s(s0) + # s.add(start,length) : returns s + # s.remove(start,length) + # s.each() -> list of byte offsets, mostly for testing + # list(s) -> list of (start,length) tuples, one per span + # (start,length) in s -> True if (start..start+length-1) are all members + # NOT equivalent to x in list(s) + # len(s) -> number of bytes, for testing, bool(), and accounting/limiting + # bool(s) (__len__) + # s = s1+s2, s1-s2, +=s1, -=s1 + + def test_overlap(self): + for a in range(20): + for b in range(10): + for c in range(20): + for d in range(10): + self._test_overlap(a,b,c,d) + + def _test_overlap(self, a, b, c, d): + s1 = set(range(a,a+b)) + s2 = set(range(c,c+d)) + #print "---" + #self._show_overlap(s1, "1") + #self._show_overlap(s2, "2") + o = overlap(a,b,c,d) + expected = s1.intersection(s2) + if not expected: + self.failUnlessEqual(o, None) + else: + start,length = o + so = set(range(start,start+length)) + #self._show_overlap(so, "o") + self.failUnlessEqual(so, expected) + + def _show_overlap(self, s, c): + import sys + out = sys.stdout + if s: + for i in range(max(s)+1): + if i in s: + out.write(c) + else: + out.write(" ") + out.write("\n") + +def extend(s, start, length, fill): + if len(s) >= start+length: + return s + assert len(fill) == 1 + return s +
fill*(start+length-len(s)) + +def replace(s, start, data): + assert len(s) >= start+len(data) + return s[:start] + data + s[start+len(data):] + +class SimpleDataSpans: + def __init__(self, other=None): + self.missing = "" # "1" where missing, "0" where found + self.data = "" + if other: + for (start, data) in other.get_chunks(): + self.add(start, data) + + def __len__(self): + return len(self.missing.translate(None, "1")) + def _dump(self): + return [i for (i,c) in enumerate(self.missing) if c == "0"] + def _have(self, start, length): + m = self.missing[start:start+length] + if not m or len(m)" in body, body) body = " ".join(body.strip().split()) - exp = ("NotEnoughSharesError: This indicates that some " + msg = ("NotEnoughSharesError: This indicates that some " "servers were unavailable, or that shares have been " "lost to server departure, hard drive failure, or disk " "corruption. You should perform a filecheck on " hunk ./src/allmydata/test/test_web.py 4208 "this object to learn more. The full error message is:" - " Failed to get enough shareholders: have 1, need 3") - self.failUnlessReallyEqual(exp, body) + " ran out of shares: %d complete, %d pending, 0 overdue," + " 0 unused, need 3. Last failure: None") + msg1 = msg % (1, 0) + msg2 = msg % (0, 1) + self.failUnless(body == msg1 or body == msg2, body) d.addCallback(_check_one_share) d.addCallback(lambda ignored: hunk ./src/allmydata/util/observer.py 3 # -*- test-case-name: allmydata.test.test_observer -*- +import weakref from twisted.internet import defer from foolscap.api import eventually hunk ./src/allmydata/util/observer.py 95 def notify(self, *args, **kwargs): for o in self._watchers: eventually(o, *args, **kwargs) + +class EventStreamObserver: + """A simple class to distribute multiple events to a single subscriber. 
+ It accepts arbitrary kwargs, but no posargs.""" + def __init__(self): + self._watcher = None + self._undelivered_results = [] + self._canceler = None + + def set_canceler(self, c, methname): + """I will call c.METHNAME(self) when somebody cancels me.""" + # we use a weakref to avoid creating a cycle between us and the thing + # we're observing: they'll be holding a reference to us to compare + # against the value we pass to their canceler function. However, + # since bound methods are first-class objects (and not kept alive by + # the object they're bound to), we can't just stash a weakref to the + # bound cancel method. Instead, we must hold a weakref to the actual + # object, and obtain its cancel method later. + # http://code.activestate.com/recipes/81253-weakmethod/ has an + # alternative. + self._canceler = (weakref.ref(c), methname) + + def subscribe(self, observer, **watcher_kwargs): + self._watcher = (observer, watcher_kwargs) + while self._undelivered_results: + self._notify(self._undelivered_results.pop(0)) + + def notify(self, **result_kwargs): + if self._watcher: + self._notify(result_kwargs) + else: + self._undelivered_results.append(result_kwargs) + + def _notify(self, result_kwargs): + o, watcher_kwargs = self._watcher + kwargs = dict(result_kwargs) + kwargs.update(watcher_kwargs) + eventually(o, **kwargs) + + def cancel(self): + wr,methname = self._canceler + o = wr() + if o: + getattr(o,methname)(self) hunk ./src/allmydata/web/download-status.xhtml 21
  • Status:
  • +

    Download Results

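EventStreamObserver, added to util/observer.py earlier in this patch, queues notifications until its single subscriber arrives, then merges each event's kwargs with the kwargs supplied at subscription time. A minimal synchronous sketch of that contract follows. It calls the observer directly instead of going through foolscap's `eventually()`, omits the weakref-based canceler, and `BufferedEventStream` is a hypothetical name, so delivery timing differs from the real class.

```python
class BufferedEventStream:
    """Hypothetical single-subscriber stream: events are queued until
    subscribe() is called, then delivered in arrival order."""

    def __init__(self):
        self._watcher = None
        self._undelivered = []

    def subscribe(self, observer, **watcher_kwargs):
        self._watcher = (observer, watcher_kwargs)
        # flush anything that arrived before the subscriber did, oldest first
        while self._undelivered:
            self._deliver(self._undelivered.pop(0))

    def notify(self, **result_kwargs):
        if self._watcher:
            self._deliver(result_kwargs)
        else:
            self._undelivered.append(result_kwargs)

    def _deliver(self, result_kwargs):
        observer, watcher_kwargs = self._watcher
        kwargs = dict(result_kwargs)
        kwargs.update(watcher_kwargs)  # subscriber's kwargs win on key clashes
        observer(**kwargs)  # synchronous here; the real class uses eventually()
```

Buffering means a producer can start emitting events before anyone listens without losing them, and the kwargs merge lets one observer function tell several streams apart by a tag passed at subscription time.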
hunk ./src/allmydata/web/status.py 361
     def download_results(self):
         return defer.maybeDeferred(self.download_status.get_results)
+    def relative_time(self, t):
+        if t is None:
+            return t
+        if self.download_status.started is not None:
+            return t - self.download_status.started
+        return t
+    def short_relative_time(self, t):
+        t = self.relative_time(t)
+        if t is None:
+            return ""
+        return "+%.6fs" % t
+
+    def renderHTTP(self, ctx):
+        req = inevow.IRequest(ctx)
+        t = get_arg(req, "t")
+        if t == "json":
+            return self.json(req)
+        return rend.Page.renderHTTP(self, ctx)
+
+    def json(self, req):
+        req.setHeader("content-type", "text/plain")
+        data = {}
+        dyhb_events = []
+        for serverid,requests in self.download_status.dyhb_requests.iteritems():
+            for req in requests:
+                dyhb_events.append( (base32.b2a(serverid),) + req )
+        dyhb_events.sort(key=lambda req: req[1])
+        data["dyhb"] = dyhb_events
+        request_events = []
+        for serverid,requests in self.download_status.requests.iteritems():
+            for req in requests:
+                request_events.append( (base32.b2a(serverid),) + req )
+        request_events.sort(key=lambda req: (req[4],req[1]))
+        data["requests"] = request_events
+        data["segment"] = self.download_status.segment_events
+        data["read"] = self.download_status.read_events
+        return simplejson.dumps(data, indent=1) + "\n"
+
+    def render_events(self, ctx, data):
+        if not self.download_status.storage_index:
+            return
+        srt = self.short_relative_time
+        l = T.ul()
+
+        t = T.table(class_="status-download-events")
+        t[T.tr[T.td["serverid"], T.td["sent"], T.td["received"],
+               T.td["shnums"], T.td["RTT"]]]
+        dyhb_events = []
+        for serverid,requests in self.download_status.dyhb_requests.iteritems():
+            for req in requests:
+                dyhb_events.append( (serverid,) + req )
+        dyhb_events.sort(key=lambda req: req[1])
+        for d_ev in dyhb_events:
+            (serverid, sent, shnums, received) = d_ev
+            serverid_s = idlib.shortnodeid_b2a(serverid)
+            rtt = received - sent
+            t[T.tr(style="background: %s" % self.color(serverid))[
+                [T.td[serverid_s], T.td[srt(sent)], T.td[srt(received)],
+                 T.td[",".join([str(shnum) for shnum in shnums])],
+                 T.td[self.render_time(None, rtt)],
+                 ]]]
+        l["DYHB Requests:", t]
+
+        t = T.table(class_="status-download-events")
+        t[T.tr[T.td["range"], T.td["start"], T.td["finish"], T.td["got"],
+               T.td["time"], T.td["decrypttime"], T.td["pausedtime"],
+               T.td["speed"]]]
+        for r_ev in self.download_status.read_events:
+            (start, length, requesttime, finishtime, bytes, decrypt, paused) = r_ev
+            #print r_ev
+            if finishtime is not None:
+                rtt = finishtime - requesttime - paused
+                speed = self.render_rate(None, 1.0 * bytes / rtt)
+                rtt = self.render_time(None, rtt)
+                decrypt = self.render_time(None, decrypt)
+                paused = self.render_time(None, paused)
+            else:
+                speed, rtt, decrypt, paused = "","","",""
+            t[T.tr[T.td["[%d:+%d]" % (start, length)],
+                   T.td[srt(requesttime)], T.td[srt(finishtime)],
+                   T.td[bytes], T.td[rtt], T.td[decrypt], T.td[paused],
+                   T.td[speed],
+                   ]]
+        l["Read Events:", t]
+
+        t = T.table(class_="status-download-events")
+        t[T.tr[T.td["type"], T.td["segnum"], T.td["when"], T.td["range"],
+               T.td["decodetime"], T.td["segtime"], T.td["speed"]]]
+        reqtime = (None, None)
+        for s_ev in self.download_status.segment_events:
+            (etype, segnum, when, segstart, seglen, decodetime) = s_ev
+            if etype == "request":
+                t[T.tr[T.td["request"], T.td["seg%d" % segnum],
+                       T.td[srt(when)]]]
+                reqtime = (segnum, when)
+            elif etype == "delivery":
+                if reqtime[0] == segnum:
+                    segtime = when - reqtime[1]
+                    speed = self.render_rate(None, 1.0 * seglen / segtime)
+                    segtime = self.render_time(None, segtime)
+                else:
+                    segtime, speed = "", ""
+                t[T.tr[T.td["delivery"], T.td["seg%d" % segnum],
+                       T.td[srt(when)],
+                       T.td["[%d:+%d]" % (segstart, seglen)],
+                       T.td[self.render_time(None,decodetime)],
+                       T.td[segtime], T.td[speed]]]
+            elif etype == "error":
+                t[T.tr[T.td["error"], T.td["seg%d" % segnum]]]
+        l["Segment Events:", t]
+
+        t = T.table(border="1")
+        t[T.tr[T.td["serverid"], T.td["shnum"], T.td["range"],
+               T.td["txtime"], T.td["rxtime"], T.td["received"], T.td["RTT"]]]
+        reqtime = (None, None)
+        request_events = []
+        for serverid,requests in self.download_status.requests.iteritems():
+            for req in requests:
+                request_events.append( (serverid,) + req )
+        request_events.sort(key=lambda req: (req[4],req[1]))
+        for r_ev in request_events:
+            (peerid, shnum, start, length, sent, receivedlen, received) = r_ev
+            rtt = None
+            if received is not None:
+                rtt = received - sent
+            peerid_s = idlib.shortnodeid_b2a(peerid)
+            t[T.tr(style="background: %s" % self.color(peerid))[
+                T.td[peerid_s], T.td[shnum],
+                T.td["[%d:+%d]" % (start, length)],
+                T.td[srt(sent)], T.td[srt(received)], T.td[receivedlen],
+                T.td[self.render_time(None, rtt)],
+                ]]
+        l["Requests:", t]
+
+        return l
+
+    def color(self, peerid):
+        def m(c):
+            return min(ord(c) / 2 + 0x80, 0xff)
+        return "#%02x%02x%02x" % (m(peerid[0]), m(peerid[1]), m(peerid[2]))
+
     def render_results(self, ctx, data):
         d = self.download_results()
         def _got_results(results):
hunk ./src/allmydata/web/status.py 515
         TIME_FORMAT = "%H:%M:%S %d-%b-%Y"
         started_s = time.strftime(TIME_FORMAT, time.localtime(data.get_started()))
-        return started_s
+        return started_s + " (%s)" % data.get_started()
     def render_si(self, ctx, data):
         si_s = base32.b2a_or_none(data.get_storage_index())
hunk ./src/allmydata/web/tahoe.css 139
   text-align: center;
   padding: 0 1em;
 }
+
+/* recent upload/download status pages */
+
+table.status-download-events {
+    border: 1px solid #aaa;
+}
+table.status-download-events td {
+    border: 1px solid #a00;
+    padding: 2px
+}
}

Context:

[bundled zetuptoolz' scriptsetup.py: broadcast WM_SETTINGCHANGE if environment has changed.
david-sarah@jacaranda.org**20100801010958
 Ignore-this: ac4ac78c45c538c2e50610997b56a86e
]
[abbreviate time edge case python2.5 unit test
jacob.lyles@gmail.com**20100729210638
 Ignore-this: 80f9b1dc98ee768372a50be7d0ef66af
]
[test_upload.py: rename test_problem_layout_ticket1124 to test_problem_layout_ticket_1124 -- fix .todo reference.
david-sarah@jacaranda.org**20100729152927
 Ignore-this: c8fe1047edcc83c87b9feb47f4aa587b
]
[misc/build_helpers/run_trial.py: correct error in formatting wrong-code error message.
david-sarah@jacaranda.org**20100729151457
 Ignore-this: bf4014842a1ffc075e8017053356e3a0
]
[test_upload.py: rename test_problem_layout_ticket1124 to test_problem_layout_ticket_1124 for consistency.
david-sarah@jacaranda.org**20100729142250
 Ignore-this: bc3aad5919ae9079ceb9968ad0f5ea5a
]
[docs: fix licensing typo that was earlier fixed in [20090921164651-92b7f-7f97b58101d93dc588445c52a9aaa56a2c7ae336]
zooko@zooko.com**20100729052923
 Ignore-this: a975d79115911688e5469d4d869e1664
 I wish we didn't copies of this licensing text in several different files so that changes can be accidentally omitted from some of them.
]
[misc/build_helpers/run_trial.py: allow 'pythonx.y' between 'lib' and 'site-packages'. Also, have the wrong-code error message give the original module source filename.
david-sarah@jacaranda.org**20100729052813
 Ignore-this: fafb184f1ecc4a9047aa6ea98b51cab6
]
[misc/build_helpers/run_trial.py: fix another off-by-two error when the module is loaded from lib/site-packages.
david-sarah@jacaranda.org**20100729032853
 Ignore-this: b54312cb736ec35e528567c989b79a5d
]
[misc/build_helpers/run_trial.py: fix an off-by-one error when determining the root directory from which the module was loaded, and an off-by-two error when it is loaded from an .egg.
david-sarah@jacaranda.org**20100729031147
 Ignore-this: e38b5dc8dd7b7e387641a4af743c7a27
]
[misc/build_helpers/run_trial.py and test_runner.py: refine the false-positive detection for Unicode paths.
david-sarah@jacaranda.org**20100729030903
 Ignore-this: 42bbfb52bd56ce72dae66f8971e47074
]
[misc/build_helpers/run_trial.py: skip option arguments before module name.
david-sarah@jacaranda.org**20100729024602
 Ignore-this: 51f7f0e4e73205ef3b7c644c4be5cd27
]
[misc/build_helpers/run_trial.py and test_runner.py: avoid spurious failures due to non-canonical paths when checking that we are testing the right code. Also simplify module loading in run_trial.py.
david-sarah@jacaranda.org**20100729023233
 Ignore-this: 5a65065299cdf52b257dbf1ea83bdbaa
]
[.darcs-boringfile: fix errors in previous patch, and make _trial_temp a prefix rather than an exact match.
david-sarah@jacaranda.org**20100729010634
 Ignore-this: 247e24993b578682219d110c031daac3
]
[misc/build_helpers/run_trial.py: check that the root from which the module we are testing was loaded is the current directory. addresses #1137
david-sarah@jacaranda.org**20100729004317
 Ignore-this: e285af3f5cf0e0bc9537632d8457b8a8
]
[.darcs-boringfile: take account of generated bin/tahoe.pyscript and bundled .egg directories.
david-sarah@jacaranda.org**20100728224723
 Ignore-this: dce133644614753907d5d617dc8dd771
]
[test_runner.py: add 'test_the_right_code', which partly addresses #1137
david-sarah@jacaranda.org**20100728194325
 Ignore-this: ed67365cc067881bcffb9ff5fcfa3ef6
]
[test_runner.py: add test_run_with_python_options, to test that we haven't broken skipping of option arguments in argv. Also fix errors in the message arguments to failUnlessEqual.
david-sarah@jacaranda.org**20100728070445
 Ignore-this: fb4a907603dc8ffa71c121dd465b4bb8
]
[Skip option arguments to the python interpreter when reconstructing Unicode argv on Windows.
david-sarah@jacaranda.org**20100728062731
 Ignore-this: 2b17fc43860bcc02a66bb6e5e050ea7c
]
[windows/fixups.py: improve comments and reference some relevant Python bugs.
david-sarah@jacaranda.org**20100727181921
 Ignore-this: 32e61cf98dfc2e3dac60b750dda6429b
]
[misc/build_helpers/run-with-pythonpath.py: fix stale comment, and remove 'trial' example that is not the right way to run trial.
david-sarah@jacaranda.org**20100726225729
 Ignore-this: a61f55557ad69a1633bfb2b8172cce97
]
[windows/fixups.py: make errors reported to original_stderr have enough information to debug even if we can't see the traceback.
david-sarah@jacaranda.org**20100726221904
 Ignore-this: e30b4629a7aa5d71554237c7e809c080
]
[windows/fixups.py: fix paste-o in name of Unicode stderr wrapper.
david-sarah@jacaranda.org**20100726214736
 Ignore-this: cb220931f1683eb53b0c7269e18a38be
]
[windows/fixups.py: Don't rely on buggy MSVCRT library for Unicode output, use the Win32 API instead. This should make it work on XP. Also, change how we handle the case where sys.stdout and sys.stderr are redirected, since the .encoding attribute isn't necessarily writeable.
david-sarah@jacaranda.org**20100726045019
 Ignore-this: 69267abc5065cbd5b86ca71fe4921fb6
]
[fileutil: change WindowsError to OSError in abspath_expanduser_unicode, because WindowsError might not exist.
david-sarah@jacaranda.org**20100725222603
 Ignore-this: e125d503670ed049a9ade0322faa0c51
]
[test_runner.py: change to code for locating the bin/tahoe script that was missed when rebasing the patch for #1074.
david-sarah@jacaranda.org**20100725182008
 Ignore-this: d891a93989ecc3f4301a17110c3d196c
]
[Add missing windows/fixups.py (for setting up Unicode args and output on Windows).
david-sarah@jacaranda.org**20100725092849
 Ignore-this: 35a1e8aeb4e1dea6e81433bf0825a6f6
]
[bundled zetuptoolz: add missing scriptsetup.py, and remove cli.exe.
david-sarah@jacaranda.org**20100725090203
 Ignore-this: 64810149ed7f25babfb123690191920b
]
[Move bundled setuptools egg directory to reflect its version (0.6c16dev).
david-sarah@jacaranda.org**20100725084629
 Ignore-this: b37969282dfd5d1f705e61780e8d62b9
]
[Upgrade bundled zetuptoolz to 0.6c16dev.
david-sarah@jacaranda.org**20100725083728
 Ignore-this: ecca879c0c6d8ee5473db770a522c2f4
]
[Changes to Tahoe needed to work with new zetuptoolz (that does not use .exe wrappers on Windows), and to support Unicode arguments and stdout/stderr -- v5
david-sarah@jacaranda.org**20100725083216
 Ignore-this: 5041a634b1328f041130658233f6a7ce
]
[Fix test failures due to Unicode basedir patches.
david-sarah@jacaranda.org**20100725010318
 Ignore-this: fe92cd439eb3e60a56c007ae452784ed
]
[test_system: correct a failure in _test_runner caused by Unicode basedir patch on non-Unicode platforms.
david-sarah@jacaranda.org**20100724032123
 Ignore-this: 399b3953104fdd1bbed3f7564d163553
]
[util.encodingutil: change quote_output to do less unnecessary escaping, and to use double-quotes more consistently when needed. This version avoids u-escaping for characters that are representable in the output encoding, when double quotes are used, and includes tests. fixes #1135
david-sarah@jacaranda.org**20100723075314
 Ignore-this: b82205834d17db61612dd16436b7c5a2
]
[_auto_deps.py: make it easier to build with earliest versions of all dependencies.
david-sarah@jacaranda.org**20100722001725
 Ignore-this: 90a1bf1d489aa5a98ae39bb2e16c6f66
]
[Replace uses of os.path.abspath with abspath_expanduser_unicode where necessary. This makes basedir paths consistently represented as Unicode.
david-sarah@jacaranda.org**20100722001418
 Ignore-this: 9f8cb706540e695550e0dbe303c01f52
]
[Basedir/node directory option improvements. addresses #188, #706, #715, #772, #890
david-sarah@jacaranda.org**20100721234834
 Ignore-this: 92d52f3af4acb0d659cb49e3306fef6c
]
[util.fileutil, test.test_util: add abspath_expanduser_unicode function, to work around . util.encodingutil: add a convenience function argv_to_abspath.
david-sarah@jacaranda.org**20100721231507
 Ignore-this: eee6904d1f65a733ff35190879844d08
]
[docs/specifications/dirnodes.txt: 'mesh'->'grid'.
david-sarah@jacaranda.org**20100723061616
 Ignore-this: 887bcf921ef00afba8e05e9239035bca
]
[docs/specifications/dirnodes.txt: bring layer terminology up-to-date with architecture.txt, and a few other updates (e.g. note that the MAC is no longer verified, and that URIs can be unknown). Also 'Tahoe'->'Tahoe-LAFS'.
david-sarah@jacaranda.org**20100723054703
 Ignore-this: f3b98183e7d0a0f391225b8b93ac6c37
]
[docs: use current cap to Zooko's wiki page in example text
zooko@zooko.com**20100721010543
 Ignore-this: 4f36f36758f9fdbaf9eb73eac23b6652
 fixes #1134
]
[__init__.py: silence DeprecationWarning about BaseException.message globally. fixes #1129
david-sarah@jacaranda.org**20100720011939
 Ignore-this: 38808986ba79cb2786b010504a22f89
]
[test_runner: test that 'tahoe --version' outputs no noise (e.g. DeprecationWarnings).
david-sarah@jacaranda.org**20100720011345
 Ignore-this: dd358b7b2e5d57282cbe133e8069702e
]
[TAG allmydata-tahoe-1.7.1
zooko@zooko.com**20100719131352
 Ignore-this: 6942056548433dc653a746703819ad8c
]

Patch bundle hash:
2980d8a66f7d6778e0e8076504d00dd5caf1b3b5
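
The `t=json` branch that the status.py hunk adds to `renderHTTP` flattens per-server event dictionaries into single time-ordered lists. Here is a Python 3 sketch of just that flattening step, leaving out the segment and read lists, which the patch passes through unchanged. Several assumptions apply. The input tuple shapes are inferred from the unpacking in `render_events`: DYHB events are `(sent, shnums, received)` and block requests are `(shnum, start, length, sent, receivedlen, received)`. The standard-library `json` module stands in for `simplejson`. `b32_id` is a hypothetical stand-in for Tahoe's `allmydata.util.base32.b2a` that uses lowercase RFC 4648 base32 with the padding removed.

```python
import base64
import json


def b32_id(serverid):
    # hypothetical stand-in for allmydata.util.base32.b2a: lowercase
    # RFC 4648 base32 with '=' padding stripped (an assumption)
    return base64.b32encode(serverid).decode("ascii").lower().rstrip("=")


def flatten_events(events_by_server, sort_key):
    """Merge {serverid: [event_tuple, ...]} into one list of
    (encoded_serverid,) + event tuples, ordered by sort_key."""
    flat = []
    for serverid, events in events_by_server.items():
        for ev in events:
            flat.append((b32_id(serverid),) + tuple(ev))
    flat.sort(key=sort_key)
    return flat


def status_json(dyhb_requests, requests):
    data = {
        # DYHB rows become (server, sent, shnums, received): order by send time
        "dyhb": flatten_events(dyhb_requests, lambda e: e[1]),
        # request rows become (server, shnum, start, length, sent,
        # receivedlen, received): order by send time, then share number
        "requests": flatten_events(requests, lambda e: (e[4], e[1])),
    }
    return json.dumps(data, indent=1) + "\n"


dyhb = {b"\x01" * 20: [(5.0, (0, 1), 5.2)],
        b"\x02" * 20: [(4.0, (2,), None)]}
print(status_json(dyhb, {}))
```

Sorting request rows on `(sent, shnum)` rather than on `sent` alone gives a deterministic order when several shares are requested at the same instant; `render_events` in the same hunk uses that key for its HTML table. An event still in flight keeps `None` for its received time, which JSON emits as `null`.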