diff --git a/Makefile b/Makefile index 3e4be60..723d656 100644 --- a/Makefile +++ b/Makefile @@ -125,7 +125,7 @@ quicktest: # quicktest-coverage" to do a unit test run with coverage-gathering enabled, # then use "make coverate-output-text" for a brief report, or "make # coverage-output" for a pretty HTML report. Also see "make .coverage.el" and -# misc/coding_helpers/coverage.el for emacs integration. +# misc/coding_tools/coverage.el for emacs integration. quicktest-coverage: rm -f .coverage @@ -134,7 +134,7 @@ quicktest-coverage: coverage-output: rm -rf coverage-html - coverage html -d coverage-html + coverage html -i -d coverage-html $(COVERAGE_OMIT) cp .coverage coverage-html/coverage.data @echo "now point your browser at coverage-html/index.html" @@ -154,7 +154,7 @@ coverage-output: .PHONY: repl test-darcs-boringfile test-clean clean find-trailing-spaces .coverage.el: .coverage - $(PYTHON) misc/coding_helpers/coverage2el.py + $(PYTHON) misc/coding_tools/coverage2el.py # 'upload-coverage' is meant to be run with an UPLOAD_TARGET=host:/dir setting ifdef UPLOAD_TARGET @@ -178,6 +178,8 @@ endif pyflakes: $(PYTHON) -OOu `which pyflakes` src/allmydata |sort |uniq +check-umids: + $(PYTHON) misc/coding_tools/check-umids.py `find src/allmydata -name '*.py'` count-lines: @echo -n "files: " diff --git a/misc/coding_tools/check-umids.py b/misc/coding_tools/check-umids.py new file mode 100755 index 0000000..05e8825 --- /dev/null +++ b/misc/coding_tools/check-umids.py @@ -0,0 +1,30 @@ +#! 
/usr/bin/python + +# usage: ./check-umids.py FILE.py [FILE.py ...] + +import sys, re, os + +ok = True +umids = {} + +for fn in sys.argv[1:]: + fn = os.path.abspath(fn) + for lineno,line in enumerate(open(fn, "r").readlines()): + lineno = lineno+1 + if "umid" not in line: + continue + mo = re.search("umid=[\"\']([^\"\']+)[\"\']", line) + if mo: + umid = mo.group(1) + if umid in umids: + oldfn, oldlineno = umids[umid] + print "%s:%d: duplicate umid '%s'" % (fn, lineno, umid) + print "%s:%d: first used here" % (oldfn, oldlineno) + ok = False + umids[umid] = (fn,lineno) + +if ok: + print "all umids are unique" +else: + print "some umids were duplicates" + sys.exit(1) diff --git a/misc/coding_tools/coverage.el b/misc/coding_tools/coverage.el index bad490f..8d69d5d 100644 --- a/misc/coding_tools/coverage.el +++ b/misc/coding_tools/coverage.el @@ -84,7 +84,8 @@ 'face '(:box "red") ) ) - (message "Added annotations") + (message (format "Added annotations: %d uncovered lines" + (safe-length uncovered-code-lines))) ) ) (message "unable to find coverage for this file")) diff --git a/misc/coding_tools/coverage2el.py b/misc/coding_tools/coverage2el.py index ed94bd0..7d03a27 100644 --- a/misc/coding_tools/coverage2el.py +++ b/misc/coding_tools/coverage2el.py @@ -1,5 +1,5 @@ -from coverage import coverage, summary +from coverage import coverage, summary, misc class ElispReporter(summary.SummaryReporter): def report(self): @@ -21,7 +21,10 @@ class ElispReporter(summary.SummaryReporter): out.write("(let ((results (make-hash-table :test 'equal)))\n") for cu in self.code_units: f = cu.filename - (fn, executable, missing, mf) = self.coverage.analysis(cu) + try: + (fn, executable, missing, mf) = self.coverage.analysis(cu) + except misc.NoSource: + continue code_linenumbers = executable uncovered_code = missing covered_linenumbers = sorted(set(executable) - set(missing)) diff --git a/misc/simulators/sizes.py b/misc/simulators/sizes.py index d9c230a..7910946 100644 --- a/misc/simulators/sizes.py +++
b/misc/simulators/sizes.py @@ -60,22 +60,22 @@ class Sizes: self.block_arity = 0 self.block_tree_depth = 0 self.block_overhead = 0 - self.bytes_until_some_data = 20 + share_size + self.bytes_until_some_data = 32 + share_size self.share_storage_overhead = 0 self.share_transmission_overhead = 0 elif mode == "beta": # k=num_blocks, d=1 - # each block has a 20-byte hash + # each block has a 32-byte hash self.block_arity = num_blocks self.block_tree_depth = 1 - self.block_overhead = 20 + self.block_overhead = 32 # the share has a list of hashes, one for each block self.share_storage_overhead = (self.block_overhead * num_blocks) # we can get away with not sending the hash of the share that # we're sending in full, once - self.share_transmission_overhead = self.share_storage_overhead - 20 + self.share_transmission_overhead = self.share_storage_overhead - 32 # we must get the whole list (so it can be validated) before # any data can be validated self.bytes_until_some_data = (self.share_transmission_overhead + @@ -89,7 +89,7 @@ class Sizes: # to make things easier, we make the pessimistic assumption that # we have to store hashes for all the empty places in the tree # (when the number of shares is not an exact exponent of k) - self.block_overhead = 20 + self.block_overhead = 32 # the block hashes are organized into a k-ary tree, which # means storing (and eventually transmitting) more hashes. This # count includes all the low-level share hashes and the root. 
@@ -98,18 +98,18 @@ class Sizes: #print "num_leaves", num_leaves #print "hash_nodes", hash_nodes # the storage overhead is this - self.share_storage_overhead = 20 * (hash_nodes - 1) + self.share_storage_overhead = 32 * (hash_nodes - 1) # the transmission overhead is smaller: if we actually transmit # every block, we don't have to transmit 1/k of the # lowest-level block hashes, and we don't have to transmit the # root because it was already sent with the share-level hash tree - self.share_transmission_overhead = 20 * (hash_nodes + self.share_transmission_overhead = 32 * (hash_nodes - 1 # the root - num_leaves / k) # we must get a full sibling hash chain before we can validate # any data sibling_length = d * (k-1) - self.bytes_until_some_data = 20 * sibling_length + block_size + self.bytes_until_some_data = 32 * sibling_length + block_size diff --git a/misc/simulators/storage-overhead.py b/misc/simulators/storage-overhead.py index 75a0bf6..a294b8d 100644 --- a/misc/simulators/storage-overhead.py +++ b/misc/simulators/storage-overhead.py @@ -1,7 +1,9 @@ #!/usr/bin/env python import sys, math -from allmydata import upload, uri, encode, storage +from allmydata import uri, storage +from allmydata.immutable import upload +from allmydata.interfaces import DEFAULT_MAX_SEGMENT_SIZE from allmydata.util import mathutil def roundup(size, blocksize=4096): @@ -22,14 +24,14 @@ class BigFakeString: def tell(self): return self.fp -def calc(filesize, params=(3,7,10), segsize=encode.Encoder.MAX_SEGMENT_SIZE): +def calc(filesize, params=(3,7,10), segsize=DEFAULT_MAX_SEGMENT_SIZE): num_shares = params[2] if filesize <= upload.Uploader.URI_LIT_SIZE_THRESHOLD: - urisize = len(uri.pack_lit("A"*filesize)) + urisize = len(uri.LiteralFileURI("A"*filesize).to_string()) sharesize = 0 sharespace = 0 else: - u = upload.FileUploader(None) + u = upload.FileUploader(None) # XXX changed u.set_params(params) # unfortunately, Encoder doesn't currently lend itself to answering # this question without 
measuring a filesize, so we have to give it a diff --git a/src/allmydata/client.py b/src/allmydata/client.py index 12e7473..c914ec4 100644 --- a/src/allmydata/client.py +++ b/src/allmydata/client.py @@ -1,9 +1,10 @@ -import os, stat, time +import os, stat, time, weakref from allmydata.interfaces import RIStorageServer from allmydata import node from zope.interface import implements from twisted.internet import reactor, defer +from twisted.application import service from twisted.application.internet import TimerService from foolscap.api import Referenceable from pycryptopp.publickey import rsa @@ -12,11 +13,10 @@ import allmydata from allmydata.storage.server import StorageServer from allmydata import storage_client from allmydata.immutable.upload import Uploader -from allmydata.immutable.download import Downloader from allmydata.immutable.offloaded import Helper from allmydata.control import ControlServer from allmydata.introducer.client import IntroducerClient -from allmydata.util import hashutil, base32, pollmixin, cachedir, log +from allmydata.util import hashutil, base32, pollmixin, log from allmydata.util.abbreviate import parse_abbreviated_size from allmydata.util.time_format import parse_duration, parse_date from allmydata.stats import StatsProvider @@ -94,6 +94,16 @@ class KeyGenerator: verifier = signer.get_verifying_key() return defer.succeed( (verifier, signer) ) +class Terminator(service.Service): + def __init__(self): + self._clients = weakref.WeakKeyDictionary() + def register(self, c): + self._clients[c] = None + def stopService(self): + for c in list(self._clients): # snapshot: weak entries may vanish while we iterate + c.stop() + return service.Service.stopService(self) + class Client(node.Node, pollmixin.PollMixin): implements(IStatsProducer) @@ -278,12 +288,9 @@ class Client(node.Node, pollmixin.PollMixin): self.init_client_storage_broker() self.history = History(self.stats_provider) + self.terminator = Terminator() + self.terminator.setServiceParent(self) self.add_service(Uploader(helper_furl,
self.stats_provider)) - download_cachedir = os.path.join(self.basedir, - "private", "cache", "download") - self.download_cache_dirman = cachedir.CacheDirectoryManager(download_cachedir) - self.download_cache_dirman.setServiceParent(self) - self.downloader = Downloader(self.storage_broker, self.stats_provider) self.init_stub_client() self.init_nodemaker() @@ -342,8 +349,7 @@ class Client(node.Node, pollmixin.PollMixin): self._secret_holder, self.get_history(), self.getServiceNamed("uploader"), - self.downloader, - self.download_cache_dirman, + self.terminator, self.get_encoding_parameters(), self._key_generator) diff --git a/src/allmydata/immutable/checker.py b/src/allmydata/immutable/checker.py index 2f2d8f1..cd5c556 100644 --- a/src/allmydata/immutable/checker.py +++ b/src/allmydata/immutable/checker.py @@ -1,16 +1,444 @@ +from zope.interface import implements +from twisted.internet import defer from foolscap.api import DeadReferenceError, RemoteException +from allmydata import hashtree, codec, uri +from allmydata.interfaces import IValidatedThingProxy, IVerifierURI from allmydata.hashtree import IncompleteHashTree from allmydata.check_results import CheckResults -from allmydata.immutable import download from allmydata.uri import CHKFileVerifierURI from allmydata.util.assertutil import precondition -from allmydata.util import base32, idlib, deferredutil, dictutil, log +from allmydata.util import base32, idlib, deferredutil, dictutil, log, mathutil from allmydata.util.hashutil import file_renewal_secret_hash, \ file_cancel_secret_hash, bucket_renewal_secret_hash, \ - bucket_cancel_secret_hash + bucket_cancel_secret_hash, uri_extension_hash, CRYPTO_VAL_SIZE, \ + block_hash from allmydata.immutable import layout +class IntegrityCheckReject(Exception): + pass +class BadURIExtension(IntegrityCheckReject): + pass +class BadURIExtensionHashValue(IntegrityCheckReject): + pass +class BadOrMissingHash(IntegrityCheckReject): + pass +class 
UnsupportedErasureCodec(BadURIExtension): + pass + +class ValidatedExtendedURIProxy: + implements(IValidatedThingProxy) + """ I am a front-end for a remote UEB (using a local ReadBucketProxy), + responsible for retrieving and validating the elements from the UEB.""" + + def __init__(self, readbucketproxy, verifycap, fetch_failures=None): + # fetch_failures is for debugging -- see test_encode.py + self._fetch_failures = fetch_failures + self._readbucketproxy = readbucketproxy + precondition(IVerifierURI.providedBy(verifycap), verifycap) + self._verifycap = verifycap + + # required + self.segment_size = None + self.crypttext_root_hash = None + self.share_root_hash = None + + # computed + self.block_size = None + self.share_size = None + self.num_segments = None + self.tail_data_size = None + self.tail_segment_size = None + + # optional + self.crypttext_hash = None + + def __str__(self): + return "<%s %s>" % (self.__class__.__name__, self._verifycap.to_string()) + + def _check_integrity(self, data): + h = uri_extension_hash(data) + if h != self._verifycap.uri_extension_hash: + msg = ("The copy of uri_extension we received from %s was bad: wanted %s, got %s" % + (self._readbucketproxy, + base32.b2a(self._verifycap.uri_extension_hash), + base32.b2a(h))) + if self._fetch_failures is not None: + self._fetch_failures["uri_extension"] += 1 + raise BadURIExtensionHashValue(msg) + else: + return data + + def _parse_and_validate(self, data): + self.share_size = mathutil.div_ceil(self._verifycap.size, + self._verifycap.needed_shares) + + d = uri.unpack_extension(data) + + # There are several kinds of things that can be found in a UEB. + # First, things that we really need to learn from the UEB in order to + # do this download. Next: things which are optional but not redundant + # -- if they are present in the UEB they will get used. Next, things + # that are optional and redundant. 
These things are required to be + # consistent: they don't have to be in the UEB, but if they are in + # the UEB then they will be checked for consistency with the + # already-known facts, and if they are inconsistent then an exception + # will be raised. These things aren't actually used -- they are just + # tested for consistency and ignored. Finally: things which are + # deprecated -- they ought not be in the UEB at all, and if they are + # present then a warning will be logged but they are otherwise + # ignored. + + # First, things that we really need to learn from the UEB: + # segment_size, crypttext_root_hash, and share_root_hash. + self.segment_size = d['segment_size'] + + self.block_size = mathutil.div_ceil(self.segment_size, + self._verifycap.needed_shares) + self.num_segments = mathutil.div_ceil(self._verifycap.size, + self.segment_size) + + self.tail_data_size = self._verifycap.size % self.segment_size + if not self.tail_data_size: + self.tail_data_size = self.segment_size + # padding for erasure code + self.tail_segment_size = mathutil.next_multiple(self.tail_data_size, + self._verifycap.needed_shares) + + # Ciphertext hash tree root is mandatory, so that there is at most + # one ciphertext that matches this read-cap or verify-cap. The + # integrity check on the shares is not sufficient to prevent the + # original encoder from creating some shares of file A and other + # shares of file B. 
+ self.crypttext_root_hash = d['crypttext_root_hash'] + + self.share_root_hash = d['share_root_hash'] + + + # Next: things that are optional and not redundant: crypttext_hash + if d.has_key('crypttext_hash'): + self.crypttext_hash = d['crypttext_hash'] + if len(self.crypttext_hash) != CRYPTO_VAL_SIZE: + raise BadURIExtension('crypttext_hash is required to be hashutil.CRYPTO_VAL_SIZE bytes, not %s bytes' % (len(self.crypttext_hash),)) + + + # Next: things that are optional, redundant, and required to be + # consistent: codec_name, codec_params, tail_codec_params, + # num_segments, size, needed_shares, total_shares + if d.has_key('codec_name'): + if d['codec_name'] != "crs": + raise UnsupportedErasureCodec(d['codec_name']) + + if d.has_key('codec_params'): + ucpss, ucpns, ucpts = codec.parse_params(d['codec_params']) + if ucpss != self.segment_size: + raise BadURIExtension("inconsistent erasure code params: " + "ucpss: %s != self.segment_size: %s" % + (ucpss, self.segment_size)) + if ucpns != self._verifycap.needed_shares: + raise BadURIExtension("inconsistent erasure code params: ucpns: %s != " + "self._verifycap.needed_shares: %s" % + (ucpns, self._verifycap.needed_shares)) + if ucpts != self._verifycap.total_shares: + raise BadURIExtension("inconsistent erasure code params: ucpts: %s != " + "self._verifycap.total_shares: %s" % + (ucpts, self._verifycap.total_shares)) + + if d.has_key('tail_codec_params'): + utcpss, utcpns, utcpts = codec.parse_params(d['tail_codec_params']) + if utcpss != self.tail_segment_size: + raise BadURIExtension("inconsistent erasure code params: utcpss: %s != " + "self.tail_segment_size: %s, self._verifycap.size: %s, " + "self.segment_size: %s, self._verifycap.needed_shares: %s" + % (utcpss, self.tail_segment_size, self._verifycap.size, + self.segment_size, self._verifycap.needed_shares)) + if utcpns != self._verifycap.needed_shares: + raise BadURIExtension("inconsistent erasure code params: utcpns: %s != " + 
"self._verifycap.needed_shares: %s" % (utcpns, + self._verifycap.needed_shares)) + if utcpts != self._verifycap.total_shares: + raise BadURIExtension("inconsistent erasure code params: utcpts: %s != " + "self._verifycap.total_shares: %s" % (utcpts, + self._verifycap.total_shares)) + + if d.has_key('num_segments'): + if d['num_segments'] != self.num_segments: + raise BadURIExtension("inconsistent num_segments: size: %s, " + "segment_size: %s, computed_num_segments: %s, " + "ueb_num_segments: %s" % (self._verifycap.size, + self.segment_size, + self.num_segments, d['num_segments'])) + + if d.has_key('size'): + if d['size'] != self._verifycap.size: + raise BadURIExtension("inconsistent size: URI size: %s, UEB size: %s" % + (self._verifycap.size, d['size'])) + + if d.has_key('needed_shares'): + if d['needed_shares'] != self._verifycap.needed_shares: + raise BadURIExtension("inconsistent needed shares: URI needed shares: %s, UEB " + "needed shares: %s" % (self._verifycap.total_shares, + d['needed_shares'])) + + if d.has_key('total_shares'): + if d['total_shares'] != self._verifycap.total_shares: + raise BadURIExtension("inconsistent total shares: URI total shares: %s, UEB " + "total shares: %s" % (self._verifycap.total_shares, + d['total_shares'])) + + # Finally, things that are deprecated and ignored: plaintext_hash, + # plaintext_root_hash + if d.get('plaintext_hash'): + log.msg("Found plaintext_hash in UEB. This field is deprecated for security reasons " + "and is no longer used. Ignoring. %s" % (self,)) + if d.get('plaintext_root_hash'): + log.msg("Found plaintext_root_hash in UEB. This field is deprecated for security " + "reasons and is no longer used. Ignoring. %s" % (self,)) + + return self + + def start(self): + """Fetch the UEB from bucket, compare its hash to the hash from + verifycap, then parse it. 
Returns a deferred which is called back + with self once the fetch is successful, or is erred back if it + fails.""" + d = self._readbucketproxy.get_uri_extension() + d.addCallback(self._check_integrity) + d.addCallback(self._parse_and_validate) + return d + +class ValidatedReadBucketProxy(log.PrefixingLogMixin): + """I am a front-end for a remote storage bucket, responsible for + retrieving and validating data from that bucket. + + My get_block() method is used by BlockDownloaders. + """ + + def __init__(self, sharenum, bucket, share_hash_tree, num_blocks, + block_size, share_size): + """ share_hash_tree is required to have already been initialized with + the root hash (the number-0 hash), using the share_root_hash from the + UEB""" + precondition(share_hash_tree[0] is not None, share_hash_tree) + prefix = "%d-%s-%s" % (sharenum, bucket, + base32.b2a_l(share_hash_tree[0][:8], 60)) + log.PrefixingLogMixin.__init__(self, + facility="tahoe.immutable.download", + prefix=prefix) + self.sharenum = sharenum + self.bucket = bucket + self.share_hash_tree = share_hash_tree + self.num_blocks = num_blocks + self.block_size = block_size + self.share_size = share_size + self.block_hash_tree = hashtree.IncompleteHashTree(self.num_blocks) + + def get_all_sharehashes(self): + """Retrieve and validate all the share-hash-tree nodes that are + included in this share, regardless of whether we need them to + validate the share or not. Each share contains a minimal Merkle tree + chain, but there is lots of overlap, so usually we'll be using hashes + from other shares and not reading every single hash from this share. + The Verifier uses this function to read and validate every single + hash from this share. + + Call this (and wait for the Deferred it returns to fire) before + calling get_block() for the first time: this lets us check that the + share share contains enough hashes to validate its own data, and + avoids downloading any share hash twice. 
+ + I return a Deferred which errbacks upon failure, probably with + BadOrMissingHash.""" + + d = self.bucket.get_share_hashes() + def _got_share_hashes(sh): + sharehashes = dict(sh) + try: + self.share_hash_tree.set_hashes(sharehashes) + except IndexError, le: + raise BadOrMissingHash(le) + except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le: + raise BadOrMissingHash(le) + d.addCallback(_got_share_hashes) + return d + + def get_all_blockhashes(self): + """Retrieve and validate all the block-hash-tree nodes that are + included in this share. Each share contains a full Merkle tree, but + we usually only fetch the minimal subset necessary for any particular + block. This function fetches everything at once. The Verifier uses + this function to validate the block hash tree. + + Call this (and wait for the Deferred it returns to fire) after + calling get_all_sharehashes() and before calling get_block() for the + first time: this lets us check that the share contains all block + hashes and avoids downloading them multiple times. + + I return a Deferred which errbacks upon failure, probably with + BadOrMissingHash. + """ + + # get_block_hashes(anything) currently always returns everything + needed = list(range(len(self.block_hash_tree))) + d = self.bucket.get_block_hashes(needed) + def _got_block_hashes(blockhashes): + if len(blockhashes) < len(self.block_hash_tree): + raise BadOrMissingHash() + bh = dict(enumerate(blockhashes)) + + try: + self.block_hash_tree.set_hashes(bh) + except IndexError, le: + raise BadOrMissingHash(le) + except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le: + raise BadOrMissingHash(le) + d.addCallback(_got_block_hashes) + return d + + def get_all_crypttext_hashes(self, crypttext_hash_tree): + """Retrieve and validate all the crypttext-hash-tree nodes that are + in this share. Normally we don't look at these at all: the download + process fetches them incrementally as needed to validate each segment + of ciphertext. 
But this is a convenient place to give the Verifier a + function to validate all of these at once. + + Call this with a new hashtree object for each share, initialized with + the crypttext hash tree root. I return a Deferred which errbacks upon + failure, probably with BadOrMissingHash. + """ + + # get_crypttext_hashes() always returns everything + d = self.bucket.get_crypttext_hashes() + def _got_crypttext_hashes(hashes): + if len(hashes) < len(crypttext_hash_tree): + raise BadOrMissingHash() + ct_hashes = dict(enumerate(hashes)) + try: + crypttext_hash_tree.set_hashes(ct_hashes) + except IndexError, le: + raise BadOrMissingHash(le) + except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le: + raise BadOrMissingHash(le) + d.addCallback(_got_crypttext_hashes) + return d + + def get_block(self, blocknum): + # the first time we use this bucket, we need to fetch enough elements + # of the share hash tree to validate it from our share hash up to the + # hashroot. + if self.share_hash_tree.needed_hashes(self.sharenum): + d1 = self.bucket.get_share_hashes() + else: + d1 = defer.succeed([]) + + # We might need to grab some elements of our block hash tree, to + # validate the requested block up to the share hash. + blockhashesneeded = self.block_hash_tree.needed_hashes(blocknum, include_leaf=True) + # We don't need the root of the block hash tree, as that comes in the + # share tree. 
+ blockhashesneeded.discard(0) + d2 = self.bucket.get_block_hashes(blockhashesneeded) + + if blocknum < self.num_blocks-1: + thisblocksize = self.block_size + else: + thisblocksize = self.share_size % self.block_size + if thisblocksize == 0: + thisblocksize = self.block_size + d3 = self.bucket.get_block_data(blocknum, + self.block_size, thisblocksize) + + dl = deferredutil.gatherResults([d1, d2, d3]) + dl.addCallback(self._got_data, blocknum) + return dl + + def _got_data(self, results, blocknum): + precondition(blocknum < self.num_blocks, + self, blocknum, self.num_blocks) + sharehashes, blockhashes, blockdata = results + try: + sharehashes = dict(sharehashes) + except ValueError, le: + le.args = tuple(le.args + (sharehashes,)) + raise + blockhashes = dict(enumerate(blockhashes)) + + candidate_share_hash = None # in case we log it in the except block below + blockhash = None # in case we log it in the except block below + + try: + if self.share_hash_tree.needed_hashes(self.sharenum): + # This will raise exception if the values being passed do not + # match the root node of self.share_hash_tree. + try: + self.share_hash_tree.set_hashes(sharehashes) + except IndexError, le: + # Weird -- sharehashes contained index numbers outside of + # the range that fit into this hash tree. + raise BadOrMissingHash(le) + + # To validate a block we need the root of the block hash tree, + # which is also one of the leafs of the share hash tree, and is + # called "the share hash". + if not self.block_hash_tree[0]: # empty -- no root node yet + # Get the share hash from the share hash tree. + share_hash = self.share_hash_tree.get_leaf(self.sharenum) + if not share_hash: + # No root node in block_hash_tree and also the share hash + # wasn't sent by the server. 
+ raise hashtree.NotEnoughHashesError + self.block_hash_tree.set_hashes({0: share_hash}) + + if self.block_hash_tree.needed_hashes(blocknum): + self.block_hash_tree.set_hashes(blockhashes) + + blockhash = block_hash(blockdata) + self.block_hash_tree.set_hashes(leaves={blocknum: blockhash}) + #self.log("checking block_hash(shareid=%d, blocknum=%d) len=%d " + # "%r .. %r: %s" % + # (self.sharenum, blocknum, len(blockdata), + # blockdata[:50], blockdata[-50:], base32.b2a(blockhash))) + + except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le: + # log.WEIRD: indicates undetected disk/network error, or more + # likely a programming error + self.log("hash failure in block=%d, shnum=%d on %s" % + (blocknum, self.sharenum, self.bucket)) + if self.block_hash_tree.needed_hashes(blocknum): + self.log(""" failure occurred when checking the block_hash_tree. + This suggests that either the block data was bad, or that the + block hashes we received along with it were bad.""") + else: + self.log(""" the failure probably occurred when checking the + share_hash_tree, which suggests that the share hashes we + received from the remote peer were bad.""") + self.log(" have candidate_share_hash: %s" % bool(candidate_share_hash)) + self.log(" block length: %d" % len(blockdata)) + self.log(" block hash: %s" % base32.b2a_or_none(blockhash)) + if len(blockdata) < 100: + self.log(" block data: %r" % (blockdata,)) + else: + self.log(" block data start/end: %r .. 
%r" % + (blockdata[:50], blockdata[-50:])) + self.log(" share hash tree:\n" + self.share_hash_tree.dump()) + self.log(" block hash tree:\n" + self.block_hash_tree.dump()) + lines = [] + for i,h in sorted(sharehashes.items()): + lines.append("%3d: %s" % (i, base32.b2a_or_none(h))) + self.log(" sharehashes:\n" + "\n".join(lines) + "\n") + lines = [] + for i,h in blockhashes.items(): + lines.append("%3d: %s" % (i, base32.b2a_or_none(h))) + log.msg(" blockhashes:\n" + "\n".join(lines) + "\n") + raise BadOrMissingHash(le) + + # If we made it here, the block is good. If the hash trees didn't + # like what they saw, they would have raised a BadHashError, causing + # our caller to see a Failure and thus ignore this block (as well as + # dropping this bucket). + return blockdata + + class Checker(log.PrefixingLogMixin): """I query all servers to see if M uniquely-numbered shares are available. @@ -85,7 +513,9 @@ class Checker(log.PrefixingLogMixin): level = log.WEIRD if f.check(DeadReferenceError): level = log.UNUSUAL - self.log("failure from server on 'get_buckets' the REMOTE failure was:", facility="tahoe.immutable.checker", failure=f, level=level, umid="3uuBUQ") + self.log("failure from server on 'get_buckets' the REMOTE failure was:", + facility="tahoe.immutable.checker", + failure=f, level=level, umid="AX7wZQ") return ({}, serverid, False) d.addCallbacks(_wrap_results, _trap_errs) @@ -146,18 +576,18 @@ class Checker(log.PrefixingLogMixin): vcap = self._verifycap b = layout.ReadBucketProxy(bucket, serverid, vcap.get_storage_index()) - veup = download.ValidatedExtendedURIProxy(b, vcap) + veup = ValidatedExtendedURIProxy(b, vcap) d = veup.start() def _got_ueb(vup): share_hash_tree = IncompleteHashTree(vcap.total_shares) share_hash_tree.set_hashes({0: vup.share_root_hash}) - vrbp = download.ValidatedReadBucketProxy(sharenum, b, - share_hash_tree, - vup.num_segments, - vup.block_size, - vup.share_size) + vrbp = ValidatedReadBucketProxy(sharenum, b, + share_hash_tree, + 
vup.num_segments, + vup.block_size, + vup.share_size) # note: normal download doesn't use get_all_sharehashes(), # because it gets more data than necessary. We've discussed the @@ -216,8 +646,8 @@ class Checker(log.PrefixingLogMixin): return (False, sharenum, 'incompatible') elif f.check(layout.LayoutInvalid, layout.RidiculouslyLargeURIExtensionBlock, - download.BadOrMissingHash, - download.BadURIExtensionHashValue): + BadOrMissingHash, + BadURIExtensionHashValue): return (False, sharenum, 'corrupt') # if it wasn't one of those reasons, re-raise the error diff --git a/src/allmydata/immutable/download.py b/src/allmydata/immutable/download.py deleted file mode 100644 index eb02c6a..0000000 --- a/src/allmydata/immutable/download.py +++ /dev/null @@ -1,1321 +0,0 @@ -import random, weakref, itertools, time -from zope.interface import implements -from twisted.internet import defer, reactor -from twisted.internet.interfaces import IPushProducer, IConsumer -from foolscap.api import DeadReferenceError, RemoteException, eventually - -from allmydata.util import base32, deferredutil, hashutil, log, mathutil, idlib -from allmydata.util.assertutil import _assert, precondition -from allmydata import codec, hashtree, uri -from allmydata.interfaces import IDownloadTarget, IDownloader, IVerifierURI, \ - IDownloadStatus, IDownloadResults, IValidatedThingProxy, \ - IStorageBroker, NotEnoughSharesError, NoSharesError, NoServersError, \ - UnableToFetchCriticalDownloadDataError -from allmydata.immutable import layout -from allmydata.monitor import Monitor -from pycryptopp.cipher.aes import AES - -class IntegrityCheckReject(Exception): - pass - -class BadURIExtensionHashValue(IntegrityCheckReject): - pass -class BadURIExtension(IntegrityCheckReject): - pass -class UnsupportedErasureCodec(BadURIExtension): - pass -class BadCrypttextHashValue(IntegrityCheckReject): - pass -class BadOrMissingHash(IntegrityCheckReject): - pass - -class DownloadStopped(Exception): - pass - -class 
DownloadResults: - implements(IDownloadResults) - - def __init__(self): - self.servers_used = set() - self.server_problems = {} - self.servermap = {} - self.timings = {} - self.file_size = None - -class DecryptingTarget(log.PrefixingLogMixin): - implements(IDownloadTarget, IConsumer) - def __init__(self, target, key, _log_msg_id=None): - precondition(IDownloadTarget.providedBy(target), target) - self.target = target - self._decryptor = AES(key) - prefix = str(target) - log.PrefixingLogMixin.__init__(self, "allmydata.immutable.download", _log_msg_id, prefix=prefix) - # methods to satisfy the IConsumer interface - def registerProducer(self, producer, streaming): - if IConsumer.providedBy(self.target): - self.target.registerProducer(producer, streaming) - def unregisterProducer(self): - if IConsumer.providedBy(self.target): - self.target.unregisterProducer() - def write(self, ciphertext): - plaintext = self._decryptor.process(ciphertext) - self.target.write(plaintext) - def open(self, size): - self.target.open(size) - def close(self): - self.target.close() - def finish(self): - return self.target.finish() - # The following methods is just to pass through to the next target, and - # just because that target might be a repairer.DownUpConnector, and just - # because the current CHKUpload object expects to find the storage index - # in its Uploadable. 
- def set_storageindex(self, storageindex): - self.target.set_storageindex(storageindex) - def set_encodingparams(self, encodingparams): - self.target.set_encodingparams(encodingparams) - -class ValidatedThingObtainer: - def __init__(self, validatedthingproxies, debugname, log_id): - self._validatedthingproxies = validatedthingproxies - self._debugname = debugname - self._log_id = log_id - - def _bad(self, f, validatedthingproxy): - f.trap(RemoteException, DeadReferenceError, - IntegrityCheckReject, layout.LayoutInvalid, - layout.ShareVersionIncompatible) - level = log.WEIRD - if f.check(DeadReferenceError): - level = log.UNUSUAL - elif f.check(RemoteException): - level = log.WEIRD - else: - level = log.SCARY - log.msg(parent=self._log_id, facility="tahoe.immutable.download", - format="operation %(op)s from validatedthingproxy %(validatedthingproxy)s failed", - op=self._debugname, validatedthingproxy=str(validatedthingproxy), - failure=f, level=level, umid="JGXxBA") - if not self._validatedthingproxies: - raise UnableToFetchCriticalDownloadDataError("ran out of peers, last error was %s" % (f,)) - # try again with a different one - d = self._try_the_next_one() - return d - - def _try_the_next_one(self): - vtp = self._validatedthingproxies.pop(0) - # start() obtains, validates, and callsback-with the thing or else - # errbacks - d = vtp.start() - d.addErrback(self._bad, vtp) - return d - - def start(self): - return self._try_the_next_one() - -class ValidatedCrypttextHashTreeProxy: - implements(IValidatedThingProxy) - """ I am a front-end for a remote crypttext hash tree using a local - ReadBucketProxy -- I use its get_crypttext_hashes() method and offer the - Validated Thing protocol (i.e., I have a start() method that fires with - self once I get a valid one).""" - def __init__(self, readbucketproxy, crypttext_hash_tree, num_segments, - fetch_failures=None): - # fetch_failures is for debugging -- see test_encode.py - self._readbucketproxy = readbucketproxy - 
self._num_segments = num_segments - self._fetch_failures = fetch_failures - self._crypttext_hash_tree = crypttext_hash_tree - - def _validate(self, proposal): - ct_hashes = dict(list(enumerate(proposal))) - try: - self._crypttext_hash_tree.set_hashes(ct_hashes) - except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le: - if self._fetch_failures is not None: - self._fetch_failures["crypttext_hash_tree"] += 1 - raise BadOrMissingHash(le) - # If we now have enough of the crypttext hash tree to integrity-check - # *any* segment of ciphertext, then we are done. TODO: It would have - # better alacrity if we downloaded only part of the crypttext hash - # tree at a time. - for segnum in range(self._num_segments): - if self._crypttext_hash_tree.needed_hashes(segnum): - raise BadOrMissingHash("not enough hashes to validate segment number %d" % (segnum,)) - return self - - def start(self): - d = self._readbucketproxy.get_crypttext_hashes() - d.addCallback(self._validate) - return d - -class ValidatedExtendedURIProxy: - implements(IValidatedThingProxy) - """ I am a front-end for a remote UEB (using a local ReadBucketProxy), - responsible for retrieving and validating the elements from the UEB.""" - - def __init__(self, readbucketproxy, verifycap, fetch_failures=None): - # fetch_failures is for debugging -- see test_encode.py - self._fetch_failures = fetch_failures - self._readbucketproxy = readbucketproxy - precondition(IVerifierURI.providedBy(verifycap), verifycap) - self._verifycap = verifycap - - # required - self.segment_size = None - self.crypttext_root_hash = None - self.share_root_hash = None - - # computed - self.block_size = None - self.share_size = None - self.num_segments = None - self.tail_data_size = None - self.tail_segment_size = None - - # optional - self.crypttext_hash = None - - def __str__(self): - return "<%s %s>" % (self.__class__.__name__, self._verifycap.to_string()) - - def _check_integrity(self, data): - h = hashutil.uri_extension_hash(data) 
- if h != self._verifycap.uri_extension_hash: - msg = ("The copy of uri_extension we received from %s was bad: wanted %s, got %s" % - (self._readbucketproxy, - base32.b2a(self._verifycap.uri_extension_hash), - base32.b2a(h))) - if self._fetch_failures is not None: - self._fetch_failures["uri_extension"] += 1 - raise BadURIExtensionHashValue(msg) - else: - return data - - def _parse_and_validate(self, data): - self.share_size = mathutil.div_ceil(self._verifycap.size, - self._verifycap.needed_shares) - - d = uri.unpack_extension(data) - - # There are several kinds of things that can be found in a UEB. - # First, things that we really need to learn from the UEB in order to - # do this download. Next: things which are optional but not redundant - # -- if they are present in the UEB they will get used. Next, things - # that are optional and redundant. These things are required to be - # consistent: they don't have to be in the UEB, but if they are in - # the UEB then they will be checked for consistency with the - # already-known facts, and if they are inconsistent then an exception - # will be raised. These things aren't actually used -- they are just - # tested for consistency and ignored. Finally: things which are - # deprecated -- they ought not be in the UEB at all, and if they are - # present then a warning will be logged but they are otherwise - # ignored. - - # First, things that we really need to learn from the UEB: - # segment_size, crypttext_root_hash, and share_root_hash. 
- self.segment_size = d['segment_size'] - - self.block_size = mathutil.div_ceil(self.segment_size, - self._verifycap.needed_shares) - self.num_segments = mathutil.div_ceil(self._verifycap.size, - self.segment_size) - - self.tail_data_size = self._verifycap.size % self.segment_size - if not self.tail_data_size: - self.tail_data_size = self.segment_size - # padding for erasure code - self.tail_segment_size = mathutil.next_multiple(self.tail_data_size, - self._verifycap.needed_shares) - - # Ciphertext hash tree root is mandatory, so that there is at most - # one ciphertext that matches this read-cap or verify-cap. The - # integrity check on the shares is not sufficient to prevent the - # original encoder from creating some shares of file A and other - # shares of file B. - self.crypttext_root_hash = d['crypttext_root_hash'] - - self.share_root_hash = d['share_root_hash'] - - - # Next: things that are optional and not redundant: crypttext_hash - if d.has_key('crypttext_hash'): - self.crypttext_hash = d['crypttext_hash'] - if len(self.crypttext_hash) != hashutil.CRYPTO_VAL_SIZE: - raise BadURIExtension('crypttext_hash is required to be hashutil.CRYPTO_VAL_SIZE bytes, not %s bytes' % (len(self.crypttext_hash),)) - - - # Next: things that are optional, redundant, and required to be - # consistent: codec_name, codec_params, tail_codec_params, - # num_segments, size, needed_shares, total_shares - if d.has_key('codec_name'): - if d['codec_name'] != "crs": - raise UnsupportedErasureCodec(d['codec_name']) - - if d.has_key('codec_params'): - ucpss, ucpns, ucpts = codec.parse_params(d['codec_params']) - if ucpss != self.segment_size: - raise BadURIExtension("inconsistent erasure code params: " - "ucpss: %s != self.segment_size: %s" % - (ucpss, self.segment_size)) - if ucpns != self._verifycap.needed_shares: - raise BadURIExtension("inconsistent erasure code params: ucpns: %s != " - "self._verifycap.needed_shares: %s" % - (ucpns, self._verifycap.needed_shares)) - if ucpts != 
self._verifycap.total_shares: - raise BadURIExtension("inconsistent erasure code params: ucpts: %s != " - "self._verifycap.total_shares: %s" % - (ucpts, self._verifycap.total_shares)) - - if d.has_key('tail_codec_params'): - utcpss, utcpns, utcpts = codec.parse_params(d['tail_codec_params']) - if utcpss != self.tail_segment_size: - raise BadURIExtension("inconsistent erasure code params: utcpss: %s != " - "self.tail_segment_size: %s, self._verifycap.size: %s, " - "self.segment_size: %s, self._verifycap.needed_shares: %s" - % (utcpss, self.tail_segment_size, self._verifycap.size, - self.segment_size, self._verifycap.needed_shares)) - if utcpns != self._verifycap.needed_shares: - raise BadURIExtension("inconsistent erasure code params: utcpns: %s != " - "self._verifycap.needed_shares: %s" % (utcpns, - self._verifycap.needed_shares)) - if utcpts != self._verifycap.total_shares: - raise BadURIExtension("inconsistent erasure code params: utcpts: %s != " - "self._verifycap.total_shares: %s" % (utcpts, - self._verifycap.total_shares)) - - if d.has_key('num_segments'): - if d['num_segments'] != self.num_segments: - raise BadURIExtension("inconsistent num_segments: size: %s, " - "segment_size: %s, computed_num_segments: %s, " - "ueb_num_segments: %s" % (self._verifycap.size, - self.segment_size, - self.num_segments, d['num_segments'])) - - if d.has_key('size'): - if d['size'] != self._verifycap.size: - raise BadURIExtension("inconsistent size: URI size: %s, UEB size: %s" % - (self._verifycap.size, d['size'])) - - if d.has_key('needed_shares'): - if d['needed_shares'] != self._verifycap.needed_shares: - raise BadURIExtension("inconsistent needed shares: URI needed shares: %s, UEB " - "needed shares: %s" % (self._verifycap.total_shares, - d['needed_shares'])) - - if d.has_key('total_shares'): - if d['total_shares'] != self._verifycap.total_shares: - raise BadURIExtension("inconsistent total shares: URI total shares: %s, UEB " - "total shares: %s" % 
(self._verifycap.total_shares, - d['total_shares'])) - - # Finally, things that are deprecated and ignored: plaintext_hash, - # plaintext_root_hash - if d.get('plaintext_hash'): - log.msg("Found plaintext_hash in UEB. This field is deprecated for security reasons " - "and is no longer used. Ignoring. %s" % (self,)) - if d.get('plaintext_root_hash'): - log.msg("Found plaintext_root_hash in UEB. This field is deprecated for security " - "reasons and is no longer used. Ignoring. %s" % (self,)) - - return self - - def start(self): - """Fetch the UEB from bucket, compare its hash to the hash from - verifycap, then parse it. Returns a deferred which is called back - with self once the fetch is successful, or is erred back if it - fails.""" - d = self._readbucketproxy.get_uri_extension() - d.addCallback(self._check_integrity) - d.addCallback(self._parse_and_validate) - return d - -class ValidatedReadBucketProxy(log.PrefixingLogMixin): - """I am a front-end for a remote storage bucket, responsible for - retrieving and validating data from that bucket. - - My get_block() method is used by BlockDownloaders. 
- """ - - def __init__(self, sharenum, bucket, share_hash_tree, num_blocks, - block_size, share_size): - """ share_hash_tree is required to have already been initialized with - the root hash (the number-0 hash), using the share_root_hash from the - UEB""" - precondition(share_hash_tree[0] is not None, share_hash_tree) - prefix = "%d-%s-%s" % (sharenum, bucket, - base32.b2a_l(share_hash_tree[0][:8], 60)) - log.PrefixingLogMixin.__init__(self, - facility="tahoe.immutable.download", - prefix=prefix) - self.sharenum = sharenum - self.bucket = bucket - self.share_hash_tree = share_hash_tree - self.num_blocks = num_blocks - self.block_size = block_size - self.share_size = share_size - self.block_hash_tree = hashtree.IncompleteHashTree(self.num_blocks) - - def get_all_sharehashes(self): - """Retrieve and validate all the share-hash-tree nodes that are - included in this share, regardless of whether we need them to - validate the share or not. Each share contains a minimal Merkle tree - chain, but there is lots of overlap, so usually we'll be using hashes - from other shares and not reading every single hash from this share. - The Verifier uses this function to read and validate every single - hash from this share. - - Call this (and wait for the Deferred it returns to fire) before - calling get_block() for the first time: this lets us check that the - share share contains enough hashes to validate its own data, and - avoids downloading any share hash twice. 
- - I return a Deferred which errbacks upon failure, probably with - BadOrMissingHash.""" - - d = self.bucket.get_share_hashes() - def _got_share_hashes(sh): - sharehashes = dict(sh) - try: - self.share_hash_tree.set_hashes(sharehashes) - except IndexError, le: - raise BadOrMissingHash(le) - except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le: - raise BadOrMissingHash(le) - d.addCallback(_got_share_hashes) - return d - - def get_all_blockhashes(self): - """Retrieve and validate all the block-hash-tree nodes that are - included in this share. Each share contains a full Merkle tree, but - we usually only fetch the minimal subset necessary for any particular - block. This function fetches everything at once. The Verifier uses - this function to validate the block hash tree. - - Call this (and wait for the Deferred it returns to fire) after - calling get_all_sharehashes() and before calling get_block() for the - first time: this lets us check that the share contains all block - hashes and avoids downloading them multiple times. - - I return a Deferred which errbacks upon failure, probably with - BadOrMissingHash. - """ - - # get_block_hashes(anything) currently always returns everything - needed = list(range(len(self.block_hash_tree))) - d = self.bucket.get_block_hashes(needed) - def _got_block_hashes(blockhashes): - if len(blockhashes) < len(self.block_hash_tree): - raise BadOrMissingHash() - bh = dict(enumerate(blockhashes)) - - try: - self.block_hash_tree.set_hashes(bh) - except IndexError, le: - raise BadOrMissingHash(le) - except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le: - raise BadOrMissingHash(le) - d.addCallback(_got_block_hashes) - return d - - def get_all_crypttext_hashes(self, crypttext_hash_tree): - """Retrieve and validate all the crypttext-hash-tree nodes that are - in this share. Normally we don't look at these at all: the download - process fetches them incrementally as needed to validate each segment - of ciphertext. 
But this is a convenient place to give the Verifier a - function to validate all of these at once. - - Call this with a new hashtree object for each share, initialized with - the crypttext hash tree root. I return a Deferred which errbacks upon - failure, probably with BadOrMissingHash. - """ - - # get_crypttext_hashes() always returns everything - d = self.bucket.get_crypttext_hashes() - def _got_crypttext_hashes(hashes): - if len(hashes) < len(crypttext_hash_tree): - raise BadOrMissingHash() - ct_hashes = dict(enumerate(hashes)) - try: - crypttext_hash_tree.set_hashes(ct_hashes) - except IndexError, le: - raise BadOrMissingHash(le) - except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le: - raise BadOrMissingHash(le) - d.addCallback(_got_crypttext_hashes) - return d - - def get_block(self, blocknum): - # the first time we use this bucket, we need to fetch enough elements - # of the share hash tree to validate it from our share hash up to the - # hashroot. - if self.share_hash_tree.needed_hashes(self.sharenum): - d1 = self.bucket.get_share_hashes() - else: - d1 = defer.succeed([]) - - # We might need to grab some elements of our block hash tree, to - # validate the requested block up to the share hash. - blockhashesneeded = self.block_hash_tree.needed_hashes(blocknum, include_leaf=True) - # We don't need the root of the block hash tree, as that comes in the - # share tree. 
- blockhashesneeded.discard(0) - d2 = self.bucket.get_block_hashes(blockhashesneeded) - - if blocknum < self.num_blocks-1: - thisblocksize = self.block_size - else: - thisblocksize = self.share_size % self.block_size - if thisblocksize == 0: - thisblocksize = self.block_size - d3 = self.bucket.get_block_data(blocknum, - self.block_size, thisblocksize) - - dl = deferredutil.gatherResults([d1, d2, d3]) - dl.addCallback(self._got_data, blocknum) - return dl - - def _got_data(self, results, blocknum): - precondition(blocknum < self.num_blocks, - self, blocknum, self.num_blocks) - sharehashes, blockhashes, blockdata = results - try: - sharehashes = dict(sharehashes) - except ValueError, le: - le.args = tuple(le.args + (sharehashes,)) - raise - blockhashes = dict(enumerate(blockhashes)) - - candidate_share_hash = None # in case we log it in the except block below - blockhash = None # in case we log it in the except block below - - try: - if self.share_hash_tree.needed_hashes(self.sharenum): - # This will raise exception if the values being passed do not - # match the root node of self.share_hash_tree. - try: - self.share_hash_tree.set_hashes(sharehashes) - except IndexError, le: - # Weird -- sharehashes contained index numbers outside of - # the range that fit into this hash tree. - raise BadOrMissingHash(le) - - # To validate a block we need the root of the block hash tree, - # which is also one of the leafs of the share hash tree, and is - # called "the share hash". - if not self.block_hash_tree[0]: # empty -- no root node yet - # Get the share hash from the share hash tree. - share_hash = self.share_hash_tree.get_leaf(self.sharenum) - if not share_hash: - # No root node in block_hash_tree and also the share hash - # wasn't sent by the server. 
- raise hashtree.NotEnoughHashesError - self.block_hash_tree.set_hashes({0: share_hash}) - - if self.block_hash_tree.needed_hashes(blocknum): - self.block_hash_tree.set_hashes(blockhashes) - - blockhash = hashutil.block_hash(blockdata) - self.block_hash_tree.set_hashes(leaves={blocknum: blockhash}) - #self.log("checking block_hash(shareid=%d, blocknum=%d) len=%d " - # "%r .. %r: %s" % - # (self.sharenum, blocknum, len(blockdata), - # blockdata[:50], blockdata[-50:], base32.b2a(blockhash))) - - except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le: - # log.WEIRD: indicates undetected disk/network error, or more - # likely a programming error - self.log("hash failure in block=%d, shnum=%d on %s" % - (blocknum, self.sharenum, self.bucket)) - if self.block_hash_tree.needed_hashes(blocknum): - self.log(""" failure occurred when checking the block_hash_tree. - This suggests that either the block data was bad, or that the - block hashes we received along with it were bad.""") - else: - self.log(""" the failure probably occurred when checking the - share_hash_tree, which suggests that the share hashes we - received from the remote peer were bad.""") - self.log(" have candidate_share_hash: %s" % bool(candidate_share_hash)) - self.log(" block length: %d" % len(blockdata)) - self.log(" block hash: %s" % base32.b2a_or_none(blockhash)) - if len(blockdata) < 100: - self.log(" block data: %r" % (blockdata,)) - else: - self.log(" block data start/end: %r .. 
%r" % - (blockdata[:50], blockdata[-50:])) - self.log(" share hash tree:\n" + self.share_hash_tree.dump()) - self.log(" block hash tree:\n" + self.block_hash_tree.dump()) - lines = [] - for i,h in sorted(sharehashes.items()): - lines.append("%3d: %s" % (i, base32.b2a_or_none(h))) - self.log(" sharehashes:\n" + "\n".join(lines) + "\n") - lines = [] - for i,h in blockhashes.items(): - lines.append("%3d: %s" % (i, base32.b2a_or_none(h))) - log.msg(" blockhashes:\n" + "\n".join(lines) + "\n") - raise BadOrMissingHash(le) - - # If we made it here, the block is good. If the hash trees didn't - # like what they saw, they would have raised a BadHashError, causing - # our caller to see a Failure and thus ignore this block (as well as - # dropping this bucket). - return blockdata - - - -class BlockDownloader(log.PrefixingLogMixin): - """I am responsible for downloading a single block (from a single bucket) - for a single segment. - - I am a child of the SegmentDownloader. - """ - - def __init__(self, vbucket, blocknum, parent, results): - precondition(isinstance(vbucket, ValidatedReadBucketProxy), vbucket) - prefix = "%s-%d" % (vbucket, blocknum) - log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix) - self.vbucket = vbucket - self.blocknum = blocknum - self.parent = parent - self.results = results - - def start(self, segnum): - self.log("get_block(segnum=%d)" % segnum) - started = time.time() - d = self.vbucket.get_block(segnum) - d.addCallbacks(self._hold_block, self._got_block_error, - callbackArgs=(started,)) - return d - - def _hold_block(self, data, started): - if self.results: - elapsed = time.time() - started - peerid = self.vbucket.bucket.get_peerid() - if peerid not in self.results.timings["fetch_per_server"]: - self.results.timings["fetch_per_server"][peerid] = [] - self.results.timings["fetch_per_server"][peerid].append(elapsed) - self.log("got block") - self.parent.hold_block(self.blocknum, data) - - def _got_block_error(self, 
f): - f.trap(RemoteException, DeadReferenceError, - IntegrityCheckReject, layout.LayoutInvalid, - layout.ShareVersionIncompatible) - if f.check(RemoteException, DeadReferenceError): - level = log.UNUSUAL - else: - level = log.WEIRD - self.log("failure to get block", level=level, umid="5Z4uHQ") - if self.results: - peerid = self.vbucket.bucket.get_peerid() - self.results.server_problems[peerid] = str(f) - self.parent.bucket_failed(self.vbucket) - -class SegmentDownloader: - """I am responsible for downloading all the blocks for a single segment - of data. - - I am a child of the CiphertextDownloader. - """ - - def __init__(self, parent, segmentnumber, needed_shares, results): - self.parent = parent - self.segmentnumber = segmentnumber - self.needed_blocks = needed_shares - self.blocks = {} # k: blocknum, v: data - self.results = results - self._log_number = self.parent.log("starting segment %d" % - segmentnumber) - - def log(self, *args, **kwargs): - if "parent" not in kwargs: - kwargs["parent"] = self._log_number - return self.parent.log(*args, **kwargs) - - def start(self): - return self._download() - - def _download(self): - d = self._try() - def _done(res): - if len(self.blocks) >= self.needed_blocks: - # we only need self.needed_blocks blocks - # we want to get the smallest blockids, because they are - # more likely to be fast "primary blocks" - blockids = sorted(self.blocks.keys())[:self.needed_blocks] - blocks = [] - for blocknum in blockids: - blocks.append(self.blocks[blocknum]) - return (blocks, blockids) - else: - return self._download() - d.addCallback(_done) - return d - - def _try(self): - # fill our set of active buckets, maybe raising NotEnoughSharesError - active_buckets = self.parent._activate_enough_buckets() - # Now we have enough buckets, in self.parent.active_buckets. - - # in test cases, bd.start might mutate active_buckets right away, so - # we need to put off calling start() until we've iterated all the way - # through it. 
- downloaders = [] - for blocknum, vbucket in active_buckets.iteritems(): - assert isinstance(vbucket, ValidatedReadBucketProxy), vbucket - bd = BlockDownloader(vbucket, blocknum, self, self.results) - downloaders.append(bd) - if self.results: - self.results.servers_used.add(vbucket.bucket.get_peerid()) - l = [bd.start(self.segmentnumber) for bd in downloaders] - return defer.DeferredList(l, fireOnOneErrback=True) - - def hold_block(self, blocknum, data): - self.blocks[blocknum] = data - - def bucket_failed(self, vbucket): - self.parent.bucket_failed(vbucket) - -class DownloadStatus: - implements(IDownloadStatus) - statusid_counter = itertools.count(0) - - def __init__(self): - self.storage_index = None - self.size = None - self.helper = False - self.status = "Not started" - self.progress = 0.0 - self.paused = False - self.stopped = False - self.active = True - self.results = None - self.counter = self.statusid_counter.next() - self.started = time.time() - - def get_started(self): - return self.started - def get_storage_index(self): - return self.storage_index - def get_size(self): - return self.size - def using_helper(self): - return self.helper - def get_status(self): - status = self.status - if self.paused: - status += " (output paused)" - if self.stopped: - status += " (output stopped)" - return status - def get_progress(self): - return self.progress - def get_active(self): - return self.active - def get_results(self): - return self.results - def get_counter(self): - return self.counter - - def set_storage_index(self, si): - self.storage_index = si - def set_size(self, size): - self.size = size - def set_helper(self, helper): - self.helper = helper - def set_status(self, status): - self.status = status - def set_paused(self, paused): - self.paused = paused - def set_stopped(self, stopped): - self.stopped = stopped - def set_progress(self, value): - self.progress = value - def set_active(self, value): - self.active = value - def set_results(self, value): - 
self.results = value - -class CiphertextDownloader(log.PrefixingLogMixin): - """ I download shares, check their integrity, then decode them, check the - integrity of the resulting ciphertext, then and write it to my target. - Before I send any new request to a server, I always ask the 'monitor' - object that was passed into my constructor whether this task has been - cancelled (by invoking its raise_if_cancelled() method).""" - implements(IPushProducer) - _status = None - - def __init__(self, storage_broker, v, target, monitor): - - precondition(IStorageBroker.providedBy(storage_broker), storage_broker) - precondition(IVerifierURI.providedBy(v), v) - precondition(IDownloadTarget.providedBy(target), target) - - self._storage_broker = storage_broker - self._verifycap = v - self._storage_index = v.get_storage_index() - self._uri_extension_hash = v.uri_extension_hash - - prefix=base32.b2a_l(self._storage_index[:8], 60) - log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix) - - self._started = time.time() - self._status = s = DownloadStatus() - s.set_status("Starting") - s.set_storage_index(self._storage_index) - s.set_size(self._verifycap.size) - s.set_helper(False) - s.set_active(True) - - self._results = DownloadResults() - s.set_results(self._results) - self._results.file_size = self._verifycap.size - self._results.timings["servers_peer_selection"] = {} - self._results.timings["fetch_per_server"] = {} - self._results.timings["cumulative_fetch"] = 0.0 - self._results.timings["cumulative_decode"] = 0.0 - self._results.timings["cumulative_decrypt"] = 0.0 - self._results.timings["paused"] = 0.0 - - self._paused = False - self._stopped = False - if IConsumer.providedBy(target): - target.registerProducer(self, True) - self._target = target - # Repairer (uploader) needs the storageindex. 
- self._target.set_storageindex(self._storage_index) - self._monitor = monitor - self._opened = False - - self.active_buckets = {} # k: shnum, v: bucket - self._share_buckets = {} # k: sharenum, v: list of buckets - - # _download_all_segments() will set this to: - # self._share_vbuckets = {} # k: shnum, v: set of ValidatedBuckets - self._share_vbuckets = None - - self._fetch_failures = {"uri_extension": 0, "crypttext_hash_tree": 0, } - - self._ciphertext_hasher = hashutil.crypttext_hasher() - - self._bytes_done = 0 - self._status.set_progress(float(self._bytes_done)/self._verifycap.size) - - # _got_uri_extension() will create the following: - # self._crypttext_hash_tree - # self._share_hash_tree - # self._current_segnum = 0 - # self._vup # ValidatedExtendedURIProxy - - # _get_all_shareholders() will create the following: - # self._total_queries - # self._responses_received = 0 - # self._queries_failed = 0 - - # This is solely for the use of unit tests. It will be triggered when - # we start downloading shares. - self._stage_4_d = defer.Deferred() - - def pauseProducing(self): - if self._paused: - return - self._paused = defer.Deferred() - self._paused_at = time.time() - if self._status: - self._status.set_paused(True) - - def resumeProducing(self): - if self._paused: - paused_for = time.time() - self._paused_at - self._results.timings['paused'] += paused_for - p = self._paused - self._paused = None - eventually(p.callback, None) - if self._status: - self._status.set_paused(False) - - def stopProducing(self): - self.log("Download.stopProducing") - self._stopped = True - self.resumeProducing() - if self._status: - self._status.set_stopped(True) - self._status.set_active(False) - - def start(self): - self.log("starting download") - - # first step: who should we download from? 
- d = defer.maybeDeferred(self._get_all_shareholders) - d.addBoth(self._got_all_shareholders) - # now get the uri_extension block from somebody and integrity check - # it and parse and validate its contents - d.addCallback(self._obtain_uri_extension) - d.addCallback(self._get_crypttext_hash_tree) - # once we know that, we can download blocks from everybody - d.addCallback(self._download_all_segments) - def _finished(res): - if self._status: - self._status.set_status("Finished") - self._status.set_active(False) - self._status.set_paused(False) - if IConsumer.providedBy(self._target): - self._target.unregisterProducer() - return res - d.addBoth(_finished) - def _failed(why): - if self._status: - self._status.set_status("Failed") - self._status.set_active(False) - if why.check(DownloadStopped): - # DownloadStopped just means the consumer aborted the - # download; not so scary. - self.log("download stopped", level=log.UNUSUAL) - else: - # This is really unusual, and deserves maximum forensics. - self.log("download failed!", failure=why, level=log.SCARY, - umid="lp1vaQ") - return why - d.addErrback(_failed) - d.addCallback(self._done) - return d - - def _get_all_shareholders(self): - """ Once the number of buckets that I know about is >= K then I - callback the Deferred that I return. - - If all of the get_buckets deferreds have fired (whether callback - or errback) and I still don't have enough buckets then I'll also - callback -- not errback -- the Deferred that I return. 
- """ - wait_for_enough_buckets_d = defer.Deferred() - self._wait_for_enough_buckets_d = wait_for_enough_buckets_d - - sb = self._storage_broker - servers = sb.get_servers_for_index(self._storage_index) - if not servers: - raise NoServersError("broker gave us no servers!") - - self._total_queries = len(servers) - self._responses_received = 0 - self._queries_failed = 0 - for (peerid,ss) in servers: - self.log(format="sending DYHB to [%(peerid)s]", - peerid=idlib.shortnodeid_b2a(peerid), - level=log.NOISY, umid="rT03hg") - d = ss.callRemote("get_buckets", self._storage_index) - d.addCallbacks(self._got_response, self._got_error, - callbackArgs=(peerid,)) - d.addBoth(self._check_got_all_responses) - - if self._status: - self._status.set_status("Locating Shares (%d/%d)" % - (self._responses_received, - self._total_queries)) - return wait_for_enough_buckets_d - - def _check_got_all_responses(self, ignored=None): - assert (self._responses_received+self._queries_failed) <= self._total_queries - if self._wait_for_enough_buckets_d and (self._responses_received+self._queries_failed) == self._total_queries: - reactor.callLater(0, self._wait_for_enough_buckets_d.callback, False) - self._wait_for_enough_buckets_d = None - - def _got_response(self, buckets, peerid): - # Note that this can continue to receive responses after _wait_for_enough_buckets_d - # has fired. 
- self._responses_received += 1 - self.log(format="got results from [%(peerid)s]: shnums %(shnums)s", - peerid=idlib.shortnodeid_b2a(peerid), - shnums=sorted(buckets.keys()), - level=log.NOISY, umid="o4uwFg") - if self._results: - elapsed = time.time() - self._started - self._results.timings["servers_peer_selection"][peerid] = elapsed - if self._status: - self._status.set_status("Locating Shares (%d/%d)" % - (self._responses_received, - self._total_queries)) - for sharenum, bucket in buckets.iteritems(): - b = layout.ReadBucketProxy(bucket, peerid, self._storage_index) - self.add_share_bucket(sharenum, b) - # If we just got enough buckets for the first time, then fire the - # deferred. Then remove it from self so that we don't fire it - # again. - if self._wait_for_enough_buckets_d and len(self._share_buckets) >= self._verifycap.needed_shares: - reactor.callLater(0, self._wait_for_enough_buckets_d.callback, True) - self._wait_for_enough_buckets_d = None - - if self._share_vbuckets is not None: - vbucket = ValidatedReadBucketProxy(sharenum, b, self._share_hash_tree, self._vup.num_segments, self._vup.block_size, self._vup.share_size) - self._share_vbuckets.setdefault(sharenum, set()).add(vbucket) - - if self._results: - if peerid not in self._results.servermap: - self._results.servermap[peerid] = set() - self._results.servermap[peerid].add(sharenum) - - def add_share_bucket(self, sharenum, bucket): - # this is split out for the benefit of test_encode.py - self._share_buckets.setdefault(sharenum, []).append(bucket) - - def _got_error(self, f): - self._queries_failed += 1 - level = log.WEIRD - if f.check(DeadReferenceError): - level = log.UNUSUAL - self.log("Error during get_buckets", failure=f, level=level, - umid="3uuBUQ") - - def bucket_failed(self, vbucket): - shnum = vbucket.sharenum - del self.active_buckets[shnum] - s = self._share_vbuckets[shnum] - # s is a set of ValidatedReadBucketProxy instances - s.remove(vbucket) - # ... 
which might now be empty - if not s: - # there are no more buckets which can provide this share, so - # remove the key. This may prompt us to use a different share. - del self._share_vbuckets[shnum] - - def _got_all_shareholders(self, res): - if self._results: - now = time.time() - self._results.timings["peer_selection"] = now - self._started - - if len(self._share_buckets) < self._verifycap.needed_shares: - msg = "Failed to get enough shareholders: have %d, need %d" \ - % (len(self._share_buckets), self._verifycap.needed_shares) - if self._share_buckets: - raise NotEnoughSharesError(msg) - else: - raise NoSharesError(msg) - - #for s in self._share_vbuckets.values(): - # for vb in s: - # assert isinstance(vb, ValidatedReadBucketProxy), \ - # "vb is %s but should be a ValidatedReadBucketProxy" % (vb,) - - def _obtain_uri_extension(self, ignored): - # all shareholders are supposed to have a copy of uri_extension, and - # all are supposed to be identical. We compute the hash of the data - # that comes back, and compare it against the version in our URI. If - # they don't match, ignore their data and try someone else. 
- if self._status: - self._status.set_status("Obtaining URI Extension") - - uri_extension_fetch_started = time.time() - - vups = [] - for sharenum, buckets in self._share_buckets.iteritems(): - for bucket in buckets: - vups.append(ValidatedExtendedURIProxy(bucket, self._verifycap, self._fetch_failures)) - vto = ValidatedThingObtainer(vups, debugname="vups", log_id=self._parentmsgid) - d = vto.start() - - def _got_uri_extension(vup): - precondition(isinstance(vup, ValidatedExtendedURIProxy), vup) - if self._results: - elapsed = time.time() - uri_extension_fetch_started - self._results.timings["uri_extension"] = elapsed - - self._vup = vup - self._codec = codec.CRSDecoder() - self._codec.set_params(self._vup.segment_size, self._verifycap.needed_shares, self._verifycap.total_shares) - self._tail_codec = codec.CRSDecoder() - self._tail_codec.set_params(self._vup.tail_segment_size, self._verifycap.needed_shares, self._verifycap.total_shares) - - self._current_segnum = 0 - - self._share_hash_tree = hashtree.IncompleteHashTree(self._verifycap.total_shares) - self._share_hash_tree.set_hashes({0: vup.share_root_hash}) - - self._crypttext_hash_tree = hashtree.IncompleteHashTree(self._vup.num_segments) - self._crypttext_hash_tree.set_hashes({0: self._vup.crypttext_root_hash}) - - # Repairer (uploader) needs the encodingparams. 
- self._target.set_encodingparams(( - self._verifycap.needed_shares, - 0, # see ticket #778 for why this is - self._verifycap.total_shares, - self._vup.segment_size - )) - d.addCallback(_got_uri_extension) - return d - - def _get_crypttext_hash_tree(self, res): - vchtps = [] - for sharenum, buckets in self._share_buckets.iteritems(): - for bucket in buckets: - vchtp = ValidatedCrypttextHashTreeProxy(bucket, self._crypttext_hash_tree, self._vup.num_segments, self._fetch_failures) - vchtps.append(vchtp) - - _get_crypttext_hash_tree_started = time.time() - if self._status: - self._status.set_status("Retrieving crypttext hash tree") - - vto = ValidatedThingObtainer(vchtps, debugname="vchtps", - log_id=self._parentmsgid) - d = vto.start() - - def _got_crypttext_hash_tree(res): - # Good -- the self._crypttext_hash_tree that we passed to vchtp - # is now populated with hashes. - if self._results: - elapsed = time.time() - _get_crypttext_hash_tree_started - self._results.timings["hashtrees"] = elapsed - d.addCallback(_got_crypttext_hash_tree) - return d - - def _activate_enough_buckets(self): - """either return a mapping from shnum to a ValidatedReadBucketProxy - that can provide data for that share, or raise NotEnoughSharesError""" - - while len(self.active_buckets) < self._verifycap.needed_shares: - # need some more - handled_shnums = set(self.active_buckets.keys()) - available_shnums = set(self._share_vbuckets.keys()) - potential_shnums = list(available_shnums - handled_shnums) - if len(potential_shnums) < (self._verifycap.needed_shares - - len(self.active_buckets)): - have = len(potential_shnums) + len(self.active_buckets) - msg = "Unable to activate enough shares: have %d, need %d" \ - % (have, self._verifycap.needed_shares) - if have: - raise NotEnoughSharesError(msg) - else: - raise NoSharesError(msg) - # For the next share, choose a primary share if available, else a - # randomly chosen secondary share. 
- potential_shnums.sort() - if potential_shnums[0] < self._verifycap.needed_shares: - shnum = potential_shnums[0] - else: - shnum = random.choice(potential_shnums) - # and a random bucket that will provide it - validated_bucket = random.choice(list(self._share_vbuckets[shnum])) - self.active_buckets[shnum] = validated_bucket - return self.active_buckets - - - def _download_all_segments(self, res): - # From now on if new buckets are received then I will notice that - # self._share_vbuckets is not None and generate a vbucket for that new - # bucket and add it in to _share_vbuckets. (We had to wait because we - # didn't have self._vup and self._share_hash_tree earlier. We didn't - # need validated buckets until now -- now that we are ready to download - # shares.) - self._share_vbuckets = {} - for sharenum, buckets in self._share_buckets.iteritems(): - for bucket in buckets: - vbucket = ValidatedReadBucketProxy(sharenum, bucket, self._share_hash_tree, self._vup.num_segments, self._vup.block_size, self._vup.share_size) - self._share_vbuckets.setdefault(sharenum, set()).add(vbucket) - - # after the above code, self._share_vbuckets contains enough - # buckets to complete the download, and some extra ones to - # tolerate some buckets dropping out or having - # errors. self._share_vbuckets is a dictionary that maps from - # shnum to a set of ValidatedBuckets, which themselves are - # wrappers around RIBucketReader references. - self.active_buckets = {} # k: shnum, v: ValidatedReadBucketProxy instance - - self._started_fetching = time.time() - - d = defer.succeed(None) - for segnum in range(self._vup.num_segments): - d.addCallback(self._download_segment, segnum) - # this pause, at the end of write, prevents pre-fetch from - # happening until the consumer is ready for more data. 
- d.addCallback(self._check_for_pause) - - self._stage_4_d.callback(None) - return d - - def _check_for_pause(self, res): - if self._paused: - d = defer.Deferred() - self._paused.addCallback(lambda ignored: d.callback(res)) - return d - if self._stopped: - raise DownloadStopped("our Consumer called stopProducing()") - self._monitor.raise_if_cancelled() - return res - - def _download_segment(self, res, segnum): - if self._status: - self._status.set_status("Downloading segment %d of %d" % - (segnum+1, self._vup.num_segments)) - self.log("downloading seg#%d of %d (%d%%)" - % (segnum, self._vup.num_segments, - 100.0 * segnum / self._vup.num_segments)) - # memory footprint: when the SegmentDownloader finishes pulling down - # all shares, we have 1*segment_size of usage. - segmentdler = SegmentDownloader(self, segnum, - self._verifycap.needed_shares, - self._results) - started = time.time() - d = segmentdler.start() - def _finished_fetching(res): - elapsed = time.time() - started - self._results.timings["cumulative_fetch"] += elapsed - return res - if self._results: - d.addCallback(_finished_fetching) - # pause before using more memory - d.addCallback(self._check_for_pause) - # while the codec does its job, we hit 2*segment_size - def _started_decode(res): - self._started_decode = time.time() - return res - if self._results: - d.addCallback(_started_decode) - if segnum + 1 == self._vup.num_segments: - codec = self._tail_codec - else: - codec = self._codec - d.addCallback(lambda (shares, shareids): codec.decode(shares, shareids)) - # once the codec is done, we drop back to 1*segment_size, because - # 'shares' goes out of scope. The memory usage is all in the - # plaintext now, spread out into a bunch of tiny buffers. 
- def _finished_decode(res): - elapsed = time.time() - self._started_decode - self._results.timings["cumulative_decode"] += elapsed - return res - if self._results: - d.addCallback(_finished_decode) - - # pause/check-for-stop just before writing, to honor stopProducing - d.addCallback(self._check_for_pause) - d.addCallback(self._got_segment) - return d - - def _got_segment(self, buffers): - precondition(self._crypttext_hash_tree) - started_decrypt = time.time() - self._status.set_progress(float(self._current_segnum)/self._verifycap.size) - - if self._current_segnum + 1 == self._vup.num_segments: - # This is the last segment. - # Trim off any padding added by the upload side. We never send - # empty segments. If the data was an exact multiple of the - # segment size, the last segment will be full. - tail_buf_size = mathutil.div_ceil(self._vup.tail_segment_size, self._verifycap.needed_shares) - num_buffers_used = mathutil.div_ceil(self._vup.tail_data_size, tail_buf_size) - # Remove buffers which don't contain any part of the tail. - del buffers[num_buffers_used:] - # Remove the past-the-tail-part of the last buffer. - tail_in_last_buf = self._vup.tail_data_size % tail_buf_size - if tail_in_last_buf == 0: - tail_in_last_buf = tail_buf_size - buffers[-1] = buffers[-1][:tail_in_last_buf] - - # First compute the hash of this segment and check that it fits. - ch = hashutil.crypttext_segment_hasher() - for buffer in buffers: - self._ciphertext_hasher.update(buffer) - ch.update(buffer) - self._crypttext_hash_tree.set_hashes(leaves={self._current_segnum: ch.digest()}) - - # Then write this segment to the target. 
- if not self._opened: - self._opened = True - self._target.open(self._verifycap.size) - - for buffer in buffers: - self._target.write(buffer) - self._bytes_done += len(buffer) - - self._status.set_progress(float(self._bytes_done)/self._verifycap.size) - self._current_segnum += 1 - - if self._results: - elapsed = time.time() - started_decrypt - self._results.timings["cumulative_decrypt"] += elapsed - - def _done(self, res): - self.log("download done") - if self._results: - now = time.time() - self._results.timings["total"] = now - self._started - self._results.timings["segments"] = now - self._started_fetching - if self._vup.crypttext_hash: - _assert(self._vup.crypttext_hash == self._ciphertext_hasher.digest(), - "bad crypttext_hash: computed=%s, expected=%s" % - (base32.b2a(self._ciphertext_hasher.digest()), - base32.b2a(self._vup.crypttext_hash))) - _assert(self._bytes_done == self._verifycap.size, self._bytes_done, self._verifycap.size) - self._status.set_progress(1) - self._target.close() - return self._target.finish() - def get_download_status(self): - return self._status - - -class ConsumerAdapter: - implements(IDownloadTarget, IConsumer) - def __init__(self, consumer): - self._consumer = consumer - - def registerProducer(self, producer, streaming): - self._consumer.registerProducer(producer, streaming) - def unregisterProducer(self): - self._consumer.unregisterProducer() - - def open(self, size): - pass - def write(self, data): - self._consumer.write(data) - def close(self): - pass - - def fail(self, why): - pass - def register_canceller(self, cb): - pass - def finish(self): - return self._consumer - # The following methods are just because the target might be a - # repairer.DownUpConnector, and just because the current CHKUpload object - # expects to find the storage index and encoding parameters in its - # Uploadable. 
- def set_storageindex(self, storageindex): - pass - def set_encodingparams(self, encodingparams): - pass - - -class Downloader: - """I am a service that allows file downloading. - """ - # TODO: in fact, this service only downloads immutable files (URI:CHK:). - # It is scheduled to go away, to be replaced by filenode.download() - implements(IDownloader) - - def __init__(self, storage_broker, stats_provider): - self.storage_broker = storage_broker - self.stats_provider = stats_provider - self._all_downloads = weakref.WeakKeyDictionary() # for debugging - - def download(self, u, t, _log_msg_id=None, monitor=None, history=None): - assert isinstance(u, uri.CHKFileURI) - t = IDownloadTarget(t) - assert t.write - assert t.close - - if self.stats_provider: - # these counters are meant for network traffic, and don't - # include LIT files - self.stats_provider.count('downloader.files_downloaded', 1) - self.stats_provider.count('downloader.bytes_downloaded', u.get_size()) - - target = DecryptingTarget(t, u.key, _log_msg_id=_log_msg_id) - if not monitor: - monitor=Monitor() - dl = CiphertextDownloader(self.storage_broker, - u.get_verify_cap(), target, - monitor=monitor) - self._all_downloads[dl] = None - if history: - history.add_download(dl.get_download_status()) - d = dl.start() - return d diff --git a/src/allmydata/immutable/downloader/__init__.py b/src/allmydata/immutable/downloader/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/allmydata/immutable/downloader/common.py b/src/allmydata/immutable/downloader/common.py new file mode 100644 index 0000000..e9dd271 --- /dev/null +++ b/src/allmydata/immutable/downloader/common.py @@ -0,0 +1,13 @@ + +(AVAILABLE, PENDING, OVERDUE, COMPLETE, CORRUPT, DEAD, BADSEGNUM) = \ + ("AVAILABLE", "PENDING", "OVERDUE", "COMPLETE", "CORRUPT", "DEAD", "BADSEGNUM") + +class BadSegmentNumberError(Exception): + pass +class WrongSegmentError(Exception): + pass +class BadCiphertextHashError(Exception): + pass + +class 
DownloadStopped(Exception): + pass diff --git a/src/allmydata/immutable/downloader/fetcher.py b/src/allmydata/immutable/downloader/fetcher.py new file mode 100644 index 0000000..3918f65 --- /dev/null +++ b/src/allmydata/immutable/downloader/fetcher.py @@ -0,0 +1,229 @@ + +from twisted.python.failure import Failure +from foolscap.api import eventually +from allmydata.interfaces import NotEnoughSharesError, NoSharesError +from allmydata.util import log +from allmydata.util.dictutil import DictOfSets +from common import AVAILABLE, PENDING, OVERDUE, COMPLETE, CORRUPT, DEAD, \ + BADSEGNUM, BadSegmentNumberError + +class SegmentFetcher: + """I am responsible for acquiring blocks for a single segment. I will use + the Share instances passed to my add_shares() method to locate, retrieve, + and validate those blocks. I expect my parent node to call my + no_more_shares() method when there are no more shares available. I will + call my parent's want_more_shares() method when I want more: I expect to + see at least one call to add_shares or no_more_shares afterwards. + + When I have enough validated blocks, I will call my parent's + process_blocks() method with a dictionary that maps shnum to blockdata. + If I am unable to provide enough blocks, I will call my parent's + fetch_failed() method with (self, f). After either of these events, I + will shut down and do no further work. My parent can also call my stop() + method to have me shut down early.""" + + def __init__(self, node, segnum, k): + self._node = node # _Node + self.segnum = segnum + self._k = k + self._shares = {} # maps non-dead Share instance to a state, one of + # (AVAILABLE, PENDING, OVERDUE, COMPLETE, CORRUPT). + # State transition map is: + # AVAILABLE -(send-read)-> PENDING + # PENDING -(timer)-> OVERDUE + # PENDING -(rx)-> COMPLETE, CORRUPT, DEAD, BADSEGNUM + # OVERDUE -(rx)-> COMPLETE, CORRUPT, DEAD, BADSEGNUM + # If a share becomes DEAD, it is removed from the + # dict. 
If it becomes BADSEGNUM, the whole fetch is + # terminated. + self._share_observers = {} # maps Share to EventStreamObserver for + # active ones + self._shnums = DictOfSets() # maps shnum to the shares that provide it + self._blocks = {} # maps shnum to validated block data + self._no_more_shares = False + self._bad_segnum = False + self._last_failure = None + self._running = True + + def stop(self): + log.msg("SegmentFetcher(%s).stop" % self._node._si_prefix, + level=log.NOISY, umid="LWyqpg") + self._cancel_all_requests() + self._running = False + self._shares.clear() # let GC work # ??? XXX + + + # called by our parent _Node + + def add_shares(self, shares): + # called when ShareFinder locates a new share, and when a non-initial + # segment fetch is started and we already know about shares from the + # previous segment + for s in shares: + self._shares[s] = AVAILABLE + self._shnums.add(s._shnum, s) + eventually(self.loop) + + def no_more_shares(self): + # ShareFinder tells us it's reached the end of its list + self._no_more_shares = True + eventually(self.loop) + + # internal methods + + def _count_shnums(self, *states): + """shnums for which at least one state is in the following list""" + shnums = [] + for shnum,shares in self._shnums.iteritems(): + matches = [s for s in shares if self._shares.get(s) in states] + if matches: + shnums.append(shnum) + return len(shnums) + + def loop(self): + try: + # if any exception occurs here, kill the download + self._do_loop() + except BaseException: + self._node.fetch_failed(self, Failure()) + raise + + def _do_loop(self): + k = self._k + if not self._running: + return + if self._bad_segnum: + # oops, we were asking for a segment number beyond the end of the + # file. This is an error. + self.stop() + e = BadSegmentNumberError("segnum=%d, numsegs=%d" % + (self.segnum, self._node.num_segments)) + f = Failure(e) + self._node.fetch_failed(self, f) + return + + # are we done? + if self._count_shnums(COMPLETE) >= k: + # yay! 
+ self.stop() + self._node.process_blocks(self.segnum, self._blocks) + return + + # we may have exhausted everything + if (self._no_more_shares and + self._count_shnums(AVAILABLE, PENDING, OVERDUE, COMPLETE) < k): + # no more new shares are coming, and the remaining hopeful shares + # aren't going to be enough. boo! + + log.msg("share states: %r" % (self._shares,), + level=log.NOISY, umid="0ThykQ") + if self._count_shnums(AVAILABLE, PENDING, OVERDUE, COMPLETE) == 0: + format = ("no shares (need %(k)d)." + " Last failure: %(last_failure)s") + args = { "k": k, + "last_failure": self._last_failure } + error = NoSharesError + else: + format = ("ran out of shares: %(complete)d complete," + " %(pending)d pending, %(overdue)d overdue," + " %(unused)d unused, need %(k)d." + " Last failure: %(last_failure)s") + args = {"complete": self._count_shnums(COMPLETE), + "pending": self._count_shnums(PENDING), + "overdue": self._count_shnums(OVERDUE), + # 'unused' should be zero + "unused": self._count_shnums(AVAILABLE), + "k": k, + "last_failure": self._last_failure, + } + error = NotEnoughSharesError + log.msg(format=format, level=log.UNUSUAL, umid="1DsnTg", **args) + e = error(format % args) + f = Failure(e) + self.stop() + self._node.fetch_failed(self, f) + return + + # nope, not done. Are we "block-hungry" (i.e. do we want to send out + # more read requests, or do we think we have enough in flight + # already?) + while self._count_shnums(PENDING, COMPLETE) < k: + # we're hungry.. are there any unused shares? + sent = self._send_new_request() + if not sent: + break + + # ok, now are we "share-hungry" (i.e. do we have enough known shares + # to make us happy, or should we ask the ShareFinder to get us more?) 
+ if self._count_shnums(AVAILABLE, PENDING, COMPLETE) < k: + # we're hungry for more shares + self._node.want_more_shares() + # that will trigger the ShareFinder to keep looking + + def _find_one(self, shares, state): + # TODO could choose fastest + for s in shares: + if self._shares[s] == state: + return s + # can never get here, caller has assert in case of code bug + + def _send_new_request(self): + for shnum,shares in sorted(self._shnums.iteritems()): + states = [self._shares[s] for s in shares] + if COMPLETE in states or PENDING in states: + # don't send redundant requests + continue + if AVAILABLE not in states: + # no candidates for this shnum, move on + continue + # here's a candidate. Send a request. + s = self._find_one(shares, AVAILABLE) + assert s + self._shares[s] = PENDING + self._share_observers[s] = o = s.get_block(self.segnum) + o.subscribe(self._block_request_activity, share=s, shnum=shnum) + # TODO: build up a list of candidates, then walk through the + # list, sending requests to the most desireable servers, + # re-checking our block-hunger each time. For non-initial segment + # fetches, this would let us stick with faster servers. + return True + # nothing was sent: don't call us again until you have more shares to + # work with, or one of the existing shares has been declared OVERDUE + return False + + def _cancel_all_requests(self): + for o in self._share_observers.values(): + o.cancel() + self._share_observers = {} + + def _block_request_activity(self, share, shnum, state, block=None, f=None): + # called by Shares, in response to our s.send_request() calls. + if not self._running: + return + log.msg("SegmentFetcher(%s)._block_request_activity:" + " Share(sh%d-on-%s) -> %s" % + (self._node._si_prefix, shnum, share._peerid_s, state), + level=log.NOISY, umid="vilNWA") + # COMPLETE, CORRUPT, DEAD, BADSEGNUM are terminal. 
+ if state in (COMPLETE, CORRUPT, DEAD, BADSEGNUM): + self._share_observers.pop(share, None) + if state is COMPLETE: + # 'block' is fully validated + self._shares[share] = COMPLETE + self._blocks[shnum] = block + elif state is OVERDUE: + self._shares[share] = OVERDUE + # OVERDUE is not terminal: it will eventually transition to + # COMPLETE, CORRUPT, or DEAD. + elif state is CORRUPT: + self._shares[share] = CORRUPT + elif state is DEAD: + del self._shares[share] + self._shnums[shnum].remove(share) + self._last_failure = f + elif state is BADSEGNUM: + self._shares[share] = BADSEGNUM # ??? + self._bad_segnum = True + eventually(self.loop) + + diff --git a/src/allmydata/immutable/downloader/finder.py b/src/allmydata/immutable/downloader/finder.py new file mode 100644 index 0000000..9adee99 --- /dev/null +++ b/src/allmydata/immutable/downloader/finder.py @@ -0,0 +1,227 @@ + +import time +now = time.time +from foolscap.api import eventually +from allmydata.util import base32, log, idlib +from twisted.internet import reactor + +from share import Share, CommonShare + +def incidentally(res, f, *args, **kwargs): + """Add me to a Deferred chain like this: + d.addBoth(incidentally, func, arg) + and I'll behave as if you'd added the following function: + def _(res): + func(arg) + return res + This is useful if you want to execute an expression when the Deferred + fires, but don't care about its value. 
+ """ + f(*args, **kwargs) + return res + +class RequestToken: + def __init__(self, peerid): + self.peerid = peerid + +class ShareFinder: + OVERDUE_TIMEOUT = 10.0 + + def __init__(self, storage_broker, verifycap, node, download_status, + logparent=None, max_outstanding_requests=10): + self.running = True # stopped by Share.stop, from Terminator + self.verifycap = verifycap + self._started = False + self._storage_broker = storage_broker + self.share_consumer = self.node = node + self.max_outstanding_requests = max_outstanding_requests + + self._hungry = False + + self._commonshares = {} # shnum to CommonShare instance + self.undelivered_shares = [] + self.pending_requests = set() + self.overdue_requests = set() # subset of pending_requests + self.overdue_timers = {} + + self._storage_index = verifycap.storage_index + self._si_prefix = base32.b2a_l(self._storage_index[:8], 60) + self._node_logparent = logparent + self._download_status = download_status + self._lp = log.msg(format="ShareFinder[si=%(si)s] starting", + si=self._si_prefix, + level=log.NOISY, parent=logparent, umid="2xjj2A") + + def start_finding_servers(self): + # don't get servers until somebody uses us: creating the + # ImmutableFileNode should not cause work to happen yet. 
Test case is + # test_dirnode, which creates us with storage_broker=None + if not self._started: + si = self.verifycap.storage_index + s = self._storage_broker.get_servers_for_index(si) + self._servers = iter(s) + self._started = True + + def log(self, *args, **kwargs): + if "parent" not in kwargs: + kwargs["parent"] = self._lp + return log.msg(*args, **kwargs) + + def stop(self): + self.running = False + while self.overdue_timers: + req,t = self.overdue_timers.popitem() + t.cancel() + + # called by our parent CiphertextDownloader + def hungry(self): + self.log(format="ShareFinder[si=%(si)s] hungry", + si=self._si_prefix, level=log.NOISY, umid="NywYaQ") + self.start_finding_servers() + self._hungry = True + eventually(self.loop) + + # internal methods + def loop(self): + undelivered_s = ",".join(["sh%d@%s" % + (s._shnum, idlib.shortnodeid_b2a(s._peerid)) + for s in self.undelivered_shares]) + pending_s = ",".join([idlib.shortnodeid_b2a(rt.peerid) + for rt in self.pending_requests]) # sort? 
+ self.log(format="ShareFinder loop: running=%(running)s" + " hungry=%(hungry)s, undelivered=%(undelivered)s," + " pending=%(pending)s", + running=self.running, hungry=self._hungry, + undelivered=undelivered_s, pending=pending_s, + level=log.NOISY, umid="kRtS4Q") + if not self.running: + return + if not self._hungry: + return + if self.undelivered_shares: + sh = self.undelivered_shares.pop(0) + # they will call hungry() again if they want more + self._hungry = False + self.log(format="delivering Share(shnum=%(shnum)d, server=%(peerid)s)", + shnum=sh._shnum, peerid=sh._peerid_s, + level=log.NOISY, umid="2n1qQw") + eventually(self.share_consumer.got_shares, [sh]) + return + + non_overdue = self.pending_requests - self.overdue_requests + if len(non_overdue) >= self.max_outstanding_requests: + # cannot send more requests, must wait for some to retire + return + + server = None + try: + if self._servers: + server = self._servers.next() + except StopIteration: + self._servers = None + + if server: + self.send_request(server) + # we loop again to get parallel queries. The check above will + # prevent us from looping forever. + eventually(self.loop) + return + + if self.pending_requests: + # no server, but there are still requests in flight: maybe one of + # them will make progress + return + + self.log(format="ShareFinder.loop: no_more_shares, ever", + level=log.UNUSUAL, umid="XjQlzg") + # we've run out of servers (so we can't send any more requests), and + # we have nothing in flight. No further progress can be made. They + # are destined to remain hungry. 
+ self.share_consumer.no_more_shares() + + def send_request(self, server): + peerid, rref = server + req = RequestToken(peerid) + self.pending_requests.add(req) + lp = self.log(format="sending DYHB to [%(peerid)s]", + peerid=idlib.shortnodeid_b2a(peerid), + level=log.NOISY, umid="Io7pyg") + d_ev = self._download_status.add_dyhb_sent(peerid, now()) + # TODO: get the timer from a Server object, it knows best + self.overdue_timers[req] = reactor.callLater(self.OVERDUE_TIMEOUT, + self.overdue, req) + d = rref.callRemote("get_buckets", self._storage_index) + d.addBoth(incidentally, self._request_retired, req) + d.addCallbacks(self._got_response, self._got_error, + callbackArgs=(rref.version, peerid, req, d_ev, lp), + errbackArgs=(peerid, req, d_ev, lp)) + d.addErrback(log.err, format="error in send_request", + level=log.WEIRD, parent=lp, umid="rpdV0w") + d.addCallback(incidentally, eventually, self.loop) + + def _request_retired(self, req): + self.pending_requests.discard(req) + self.overdue_requests.discard(req) + if req in self.overdue_timers: + self.overdue_timers[req].cancel() + del self.overdue_timers[req] + + def overdue(self, req): + del self.overdue_timers[req] + assert req in self.pending_requests # paranoia, should never be false + self.overdue_requests.add(req) + eventually(self.loop) + + def _got_response(self, buckets, server_version, peerid, req, d_ev, lp): + shnums = sorted([shnum for shnum in buckets]) + d_ev.finished(shnums, now()) + if buckets: + shnums_s = ",".join([str(shnum) for shnum in shnums]) + self.log(format="got shnums [%(shnums)s] from [%(peerid)s]", + shnums=shnums_s, peerid=idlib.shortnodeid_b2a(peerid), + level=log.NOISY, parent=lp, umid="0fcEZw") + else: + self.log(format="no shares from [%(peerid)s]", + peerid=idlib.shortnodeid_b2a(peerid), + level=log.NOISY, parent=lp, umid="U7d4JA") + if self.node.num_segments is None: + best_numsegs = self.node.guessed_num_segments + else: + best_numsegs = self.node.num_segments + for shnum, bucket 
in buckets.iteritems(): + self._create_share(best_numsegs, shnum, bucket, server_version, + peerid) + + def _create_share(self, best_numsegs, shnum, bucket, server_version, + peerid): + if shnum in self._commonshares: + cs = self._commonshares[shnum] + else: + cs = CommonShare(best_numsegs, self._si_prefix, shnum, + self._node_logparent) + # Share._get_satisfaction is responsible for updating + # CommonShare.set_numsegs after we know the UEB. Alternatives: + # 1: d = self.node.get_num_segments() + # d.addCallback(cs.got_numsegs) + # the problem is that the OneShotObserverList I was using + # inserts an eventual-send between _get_satisfaction's + # _satisfy_UEB and _satisfy_block_hash_tree, and the + # CommonShare didn't get the num_segs message before + # being asked to set block hash values. To resolve this + # would require an immediate ObserverList instead of + # an eventual-send -based one + # 2: break _get_satisfaction into Deferred-attached pieces. + # Yuck. + self._commonshares[shnum] = cs + s = Share(bucket, server_version, self.verifycap, cs, self.node, + self._download_status, peerid, shnum, + self._node_logparent) + self.undelivered_shares.append(s) + + def _got_error(self, f, peerid, req, d_ev, lp): + d_ev.finished("error", now()) + self.log(format="got error from [%(peerid)s]", + peerid=idlib.shortnodeid_b2a(peerid), failure=f, + level=log.UNUSUAL, parent=lp, umid="zUKdCw") + + diff --git a/src/allmydata/immutable/downloader/node.py b/src/allmydata/immutable/downloader/node.py new file mode 100644 index 0000000..2991c9e --- /dev/null +++ b/src/allmydata/immutable/downloader/node.py @@ -0,0 +1,471 @@ + +import time +now = time.time +from twisted.python.failure import Failure +from twisted.internet import defer +from foolscap.api import eventually +from allmydata import uri +from allmydata.codec import CRSDecoder +from allmydata.util import base32, log, hashutil, mathutil, observer +from allmydata.interfaces import DEFAULT_MAX_SEGMENT_SIZE +from 
allmydata.hashtree import IncompleteHashTree, BadHashError, \ + NotEnoughHashesError + +# local imports +from finder import ShareFinder +from fetcher import SegmentFetcher +from segmentation import Segmentation +from common import BadCiphertextHashError + +class Cancel: + def __init__(self, f): + self._f = f + self.cancelled = False + def cancel(self): + if not self.cancelled: + self.cancelled = True + self._f(self) + +class DownloadNode: + """Internal class which manages downloads and holds state. External + callers use CiphertextFileNode instead.""" + + # Share._node points to me + def __init__(self, verifycap, storage_broker, secret_holder, + terminator, history, download_status): + assert isinstance(verifycap, uri.CHKFileVerifierURI) + self._verifycap = verifycap + self._storage_broker = storage_broker + self._si_prefix = base32.b2a_l(verifycap.storage_index[:8], 60) + self.running = True + if terminator: + terminator.register(self) # calls self.stop() at stopService() + # the rules are: + # 1: Only send network requests if you're active (self.running is True) + # 2: Use TimerService, not reactor.callLater + # 3: You can do eventual-sends any time. + # These rules should mean that once + # stopService()+flushEventualQueue() fires, everything will be done. + self._secret_holder = secret_holder + self._history = history + self._download_status = download_status + + k, N = self._verifycap.needed_shares, self._verifycap.total_shares + self.share_hash_tree = IncompleteHashTree(N) + + # we guess the segment size, so Segmentation can pull non-initial + # segments in a single roundtrip. 
This populates + # .guessed_segment_size, .guessed_num_segments, and + # .ciphertext_hash_tree (with a dummy, to let us guess which hashes + # we'll need) + self._build_guessed_tables(DEFAULT_MAX_SEGMENT_SIZE) + + # filled in when we parse a valid UEB + self.have_UEB = False + self.segment_size = None + self.tail_segment_size = None + self.tail_segment_padded = None + self.num_segments = None + self.block_size = None + self.tail_block_size = None + + # things to track callers that want data + + # _segment_requests can have duplicates + self._segment_requests = [] # (segnum, d, cancel_handle) + self._active_segment = None # a SegmentFetcher, with .segnum + + self._segsize_observers = observer.OneShotObserverList() + + # we create one top-level logparent for this _Node, and another one + # for each read() call. Segmentation and get_segment() messages are + # associated with the read() call, everything else is tied to the + # _Node's log entry. + lp = log.msg(format="Immutable _Node(%(si)s) created: size=%(size)d," + " guessed_segsize=%(guessed_segsize)d," + " guessed_numsegs=%(guessed_numsegs)d", + si=self._si_prefix, size=verifycap.size, + guessed_segsize=self.guessed_segment_size, + guessed_numsegs=self.guessed_num_segments, + level=log.OPERATIONAL, umid="uJ0zAQ") + self._lp = lp + + self._sharefinder = ShareFinder(storage_broker, verifycap, self, + self._download_status, lp) + self._shares = set() + + def _build_guessed_tables(self, max_segment_size): + size = min(self._verifycap.size, max_segment_size) + s = mathutil.next_multiple(size, self._verifycap.needed_shares) + self.guessed_segment_size = s + r = self._calculate_sizes(self.guessed_segment_size) + self.guessed_num_segments = r["num_segments"] + # as with CommonShare, our ciphertext_hash_tree is a stub until we + # get the real num_segments + self.ciphertext_hash_tree = IncompleteHashTree(self.guessed_num_segments) + + def __repr__(self): + return "Imm_Node(%s)" % (self._si_prefix,) + + def stop(self): + # 
called by the Terminator at shutdown, mostly for tests + if self._active_segment: + self._active_segment.stop() + self._active_segment = None + self._sharefinder.stop() + + # things called by outside callers, via CiphertextFileNode. get_segment() + # may also be called by Segmentation. + + def read(self, consumer, offset=0, size=None, read_ev=None): + """I am the main entry point, from which FileNode.read() can get + data. I feed the consumer with the desired range of ciphertext. I + return a Deferred that fires (with the consumer) when the read is + finished. + + Note that there is no notion of a 'file pointer': each call to read() + uses an independent offset= value.""" + # for concurrent operations: each gets its own Segmentation manager + if size is None: + size = self._verifycap.size + # clip size so offset+size does not go past EOF + size = min(size, self._verifycap.size-offset) + if read_ev is None: + read_ev = self._download_status.add_read_event(offset, size, now()) + + lp = log.msg(format="imm Node(%(si)s).read(%(offset)d, %(size)d)", + si=base32.b2a(self._verifycap.storage_index)[:8], + offset=offset, size=size, + level=log.OPERATIONAL, parent=self._lp, umid="l3j3Ww") + if self._history: + sp = self._history.stats_provider + sp.count("downloader.files_downloaded", 1) # really read() calls + sp.count("downloader.bytes_downloaded", size) + s = Segmentation(self, offset, size, consumer, read_ev, lp) + # this raises an interesting question: what segments to fetch? if + # offset=0, always fetch the first segment, and then allow + # Segmentation to be responsible for pulling the subsequent ones if + # the first wasn't large enough. If offset>0, we're going to need an + # extra roundtrip to get the UEB (and therefore the segment size) + # before we can figure out which segment to get. TODO: allow the + # offset-table-guessing code (which starts by guessing the segsize) + # to assist the offset>0 process. 
+ d = s.start() + def _done(res): + read_ev.finished(now()) + return res + d.addBoth(_done) + return d + + def get_segment(self, segnum, logparent=None): + """Begin downloading a segment. I return a tuple (d, c): 'd' is a + Deferred that fires with (offset,data) when the desired segment is + available, and c is an object on which c.cancel() can be called to + disavow interest in the segment (after which 'd' will never fire). + + You probably need to know the segment size before calling this, + unless you want the first few bytes of the file. If you ask for a + segment number which turns out to be too large, the Deferred will + errback with BadSegmentNumberError. + + The Deferred fires with the offset of the first byte of the data + segment, so that you can call get_segment() before knowing the + segment size, and still know which data you received. + + The Deferred can also errback with other fatal problems, such as + NotEnoughSharesError, NoSharesError, or BadCiphertextHashError. + """ + log.msg(format="imm Node(%(si)s).get_segment(%(segnum)d)", + si=base32.b2a(self._verifycap.storage_index)[:8], + segnum=segnum, + level=log.OPERATIONAL, parent=logparent, umid="UKFjDQ") + self._download_status.add_segment_request(segnum, now()) + d = defer.Deferred() + c = Cancel(self._cancel_request) + self._segment_requests.append( (segnum, d, c) ) + self._start_new_segment() + return (d, c) + + def get_segsize(self): + """Return a Deferred that fires when we know the real segment size.""" + if self.segment_size: + return defer.succeed(self.segment_size) + # TODO: this downloads (and discards) the first segment of the file. + # We could make this more efficient by writing + # fetcher.SegmentSizeFetcher, with the job of finding a single valid + # share and extracting the UEB. We'd add Share.get_UEB() to request + # just the UEB. 
+ (d,c) = self.get_segment(0) + # this ensures that an error during get_segment() will errback the + # caller, so Repair won't wait forever on completely missing files + d.addCallback(lambda ign: self._segsize_observers.when_fired()) + return d + + # things called by the Segmentation object used to transform + # arbitrary-sized read() calls into quantized segment fetches + + def _start_new_segment(self): + if self._active_segment is None and self._segment_requests: + segnum = self._segment_requests[0][0] + k = self._verifycap.needed_shares + log.msg(format="%(node)s._start_new_segment: segnum=%(segnum)d", + node=repr(self), segnum=segnum, + level=log.NOISY, umid="wAlnHQ") + self._active_segment = fetcher = SegmentFetcher(self, segnum, k) + active_shares = [s for s in self._shares if s.is_alive()] + fetcher.add_shares(active_shares) # this triggers the loop + + + # called by our child ShareFinder + def got_shares(self, shares): + self._shares.update(shares) + if self._active_segment: + self._active_segment.add_shares(shares) + def no_more_shares(self): + self._no_more_shares = True + if self._active_segment: + self._active_segment.no_more_shares() + + # things called by our Share instances + + def validate_and_store_UEB(self, UEB_s): + log.msg("validate_and_store_UEB", + level=log.OPERATIONAL, parent=self._lp, umid="7sTrPw") + h = hashutil.uri_extension_hash(UEB_s) + if h != self._verifycap.uri_extension_hash: + raise BadHashError + UEB_dict = uri.unpack_extension(UEB_s) + self._parse_and_store_UEB(UEB_dict) # sets self._stuff + # TODO: a malformed (but authentic) UEB could throw an assertion in + # _parse_and_store_UEB, and we should abandon the download. + self.have_UEB = True + + def _parse_and_store_UEB(self, d): + # Note: the UEB contains needed_shares and total_shares. These are + # redundant and inferior (the filecap contains the authoritative + # values). 
However, because it is possible to encode the same file in + # multiple ways, and the encoders might choose (poorly) to use the + # same key for both (therefore getting the same SI), we might + # encounter shares for both types. The UEB hashes will be different, + # however, and we'll disregard the "other" encoding's shares as + # corrupted. + + # therefore, we ignore d['total_shares'] and d['needed_shares']. + + log.msg(format="UEB=%(ueb)s, vcap=%(vcap)s", + ueb=repr(d), vcap=self._verifycap.to_string(), + level=log.NOISY, parent=self._lp, umid="cVqZnA") + + k, N = self._verifycap.needed_shares, self._verifycap.total_shares + + self.segment_size = d['segment_size'] + self._segsize_observers.fire(self.segment_size) + + r = self._calculate_sizes(self.segment_size) + self.tail_segment_size = r["tail_segment_size"] + self.tail_segment_padded = r["tail_segment_padded"] + self.num_segments = r["num_segments"] + self.block_size = r["block_size"] + self.tail_block_size = r["tail_block_size"] + log.msg("actual sizes: %s" % (r,), + level=log.NOISY, parent=self._lp, umid="PY6P5Q") + if (self.segment_size == self.guessed_segment_size + and self.num_segments == self.guessed_num_segments): + log.msg("my guess was right!", + level=log.NOISY, parent=self._lp, umid="x340Ow") + else: + log.msg("my guess was wrong! Extra round trips for me.", + level=log.NOISY, parent=self._lp, umid="tb7RJw") + + # zfec.Decode() instantiation is fast, but still, let's use the same + # codec instance for all but the last segment. 3-of-10 takes 15us on + # my laptop, 25-of-100 is 900us, 3-of-255 is 97us, 25-of-255 is + # 2.5ms, worst-case 254-of-255 is 9.3ms + self._codec = CRSDecoder() + self._codec.set_params(self.segment_size, k, N) + + + # Ciphertext hash tree root is mandatory, so that there is at most + # one ciphertext that matches this read-cap or verify-cap. 
The + # integrity check on the shares is not sufficient to prevent the + # original encoder from creating some shares of file A and other + # shares of file B. self.ciphertext_hash_tree was a guess before: + # this is where we create it for real. + self.ciphertext_hash_tree = IncompleteHashTree(self.num_segments) + self.ciphertext_hash_tree.set_hashes({0: d['crypttext_root_hash']}) + + self.share_hash_tree.set_hashes({0: d['share_root_hash']}) + + # Our job is a fast download, not verification, so we ignore any + # redundant fields. The Verifier uses a different code path which + # does not ignore them. + + def _calculate_sizes(self, segment_size): + # segments of ciphertext + size = self._verifycap.size + k = self._verifycap.needed_shares + + # this assert matches the one in encode.py:127 inside + # Encoded._got_all_encoding_parameters, where the UEB is constructed + assert segment_size % k == 0 + + # the last segment is usually short. We don't store a whole segsize, + # but we do pad the segment up to a multiple of k, because the + # encoder requires that. + tail_segment_size = size % segment_size + if tail_segment_size == 0: + tail_segment_size = segment_size + padded = mathutil.next_multiple(tail_segment_size, k) + tail_segment_padded = padded + + num_segments = mathutil.div_ceil(size, segment_size) + + # each segment is turned into N blocks. All but the last are of size + # block_size, and the last is of size tail_block_size + block_size = segment_size / k + tail_block_size = tail_segment_padded / k + + return { "tail_segment_size": tail_segment_size, + "tail_segment_padded": tail_segment_padded, + "num_segments": num_segments, + "block_size": block_size, + "tail_block_size": tail_block_size, + } + + + def process_share_hashes(self, share_hashes): + for hashnum in share_hashes: + if hashnum >= len(self.share_hash_tree): + # "BadHashError" is normally for e.g. a corrupt block. 
We + # sort of abuse it here to mean a badly numbered hash (which + # indicates corruption in the number bytes, rather than in + # the data bytes). + raise BadHashError("hashnum %d doesn't fit in hashtree(%d)" + % (hashnum, len(self.share_hash_tree))) + self.share_hash_tree.set_hashes(share_hashes) + + def get_needed_ciphertext_hashes(self, segnum): + cht = self.ciphertext_hash_tree + return cht.needed_hashes(segnum, include_leaf=True) + def process_ciphertext_hashes(self, hashes): + assert self.num_segments is not None + # this may raise BadHashError or NotEnoughHashesError + self.ciphertext_hash_tree.set_hashes(hashes) + + + # called by our child SegmentFetcher + + def want_more_shares(self): + self._sharefinder.hungry() + + def fetch_failed(self, sf, f): + assert sf is self._active_segment + self._active_segment = None + # deliver error upwards + for (d,c) in self._extract_requests(sf.segnum): + eventually(self._deliver, d, c, f) + + def process_blocks(self, segnum, blocks): + d = defer.maybeDeferred(self._decode_blocks, segnum, blocks) + d.addCallback(self._check_ciphertext_hash, segnum) + def _deliver(result): + ds = self._download_status + if isinstance(result, Failure): + ds.add_segment_error(segnum, now()) + else: + (offset, segment, decodetime) = result + ds.add_segment_delivery(segnum, now(), + offset, len(segment), decodetime) + log.msg(format="delivering segment(%(segnum)d)", + segnum=segnum, + level=log.OPERATIONAL, parent=self._lp, + umid="j60Ojg") + for (d,c) in self._extract_requests(segnum): + eventually(self._deliver, d, c, result) + self._active_segment = None + self._start_new_segment() + d.addBoth(_deliver) + d.addErrback(lambda f: + log.err("unhandled error during process_blocks", + failure=f, level=log.WEIRD, + parent=self._lp, umid="MkEsCg")) + + def _decode_blocks(self, segnum, blocks): + tail = (segnum == self.num_segments-1) + codec = self._codec + block_size = self.block_size + decoded_size = self.segment_size + if tail: + # account for 
the padding in the last segment + codec = CRSDecoder() + k, N = self._verifycap.needed_shares, self._verifycap.total_shares + codec.set_params(self.tail_segment_padded, k, N) + block_size = self.tail_block_size + decoded_size = self.tail_segment_padded + + shares = [] + shareids = [] + for (shareid, share) in blocks.iteritems(): + assert len(share) == block_size + shareids.append(shareid) + shares.append(share) + del blocks + + start = now() + d = codec.decode(shares, shareids) # segment + del shares + def _process(buffers): + decodetime = now() - start + segment = "".join(buffers) + assert len(segment) == decoded_size + del buffers + if tail: + segment = segment[:self.tail_segment_size] + return (segment, decodetime) + d.addCallback(_process) + return d + + def _check_ciphertext_hash(self, (segment, decodetime), segnum): + assert self._active_segment.segnum == segnum + assert self.segment_size is not None + offset = segnum * self.segment_size + + h = hashutil.crypttext_segment_hash(segment) + try: + self.ciphertext_hash_tree.set_hashes(leaves={segnum: h}) + return (offset, segment, decodetime) + except (BadHashError, NotEnoughHashesError): + format = ("hash failure in ciphertext_hash_tree:" + " segnum=%(segnum)d, SI=%(si)s") + log.msg(format=format, segnum=segnum, si=self._si_prefix, + failure=Failure(), + level=log.WEIRD, parent=self._lp, umid="MTwNnw") + # this is especially weird, because we made it past the share + # hash tree. It implies that we're using the wrong encoding, or + # that the uploader deliberately constructed a bad UEB. 
+ msg = format % {"segnum": segnum, "si": self._si_prefix} + raise BadCiphertextHashError(msg) + + def _deliver(self, d, c, result): + # this method exists to handle cancel() that occurs between + # _got_segment and _deliver + if not c.cancelled: + d.callback(result) # might actually be an errback + + def _extract_requests(self, segnum): + """Remove matching requests and return their (d,c) tuples so that the + caller can retire them.""" + retire = [(d,c) for (segnum0, d, c) in self._segment_requests + if segnum0 == segnum] + self._segment_requests = [t for t in self._segment_requests + if t[0] != segnum] + return retire + + def _cancel_request(self, c): + self._segment_requests = [t for t in self._segment_requests + if t[2] != c] + segnums = [t[0] for t in self._segment_requests] + if self._active_segment and self._active_segment.segnum not in segnums: # may be None (after fetch_failed/stop, or before an eventual _deliver) + self._active_segment.stop() + self._active_segment = None + self._start_new_segment() diff --git a/src/allmydata/immutable/downloader/segmentation.py b/src/allmydata/immutable/downloader/segmentation.py new file mode 100644 index 0000000..4890195 --- /dev/null +++ b/src/allmydata/immutable/downloader/segmentation.py @@ -0,0 +1,160 @@ + +import time +now = time.time +from zope.interface import implements +from twisted.internet import defer +from twisted.internet.interfaces import IPushProducer +from foolscap.api import eventually +from allmydata.util import log +from allmydata.util.spans import overlap + +from common import BadSegmentNumberError, WrongSegmentError, DownloadStopped + +class Segmentation: + """I am responsible for a single offset+size read of the file. 
I handle + segmentation: I figure out which segments are necessary, request them + (from my CiphertextDownloader) in order, and trim the segments down to + match the offset+size span. I use the Producer/Consumer interface to only + request one segment at a time.
+ """ + implements(IPushProducer) + def __init__(self, node, offset, size, consumer, read_ev, logparent=None): + self._node = node + self._hungry = True + self._active_segnum = None + self._cancel_segment_request = None + # these are updated as we deliver data. At any given time, we still + # want to download file[offset:offset+size] + self._offset = offset + self._size = size + assert offset+size <= node._verifycap.size + self._consumer = consumer + self._read_ev = read_ev + self._start_pause = None + self._lp = logparent + + def start(self): + self._alive = True + self._deferred = defer.Deferred() + self._consumer.registerProducer(self, True) + self._maybe_fetch_next() + return self._deferred + + def _maybe_fetch_next(self): + if not self._alive or not self._hungry: + return + if self._active_segnum is not None: + return + self._fetch_next() + + def _fetch_next(self): + if self._size == 0: + # done! + self._alive = False + self._hungry = False + self._consumer.unregisterProducer() + self._deferred.callback(self._consumer) + return + n = self._node + have_actual_segment_size = n.segment_size is not None + guess_s = "" + if not have_actual_segment_size: + guess_s = "probably " + segment_size = n.segment_size or n.guessed_segment_size + if self._offset == 0: + # great! 
we want segment0 for sure + wanted_segnum = 0 + else: + # this might be a guess + wanted_segnum = self._offset // segment_size + log.msg(format="_fetch_next(offset=%(offset)d) %(guess)swants segnum=%(segnum)d", + offset=self._offset, guess=guess_s, segnum=wanted_segnum, + level=log.NOISY, parent=self._lp, umid="5WfN0w") + self._active_segnum = wanted_segnum + d,c = n.get_segment(wanted_segnum, self._lp) + self._cancel_segment_request = c + d.addBoth(self._request_retired) + d.addCallback(self._got_segment, wanted_segnum) + if not have_actual_segment_size: + # we can retry once + d.addErrback(self._retry_bad_segment) + d.addErrback(self._error) + + def _request_retired(self, res): + self._active_segnum = None + self._cancel_segment_request = None + return res + + def _got_segment(self, (segment_start,segment,decodetime), wanted_segnum): + self._cancel_segment_request = None + # we got file[segment_start:segment_start+len(segment)] + # we want file[self._offset:self._offset+self._size] + log.msg(format="Segmentation got data:" + " want [%(wantstart)d-%(wantend)d)," + " given [%(segstart)d-%(segend)d), for segnum=%(segnum)d", + wantstart=self._offset, wantend=self._offset+self._size, + segstart=segment_start, segend=segment_start+len(segment), + segnum=wanted_segnum, + level=log.OPERATIONAL, parent=self._lp, umid="32dHcg") + + o = overlap(segment_start, len(segment), self._offset, self._size) + # the overlap is file[o[0]:o[0]+o[1]] + if not o or o[0] != self._offset: + # we didn't get the first byte, so we can't use this segment + log.msg("Segmentation handed wrong data:" + " want [%d-%d), given [%d-%d), for segnum=%d," + " for si=%s" + % (self._offset, self._offset+self._size, + segment_start, segment_start+len(segment), + wanted_segnum, self._node._si_prefix), + level=log.UNUSUAL, parent=self._lp, umid="STlIiA") + # we may retry if the segnum we asked was based on a guess + raise WrongSegmentError("I was given the wrong data.") + offset_in_segment = self._offset - 
segment_start + desired_data = segment[offset_in_segment:offset_in_segment+o[1]] + + self._offset += len(desired_data) + self._size -= len(desired_data) + self._consumer.write(desired_data) + # the consumer might call our .pauseProducing() inside that write() + # call, setting self._hungry=False + self._read_ev.update(len(desired_data), 0, 0) + self._maybe_fetch_next() + + def _retry_bad_segment(self, f): + f.trap(WrongSegmentError, BadSegmentNumberError) + # we guessed the segnum wrong: either one that doesn't overlap with + # the start of our desired region, or one that's beyond the end of + # the world. Now that we have the right information, we're allowed to + # retry once. + assert self._node.segment_size is not None + return self._maybe_fetch_next() + + def _error(self, f): + log.msg("Error in Segmentation", failure=f, + level=log.WEIRD, parent=self._lp, umid="EYlXBg") + self._alive = False + self._hungry = False + self._consumer.unregisterProducer() + self._deferred.errback(f) + + def stopProducing(self): + self._hungry = False + self._alive = False + # cancel any outstanding segment request + if self._cancel_segment_request: + self._cancel_segment_request.cancel() + self._cancel_segment_request = None + e = DownloadStopped("our Consumer called stopProducing()") + self._deferred.errback(e) + + def pauseProducing(self): + self._hungry = False + self._start_pause = now() + def resumeProducing(self): + self._hungry = True + eventually(self._maybe_fetch_next) + if self._start_pause is not None: + paused = now() - self._start_pause + self._read_ev.update(0, 0, paused) + self._start_pause = None diff --git a/src/allmydata/immutable/downloader/share.py b/src/allmydata/immutable/downloader/share.py new file mode 100644 index 0000000..e3c9017 --- /dev/null +++ b/src/allmydata/immutable/downloader/share.py @@ -0,0 +1,848 @@ + +import struct +import time +now = time.time + +from twisted.python.failure import Failure +from foolscap.api import eventually +from 
allmydata.util import base32, log, hashutil, mathutil +from allmydata.util.spans import Spans, DataSpans +from allmydata.interfaces import HASH_SIZE +from allmydata.hashtree import IncompleteHashTree, BadHashError, \ + NotEnoughHashesError + +from allmydata.immutable.layout import make_write_bucket_proxy +from allmydata.util.observer import EventStreamObserver +from common import COMPLETE, CORRUPT, DEAD, BADSEGNUM + + +class LayoutInvalid(Exception): + pass +class DataUnavailable(Exception): + pass + +class Share: + """I represent a single instance of a single share (e.g. I reference the + shnum2 for share SI=abcde on server xy12t, not the one on server ab45q). + I am associated with a CommonShare that remembers data that is held in + common among e.g. SI=abcde/shnum2 across all servers. I am also + associated with a CiphertextFileNode for e.g. SI=abcde (all shares, all + servers). + """ + # this is a specific implementation of IShare for tahoe's native storage + # servers. A different backend would use a different class. + + def __init__(self, rref, server_version, verifycap, commonshare, node, + download_status, peerid, shnum, logparent): + self._rref = rref + self._server_version = server_version + self._node = node # holds share_hash_tree and UEB + self.actual_segment_size = node.segment_size # might still be None + # XXX change node.guessed_segment_size to + # node.best_guess_segment_size(), which should give us the real ones + # if known, else its guess. 
+ self._guess_offsets(verifycap, node.guessed_segment_size) + self.actual_offsets = None + self._UEB_length = None + self._commonshare = commonshare # holds block_hash_tree + self._download_status = download_status + self._peerid = peerid + self._peerid_s = base32.b2a(peerid)[:5] + self._storage_index = verifycap.storage_index + self._si_prefix = base32.b2a(verifycap.storage_index)[:8] + self._shnum = shnum + # self._alive becomes False upon fatal corruption or server error + self._alive = True + self._lp = log.msg(format="%(share)s created", share=repr(self), + level=log.NOISY, parent=logparent, umid="P7hv2w") + + self._pending = Spans() # request sent but no response received yet + self._received = DataSpans() # ACK response received, with data + self._unavailable = Spans() # NAK response received, no data + + # any given byte of the share can be in one of four states: + # in: _wanted, _requested, _received + # FALSE FALSE FALSE : don't care about it at all + # TRUE FALSE FALSE : want it, haven't yet asked for it + # TRUE TRUE FALSE : request is in-flight + # or didn't get it + # FALSE TRUE TRUE : got it, haven't used it yet + # FALSE TRUE FALSE : got it and used it + # FALSE FALSE FALSE : block consumed, ready to ask again + # + # when we request data and get a NAK, we leave it in _requested + # to remind ourself to not ask for it again. We don't explicitly + # remove it from anything (maybe this should change). + # + # We retain the hashtrees in the Node, so we leave those spans in + # _requested (and never ask for them again, as long as the Node is + # alive). But we don't retain data blocks (too big), so when we + # consume a data block, we remove it from _requested, so a later + # download can re-fetch it. 
+ + self._requested_blocks = [] # (segnum, set(observer2..)) + ver = server_version["http://allmydata.org/tahoe/protocols/storage/v1"] + self._overrun_ok = ver["tolerates-immutable-read-overrun"] + # If _overrun_ok and we guess the offsets correctly, we can get + # everything in one RTT. If _overrun_ok and we guess wrong, we might + # need two RTT (but we could get lucky and do it in one). If overrun + # is *not* ok (tahoe-1.3.0 or earlier), we need four RTT: 1=version, + # 2=offset table, 3=UEB_length and everything else (hashes, block), + # 4=UEB. + + self.had_corruption = False # for unit tests + + def __repr__(self): + return "Share(sh%d-on-%s)" % (self._shnum, self._peerid_s) + + def is_alive(self): + # XXX: reconsider. If the share sees a single error, should it remain + # dead for all time? Or should the next segment try again? This DEAD + # state is stored elsewhere too (SegmentFetcher per-share states?) + # and needs to be consistent. We clear _alive in self._fail(), which + # is called upon a network error, or layout failure, or hash failure + # in the UEB or a hash tree. We do not _fail() for a hash failure in + # a block, but of course we still tell our callers about + # state=CORRUPT so they'll find a different share. + return self._alive + + def _guess_offsets(self, verifycap, guessed_segment_size): + self.guessed_segment_size = guessed_segment_size + size = verifycap.size + k = verifycap.needed_shares + N = verifycap.total_shares + r = self._node._calculate_sizes(guessed_segment_size) + # num_segments, block_size/tail_block_size + # guessed_segment_size/tail_segment_size/tail_segment_padded + share_size = mathutil.div_ceil(size, k) + # share_size is the amount of block data that will be put into each + # share, summed over all segments. It does not include hashes, the + # UEB, or other overhead. 
+ + # use the upload-side code to get this as accurate as possible + ht = IncompleteHashTree(N) + num_share_hashes = len(ht.needed_hashes(0, include_leaf=True)) + wbp = make_write_bucket_proxy(None, share_size, r["block_size"], + r["num_segments"], num_share_hashes, 0, + None) + self._fieldsize = wbp.fieldsize + self._fieldstruct = wbp.fieldstruct + self.guessed_offsets = wbp._offsets + + # called by our client, the SegmentFetcher + def get_block(self, segnum): + """Add a block number to the list of requests. This will eventually + result in a fetch of the data necessary to validate the block, then + the block itself. The fetch order is generally + first-come-first-served, but requests may be answered out-of-order if + data becomes available sooner. + + I return an EventStreamObserver, which has two uses. The first is to + call o.subscribe(), which gives me a place to send state changes and + eventually the data block. The second is o.cancel(), which removes + the request (if it is still active). + + I will distribute the following events through my EventStreamObserver: + - state=OVERDUE: ?? I believe I should have had an answer by now. + You may want to ask another share instead. + - state=BADSEGNUM: the segnum you asked for is too large. 
I must + fetch a valid UEB before I can determine this, + so the notification is asynchronous + - state=COMPLETE, block=data: here is a valid block + - state=CORRUPT: this share contains corrupted data + - state=DEAD, f=Failure: the server reported an error, this share + is unusable + """ + log.msg("%s.get_block(%d)" % (repr(self), segnum), + level=log.NOISY, parent=self._lp, umid="RTo9MQ") + assert segnum >= 0 + o = EventStreamObserver() + o.set_canceler(self, "_cancel_block_request") + for i,(segnum0,observers) in enumerate(self._requested_blocks): + if segnum0 == segnum: + observers.add(o) + break + else: + self._requested_blocks.append( (segnum, set([o])) ) + eventually(self.loop) + return o + + def _cancel_block_request(self, o): + new_requests = [] + for e in self._requested_blocks: + (segnum0, observers) = e + observers.discard(o) + if observers: + new_requests.append(e) + self._requested_blocks = new_requests + + # internal methods + def _active_segnum_and_observers(self): + if self._requested_blocks: + # we only retrieve information for one segment at a time, to + # minimize alacrity (first come, first served) + return self._requested_blocks[0] + return None, [] + + def loop(self): + try: + # if any exceptions occur here, kill the download + log.msg("%s.loop, reqs=[%s], pending=%s, received=%s," + " unavailable=%s" % + (repr(self), + ",".join([str(req[0]) for req in self._requested_blocks]), + self._pending.dump(), self._received.dump(), + self._unavailable.dump() ), + level=log.NOISY, parent=self._lp, umid="BaL1zw") + self._do_loop() + # all exception cases call self._fail(), which clears self._alive + except (BadHashError, NotEnoughHashesError, LayoutInvalid), e: + # Abandon this share. We do this if we see corruption in the + # offset table, the UEB, or a hash tree. We don't abandon the + # whole share if we see corruption in a data block (we abandon + # just the one block, and still try to get data from other blocks + # on the same server). 
In theory, we could get good data from a + # share with a corrupt UEB (by first getting the UEB from some + # other share), or corrupt hash trees, but the logic to decide + # when this is safe is non-trivial. So for now, give up at the + # first sign of corruption. + # + # _satisfy_*() code which detects corruption should first call + # self._signal_corruption(), and then raise the exception. + log.msg(format="corruption detected in %(share)s", + share=repr(self), + level=log.UNUSUAL, parent=self._lp, umid="gWspVw") + self._fail(Failure(e), log.UNUSUAL) + except DataUnavailable, e: + # Abandon this share. + log.msg(format="need data that will never be available" + " from %s: pending=%s, received=%s, unavailable=%s" % + (repr(self), + self._pending.dump(), self._received.dump(), + self._unavailable.dump() ), + level=log.UNUSUAL, parent=self._lp, umid="F7yJnQ") + self._fail(Failure(e), log.UNUSUAL) + except BaseException: + self._fail(Failure()) + raise + log.msg("%s.loop done, reqs=[%s], pending=%s, received=%s," + " unavailable=%s" % + (repr(self), + ",".join([str(req[0]) for req in self._requested_blocks]), + self._pending.dump(), self._received.dump(), + self._unavailable.dump() ), + level=log.NOISY, parent=self._lp, umid="9lRaRA") + + def _do_loop(self): + # we are (eventually) called after all state transitions: + # new segments added to self._requested_blocks + # new data received from servers (responses to our read() calls) + # impatience timer fires (server appears slow) + if not self._alive: + return + + # First, consume all of the information that we currently have, for + # all the segments people currently want. + while self._get_satisfaction(): + pass + + # When we get no satisfaction (from the data we've received so far), + # we determine what data we desire (to satisfy more requests). The + # number of segments is finite, so I can't get no satisfaction + # forever. 
+ wanted, needed = self._desire() + + # Finally, send out requests for whatever we need (desire minus + # have). You can't always get what you want, but if you try + # sometimes, you just might find, you get what you need. + self._send_requests(wanted + needed) + + # and sometimes you can't even get what you need + disappointment = needed & self._unavailable + if len(disappointment): + self.had_corruption = True + raise DataUnavailable("need %s but will never get it" % + disappointment.dump()) + + def _get_satisfaction(self): + # return True if we retired a data block, and should therefore be + # called again. Return False if we don't retire a data block (even if + # we do retire some other data, like hash chains). + + if self.actual_offsets is None: + if not self._satisfy_offsets(): + # can't even look at anything without the offset table + return False + + if not self._node.have_UEB: + if not self._satisfy_UEB(): + # can't check any hashes without the UEB + return False + self.actual_segment_size = self._node.segment_size # might be updated + assert self.actual_segment_size is not None + + # knowing the UEB means knowing num_segments. Despite the redundancy, + # this is the best place to set this. CommonShare.set_numsegs will + # ignore duplicate calls. + assert self._node.num_segments is not None + cs = self._commonshare + cs.set_numsegs(self._node.num_segments) + + segnum, observers = self._active_segnum_and_observers() + # if segnum is None, we don't really need to do anything (we have no + # outstanding readers right now), but we'll fill in the bits that + # aren't tied to any particular segment. 
+ + if segnum is not None and segnum >= self._node.num_segments: + for o in observers: + o.notify(state=BADSEGNUM) + self._requested_blocks.pop(0) + return True + + if self._node.share_hash_tree.needed_hashes(self._shnum): + if not self._satisfy_share_hash_tree(): + # can't check block_hash_tree without a root + return False + + if cs.need_block_hash_root(): + block_hash_root = self._node.share_hash_tree.get_leaf(self._shnum) + cs.set_block_hash_root(block_hash_root) + + if segnum is None: + return False # we don't want any particular segment right now + + # block_hash_tree + needed_hashes = self._commonshare.get_needed_block_hashes(segnum) + if needed_hashes: + if not self._satisfy_block_hash_tree(needed_hashes): + # can't check block without block_hash_tree + return False + + # ciphertext_hash_tree + needed_hashes = self._node.get_needed_ciphertext_hashes(segnum) + if needed_hashes: + if not self._satisfy_ciphertext_hash_tree(needed_hashes): + # can't check decoded blocks without ciphertext_hash_tree + return False + + # data blocks + return self._satisfy_data_block(segnum, observers) + + def _satisfy_offsets(self): + version_s = self._received.get(0, 4) + if version_s is None: + return False + (version,) = struct.unpack(">L", version_s) + if version == 1: + table_start = 0x0c + self._fieldsize = 0x4 + self._fieldstruct = "L" + elif version == 2: + table_start = 0x14 + self._fieldsize = 0x8 + self._fieldstruct = "Q" + else: + self.had_corruption = True + raise LayoutInvalid("unknown version %d (I understand 1 and 2)" + % version) + offset_table_size = 6 * self._fieldsize + table_s = self._received.pop(table_start, offset_table_size) + if table_s is None: + return False + fields = struct.unpack(">"+6*self._fieldstruct, table_s) + offsets = {} + for i,field in enumerate(['data', + 'plaintext_hash_tree', # UNUSED + 'crypttext_hash_tree', + 'block_hashes', + 'share_hashes', + 'uri_extension', + ] ): + offsets[field] = fields[i] + self.actual_offsets = offsets + 
log.msg("actual offsets: data=%d, plaintext_hash_tree=%d, crypttext_hash_tree=%d, block_hashes=%d, share_hashes=%d, uri_extension=%d" % tuple(fields)) + self._received.remove(0, 4) # don't need this anymore + + # validate the offsets a bit + share_hashes_size = offsets["uri_extension"] - offsets["share_hashes"] + if share_hashes_size < 0 or share_hashes_size % (2+HASH_SIZE) != 0: + # the share hash chain is stored as (hashnum,hash) pairs + self.had_corruption = True + raise LayoutInvalid("share hashes malformed -- should be a" + " multiple of %d bytes -- not %d" % + (2+HASH_SIZE, share_hashes_size)) + block_hashes_size = offsets["share_hashes"] - offsets["block_hashes"] + if block_hashes_size < 0 or block_hashes_size % (HASH_SIZE) != 0: + # the block hash tree is stored as a list of hashes + self.had_corruption = True + raise LayoutInvalid("block hashes malformed -- should be a" + " multiple of %d bytes -- not %d" % + (HASH_SIZE, block_hashes_size)) + # we only look at 'crypttext_hash_tree' if the UEB says we're + # actually using it. Same with 'plaintext_hash_tree'. This gives us + # some wiggle room: a place to stash data for later extensions. + + return True + + def _satisfy_UEB(self): + o = self.actual_offsets + fsize = self._fieldsize + UEB_length_s = self._received.get(o["uri_extension"], fsize) + if not UEB_length_s: + return False + (UEB_length,) = struct.unpack(">"+self._fieldstruct, UEB_length_s) + UEB_s = self._received.pop(o["uri_extension"]+fsize, UEB_length) + if not UEB_s: + return False + self._received.remove(o["uri_extension"], fsize) + try: + self._node.validate_and_store_UEB(UEB_s) + return True + except (LayoutInvalid, BadHashError), e: + # TODO: if this UEB was bad, we'll keep trying to validate it + # over and over again. 
Only log.err on the first one, or better + # yet skip all but the first + f = Failure(e) + self._signal_corruption(f, o["uri_extension"], fsize+UEB_length) + self.had_corruption = True + raise + + def _satisfy_share_hash_tree(self): + # the share hash chain is stored as (hashnum,hash) tuples, so you + # can't fetch just the pieces you need, because you don't know + # exactly where they are. So fetch everything, and parse the results + # later. + o = self.actual_offsets + hashlen = o["uri_extension"] - o["share_hashes"] + assert hashlen % (2+HASH_SIZE) == 0 + hashdata = self._received.get(o["share_hashes"], hashlen) + if not hashdata: + return False + share_hashes = {} + for i in range(0, hashlen, 2+HASH_SIZE): + (hashnum,) = struct.unpack(">H", hashdata[i:i+2]) + hashvalue = hashdata[i+2:i+2+HASH_SIZE] + share_hashes[hashnum] = hashvalue + # TODO: if they give us an empty set of hashes, + # process_share_hashes() won't fail. We must ensure that this + # situation doesn't allow unverified shares through. Manual testing + # shows that set_block_hash_root() throws an assert because an + # internal node is None instead of an actual hash, but we want + # something better. It's probably best to add a method to + # IncompleteHashTree which takes a leaf number and raises an + # exception unless that leaf is present and fully validated. 
+ try: + self._node.process_share_hashes(share_hashes) + # adds to self._node.share_hash_tree + except (BadHashError, NotEnoughHashesError), e: + f = Failure(e) + self._signal_corruption(f, o["share_hashes"], hashlen) + self.had_corruption = True + raise + self._received.remove(o["share_hashes"], hashlen) + return True + + def _signal_corruption(self, f, start, offset): + # there was corruption somewhere in the given range + reason = "corruption in share[%d-%d): %s" % (start, start+offset, + str(f.value)) + self._rref.callRemoteOnly("advise_corrupt_share", reason) + + def _satisfy_block_hash_tree(self, needed_hashes): + o_bh = self.actual_offsets["block_hashes"] + block_hashes = {} + for hashnum in needed_hashes: + hashdata = self._received.get(o_bh+hashnum*HASH_SIZE, HASH_SIZE) + if hashdata: + block_hashes[hashnum] = hashdata + else: + return False # missing some hashes + # note that we don't submit any hashes to the block_hash_tree until + # we've gotten them all, because the hash tree will throw an + # exception if we only give it a partial set (which it therefore + # cannot validate) + try: + self._commonshare.process_block_hashes(block_hashes) + except (BadHashError, NotEnoughHashesError), e: + f = Failure(e) + hashnums = ",".join([str(n) for n in sorted(block_hashes.keys())]) + log.msg(format="hash failure in block_hashes=(%(hashnums)s)," + " from %(share)s", + hashnums=hashnums, shnum=self._shnum, share=repr(self), + failure=f, level=log.WEIRD, parent=self._lp, umid="yNyFdA") + hsize = max(0, max(needed_hashes)) * HASH_SIZE + self._signal_corruption(f, o_bh, hsize) + self.had_corruption = True + raise + for hashnum in needed_hashes: + self._received.remove(o_bh+hashnum*HASH_SIZE, HASH_SIZE) + return True + + def _satisfy_ciphertext_hash_tree(self, needed_hashes): + start = self.actual_offsets["crypttext_hash_tree"] + hashes = {} + for hashnum in needed_hashes: + hashdata = self._received.get(start+hashnum*HASH_SIZE, HASH_SIZE) + if hashdata: + 
hashes[hashnum] = hashdata + else: + return False # missing some hashes + # we don't submit any hashes to the ciphertext_hash_tree until we've + # gotten them all + try: + self._node.process_ciphertext_hashes(hashes) + except (BadHashError, NotEnoughHashesError), e: + f = Failure(e) + hashnums = ",".join([str(n) for n in sorted(hashes.keys())]) + log.msg(format="hash failure in ciphertext_hashes=(%(hashnums)s)," + " from %(share)s", + hashnums=hashnums, share=repr(self), failure=f, + level=log.WEIRD, parent=self._lp, umid="iZI0TA") + hsize = max(0, max(needed_hashes))*HASH_SIZE + self._signal_corruption(f, start, hsize) + self.had_corruption = True + raise + for hashnum in needed_hashes: + self._received.remove(start+hashnum*HASH_SIZE, HASH_SIZE) + return True + + def _satisfy_data_block(self, segnum, observers): + tail = (segnum == self._node.num_segments-1) + datastart = self.actual_offsets["data"] + blockstart = datastart + segnum * self._node.block_size + blocklen = self._node.block_size + if tail: + blocklen = self._node.tail_block_size + + block = self._received.pop(blockstart, blocklen) + if not block: + log.msg("no data for block %s (want [%d:+%d])" % (repr(self), + blockstart, blocklen)) + return False + log.msg(format="%(share)s._satisfy_data_block [%(start)d:+%(length)d]", + share=repr(self), start=blockstart, length=blocklen, + level=log.NOISY, parent=self._lp, umid="uTDNZg") + # this block is being retired, either as COMPLETE or CORRUPT, since + # no further data reads will help + assert self._requested_blocks[0][0] == segnum + try: + self._commonshare.check_block(segnum, block) + # hurrah, we have a valid block. Deliver it. + for o in observers: + # goes to SegmentFetcher._block_request_activity + o.notify(state=COMPLETE, block=block) + except (BadHashError, NotEnoughHashesError), e: + # rats, we have a corrupt block. Notify our clients that they + # need to look elsewhere, and advise the server. 
Unlike + # corruption in other parts of the share, this doesn't cause us + # to abandon the whole share. + f = Failure(e) + log.msg(format="hash failure in block %(segnum)d, from %(share)s", + segnum=segnum, share=repr(self), failure=f, + level=log.WEIRD, parent=self._lp, umid="mZjkqA") + for o in observers: + o.notify(state=CORRUPT) + self._signal_corruption(f, blockstart, blocklen) + self.had_corruption = True + # in either case, we've retired this block + self._requested_blocks.pop(0) + # popping the request keeps us from turning around and wanting the + # block again right away + return True # got satisfaction + + def _desire(self): + segnum, observers = self._active_segnum_and_observers() # maybe None + + # 'want_it' is for data we merely want: we know that we don't really + # need it. This includes speculative reads, like the first 1KB of the + # share (for the offset table) and the first 2KB of the UEB. + # + # 'need_it' is for data that, if we have the real offset table, we'll + # need. If we are only guessing at the offset table, it's merely + # wanted. (The share is abandoned if we can't get data that we really + # need). + # + # 'gotta_gotta_have_it' is for data that we absolutely need, + # independent of whether we're still guessing about the offset table: + # the version number and the offset table itself. + # + # Mr. Popeil, I'm in trouble, need your assistance on the double. Aww.. + + desire = Spans(), Spans(), Spans() + (want_it, need_it, gotta_gotta_have_it) = desire + + self.actual_segment_size = self._node.segment_size # might be updated + o = self.actual_offsets or self.guessed_offsets + segsize = self.actual_segment_size or self.guessed_segment_size + r = self._node._calculate_sizes(segsize) + + if not self.actual_offsets: + # all _desire functions add bits to the three desire[] spans + self._desire_offsets(desire) + + # we can use guessed offsets as long as this server tolerates + # overrun. 
Otherwise, we must wait for the offsets to arrive before + # we try to read anything else. + if self.actual_offsets or self._overrun_ok: + if not self._node.have_UEB: + self._desire_UEB(desire, o) + # They might ask for a segment that doesn't look right. + # _satisfy() will catch+reject bad segnums once we know the UEB + # (and therefore segsize and numsegs), so we'll only fail this + # test if we're still guessing. We want to avoid asking the + # hashtrees for needed_hashes() for bad segnums. So don't enter + # _desire_hashes or _desire_data unless the segnum looks + # reasonable. + if segnum < r["num_segments"]: + # XXX somehow we're getting here for sh5. we don't yet know + # the actual_segment_size, we're still working off the guess. + # the ciphertext_hash_tree has been corrected, but the + # commonshare._block_hash_tree is still in the guessed state. + self._desire_share_hashes(desire, o) + if segnum is not None: + self._desire_block_hashes(desire, o, segnum) + self._desire_data(desire, o, r, segnum, segsize) + else: + log.msg("_desire: segnum(%d) looks wrong (numsegs=%d)" + % (segnum, r["num_segments"]), + level=log.UNUSUAL, parent=self._lp, umid="tuYRQQ") + + log.msg("end _desire: want_it=%s need_it=%s gotta=%s" + % (want_it.dump(), need_it.dump(), gotta_gotta_have_it.dump())) + if self.actual_offsets: + return (want_it, need_it+gotta_gotta_have_it) + else: + return (want_it+need_it, gotta_gotta_have_it) + + def _desire_offsets(self, desire): + (want_it, need_it, gotta_gotta_have_it) = desire + if self._overrun_ok: + # easy! this includes version number, sizes, and offsets + want_it.add(0, 1024) + return + + # v1 has an offset table that lives [0x0,0x24). v2 lives [0x0,0x44). + # To be conservative, only request the data that we know lives there, + # even if that means more roundtrips. 
+ + gotta_gotta_have_it.add(0, 4) # version number, always safe + version_s = self._received.get(0, 4) + if not version_s: + return + (version,) = struct.unpack(">L", version_s) + # The code in _satisfy_offsets will have checked this version + # already. There is no code path to get this far with version>2. + assert 1 <= version <= 2, "can't get here, version=%d" % version + if version == 1: + table_start = 0x0c + fieldsize = 0x4 + elif version == 2: + table_start = 0x14 + fieldsize = 0x8 + offset_table_size = 6 * fieldsize + gotta_gotta_have_it.add(table_start, offset_table_size) + + def _desire_UEB(self, desire, o): + (want_it, need_it, gotta_gotta_have_it) = desire + + # UEB data is stored as (length,data). + if self._overrun_ok: + # We can pre-fetch 2kb, which should probably cover it. If it + # turns out to be larger, we'll come back here later with a known + # length and fetch the rest. + want_it.add(o["uri_extension"], 2048) + # now, while that is probably enough to fetch the whole UEB, it + # might not be, so we need to do the next few steps as well. 
In + # most cases, the following steps will not actually add anything + # to need_it + + need_it.add(o["uri_extension"], self._fieldsize) + # only use a length if we're sure it's correct, otherwise we'll + # probably fetch a huge number + if not self.actual_offsets: + return + UEB_length_s = self._received.get(o["uri_extension"], self._fieldsize) + if UEB_length_s: + (UEB_length,) = struct.unpack(">"+self._fieldstruct, UEB_length_s) + # we know the length, so make sure we grab everything + need_it.add(o["uri_extension"]+self._fieldsize, UEB_length) + + def _desire_share_hashes(self, desire, o): + (want_it, need_it, gotta_gotta_have_it) = desire + + if self._node.share_hash_tree.needed_hashes(self._shnum): + hashlen = o["uri_extension"] - o["share_hashes"] + need_it.add(o["share_hashes"], hashlen) + + def _desire_block_hashes(self, desire, o, segnum): + (want_it, need_it, gotta_gotta_have_it) = desire + + # block hash chain + for hashnum in self._commonshare.get_needed_block_hashes(segnum): + need_it.add(o["block_hashes"]+hashnum*HASH_SIZE, HASH_SIZE) + + # ciphertext hash chain + for hashnum in self._node.get_needed_ciphertext_hashes(segnum): + need_it.add(o["crypttext_hash_tree"]+hashnum*HASH_SIZE, HASH_SIZE) + + def _desire_data(self, desire, o, r, segnum, segsize): + (want_it, need_it, gotta_gotta_have_it) = desire + tail = (segnum == r["num_segments"]-1) + datastart = o["data"] + blockstart = datastart + segnum * r["block_size"] + blocklen = r["block_size"] + if tail: + blocklen = r["tail_block_size"] + need_it.add(blockstart, blocklen) + + def _send_requests(self, desired): + ask = desired - self._pending - self._received.get_spans() + log.msg("%s._send_requests, desired=%s, pending=%s, ask=%s" % + (repr(self), desired.dump(), self._pending.dump(), ask.dump()), + level=log.NOISY, parent=self._lp, umid="E94CVA") + # XXX At one time, this code distinguished between data blocks and + # hashes, and made sure to send (small) requests for hashes before + # sending 
(big) requests for blocks. The idea was to make sure that + # all hashes arrive before the blocks, so the blocks can be consumed + # and released in a single turn. I removed this for simplicity. + # Reconsider the removal: maybe bring it back. + ds = self._download_status + + for (start, length) in ask: + # TODO: quantize to reasonably-large blocks + self._pending.add(start, length) + lp = log.msg(format="%(share)s._send_request" + " [%(start)d:+%(length)d]", + share=repr(self), + start=start, length=length, + level=log.NOISY, parent=self._lp, umid="sgVAyA") + req_ev = ds.add_request_sent(self._peerid, self._shnum, + start, length, now()) + d = self._send_request(start, length) + d.addCallback(self._got_data, start, length, req_ev, lp) + d.addErrback(self._got_error, start, length, req_ev, lp) + d.addCallback(self._trigger_loop) + d.addErrback(lambda f: + log.err(format="unhandled error during send_request", + failure=f, parent=self._lp, + level=log.WEIRD, umid="qZu0wg")) + + def _send_request(self, start, length): + return self._rref.callRemote("read", start, length) + + def _got_data(self, data, start, length, req_ev, lp): + req_ev.finished(len(data), now()) + if not self._alive: + return + log.msg(format="%(share)s._got_data [%(start)d:+%(length)d] -> %(datalen)d", + share=repr(self), start=start, length=length, datalen=len(data), + level=log.NOISY, parent=lp, umid="5Qn6VQ") + self._pending.remove(start, length) + self._received.add(start, data) + + # if we ask for [a:c], and we get back [a:b] (banything - if size is None: - bytes_to_read = self.bytes_left - elif self.bytes_left is None: - bytes_to_read = size - else: - bytes_to_read = min(size, self.bytes_left) - data = self.f.read(bytes_to_read) - if self.bytes_left is not None: - self.bytes_left -= len(data) - return data - -class DownloadCache: - implements(IDownloadTarget) - - def __init__(self, filecap, storage_index, downloader, - cachedirectorymanager): - self._downloader = downloader - self._uri = 
filecap - self._storage_index = storage_index - self.milestones = set() # of (offset,size,Deferred) - self.cachedirectorymanager = cachedirectorymanager - self.cachefile = None - self.download_in_progress = False - # five states: - # new ImmutableFileNode, no downloads ever performed - # new ImmutableFileNode, leftover file (partial) - # new ImmutableFileNode, leftover file (whole) - # download in progress, not yet complete - # download complete - - def when_range_available(self, offset, size): - assert isinstance(offset, (int,long)) - assert isinstance(size, (int,long)) - - d = defer.Deferred() - self.milestones.add( (offset,size,d) ) - self._check_milestones() - if self.milestones and not self.download_in_progress: - self.download_in_progress = True - log.msg(format=("immutable filenode read [%(si)s]: " + - "starting download"), - si=base32.b2a(self._storage_index), - umid="h26Heg", level=log.OPERATIONAL) - d2 = self._downloader.download(self._uri, self) - d2.addBoth(self._download_done) - d2.addErrback(self._download_failed) - d2.addErrback(log.err, umid="cQaM9g") - return d - - def read(self, consumer, offset, size): - assert offset+size <= self.get_filesize() - if not self.cachefile: - self.cachefile = self.cachedirectorymanager.get_file(base32.b2a(self._storage_index)) - f = PortionOfFile(self.cachefile.get_filename(), offset, size) - d = basic.FileSender().beginFileTransfer(f, consumer) - d.addCallback(lambda lastSent: consumer) - return d - - def _download_done(self, res): - # clear download_in_progress, so failed downloads can be re-tried - self.download_in_progress = False - return res - - def _download_failed(self, f): - # tell anyone who's waiting that we failed - for m in self.milestones: - (offset,size,d) = m - eventually(d.errback, f) - self.milestones.clear() - - def _check_milestones(self): - current_size = self.get_filesize() - for m in list(self.milestones): - (offset,size,d) = m - if offset+size <= current_size: - log.msg(format=("immutable 
filenode read [%(si)s] " + - "%(offset)d+%(size)d vs %(filesize)d: " + - "done"), - si=base32.b2a(self._storage_index), - offset=offset, size=size, filesize=current_size, - umid="nuedUg", level=log.NOISY) - self.milestones.discard(m) - eventually(d.callback, None) - else: - log.msg(format=("immutable filenode read [%(si)s] " + - "%(offset)d+%(size)d vs %(filesize)d: " + - "still waiting"), - si=base32.b2a(self._storage_index), - offset=offset, size=size, filesize=current_size, - umid="8PKOhg", level=log.NOISY) - - def get_filesize(self): - if not self.cachefile: - self.cachefile = self.cachedirectorymanager.get_file(base32.b2a(self._storage_index)) - try: - filesize = os.stat(self.cachefile.get_filename())[stat.ST_SIZE] - except OSError: - filesize = 0 - return filesize - - - def open(self, size): - if not self.cachefile: - self.cachefile = self.cachedirectorymanager.get_file(base32.b2a(self._storage_index)) - self.f = open(self.cachefile.get_filename(), "wb") - - def write(self, data): - self.f.write(data) - self._check_milestones() - - def close(self): - self.f.close() - self._check_milestones() - - def fail(self, why): - pass - def register_canceller(self, cb): - pass - def finish(self): - return None - # The following methods are just because the target might be a - # repairer.DownUpConnector, and just because the current CHKUpload object - # expects to find the storage index and encoding parameters in its - # Uploadable. 
- def set_storageindex(self, storageindex): - pass - def set_encodingparams(self, encodingparams): - pass +from twisted.internet.interfaces import IConsumer +from allmydata.interfaces import IImmutableFileNode, IUploadResults +from allmydata import uri +from allmydata.check_results import CheckResults, CheckAndRepairResults +from allmydata.util.dictutil import DictOfSets +from pycryptopp.cipher.aes import AES -class ImmutableFileNode(_ImmutableFileNodeBase, log.PrefixingLogMixin): - def __init__(self, filecap, storage_broker, secret_holder, - downloader, history, cachedirectorymanager): - assert isinstance(filecap, CHKFileURI) - self.u = filecap +# local imports +from allmydata.immutable.checker import Checker +from allmydata.immutable.repairer import Repairer +from allmydata.immutable.downloader.node import DownloadNode +from allmydata.immutable.downloader.status import DownloadStatus + +class CiphertextFileNode: + def __init__(self, verifycap, storage_broker, secret_holder, + terminator, history, download_status=None): + assert isinstance(verifycap, uri.CHKFileVerifierURI) + self._verifycap = verifycap self._storage_broker = storage_broker self._secret_holder = secret_holder - self._downloader = downloader - self._history = history - storage_index = self.get_storage_index() - self.download_cache = DownloadCache(filecap, storage_index, downloader, - cachedirectorymanager) - prefix = self.u.get_verify_cap().to_string() - log.PrefixingLogMixin.__init__(self, "allmydata.immutable.filenode", prefix=prefix) - self.log("starting", level=log.OPERATIONAL) + if download_status is None: + ds = DownloadStatus(verifycap.storage_index, verifycap.size) + if history: + history.add_download(ds) + download_status = ds + self._node = DownloadNode(verifycap, storage_broker, secret_holder, + terminator, history, download_status) + + def read(self, consumer, offset=0, size=None, read_ev=None): + """I am the main entry point, from which FileNode.read() can get + data. 
I feed the consumer with the desired range of ciphertext. I + return a Deferred that fires (with the consumer) when the read is + finished.""" + return self._node.read(consumer, offset, size, read_ev) + + def get_segment(self, segnum): + """Begin downloading a segment. I return a tuple (d, c): 'd' is a + Deferred that fires with (offset,data) when the desired segment is + available, and c is an object on which c.cancel() can be called to + disavow interest in the segment (after which 'd' will never fire). + + You probably need to know the segment size before calling this, + unless you want the first few bytes of the file. If you ask for a + segment number which turns out to be too large, the Deferred will + errback with BadSegmentNumberError. + + The Deferred fires with the offset of the first byte of the data + segment, so that you can call get_segment() before knowing the + segment size, and still know which data you received. + """ + return self._node.get_segment(segnum) + + def get_segment_size(self): + # return a Deferred that fires with the file's real segment size + return self._node.get_segsize() - def get_size(self): - return self.u.get_size() - def get_current_size(self): - return defer.succeed(self.get_size()) - - def get_cap(self): - return self.u - def get_readcap(self): - return self.u.get_readonly() + def get_storage_index(self): + return self._verifycap.storage_index def get_verify_cap(self): - return self.u.get_verify_cap() - def get_repair_cap(self): - # CHK files can be repaired with just the verifycap - return self.u.get_verify_cap() + return self._verifycap + def get_size(self): + return self._verifycap.size - def get_uri(self): - return self.u.to_string() + def raise_error(self): + pass - def get_storage_index(self): - return self.u.get_storage_index() def check_and_repair(self, monitor, verify=False, add_lease=False): - verifycap = self.get_verify_cap() + verifycap = self._verifycap + storage_index = verifycap.storage_index sb = 
self._storage_broker servers = sb.get_all_servers() sh = self._secret_holder @@ -238,7 +85,7 @@ class ImmutableFileNode(_ImmutableFileNodeBase, log.PrefixingLogMixin): monitor=monitor) d = c.start() def _maybe_repair(cr): - crr = CheckAndRepairResults(self.u.get_storage_index()) + crr = CheckAndRepairResults(storage_index) crr.pre_repair_results = cr if cr.is_healthy(): crr.post_repair_results = cr @@ -248,24 +95,25 @@ class ImmutableFileNode(_ImmutableFileNodeBase, log.PrefixingLogMixin): crr.repair_successful = False # until proven successful def _gather_repair_results(ur): assert IUploadResults.providedBy(ur), ur - # clone the cr -- check results to form the basic of the prr -- post-repair results + # clone the cr (check results) to form the basis of the + # prr (post-repair results) prr = CheckResults(cr.uri, cr.storage_index) prr.data = copy.deepcopy(cr.data) sm = prr.data['sharemap'] - assert isinstance(sm, dictutil.DictOfSets), sm + assert isinstance(sm, DictOfSets), sm sm.update(ur.sharemap) servers_responding = set(prr.data['servers-responding']) servers_responding.union(ur.sharemap.iterkeys()) prr.data['servers-responding'] = list(servers_responding) prr.data['count-shares-good'] = len(sm) prr.data['count-good-share-hosts'] = len(sm) - is_healthy = bool(len(sm) >= self.u.total_shares) - is_recoverable = bool(len(sm) >= self.u.needed_shares) + is_healthy = bool(len(sm) >= verifycap.total_shares) + is_recoverable = bool(len(sm) >= verifycap.needed_shares) prr.set_healthy(is_healthy) prr.set_recoverable(is_recoverable) crr.repair_successful = is_healthy - prr.set_needs_rebalancing(len(sm) >= self.u.total_shares) + prr.set_needs_rebalancing(len(sm) >= verifycap.total_shares) crr.post_repair_results = prr return crr @@ -275,8 +123,8 @@ class ImmutableFileNode(_ImmutableFileNodeBase, log.PrefixingLogMixin): crr.repair_successful = False crr.repair_failure = f return f - r = Repairer(storage_broker=sb, secret_holder=sh, - verifycap=verifycap, monitor=monitor) + 
r = Repairer(self, storage_broker=sb, secret_holder=sh, + monitor=monitor) d = r.start() d.addCallbacks(_gather_repair_results, _repair_error) return d @@ -285,7 +133,7 @@ class ImmutableFileNode(_ImmutableFileNodeBase, log.PrefixingLogMixin): return d def check(self, monitor, verify=False, add_lease=False): - verifycap = self.get_verify_cap() + verifycap = self._verifycap sb = self._storage_broker servers = sb.get_all_servers() sh = self._secret_holder @@ -295,81 +143,130 @@ class ImmutableFileNode(_ImmutableFileNodeBase, log.PrefixingLogMixin): monitor=monitor) return v.start() + +class DecryptingConsumer: + """I sit between a CiphertextDownloader (which acts as a Producer) and + the real Consumer, decrypting everything that passes by. The real + Consumer sees the real Producer, but the Producer sees us instead of the + real consumer.""" + implements(IConsumer) + + def __init__(self, consumer, readkey, offset, read_event): + self._consumer = consumer + self._read_event = read_event + # TODO: pycryptopp CTR-mode needs random-access operations: I want + # either a=AES(readkey, offset) or better yet both of: + # a=AES(readkey, offset=0) + # a.process(ciphertext, offset=xyz) + # For now, we fake it with the existing iv= argument. + offset_big = offset // 16 + offset_small = offset % 16 + iv = binascii.unhexlify("%032x" % offset_big) + self._decryptor = AES(readkey, iv=iv) + self._decryptor.process("\x00"*offset_small) + + def registerProducer(self, producer, streaming): + # this passes through, so the real consumer can flow-control the real + # producer. Therefore we don't need to provide any IPushProducer + # methods. We implement all the IConsumer methods as pass-throughs, + # and only intercept write() to perform decryption. 
+ self._consumer.registerProducer(producer, streaming) + def unregisterProducer(self): + self._consumer.unregisterProducer() + def write(self, ciphertext): + started = now() + plaintext = self._decryptor.process(ciphertext) + elapsed = now() - started + self._read_event.update(0, elapsed, 0) + self._consumer.write(plaintext) + +class ImmutableFileNode: + implements(IImmutableFileNode) + + # I wrap a CiphertextFileNode with a decryption key + def __init__(self, filecap, storage_broker, secret_holder, terminator, + history): + assert isinstance(filecap, uri.CHKFileURI) + verifycap = filecap.get_verify_cap() + ds = DownloadStatus(verifycap.storage_index, verifycap.size) + if history: + history.add_download(ds) + self._download_status = ds + self._cnode = CiphertextFileNode(verifycap, storage_broker, + secret_holder, terminator, history, ds) + assert isinstance(filecap, uri.CHKFileURI) + self.u = filecap + self._readkey = filecap.key + + # TODO: I'm not sure about this.. what's the use case for node==node? 
If + # we keep it here, we should also put this on CiphertextFileNode + def __hash__(self): + return self.u.__hash__() + def __eq__(self, other): + if isinstance(other, ImmutableFileNode): + return self.u.__eq__(other.u) + else: + return False + def __ne__(self, other): + if isinstance(other, ImmutableFileNode): + return self.u.__eq__(other.u) + else: + return True + def read(self, consumer, offset=0, size=None): - self.log("read", offset=offset, size=size, - umid="UPP8FA", level=log.OPERATIONAL) - if size is None: - size = self.get_size() - offset - size = min(size, self.get_size() - offset) - - if offset == 0 and size == self.get_size(): - # don't use the cache, just do a normal streaming download - self.log("doing normal full download", umid="VRSBwg", level=log.OPERATIONAL) - target = download.ConsumerAdapter(consumer) - return self._downloader.download(self.get_cap(), target, - self._parentmsgid, - history=self._history) - - d = self.download_cache.when_range_available(offset, size) - d.addCallback(lambda res: - self.download_cache.read(consumer, offset, size)) + actual_size = size + if actual_size == None: + actual_size = self.u.size + actual_size = actual_size - offset + read_ev = self._download_status.add_read_event(offset,actual_size, + now()) + decryptor = DecryptingConsumer(consumer, self._readkey, offset, read_ev) + d = self._cnode.read(decryptor, offset, size, read_ev) + d.addCallback(lambda dc: consumer) return d -class LiteralProducer: - implements(IPushProducer) - def resumeProducing(self): - pass - def stopProducing(self): + def raise_error(self): pass + def get_write_uri(self): + return None -class LiteralFileNode(_ImmutableFileNodeBase): - - def __init__(self, filecap): - assert isinstance(filecap, LiteralFileURI) - self.u = filecap - - def get_size(self): - return len(self.u.data) - def get_current_size(self): - return defer.succeed(self.get_size()) + def get_readonly_uri(self): + return self.get_uri() + def get_uri(self): + return 
self.u.to_string() def get_cap(self): return self.u def get_readcap(self): - return self.u + return self.u.get_readonly() def get_verify_cap(self): - return None + return self.u.get_verify_cap() def get_repair_cap(self): - return None - - def get_uri(self): - return self.u.to_string() + # CHK files can be repaired with just the verifycap + return self.u.get_verify_cap() def get_storage_index(self): - return None + return self.u.get_storage_index() - def check(self, monitor, verify=False, add_lease=False): - return defer.succeed(None) + def get_size(self): + return self.u.get_size() + def get_current_size(self): + return defer.succeed(self.get_size()) - def check_and_repair(self, monitor, verify=False, add_lease=False): - return defer.succeed(None) + def is_mutable(self): + return False - def read(self, consumer, offset=0, size=None): - if size is None: - data = self.u.data[offset:] - else: - data = self.u.data[offset:offset+size] - - # We use twisted.protocols.basic.FileSender, which only does - # non-streaming, i.e. PullProducer, where the receiver/consumer must - # ask explicitly for each chunk of data. There are only two places in - # the Twisted codebase that can't handle streaming=False, both of - # which are in the upload path for an FTP/SFTP server - # (protocols.ftp.FileConsumer and - # vfs.adapters.ftp._FileToConsumerAdapter), neither of which is - # likely to be used as the target for a Tahoe download. 
- - d = basic.FileSender().beginFileTransfer(StringIO(data), consumer) - d.addCallback(lambda lastSent: consumer) - return d + def is_readonly(self): + return True + + def is_unknown(self): + return False + + def is_allowed_in_immutable_directory(self): + return True + + def check_and_repair(self, monitor, verify=False, add_lease=False): + return self._cnode.check_and_repair(monitor, verify, add_lease) + def check(self, monitor, verify=False, add_lease=False): + return self._cnode.check(monitor, verify, add_lease) diff --git a/src/allmydata/immutable/layout.py b/src/allmydata/immutable/layout.py index 6e07da7..27fb844 100644 --- a/src/allmydata/immutable/layout.py +++ b/src/allmydata/immutable/layout.py @@ -74,12 +74,16 @@ limitations described in #346. # they are still provided when writing so that older versions of Tahoe can # read them. +FORCE_V2 = False # set briefly by unit tests to make small-sized V2 shares + def make_write_bucket_proxy(rref, data_size, block_size, num_segments, num_share_hashes, uri_extension_size_max, nodeid): # Use layout v1 for small files, so they'll be readable by older versions # (= self.next_read_lens[0]) - or self._closed_to_pusher): - nrd = self.next_read_ds.popleft() - nrl = self.next_read_lens.popleft() - - # Pick out the requested number of bytes from self.bufs, turn it - # into a string, and callback the deferred with that. 
- res = [] - ressize = 0 - while ressize < nrl and self.bufs: - nextbuf = self.bufs.popleft() - res.append(nextbuf) - ressize += len(nextbuf) - if ressize > nrl: - extra = ressize - nrl - self.bufs.appendleft(nextbuf[:-extra]) - res[-1] = nextbuf[:-extra] - assert _assert(sum(len(x) for x in res) <= nrl, [len(x) for x in res], nrl) - assert _assert(sum(len(x) for x in res) == nrl or self._closed_to_pusher, [len(x) for x in res], nrl) - self.bufsiz -= nrl - if self.bufsiz < self.buflim and self.producer: - self.producer.resumeProducing() - nrd.callback(res) - - # methods to satisfy the IConsumer and IDownloadTarget interfaces. (From - # the perspective of a downloader I am an IDownloadTarget and an - # IConsumer.) - def registerProducer(self, producer, streaming): - assert streaming # We know how to handle only streaming producers. - self.producer = producer # the downloader - def unregisterProducer(self): - self.producer = None - def open(self, size): - self.size = size - self._size_osol.fire(self.size) - def set_encodingparams(self, encodingparams): - self.encodingparams = encodingparams - self._encodingparams_osol.fire(self.encodingparams) - def set_storageindex(self, storageindex): - self.storageindex = storageindex - self._storageindex_osol.fire(self.storageindex) - def write(self, data): - precondition(data) # please don't write empty strings - self.bufs.append(data) - self.bufsiz += len(data) - self._satisfy_reads_if_possible() - if self.bufsiz >= self.buflim and self.producer: - self.producer.pauseProducing() - def finish(self): - pass - def close(self): - self._closed_to_pusher = True - # Any reads which haven't been satisfied by now are going to - # have to be satisfied with short reads. - self._satisfy_reads_if_possible() # methods to satisfy the IEncryptedUploader interface # (From the perspective of an uploader I am an IEncryptedUploadable.) 
def set_upload_status(self, upload_status): self.upload_status = upload_status def get_size(self): - if hasattr(self, 'size'): # attribute created by self.open() - return defer.succeed(self.size) - else: - return self._size_osol.when_fired() + size = self._filenode.get_size() + assert size is not None + return defer.succeed(size) def get_all_encoding_parameters(self): - # We have to learn the encoding params from pusher. - if hasattr(self, 'encodingparams'): - # attribute created by self.set_encodingparams() - return defer.succeed(self.encodingparams) - else: - return self._encodingparams_osol.when_fired() + return defer.succeed(self._encodingparams) def read_encrypted(self, length, hash_only): - """Returns a deferred which eventually fired with the requested - ciphertext.""" + """Returns a deferred which eventually fires with the requested + ciphertext, as a list of strings.""" precondition(length) # please don't ask to read 0 bytes - d = defer.Deferred() - self.next_read_ds.append(d) - self.next_read_lens.append(length) - self._satisfy_reads_if_possible() + mc = consumer.MemoryConsumer() + d = self._filenode.read(mc, self._offset, length) + self._offset += length + d.addCallback(lambda ign: mc.chunks) return d def get_storage_index(self): - # We have to learn the storage index from pusher. 
- if hasattr(self, 'storageindex'): - # attribute created by self.set_storageindex() - return defer.succeed(self.storageindex) - else: - return self._storageindex.when_fired() + return self._filenode.get_storage_index() + def close(self): + pass diff --git a/src/allmydata/immutable/upload.py b/src/allmydata/immutable/upload.py index dc46800..a3f8c92 100644 --- a/src/allmydata/immutable/upload.py +++ b/src/allmydata/immutable/upload.py @@ -20,7 +20,8 @@ from allmydata.util.assertutil import precondition from allmydata.util.rrefutil import add_version_to_remote_reference from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \ IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \ - NoServersError, InsufficientVersionError, UploadUnhappinessError + NoServersError, InsufficientVersionError, UploadUnhappinessError, \ + DEFAULT_MAX_SEGMENT_SIZE from allmydata.immutable import layout from pycryptopp.cipher.aes import AES @@ -1205,7 +1206,8 @@ class AssistedUploader: return self._upload_status class BaseUploadable: - default_max_segment_size = 128*KiB # overridden by max_segment_size + # this is overridden by max_segment_size + default_max_segment_size = DEFAULT_MAX_SEGMENT_SIZE default_encoding_param_k = 3 # overridden by encoding_parameters default_encoding_param_happy = 7 default_encoding_param_n = 10 diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py index 4cfe9c9..3a7fa7f 100644 --- a/src/allmydata/interfaces.py +++ b/src/allmydata/interfaces.py @@ -24,6 +24,9 @@ WriteEnablerSecret = Hash # used to protect mutable bucket modifications LeaseRenewSecret = Hash # used to protect bucket lease renewal requests LeaseCancelSecret = Hash # used to protect bucket lease cancellation requests +KiB = 1024 +DEFAULT_MAX_SEGMENT_SIZE = 128*KiB + class RIStubClient(RemoteInterface): """Each client publishes a service announcement for a dummy object called the StubClient. 
This object doesn't actually offer any services, but the diff --git a/src/allmydata/nodemaker.py b/src/allmydata/nodemaker.py index c852f68..3b74d90 100644 --- a/src/allmydata/nodemaker.py +++ b/src/allmydata/nodemaker.py @@ -1,7 +1,8 @@ import weakref from zope.interface import implements from allmydata.interfaces import INodeMaker -from allmydata.immutable.filenode import ImmutableFileNode, LiteralFileNode +from allmydata.immutable.literal import LiteralFileNode +from allmydata.immutable.filenode import ImmutableFileNode, CiphertextFileNode from allmydata.immutable.upload import Data from allmydata.mutable.filenode import MutableFileNode from allmydata.dirnode import DirectoryNode, pack_children @@ -12,14 +13,13 @@ class NodeMaker: implements(INodeMaker) def __init__(self, storage_broker, secret_holder, history, - uploader, downloader, download_cache_dirman, + uploader, terminator, default_encoding_parameters, key_generator): self.storage_broker = storage_broker self.secret_holder = secret_holder self.history = history self.uploader = uploader - self.downloader = downloader - self.download_cache_dirman = download_cache_dirman + self.terminator = terminator self.default_encoding_parameters = default_encoding_parameters self.key_generator = key_generator @@ -29,8 +29,10 @@ class NodeMaker: return LiteralFileNode(cap) def _create_immutable(self, cap): return ImmutableFileNode(cap, self.storage_broker, self.secret_holder, - self.downloader, self.history, - self.download_cache_dirman) + self.terminator, self.history) + def _create_immutable_verifier(self, cap): + return CiphertextFileNode(cap, self.storage_broker, self.secret_holder, + self.terminator, self.history) def _create_mutable(self, cap): n = MutableFileNode(self.storage_broker, self.secret_holder, self.default_encoding_parameters, @@ -73,6 +75,8 @@ class NodeMaker: return self._create_lit(cap) if isinstance(cap, uri.CHKFileURI): return self._create_immutable(cap) + if isinstance(cap, uri.CHKFileVerifierURI): 
+ return self._create_immutable_verifier(cap) if isinstance(cap, (uri.ReadonlySSKFileURI, uri.WriteableSSKFileURI)): return self._create_mutable(cap) if isinstance(cap, (uri.DirectoryURI, diff --git a/src/allmydata/test/no_network.py b/src/allmydata/test/no_network.py index 771dffd..a1c475d 100644 --- a/src/allmydata/test/no_network.py +++ b/src/allmydata/test/no_network.py @@ -223,6 +223,7 @@ class NoNetworkGrid(service.MultiService): fileutil.make_dirs(serverdir) ss = StorageServer(serverdir, serverid, stats_provider=SimpleStats(), readonly_storage=readonly) + ss._no_network_server_number = i return ss def add_server(self, i, ss): @@ -319,6 +320,16 @@ class GridTestMixin: pass return sorted(shares) + def copy_shares(self, uri): + shares = {} + for (shnum, serverid, sharefile) in self.find_shares(uri): + shares[sharefile] = open(sharefile, "rb").read() + return shares + + def restore_all_shares(self, shares): + for sharefile, data in shares.items(): + open(sharefile, "wb").write(data) + def delete_share(self, (shnum, serverid, sharefile)): os.unlink(sharefile) @@ -339,6 +350,12 @@ class GridTestMixin: corruptdata = corruptor(sharedata, debug=debug) open(i_sharefile, "wb").write(corruptdata) + def corrupt_all_shares(self, uri, corruptor, debug=False): + for (i_shnum, i_serverid, i_sharefile) in self.find_shares(uri): + sharedata = open(i_sharefile, "rb").read() + corruptdata = corruptor(sharedata, debug=debug) + open(i_sharefile, "wb").write(corruptdata) + def GET(self, urlpath, followRedirect=False, return_response=False, method="GET", clientnum=0, **kwargs): # if return_response=True, this fires with (data, statuscode, diff --git a/src/allmydata/test/test_cli.py b/src/allmydata/test/test_cli.py index cec32e4..1e88053 100644 --- a/src/allmydata/test/test_cli.py +++ b/src/allmydata/test/test_cli.py @@ -2300,12 +2300,19 @@ class Errors(GridTestMixin, CLITestMixin, unittest.TestCase): self.delete_shares_numbered(ur.uri, range(1,10)) d.addCallback(_stash_bad) + # the 
download is abandoned as soon as it's clear that we won't get + # enough shares. The one remaining share might be in either the + # COMPLETE or the PENDING state. + in_complete_msg = "ran out of shares: 1 complete, 0 pending, 0 overdue, 0 unused, need 3" + in_pending_msg = "ran out of shares: 0 complete, 1 pending, 0 overdue, 0 unused, need 3" + d.addCallback(lambda ign: self.do_cli("get", self.uri_1share)) def _check1((rc, out, err)): self.failIfEqual(rc, 0) self.failUnless("410 Gone" in err, err) self.failUnlessIn("NotEnoughSharesError: ", err) - self.failUnlessIn("Failed to get enough shareholders: have 1, need 3", err) + self.failUnless(in_complete_msg in err or in_pending_msg in err, + err) d.addCallback(_check1) targetf = os.path.join(self.basedir, "output") @@ -2314,7 +2321,8 @@ class Errors(GridTestMixin, CLITestMixin, unittest.TestCase): self.failIfEqual(rc, 0) self.failUnless("410 Gone" in err, err) self.failUnlessIn("NotEnoughSharesError: ", err) - self.failUnlessIn("Failed to get enough shareholders: have 1, need 3", err) + self.failUnless(in_complete_msg in err or in_pending_msg in err, + err) self.failIf(os.path.exists(targetf)) d.addCallback(_check2) diff --git a/src/allmydata/test/test_dirnode.py b/src/allmydata/test/test_dirnode.py index 7d8d66d..8122def 100644 --- a/src/allmydata/test/test_dirnode.py +++ b/src/allmydata/test/test_dirnode.py @@ -1202,7 +1202,7 @@ class Packing(testutil.ReallyEqualMixin, unittest.TestCase): def test_unpack_and_pack_behavior(self): known_tree = b32decode(self.known_tree) nodemaker = NodeMaker(None, None, None, - None, None, None, + None, None, {"k": 3, "n": 10}, None) write_uri = "URI:SSK-RO:e3mdrzfwhoq42hy5ubcz6rp3o4:ybyibhnp3vvwuq2vaw2ckjmesgkklfs6ghxleztqidihjyofgw7q" filenode = nodemaker.create_from_cap(write_uri) @@ -1264,8 +1264,7 @@ class Packing(testutil.ReallyEqualMixin, unittest.TestCase): return kids def test_deep_immutable(self): - nm = NodeMaker(None, None, None, None, None, None, {"k": 3, "n": 10}, - 
None) + nm = NodeMaker(None, None, None, None, None, {"k": 3, "n": 10}, None) fn = MinimalFakeMutableFile() kids = self._make_kids(nm, ["imm", "lit", "write", "read", @@ -1359,7 +1358,7 @@ class FakeNodeMaker(NodeMaker): class FakeClient2(Client): def __init__(self): self.nodemaker = FakeNodeMaker(None, None, None, - None, None, None, + None, None, {"k":3,"n":10}, None) def create_node_from_uri(self, rwcap, rocap): return self.nodemaker.create_from_cap(rwcap, rocap) @@ -1643,8 +1642,7 @@ class Deleter(GridTestMixin, testutil.ReallyEqualMixin, unittest.TestCase): def _do_delete(ignored): nm = UCWEingNodeMaker(c0.storage_broker, c0._secret_holder, c0.get_history(), c0.getServiceNamed("uploader"), - c0.downloader, - c0.download_cache_dirman, + c0.terminator, c0.get_encoding_parameters(), c0._key_generator) n = nm.create_from_cap(self.root_uri) diff --git a/src/allmydata/test/test_download.py b/src/allmydata/test/test_download.py index b54bf01..520eaf2 100644 --- a/src/allmydata/test/test_download.py +++ b/src/allmydata/test/test_download.py @@ -5,12 +5,19 @@ import os from twisted.trial import unittest +from twisted.internet import defer, reactor from allmydata import uri from allmydata.storage.server import storage_index_to_dir -from allmydata.util import base32, fileutil -from allmydata.util.consumer import download_to_data -from allmydata.immutable import upload +from allmydata.util import base32, fileutil, spans, log +from allmydata.util.consumer import download_to_data, MemoryConsumer +from allmydata.immutable import upload, layout from allmydata.test.no_network import GridTestMixin +from allmydata.test.common import ShouldFailMixin +from allmydata.interfaces import NotEnoughSharesError, NoSharesError +from allmydata.immutable.downloader.common import BadSegmentNumberError, \ + BadCiphertextHashError, DownloadStopped +from allmydata.codec import CRSDecoder +from foolscap.eventual import fireEventually, flushEventualQueue plaintext = "This is a moderate-sized 
file.\n" * 10 mutable_plaintext = "This is a moderate-sized mutable file.\n" * 10 @@ -68,20 +75,7 @@ mutable_shares = { } #--------- END stored_shares.py ---------------- -class DownloadTest(GridTestMixin, unittest.TestCase): - timeout = 2400 # It takes longer than 240 seconds on Zandr's ARM box. - def test_download(self): - self.basedir = self.mktemp() - self.set_up_grid() - self.c0 = self.g.clients[0] - - # do this to create the shares - #return self.create_shares() - - self.load_shares() - d = self.download_immutable() - d.addCallback(self.download_mutable) - return d +class _Base(GridTestMixin, ShouldFailMixin): def create_shares(self, ignored=None): u = upload.Data(plaintext, None) @@ -178,6 +172,9 @@ class DownloadTest(GridTestMixin, unittest.TestCase): def _got_data(data): self.failUnlessEqual(data, plaintext) d.addCallback(_got_data) + # make sure we can use the same node twice + d.addCallback(lambda ign: download_to_data(n)) + d.addCallback(_got_data) return d def download_mutable(self, ignored=None): @@ -188,3 +185,867 @@ class DownloadTest(GridTestMixin, unittest.TestCase): d.addCallback(_got_data) return d +class DownloadTest(_Base, unittest.TestCase): + timeout = 2400 # It takes longer than 240 seconds on Zandr's ARM box. 
+ def test_download(self): + self.basedir = self.mktemp() + self.set_up_grid() + self.c0 = self.g.clients[0] + + # do this to create the shares + #return self.create_shares() + + self.load_shares() + d = self.download_immutable() + d.addCallback(self.download_mutable) + return d + + def test_download_failover(self): + self.basedir = self.mktemp() + self.set_up_grid() + self.c0 = self.g.clients[0] + + self.load_shares() + si = uri.from_string(immutable_uri).get_storage_index() + si_dir = storage_index_to_dir(si) + + n = self.c0.create_node_from_uri(immutable_uri) + d = download_to_data(n) + def _got_data(data): + self.failUnlessEqual(data, plaintext) + d.addCallback(_got_data) + + def _clobber_some_shares(ign): + # find the three shares that were used, and delete them. Then + # download again, forcing the downloader to fail over to other + # shares + for s in n._cnode._node._shares: + for clientnum in immutable_shares: + for shnum in immutable_shares[clientnum]: + if s._shnum == shnum: + fn = os.path.join(self.get_serverdir(clientnum), + "shares", si_dir, str(shnum)) + os.unlink(fn) + d.addCallback(_clobber_some_shares) + d.addCallback(lambda ign: download_to_data(n)) + d.addCallback(_got_data) + + def _clobber_most_shares(ign): + # delete all but one of the shares that are still alive + live_shares = [s for s in n._cnode._node._shares if s.is_alive()] + save_me = live_shares[0]._shnum + for clientnum in immutable_shares: + for shnum in immutable_shares[clientnum]: + if shnum == save_me: + continue + fn = os.path.join(self.get_serverdir(clientnum), + "shares", si_dir, str(shnum)) + if os.path.exists(fn): + os.unlink(fn) + # now the download should fail with NotEnoughSharesError + return self.shouldFail(NotEnoughSharesError, "1shares", None, + download_to_data, n) + d.addCallback(_clobber_most_shares) + + def _clobber_all_shares(ign): + # delete the last remaining share + for clientnum in immutable_shares: + for shnum in immutable_shares[clientnum]: + fn = 
os.path.join(self.get_serverdir(clientnum), + "shares", si_dir, str(shnum)) + if os.path.exists(fn): + os.unlink(fn) + # now a new download should fail with NoSharesError. We want a + # new ImmutableFileNode so it will forget about the old shares. + # If we merely called create_node_from_uri() without first + # dereferencing the original node, the NodeMaker's _node_cache + # would give us back the old one. + n = None + n = self.c0.create_node_from_uri(immutable_uri) + return self.shouldFail(NoSharesError, "0shares", None, + download_to_data, n) + d.addCallback(_clobber_all_shares) + return d + + def test_lost_servers(self): + # while downloading a file (after seg[0], before seg[1]), lose the + # three servers that we were using. The download should switch over + # to other servers. + self.basedir = self.mktemp() + self.set_up_grid() + self.c0 = self.g.clients[0] + + # upload a file with multiple segments, so we can catch the download + # in the middle. + u = upload.Data(plaintext, None) + u.max_segment_size = 70 # 5 segs + d = self.c0.upload(u) + def _uploaded(ur): + self.uri = ur.uri + self.n = self.c0.create_node_from_uri(self.uri) + return download_to_data(self.n) + d.addCallback(_uploaded) + def _got_data(data): + self.failUnlessEqual(data, plaintext) + d.addCallback(_got_data) + def _kill_some_servers(): + # find the three shares that were used, and delete them. 
Then + # download again, forcing the downloader to fail over to other + # shares + servers = [] + shares = sorted([s._shnum for s in self.n._cnode._node._shares]) + self.failUnlessEqual(shares, [0,1,2]) + # break the RIBucketReader references + for s in self.n._cnode._node._shares: + s._rref.broken = True + for servernum in immutable_shares: + for shnum in immutable_shares[servernum]: + if s._shnum == shnum: + ss = self.g.servers_by_number[servernum] + servers.append(ss) + # and, for good measure, break the RIStorageServer references + # too, just in case the downloader gets more aggressive in the + # future and tries to re-fetch the same share. + for ss in servers: + wrapper = self.g.servers_by_id[ss.my_nodeid] + wrapper.broken = True + def _download_again(ign): + c = StallingConsumer(_kill_some_servers) + return self.n.read(c) + d.addCallback(_download_again) + def _check_failover(c): + self.failUnlessEqual("".join(c.chunks), plaintext) + shares = sorted([s._shnum for s in self.n._cnode._node._shares]) + # we should now be using more shares than we were before + self.failIfEqual(shares, [0,1,2]) + d.addCallback(_check_failover) + return d + + def test_badguess(self): + self.basedir = self.mktemp() + self.set_up_grid() + self.c0 = self.g.clients[0] + self.load_shares() + n = self.c0.create_node_from_uri(immutable_uri) + + # Cause the downloader to guess a segsize that's too low, so it will + # ask for a segment number that's too high (beyond the end of the + # real list, causing BadSegmentNumberError), to exercise + # Segmentation._retry_bad_segment + + con1 = MemoryConsumer() + n._cnode._node._build_guessed_tables(90) + # plaintext size of 310 bytes, wrong-segsize of 90 bytes, will make + # us think that file[180:200] is in the third segment (segnum=2), but + # really there's only one segment + d = n.read(con1, 180, 20) + def _done(res): + self.failUnlessEqual("".join(con1.chunks), plaintext[180:200]) + d.addCallback(_done) + return d + + def 
test_simultaneous_badguess(self): + self.basedir = self.mktemp() + self.set_up_grid() + self.c0 = self.g.clients[0] + + # upload a file with multiple segments, and a non-default segsize, to + # exercise the offset-guessing code. Because we don't tell the + # downloader about the unusual segsize, it will guess wrong, and have + # to do extra roundtrips to get the correct data. + u = upload.Data(plaintext, None) + u.max_segment_size = 70 # 5 segs, 8-wide hashtree + con1 = MemoryConsumer() + con2 = MemoryConsumer() + d = self.c0.upload(u) + def _uploaded(ur): + n = self.c0.create_node_from_uri(ur.uri) + d1 = n.read(con1, 70, 20) + d2 = n.read(con2, 140, 20) + return defer.gatherResults([d1,d2]) + d.addCallback(_uploaded) + def _done(res): + self.failUnlessEqual("".join(con1.chunks), plaintext[70:90]) + self.failUnlessEqual("".join(con2.chunks), plaintext[140:160]) + d.addCallback(_done) + return d + + def test_simultaneous_goodguess(self): + self.basedir = self.mktemp() + self.set_up_grid() + self.c0 = self.g.clients[0] + + # upload a file with multiple segments, and a non-default segsize, to + # exercise the offset-guessing code. This time we *do* tell the + # downloader about the unusual segsize, so it can guess right. 
+ u = upload.Data(plaintext, None) + u.max_segment_size = 70 # 5 segs, 8-wide hashtree + con1 = MemoryConsumer() + con2 = MemoryConsumer() + d = self.c0.upload(u) + def _uploaded(ur): + n = self.c0.create_node_from_uri(ur.uri) + n._cnode._node._build_guessed_tables(u.max_segment_size) + d1 = n.read(con1, 70, 20) + #d2 = n.read(con2, 140, 20) # XXX + d2 = defer.succeed(None) + return defer.gatherResults([d1,d2]) + d.addCallback(_uploaded) + def _done(res): + self.failUnlessEqual("".join(con1.chunks), plaintext[70:90]) + self.failUnlessEqual("".join(con2.chunks), plaintext[140:160]) + #d.addCallback(_done) + return d + + def test_sequential_goodguess(self): + self.basedir = self.mktemp() + self.set_up_grid() + self.c0 = self.g.clients[0] + data = (plaintext*100)[:30000] # multiple of k + + # upload a file with multiple segments, and a non-default segsize, to + # exercise the offset-guessing code. This time we *do* tell the + # downloader about the unusual segsize, so it can guess right. + u = upload.Data(data, None) + u.max_segment_size = 6000 # 5 segs, 8-wide hashtree + con1 = MemoryConsumer() + con2 = MemoryConsumer() + d = self.c0.upload(u) + def _uploaded(ur): + n = self.c0.create_node_from_uri(ur.uri) + n._cnode._node._build_guessed_tables(u.max_segment_size) + d = n.read(con1, 12000, 20) + def _read1(ign): + self.failUnlessEqual("".join(con1.chunks), data[12000:12020]) + return n.read(con2, 24000, 20) + d.addCallback(_read1) + def _read2(ign): + self.failUnlessEqual("".join(con2.chunks), data[24000:24020]) + d.addCallback(_read2) + return d + d.addCallback(_uploaded) + return d + + + def test_simultaneous_get_blocks(self): + self.basedir = self.mktemp() + self.set_up_grid() + self.c0 = self.g.clients[0] + + self.load_shares() + stay_empty = [] + + n = self.c0.create_node_from_uri(immutable_uri) + d = download_to_data(n) + def _use_shares(ign): + shares = list(n._cnode._node._shares) + s0 = shares[0] + # make sure .cancel works too + o0 = s0.get_block(0) + 
o0.subscribe(lambda **kwargs: stay_empty.append(kwargs)) + o1 = s0.get_block(0) + o2 = s0.get_block(0) + o0.cancel() + o3 = s0.get_block(1) # state=BADSEGNUM + d1 = defer.Deferred() + d2 = defer.Deferred() + d3 = defer.Deferred() + o1.subscribe(lambda **kwargs: d1.callback(kwargs)) + o2.subscribe(lambda **kwargs: d2.callback(kwargs)) + o3.subscribe(lambda **kwargs: d3.callback(kwargs)) + return defer.gatherResults([d1,d2,d3]) + d.addCallback(_use_shares) + def _done(res): + r1,r2,r3 = res + self.failUnlessEqual(r1["state"], "COMPLETE") + self.failUnlessEqual(r2["state"], "COMPLETE") + self.failUnlessEqual(r3["state"], "BADSEGNUM") + self.failUnless("block" in r1) + self.failUnless("block" in r2) + self.failIf(stay_empty) + d.addCallback(_done) + return d + + def test_download_no_overrun(self): + self.basedir = self.mktemp() + self.set_up_grid() + self.c0 = self.g.clients[0] + + self.load_shares() + + # tweak the client's copies of server-version data, so it believes + # that they're old and can't handle reads that overrun the length of + # the share. This exercises a different code path. 
+ for (peerid, rref) in self.c0.storage_broker.get_all_servers(): + v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"] + v1["tolerates-immutable-read-overrun"] = False + + n = self.c0.create_node_from_uri(immutable_uri) + d = download_to_data(n) + def _got_data(data): + self.failUnlessEqual(data, plaintext) + d.addCallback(_got_data) + return d + + def test_download_segment(self): + self.basedir = self.mktemp() + self.set_up_grid() + self.c0 = self.g.clients[0] + self.load_shares() + n = self.c0.create_node_from_uri(immutable_uri) + cn = n._cnode + (d,c) = cn.get_segment(0) + def _got_segment((offset,data,decodetime)): + self.failUnlessEqual(offset, 0) + self.failUnlessEqual(len(data), len(plaintext)) + d.addCallback(_got_segment) + return d + + def test_download_segment_cancel(self): + self.basedir = self.mktemp() + self.set_up_grid() + self.c0 = self.g.clients[0] + self.load_shares() + n = self.c0.create_node_from_uri(immutable_uri) + cn = n._cnode + (d,c) = cn.get_segment(0) + fired = [] + d.addCallback(fired.append) + c.cancel() + d = fireEventually() + d.addCallback(flushEventualQueue) + def _check(ign): + self.failUnlessEqual(fired, []) + d.addCallback(_check) + return d + + def test_download_bad_segment(self): + self.basedir = self.mktemp() + self.set_up_grid() + self.c0 = self.g.clients[0] + self.load_shares() + n = self.c0.create_node_from_uri(immutable_uri) + cn = n._cnode + def _try_download(): + (d,c) = cn.get_segment(1) + return d + d = self.shouldFail(BadSegmentNumberError, "badseg", + "segnum=1, numsegs=1", + _try_download) + return d + + def test_download_segment_terminate(self): + self.basedir = self.mktemp() + self.set_up_grid() + self.c0 = self.g.clients[0] + self.load_shares() + n = self.c0.create_node_from_uri(immutable_uri) + cn = n._cnode + (d,c) = cn.get_segment(0) + fired = [] + d.addCallback(fired.append) + self.c0.terminator.disownServiceParent() + d = fireEventually() + d.addCallback(flushEventualQueue) + def 
_check(ign): + self.failUnlessEqual(fired, []) + d.addCallback(_check) + return d + + def test_pause(self): + self.basedir = self.mktemp() + self.set_up_grid() + self.c0 = self.g.clients[0] + self.load_shares() + n = self.c0.create_node_from_uri(immutable_uri) + c = PausingConsumer() + d = n.read(c) + def _downloaded(mc): + newdata = "".join(mc.chunks) + self.failUnlessEqual(newdata, plaintext) + d.addCallback(_downloaded) + return d + + def test_pause_then_stop(self): + self.basedir = self.mktemp() + self.set_up_grid() + self.c0 = self.g.clients[0] + self.load_shares() + n = self.c0.create_node_from_uri(immutable_uri) + c = PausingAndStoppingConsumer() + d = self.shouldFail(DownloadStopped, "test_pause_then_stop", + "our Consumer called stopProducing()", + n.read, c) + return d + + def test_stop(self): + # use a download targetthat does an immediate stop (ticket #473) + self.basedir = self.mktemp() + self.set_up_grid() + self.c0 = self.g.clients[0] + self.load_shares() + n = self.c0.create_node_from_uri(immutable_uri) + c = StoppingConsumer() + d = self.shouldFail(DownloadStopped, "test_stop", + "our Consumer called stopProducing()", + n.read, c) + return d + + def test_download_segment_bad_ciphertext_hash(self): + # The crypttext_hash_tree asserts the integrity of the decoded + # ciphertext, and exists to detect two sorts of problems. The first + # is a bug in zfec decode. The second is the "two-sided t-shirt" + # attack (found by Christian Grothoff), in which a malicious uploader + # creates two sets of shares (one for file A, second for file B), + # uploads a combination of them (shares 0-4 of A, 5-9 of B), and then + # builds an otherwise normal UEB around those shares: their goal is + # to give their victim a filecap which sometimes downloads the good A + # contents, and sometimes the bad B contents, depending upon which + # servers/shares they can get to. Having a hash of the ciphertext + # forces them to commit to exactly one version. 
(Christian's prize + # for finding this problem was a t-shirt with two sides: the shares + # of file A on the front, B on the back). + + # creating a set of shares with this property is too hard, although + # it'd be nice to do so and confirm our fix. (it requires a lot of + # tampering with the uploader). So instead, we just damage the + # decoder. The tail decoder is rebuilt each time, so we need to use a + # file with multiple segments. + self.basedir = self.mktemp() + self.set_up_grid() + self.c0 = self.g.clients[0] + + u = upload.Data(plaintext, None) + u.max_segment_size = 60 # 6 segs + d = self.c0.upload(u) + def _uploaded(ur): + n = self.c0.create_node_from_uri(ur.uri) + n._cnode._node._build_guessed_tables(u.max_segment_size) + + d = download_to_data(n) + def _break_codec(data): + # the codec isn't created until the UEB is retrieved + node = n._cnode._node + vcap = node._verifycap + k, N = vcap.needed_shares, vcap.total_shares + bad_codec = BrokenDecoder() + bad_codec.set_params(node.segment_size, k, N) + node._codec = bad_codec + d.addCallback(_break_codec) + # now try to download it again. The broken codec will provide + # ciphertext that fails the hash test. + d.addCallback(lambda ign: + self.shouldFail(BadCiphertextHashError, "badhash", + "hash failure in " + "ciphertext_hash_tree: segnum=0", + download_to_data, n)) + return d + d.addCallback(_uploaded) + return d + + def OFFtest_download_segment_XXX(self): + self.basedir = self.mktemp() + self.set_up_grid() + self.c0 = self.g.clients[0] + + # upload a file with multiple segments, and a non-default segsize, to + # exercise the offset-guessing code. This time we *do* tell the + # downloader about the unusual segsize, so it can guess right. 
+ u = upload.Data(plaintext, None) + u.max_segment_size = 70 # 5 segs, 8-wide hashtree + con1 = MemoryConsumer() + con2 = MemoryConsumer() + d = self.c0.upload(u) + def _uploaded(ur): + n = self.c0.create_node_from_uri(ur.uri) + n._cnode._node._build_guessed_tables(u.max_segment_size) + d1 = n.read(con1, 70, 20) + #d2 = n.read(con2, 140, 20) + d2 = defer.succeed(None) + return defer.gatherResults([d1,d2]) + d.addCallback(_uploaded) + def _done(res): + self.failUnlessEqual("".join(con1.chunks), plaintext[70:90]) + self.failUnlessEqual("".join(con2.chunks), plaintext[140:160]) + #d.addCallback(_done) + return d + + def test_duplicate_shares(self): + self.basedir = self.mktemp() + self.set_up_grid() + self.c0 = self.g.clients[0] + + self.load_shares() + # make sure everybody has a copy of sh0. The second server contacted + # will report two shares, and the ShareFinder will handle the + # duplicate by attaching both to the same CommonShare instance. + si = uri.from_string(immutable_uri).get_storage_index() + si_dir = storage_index_to_dir(si) + sh0_file = [sharefile + for (shnum, serverid, sharefile) + in self.find_shares(immutable_uri) + if shnum == 0][0] + sh0_data = open(sh0_file, "rb").read() + for clientnum in immutable_shares: + if 0 in immutable_shares[clientnum]: + continue + cdir = self.get_serverdir(clientnum) + target = os.path.join(cdir, "shares", si_dir, "0") + outf = open(target, "wb") + outf.write(sh0_data) + outf.close() + + d = self.download_immutable() + return d + + def test_verifycap(self): + self.basedir = self.mktemp() + self.set_up_grid() + self.c0 = self.g.clients[0] + self.load_shares() + + n = self.c0.create_node_from_uri(immutable_uri) + vcap = n.get_verify_cap().to_string() + vn = self.c0.create_node_from_uri(vcap) + d = download_to_data(vn) + def _got_ciphertext(ciphertext): + self.failUnlessEqual(len(ciphertext), len(plaintext)) + self.failIfEqual(ciphertext, plaintext) + d.addCallback(_got_ciphertext) + return d + +class 
BrokenDecoder(CRSDecoder): + def decode(self, shares, shareids): + d = CRSDecoder.decode(self, shares, shareids) + def _decoded(buffers): + def _corruptor(s, which): + return s[:which] + chr(ord(s[which])^0x01) + s[which+1:] + buffers[0] = _corruptor(buffers[0], 0) # flip lsb of first byte + return buffers + d.addCallback(_decoded) + return d + + +class PausingConsumer(MemoryConsumer): + def __init__(self): + MemoryConsumer.__init__(self) + self.size = 0 + self.writes = 0 + def write(self, data): + self.size += len(data) + self.writes += 1 + if self.writes <= 2: + # we happen to use 4 segments, and want to avoid pausing on the + # last one (since then the _unpause timer will still be running) + self.producer.pauseProducing() + reactor.callLater(0.1, self._unpause) + return MemoryConsumer.write(self, data) + def _unpause(self): + self.producer.resumeProducing() + +class PausingAndStoppingConsumer(PausingConsumer): + def write(self, data): + self.producer.pauseProducing() + reactor.callLater(0.5, self._stop) + def _stop(self): + self.producer.stopProducing() + +class StoppingConsumer(PausingConsumer): + def write(self, data): + self.producer.stopProducing() + +class StallingConsumer(MemoryConsumer): + def __init__(self, halfway_cb): + MemoryConsumer.__init__(self) + self.halfway_cb = halfway_cb + self.writes = 0 + def write(self, data): + self.writes += 1 + if self.writes == 1: + self.halfway_cb() + return MemoryConsumer.write(self, data) + +class Corruption(_Base, unittest.TestCase): + + def _corrupt_flip(self, ign, imm_uri, which): + log.msg("corrupt %d" % which) + def _corruptor(s, debug=False): + return s[:which] + chr(ord(s[which])^0x01) + s[which+1:] + self.corrupt_shares_numbered(imm_uri, [0], _corruptor) + + def _corrupt_set(self, ign, imm_uri, which, newvalue): + log.msg("corrupt %d" % which) + def _corruptor(s, debug=False): + return s[:which] + chr(newvalue) + s[which+1:] + self.corrupt_shares_numbered(imm_uri, [0], _corruptor) + + def 
test_each_byte(self): + # Setting catalog_detection=True performs an exhaustive test of the + # Downloader's response to corruption in the lsb of each byte of the + # 2070-byte share, with two goals: make sure we tolerate all forms of + # corruption (i.e. don't hang or return bad data), and make a list of + # which bytes can be corrupted without influencing the download + # (since we don't need every byte of the share). That takes 50s to + # run on my laptop and doesn't have any actual asserts, so we don't + # normally do that. + self.catalog_detection = False + + self.basedir = "download/Corruption/each_byte" + self.set_up_grid() + self.c0 = self.g.clients[0] + + # to exercise the block-hash-tree code properly, we need to have + # multiple segments. We don't tell the downloader about the different + # segsize, so it guesses wrong and must do extra roundtrips. + u = upload.Data(plaintext, None) + u.max_segment_size = 120 # 3 segs, 4-wide hashtree + + if self.catalog_detection: + undetected = spans.Spans() + + def _download(ign, imm_uri, which, expected): + n = self.c0.create_node_from_uri(imm_uri) + # for this test to work, we need to have a new Node each time. + # Make sure the NodeMaker's weakcache hasn't interfered. 
+ assert not n._cnode._node._shares + d = download_to_data(n) + def _got_data(data): + self.failUnlessEqual(data, plaintext) + shnums = sorted([s._shnum for s in n._cnode._node._shares]) + no_sh0 = bool(0 not in shnums) + sh0 = [s for s in n._cnode._node._shares if s._shnum == 0] + sh0_had_corruption = False + if sh0 and sh0[0].had_corruption: + sh0_had_corruption = True + num_needed = len(n._cnode._node._shares) + if self.catalog_detection: + detected = no_sh0 or sh0_had_corruption or (num_needed!=3) + if not detected: + undetected.add(which, 1) + if expected == "no-sh0": + self.failIfIn(0, shnums) + elif expected == "0bad-need-3": + self.failIf(no_sh0) + self.failUnless(sh0[0].had_corruption) + self.failUnlessEqual(num_needed, 3) + elif expected == "need-4th": + self.failIf(no_sh0) + self.failUnless(sh0[0].had_corruption) + self.failIfEqual(num_needed, 3) + d.addCallback(_got_data) + return d + + + d = self.c0.upload(u) + def _uploaded(ur): + imm_uri = ur.uri + self.shares = self.copy_shares(imm_uri) + d = defer.succeed(None) + # 'victims' is a list of corruption tests to run. Each one flips + # the low-order bit of the specified offset in the share file (so + # offset=0 is the MSB of the container version, offset=15 is the + # LSB of the share version, offset=24 is the MSB of the + # data-block-offset, and offset=48 is the first byte of the first + # data-block). Each one also specifies what sort of corruption + # we're expecting to see. + no_sh0_victims = [0,1,2,3] # container version + need3_victims = [ ] # none currently in this category + # when the offsets are corrupted, the Share will be unable to + # retrieve the data it wants (because it thinks that data lives + # off in the weeds somewhere), and Share treats DataUnavailable + # as abandon-this-share, so in general we'll be forced to look + # for a 4th share. 
+ need_4th_victims = [12,13,14,15, # share version + 24,25,26,27, # offset[data] + 32,33,34,35, # offset[crypttext_hash_tree] + 36,37,38,39, # offset[block_hashes] + 44,45,46,47, # offset[UEB] + ] + need_4th_victims.append(48) # block data + # when corrupting hash trees, we must corrupt a value that isn't + # directly set from somewhere else. Since we download data from + # seg0, corrupt something on its hash chain, like [2] (the + # right-hand child of the root) + need_4th_victims.append(600+2*32) # block_hashes[2] + # Share.loop is pretty conservative: it abandons the share at the + # first sign of corruption. It doesn't strictly need to be this + # way: if the UEB were corrupt, we could still get good block + # data from that share, as long as there was a good copy of the + # UEB elsewhere. If this behavior is relaxed, then corruption in + # the following fields (which are present in multiple shares) + # should fall into the "need3_victims" case instead of the + # "need_4th_victims" case. + need_4th_victims.append(376+2*32) # crypttext_hash_tree[2] + need_4th_victims.append(824) # share_hashes + need_4th_victims.append(994) # UEB length + need_4th_victims.append(998) # UEB + corrupt_me = ([(i,"no-sh0") for i in no_sh0_victims] + + [(i, "0bad-need-3") for i in need3_victims] + + [(i, "need-4th") for i in need_4th_victims]) + if self.catalog_detection: + corrupt_me = [(i, "") for i in range(len(self.sh0_orig))] + for i,expected in corrupt_me: + # All these tests result in a successful download. What we're + # measuring is how many shares the downloader had to use. 
+ d.addCallback(self._corrupt_flip, imm_uri, i) + d.addCallback(_download, imm_uri, i, expected) + d.addCallback(lambda ign: self.restore_all_shares(self.shares)) + d.addCallback(fireEventually) + corrupt_values = [(3, 2, "no-sh0"), + (15, 2, "need-4th"), # share looks v2 + ] + for i,newvalue,expected in corrupt_values: + d.addCallback(self._corrupt_set, imm_uri, i, newvalue) + d.addCallback(_download, imm_uri, i, expected) + d.addCallback(lambda ign: self.restore_all_shares(self.shares)) + d.addCallback(fireEventually) + return d + d.addCallback(_uploaded) + def _show_results(ign): + print + print ("of [0:%d], corruption ignored in %s" % + (len(self.sh0_orig), undetected.dump())) + if self.catalog_detection: + d.addCallback(_show_results) + # of [0:2070], corruption ignored in len=1133: + # [4-11],[16-23],[28-31],[152-439],[600-663],[1309-2069] + # [4-11]: container sizes + # [16-23]: share block/data sizes + # [152-375]: plaintext hash tree + # [376-408]: crypttext_hash_tree[0] (root) + # [408-439]: crypttext_hash_tree[1] (computed) + # [600-631]: block hash tree[0] (root) + # [632-663]: block hash tree[1] (computed) + # [1309-]: reserved+unused UEB space + return d + + def test_failure(self): + # this test corrupts all shares in the same way, and asserts that the + # download fails. + + self.basedir = "download/Corruption/failure" + self.set_up_grid() + self.c0 = self.g.clients[0] + + # to exercise the block-hash-tree code properly, we need to have + # multiple segments. We don't tell the downloader about the different + # segsize, so it guesses wrong and must do extra roundtrips. 
+ u = upload.Data(plaintext, None) + u.max_segment_size = 120 # 3 segs, 4-wide hashtree + + d = self.c0.upload(u) + def _uploaded(ur): + imm_uri = ur.uri + self.shares = self.copy_shares(imm_uri) + + corrupt_me = [(48, "block data", "Last failure: None"), + (600+2*32, "block_hashes[2]", "BadHashError"), + (376+2*32, "crypttext_hash_tree[2]", "BadHashError"), + (824, "share_hashes", "BadHashError"), + ] + def _download(imm_uri): + n = self.c0.create_node_from_uri(imm_uri) + # for this test to work, we need to have a new Node each time. + # Make sure the NodeMaker's weakcache hasn't interfered. + assert not n._cnode._node._shares + return download_to_data(n) + + d = defer.succeed(None) + for i,which,substring in corrupt_me: + # All these tests result in a failed download. + d.addCallback(self._corrupt_flip_all, imm_uri, i) + d.addCallback(lambda ign: + self.shouldFail(NotEnoughSharesError, which, + substring, + _download, imm_uri)) + d.addCallback(lambda ign: self.restore_all_shares(self.shares)) + d.addCallback(fireEventually) + return d + d.addCallback(_uploaded) + + return d + + def _corrupt_flip_all(self, ign, imm_uri, which): + def _corruptor(s, debug=False): + return s[:which] + chr(ord(s[which])^0x01) + s[which+1:] + self.corrupt_all_shares(imm_uri, _corruptor) + +class DownloadV2(_Base, unittest.TestCase): + # tests which exercise v2-share code. They first upload a file with + # FORCE_V2 set. 
+ + def setUp(self): + d = defer.maybeDeferred(_Base.setUp, self) + def _set_force_v2(ign): + self.old_force_v2 = layout.FORCE_V2 + layout.FORCE_V2 = True + d.addCallback(_set_force_v2) + return d + def tearDown(self): + layout.FORCE_V2 = self.old_force_v2 + return _Base.tearDown(self) + + def test_download(self): + self.basedir = self.mktemp() + self.set_up_grid() + self.c0 = self.g.clients[0] + + # upload a file + u = upload.Data(plaintext, None) + d = self.c0.upload(u) + def _uploaded(ur): + imm_uri = ur.uri + n = self.c0.create_node_from_uri(imm_uri) + return download_to_data(n) + d.addCallback(_uploaded) + return d + + def test_download_no_overrun(self): + self.basedir = self.mktemp() + self.set_up_grid() + self.c0 = self.g.clients[0] + + # tweak the client's copies of server-version data, so it believes + # that they're old and can't handle reads that overrun the length of + # the share. This exercises a different code path. + for (peerid, rref) in self.c0.storage_broker.get_all_servers(): + v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"] + v1["tolerates-immutable-read-overrun"] = False + + # upload a file + u = upload.Data(plaintext, None) + d = self.c0.upload(u) + def _uploaded(ur): + imm_uri = ur.uri + n = self.c0.create_node_from_uri(imm_uri) + return download_to_data(n) + d.addCallback(_uploaded) + return d + + def OFF_test_no_overrun_corrupt_shver(self): # unnecessary + self.basedir = self.mktemp() + self.set_up_grid() + self.c0 = self.g.clients[0] + + for (peerid, rref) in self.c0.storage_broker.get_all_servers(): + v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"] + v1["tolerates-immutable-read-overrun"] = False + + # upload a file + u = upload.Data(plaintext, None) + d = self.c0.upload(u) + def _uploaded(ur): + imm_uri = ur.uri + def _do_corrupt(which, newvalue): + def _corruptor(s, debug=False): + return s[:which] + chr(newvalue) + s[which+1:] + self.corrupt_shares_numbered(imm_uri, [0], _corruptor) + 
_do_corrupt(12+3, 0x00) + n = self.c0.create_node_from_uri(imm_uri) + d = download_to_data(n) + def _got_data(data): + self.failUnlessEqual(data, plaintext) + d.addCallback(_got_data) + return d + d.addCallback(_uploaded) + return d diff --git a/src/allmydata/test/test_encode.py b/src/allmydata/test/test_encode.py index 1108e18..c06fbbd 100644 --- a/src/allmydata/test/test_encode.py +++ b/src/allmydata/test/test_encode.py @@ -1,17 +1,15 @@ from zope.interface import implements from twisted.trial import unittest -from twisted.internet import defer, reactor +from twisted.internet import defer from twisted.python.failure import Failure from foolscap.api import fireEventually -from allmydata import hashtree, uri -from allmydata.immutable import encode, upload, download +from allmydata import uri +from allmydata.immutable import encode, upload, checker from allmydata.util import hashutil from allmydata.util.assertutil import _assert -from allmydata.util.consumer import MemoryConsumer -from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, \ - NotEnoughSharesError, IStorageBroker, UploadUnhappinessError -from allmydata.monitor import Monitor -import allmydata.test.common_util as testutil +from allmydata.util.consumer import download_to_data +from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader +from allmydata.test.no_network import GridTestMixin class LostPeerError(Exception): pass @@ -19,9 +17,6 @@ class LostPeerError(Exception): def flip_bit(good): # flips the last bit return good[:-1] + chr(ord(good[-1]) ^ 0x01) -class FakeStorageBroker: - implements(IStorageBroker) - class FakeBucketReaderWriterProxy: implements(IStorageBucketWriter, IStorageBucketReader) # these are used for both reading and writing @@ -59,13 +54,6 @@ class FakeBucketReaderWriterProxy: self.blocks[segmentnum] = data return defer.maybeDeferred(_try) - def put_plaintext_hashes(self, hashes): - def _try(): - assert not self.closed - assert not 
self.plaintext_hashes - self.plaintext_hashes = hashes - return defer.maybeDeferred(_try) - def put_crypttext_hashes(self, hashes): def _try(): assert not self.closed @@ -223,7 +211,7 @@ class ValidatedExtendedURIProxy(unittest.TestCase): fb = FakeBucketReaderWriterProxy() fb.put_uri_extension(uebstring) verifycap = uri.CHKFileVerifierURI(storage_index='x'*16, uri_extension_hash=uebhash, needed_shares=self.K, total_shares=self.M, size=self.SIZE) - vup = download.ValidatedExtendedURIProxy(fb, verifycap) + vup = checker.ValidatedExtendedURIProxy(fb, verifycap) return vup.start() def _test_accept(self, uebdict): @@ -237,7 +225,7 @@ class ValidatedExtendedURIProxy(unittest.TestCase): def _test_reject(self, uebdict): d = self._test(uebdict) - d.addBoth(self._should_fail, (KeyError, download.BadURIExtension)) + d.addBoth(self._should_fail, (KeyError, checker.BadURIExtension)) return d def test_accept_minimal(self): @@ -333,30 +321,6 @@ class Encode(unittest.TestCase): return d - # a series of 3*3 tests to check out edge conditions. One axis is how the - # plaintext is divided into segments: kn+(-1,0,1). Another way to express - # that is that n%k == -1 or 0 or 1. For example, for 25-byte segments, we - # might test 74 bytes, 75 bytes, and 76 bytes. - - # on the other axis is how many leaves in the block hash tree we wind up - # with, relative to a power of 2, so 2^a+(-1,0,1). Each segment turns - # into a single leaf. So we'd like to check out, e.g., 3 segments, 4 - # segments, and 5 segments. - - # that results in the following series of data lengths: - # 3 segs: 74, 75, 51 - # 4 segs: 99, 100, 76 - # 5 segs: 124, 125, 101 - - # all tests encode to 100 shares, which means the share hash tree will - # have 128 leaves, which means that buckets will be given an 8-long share - # hash chain - - # all 3-segment files will have a 4-leaf blockhashtree, and thus expect - # to get 7 blockhashes. 4-segment files will also get 4-leaf block hash - # trees and 7 blockhashes. 
5-segment files will get 8-leaf block hash - # trees, which get 15 blockhashes. - def test_send_74(self): # 3 segments (25, 25, 24) return self.do_encode(25, 74, 100, 3, 7, 8) @@ -387,422 +351,62 @@ class Encode(unittest.TestCase): # 5 segments: 25, 25, 25, 25, 1 return self.do_encode(25, 101, 100, 5, 15, 8) -class PausingConsumer(MemoryConsumer): - def __init__(self): - MemoryConsumer.__init__(self) - self.size = 0 - self.writes = 0 - def write(self, data): - self.size += len(data) - self.writes += 1 - if self.writes <= 2: - # we happen to use 4 segments, and want to avoid pausing on the - # last one (since then the _unpause timer will still be running) - self.producer.pauseProducing() - reactor.callLater(0.1, self._unpause) - return MemoryConsumer.write(self, data) - def _unpause(self): - self.producer.resumeProducing() - -class PausingAndStoppingConsumer(PausingConsumer): - def write(self, data): - self.producer.pauseProducing() - reactor.callLater(0.5, self._stop) - def _stop(self): - self.producer.stopProducing() - -class StoppingConsumer(PausingConsumer): - def write(self, data): - self.producer.stopProducing() - -class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin): - timeout = 2400 # It takes longer than 240 seconds on Zandr's ARM box. 
- def send_and_recover(self, k_and_happy_and_n=(25,75,100), - AVAILABLE_SHARES=None, - datalen=76, - max_segment_size=25, - bucket_modes={}, - recover_mode="recover", - consumer=None, - ): - if AVAILABLE_SHARES is None: - AVAILABLE_SHARES = k_and_happy_and_n[2] - data = make_data(datalen) - d = self.send(k_and_happy_and_n, AVAILABLE_SHARES, - max_segment_size, bucket_modes, data) - # that fires with (uri_extension_hash, e, shareholders) - d.addCallback(self.recover, AVAILABLE_SHARES, recover_mode, - consumer=consumer) - # that fires with newdata - def _downloaded((newdata, fd)): - self.failUnless(newdata == data, str((len(newdata), len(data)))) - return fd - d.addCallback(_downloaded) - return d - def send(self, k_and_happy_and_n, AVAILABLE_SHARES, max_segment_size, - bucket_modes, data): - k, happy, n = k_and_happy_and_n - NUM_SHARES = k_and_happy_and_n[2] - if AVAILABLE_SHARES is None: - AVAILABLE_SHARES = NUM_SHARES - e = encode.Encoder() - u = upload.Data(data, convergence="some convergence string") - # force use of multiple segments by using a low max_segment_size - u.max_segment_size = max_segment_size - u.encoding_param_k = k - u.encoding_param_happy = happy - u.encoding_param_n = n - eu = upload.EncryptAnUploadable(u) - d = e.set_encrypted_uploadable(eu) - - shareholders = {} - def _ready(res): - k,happy,n = e.get_param("share_counts") - assert n == NUM_SHARES # else we'll be completely confused - servermap = {} - for shnum in range(NUM_SHARES): - mode = bucket_modes.get(shnum, "good") - peer = FakeBucketReaderWriterProxy(mode, "peer%d" % shnum) - shareholders[shnum] = peer - servermap.setdefault(shnum, set()).add(peer.get_peerid()) - e.set_shareholders(shareholders, servermap) - return e.start() - d.addCallback(_ready) - def _sent(res): - d1 = u.get_encryption_key() - d1.addCallback(lambda key: (res, key, shareholders)) - return d1 - d.addCallback(_sent) - return d +class Roundtrip(GridTestMixin, unittest.TestCase): - def recover(self, (res, key, 
shareholders), AVAILABLE_SHARES, - recover_mode, consumer=None): - verifycap = res - - if "corrupt_key" in recover_mode: - # we corrupt the key, so that the decrypted data is corrupted and - # will fail the plaintext hash check. Since we're manually - # attaching shareholders, the fact that the storage index is also - # corrupted doesn't matter. - key = flip_bit(key) - - u = uri.CHKFileURI(key=key, - uri_extension_hash=verifycap.uri_extension_hash, - needed_shares=verifycap.needed_shares, - total_shares=verifycap.total_shares, - size=verifycap.size) - - sb = FakeStorageBroker() - if not consumer: - consumer = MemoryConsumer() - innertarget = download.ConsumerAdapter(consumer) - target = download.DecryptingTarget(innertarget, u.key) - fd = download.CiphertextDownloader(sb, u.get_verify_cap(), target, monitor=Monitor()) - - # we manually cycle the CiphertextDownloader through a number of steps that - # would normally be sequenced by a Deferred chain in - # CiphertextDownloader.start(), to give us more control over the process. - # In particular, by bypassing _get_all_shareholders, we skip - # permuted-peerlist selection. - for shnum, bucket in shareholders.items(): - if shnum < AVAILABLE_SHARES and bucket.closed: - fd.add_share_bucket(shnum, bucket) - fd._got_all_shareholders(None) - - # Make it possible to obtain uri_extension from the shareholders. - # Arrange for shareholders[0] to be the first, so we can selectively - # corrupt the data it returns. 
- uri_extension_sources = shareholders.values() - uri_extension_sources.remove(shareholders[0]) - uri_extension_sources.insert(0, shareholders[0]) - - d = defer.succeed(None) - - # have the CiphertextDownloader retrieve a copy of uri_extension itself - d.addCallback(fd._obtain_uri_extension) - - if "corrupt_crypttext_hashes" in recover_mode: - # replace everybody's crypttext hash trees with a different one - # (computed over a different file), then modify our uri_extension - # to reflect the new crypttext hash tree root - def _corrupt_crypttext_hashes(unused): - assert isinstance(fd._vup, download.ValidatedExtendedURIProxy), fd._vup - assert fd._vup.crypttext_root_hash, fd._vup - badhash = hashutil.tagged_hash("bogus", "data") - bad_crypttext_hashes = [badhash] * fd._vup.num_segments - badtree = hashtree.HashTree(bad_crypttext_hashes) - for bucket in shareholders.values(): - bucket.crypttext_hashes = list(badtree) - fd._crypttext_hash_tree = hashtree.IncompleteHashTree(fd._vup.num_segments) - fd._crypttext_hash_tree.set_hashes({0: badtree[0]}) - return fd._vup - d.addCallback(_corrupt_crypttext_hashes) - - # also have the CiphertextDownloader ask for hash trees - d.addCallback(fd._get_crypttext_hash_tree) - - d.addCallback(fd._download_all_segments) - d.addCallback(fd._done) - def _done(t): - newdata = "".join(consumer.chunks) - return (newdata, fd) - d.addCallback(_done) - return d - - def test_not_enough_shares(self): - d = self.send_and_recover((4,8,10), AVAILABLE_SHARES=2) - def _done(res): - self.failUnless(isinstance(res, Failure)) - self.failUnless(res.check(NotEnoughSharesError)) - d.addBoth(_done) - return d - - def test_one_share_per_peer(self): - return self.send_and_recover() - - def test_74(self): - return self.send_and_recover(datalen=74) - def test_75(self): - return self.send_and_recover(datalen=75) - def test_51(self): - return self.send_and_recover(datalen=51) - - def test_99(self): - return self.send_and_recover(datalen=99) - def test_100(self): 
- return self.send_and_recover(datalen=100) - def test_76(self): - return self.send_and_recover(datalen=76) - - def test_124(self): - return self.send_and_recover(datalen=124) - def test_125(self): - return self.send_and_recover(datalen=125) - def test_101(self): - return self.send_and_recover(datalen=101) - - def test_pause(self): - # use a download target that does pauseProducing/resumeProducing a - # few times, then finishes - c = PausingConsumer() - d = self.send_and_recover(consumer=c) - return d - - def test_pause_then_stop(self): - # use a download target that pauses, then stops. - c = PausingAndStoppingConsumer() - d = self.shouldFail(download.DownloadStopped, "test_pause_then_stop", - "our Consumer called stopProducing()", - self.send_and_recover, consumer=c) - return d - - def test_stop(self): - # use a download targetthat does an immediate stop (ticket #473) - c = StoppingConsumer() - d = self.shouldFail(download.DownloadStopped, "test_stop", - "our Consumer called stopProducing()", - self.send_and_recover, consumer=c) - return d - - # the following tests all use 4-out-of-10 encoding - - def test_bad_blocks(self): - # the first 6 servers have bad blocks, which will be caught by the - # blockhashes - modemap = dict([(i, "bad block") - for i in range(6)] - + [(i, "good") - for i in range(6, 10)]) - return self.send_and_recover((4,8,10), bucket_modes=modemap) - - def test_bad_blocks_failure(self): - # the first 7 servers have bad blocks, which will be caught by the - # blockhashes, and the download will fail - modemap = dict([(i, "bad block") - for i in range(7)] - + [(i, "good") - for i in range(7, 10)]) - d = self.send_and_recover((4,8,10), bucket_modes=modemap) - def _done(res): - self.failUnless(isinstance(res, Failure), res) - self.failUnless(res.check(NotEnoughSharesError), res) - d.addBoth(_done) - return d - - def test_bad_blockhashes(self): - # the first 6 servers have bad block hashes, so the blockhash tree - # will not validate - modemap = 
dict([(i, "bad blockhash") - for i in range(6)] - + [(i, "good") - for i in range(6, 10)]) - return self.send_and_recover((4,8,10), bucket_modes=modemap) - - def test_bad_blockhashes_failure(self): - # the first 7 servers have bad block hashes, so the blockhash tree - # will not validate, and the download will fail - modemap = dict([(i, "bad blockhash") - for i in range(7)] - + [(i, "good") - for i in range(7, 10)]) - d = self.send_and_recover((4,8,10), bucket_modes=modemap) - def _done(res): - self.failUnless(isinstance(res, Failure)) - self.failUnless(res.check(NotEnoughSharesError), res) - d.addBoth(_done) - return d - - def test_bad_sharehashes(self): - # the first 6 servers have bad block hashes, so the sharehash tree - # will not validate - modemap = dict([(i, "bad sharehash") - for i in range(6)] - + [(i, "good") - for i in range(6, 10)]) - return self.send_and_recover((4,8,10), bucket_modes=modemap) - - def assertFetchFailureIn(self, fd, where): - expected = {"uri_extension": 0, - "crypttext_hash_tree": 0, - } - if where is not None: - expected[where] += 1 - self.failUnlessEqual(fd._fetch_failures, expected) - - def test_good(self): - # just to make sure the test harness works when we aren't - # intentionally causing failures - modemap = dict([(i, "good") for i in range(0, 10)]) - d = self.send_and_recover((4,8,10), bucket_modes=modemap) - d.addCallback(self.assertFetchFailureIn, None) - return d - - def test_bad_uri_extension(self): - # the first server has a bad uri_extension block, so we will fail - # over to a different server. - modemap = dict([(i, "bad uri_extension") for i in range(1)] + - [(i, "good") for i in range(1, 10)]) - d = self.send_and_recover((4,8,10), bucket_modes=modemap) - d.addCallback(self.assertFetchFailureIn, "uri_extension") - return d - - def test_bad_crypttext_hashroot(self): - # the first server has a bad crypttext hashroot, so we will fail - # over to a different server. 
- modemap = dict([(i, "bad crypttext hashroot") for i in range(1)] + - [(i, "good") for i in range(1, 10)]) - d = self.send_and_recover((4,8,10), bucket_modes=modemap) - d.addCallback(self.assertFetchFailureIn, "crypttext_hash_tree") - return d - - def test_bad_crypttext_hashes(self): - # the first server has a bad crypttext hash block, so we will fail - # over to a different server. - modemap = dict([(i, "bad crypttext hash") for i in range(1)] + - [(i, "good") for i in range(1, 10)]) - d = self.send_and_recover((4,8,10), bucket_modes=modemap) - d.addCallback(self.assertFetchFailureIn, "crypttext_hash_tree") - return d - - def test_bad_crypttext_hashes_failure(self): - # to test that the crypttext merkle tree is really being applied, we - # sneak into the download process and corrupt two things: we replace - # everybody's crypttext hashtree with a bad version (computed over - # bogus data), and we modify the supposedly-validated uri_extension - # block to match the new crypttext hashtree root. The download - # process should notice that the crypttext coming out of FEC doesn't - # match the tree, and fail. - - modemap = dict([(i, "good") for i in range(0, 10)]) - d = self.send_and_recover((4,8,10), bucket_modes=modemap, - recover_mode=("corrupt_crypttext_hashes")) - def _done(res): - self.failUnless(isinstance(res, Failure)) - self.failUnless(res.check(hashtree.BadHashError), res) - d.addBoth(_done) - return d + # a series of 3*3 tests to check out edge conditions. One axis is how the + # plaintext is divided into segments: kn+(-1,0,1). Another way to express + # this is n%k == -1 or 0 or 1. For example, for 25-byte segments, we + # might test 74 bytes, 75 bytes, and 76 bytes. 
- def OFF_test_bad_plaintext(self): - # faking a decryption failure is easier: just corrupt the key - modemap = dict([(i, "good") for i in range(0, 10)]) - d = self.send_and_recover((4,8,10), bucket_modes=modemap, - recover_mode=("corrupt_key")) - def _done(res): - self.failUnless(isinstance(res, Failure)) - self.failUnless(res.check(hashtree.BadHashError), res) - d.addBoth(_done) - return d + # on the other axis is how many leaves in the block hash tree we wind up + # with, relative to a power of 2, so 2^a+(-1,0,1). Each segment turns + # into a single leaf. So we'd like to check out, e.g., 3 segments, 4 + # segments, and 5 segments. - def test_bad_sharehashes_failure(self): - # all ten servers have bad share hashes, so the sharehash tree - # will not validate, and the download will fail - modemap = dict([(i, "bad sharehash") - for i in range(10)]) - d = self.send_and_recover((4,8,10), bucket_modes=modemap) - def _done(res): - self.failUnless(isinstance(res, Failure)) - self.failUnless(res.check(NotEnoughSharesError)) - d.addBoth(_done) - return d + # that results in the following series of data lengths: + # 3 segs: 74, 75, 51 + # 4 segs: 99, 100, 76 + # 5 segs: 124, 125, 101 - def test_missing_sharehashes(self): - # the first 6 servers are missing their sharehashes, so the - # sharehash tree will not validate - modemap = dict([(i, "missing sharehash") - for i in range(6)] - + [(i, "good") - for i in range(6, 10)]) - return self.send_and_recover((4,8,10), bucket_modes=modemap) - - def test_missing_sharehashes_failure(self): - # all servers are missing their sharehashes, so the sharehash tree will not validate, - # and the download will fail - modemap = dict([(i, "missing sharehash") - for i in range(10)]) - d = self.send_and_recover((4,8,10), bucket_modes=modemap) - def _done(res): - self.failUnless(isinstance(res, Failure), res) - self.failUnless(res.check(NotEnoughSharesError), res) - d.addBoth(_done) - return d + # all tests encode to 100 shares, which means 
the share hash tree will + # have 128 leaves, which means that buckets will be given an 8-long share + # hash chain - def test_lost_one_shareholder(self): - # we have enough shareholders when we start, but one segment in we - # lose one of them. The upload should still succeed, as long as we - # still have 'servers_of_happiness' peers left. - modemap = dict([(i, "good") for i in range(9)] + - [(i, "lost") for i in range(9, 10)]) - return self.send_and_recover((4,8,10), bucket_modes=modemap) - - def test_lost_one_shareholder_early(self): - # we have enough shareholders when we choose peers, but just before - # we send the 'start' message, we lose one of them. The upload should - # still succeed, as long as we still have 'servers_of_happiness' peers - # left. - modemap = dict([(i, "good") for i in range(9)] + - [(i, "lost-early") for i in range(9, 10)]) - return self.send_and_recover((4,8,10), bucket_modes=modemap) - - def test_lost_many_shareholders(self): - # we have enough shareholders when we start, but one segment in we - # lose all but one of them. The upload should fail. - modemap = dict([(i, "good") for i in range(1)] + - [(i, "lost") for i in range(1, 10)]) - d = self.send_and_recover((4,8,10), bucket_modes=modemap) - def _done(res): - self.failUnless(isinstance(res, Failure)) - self.failUnless(res.check(UploadUnhappinessError), res) - d.addBoth(_done) + # all 3-segment files will have a 4-leaf blockhashtree, and thus expect + # to get 7 blockhashes. 4-segment files will also get 4-leaf block hash + # trees and 7 blockhashes. 5-segment files will get 8-leaf block hash + # trees, which gets 15 blockhashes. 
+ + def test_74(self): return self.do_test_size(74) + def test_75(self): return self.do_test_size(75) + def test_51(self): return self.do_test_size(51) + def test_99(self): return self.do_test_size(99) + def test_100(self): return self.do_test_size(100) + def test_76(self): return self.do_test_size(76) + def test_124(self): return self.do_test_size(124) + def test_125(self): return self.do_test_size(125) + def test_101(self): return self.do_test_size(101) + + def upload(self, data): + u = upload.Data(data, None) + u.max_segment_size = 25 + u.encoding_param_k = 25 + u.encoding_param_happy = 1 + u.encoding_param_n = 100 + d = self.c0.upload(u) + d.addCallback(lambda ur: self.c0.create_node_from_uri(ur.uri)) + # returns a FileNode return d - def test_lost_all_shareholders(self): - # we have enough shareholders when we start, but one segment in we - # lose all of them. The upload should fail. - modemap = dict([(i, "lost") for i in range(10)]) - d = self.send_and_recover((4,8,10), bucket_modes=modemap) - def _done(res): - self.failUnless(isinstance(res, Failure)) - self.failUnless(res.check(UploadUnhappinessError)) - d.addBoth(_done) + def do_test_size(self, size): + self.basedir = self.mktemp() + self.set_up_grid() + self.c0 = self.g.clients[0] + DATA = "p"*size + d = self.upload(DATA) + d.addCallback(lambda n: download_to_data(n)) + def _downloaded(newdata): + self.failUnlessEqual(newdata, DATA) + d.addCallback(_downloaded) return d diff --git a/src/allmydata/test/test_filenode.py b/src/allmydata/test/test_filenode.py index 5f3feaa..61bb0e8 100644 --- a/src/allmydata/test/test_filenode.py +++ b/src/allmydata/test/test_filenode.py @@ -2,9 +2,10 @@ from twisted.trial import unittest from allmydata import uri, client from allmydata.monitor import Monitor -from allmydata.immutable.filenode import ImmutableFileNode, LiteralFileNode +from allmydata.immutable.literal import LiteralFileNode +from allmydata.immutable.filenode import ImmutableFileNode from 
allmydata.mutable.filenode import MutableFileNode -from allmydata.util import hashutil, cachedir +from allmydata.util import hashutil from allmydata.util.consumer import download_to_data class NotANode: @@ -30,9 +31,8 @@ class Node(unittest.TestCase): needed_shares=3, total_shares=10, size=1000) - cf = cachedir.CacheFile("none") - fn1 = ImmutableFileNode(u, None, None, None, None, cf) - fn2 = ImmutableFileNode(u, None, None, None, None, cf) + fn1 = ImmutableFileNode(u, None, None, None, None) + fn2 = ImmutableFileNode(u, None, None, None, None) self.failUnlessEqual(fn1, fn2) self.failIfEqual(fn1, "I am not a filenode") self.failIfEqual(fn1, NotANode()) diff --git a/src/allmydata/test/test_hung_server.py b/src/allmydata/test/test_hung_server.py index b1def16..8856ce2 100644 --- a/src/allmydata/test/test_hung_server.py +++ b/src/allmydata/test/test_hung_server.py @@ -10,12 +10,14 @@ from allmydata.mutable.common import UnrecoverableFileError from allmydata.storage.common import storage_index_to_dir from allmydata.test.no_network import GridTestMixin from allmydata.test.common import ShouldFailMixin, _corrupt_share_data +from allmydata.util.pollmixin import PollMixin from allmydata.interfaces import NotEnoughSharesError immutable_plaintext = "data" * 10000 mutable_plaintext = "muta" * 10000 -class HungServerDownloadTest(GridTestMixin, ShouldFailMixin, unittest.TestCase): +class HungServerDownloadTest(GridTestMixin, ShouldFailMixin, PollMixin, + unittest.TestCase): # Many of these tests take around 60 seconds on François's ARM buildslave: # http://tahoe-lafs.org/buildbot/builders/FranXois%20lenny-armv5tel # allmydata.test.test_hung_server.HungServerDownloadTest.test_2_good_8_broken_duplicate_share_fail once ERRORed after 197 seconds on Midnight Magic's NetBSD buildslave: @@ -36,6 +38,15 @@ class HungServerDownloadTest(GridTestMixin, ShouldFailMixin, unittest.TestCase): for (id, ss) in servers: self.g.unhang_server(id, **kwargs) + def _hang_shares(self, shnums, 
**kwargs): + # hang all servers who are holding the given shares + hung_serverids = set() + for (i_shnum, i_serverid, i_sharefile) in self.shares: + if i_shnum in shnums: + if i_serverid not in hung_serverids: + self.g.hang_server(i_serverid, **kwargs) + hung_serverids.add(i_serverid) + def _delete_all_shares_from(self, servers): serverids = [id for (id, ss) in servers] for (i_shnum, i_serverid, i_sharefile) in self.shares: @@ -113,7 +124,8 @@ class HungServerDownloadTest(GridTestMixin, ShouldFailMixin, unittest.TestCase): stage_4_d = None # currently we aren't doing any tests which require this for mutable files else: d = download_to_data(n) - stage_4_d = n._downloader._all_downloads.keys()[0]._stage_4_d # too ugly! FIXME + #stage_4_d = n._downloader._all_downloads.keys()[0]._stage_4_d # too ugly! FIXME + stage_4_d = None return (d, stage_4_d,) def _wait_for_data(self, n): @@ -141,7 +153,7 @@ class HungServerDownloadTest(GridTestMixin, ShouldFailMixin, unittest.TestCase): self._download_and_check) else: return self.shouldFail(NotEnoughSharesError, self.basedir, - "Failed to get enough shareholders", + "ran out of shares", self._download_and_check) @@ -204,48 +216,114 @@ class HungServerDownloadTest(GridTestMixin, ShouldFailMixin, unittest.TestCase): # The tests below do not currently pass for mutable files. 
- def test_3_good_7_hung(self): + def test_3_good_7_hung_immutable(self): d = defer.succeed(None) - for mutable in [False]: - d.addCallback(lambda ign: self._set_up(mutable, "test_3_good_7_hung")) - d.addCallback(lambda ign: self._hang(self.servers[3:])) - d.addCallback(lambda ign: self._download_and_check()) + d.addCallback(lambda ign: self._set_up(False, "test_3_good_7_hung")) + d.addCallback(lambda ign: self._hang(self.servers[3:])) + d.addCallback(lambda ign: self._download_and_check()) return d - def test_2_good_8_hung_then_1_recovers(self): + def test_5_overdue_immutable(self): + # restrict the ShareFinder to only allow 5 outstanding requests, and + # arrange for the first 5 servers to hang. Then trigger the OVERDUE + # timers (simulating 10 seconds passed), at which point the + # ShareFinder should send additional queries and finish the download + # quickly. If we didn't have OVERDUE timers, this test would fail by + # timing out. + done = [] + d = self._set_up(False, "test_5_overdue_immutable") + def _reduce_max_outstanding_requests_and_download(ign): + self._hang_shares(range(5)) + n = self.c0.create_node_from_uri(self.uri) + self._sf = n._cnode._node._sharefinder + self._sf.max_outstanding_requests = 5 + self._sf.OVERDUE_TIMEOUT = 1000.0 + d2 = download_to_data(n) + # start download, but don't wait for it to complete yet + def _done(res): + done.append(res) # we will poll for this later + d2.addBoth(_done) + d.addCallback(_reduce_max_outstanding_requests_and_download) + from foolscap.eventual import fireEventually, flushEventualQueue + # wait here a while + d.addCallback(lambda res: fireEventually(res)) + d.addCallback(lambda res: flushEventualQueue()) + d.addCallback(lambda ign: self.failIf(done)) + def _check_waiting(ign): + # all the share requests should now be stuck waiting + self.failUnlessEqual(len(self._sf.pending_requests), 5) + # but none should be marked as OVERDUE until the timers expire + self.failUnlessEqual(len(self._sf.overdue_requests), 
0) + d.addCallback(_check_waiting) + def _mark_overdue(ign): + # declare four requests overdue, allowing new requests to take + # their place, and leaving one stuck. The finder will keep + # sending requests until there are 5 non-overdue ones + # outstanding, at which point we'll have 4 OVERDUE, 1 + # stuck-but-not-overdue, and 4 live requests. All 4 live requests + # will retire before the download is complete and the ShareFinder + # is shut off. That will leave 4 OVERDUE and 1 + # stuck-but-not-overdue, for a total of 5 requests in in + # _sf.pending_requests + for t in self._sf.overdue_timers.values()[:4]: + t.reset(-1.0) + # the timers ought to fire before the eventual-send does + return fireEventually() + d.addCallback(_mark_overdue) + def _we_are_done(): + return bool(done) + d.addCallback(lambda ign: self.poll(_we_are_done)) + def _check_done(ign): + self.failUnlessEqual(done, [immutable_plaintext]) + self.failUnlessEqual(len(self._sf.pending_requests), 5) + self.failUnlessEqual(len(self._sf.overdue_requests), 4) + d.addCallback(_check_done) + return d + + def test_3_good_7_hung_mutable(self): + raise unittest.SkipTest("still broken") d = defer.succeed(None) - for mutable in [False]: - d.addCallback(lambda ign: self._set_up(mutable, "test_2_good_8_hung_then_1_recovers")) - d.addCallback(lambda ign: self._hang(self.servers[2:3])) - d.addCallback(lambda ign: self._hang(self.servers[3:])) - d.addCallback(lambda ign: self._unhang(self.servers[2:3])) - d.addCallback(lambda ign: self._download_and_check()) + d.addCallback(lambda ign: self._set_up(True, "test_3_good_7_hung")) + d.addCallback(lambda ign: self._hang(self.servers[3:])) + d.addCallback(lambda ign: self._download_and_check()) return d - def test_2_good_8_hung_then_1_recovers_with_2_shares(self): + def test_2_good_8_hung_then_1_recovers_immutable(self): d = defer.succeed(None) - for mutable in [False]: - d.addCallback(lambda ign: self._set_up(mutable, "test_2_good_8_hung_then_1_recovers_with_2_shares")) 
- d.addCallback(lambda ign: self._copy_all_shares_from(self.servers[0:1], self.servers[2])) - d.addCallback(lambda ign: self._hang(self.servers[2:3])) - d.addCallback(lambda ign: self._hang(self.servers[3:])) - d.addCallback(lambda ign: self._unhang(self.servers[2:3])) - d.addCallback(lambda ign: self._download_and_check()) + d.addCallback(lambda ign: self._set_up(False, "test_2_good_8_hung_then_1_recovers")) + d.addCallback(lambda ign: self._hang(self.servers[2:3])) + d.addCallback(lambda ign: self._hang(self.servers[3:])) + d.addCallback(lambda ign: self._unhang(self.servers[2:3])) + d.addCallback(lambda ign: self._download_and_check()) + return d + + def test_2_good_8_hung_then_1_recovers_mutable(self): + raise unittest.SkipTest("still broken") + d = defer.succeed(None) + d.addCallback(lambda ign: self._set_up(True, "test_2_good_8_hung_then_1_recovers")) + d.addCallback(lambda ign: self._hang(self.servers[2:3])) + d.addCallback(lambda ign: self._hang(self.servers[3:])) + d.addCallback(lambda ign: self._unhang(self.servers[2:3])) + d.addCallback(lambda ign: self._download_and_check()) return d - def test_failover_during_stage_4(self): - # See #287 + def test_2_good_8_hung_then_1_recovers_with_2_shares_immutable(self): d = defer.succeed(None) - for mutable in [False]: - d.addCallback(lambda ign: self._set_up(mutable, "test_failover_during_stage_4")) - d.addCallback(lambda ign: self._corrupt_all_shares_in(self.servers[2:3], _corrupt_share_data)) - d.addCallback(lambda ign: self._set_up(mutable, "test_failover_during_stage_4")) - d.addCallback(lambda ign: self._hang(self.servers[3:])) - d.addCallback(lambda ign: self._start_download()) - def _after_starting_download((doned, started4d)): - started4d.addCallback(lambda ign: self._unhang(self.servers[3:4])) - doned.addCallback(self._check) - return doned - d.addCallback(_after_starting_download) + d.addCallback(lambda ign: self._set_up(False, "test_2_good_8_hung_then_1_recovers_with_2_shares")) + d.addCallback(lambda 
ign: self._copy_all_shares_from(self.servers[0:1], self.servers[2])) + d.addCallback(lambda ign: self._hang(self.servers[2:3])) + d.addCallback(lambda ign: self._hang(self.servers[3:])) + d.addCallback(lambda ign: self._unhang(self.servers[2:3])) + d.addCallback(lambda ign: self._download_and_check()) + return d + def test_2_good_8_hung_then_1_recovers_with_2_shares_mutable(self): + raise unittest.SkipTest("still broken") + d = defer.succeed(None) + d.addCallback(lambda ign: self._set_up(True, "test_2_good_8_hung_then_1_recovers_with_2_shares")) + d.addCallback(lambda ign: self._copy_all_shares_from(self.servers[0:1], self.servers[2])) + d.addCallback(lambda ign: self._hang(self.servers[2:3])) + d.addCallback(lambda ign: self._hang(self.servers[3:])) + d.addCallback(lambda ign: self._unhang(self.servers[2:3])) + d.addCallback(lambda ign: self._download_and_check()) return d diff --git a/src/allmydata/test/test_immutable.py b/src/allmydata/test/test_immutable.py index a7eaa1d..813c5be 100644 --- a/src/allmydata/test/test_immutable.py +++ b/src/allmydata/test/test_immutable.py @@ -5,7 +5,7 @@ from twisted.internet import defer from twisted.trial import unittest import random -class Test(common.ShareManglingMixin, unittest.TestCase): +class Test(common.ShareManglingMixin, common.ShouldFailMixin, unittest.TestCase): def test_test_code(self): # The following process of stashing the shares, running # replace_shares, and asserting that the new set of shares equals the @@ -18,8 +18,9 @@ class Test(common.ShareManglingMixin, unittest.TestCase): return res d.addCallback(_stash_it) - # The following process of deleting 8 of the shares and asserting that you can't - # download it is more to test this test code than to test the Tahoe code... + # The following process of deleting 8 of the shares and asserting + # that you can't download it is more to test this test code than to + # test the Tahoe code... 
def _then_delete_8(unused=None): self.replace_shares(stash[0], storage_index=self.uri.get_storage_index()) for i in range(8): @@ -42,21 +43,24 @@ class Test(common.ShareManglingMixin, unittest.TestCase): return d def test_download(self): - """ Basic download. (This functionality is more or less already tested by test code in - other modules, but this module is also going to test some more specific things about - immutable download.) + """ Basic download. (This functionality is more or less already + tested by test code in other modules, but this module is also going + to test some more specific things about immutable download.) """ d = defer.succeed(None) before_download_reads = self._count_reads() def _after_download(unused=None): after_download_reads = self._count_reads() - self.failIf(after_download_reads-before_download_reads > 27, (after_download_reads, before_download_reads)) + #print before_download_reads, after_download_reads + self.failIf(after_download_reads-before_download_reads > 27, + (after_download_reads, before_download_reads)) d.addCallback(self._download_and_check_plaintext) d.addCallback(_after_download) return d def test_download_from_only_3_remaining_shares(self): - """ Test download after 7 random shares (of the 10) have been removed. 
""" + """ Test download after 7 random shares (of the 10) have been + removed.""" d = defer.succeed(None) def _then_delete_7(unused=None): for i in range(7): @@ -65,13 +69,15 @@ class Test(common.ShareManglingMixin, unittest.TestCase): d.addCallback(_then_delete_7) def _after_download(unused=None): after_download_reads = self._count_reads() + #print before_download_reads, after_download_reads self.failIf(after_download_reads-before_download_reads > 27, (after_download_reads, before_download_reads)) d.addCallback(self._download_and_check_plaintext) d.addCallback(_after_download) return d def test_download_from_only_3_shares_with_good_crypttext_hash(self): - """ Test download after 7 random shares (of the 10) have had their crypttext hash tree corrupted. """ + """ Test download after 7 random shares (of the 10) have had their + crypttext hash tree corrupted.""" d = defer.succeed(None) def _then_corrupt_7(unused=None): shnums = range(10) @@ -84,39 +90,21 @@ class Test(common.ShareManglingMixin, unittest.TestCase): return d def test_download_abort_if_too_many_missing_shares(self): - """ Test that download gives up quickly when it realizes there aren't enough shares out - there.""" - d = defer.succeed(None) - def _then_delete_8(unused=None): - for i in range(8): - self._delete_a_share() - d.addCallback(_then_delete_8) - - before_download_reads = self._count_reads() - def _attempt_to_download(unused=None): - d2 = download_to_data(self.n) - - def _callb(res): - self.fail("Should have gotten an error from attempt to download, not %r" % (res,)) - def _errb(f): - self.failUnless(f.check(NotEnoughSharesError)) - d2.addCallbacks(_callb, _errb) - return d2 - - d.addCallback(_attempt_to_download) - - def _after_attempt(unused=None): - after_download_reads = self._count_reads() - # To pass this test, you are required to give up before actually trying to read any - # share data. 
- self.failIf(after_download_reads-before_download_reads > 0, (after_download_reads, before_download_reads)) - d.addCallback(_after_attempt) + """ Test that download gives up quickly when it realizes there aren't + enough shares out there.""" + for i in range(8): + self._delete_a_share() + d = self.shouldFail(NotEnoughSharesError, "delete 8", None, + download_to_data, self.n) + # the new downloader pipelines a bunch of read requests in parallel, + # so don't bother asserting anything about the number of reads return d def test_download_abort_if_too_many_corrupted_shares(self): - """ Test that download gives up quickly when it realizes there aren't enough uncorrupted - shares out there. It should be able to tell because the corruption occurs in the - sharedata version number, which it checks first.""" + """Test that download gives up quickly when it realizes there aren't + enough uncorrupted shares out there. It should be able to tell + because the corruption occurs in the sharedata version number, which + it checks first.""" d = defer.succeed(None) def _then_corrupt_8(unused=None): shnums = range(10) @@ -140,17 +128,22 @@ class Test(common.ShareManglingMixin, unittest.TestCase): def _after_attempt(unused=None): after_download_reads = self._count_reads() - # To pass this test, you are required to give up before reading all of the share - # data. Actually, we could give up sooner than 45 reads, but currently our download - # code does 45 reads. This test then serves as a "performance regression detector" - # -- if you change download code so that it takes *more* reads, then this test will - # fail. - self.failIf(after_download_reads-before_download_reads > 45, (after_download_reads, before_download_reads)) + #print before_download_reads, after_download_reads + # To pass this test, you are required to give up before reading + # all of the share data. Actually, we could give up sooner than + # 45 reads, but currently our download code does 45 reads. 
This + # test then serves as a "performance regression detector" -- if + # you change download code so that it takes *more* reads, then + # this test will fail. + self.failIf(after_download_reads-before_download_reads > 45, + (after_download_reads, before_download_reads)) d.addCallback(_after_attempt) return d -# XXX extend these tests to show bad behavior of various kinds from servers: raising exception from each remove_foo() method, for example +# XXX extend these tests to show bad behavior of various kinds from servers: +# raising exception from each remove_foo() method, for example # XXX test disconnect DeadReferenceError from get_buckets and get_block_whatsit +# TODO: delete this whole file diff --git a/src/allmydata/test/test_mutable.py b/src/allmydata/test/test_mutable.py index 30d1083..021e196 100644 --- a/src/allmydata/test/test_mutable.py +++ b/src/allmydata/test/test_mutable.py @@ -197,7 +197,7 @@ def make_nodemaker(s=None, num_peers=10): keygen = client.KeyGenerator() keygen.set_default_keysize(522) nodemaker = NodeMaker(storage_broker, sh, None, - None, None, None, + None, None, {"k": 3, "n": 10}, keygen) return nodemaker diff --git a/src/allmydata/test/test_repairer.py b/src/allmydata/test/test_repairer.py index 02264e4..bb30cc4 100644 --- a/src/allmydata/test/test_repairer.py +++ b/src/allmydata/test/test_repairer.py @@ -3,7 +3,7 @@ from allmydata.test import common from allmydata.monitor import Monitor from allmydata import check_results from allmydata.interfaces import NotEnoughSharesError -from allmydata.immutable import repairer, upload +from allmydata.immutable import upload from allmydata.util.consumer import download_to_data from twisted.internet import defer from twisted.trial import unittest @@ -363,99 +363,6 @@ WRITE_LEEWAY = 35 # Optimally, you could repair one of these (small) files in a single write. 
DELTA_WRITES_PER_SHARE = 1 * WRITE_LEEWAY -class DownUpConnector(unittest.TestCase): - def test_deferred_satisfaction(self): - duc = repairer.DownUpConnector() - duc.registerProducer(None, True) # just because you have to call registerProducer first - # case 1: total data in buf is < requested data at time of request - duc.write('\x01') - d = duc.read_encrypted(2, False) - def _then(data): - self.failUnlessEqual(len(data), 2) - self.failUnlessEqual(data[0], '\x01') - self.failUnlessEqual(data[1], '\x02') - d.addCallback(_then) - duc.write('\x02') - return d - - def test_extra(self): - duc = repairer.DownUpConnector() - duc.registerProducer(None, True) # just because you have to call registerProducer first - # case 1: total data in buf is < requested data at time of request - duc.write('\x01') - d = duc.read_encrypted(2, False) - def _then(data): - self.failUnlessEqual(len(data), 2) - self.failUnlessEqual(data[0], '\x01') - self.failUnlessEqual(data[1], '\x02') - d.addCallback(_then) - duc.write('\x02\0x03') - return d - - def test_short_reads_1(self): - # You don't get fewer bytes than you requested -- instead you get no callback at all. - duc = repairer.DownUpConnector() - duc.registerProducer(None, True) # just because you have to call registerProducer first - - d = duc.read_encrypted(2, False) - duc.write('\x04') - - def _callb(res): - self.fail("Shouldn't have gotten this callback res: %s" % (res,)) - d.addCallback(_callb) - - # Also in the other order of read-vs-write: - duc2 = repairer.DownUpConnector() - duc2.registerProducer(None, True) # just because you have to call registerProducer first - duc2.write('\x04') - d = duc2.read_encrypted(2, False) - - def _callb2(res): - self.fail("Shouldn't have gotten this callback res: %s" % (res,)) - d.addCallback(_callb2) - - # But once the DUC is closed then you *do* get short reads. 
- duc3 = repairer.DownUpConnector() - duc3.registerProducer(None, True) # just because you have to call registerProducer first - - d = duc3.read_encrypted(2, False) - duc3.write('\x04') - duc3.close() - def _callb3(res): - self.failUnlessEqual(len(res), 1) - self.failUnlessEqual(res[0], '\x04') - d.addCallback(_callb3) - return d - - def test_short_reads_2(self): - # Also in the other order of read-vs-write. - duc = repairer.DownUpConnector() - duc.registerProducer(None, True) # just because you have to call registerProducer first - - duc.write('\x04') - d = duc.read_encrypted(2, False) - duc.close() - - def _callb(res): - self.failUnlessEqual(len(res), 1) - self.failUnlessEqual(res[0], '\x04') - d.addCallback(_callb) - return d - - def test_short_reads_3(self): - # Also if it is closed before the read. - duc = repairer.DownUpConnector() - duc.registerProducer(None, True) # just because you have to call registerProducer first - - duc.write('\x04') - duc.close() - d = duc.read_encrypted(2, False) - def _callb(res): - self.failUnlessEqual(len(res), 1) - self.failUnlessEqual(res[0], '\x04') - d.addCallback(_callb) - return d - class Repairer(GridTestMixin, unittest.TestCase, RepairTestMixin, common.ShouldFailMixin): diff --git a/src/allmydata/test/test_system.py b/src/allmydata/test/test_system.py index 3351102..61662f2 100644 --- a/src/allmydata/test/test_system.py +++ b/src/allmydata/test/test_system.py @@ -9,7 +9,8 @@ from allmydata import uri from allmydata.storage.mutable import MutableShareFile from allmydata.storage.server import si_a2b from allmydata.immutable import offloaded, upload -from allmydata.immutable.filenode import ImmutableFileNode, LiteralFileNode +from allmydata.immutable.literal import LiteralFileNode +from allmydata.immutable.filenode import ImmutableFileNode from allmydata.util import idlib, mathutil from allmydata.util import log, base32 from allmydata.util.consumer import MemoryConsumer, download_to_data diff --git 
a/src/allmydata/test/test_upload.py b/src/allmydata/test/test_upload.py index 917472a..25d2d08 100644 --- a/src/allmydata/test/test_upload.py +++ b/src/allmydata/test/test_upload.py @@ -2086,3 +2086,11 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin, # upload with exactly 75 peers (shares_of_happiness) # have a download fail # cancel a download (need to implement more cancel stuff) + +# from test_encode: +# NoNetworkGrid, upload part of ciphertext, kill server, continue upload +# check with Kevan, they want to live in test_upload, existing tests might cover +# def test_lost_one_shareholder(self): # these are upload-side tests +# def test_lost_one_shareholder_early(self): +# def test_lost_many_shareholders(self): +# def test_lost_all_shareholders(self): diff --git a/src/allmydata/test/test_util.py b/src/allmydata/test/test_util.py index 0a326b3..2fceee5 100644 --- a/src/allmydata/test/test_util.py +++ b/src/allmydata/test/test_util.py @@ -7,12 +7,14 @@ from twisted.trial import unittest from twisted.internet import defer, reactor from twisted.python.failure import Failure from twisted.python import log +from hashlib import md5 from allmydata.util import base32, idlib, humanreadable, mathutil, hashutil from allmydata.util import assertutil, fileutil, deferredutil, abbreviate from allmydata.util import limiter, time_format, pollmixin, cachedir from allmydata.util import statistics, dictutil, pipeline from allmydata.util import log as tahoe_log +from allmydata.util.spans import Spans, overlap, DataSpans class Base32(unittest.TestCase): def test_b2a_matches_Pythons(self): @@ -1537,3 +1539,566 @@ class Log(unittest.TestCase): tahoe_log.err(format="intentional sample error", failure=f, level=tahoe_log.OPERATIONAL, umid="wO9UoQ") self.flushLoggedErrors(SampleError) + + +class SimpleSpans: + # this is a simple+inefficient form of util.spans.Spans . We compare the + # behavior of this reference model against the real (efficient) form. 
+ + def __init__(self, _span_or_start=None, length=None): + self._have = set() + if length is not None: + for i in range(_span_or_start, _span_or_start+length): + self._have.add(i) + elif _span_or_start: + for (start,length) in _span_or_start: + self.add(start, length) + + def add(self, start, length): + for i in range(start, start+length): + self._have.add(i) + return self + + def remove(self, start, length): + for i in range(start, start+length): + self._have.discard(i) + return self + + def each(self): + return sorted(self._have) + + def __iter__(self): + items = sorted(self._have) + prevstart = None + prevend = None + for i in items: + if prevstart is None: + prevstart = prevend = i + continue + if i == prevend+1: + prevend = i + continue + yield (prevstart, prevend-prevstart+1) + prevstart = prevend = i + if prevstart is not None: + yield (prevstart, prevend-prevstart+1) + + def __len__(self): + # this also gets us bool(s) + return len(self._have) + + def __add__(self, other): + s = self.__class__(self) + for (start, length) in other: + s.add(start, length) + return s + + def __sub__(self, other): + s = self.__class__(self) + for (start, length) in other: + s.remove(start, length) + return s + + def __iadd__(self, other): + for (start, length) in other: + self.add(start, length) + return self + + def __isub__(self, other): + for (start, length) in other: + self.remove(start, length) + return self + + def __and__(self, other): + s = self.__class__() + for i in other.each(): + if i in self._have: + s.add(i, 1) + return s + + def __contains__(self, (start,length)): + for i in range(start, start+length): + if i not in self._have: + return False + return True + +class ByteSpans(unittest.TestCase): + def test_basic(self): + s = Spans() + self.failUnlessEqual(list(s), []) + self.failIf(s) + self.failIf((0,1) in s) + self.failUnlessEqual(len(s), 0) + + s1 = Spans(3, 4) # 3,4,5,6 + self._check1(s1) + + s2 = Spans(s1) + self._check1(s2) + + s2.add(10,2) # 10,11 + 
self._check1(s1) + self.failUnless((10,1) in s2) + self.failIf((10,1) in s1) + self.failUnlessEqual(list(s2.each()), [3,4,5,6,10,11]) + self.failUnlessEqual(len(s2), 6) + + s2.add(15,2).add(20,2) + self.failUnlessEqual(list(s2.each()), [3,4,5,6,10,11,15,16,20,21]) + self.failUnlessEqual(len(s2), 10) + + s2.remove(4,3).remove(15,1) + self.failUnlessEqual(list(s2.each()), [3,10,11,16,20,21]) + self.failUnlessEqual(len(s2), 6) + + s1 = SimpleSpans(3, 4) # 3 4 5 6 + s2 = SimpleSpans(5, 4) # 5 6 7 8 + i = s1 & s2 + self.failUnlessEqual(list(i.each()), [5, 6]) + + def _check1(self, s): + self.failUnlessEqual(list(s), [(3,4)]) + self.failUnless(s) + self.failUnlessEqual(len(s), 4) + self.failIf((0,1) in s) + self.failUnless((3,4) in s) + self.failUnless((3,1) in s) + self.failUnless((5,2) in s) + self.failUnless((6,1) in s) + self.failIf((6,2) in s) + self.failIf((7,1) in s) + self.failUnlessEqual(list(s.each()), [3,4,5,6]) + + def test_math(self): + s1 = Spans(0, 10) # 0,1,2,3,4,5,6,7,8,9 + s2 = Spans(5, 3) # 5,6,7 + s3 = Spans(8, 4) # 8,9,10,11 + + s = s1 - s2 + self.failUnlessEqual(list(s.each()), [0,1,2,3,4,8,9]) + s = s1 - s3 + self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7]) + s = s2 - s3 + self.failUnlessEqual(list(s.each()), [5,6,7]) + s = s1 & s2 + self.failUnlessEqual(list(s.each()), [5,6,7]) + s = s2 & s1 + self.failUnlessEqual(list(s.each()), [5,6,7]) + s = s1 & s3 + self.failUnlessEqual(list(s.each()), [8,9]) + s = s3 & s1 + self.failUnlessEqual(list(s.each()), [8,9]) + s = s2 & s3 + self.failUnlessEqual(list(s.each()), []) + s = s3 & s2 + self.failUnlessEqual(list(s.each()), []) + s = Spans() & s3 + self.failUnlessEqual(list(s.each()), []) + s = s3 & Spans() + self.failUnlessEqual(list(s.each()), []) + + s = s1 + s2 + self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9]) + s = s1 + s3 + self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9,10,11]) + s = s2 + s3 + self.failUnlessEqual(list(s.each()), [5,6,7,8,9,10,11]) + + s = Spans(s1) + s 
-= s2 + self.failUnlessEqual(list(s.each()), [0,1,2,3,4,8,9]) + s = Spans(s1) + s -= s3 + self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7]) + s = Spans(s2) + s -= s3 + self.failUnlessEqual(list(s.each()), [5,6,7]) + + s = Spans(s1) + s += s2 + self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9]) + s = Spans(s1) + s += s3 + self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9,10,11]) + s = Spans(s2) + s += s3 + self.failUnlessEqual(list(s.each()), [5,6,7,8,9,10,11]) + + def test_random(self): + # attempt to increase coverage of corner cases by comparing behavior + # of a simple-but-slow model implementation against the + # complex-but-fast actual implementation, in a large number of random + # operations + S1 = SimpleSpans + S2 = Spans + s1 = S1(); s2 = S2() + seed = "" + def _create(subseed): + ns1 = S1(); ns2 = S2() + for i in range(10): + what = md5(subseed+str(i)).hexdigest() + start = int(what[2:4], 16) + length = max(1,int(what[5:6], 16)) + ns1.add(start, length); ns2.add(start, length) + return ns1, ns2 + + #print + for i in range(1000): + what = md5(seed+str(i)).hexdigest() + op = what[0] + subop = what[1] + start = int(what[2:4], 16) + length = max(1,int(what[5:6], 16)) + #print what + if op in "0": + if subop in "01234": + s1 = S1(); s2 = S2() + elif subop in "5678": + s1 = S1(start, length); s2 = S2(start, length) + else: + s1 = S1(s1); s2 = S2(s2) + #print "s2 = %s" % s2.dump() + elif op in "123": + #print "s2.add(%d,%d)" % (start, length) + s1.add(start, length); s2.add(start, length) + elif op in "456": + #print "s2.remove(%d,%d)" % (start, length) + s1.remove(start, length); s2.remove(start, length) + elif op in "78": + ns1, ns2 = _create(what[7:11]) + #print "s2 + %s" % ns2.dump() + s1 = s1 + ns1; s2 = s2 + ns2 + elif op in "9a": + ns1, ns2 = _create(what[7:11]) + #print "%s - %s" % (s2.dump(), ns2.dump()) + s1 = s1 - ns1; s2 = s2 - ns2 + elif op in "bc": + ns1, ns2 = _create(what[7:11]) + #print "s2 += %s" % ns2.dump() + s1 += ns1; 
s2 += ns2 + elif op in "de": + ns1, ns2 = _create(what[7:11]) + #print "%s -= %s" % (s2.dump(), ns2.dump()) + s1 -= ns1; s2 -= ns2 + else: + ns1, ns2 = _create(what[7:11]) + #print "%s &= %s" % (s2.dump(), ns2.dump()) + s1 = s1 & ns1; s2 = s2 & ns2 + #print "s2 now %s" % s2.dump() + self.failUnlessEqual(list(s1.each()), list(s2.each())) + self.failUnlessEqual(len(s1), len(s2)) + self.failUnlessEqual(bool(s1), bool(s2)) + self.failUnlessEqual(list(s1), list(s2)) + for j in range(10): + what = md5(what[12:14]+str(j)).hexdigest() + start = int(what[2:4], 16) + length = max(1, int(what[5:6], 16)) + span = (start, length) + self.failUnlessEqual(bool(span in s1), bool(span in s2)) + + + # s() + # s(start,length) + # s(s0) + # s.add(start,length) : returns s + # s.remove(start,length) + # s.each() -> list of byte offsets, mostly for testing + # list(s) -> list of (start,length) tuples, one per span + # (start,length) in s -> True if (start..start+length-1) are all members + # NOT equivalent to x in list(s) + # len(s) -> number of bytes, for testing, bool(), and accounting/limiting + # bool(s) (__len__) + # s = s1+s2, s1-s2, +=s1, -=s1 + + def test_overlap(self): + for a in range(20): + for b in range(10): + for c in range(20): + for d in range(10): + self._test_overlap(a,b,c,d) + + def _test_overlap(self, a, b, c, d): + s1 = set(range(a,a+b)) + s2 = set(range(c,c+d)) + #print "---" + #self._show_overlap(s1, "1") + #self._show_overlap(s2, "2") + o = overlap(a,b,c,d) + expected = s1.intersection(s2) + if not expected: + self.failUnlessEqual(o, None) + else: + start,length = o + so = set(range(start,start+length)) + #self._show(so, "o") + self.failUnlessEqual(so, expected) + + def _show_overlap(self, s, c): + import sys + out = sys.stdout + if s: + for i in range(max(s)): + if i in s: + out.write(c) + else: + out.write(" ") + out.write("\n") + +def extend(s, start, length, fill): + if len(s) >= start+length: + return s + assert len(fill) == 1 + return s + 
fill*(start+length-len(s)) + +def replace(s, start, data): + assert len(s) >= start+len(data) + return s[:start] + data + s[start+len(data):] + +class SimpleDataSpans: + def __init__(self, other=None): + self.missing = "" # "1" where missing, "0" where found + self.data = "" + if other: + for (start, data) in other.get_chunks(): + self.add(start, data) + + def __len__(self): + return len(self.missing.translate(None, "1")) + def _dump(self): + return [i for (i,c) in enumerate(self.missing) if c == "0"] + def _have(self, start, length): + m = self.missing[start:start+length] + if not m or len(m)" in body, body) body = " ".join(body.strip().split()) - exp = ("NotEnoughSharesError: This indicates that some " + msg = ("NotEnoughSharesError: This indicates that some " "servers were unavailable, or that shares have been " "lost to server departure, hard drive failure, or disk " "corruption. You should perform a filecheck on " "this object to learn more. The full error message is:" - " Failed to get enough shareholders: have 1, need 3") - self.failUnlessReallyEqual(exp, body) + " ran out of shares: %d complete, %d pending, 0 overdue," + " 0 unused, need 3. Last failure: None") + msg1 = msg % (1, 0) + msg2 = msg % (0, 1) + self.failUnless(body == msg1 or body == msg2, body) d.addCallback(_check_one_share) d.addCallback(lambda ignored: diff --git a/src/allmydata/util/observer.py b/src/allmydata/util/observer.py index 13e4b51..3dc1d27 100644 --- a/src/allmydata/util/observer.py +++ b/src/allmydata/util/observer.py @@ -1,5 +1,6 @@ # -*- test-case-name: allmydata.test.test_observer -*- +import weakref from twisted.internet import defer from foolscap.api import eventually @@ -91,3 +92,47 @@ class ObserverList: def notify(self, *args, **kwargs): for o in self._watchers: eventually(o, *args, **kwargs) + +class EventStreamObserver: + """A simple class to distribute multiple events to a single subscriber. 
+ It accepts arbitrary kwargs, but no posargs.""" + def __init__(self): + self._watcher = None + self._undelivered_results = [] + self._canceler = None + + def set_canceler(self, c, methname): + """I will call c.METHNAME(self) when somebody cancels me.""" + # we use a weakref to avoid creating a cycle between us and the thing + # we're observing: they'll be holding a reference to us to compare + # against the value we pass to their canceler function. However, + # since bound methods are first-class objects (and not kept alive by + # the object they're bound to), we can't just stash a weakref to the + # bound cancel method. Instead, we must hold a weakref to the actual + # object, and obtain its cancel method later. + # http://code.activestate.com/recipes/81253-weakmethod/ has an + # alternative. + self._canceler = (weakref.ref(c), methname) + + def subscribe(self, observer, **watcher_kwargs): + self._watcher = (observer, watcher_kwargs) + while self._undelivered_results: + self._notify(self._undelivered_results.pop(0)) + + def notify(self, **result_kwargs): + if self._watcher: + self._notify(result_kwargs) + else: + self._undelivered_results.append(result_kwargs) + + def _notify(self, result_kwargs): + o, watcher_kwargs = self._watcher + kwargs = dict(result_kwargs) + kwargs.update(watcher_kwargs) + eventually(o, **kwargs) + + def cancel(self): + wr,methname = self._canceler + o = wr() + if o: + getattr(o,methname)(self) diff --git a/src/allmydata/util/spans.py b/src/allmydata/util/spans.py new file mode 100755 index 0000000..2a199f0 --- /dev/null +++ b/src/allmydata/util/spans.py @@ -0,0 +1,431 @@ + +class Spans: + """I represent a compressed list of booleans, one per index (an integer). + Typically, each index represents an offset into a large string, pointing + to a specific byte of a share. In this context, True means that byte has + been received, or has been requested. 
+ + Another way to look at this is maintaining a set of integers, optimized + for operations on spans like 'add range to set' and 'is range in set?'. + + This is a python equivalent of perl's Set::IntSpan module, frequently + used to represent .newsrc contents. + + Rather than storing an actual (large) list or dictionary, I represent my + internal state as a sorted list of spans, each with a start and a length. + My API is presented in terms of start+length pairs. I provide set + arithmetic operators, to efficiently answer questions like 'I want bytes + XYZ, I already requested bytes ABC, and I've already received bytes DEF: + what bytes should I request now?'. + + The new downloader will use it to keep track of which bytes we've requested + or received already. + """ + + def __init__(self, _span_or_start=None, length=None): + self._spans = list() + if length is not None: + self._spans.append( (_span_or_start, length) ) + elif _span_or_start: + for (start,length) in _span_or_start: + self.add(start, length) + self._check() + + def _check(self): + assert sorted(self._spans) == self._spans + prev_end = None + try: + for (start,length) in self._spans: + if prev_end is not None: + assert start > prev_end + prev_end = start+length + except AssertionError: + print "BAD:", self.dump() + raise + + def add(self, start, length): + assert start >= 0 + assert length > 0 + #print " ADD [%d+%d -%d) to %s" % (start, length, start+length, self.dump()) + first_overlap = last_overlap = None + for i,(s_start,s_length) in enumerate(self._spans): + #print " (%d+%d)-> overlap=%s adjacent=%s" % (s_start,s_length, overlap(s_start, s_length, start, length), adjacent(s_start, s_length, start, length)) + if (overlap(s_start, s_length, start, length) + or adjacent(s_start, s_length, start, length)): + last_overlap = i + if first_overlap is None: + first_overlap = i + continue + # no overlap + if first_overlap is not None: + break + #print " first_overlap", first_overlap, last_overlap + if 
first_overlap is None: + # no overlap, so just insert the span and sort by starting + # position. + self._spans.insert(0, (start,length)) + self._spans.sort() + else: + # everything from [first_overlap] to [last_overlap] overlapped + first_start,first_length = self._spans[first_overlap] + last_start,last_length = self._spans[last_overlap] + newspan_start = min(start, first_start) + newspan_end = max(start+length, last_start+last_length) + newspan_length = newspan_end - newspan_start + newspan = (newspan_start, newspan_length) + self._spans[first_overlap:last_overlap+1] = [newspan] + #print " ADD done: %s" % self.dump() + self._check() + + return self + + def remove(self, start, length): + assert start >= 0 + assert length > 0 + #print " REMOVE [%d+%d -%d) from %s" % (start, length, start+length, self.dump()) + first_complete_overlap = last_complete_overlap = None + for i,(s_start,s_length) in enumerate(self._spans): + s_end = s_start + s_length + o = overlap(s_start, s_length, start, length) + if o: + o_start, o_length = o + o_end = o_start+o_length + if o_start == s_start and o_end == s_end: + # delete this span altogether + if first_complete_overlap is None: + first_complete_overlap = i + last_complete_overlap = i + elif o_start == s_start: + # we only overlap the left side, so trim the start + # 1111 + # rrrr + # oo + # -> 11 + new_start = o_end + new_end = s_end + assert new_start > s_start + new_length = new_end - new_start + self._spans[i] = (new_start, new_length) + elif o_end == s_end: + # we only overlap the right side + # 1111 + # rrrr + # oo + # -> 11 + new_start = s_start + new_end = o_start + assert new_end < s_end + new_length = new_end - new_start + self._spans[i] = (new_start, new_length) + else: + # we overlap the middle, so create a new span. No need to + # examine any other spans. 
+ # 111111 + # rr + # LL RR + left_start = s_start + left_end = o_start + left_length = left_end - left_start + right_start = o_end + right_end = s_end + right_length = right_end - right_start + self._spans[i] = (left_start, left_length) + self._spans.append( (right_start, right_length) ) + self._spans.sort() + break + if first_complete_overlap is not None: + del self._spans[first_complete_overlap:last_complete_overlap+1] + #print " REMOVE done: %s" % self.dump() + self._check() + return self + + def dump(self): + return "len=%d: %s" % (len(self), + ",".join(["[%d-%d]" % (start,start+l-1) + for (start,l) in self._spans]) ) + + def each(self): + for start, length in self._spans: + for i in range(start, start+length): + yield i + + def __iter__(self): + for s in self._spans: + yield s + + def __len__(self): + # this also gets us bool(s) + return sum([length for start,length in self._spans]) + + def __add__(self, other): + s = self.__class__(self) + for (start, length) in other: + s.add(start, length) + return s + + def __sub__(self, other): + s = self.__class__(self) + for (start, length) in other: + s.remove(start, length) + return s + + def __iadd__(self, other): + for (start, length) in other: + self.add(start, length) + return self + + def __isub__(self, other): + for (start, length) in other: + self.remove(start, length) + return self + + def __and__(self, other): + if not self._spans: + return self.__class__() + bounds = self.__class__(self._spans[0][0], + self._spans[-1][0]+self._spans[-1][1]) + not_other = bounds - other + return self - not_other + + def __contains__(self, (start,length)): + for span_start,span_length in self._spans: + o = overlap(start, length, span_start, span_length) + if o: + o_start,o_length = o + if o_start == start and o_length == length: + return True + return False + +def overlap(start0, length0, start1, length1): + # return start2,length2 of the overlapping region, or None + # 00 00 000 0000 00 00 000 00 00 00 00 + # 11 11 11 11 111 
11 11 1111 111 11 11 + left = max(start0, start1) + right = min(start0+length0, start1+length1) + # if there is overlap, 'left' will be its start, and right-1 will + # be the end' + if left < right: + return (left, right-left) + return None + +def adjacent(start0, length0, start1, length1): + if (start0 < start1) and start0+length0 == start1: + return True + elif (start1 < start0) and start1+length1 == start0: + return True + return False + +class DataSpans: + """I represent portions of a large string. Equivalently, I can be said to + maintain a large array of characters (with gaps of empty elements). I can + be used to manage access to a remote share, where some pieces have been + retrieved, some have been requested, and others have not been read. + """ + + def __init__(self, other=None): + self.spans = [] # (start, data) tuples, non-overlapping, merged + if other: + for (start, data) in other.get_chunks(): + self.add(start, data) + + def __len__(self): + # return number of bytes we're holding + return sum([len(data) for (start,data) in self.spans]) + + def _dump(self): + # return iterator of sorted list of offsets, one per byte + for (start,data) in self.spans: + for i in range(start, start+len(data)): + yield i + + def dump(self): + return "len=%d: %s" % (len(self), + ",".join(["[%d-%d]" % (start,start+len(data)-1) + for (start,data) in self.spans]) ) + + def get_chunks(self): + return list(self.spans) + + def get_spans(self): + """Return a Spans object with a bit set for each byte I hold""" + return Spans([(start, len(data)) for (start,data) in self.spans]) + + def assert_invariants(self): + if not self.spans: + return + prev_start = self.spans[0][0] + prev_end = prev_start + len(self.spans[0][1]) + for start, data in self.spans[1:]: + if not start > prev_end: + # adjacent or overlapping: bad + print "ASSERTION FAILED", self.spans + raise AssertionError + + def get(self, start, length): + # returns a string of LENGTH, or None + #print "get", start, length, 
self.spans + end = start+length + for (s_start,s_data) in self.spans: + s_end = s_start+len(s_data) + #print " ",s_start,s_end + if s_start <= start < s_end: + # we want some data from this span. Because we maintain + # strictly merged and non-overlapping spans, everything we + # want must be in this span. + offset = start - s_start + if offset + length > len(s_data): + #print " None, span falls short" + return None # span falls short + #print " some", s_data[offset:offset+length] + return s_data[offset:offset+length] + if s_start >= end: + # we've gone too far: no further spans will overlap + #print " None, gone too far" + return None + #print " None, ran out of spans" + return None + + def add(self, start, data): + # first: walk through existing spans, find overlap, modify-in-place + # create list of new spans + # add new spans + # sort + # merge adjacent spans + #print "add", start, data, self.spans + end = start + len(data) + i = 0 + while len(data): + #print " loop", start, data, i, len(self.spans), self.spans + if i >= len(self.spans): + #print " append and done" + # append a last span + self.spans.append( (start, data) ) + break + (s_start,s_data) = self.spans[i] + # five basic cases: + # a: OLD b:OLDD c1:OLD c2:OLD d1:OLDD d2:OLD e: OLLDD + # NEW NEW NEW NEWW NEW NEW NEW + # + # we handle A by inserting a new segment (with "N") and looping, + # turning it into B or C. We handle B by replacing a prefix and + # terminating. We handle C (both c1 and c2) by replacing the + # segment (and, for c2, looping, turning it into A). We handle D + # by replacing a suffix (and, for d2, looping, turning it into + # A). We handle E by replacing the middle and terminating. 
+ if start < s_start: + # case A: insert a new span, then loop with the remainder + #print " insert new psan" + s_len = s_start-start + self.spans.insert(i, (start, data[:s_len])) + i += 1 + start = s_start + data = data[s_len:] + continue + s_len = len(s_data) + s_end = s_start+s_len + if s_start <= start < s_end: + #print " modify this span", s_start, start, s_end + # we want to modify some data in this span: a prefix, a + # suffix, or the whole thing + if s_start == start: + if s_end <= end: + #print " replace whole segment" + # case C: replace this segment + self.spans[i] = (s_start, data[:s_len]) + i += 1 + start += s_len + data = data[s_len:] + # C2 is where len(data)>0 + continue + # case B: modify the prefix, retain the suffix + #print " modify prefix" + self.spans[i] = (s_start, data + s_data[len(data):]) + break + if start > s_start and end < s_end: + # case E: modify the middle + #print " modify middle" + prefix_len = start - s_start # we retain this much + suffix_len = s_end - end # and retain this much + newdata = s_data[:prefix_len] + data + s_data[-suffix_len:] + self.spans[i] = (s_start, newdata) + break + # case D: retain the prefix, modify the suffix + #print " modify suffix" + prefix_len = start - s_start # we retain this much + suffix_len = s_len - prefix_len # we replace this much + #print " ", s_data, prefix_len, suffix_len, s_len, data + self.spans[i] = (s_start, + s_data[:prefix_len] + data[:suffix_len]) + i += 1 + start += suffix_len + data = data[suffix_len:] + #print " now", start, data + # D2 is where len(data)>0 + continue + # else we're not there yet + #print " still looking" + i += 1 + continue + # now merge adjacent spans + #print " merging", self.spans + newspans = [] + for (s_start,s_data) in self.spans: + if newspans and adjacent(newspans[-1][0], len(newspans[-1][1]), + s_start, len(s_data)): + newspans[-1] = (newspans[-1][0], newspans[-1][1] + s_data) + else: + newspans.append( (s_start, s_data) ) + self.spans = newspans + 
self.assert_invariants() + #print " done", self.spans + + def remove(self, start, length): + i = 0 + end = start + length + #print "remove", start, length, self.spans + while i < len(self.spans): + (s_start,s_data) = self.spans[i] + if s_start >= end: + # this segment is entirely right of the removed region, and + # all further segments are even further right. We're done. + break + s_len = len(s_data) + s_end = s_start + s_len + o = overlap(start, length, s_start, s_len) + if not o: + i += 1 + continue + o_start, o_len = o + o_end = o_start + o_len + if o_len == s_len: + # remove the whole segment + del self.spans[i] + continue + if o_start == s_start: + # remove a prefix, leaving the suffix from o_end to s_end + prefix_len = o_end - o_start + self.spans[i] = (o_end, s_data[prefix_len:]) + i += 1 + continue + elif o_end == s_end: + # remove a suffix, leaving the prefix from s_start to o_start + prefix_len = o_start - s_start + self.spans[i] = (s_start, s_data[:prefix_len]) + i += 1 + continue + # remove the middle, creating a new segment + # left is s_start:o_start, right is o_end:s_end + left_len = o_start - s_start + left = s_data[:left_len] + right_len = s_end - o_end + right = s_data[-right_len:] + self.spans[i] = (s_start, left) + self.spans.insert(i+1, (o_end, right)) + break + #print " done", self.spans + + def pop(self, start, length): + data = self.get(start, length) + if data: + self.remove(start, length) + return data diff --git a/src/allmydata/web/download-status.xhtml b/src/allmydata/web/download-status.xhtml index 77342ba..30abfca 100644 --- a/src/allmydata/web/download-status.xhtml +++ b/src/allmydata/web/download-status.xhtml @@ -18,6 +18,7 @@
  • Status:
  • +

    Download Results

    diff --git a/src/allmydata/web/status.py b/src/allmydata/web/status.py index e4241a3..c3a55d7 100644 --- a/src/allmydata/web/status.py +++ b/src/allmydata/web/status.py @@ -358,6 +358,147 @@ class DownloadStatusPage(DownloadResultsRendererMixin, rend.Page): def download_results(self): return defer.maybeDeferred(self.download_status.get_results) + def relative_time(self, t): + if t is None: + return t + if self.download_status.started is not None: + return t - self.download_status.started + return t + def short_relative_time(self, t): + t = self.relative_time(t) + if t is None: + return "" + return "+%.6fs" % t + + def renderHTTP(self, ctx): + req = inevow.IRequest(ctx) + t = get_arg(req, "t") + if t == "json": + return self.json(req) + return rend.Page.renderHTTP(self, ctx) + + def json(self, req): + req.setHeader("content-type", "text/plain") + data = {} + dyhb_events = [] + for serverid,requests in self.download_status.dyhb_requests.iteritems(): + for req in requests: + dyhb_events.append( (base32.b2a(serverid),) + req ) + dyhb_events.sort(key=lambda req: req[1]) + data["dyhb"] = dyhb_events + request_events = [] + for serverid,requests in self.download_status.requests.iteritems(): + for req in requests: + request_events.append( (base32.b2a(serverid),) + req ) + request_events.sort(key=lambda req: (req[4],req[1])) + data["requests"] = request_events + data["segment"] = self.download_status.segment_events + data["read"] = self.download_status.read_events + return simplejson.dumps(data, indent=1) + "\n" + + def render_events(self, ctx, data): + if not self.download_status.storage_index: + return + srt = self.short_relative_time + l = T.ul() + + t = T.table(class_="status-download-events") + t[T.tr[T.td["serverid"], T.td["sent"], T.td["received"], + T.td["shnums"], T.td["RTT"]]] + dyhb_events = [] + for serverid,requests in self.download_status.dyhb_requests.iteritems(): + for req in requests: + dyhb_events.append( (serverid,) + req ) + 
