2 patches for repository davidsarah@dev.allmydata.org:/home/darcs/tahoe/trunk: Wed Jul 27 01:05:41 BST 2011 david-sarah@jacaranda.org * Drop upload frontend (updated), with more tests. Tests now pass on Windows. refs #1429 Wed Jul 27 03:30:03 BST 2011 david-sarah@jacaranda.org * drop-upload: make counts visible on the statistics page, and disable some debugging. refs #1429 New patches: [Drop upload frontend (updated), with more tests. Tests now pass on Windows. refs #1429 david-sarah@jacaranda.org**20110727000541 Ignore-this: d67c37a4db86c3d37a1c4b16ff299df5 ] { hunk ./src/allmydata/_auto_deps.py 22 "zope.interface == 3.3.1, == 3.5.3, == 3.6.1", # On Windows we need at least Twisted 9.0 to avoid an indirect dependency on pywin32. + # On Linux we need at least Twisted 10.1.0 for inotify support used by the drop-upload + # frontend. # We also need Twisted 10.1 for the FTP frontend in order for Twisted's FTP server to # support asynchronous close. "Twisted >= 10.1.0", hunk ./src/allmydata/client.py 153 # ControlServer and Helper are attached after Tub startup self.init_ftp_server() self.init_sftp_server() + self.init_drop_uploader() hotline_file = os.path.join(self.basedir, self.SUICIDE_PREVENTION_HOTLINE_FILE) hunk ./src/allmydata/client.py 441 sftp_portstr, pubkey_file, privkey_file) s.setServiceParent(self) + def init_drop_uploader(self): + if self.get_config("drop_upload", "enabled", False, boolean=True): + upload_uri = self.get_config("drop_upload", "upload.uri", None) + local_dir_utf8 = self.get_config("drop_upload", "local.directory", None) + + if upload_uri and local_dir_utf8: + try: + from allmydata.frontends import drop_upload + s = drop_upload.DropUploader(self, upload_uri, local_dir_utf8) + s.setServiceParent(self) + except Exception, e: + self.log("couldn't start drop-uploader: %r", args=(e,)) + else: + self.log("couldn't start drop-uploader: upload.uri or local.directory not specified") + def _check_hotline(self, hotline_file): if 
os.path.exists(hotline_file): mtime = os.stat(hotline_file)[stat.ST_MTIME] addfile ./src/allmydata/frontends/drop_upload.py hunk ./src/allmydata/frontends/drop_upload.py 1 + +import os, sys + +from twisted.internet import defer +from twisted.python.filepath import FilePath +from twisted.application import service + +from allmydata.interfaces import IDirectoryNode + +from allmydata.util.encodingutil import quote_output +from allmydata.immutable.upload import FileName + + +class DropUploader(service.MultiService): + def __init__(self, client, upload_uri, local_dir_utf8, inotify=None): + service.MultiService.__init__(self) + + try: + local_dir = os.path.expanduser(local_dir_utf8.decode('utf-8').encode(sys.getfilesystemencoding())) + except (UnicodeEncodeError, UnicodeDecodeError): + raise AssertionError("The drop-upload path %r was not valid UTF-8 or could not be represented in the filesystem encoding." + % quote_output(local_dir_utf8)) + + self._client = client + self._convergence = client.convergence + self._local_path = FilePath(local_dir) + self.uploaded = 0 + self.failed = 0 + self.disappeared = 0 + + if inotify is None: + from twisted.internet import inotify + self._inotify = inotify + + if not self._local_path.isdir(): + raise AssertionError("The drop-upload local path %r was not an existing directory." % quote_output(local_dir)) + + # TODO: allow a path rather than an URI. 
+ self._parent = self._client.create_node_from_uri(upload_uri) + if not IDirectoryNode.providedBy(self._parent): + raise AssertionError("The drop-upload remote URI is not a directory URI.") + if self._parent.is_unknown() or self._parent.is_readonly(): + raise AssertionError("The drop-upload remote URI does not refer to a writeable directory.") + + self._uploaded_callback = lambda ign: None + + self._notifier = inotify.INotify() + self._notifier.startReading() + + # We don't watch for IN_CREATE, because that would cause us to read and upload a + # possibly-incomplete file before the application has closed it. There should always + # be an IN_CLOSE_WRITE after an IN_CREATE (I think). + # TODO: what about IN_MOVE_SELF? + mask = inotify.IN_CLOSE_WRITE | inotify.IN_MOVED_TO + self._notifier.watch(self._local_path, mask=mask, callbacks=[self._notify]) + + def _notify(self, opaque, path, events_mask): + self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask)))) + + d = defer.succeed(None) + + # FIXME: if this already exists as a mutable file, we replace the directory entry, + # but we should probably modify the file (as the SFTP frontend does). 
+ def _add_file(ign): + name = path.basename().decode(sys.getfilesystemencoding()) + u = FileName(path.path, self._convergence) + return self._parent.add_file(name, u) + d.addCallback(_add_file) + + def _succeeded(ign): + self.uploaded += 1 + def _failed(f): + if path.exists(): + self._log("drop-upload: %r failed to upload due to %r" % (path.path, f)) + self.failed += 1 + return f + else: + self._log("drop-upload: notified file %r disappeared " + "(this is normal for temporary files): %r" % (path.path, f)) + self.disappeared += 1 + return None + d.addCallbacks(_succeeded, _failed) + d.addBoth(self._uploaded_callback) + return d + + def set_uploaded_callback(self, callback): + """This sets a function that will be called after a file has been uploaded.""" + self._uploaded_callback = callback + + def finish(self): + self._notifier.stopReading() + + def _log(self, msg): + self._client.log(msg) + open("events", "ab+").write(msg) hunk ./src/allmydata/scripts/create_node.py 155 c.write("enabled = false\n") c.write("\n") + c.write("[drop_upload]\n") + c.write("# Shall this node automatically upload files created or modified in a local directory?\n") + c.write("enabled = false\n") + c.write("# This must be an URI for a writeable directory.\n") + c.write("upload.uri =\n") + c.write("local.directory = ~/drop_upload\n") + c.write("\n") + c.close() from allmydata.util import fileutil addfile ./src/allmydata/test/fake_inotify.py hunk ./src/allmydata/test/fake_inotify.py 1 + +# Most of this is copied from Twisted 11.0. The reason for this hack is that +# twisted.internet.inotify can't be imported when the platform does not support inotify. 
+ + +# from /usr/src/linux/include/linux/inotify.h + +IN_ACCESS = 0x00000001L # File was accessed +IN_MODIFY = 0x00000002L # File was modified +IN_ATTRIB = 0x00000004L # Metadata changed +IN_CLOSE_WRITE = 0x00000008L # Writeable file was closed +IN_CLOSE_NOWRITE = 0x00000010L # Unwriteable file closed +IN_OPEN = 0x00000020L # File was opened +IN_MOVED_FROM = 0x00000040L # File was moved from X +IN_MOVED_TO = 0x00000080L # File was moved to Y +IN_CREATE = 0x00000100L # Subfile was created +IN_DELETE = 0x00000200L # Subfile was deleted +IN_DELETE_SELF = 0x00000400L # Self was deleted +IN_MOVE_SELF = 0x00000800L # Self was moved +IN_UNMOUNT = 0x00002000L # Backing fs was unmounted +IN_Q_OVERFLOW = 0x00004000L # Event queued overflowed +IN_IGNORED = 0x00008000L # File was ignored + +IN_ONLYDIR = 0x01000000 # only watch the path if it is a directory +IN_DONT_FOLLOW = 0x02000000 # don't follow a sym link +IN_MASK_ADD = 0x20000000 # add to the mask of an already existing watch +IN_ISDIR = 0x40000000 # event occurred against dir +IN_ONESHOT = 0x80000000 # only send event once + +IN_CLOSE = IN_CLOSE_WRITE | IN_CLOSE_NOWRITE # closes +IN_MOVED = IN_MOVED_FROM | IN_MOVED_TO # moves +IN_CHANGED = IN_MODIFY | IN_ATTRIB # changes + +IN_WATCH_MASK = (IN_MODIFY | IN_ATTRIB | + IN_CREATE | IN_DELETE | + IN_DELETE_SELF | IN_MOVE_SELF | + IN_UNMOUNT | IN_MOVED_FROM | IN_MOVED_TO) + + +_FLAG_TO_HUMAN = [ + (IN_ACCESS, 'access'), + (IN_MODIFY, 'modify'), + (IN_ATTRIB, 'attrib'), + (IN_CLOSE_WRITE, 'close_write'), + (IN_CLOSE_NOWRITE, 'close_nowrite'), + (IN_OPEN, 'open'), + (IN_MOVED_FROM, 'moved_from'), + (IN_MOVED_TO, 'moved_to'), + (IN_CREATE, 'create'), + (IN_DELETE, 'delete'), + (IN_DELETE_SELF, 'delete_self'), + (IN_MOVE_SELF, 'move_self'), + (IN_UNMOUNT, 'unmount'), + (IN_Q_OVERFLOW, 'queue_overflow'), + (IN_IGNORED, 'ignored'), + (IN_ONLYDIR, 'only_dir'), + (IN_DONT_FOLLOW, 'dont_follow'), + (IN_MASK_ADD, 'mask_add'), + (IN_ISDIR, 'is_dir'), + (IN_ONESHOT, 'one_shot') +] + + + 
+def humanReadableMask(mask): + """ + Auxiliary function that converts a hexadecimal mask into a series + of human readable flags. + """ + s = [] + for k, v in _FLAG_TO_HUMAN: + if k & mask: + s.append(v) + return s + + +# This class is not copied from Twisted; it acts as a mock. +class INotify(object): + def startReading(self): + pass + + def stopReading(self): + pass + + def watch(self, filepath, mask=IN_WATCH_MASK, autoAdd=False, callbacks=None, recursive=False): + self.callbacks = callbacks + + def event(self, filepath, mask): + for cb in self.callbacks: + cb(None, filepath, mask) + + +__all__ = ["INotify", "humanReadableMask", "IN_WATCH_MASK", "IN_ACCESS", + "IN_MODIFY", "IN_ATTRIB", "IN_CLOSE_NOWRITE", "IN_CLOSE_WRITE", + "IN_OPEN", "IN_MOVED_FROM", "IN_MOVED_TO", "IN_CREATE", + "IN_DELETE", "IN_DELETE_SELF", "IN_MOVE_SELF", "IN_UNMOUNT", + "IN_Q_OVERFLOW", "IN_IGNORED", "IN_ONLYDIR", "IN_DONT_FOLLOW", + "IN_MASK_ADD", "IN_ISDIR", "IN_ONESHOT", "IN_CLOSE", + "IN_MOVED", "IN_CHANGED"] addfile ./src/allmydata/test/test_drop_upload.py hunk ./src/allmydata/test/test_drop_upload.py 1 + +import os, sys, platform + +from twisted.trial import unittest +from twisted.python import filepath, runtime +from twisted.internet import defer, base + +from allmydata.interfaces import IDirectoryNode, NoSuchChildError + +from allmydata.util import fileutil +from allmydata.util.consumer import download_to_data +from allmydata.test.no_network import GridTestMixin +from allmydata.test.common_util import ReallyEqualMixin +from allmydata.test.common import ShouldFailMixin +from allmydata.test import fake_inotify + +from allmydata.frontends.drop_upload import DropUploader + + +class DropUploadTestMixin(GridTestMixin, ShouldFailMixin, ReallyEqualMixin): + """ + These tests will be run both with a mock notifier, and (on platforms that support it) + with the real INotify. 
+ """ + + def _test(self): + self.uploader = None + self.set_up_grid() + self.local_dir = os.path.join(self.basedir, "local_dir") + os.mkdir(self.local_dir) + + self.client = self.g.clients[0] + d = self.client.create_dirnode() + def _made_upload_dir(n): + self.failUnless(IDirectoryNode.providedBy(n)) + self.upload_dirnode = n + self.upload_uri = n.get_uri() + self.uploader = DropUploader(self.client, self.upload_uri, self.local_dir, inotify=self.inotify) + d.addCallback(_made_upload_dir) + + # Write something short enough for a LIT file. + d.addCallback(lambda ign: self._test_file("short", "test")) + + # Write to the same file again with different data. + d.addCallback(lambda ign: self._test_file("short", "different")) + + # Test that temporary files are not uploaded. + d.addCallback(lambda ign: self._test_file("tempfile", "test", temporary=True)) + + # Test that we tolerate creation of a subdirectory. + d.addCallback(lambda ign: os.mkdir(os.path.join(self.local_dir, "directory"))) + + # Write something longer, and also try to test a Unicode name if the fs can represent it. + try: + name = u"l\u00F8ng".encode(sys.getfilesystemencoding()) + except UnicodeEncodeError: + name = "long" + d.addCallback(lambda ign: self._test_file(name, "test"*100)) + + # TODO: test that causes an upload failure. + d.addCallback(lambda ign: self.failUnlessReallyEqual(self.uploader.failed, 0)) + + # Prevent unclean reactor errors. + def _cleanup(res): + if self.uploader is not None: + self.uploader.finish() + return res + d.addBoth(_cleanup) + return d + + def _test_file(self, name, data, temporary=False): + previously_uploaded = self.uploader.uploaded + previously_disappeared = self.uploader.disappeared + + d = defer.Deferred() + + # Note: this relies on the fact that we only get one IN_CLOSE_WRITE notification per file + # (otherwise we would get a defer.AlreadyCalledError). Should we be relying on that? 
+ self.uploader.set_uploaded_callback(d.callback) + + path = filepath.FilePath(os.path.join(self.local_dir, name)) + unicode_name = name.decode(sys.getfilesystemencoding()) + + f = open(path.path, "wb") + try: + if temporary and sys.platform != "win32": + os.unlink(path.path) + f.write(data) + finally: + f.close() + if temporary and sys.platform == "win32": + os.unlink(path.path) + self.notify_close_write(path) + + if temporary: + d.addCallback(lambda ign: self.shouldFail(NoSuchChildError, 'temp file not uploaded', None, + self.upload_dirnode.get, unicode_name)) + d.addCallback(lambda ign: self.failUnlessReallyEqual(self.uploader.disappeared, previously_disappeared + 1)) + else: + d.addCallback(lambda ign: self.upload_dirnode.get(unicode_name)) + d.addCallback(download_to_data) + d.addCallback(lambda actual_data: self.failUnlessReallyEqual(actual_data, data)) + d.addCallback(lambda ign: self.failUnlessReallyEqual(self.uploader.uploaded, previously_uploaded + 1)) + return d + + +class MockTest(DropUploadTestMixin, unittest.TestCase): + """This can run on any platform, and even if twisted.internet.inotify can't be imported.""" + + def test_errors(self): + self.basedir = "drop_upload.MockTest.test_errors" + self.set_up_grid() + errors_dir = os.path.join(self.basedir, "errors_dir") + os.mkdir(errors_dir) + + client = self.g.clients[0] + d = client.create_dirnode() + def _made_upload_dir(n): + self.failUnless(IDirectoryNode.providedBy(n)) + upload_uri = n.get_uri() + readonly_uri = n.get_readonly_uri() + + self.shouldFail(AssertionError, 'invalid local dir', 'could not be represented', + DropUploader, client, upload_uri, '\xFF', inotify=fake_inotify) + self.shouldFail(AssertionError, 'non-existant local dir', 'not an existing directory', + DropUploader, client, upload_uri, os.path.join(self.basedir, "Laputa"), inotify=fake_inotify) + + self.shouldFail(AssertionError, 'bad URI', 'not a directory URI', + DropUploader, client, 'bad', errors_dir, inotify=fake_inotify) + 
self.shouldFail(AssertionError, 'non-directory URI', 'not a directory URI', + DropUploader, client, 'URI:LIT:foo', errors_dir, inotify=fake_inotify) + self.shouldFail(AssertionError, 'readonly directory URI', 'does not refer to a writeable directory', + DropUploader, client, readonly_uri, errors_dir, inotify=fake_inotify) + d.addCallback(_made_upload_dir) + return d + + def test_drop_upload(self): + self.inotify = fake_inotify + self.basedir = "drop_upload.MockTest.test_drop_upload" + return self._test() + + def notify_close_write(self, path): + self.uploader._notifier.event(path, self.inotify.IN_CLOSE_WRITE) + + +class RealTest(DropUploadTestMixin, unittest.TestCase): + """This is skipped unless both Twisted and the platform support inotify.""" + + def test_drop_upload(self): + # We should always have runtime.platform.supportsINotify, because we're using + # Twisted >= 10.1. + if not runtime.platform.supportsINotify(): + raise unittest.SkipTest("Drop-upload support can only be tested for-real on an OS that supports inotify.") + + self.inotify = None # use the real twisted.internet.inotify + self.basedir = "drop_upload.RealTest.test_drop_upload" + return self._test() + + def notify_close_write(self, path): + # Writing to the file causes the notification. + pass hunk ./src/allmydata/test/test_runner.py 257 self.failUnless(re.search(r"\n\[storage\]\n#.*\nenabled = true\n", content), content) self.failUnless("\nreserved_space = 1G\n" in content) + self.failUnless(re.search(r"\n\[drop_upload\]\n#.*\nenabled = false\n", content), content) + # creating the node a second time should be rejected rc, out, err = self.run_tahoe(argv) self.failIfEqual(rc, 0, str((out, err, rc))) } [drop-upload: make counts visible on the statistics page, and disable some debugging. 
refs #1429 david-sarah@jacaranda.org**20110727023003 Ignore-this: 4e25022cca41d6012da067e96fadb1bf ] { hunk ./src/allmydata/frontends/drop_upload.py 7 from twisted.internet import defer from twisted.python.filepath import FilePath from twisted.application import service +from foolscap.api import eventually from allmydata.interfaces import IDirectoryNode hunk ./src/allmydata/frontends/drop_upload.py 26 % quote_output(local_dir_utf8)) self._client = client + self._stats_provider = client.stats_provider self._convergence = client.convergence self._local_path = FilePath(local_dir) hunk ./src/allmydata/frontends/drop_upload.py 29 - self.uploaded = 0 - self.failed = 0 - self.disappeared = 0 if inotify is None: from twisted.internet import inotify hunk ./src/allmydata/frontends/drop_upload.py 48 self._notifier = inotify.INotify() self._notifier.startReading() + self._stats_provider.count('drop_upload.dirs_monitored', 1) # We don't watch for IN_CREATE, because that would cause us to read and upload a # possibly-incomplete file before the application has closed it. 
There should always hunk ./src/allmydata/frontends/drop_upload.py 60 def _notify(self, opaque, path, events_mask): self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask)))) + self._stats_provider.count('drop_upload.files_queued', 1) + eventually(self._process, opaque, path, events_mask) + + def _process(self, opaque, path, events_mask): d = defer.succeed(None) # FIXME: if this already exists as a mutable file, we replace the directory entry, hunk ./src/allmydata/frontends/drop_upload.py 75 d.addCallback(_add_file) def _succeeded(ign): - self.uploaded += 1 + self._stats_provider.count('drop_upload.files_queued', -1) + self._stats_provider.count('drop_upload.files_uploaded', 1) def _failed(f): hunk ./src/allmydata/frontends/drop_upload.py 78 + self._stats_provider.count('drop_upload.files_queued', -1) if path.exists(): self._log("drop-upload: %r failed to upload due to %r" % (path.path, f)) hunk ./src/allmydata/frontends/drop_upload.py 81 - self.failed += 1 + self._stats_provider.count('drop_upload.files_failed', 1) return f else: self._log("drop-upload: notified file %r disappeared " hunk ./src/allmydata/frontends/drop_upload.py 86 "(this is normal for temporary files): %r" % (path.path, f)) - self.disappeared += 1 + self._stats_provider.count('drop_upload.files_disappeared', 1) return None d.addCallbacks(_succeeded, _failed) d.addBoth(self._uploaded_callback) hunk ./src/allmydata/frontends/drop_upload.py 98 def finish(self): self._notifier.stopReading() + self._stats_provider.count('drop_upload.dirs_monitored', -1) def _log(self, msg): self._client.log(msg) hunk ./src/allmydata/frontends/drop_upload.py 102 - open("events", "ab+").write(msg) + #open("events", "ab+").write(msg) hunk ./src/allmydata/test/test_drop_upload.py 26 with the real INotify. 
""" + def _get_count(self, name): + return self.stats_provider.get_stats()["counters"].get(name, 0) + def _test(self): self.uploader = None self.set_up_grid() hunk ./src/allmydata/test/test_drop_upload.py 36 os.mkdir(self.local_dir) self.client = self.g.clients[0] + self.stats_provider = self.client.stats_provider + d = self.client.create_dirnode() def _made_upload_dir(n): self.failUnless(IDirectoryNode.providedBy(n)) hunk ./src/allmydata/test/test_drop_upload.py 66 d.addCallback(lambda ign: self._test_file(name, "test"*100)) # TODO: test that causes an upload failure. - d.addCallback(lambda ign: self.failUnlessReallyEqual(self.uploader.failed, 0)) + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.files_failed'), 0)) # Prevent unclean reactor errors. def _cleanup(res): hunk ./src/allmydata/test/test_drop_upload.py 77 return d def _test_file(self, name, data, temporary=False): - previously_uploaded = self.uploader.uploaded - previously_disappeared = self.uploader.disappeared + previously_uploaded = self._get_count('drop_upload.files_uploaded') + previously_disappeared = self._get_count('drop_upload.files_disappeared') d = defer.Deferred() hunk ./src/allmydata/test/test_drop_upload.py 103 if temporary: d.addCallback(lambda ign: self.shouldFail(NoSuchChildError, 'temp file not uploaded', None, self.upload_dirnode.get, unicode_name)) - d.addCallback(lambda ign: self.failUnlessReallyEqual(self.uploader.disappeared, previously_disappeared + 1)) + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.files_disappeared'), + previously_disappeared + 1)) else: d.addCallback(lambda ign: self.upload_dirnode.get(unicode_name)) d.addCallback(download_to_data) hunk ./src/allmydata/test/test_drop_upload.py 109 d.addCallback(lambda actual_data: self.failUnlessReallyEqual(actual_data, data)) - d.addCallback(lambda ign: self.failUnlessReallyEqual(self.uploader.uploaded, previously_uploaded + 1)) + d.addCallback(lambda ign: 
self.failUnlessReallyEqual(self._get_count('drop_upload.files_uploaded'), + previously_uploaded + 1)) + + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.files_queued'), 0)) return d hunk ./src/allmydata/web/statistics.xhtml 12