diff '--context=4' --new-file ticket999-v13a/src/allmydata/storage/backends/s3/immutable.py ticket999-v16/src/allmydata/storage/backends/s3/immutable.py
*** ticket999-v13a/src/allmydata/storage/backends/s3/immutable.py	2011-09-28 00:34:12.806614999 +0100
--- ticket999-v16/src/allmydata/storage/backends/s3/immutable.py	2011-09-29 05:13:51.646615008 +0100
***************
*** 1,13 ****
  
  import struct
  
! from zope.interface import implements
  
  from allmydata.interfaces import IStoredShare
  from allmydata.util.assertutil import precondition
  from allmydata.storage.common import si_b2a, UnknownImmutableContainerVersionError, DataTooLargeError
  
  
  # Each share file (with key 'shares/$PREFIX/$STORAGEINDEX/$SHNUM') contains
  # lease information [currently inaccessible] and share data. The share data is
--- 1,15 ----
  
  import struct
  
! from twisted.internet import defer
+ 
  from zope.interface import implements
  
  from allmydata.interfaces import IStoredShare
  from allmydata.util.assertutil import precondition
  from allmydata.storage.common import si_b2a, UnknownImmutableContainerVersionError, DataTooLargeError
+ from allmydata.storage.backends.s3.s3_common import get_s3_share_key
  
  
  # Each share file (with key 'shares/$PREFIX/$STORAGEINDEX/$SHNUM') contains
  # lease information [currently inaccessible] and share data. The share data is
***************
*** 28,77 ****
  
      LEASE_SIZE = struct.calcsize(">L32s32sL") # for compatibility
      HEADER = ">LLL"
      HEADER_SIZE = struct.calcsize(HEADER)
  
!     def __init__(self, storageindex, shnum, s3bucket, max_size=None, data=None):
          """
          If max_size is not None then I won't allow more than max_size to be written to me.
          """
!         precondition((max_size is not None) or (data is not None), max_size, data)
          self._storageindex = storageindex
          self._shnum = shnum
-         self._s3bucket = s3bucket
          self._max_size = max_size
          self._data = data
  
!         sistr = self.get_storage_index_string()
!         self._key = "shares/%s/%s/%d" % (sistr[:2], sistr, shnum)
  
!         if data is None: # creating share
              # The second field, which was the four-byte share data length in
              # Tahoe-LAFS versions prior to 1.3.0, is not used; we always write 0.
              # We also write 0 for the number of leases.
              self._home.setContent(struct.pack(self.HEADER, 1, 0, 0) )
!             self._end_offset = self.HEADER_SIZE + max_size
              self._size = self.HEADER_SIZE
              self._writes = []
          else:
!             (version, unused, num_leases) = struct.unpack(self.HEADER, data[:self.HEADER_SIZE])
              if version != 1:
                  msg = "%r had version %d but we wanted 1" % (self, version)
                  raise UnknownImmutableContainerVersionError(msg)
  
              # We cannot write leases in share files, but allow them to be present
              # in case a share file is copied from a disk backend, or in case we
              # need them in future.
!             self._size = len(data)
              self._end_offset = self._size - (num_leases * self.LEASE_SIZE)
  
!         self._data_offset = self.HEADER_SIZE
! 
!     def __repr__(self):
!         return ("<ImmutableS3Share at %r>" % (self._key,))
  
      def close(self):
!         # TODO: finalize write to S3.
!         pass
  
      def get_used_space(self):
          return self._size
--- 30,101 ----
  
      LEASE_SIZE = struct.calcsize(">L32s32sL") # for compatibility
      HEADER = ">LLL"
      HEADER_SIZE = struct.calcsize(HEADER)
  
!     def __init__(self, s3bucket, storageindex, shnum, max_size=None, data=None):
          """
          If max_size is not None then I won't allow more than max_size to be written to me.
+ 
+         Clients should use the load_immutable_s3_share and create_immutable_s3_share
+         factory functions rather than creating instances directly.
          """
!         self._s3bucket = s3bucket
          self._storageindex = storageindex
          self._shnum = shnum
          self._max_size = max_size
          self._data = data
+         self._key = get_s3_share_key(storageindex, shnum)
+         self._data_offset = self.HEADER_SIZE
+         self._loaded = False
  
!     def __repr__(self):
!         return ("<ImmutableS3Share at %r>" % (self._key,))
! 
!     def load(self):
!         if self._max_size is not None: # creating share
              # The second field, which was the four-byte share data length in
              # Tahoe-LAFS versions prior to 1.3.0, is not used; we always write 0.
              # We also write 0 for the number of leases.
              self._home.setContent(struct.pack(self.HEADER, 1, 0, 0) )
!             self._end_offset = self.HEADER_SIZE + self._max_size
              self._size = self.HEADER_SIZE
              self._writes = []
+             self._loaded = True
+             return defer.succeed(self)
+ 
+         if self._data is None:
+             # If we don't already have the data, get it from S3.
+             d = self._s3bucket.get_object(self._key)
          else:
!             d = defer.succeed(self._data)
! 
!         def _got_data(data):
!             self._data = data
!             header = self._data[:self.HEADER_SIZE]
!             (version, unused, num_leases) = struct.unpack(self.HEADER, header)
              if version != 1:
                  msg = "%r had version %d but we wanted 1" % (self, version)
                  raise UnknownImmutableContainerVersionError(msg)
  
              # We cannot write leases in share files, but allow them to be present
              # in case a share file is copied from a disk backend, or in case we
              # need them in future.
!             self._size = len(self._data)
              self._end_offset = self._size - (num_leases * self.LEASE_SIZE)
!             self._loaded = True
!             return self
!         d.addCallback(_got_data)
!         return d
  
      def close(self):
!         # This will briefly use memory equal to double the share size.
!         # We really want to stream writes to S3, but I don't think txaws supports that yet
!         # (and neither does IS3Bucket, since that's a thin wrapper over the txaws S3 API).
! 
!         self._data = "".join(self._writes)
!         del self._writes
!         self._s3bucket.put_object(self._key, self._data)
!         return defer.succeed(None)
  
      def get_used_space(self):
          return self._size
***************
*** 101,109 ****
  
      def readv(self, readv):
          datav = []
          for (offset, length) in readv:
              datav.append(self.read_share_data(offset, length))
!         return datav
  
      def read_share_data(self, offset, length):
          precondition(offset >= 0)
--- 125,133 ----
  
      def readv(self, readv):
          datav = []
          for (offset, length) in readv:
              datav.append(self.read_share_data(offset, length))
!         return defer.succeed(datav)
  
      def read_share_data(self, offset, length):
          precondition(offset >= 0)
***************
*** 111,122 ****
          # beyond the end of the data return an empty string.
          seekpos = self._data_offset+offset
          actuallength = max(0, min(length, self._end_offset-seekpos))
          if actuallength == 0:
!             return ""
! 
!         # TODO: perform an S3 GET request, possibly with a Content-Range header.
!         return "\x00"*actuallength
  
      def write_share_data(self, offset, data):
          length = len(data)
          precondition(offset >= self._size, "offset = %r, size = %r" % (offset, self._size))
--- 135,144 ----
          # beyond the end of the data return an empty string.
          seekpos = self._data_offset+offset
          actuallength = max(0, min(length, self._end_offset-seekpos))
          if actuallength == 0:
!             return defer.succeed("")
!         return defer.succeed(self._data[offset:offset+actuallength])
  
      def write_share_data(self, offset, data):
          length = len(data)
          precondition(offset >= self._size, "offset = %r, size = %r" % (offset, self._size))
***************
*** 126,132 ****
--- 148,162 ----
          if offset > self._size:
              self._writes.append("\x00" * (offset - self._size))
          self._writes.append(data)
          self._size = offset + len(data)
+         return defer.succeed(None)
  
      def add_lease(self, lease_info):
          pass
+ 
+ 
+ def load_immutable_s3_share(s3bucket, storageindex, shnum, data=None):
+     return ImmutableS3Share(s3bucket, storageindex, shnum, data=data).load()
+ 
+ def create_immutable_s3_share(s3bucket, storageindex, shnum, max_size):
+     return ImmutableS3Share(s3bucket, storageindex, shnum, max_size=max_size).load()
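Note on the immutable share API above: the factory functions return a Deferred that fires with the loaded ImmutableS3Share, and the data-access methods now return Deferreds as well. The sketch below is not part of the patch (the function name and the inlineCallbacks style are illustrative assumptions; s3bucket is assumed to provide the IS3Bucket interface from s3_common.py):

    from twisted.internet import defer

    from allmydata.storage.backends.s3.immutable import load_immutable_s3_share

    @defer.inlineCallbacks
    def read_share_prefix(s3bucket, storageindex, shnum):
        # The factory fetches the share object from S3 (unless raw bytes are
        # passed via data=) and fires with the ImmutableS3Share once its
        # header has been parsed.
        share = yield load_immutable_s3_share(s3bucket, storageindex, shnum)
        # read_share_data now returns a Deferred instead of a string.
        prefix = yield share.read_share_data(0, 32)
        defer.returnValue(prefix)
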
""" self._storageindex = storageindex self._shnum = shnum self._home = home if self._home.exists(): *************** *** 79,91 **** --- 83,100 ---- raise UnknownMutableContainerVersionError(msg) finally: f.close() self.parent = parent # for logging + self._loaded = False def log(self, *args, **kwargs): if self.parent: return self.parent.log(*args, **kwargs) + def load(self): + self._loaded = True + return defer.succeed(self) + def create(self, serverid, write_enabler): assert not self._home.exists() data_length = 0 extra_lease_offset = (self.HEADER_SIZE *************** *** 105,118 **** --- 114,130 ---- f.write(struct.pack(">L", num_extra_leases)) # extra leases go here, none at creation finally: f.close() + self._loaded = True + return defer.succeed(self) def __repr__(self): return ("" % (si_b2a(self._storageindex), self._shnum, quote_filepath(self._home))) def get_used_space(self): + assert self._loaded return fileutil.get_used_space(self._home) def get_storage_index(self): return self._storageindex *************** *** 124,131 **** --- 136,144 ---- return self._shnum def unlink(self): self._home.remove() + return defer.succeed(None) def _read_data_length(self, f): f.seek(self.DATA_LENGTH_OFFSET) (data_length,) = struct.unpack(">Q", f.read(8)) *************** *** 325,338 **** for (offset, length) in readv: datav.append(self._read_share_data(f, offset, length)) finally: f.close() ! return datav def get_size(self): return self._home.getsize() def get_data_length(self): f = self._home.open('rb') try: data_length = self._read_data_length(f) finally: --- 338,353 ---- for (offset, length) in readv: datav.append(self._read_share_data(f, offset, length)) finally: f.close() ! return defer.succeed(datav) def get_size(self): + assert self._loaded return self._home.getsize() def get_data_length(self): + assert self._loaded f = self._home.open('rb') try: data_length = self._read_data_length(f) finally: *************** *** 357,364 **** --- 372,380 ---- nodeid=idlib.nodeid_b2a(write_enabler_nodeid)) msg = "The write enabler was recorded by nodeid '%s'." % \ (idlib.nodeid_b2a(write_enabler_nodeid),) raise BadWriteEnablerError(msg) + return defer.succeed(None) def check_testv(self, testv): test_good = True f = self._home.open('rb+') *************** *** 369,377 **** test_good = False break finally: f.close() ! return test_good def writev(self, datav, new_length): f = self._home.open('rb+') try: --- 385,393 ---- test_good = False break finally: f.close() ! return defer.succeed(test_good) def writev(self, datav, new_length): f = self._home.open('rb+') try: *************** *** 385,398 **** # share data has shrunk, then call # self._change_container_size() here. finally: f.close() def close(self): ! pass ! def create_mutable_s3_share(storageindex, shnum, fp, serverid, write_enabler, parent): ! ms = MutableS3Share(storageindex, shnum, fp, parent) ! ms.create(serverid, write_enabler) ! del ms ! return MutableS3Share(storageindex, shnum, fp, parent) --- 401,415 ---- # share data has shrunk, then call # self._change_container_size() here. finally: f.close() + return defer.succeed(None) def close(self): ! return defer.succeed(None) ! + def load_mutable_s3_share(home, storageindex=None, shnum=None, parent=None): + return MutableS3Share(home, storageindex, shnum, parent).load() ! def create_mutable_s3_share(home, serverid, write_enabler, storageindex=None, shnum=None, parent=None): ! 
diff '--context=4' --new-file ticket999-v13a/src/allmydata/storage/backends/s3/s3_backend.py ticket999-v16/src/allmydata/storage/backends/s3/s3_backend.py
*** ticket999-v13a/src/allmydata/storage/backends/s3/s3_backend.py	2011-09-28 00:34:12.806614999 +0100
--- ticket999-v16/src/allmydata/storage/backends/s3/s3_backend.py	2011-09-29 05:13:52.626615006 +0100
***************
*** 1,63 ****
  
! import re
! 
! from zope.interface import implements, Interface
  
  from allmydata.interfaces import IStorageBackend, IShareSet
  from allmydata.storage.common import si_a2b
  from allmydata.storage.bucket import BucketWriter
  from allmydata.storage.backends.base import Backend, ShareSet
! from allmydata.storage.backends.s3.immutable import ImmutableS3Share
! from allmydata.storage.backends.s3.mutable import MutableS3Share
! 
! 
! # The S3 bucket has keys of the form shares/$PREFIX/$STORAGEINDEX/$SHNUM .
! 
! NUM_RE=re.compile("^[0-9]+$")
! 
! 
! class IS3Bucket(Interface):
!     """
!     I represent an S3 bucket.
!     """
!     def create(self):
!         """
!         Create this bucket.
!         """
! 
!     def delete(self):
!         """
!         Delete this bucket.
!         The bucket must be empty before it can be deleted.
!         """
! 
!     def list_objects(self, prefix=""):
!         """
!         Get a list of all the objects in this bucket whose object names start with
!         the given prefix.
!         """
! 
!     def put_object(self, object_name, data, content_type=None, metadata={}):
!         """
!         Put an object in this bucket.
!         Any existing object of the same name will be replaced.
!         """
! 
!     def get_object(self, object_name):
!         """
!         Get an object from this bucket.
!         """
! 
!     def head_object(self, object_name):
!         """
!         Retrieve object metadata only.
!         """
! 
!     def delete_object(self, object_name):
!         """
!         Delete an object from this bucket.
!         Once deleted, there is no method to restore or undelete an object.
!         """
  
  
  class S3Backend(Backend):
      implements(IStorageBackend)
--- 1,16 ----
  
! from zope.interface import implements
  
  from allmydata.interfaces import IStorageBackend, IShareSet
+ from allmydata.util.deferredutil import gatherResults
  from allmydata.storage.common import si_a2b
  from allmydata.storage.bucket import BucketWriter
  from allmydata.storage.backends.base import Backend, ShareSet
! from allmydata.storage.backends.s3.immutable import load_immutable_s3_share, create_immutable_s3_share
! from allmydata.storage.backends.s3.mutable import load_mutable_s3_share, create_mutable_s3_share
! from allmydata.storage.backends.s3.s3_common import get_s3_share_key, NUM_RE
! from allmydata.mutable.layout import MUTABLE_MAGIC
  
  
  class S3Backend(Backend):
      implements(IStorageBackend)
***************
*** 74,83 ****
          # we don't actually create the corruption-advisory dir until necessary
          self._corruption_advisory_dir = corruption_advisory_dir
  
      def get_sharesets_for_prefix(self, prefix):
!         # TODO: query S3 for keys matching prefix
!         return []
  
      def get_shareset(self, storageindex):
          return S3ShareSet(storageindex, self._s3bucket)
--- 27,53 ----
          # we don't actually create the corruption-advisory dir until necessary
          self._corruption_advisory_dir = corruption_advisory_dir
  
      def get_sharesets_for_prefix(self, prefix):
!         # XXX crawler.py needs to be changed to handle a Deferred return from this method.
! 
!         d = self._s3bucket.list_objects('shares/%s/' % (prefix,), '/')
!         def _get_sharesets(res):
!             # XXX this enumerates all shares to get the set of SIs.
!             # Is there a way to enumerate SIs more efficiently?
!             si_strings = set()
!             for item in res.contents:
!                 # XXX better error handling
!                 path = item.key.split('/')
!                 assert path[0:2] == ["shares", prefix]
!                 si_strings.add(path[2])
! 
!             # XXX we want this to be deterministic, so we return the sharesets sorted
!             # by their si_strings, but we shouldn't need to explicitly re-sort them
!             # because list_objects returns a sorted list.
!             return [S3ShareSet(si_a2b(s), self._s3bucket) for s in sorted(si_strings)]
!         d.addCallback(_get_sharesets)
!         return d
  
      def get_shareset(self, storageindex):
          return S3ShareSet(storageindex, self._s3bucket)
***************
*** 99,108 ****
  
      def __init__(self, storageindex, s3bucket):
          ShareSet.__init__(self, storageindex)
          self._s3bucket = s3bucket
!         sistr = self.get_storage_index_string()
!         self._key = 'shares/%s/%s/' % (sistr[:2], sistr)
  
      def get_overhead(self):
          return 0
  
--- 69,77 ----
  
      def __init__(self, storageindex, s3bucket):
          ShareSet.__init__(self, storageindex)
          self._s3bucket = s3bucket
!         self._key = get_s3_share_key(storageindex)
  
      def get_overhead(self):
          return 0
  
***************
*** 110,134 ****
          """
          Generate IStorageBackendShare objects for shares we have for this storage index.
          ("Shares we have" means completed ones, excluding incoming ones.)
          """
!         pass
  
      def has_incoming(self, shnum):
          # TODO: this might need to be more like the disk backend; review callers
          return False
  
      def make_bucket_writer(self, storageserver, shnum, max_space_per_bucket, lease_info, canary):
!         immsh = ImmutableS3Share(self.get_storage_index(), shnum, self._s3bucket,
!                                  max_size=max_space_per_bucket)
!         bw = BucketWriter(storageserver, immsh, lease_info, canary)
!         return bw
  
      def _create_mutable_share(self, storageserver, shnum, write_enabler):
-         # TODO
          serverid = storageserver.get_serverid()
!         return MutableS3Share(self.get_storage_index(), shnum, self._s3bucket, serverid,
!                               write_enabler, storageserver)
  
      def _clean_up_after_unlink(self):
          pass
--- 79,130 ----
          """
          Generate IStorageBackendShare objects for shares we have for this storage index.
          ("Shares we have" means completed ones, excluding incoming ones.)
          """
!         d = self._s3bucket.list_objects(self._key, '/')
!         def _get_shares(res):
!             # XXX this enumerates all shares to get the set of SIs.
!             # Is there a way to enumerate SIs more efficiently?
!             shnums = set()
!             for item in res.contents:
!                 assert item.key.startswith(self._key), item.key
!                 path = item.key.split('/')
!                 if len(path) == 4:
!                     shnumstr = path[3]
!                     if NUM_RE.match(shnumstr):
!                         shnums.add(int(shnumstr))
! 
!             return gatherResults([self._load_share(shnum) for shnum in sorted(shnums)])
!         d.addCallback(_get_shares)
!         return d
! 
!     def _load_share(self, shnum):
!         d = self._s3bucket.get_object(self._key + str(shnum))
!         def _make_share(data):
!             if data.startswith(MUTABLE_MAGIC):
!                 return load_mutable_s3_share(self._s3bucket, self._storageindex, shnum, data=data)
!             else:
!                 # assume it's immutable
!                 return load_immutable_s3_share(self._s3bucket, self._storageindex, shnum, data=data)
!         d.addCallback(_make_share)
!         return d
  
      def has_incoming(self, shnum):
          # TODO: this might need to be more like the disk backend; review callers
          return False
  
      def make_bucket_writer(self, storageserver, shnum, max_space_per_bucket, lease_info, canary):
!         d = create_immutable_s3_share(self._s3bucket, self.get_storage_index(), shnum,
!                                       max_size=max_space_per_bucket)
!         def _created(immsh):
!             return BucketWriter(storageserver, immsh, lease_info, canary)
!         d.addCallback(_created)
!         return d
  
      def _create_mutable_share(self, storageserver, shnum, write_enabler):
          serverid = storageserver.get_serverid()
!         return create_mutable_s3_share(self._s3bucket, self.get_storage_index(), shnum, serverid,
!                                        write_enabler, storageserver)
  
      def _clean_up_after_unlink(self):
          pass
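Note: get_sharesets_for_prefix and S3ShareSet.get_shares above now return Deferreds, so callers (including the crawler mentioned in the XXX comment) have to chain on them rather than iterating synchronously. An illustrative calling pattern, not part of the patch (the function name is a made-up assumption):

    from twisted.internet import defer

    @defer.inlineCallbacks
    def count_shares_for_prefix(backend, prefix):
        # Fires with a list of S3ShareSet objects, one per storage index
        # found under shares/<prefix>/ in the bucket.
        sharesets = yield backend.get_sharesets_for_prefix(prefix)
        total = 0
        for shareset in sharesets:
            # Fires with the loaded share objects for this storage index.
            shares = yield shareset.get_shares()
            total += len(shares)
        defer.returnValue(total)
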
diff '--context=4' --new-file ticket999-v13a/src/allmydata/storage/backends/s3/s3_common.py ticket999-v16/src/allmydata/storage/backends/s3/s3_common.py
*** ticket999-v13a/src/allmydata/storage/backends/s3/s3_common.py	1970-01-01 01:00:00.000000000 +0100
--- ticket999-v16/src/allmydata/storage/backends/s3/s3_common.py	2011-09-29 05:13:51.666614998 +0100
***************
*** 0 ****
--- 1,62 ----
+ 
+ import re
+ 
+ from zope.interface import Interface
+ 
+ from allmydata.storage.common import si_b2a
+ 
+ 
+ # The S3 bucket has keys of the form shares/$PREFIX/$STORAGEINDEX/$SHNUM .
+ 
+ def get_s3_share_key(si, shnum=None):
+     sistr = si_b2a(si)
+     if shnum is None:
+         return "shares/%s/%s/" % (sistr[:2], sistr)
+     else:
+         return "shares/%s/%s/%d" % (sistr[:2], sistr, shnum)
+ 
+ NUM_RE=re.compile("^[0-9]+$")
+ 
+ 
+ class IS3Bucket(Interface):
+     """
+     I represent an S3 bucket.
+     """
+     def create(self):
+         """
+         Create this bucket.
+         """
+ 
+     def delete(self):
+         """
+         Delete this bucket.
+         The bucket must be empty before it can be deleted.
+         """
+ 
+     def list_objects(self, prefix=""):
+         """
+         Get a list of all the objects in this bucket whose object names start with
+         the given prefix.
+         """
+ 
+     def put_object(self, object_name, data, content_type=None, metadata={}):
+         """
+         Put an object in this bucket.
+         Any existing object of the same name will be replaced.
+         """
+ 
+     def get_object(self, object_name):
+         """
+         Get an object from this bucket.
+         """
+ 
+     def head_object(self, object_name):
+         """
+         Retrieve object metadata only.
+         """
+ 
+     def delete_object(self, object_name):
+         """
+         Delete an object from this bucket.
+         Once deleted, there is no method to restore or undelete an object.
+         """
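
Note: get_s3_share_key above is the single place that encodes the key layout described in the comment, and NUM_RE is what the backend uses to recognise the trailing share-number component when listing keys. A small illustration, not part of the patch (the storage index value is made up):

    from allmydata.storage.backends.s3.s3_common import get_s3_share_key, NUM_RE

    si = "\x00" * 16   # a made-up 16-byte binary storage index
    # With no shnum this returns the prefix for the whole shareset, e.g.
    # "shares/yy/yyyyyyyyyyyyyyyyyyyyyyyyyy/" for the all-zeros SI.
    shareset_key = get_s3_share_key(si)
    # With a shnum it names one share object within that shareset.
    share_key = get_s3_share_key(si, 3)
    assert NUM_RE.match(share_key.split('/')[-1])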