dyhb_events.sort(key=lambda req: req[1]) + for d_ev in dyhb_events: + (serverid, sent, shnums, received) = d_ev + serverid_s = idlib.shortnodeid_b2a(serverid) + rtt = received - sent + t[T.tr(style="background: %s" % self.color(serverid))[ + [T.td[serverid_s], T.td[srt(sent)], T.td[srt(received)], + T.td[",".join([str(shnum) for shnum in shnums])], + T.td[self.render_time(None, rtt)], + ]]] + l["DYHB Requests:", t] + + t = T.table(class_="status-download-events") + t[T.tr[T.td["range"], T.td["start"], T.td["finish"], T.td["got"], + T.td["time"], T.td["decrypttime"], T.td["pausedtime"], + T.td["speed"]]] + for r_ev in self.download_status.read_events: + (start, length, requesttime, finishtime, bytes, decrypt, paused) = r_ev + print r_ev + if finishtime is not None: + rtt = finishtime - requesttime - paused + speed = self.render_rate(None, 1.0 * bytes / rtt) + rtt = self.render_time(None, rtt) + decrypt = self.render_time(None, decrypt) + paused = self.render_time(None, paused) + else: + speed, rtt, decrypt, paused = "","","","" + t[T.tr[T.td["[%d:+%d]" % (start, length)], + T.td[srt(requesttime)], T.td[srt(finishtime)], + T.td[bytes], T.td[rtt], T.td[decrypt], T.td[paused], + T.td[speed], + ]] + l["Read Events:", t] + + t = T.table(class_="status-download-events") + t[T.tr[T.td["type"], T.td["segnum"], T.td["when"], T.td["range"], + T.td["decodetime"], T.td["segtime"], T.td["speed"]]] + reqtime = (None, None) + for s_ev in self.download_status.segment_events: + (etype, segnum, when, segstart, seglen, decodetime) = s_ev + if etype == "request": + t[T.tr[T.td["request"], T.td["seg%d" % segnum], + T.td[srt(when)]]] + reqtime = (segnum, when) + elif etype == "delivery": + if reqtime[0] == segnum: + segtime = when - reqtime[1] + speed = self.render_rate(None, 1.0 * seglen / segtime) + segtime = self.render_time(None, segtime) + else: + segtime, speed = "", "" + t[T.tr[T.td["delivery"], T.td["seg%d" % segnum], + T.td[srt(when)], + T.td["[%d:+%d]" % (segstart, seglen)], + 
T.td[self.render_time(None,decodetime)], + T.td[segtime], T.td[speed]]] + elif etype == "error": + t[T.tr[T.td["error"], T.td["seg%d" % segnum]]] + l["Segment Events:", t] + + t = T.table(border="1") + t[T.tr[T.td["serverid"], T.td["shnum"], T.td["range"], + T.td["txtime"], T.td["rxtime"], T.td["received"], T.td["RTT"]]] + reqtime = (None, None) + request_events = [] + for serverid,requests in self.download_status.requests.iteritems(): + for req in requests: + request_events.append( (serverid,) + req ) + request_events.sort(key=lambda req: (req[4],req[1])) + for r_ev in request_events: + (peerid, shnum, start, length, sent, receivedlen, received) = r_ev + rtt = None + if received is not None: + rtt = received - sent + peerid_s = idlib.shortnodeid_b2a(peerid) + t[T.tr(style="background: %s" % self.color(peerid))[ + T.td[peerid_s], T.td[shnum], + T.td["[%d:+%d]" % (start, length)], + T.td[srt(sent)], T.td[srt(received)], T.td[receivedlen], + T.td[self.render_time(None, rtt)], + ]] + l["Requests:", t] + + return l + + def color(self, peerid): + def m(c): + return min(ord(c) / 2 + 0x80, 0xff) + return "#%02x%02x%02x" % (m(peerid[0]), m(peerid[1]), m(peerid[2])) + def render_results(self, ctx, data): d = self.download_results() def _got_results(results): @@ -371,7 +512,7 @@ class DownloadStatusPage(DownloadResultsRendererMixin, rend.Page): TIME_FORMAT = "%H:%M:%S %d-%b-%Y" started_s = time.strftime(TIME_FORMAT, time.localtime(data.get_started())) - return started_s + return started_s + " (%s)" % data.get_started() def render_si(self, ctx, data): si_s = base32.b2a_or_none(data.get_storage_index()) diff --git a/src/allmydata/web/tahoe.css b/src/allmydata/web/tahoe.css index a9aced6..0ed83fc 100644 --- a/src/allmydata/web/tahoe.css +++ b/src/allmydata/web/tahoe.css @@ -135,4 +135,14 @@ table.tahoe-directory { display: inline; text-align: center; padding: 0 1em; -} \ No newline at end of file +} + +/* recent upload/download status pages */ + +table.status-download-events { 
+ border: 1px solid #aaa; +} +table.status-download-events td { + border: 1px solid #a00; + padding: 2px +}