hunk ./src/allmydata/interfaces.py 503
     def get_used_space():
         """
-        Returns the amount of backend storage including overhead, in bytes, used
-        by this share.
+        Returns the amount of backend storage including overhead (which may
+        have to be estimated), in bytes, used by this share.
         """
 
     def unlink():
hunk ./src/allmydata/storage/backends/s3/immutable.py 3
 import struct
 
+from cStringIO import StringIO
+
 from twisted.internet import defer
hunk ./src/allmydata/storage/backends/s3/immutable.py 27
 # data_length+0x0c: first lease. Each lease record is 72 bytes.
 
-class ImmutableS3Share(object):
-    implements(IStoredShare)
+class ImmutableS3ShareBase(object):
+    implements(IShareBase) # XXX
 
     sharetype = "immutable"
     LEASE_SIZE = struct.calcsize(">L32s32sL")  # for compatibility
hunk ./src/allmydata/storage/backends/s3/immutable.py 35
     HEADER = ">LLL"
     HEADER_SIZE = struct.calcsize(HEADER)
 
-    def __init__(self, s3bucket, storageindex, shnum, max_size=None, data=None):
-        """
-        If max_size is not None then I won't allow more than max_size to be written to me.
-
-        Clients should use the load_immutable_s3_share and create_immutable_s3_share
-        factory functions rather than creating instances directly.
-        """
+    def __init__(self, s3bucket, storageindex, shnum):
         self._s3bucket = s3bucket
         self._storageindex = storageindex
         self._shnum = shnum
hunk ./src/allmydata/storage/backends/s3/immutable.py 39
-        self._max_size = max_size
-        self._data = data
         self._key = get_s3_share_key(storageindex, shnum)
hunk ./src/allmydata/storage/backends/s3/immutable.py 40
-        self._data_offset = self.HEADER_SIZE
-        self._loaded = False
 
     def __repr__(self):
hunk ./src/allmydata/storage/backends/s3/immutable.py 42
-        return ("<ImmutableS3Share at %r>" % (self._key,))
-
-    def load(self):
-        if self._max_size is not None:  # creating share
-            # The second field, which was the four-byte share data length in
-            # Tahoe-LAFS versions prior to 1.3.0, is not used; we always write 0.
-            # We also write 0 for the number of leases.
-            self._home.setContent(struct.pack(self.HEADER, 1, 0, 0) )
-            self._end_offset = self.HEADER_SIZE + self._max_size
-            self._size = self.HEADER_SIZE
-            self._writes = []
-            self._loaded = True
-            return defer.succeed(None)
-
-        if self._data is None:
-            # If we don't already have the data, get it from S3.
-            d = self._s3bucket.get_object(self._key)
-        else:
-            d = defer.succeed(self._data)
-
-        def _got_data(data):
-            self._data = data
-            header = self._data[:self.HEADER_SIZE]
-            (version, unused, num_leases) = struct.unpack(self.HEADER, header)
-
-            if version != 1:
-                msg = "%r had version %d but we wanted 1" % (self, version)
-                raise UnknownImmutableContainerVersionError(msg)
-
-            # We cannot write leases in share files, but allow them to be present
-            # in case a share file is copied from a disk backend, or in case we
-            # need them in future.
-            self._size = len(self._data)
-            self._end_offset = self._size - (num_leases * self.LEASE_SIZE)
-            self._loaded = True
-        d.addCallback(_got_data)
-        return d
-
-    def close(self):
-        # This will briefly use memory equal to double the share size.
-        # We really want to stream writes to S3, but I don't think txaws supports that yet
-        # (and neither does IS3Bucket, since that's a thin wrapper over the txaws S3 API).
-
-        self._data = "".join(self._writes)
-        del self._writes
-        self._s3bucket.put_object(self._key, self._data)
-        return defer.succeed(None)
-
-    def get_used_space(self):
-        return self._size
+        return ("<%s at %r>" % (self.__class__.__name__, self._key,))
 
     def get_storage_index(self):
         return self._storageindex
hunk ./src/allmydata/storage/backends/s3/immutable.py 53
     def get_shnum(self):
         return self._shnum
 
-    def unlink(self):
-        self._data = None
-        self._writes = None
-        return self._s3bucket.delete_object(self._key)
+
+class ImmutableS3ShareForWriting(ImmutableS3ShareBase):
+    implements(IShareForWriting) # XXX
+
+    def __init__(self, s3bucket, storageindex, shnum, max_size):
+        """
+        I won't allow more than max_size to be written to me.
+        """
+        precondition(isinstance(max_size, (int, long)), max_size)
+        ImmutableS3ShareBase.__init__(self, s3bucket, storageindex, shnum)
+        self._max_size = max_size
+        self._end_offset = self.HEADER_SIZE + self._max_size
+
+        self._buf = StringIO()
+        # The second field, which was the four-byte share data length in
+        # Tahoe-LAFS versions prior to 1.3.0, is not used; we always write 0.
+        # We also write 0 for the number of leases.
+        self._buf.write(struct.pack(self.HEADER, 1, 0, 0))
+
+    def close(self):
+        # We really want to stream writes to S3, but txaws doesn't support
+        # that yet (and neither does IS3Bucket, since that's a thin wrapper
+        # over the txaws S3 API). See
+        # https://bugs.launchpad.net/txaws/+bug/767205 and
+        # https://bugs.launchpad.net/txaws/+bug/783801
+        return self._s3bucket.put_object(self._key, self._buf.getvalue())
 
     def get_allocated_size(self):
         return self._max_size
hunk ./src/allmydata/storage/backends/s3/immutable.py 82
-    def get_size(self):
-        return self._size
+    def write_share_data(self, offset, data):
+        # offset is relative to the start of the share data, which begins
+        # at HEADER_SIZE in the container (mirroring read_share_data below).
+        # Check against max_size before touching the buffer, so that an
+        # oversized write cannot leave partially written data behind.
+        if offset + len(data) > self._max_size:
+            raise DataTooLargeError(self._max_size, offset, len(data))
+        self._buf.seek(self.HEADER_SIZE + offset)
+        self._buf.write(data)
+        return defer.succeed(None)
+
+
+class ImmutableS3ShareForReading(ImmutableS3ShareBase):
+    implements(IStoredShareForReading) # XXX
+
+    def __init__(self, s3bucket, storageindex, shnum, data):
+        ImmutableS3ShareBase.__init__(self, s3bucket, storageindex, shnum)
+        self._data = data
+
+        header = self._data[:self.HEADER_SIZE]
+        (version, unused, num_leases) = struct.unpack(self.HEADER, header)
hunk ./src/allmydata/storage/backends/s3/immutable.py 99
-    def get_data_length(self):
-        return self._end_offset - self._data_offset
+        if version != 1:
+            msg = "%r had version %d but we wanted 1" % (self, version)
+            raise UnknownImmutableContainerVersionError(msg)
+
+        # We cannot write leases in share files, but allow them to be present
+        # in case a share file is copied from a disk backend, or in case we
+        # need them in future.
+        self._end_offset = len(self._data) - (num_leases * self.LEASE_SIZE)
 
     def readv(self, readv):
         datav = []
hunk ./src/allmydata/storage/backends/s3/immutable.py 119
         # Reads beyond the end of the data are truncated. Reads that start
         # beyond the end of the data return an empty string.
-        seekpos = self._data_offset+offset
+        seekpos = self.HEADER_SIZE+offset
         actuallength = max(0, min(length, self._end_offset-seekpos))
         if actuallength == 0:
             return defer.succeed("")
hunk ./src/allmydata/storage/backends/s3/immutable.py 124
-        return defer.succeed(self._data[offset:offset+actuallength])
-
-    def write_share_data(self, offset, data):
-        length = len(data)
-        precondition(offset >= self._size, "offset = %r, size = %r" % (offset, self._size))
-        if self._max_size is not None and offset+length > self._max_size:
-            raise DataTooLargeError(self._max_size, offset, length)
-
-        if offset > self._size:
-            self._writes.append("\x00" * (offset - self._size))
-        self._writes.append(data)
-        self._size = offset + len(data)
-        return defer.succeed(None)
-
-    def add_lease(self, lease_info):
-        pass
-
-
-def load_immutable_s3_share(s3bucket, storageindex, shnum, data=None):
-    return ImmutableS3Share(s3bucket, storageindex, shnum, data=data).load()
-
-def create_immutable_s3_share(s3bucket, storageindex, shnum, max_size):
-    return ImmutableS3Share(s3bucket, storageindex, shnum, max_size=max_size).load()
+        # self._data includes the container header, so index relative to
+        # seekpos, not relative to the start of the container.
+        return defer.succeed(self._data[seekpos:seekpos+actuallength])
hunk ./src/allmydata/storage/backends/s3/s3_backend.py 9
 from allmydata.storage.common import si_a2b
 from allmydata.storage.bucket import BucketWriter
 from allmydata.storage.backends.base import Backend, ShareSet
-from allmydata.storage.backends.s3.immutable import load_immutable_s3_share, create_immutable_s3_share
+from allmydata.storage.backends.s3.immutable import ImmutableS3ShareForReading, ImmutableS3ShareForWriting
 from allmydata.storage.backends.s3.mutable import load_mutable_s3_share, create_mutable_s3_share
 from allmydata.storage.backends.s3.s3_common import get_s3_share_key, NUM_RE
 from allmydata.mutable.layout import MUTABLE_MAGIC
hunk ./src/allmydata/storage/backends/s3/s3_backend.py 107
                 return load_mutable_s3_share(self._s3bucket, self._storageindex, shnum, data=data)
             else:
                 # assume it's immutable
-                return load_immutable_s3_share(self._s3bucket, self._storageindex, shnum, data=data)
+                return ImmutableS3ShareForReading(self._s3bucket, self._storageindex, shnum, data=data)
         d.addCallback(_make_share)
         return d
hunk ./src/allmydata/storage/backends/s3/s3_backend.py 116
         return False
 
     def make_bucket_writer(self, storageserver, shnum, max_space_per_bucket, lease_info, canary):
-        d = create_immutable_s3_share(self._s3bucket, self.get_storage_index(), shnum,
-                                      max_size=max_space_per_bucket)
+        immsh = ImmutableS3ShareForWriting(self._s3bucket, self.get_storage_index(), shnum,
+                                           max_size=max_space_per_bucket)
hunk ./src/allmydata/storage/backends/s3/s3_backend.py 118
-        def _created(immsh):
-            return BucketWriter(storageserver, immsh, lease_info, canary)
-        d.addCallback(_created)
-        return d
+        return defer.succeed(BucketWriter(storageserver, immsh, lease_info, canary))
 
     def _create_mutable_share(self, storageserver, shnum, write_enabler):
         serverid = storageserver.get_serverid()
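
Two illustrative sketches follow; they are not part of the patch. First, the
container layout that ImmutableS3ShareForWriting writes and
ImmutableS3ShareForReading parses, restating the header comments and struct
formats from immutable.py above:

    import struct

    HEADER = ">LLL"                            # three big-endian uint32 fields
    HEADER_SIZE = struct.calcsize(HEADER)      # 12 bytes (0x0c)
    LEASE_SIZE = struct.calcsize(">L32s32sL")  # 72 bytes per lease record

    # Version 1; the unused legacy data-length field and the lease count
    # are both always written as 0.
    header = struct.pack(HEADER, 1, 0, 0)
    (version, unused, num_leases) = struct.unpack(HEADER, header)
    assert (version, num_leases) == (1, 0)

    # Share data starts at HEADER_SIZE; any leases occupy the last
    # num_leases * LEASE_SIZE bytes of the container, so:
    #   end_offset = len(container_data) - num_leases * LEASE_SIZE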
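Second, a sketch of how the two classes are intended to round-trip a share.
MockS3Bucket is a hypothetical synchronous in-memory stand-in for IS3Bucket
(its put_object/get_object signatures follow the calls made above), the
16-byte storage index is made up, and read_share_data is assumed to be the
method whose body the immutable.py 119/124 hunks edit:

    from twisted.internet import defer
    from allmydata.storage.backends.s3.s3_common import get_s3_share_key
    from allmydata.storage.backends.s3.immutable import \
        ImmutableS3ShareForReading, ImmutableS3ShareForWriting

    class MockS3Bucket(object):
        # Hypothetical stand-in for IS3Bucket; the real bucket returns
        # Deferreds that fire asynchronously after talking to S3.
        def __init__(self):
            self._objects = {}
        def put_object(self, key, data):
            self._objects[key] = data
            return defer.succeed(None)
        def get_object(self, key):
            return defer.succeed(self._objects[key])

    bucket = MockS3Bucket()
    si = "\x00" * 16   # made-up binary storage index

    wshare = ImmutableS3ShareForWriting(bucket, si, 0, max_size=10)
    wshare.write_share_data(0, "abc")
    d = wshare.close()  # uploads header + share data as one S3 object

    # Later, the ShareSet fetches the container and wraps it for reading:
    d.addCallback(lambda ign: bucket.get_object(get_s3_share_key(si, 0)))
    d.addCallback(lambda data: ImmutableS3ShareForReading(bucket, si, 0, data))
    d.addCallback(lambda rshare: rshare.read_share_data(0, 3))
    # ...the final Deferred fires with "abc".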