diff --git a/src/allmydata/immutable/filenode.py b/src/allmydata/immutable/filenode.py index 44c8e95..ed3785b 100644 --- a/src/allmydata/immutable/filenode.py +++ b/src/allmydata/immutable/filenode.py @@ -3,7 +3,7 @@ import binascii import copy import time now = time.time -from zope.interface import implements +from zope.interface import implements, Interface from twisted.internet import defer from twisted.internet.interfaces import IConsumer @@ -19,36 +19,49 @@ from allmydata.immutable.repairer import Repairer from allmydata.immutable.downloader.node import DownloadNode from allmydata.immutable.downloader.status import DownloadStatus +class IDownloadStatusHandlingConsumer(Interface): + def set_download_status_read_event(read_ev): + """Record the DownloadStatus 'read event', to be updated with the + time it takes to decrypt each chunk of data.""" + class CiphertextFileNode: def __init__(self, verifycap, storage_broker, secret_holder, - terminator, history, download_status=None): + terminator, history): assert isinstance(verifycap, uri.CHKFileVerifierURI) self._verifycap = verifycap self._storage_broker = storage_broker self._secret_holder = secret_holder - if download_status is None: - ds = DownloadStatus(verifycap.storage_index, verifycap.size) - if history: - history.add_download(ds) - download_status = ds self._terminator = terminator self._history = history - self._download_status = download_status + self._download_status = None self._node = None # created lazily, on read() def _maybe_create_download_node(self): + if not self._download_status: + ds = DownloadStatus(self._verifycap.storage_index, + self._verifycap.size) + if self._history: + self._history.add_download(ds) + self._download_status = ds if self._node is None: self._node = DownloadNode(self._verifycap, self._storage_broker, self._secret_holder, self._terminator, self._history, self._download_status) - def read(self, consumer, offset=0, size=None, read_ev=None): + def read(self, consumer, offset=0, size=None): """I am the main entry point, from which FileNode.read() can get data. I feed the consumer with the desired range of ciphertext. I return a Deferred that fires (with the consumer) when the read is finished.""" self._maybe_create_download_node() + actual_size = size + if actual_size is None: + actual_size = self._verifycap.size - offset + read_ev = self._download_status.add_read_event(offset, actual_size, + now()) + if IDownloadStatusHandlingConsumer.providedBy(consumer): + consumer.set_download_status_read_event(read_ev) return self._node.read(consumer, offset, size, read_ev) def get_segment(self, segnum): @@ -155,17 +168,16 @@ class CiphertextFileNode: monitor=monitor) return v.start() - class DecryptingConsumer: """I sit between a CiphertextDownloader (which acts as a Producer) and the real Consumer, decrypting everything that passes by. The real Consumer sees the real Producer, but the Producer sees us instead of the real consumer.""" - implements(IConsumer) + implements(IConsumer, IDownloadStatusHandlingConsumer) - def __init__(self, consumer, readkey, offset, read_event): + def __init__(self, consumer, readkey, offset): self._consumer = consumer - self._read_event = read_event + self._read_event = None # TODO: pycryptopp CTR-mode needs random-access operations: I want # either a=AES(readkey, offset) or better yet both of: # a=AES(readkey, offset=0) @@ -177,6 +189,9 @@ class DecryptingConsumer: self._decryptor = AES(readkey, iv=iv) self._decryptor.process("\x00"*offset_small) + def set_download_status_read_event(self, read_ev): + self._read_event = read_ev + def registerProducer(self, producer, streaming): # this passes through, so the real consumer can flow-control the real # producer. Therefore we don't need to provide any IPushProducer @@ -188,8 +203,9 @@ class DecryptingConsumer: def write(self, ciphertext): started = now() plaintext = self._decryptor.process(ciphertext) - elapsed = now() - started - self._read_event.update(0, elapsed, 0) + if self._read_event: + elapsed = now() - started + self._read_event.update(0, elapsed, 0) self._consumer.write(plaintext) class ImmutableFileNode: @@ -200,12 +216,8 @@ class ImmutableFileNode: history): assert isinstance(filecap, uri.CHKFileURI) verifycap = filecap.get_verify_cap() - ds = DownloadStatus(verifycap.storage_index, verifycap.size) - if history: - history.add_download(ds) - self._download_status = ds self._cnode = CiphertextFileNode(verifycap, storage_broker, - secret_holder, terminator, history, ds) + secret_holder, terminator, history) assert isinstance(filecap, uri.CHKFileURI) self.u = filecap self._readkey = filecap.key @@ -226,14 +238,8 @@ class ImmutableFileNode: return True def read(self, consumer, offset=0, size=None): - actual_size = size - if actual_size == None: - actual_size = self.u.size - actual_size = actual_size - offset - read_ev = self._download_status.add_read_event(offset,actual_size, - now()) - decryptor = DecryptingConsumer(consumer, self._readkey, offset, read_ev) - d = self._cnode.read(decryptor, offset, size, read_ev) + decryptor = DecryptingConsumer(consumer, self._readkey, offset) + d = self._cnode.read(decryptor, offset, size) d.addCallback(lambda dc: consumer) return d