diff --git a/src/allmydata/immutable/downloader/node.py b/src/allmydata/immutable/downloader/node.py index 33c16cf..04482e6 100644 --- a/src/allmydata/immutable/downloader/node.py +++ b/src/allmydata/immutable/downloader/node.py @@ -130,8 +130,9 @@ class DownloadNode: # for concurrent operations: each gets its own Segmentation manager if size is None: size = self._verifycap.size - # clip size so offset+size does not go past EOF - size = min(size, self._verifycap.size-offset) + # ignore overruns: clip size so offset+size does not go past EOF, and + # so size is not negative (which indicates that offset >= EOF) + size = max(0, min(size, self._verifycap.size-offset)) if read_ev is None: read_ev = self._download_status.add_read_event(offset, size, now()) @@ -143,6 +144,10 @@ class DownloadNode: sp = self._history.stats_provider sp.count("downloader.files_downloaded", 1) # really read() calls sp.count("downloader.bytes_downloaded", size) + if size == 0: + read_ev.finished(now()) + # no data, so no producer, so no register/unregisterProducer + return defer.succeed(consumer) s = Segmentation(self, offset, size, consumer, read_ev, lp) # this raises an interesting question: what segments to fetch? if # offset=0, always fetch the first segment, and then allow diff --git a/src/allmydata/immutable/encode.py b/src/allmydata/immutable/encode.py index c447448..df7a3a1 100644 --- a/src/allmydata/immutable/encode.py +++ b/src/allmydata/immutable/encode.py @@ -316,6 +316,9 @@ class Encoder(object): # of additional shares which can be substituted if the primary ones # are unavailable + # we read data from the source one segment at a time, and then chop + # it into 'input_piece_size' pieces before handing it to the codec + crypttext_segment_hasher = hashutil.crypttext_segment_hasher() # memory footprint: we only hold a tiny piece of the plaintext at any @@ -350,8 +353,7 @@ class Encoder(object): crypttext_segment_hasher = hashutil.crypttext_segment_hasher() d = self._gather_data(self.required_shares, input_piece_size, - crypttext_segment_hasher, - allow_short=True) + crypttext_segment_hasher, allow_short=True) def _done_gathering(chunks): for c in chunks: # a short trailing chunk will have been padded by @@ -369,58 +371,50 @@ class Encoder(object): def _gather_data(self, num_chunks, input_chunk_size, crypttext_segment_hasher, - allow_short=False, - previous_chunks=[]): + allow_short=False): """Return a Deferred that will fire when the required number of chunks have been read (and hashed and encrypted). The Deferred fires - with the combination of any 'previous_chunks' and the new chunks - which were gathered.""" + with a list of chunks, each of size input_chunk_size.""" + + # I originally built this to allow read_encrypted() to behave badly: + # to let it return more or less data than you asked for. It would + # stash the leftovers until later, and then recurse until it got + # enough. I don't think that was actually useful. + # + # who defines read_encrypted? + # offloaded.LocalCiphertextReader: real disk file: exact + # upload.EncryptAnUploadable: Uploadable, but a wrapper that makes + # it exact. The return value is a list of 50KiB chunks, to reduce + # the memory footprint of the encryption process. + # repairer.Repairer: immutable.filenode.CiphertextFileNode: exact + # + # This has been redefined to require read_encrypted() to behave like + # a local file: return exactly the amount requested unless it hits + # EOF. + # -warner if self._aborted: raise UploadAborted() - if not num_chunks: - return defer.succeed(previous_chunks) - - d = self._uploadable.read_encrypted(input_chunk_size, False) + read_size = num_chunks * input_chunk_size + d = self._uploadable.read_encrypted(read_size, hash_only=False) def _got(data): + assert isinstance(data, (list,tuple)) if self._aborted: raise UploadAborted() - encrypted_pieces = [] - length = 0 - while data: - encrypted_piece = data.pop(0) - length += len(encrypted_piece) - crypttext_segment_hasher.update(encrypted_piece) - self._crypttext_hasher.update(encrypted_piece) - encrypted_pieces.append(encrypted_piece) - - precondition(length <= input_chunk_size, - "length=%d > input_chunk_size=%d" % - (length, input_chunk_size)) - if allow_short: - if length < input_chunk_size: - # padding - pad_size = input_chunk_size - length - encrypted_pieces.append('\x00' * pad_size) - else: - # non-tail segments should be the full segment size - if length != input_chunk_size: - log.msg("non-tail segment should be full segment size: %d!=%d" - % (length, input_chunk_size), - level=log.BAD, umid="jNk5Yw") - precondition(length == input_chunk_size, - "length=%d != input_chunk_size=%d" % - (length, input_chunk_size)) - - encrypted_piece = "".join(encrypted_pieces) - return previous_chunks + [encrypted_piece] - + data = "".join(data) + precondition(len(data) <= read_size, len(data), read_size) + if not allow_short: + precondition(len(data) == read_size, len(data), read_size) + crypttext_segment_hasher.update(data) + self._crypttext_hasher.update(data) + if allow_short and len(data) < read_size: + # padding + data += "\x00" * (read_size - len(data)) + encrypted_pieces = [data[i:i+input_chunk_size] + for i in range(0, len(data), input_chunk_size)] + return encrypted_pieces d.addCallback(_got) - d.addCallback(lambda chunks: - self._gather_data(num_chunks-1, input_chunk_size, - crypttext_segment_hasher, - allow_short, chunks)) return d def _send_segment(self, (shares, shareids), segnum): diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py index c5a47e1..48094a9 100644 --- a/src/allmydata/interfaces.py +++ b/src/allmydata/interfaces.py @@ -1622,7 +1622,11 @@ class IUploadable(Interface): If the data must be acquired through multiple internal read operations, returning a list instead of a single string may help to - reduce string copies. + reduce string copies. However, the length of the concatenated strings + must equal the amount of data requested, unless EOF is encountered. + Long reads, or short reads without EOF, are not allowed. read() + should return the same amount of data as a local disk file read, just + in a different shape and asynchronously. 'length' will typically be equal to (min(get_size(),1MB)/req_shares), so a 10kB file means length=3kB, 100kB file means length=30kB, diff --git a/src/allmydata/test/test_repairer.py b/src/allmydata/test/test_repairer.py index 49c4cff..942d327 100644 --- a/src/allmydata/test/test_repairer.py +++ b/src/allmydata/test/test_repairer.py @@ -672,6 +672,35 @@ class Repairer(GridTestMixin, unittest.TestCase, RepairTestMixin, return d #test_repair_from_corruption_of_1.todo = "Repairer doesn't properly replace corrupted shares yet." + def test_tiny_reads(self): + # ticket #1223 points out three problems: + # repairer reads beyond end of input file + # new-downloader does not tolerate overreads + # uploader does lots of tiny reads, inefficient + self.basedir = "repairer/Repairer/test_tiny_reads" + self.set_up_grid() + c0 = self.g.clients[0] + DATA = "a"*135 + c0.DEFAULT_ENCODING_PARAMETERS['k'] = 22 + c0.DEFAULT_ENCODING_PARAMETERS['n'] = 66 + d = c0.upload(upload.Data(DATA, convergence="")) + def _then(ur): + self.uri = ur.uri + self.delete_shares_numbered(self.uri, [0]) + self.c0_filenode = c0.create_node_from_uri(ur.uri) + self._stash_counts() + return self.c0_filenode.check_and_repair(Monitor()) + d.addCallback(_then) + def _check(ign): + (r,a,w) = self._get_delta_counts() + # when the uploader (driven by the repairer) does full-segment + # reads, this makes 44 server read calls (2*k). Before, when it + # was doing input_chunk_size reads (7 bytes), it was doing over + # 400. + self.failIf(r > 100, "too many reads: %d>100" % r) + d.addCallback(_check) + return d + # XXX extend these tests to show that the checker detects which specific # share on which specific server is broken -- this is necessary so that the