diff --git a/src/allmydata/immutable/downloader/share.py b/src/allmydata/immutable/downloader/share.py index d512702..8f60232 100644 --- a/src/allmydata/immutable/downloader/share.py +++ b/src/allmydata/immutable/downloader/share.py @@ -725,6 +725,31 @@ class Share: # Reconsider the removal: maybe bring it back. ds = self._download_status + v = self._server.get_version() + if (v["http://allmydata.org/tahoe/protocols/storage/v1"] + ["has-immutable-readv"]): + # new-style readv() form + readv = list(ask) + if not readv: + return # nothing to do + lp = log.msg(format="%(share)s send_readv [%(span)s]", + share=repr(self), span=ask.dump(), + level=log.NOISY, parent=self._lp, umid="nByhWA") + block_ev = ds.add_block_request(self._server, self._shnum, + readv[0][0], readv[0][1], now()) + for (start, length) in readv: + self._pending.add(start, length) + d = self._rref.callRemote("readv", readv) + d.addCallback(self._got_datav, readv, lp) + d.addErrback(self._got_error, readv[0][0], readv[0][1], block_ev, lp) + d.addCallback(self._trigger_loop) + d.addErrback(lambda f: + log.err(format="unhandled error during send_request", + failure=f, parent=self._lp, + level=log.WEIRD, umid="qZu0wg")) + return d + + # old-style lots-of-read() form for (start, length) in ask: # TODO: quantize to reasonably-large blocks self._pending.add(start, length) @@ -747,6 +772,14 @@ class Share: def _send_request(self, start, length): return self._rref.callRemote("read", start, length) + def _got_datav(self, datav, readv, lp): + for i,(start,length) in enumerate(readv): + data = datav[i] + self._pending.remove(start, length) + self._received.add(start, data) + if len(data) < length: + self._unavailable.add(start+len(data), length-len(data)) + def _got_data(self, data, start, length, block_ev, lp): block_ev.finished(len(data), now()) if not self._alive: diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py index cb45623..a1a6f30 100644 --- a/src/allmydata/interfaces.py +++ b/src/allmydata/interfaces.py @@ -30,6 +30,22 @@ WriteEnablerSecret = Hash # used to protect mutable bucket modifications LeaseRenewSecret = Hash # used to protect bucket lease renewal requests LeaseCancelSecret = Hash # used to protect bucket lease cancellation requests +TestVector = ListOf(TupleOf(Offset, ReadSize, str, str)) +# elements are (offset, length, operator, specimen) +# operator is one of "lt, le, eq, ne, ge, gt" +# nop always passes and is used to fetch data while writing. +# you should use length==len(specimen) for everything except nop +DataVector = ListOf(TupleOf(Offset, ShareData)) +# (offset, data). This limits us to 30 writes of 1MiB each per call +TestAndWriteVectorsForShares = DictOf(int, + TupleOf(TestVector, + DataVector, + ChoiceOf(None, Offset), # new_length + )) +ReadVector = ListOf(TupleOf(Offset, ReadSize)) +ReadData = ListOf(ShareData) +# returns data[offset:offset+length] for each element of TestVector + class RIStubClient(RemoteInterface): """Each client publishes a service announcement for a dummy object called the StubClient. This object doesn't actually offer any services, but the @@ -59,6 +75,8 @@ class RIBucketWriter(RemoteInterface): class RIBucketReader(RemoteInterface): def read(offset=Offset, length=ReadSize): return ShareData + def readv(readv=ReadVector): + return ReadData def advise_corrupt_share(reason=str): """Clients who discover hash failures in shares that they have @@ -72,22 +90,6 @@ class RIBucketReader(RemoteInterface): documentation. """ -TestVector = ListOf(TupleOf(Offset, ReadSize, str, str)) -# elements are (offset, length, operator, specimen) -# operator is one of "lt, le, eq, ne, ge, gt" -# nop always passes and is used to fetch data while writing. -# you should use length==len(specimen) for everything except nop -DataVector = ListOf(TupleOf(Offset, ShareData)) -# (offset, data). This limits us to 30 writes of 1MiB each per call -TestAndWriteVectorsForShares = DictOf(int, - TupleOf(TestVector, - DataVector, - ChoiceOf(None, Offset), # new_length - )) -ReadVector = ListOf(TupleOf(Offset, ReadSize)) -ReadData = ListOf(ShareData) -# returns data[offset:offset+length] for each element of TestVector - class RIStorageServer(RemoteInterface): __remote_name__ = "RIStorageServer.tahoe.allmydata.com" diff --git a/src/allmydata/storage/immutable.py b/src/allmydata/storage/immutable.py index a50ff42..8ca6a80 100644 --- a/src/allmydata/storage/immutable.py +++ b/src/allmydata/storage/immutable.py @@ -314,6 +314,16 @@ class BucketReader(Referenceable): self.ss.count("read") return data + def remote_readv(self, readv): + start = time.time() + datav = [] + for (offset, length) in readv: + datav.append(self._share_file.read_share_data(offset, length)) + self.ss.add_latency("immutable-readv", time.time() - start) + self.ss.count("immutable-readv") + self.ss.count("immutable-vectors", len(readv)) + return datav + def remote_advise_corrupt_share(self, reason): return self.ss.remote_advise_corrupt_share("immutable", self.storage_index, diff --git a/src/allmydata/storage/server.py b/src/allmydata/storage/server.py index 1f39c9c..a9b452e 100644 --- a/src/allmydata/storage/server.py +++ b/src/allmydata/storage/server.py @@ -77,6 +77,7 @@ class StorageServer(service.MultiService, Referenceable): "write": [], "close": [], "read": [], + "immutable-readv": [], "get": [], "writev": [], # mutable "readv": [], @@ -224,6 +225,7 @@ class StorageServer(service.MultiService, Referenceable): "delete-mutable-shares-with-zero-length-writev": True, "fills-holes-with-zero-bytes": True, "prevents-read-past-end-of-share-data": True, + "has-immutable-readv": True, }, "application-version": str(allmydata.__full_version__), } @@ -491,6 +493,7 @@ class StorageServer(service.MultiService, Referenceable): def remote_slot_readv(self, storage_index, shares, readv): start = time.time() self.count("readv") + self.count("mutable-vectors", len(shares)*len(readv)) si_s = si_b2a(storage_index) lp = log.msg("storage: slot_readv %s %s" % (si_s, shares), facility="tahoe.storage", level=log.OPERATIONAL) diff --git a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py index aa696ed..d7bb38a 100644 --- a/src/allmydata/storage_client.py +++ b/src/allmydata/storage_client.py @@ -170,6 +170,9 @@ class NativeStorageServer: { "maximum-immutable-share-size": 2**32, "tolerates-immutable-read-overrun": False, "delete-mutable-shares-with-zero-length-writev": False, + "fills-holes-with-zero-bytes": False, + "prevents-read-past-end-of-share-data": False, + "has-immutable-readv": False, }, "application-version": "unknown: no get_version()", }