diff --git a/src/allmydata/client.py b/src/allmydata/client.py index fa515d4..46b02bc 100644 --- a/src/allmydata/client.py +++ b/src/allmydata/client.py @@ -6,7 +6,6 @@ from zope.interface import implements from twisted.internet import reactor, defer from twisted.application import service from twisted.application.internet import TimerService -from foolscap.api import Referenceable from pycryptopp.publickey import rsa import allmydata @@ -22,7 +21,7 @@ from allmydata.util.abbreviate import parse_abbreviated_size from allmydata.util.time_format import parse_duration, parse_date from allmydata.stats import StatsProvider from allmydata.history import History -from allmydata.interfaces import IStatsProducer, RIStubClient +from allmydata.interfaces import IStatsProducer from allmydata.nodemaker import NodeMaker @@ -32,9 +31,6 @@ GiB=1024*MiB TiB=1024*GiB PiB=1024*TiB -class StubClient(Referenceable): - implements(RIStubClient) - def _make_secret(): return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n" @@ -186,7 +182,8 @@ class Client(node.Node, pollmixin.PollMixin): ic = IntroducerClient(self.tub, self.introducer_furl, self.nickname, str(allmydata.__full_version__), - str(self.OLDEST_SUPPORTED_VERSION)) + str(self.OLDEST_SUPPORTED_VERSION), + self.get_app_versions()) self.introducer_client = ic # hold off on starting the IntroducerClient until our tub has been # started, so we'll have a useful address on our RemoteReference, so @@ -292,7 +289,6 @@ class Client(node.Node, pollmixin.PollMixin): self.terminator = Terminator() self.terminator.setServiceParent(self) self.add_service(Uploader(helper_furl, self.stats_provider)) - self.init_stub_client() self.init_nodemaker() def init_client_storage_broker(self): @@ -331,20 +327,6 @@ class Client(node.Node, pollmixin.PollMixin): def get_storage_broker(self): return self.storage_broker - def init_stub_client(self): - def _publish(res): - # we publish an empty object so that the introducer can count how - # many clients 
are connected and see what versions they're - # running. - sc = StubClient() - furl = self.tub.registerReference(sc) - ri_name = RIStubClient.__remote_name__ - self.introducer_client.publish(furl, "stub_client", ri_name) - d = self.when_tub_ready() - d.addCallback(_publish) - d.addErrback(log.err, facility="tahoe.init", - level=log.BAD, umid="OEHq3g") - def init_nodemaker(self): self.nodemaker = NodeMaker(self.storage_broker, self._secret_holder, diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py index 48094a9..9c2f1c8 100644 --- a/src/allmydata/interfaces.py +++ b/src/allmydata/interfaces.py @@ -26,14 +26,6 @@ WriteEnablerSecret = Hash # used to protect mutable bucket modifications LeaseRenewSecret = Hash # used to protect bucket lease renewal requests LeaseCancelSecret = Hash # used to protect bucket lease cancellation requests -class RIStubClient(RemoteInterface): - """Each client publishes a service announcement for a dummy object called - the StubClient. This object doesn't actually offer any services, but the - announcement helps the Introducer keep track of which clients are - subscribed (so the grid admin can keep track of things like the size of - the grid and the client versions in use. This is the (empty) - RemoteInterface for the StubClient.""" - class RIBucketWriter(RemoteInterface): """ Objects of this kind live on the server side. 
""" def write(offset=Offset, data=ShareData): diff --git a/src/allmydata/introducer/client.py b/src/allmydata/introducer/client.py index 31fbb5c..1e48fd8 100644 --- a/src/allmydata/introducer/client.py +++ b/src/allmydata/introducer/client.py @@ -1,29 +1,73 @@ -from base64 import b32decode +import time, simplejson from zope.interface import implements from twisted.application import service -from foolscap.api import Referenceable, SturdyRef, eventually +from foolscap.api import Referenceable, eventually, RemoteInterface, Violation from allmydata.interfaces import InsufficientVersionError -from allmydata.introducer.interfaces import RIIntroducerSubscriberClient, \ - IIntroducerClient +from allmydata.introducer.interfaces import IIntroducerClient, \ + RIIntroducerSubscriberClient_v1, RIIntroducerSubscriberClient_v2 +from allmydata.introducer.common import sign, unsign, make_index, \ + convert_announcement_v1_to_v2, convert_announcement_v2_to_v1 from allmydata.util import log, idlib -from allmydata.util.rrefutil import add_version_to_remote_reference, trap_deadref +from allmydata.util.rrefutil import add_version_to_remote_reference +from allmydata.util.ecdsa import BadSignatureError + +class ClientAdapter_v1(Referenceable): # for_v1 + """I wrap a v2 IntroducerClient to make it look like a v1 client, so it + can be attached to an old server.""" + implements(RIIntroducerSubscriberClient_v1) + + def __init__(self, original): + self.original = original + + def remote_announce(self, announcements): + lp = self.original.log("received %d announcements (v1)" % + len(announcements)) + anns_v1 = set([convert_announcement_v1_to_v2(ann_v1) + for ann_v1 in announcements]) + return self.original.got_announcements(anns_v1, lp) + + def remote_set_encoding_parameters(self, parameters): + self.original.remote_set_encoding_parameters(parameters) + +class RIStubClient(RemoteInterface): # for_v1 + """Each client publishes a service announcement for a dummy object called + the StubClient. 
This object doesn't actually offer any services, but the + announcement helps the Introducer keep track of which clients are + subscribed (so the grid admin can keep track of things like the size of + the grid and the client versions in use). This is the (empty) + RemoteInterface for the StubClient.""" + +class StubClient(Referenceable): # for_v1 + implements(RIStubClient) class IntroducerClient(service.Service, Referenceable): - implements(RIIntroducerSubscriberClient, IIntroducerClient) + implements(RIIntroducerSubscriberClient_v2, IIntroducerClient) def __init__(self, tub, introducer_furl, - nickname, my_version, oldest_supported): + nickname, my_version, oldest_supported, + app_versions): self._tub = tub self.introducer_furl = introducer_furl assert type(nickname) is unicode - self._nickname_utf8 = nickname.encode("utf-8") # we always send UTF-8 + self._nickname = nickname self._my_version = my_version self._oldest_supported = oldest_supported + self._app_versions = app_versions + + self._my_subscriber_info = { "version": 0, + "nickname": self._nickname, + "app-versions": self._app_versions, + "my-version": self._my_version, + "oldest-supported": self._oldest_supported, + } + self._stub_client = None # for_v1 + self._stub_client_furl = None - self._published_announcements = set() + self._published_announcements = {} + self._canary = Referenceable() self._publisher = None @@ -33,10 +77,11 @@ class IntroducerClient(service.Service, Referenceable): # _current_announcements remembers one announcement per # (servicename,serverid) pair. Anything that arrives with the same - # pair will displace the previous one. This stores unpacked - # announcement dictionaries, which can be compared for equality to - # distinguish re-announcement from updates. It also provides memory - # for clients who subscribe after startup. + # pair will displace the previous one. This stores tuples of + # (unpacked announcement dictionary, verifyingkey, rxtime). 
The ann_d + # dicts can be compared for equality to distinguish re-announcement + # from updates. It also provides memory for clients who subscribe + # after startup. self._current_announcements = {} self.encoding_parameters = None @@ -51,6 +96,11 @@ class IntroducerClient(service.Service, Referenceable): "new_announcement": 0, "outbound_message": 0, } + self._debug_outstanding = 0 + + def _debug_retired(self, res): + self._debug_outstanding -= 1 + return res def startService(self): service.Service.startService(self) @@ -95,22 +145,14 @@ class IntroducerClient(service.Service, Referenceable): def log(self, *args, **kwargs): if "facility" not in kwargs: - kwargs["facility"] = "tahoe.introducer" + kwargs["facility"] = "tahoe.introducer.client" return log.msg(*args, **kwargs) - - def publish(self, furl, service_name, remoteinterface_name): - assert type(self._nickname_utf8) is str # we always send UTF-8 - ann = (furl, service_name, remoteinterface_name, - self._nickname_utf8, self._my_version, self._oldest_supported) - self._published_announcements.add(ann) - self._maybe_publish() - def subscribe_to(self, service_name, cb, *args, **kwargs): self._local_subscribers.append( (service_name,cb,args,kwargs) ) self._subscribed_service_names.add(service_name) self._maybe_subscribe() - for (servicename,nodeid),ann_d in self._current_announcements.items(): + for (servicename,nodeid),(ann_d,key,when) in self._current_announcements.items(): if servicename == service_name: eventually(cb, nodeid, ann_d) @@ -124,87 +166,139 @@ class IntroducerClient(service.Service, Referenceable): # there is a race here, but the subscription desk ignores # duplicate requests. 
self._subscriptions.add(service_name) - d = self._publisher.callRemote("subscribe", self, service_name) - d.addErrback(trap_deadref) - d.addErrback(log.err, format="server errored during subscribe", - facility="tahoe.introducer", + self._debug_outstanding += 1 + d = self._publisher.callRemote("subscribe_v2", + self, service_name, + self._my_subscriber_info) + d.addBoth(self._debug_retired) + d.addErrback(self._subscribe_handle_v1, service_name) # for_v1 + d.addErrback(log.err, facility="tahoe.introducer.client", level=log.WEIRD, umid="2uMScQ") + def _subscribe_handle_v1(self, f, service_name): # for_v1 + f.trap(Violation, NameError) + # they don't have a 'subscribe_v2' method: must be a v1 introducer. + # Fall back to the v1 'subscribe' method, using a client adapter. + ca = ClientAdapter_v1(self) + self._debug_outstanding += 1 + d = self._publisher.callRemote("subscribe", ca, service_name) + d.addBoth(self._debug_retired) + # We must also publish an empty 'stub_client' object, so the + # introducer can count how many clients are connected and see what + # versions they're running. 
+ if not self._stub_client_furl: + self._stub_client = sc = StubClient() + self._stub_client_furl = self._tub.registerReference(sc) + def _publish_stub_client(ignored): + ri_name = RIStubClient.__remote_name__ + self.publish(self._stub_client_furl, "stub_client", ri_name) + d.addCallback(_publish_stub_client) + return d + + def create_announcement(self, furl, service_name, remoteinterface_name, + signing_key=None): + ann_d = {"version": 0, + "service-name": service_name, + "FURL": furl, + "remoteinterface-name": remoteinterface_name, + + "nickname": self._nickname, + "app-versions": self._app_versions, + "my-version": self._my_version, + "oldest-supported": self._oldest_supported, + } + return simplejson.dumps(sign(ann_d, signing_key)) + + + def publish(self, furl, service_name, remoteinterface_name, + signing_key=None): + ann = self.create_announcement(furl, service_name, remoteinterface_name, + signing_key) + self._published_announcements[service_name] = ann + self._maybe_publish() + def _maybe_publish(self): if not self._publisher: self.log("want to publish, but no introducer yet", level=log.NOISY) return # this re-publishes everything. 
The Introducer ignores duplicates - for ann in self._published_announcements: + for ann in self._published_announcements.values(): self._debug_counts["outbound_message"] += 1 - d = self._publisher.callRemote("publish", ann) - d.addErrback(trap_deadref) - d.addErrback(log.err, - format="server errored during publish %(ann)s", - ann=ann, facility="tahoe.introducer", + self._debug_outstanding += 1 + d = self._publisher.callRemote("publish_v2", ann, self._canary) + d.addBoth(self._debug_retired) + d.addErrback(self._handle_v1_publisher, ann) # for_v1 + d.addErrback(log.err, ann=ann, facility="tahoe.introducer.client", level=log.WEIRD, umid="xs9pVQ") - - - def remote_announce(self, announcements): - self.log("received %d announcements" % len(announcements)) + def _handle_v1_publisher(self, f, ann): # for_v1 + f.trap(Violation, NameError) + # they don't have the 'publish_v2' method, so fall back to the old + # 'publish' method (which takes an unsigned tuple of bytestrings) + self.log("falling back to publish_v1", + level=log.UNUSUAL, umid="9RCT1A", failure=f) + ann_v1 = convert_announcement_v2_to_v1(ann) + self._debug_outstanding += 1 + d = self._publisher.callRemote("publish", ann_v1) + d.addBoth(self._debug_retired) + return d + + + def remote_announce_v2(self, announcements): + lp = self.log("received %d announcements (v2)" % len(announcements)) + return self.got_announcements(announcements, lp) + + def got_announcements(self, announcements, lp=None): + # this is the common entry point for both v1 and v2 announcements self._debug_counts["inbound_message"] += 1 - for ann in announcements: + for ann_s in announcements: try: - self._process_announcement(ann) - except: - log.err(format="unable to process announcement %(ann)s", - ann=ann) - # Don't let a corrupt announcement prevent us from processing - # the remaining ones. Don't return an error to the server, - # since they'd just ignore it anyways. 
- pass - - def _process_announcement(self, ann): + ann_d, key = unsign(ann_s) # might raise bad-sig error + except BadSignatureError: + self.log("bad signature on inbound announcement: %s" % (ann_s,), + parent=lp, level=log.WEIRD, umid="ZAU15Q") + # process other announcements that arrived with the bad one + continue + + self._process_announcement(ann_d, key) + + def _process_announcement(self, ann_d, key): self._debug_counts["inbound_announcement"] += 1 - (furl, service_name, ri_name, nickname_utf8, ver, oldest) = ann + service_name = str(ann_d["service-name"]) if service_name not in self._subscribed_service_names: self.log("announcement for a service we don't care about [%s]" % (service_name,), level=log.UNUSUAL, umid="dIpGNA") self._debug_counts["wrong_service"] += 1 return - self.log("announcement for [%s]: %s" % (service_name, ann), - umid="BoKEag") - assert type(furl) is str - assert type(service_name) is str - assert type(ri_name) is str - assert type(nickname_utf8) is str - nickname = nickname_utf8.decode("utf-8") - assert type(nickname) is unicode - assert type(ver) is str - assert type(oldest) is str - - nodeid = b32decode(SturdyRef(furl).tubID.upper()) - nodeid_s = idlib.shortnodeid_b2a(nodeid) - - ann_d = { "version": 0, - "service-name": service_name, - - "FURL": furl, - "nickname": nickname, - "app-versions": {}, # need #466 and v2 introducer - "my-version": ver, - "oldest-supported": oldest, - } - - index = (service_name, nodeid) - if self._current_announcements.get(index, None) == ann_d: - self.log("reannouncement for [%(service)s]:%(nodeid)s, ignoring", + # for ASCII values, simplejson might give us unicode *or* bytes + if "nickname" in ann_d and isinstance(ann_d["nickname"], str): + ann_d["nickname"] = unicode(ann_d["nickname"]) + nick_s = ann_d.get("nickname",u"").encode("utf-8") + lp2 = self.log(format="announcement for nickname '%(nick)s', service=%(svc)s: %(ann)s", + nick=nick_s, svc=service_name, ann=ann_d, umid="BoKEag") + + index = 
make_index(ann_d, key) + nodeid = index[1] + nodeid_s = idlib.nodeid_b2a(nodeid) + + # is this announcement a duplicate? + if self._current_announcements.get(index, [None]*3)[0] == ann_d: + self.log(format="reannouncement for [%(service)s]:%(nodeid)s, ignoring", service=service_name, nodeid=nodeid_s, - level=log.UNUSUAL, umid="B1MIdA") + parent=lp2, level=log.UNUSUAL, umid="B1MIdA") self._debug_counts["duplicate_announcement"] += 1 return + # does it update an existing one? if index in self._current_announcements: self._debug_counts["update"] += 1 + self.log("replacing old announcement: %s" % (ann_d,), + parent=lp2, level=log.NOISY, umid="wxwgIQ") else: self._debug_counts["new_announcement"] += 1 + self.log("new announcement[%s]" % service_name, + parent=lp2, level=log.NOISY) - self._current_announcements[index] = ann_d + self._current_announcements[index] = (ann_d, key, time.time()) # note: we never forget an index, but we might update its value for (service_name2,cb,args,kwargs) in self._local_subscribers: diff --git a/src/allmydata/introducer/common.py b/src/allmydata/introducer/common.py new file mode 100644 index 0000000..a057ecc --- /dev/null +++ b/src/allmydata/introducer/common.py @@ -0,0 +1,72 @@ + +import re, simplejson +from base64 import b32decode +from allmydata.util.ecdsa import VerifyingKey + +def make_index(ann_d, key): + """Return something that can be used as an index (e.g. a tuple of + strings), such that two messages that refer to the same 'thing' will have + the same index. 
For introducer announcements, this is a tuple of + (service-name, signing-key), or (service-name, tubid) if the announcement + is not signed.""" + + service_name = str(ann_d["service-name"]) + if key: + index = (service_name, key.to_string()) + else: + # otherwise, use the FURL to get a tubid + furl = str(ann_d["FURL"]) + m = re.match(r'pb://(\w+)@', furl) + assert m + tubid = b32decode(m.group(1).upper()) + index = (service_name, tubid) + return index + +def convert_announcement_v1_to_v2(ann_t): + (furl, service_name, ri_name, nickname, ver, oldest) = ann_t + assert type(furl) is str + assert type(service_name) is str + assert type(ri_name) is str + assert type(nickname) is str + assert type(ver) is str + assert type(oldest) is str + ann_d = {"version": 0, + "service-name": service_name, + "FURL": furl, + "remoteinterface-name": ri_name, + + "nickname": nickname.decode("utf-8"), + "app-versions": {}, + "my-version": ver, + "oldest-supported": oldest, + } + return simplejson.dumps( (simplejson.dumps(ann_d), None, None) ) + +def convert_announcement_v2_to_v1(ann_v2): + (msg, sig, pubkey) = simplejson.loads(ann_v2) + ann_d = simplejson.loads(msg) + assert ann_d["version"] == 0 + ann_t = (str(ann_d["FURL"]), str(ann_d["service-name"]), + str(ann_d["remoteinterface-name"]), + ann_d["nickname"].encode("utf-8"), + str(ann_d["my-version"]), + str(ann_d["oldest-supported"]), + ) + return ann_t + + +def sign(ann_d, sk): + msg = simplejson.dumps(ann_d) + if not sk: + return (msg, None, None) + vk = sk.get_verifying_key() + return (msg, sk.sign(msg).encode("hex"), vk.to_string().encode("hex")) + +def unsign(ann_s): + (msg_s, sig_s, key_s) = simplejson.loads(ann_s) + key = None + if sig_s and key_s: + key = VerifyingKey.from_string(key_s.decode("hex")) + key.verify(sig_s.decode("hex"), msg_s) + msg = simplejson.loads(msg_s) + return (msg, key) diff --git a/src/allmydata/introducer/interfaces.py b/src/allmydata/introducer/interfaces.py index 54f1701..03b0d9d 100644 --- 
a/src/allmydata/introducer/interfaces.py +++ b/src/allmydata/introducer/interfaces.py @@ -1,9 +1,12 @@ from zope.interface import Interface from foolscap.api import StringConstraint, TupleOf, SetOf, DictOf, Any, \ - RemoteInterface + RemoteInterface, Referenceable +from old import RIIntroducerSubscriberClient_v1 FURL = StringConstraint(1000) +# old introducer protocol (v1): +# # Announcements are (FURL, service_name, remoteinterface_name, # nickname, my_version, oldest_supported) # the (FURL, service_name, remoteinterface_name) refer to the service being @@ -14,13 +17,17 @@ FURL = StringConstraint(1000) # incompatible peer. The second goal is to enable the development of # backwards-compatibility code. -Announcement = TupleOf(FURL, str, str, - str, str, str) +Announcement_v1 = TupleOf(FURL, str, str, + str, str, str) -class RIIntroducerSubscriberClient(RemoteInterface): - __remote_name__ = "RIIntroducerSubscriberClient.tahoe.allmydata.com" +# new protocol: Announcements are strings, a JSON serialized 3-tuple of (msg, +# sig, pubkey). More details to come. +Announcement_v2 = str - def announce(announcements=SetOf(Announcement)): +class RIIntroducerSubscriberClient_v2(RemoteInterface): + __remote_name__ = "RIIntroducerSubscriberClient_v2.tahoe.allmydata.com" + + def announce_v2(announcements=SetOf(Announcement_v2)): """I accept announcements from the publisher.""" return None @@ -41,38 +48,29 @@ class RIIntroducerSubscriberClient(RemoteInterface): """ return None -# When Foolscap can handle multiple interfaces (Foolscap#17), the -# full-powered introducer will implement both RIIntroducerPublisher and -# RIIntroducerSubscriberService. Until then, we define -# RIIntroducerPublisherAndSubscriberService as a combination of the two, and -# make everybody use that. 
+SubscriberInfo = DictOf(str, Any()) -class RIIntroducerPublisher(RemoteInterface): +class RIIntroducerPublisherAndSubscriberService_v2(RemoteInterface): """To publish a service to the world, connect to me and give me your - announcement message. I will deliver a copy to all connected subscribers.""" - __remote_name__ = "RIIntroducerPublisher.tahoe.allmydata.com" - - def publish(announcement=Announcement): - # canary? - return None - -class RIIntroducerSubscriberService(RemoteInterface): - __remote_name__ = "RIIntroducerSubscriberService.tahoe.allmydata.com" - - def subscribe(subscriber=RIIntroducerSubscriberClient, service_name=str): - """Give me a subscriber reference, and I will call its new_peers() - method will any announcements that match the desired service name. I - will ignore duplicate subscriptions. - """ - return None - -class RIIntroducerPublisherAndSubscriberService(RemoteInterface): - __remote_name__ = "RIIntroducerPublisherAndSubscriberService.tahoe.allmydata.com" + announcement message. I will deliver a copy to all connected subscribers. + To hear about services, connect to me and subscribe to a specific + service_name.""" + __remote_name__ = "RIIntroducerPublisherAndSubscriberService_v2.tahoe.allmydata.com" def get_version(): return DictOf(str, Any()) - def publish(announcement=Announcement): + def publish(announcement=Announcement_v1): + return None + def publish_v2(announcement=Announcement_v2, canary=Referenceable): return None - def subscribe(subscriber=RIIntroducerSubscriberClient, service_name=str): + def subscribe(subscriber=RIIntroducerSubscriberClient_v1, service_name=str): + return None + def subscribe_v2(subscriber=RIIntroducerSubscriberClient_v2, + service_name=str, subscriber_info=SubscriberInfo): + """Give me a subscriber reference, and I will call its announce_v2() + method with any announcements that match the desired service name. I + will ignore duplicate subscriptions. 
The subscriber_info dictionary + tells me about the subscriber, and is used for diagnostic/status + displays.""" return None class IIntroducerClient(Interface): @@ -80,13 +78,17 @@ class IIntroducerClient(Interface): publish their services to the rest of the world, and I help them learn about services available on other nodes.""" - def publish(furl, service_name, remoteinterface_name): + def publish(furl, service_name, remoteinterface_name, + signing_key=None): """Once you call this, I will tell the world that the Referenceable available at FURL is available to provide a service named SERVICE_NAME. The precise definition of the service being provided is identified by the Foolscap 'remote interface name' in the last parameter: this is supposed to be a globally-unique string that - identifies the RemoteInterface that is implemented.""" + identifies the RemoteInterface that is implemented. + + If signing_key= is set to an instance of ecdsa.SigningKey, it will be + used to sign the announcement.""" def subscribe_to(service_name, callback, *args, **kwargs): """Call this if you will eventually want to use services with the diff --git a/src/allmydata/introducer/old.py b/src/allmydata/introducer/old.py new file mode 100644 index 0000000..e0bdacf --- /dev/null +++ b/src/allmydata/introducer/old.py @@ -0,0 +1,463 @@ + +import time +from base64 import b32decode +from zope.interface import implements, Interface +from twisted.application import service +import allmydata +from allmydata.interfaces import InsufficientVersionError +from allmydata.util import log, idlib, rrefutil +from foolscap.api import StringConstraint, TupleOf, SetOf, DictOf, Any, \ + RemoteInterface, Referenceable, eventually, SturdyRef +FURL = StringConstraint(1000) + +# We keep a copy of the old introducer (both client and server) here to +# support compatibility tests. The old client is supposed to handle the new +# server, and new client is supposed to handle the old server. 
+ + +# Announcements are (FURL, service_name, remoteinterface_name, +# nickname, my_version, oldest_supported) +# the (FURL, service_name, remoteinterface_name) refer to the service being +# announced. The (nickname, my_version, oldest_supported) refer to the +# client as a whole. The my_version/oldest_supported strings can be parsed +# by an allmydata.util.version.Version instance, and then compared. The +# first goal is to make sure that nodes are not confused by speaking to an +# incompatible peer. The second goal is to enable the development of +# backwards-compatibility code. + +Announcement = TupleOf(FURL, str, str, + str, str, str) + +class RIIntroducerSubscriberClient_v1(RemoteInterface): + __remote_name__ = "RIIntroducerSubscriberClient.tahoe.allmydata.com" + + def announce(announcements=SetOf(Announcement)): + """I accept announcements from the publisher.""" + return None + + def set_encoding_parameters(parameters=(int, int, int)): + """Advise the client of the recommended k-of-n encoding parameters + for this grid. 'parameters' is a tuple of (k, desired, n), where 'n' + is the total number of shares that will be created for any given + file, while 'k' is the number of shares that must be retrieved to + recover that file, and 'desired' is the minimum number of shares that + must be placed before the uploader will consider its job a success. + n/k is the expansion ratio, while k determines the robustness. + + Introducers should specify 'n' according to the expected size of the + grid (there is no point to producing more shares than there are + peers), and k according to the desired reliability-vs-overhead goals. + + Note that setting k=1 is equivalent to simple replication. + """ + return None + +# When Foolscap can handle multiple interfaces (Foolscap#17), the +# full-powered introducer will implement both RIIntroducerPublisher and +# RIIntroducerSubscriberService. 
Until then, we define +# RIIntroducerPublisherAndSubscriberService as a combination of the two, and +# make everybody use that. + +class RIIntroducerPublisher_v1(RemoteInterface): + """To publish a service to the world, connect to me and give me your + announcement message. I will deliver a copy to all connected subscribers.""" + __remote_name__ = "RIIntroducerPublisher.tahoe.allmydata.com" + + def publish(announcement=Announcement): + # canary? + return None + +class RIIntroducerSubscriberService_v1(RemoteInterface): + __remote_name__ = "RIIntroducerSubscriberService.tahoe.allmydata.com" + + def subscribe(subscriber=RIIntroducerSubscriberClient_v1, service_name=str): + """Give me a subscriber reference, and I will call its new_peers() + method with any announcements that match the desired service name. I + will ignore duplicate subscriptions. + """ + return None + +class RIIntroducerPublisherAndSubscriberService_v1(RemoteInterface): + __remote_name__ = "RIIntroducerPublisherAndSubscriberService.tahoe.allmydata.com" + def get_version(): + return DictOf(str, Any()) + def publish(announcement=Announcement): + return None + def subscribe(subscriber=RIIntroducerSubscriberClient_v1, service_name=str): + return None + +class IIntroducerClient(Interface): + """I provide service introduction facilities for a node. I help nodes + publish their services to the rest of the world, and I help them learn + about services available on other nodes.""" + + def publish(furl, service_name, remoteinterface_name): + """Once you call this, I will tell the world that the Referenceable + available at FURL is available to provide a service named + SERVICE_NAME. 
The precise definition of the service being provided is + identified by the Foolscap 'remote interface name' in the last + parameter: this is supposed to be a globally-unique string that + identifies the RemoteInterface that is implemented.""" + + def subscribe_to(service_name, callback, *args, **kwargs): + """Call this if you will eventually want to use services with the + given SERVICE_NAME. This will prompt me to subscribe to announcements + of those services. Your callback will be invoked with at least two + arguments: a serverid (binary string), and an announcement + dictionary, followed by any additional callback args/kwargs you give + me. I will run your callback for both new announcements and for + announcements that have changed, but you must be prepared to tolerate + duplicates. + + The announcement dictionary that I give you will have the following + keys: + + version: 0 + service-name: str('storage') + + FURL: str(furl) + remoteinterface-name: str(ri_name) + nickname: unicode + app-versions: {} + my-version: str + oldest-supported: str + + Note that app-version will be an empty dictionary until #466 is done + and both the introducer and the remote client have been upgraded. For + current (native) server types, the serverid will always be equal to + the binary form of the FURL's tubid. 
+ """ + + def connected_to_introducer(): + """Returns a boolean, True if we are currently connected to the + introducer, False if not.""" + + +class IntroducerClient_v1(service.Service, Referenceable): + implements(RIIntroducerSubscriberClient_v1, IIntroducerClient) + + def __init__(self, tub, introducer_furl, + nickname, my_version, oldest_supported): + self._tub = tub + self.introducer_furl = introducer_furl + + assert type(nickname) is unicode + self._nickname_utf8 = nickname.encode("utf-8") # we always send UTF-8 + self._my_version = my_version + self._oldest_supported = oldest_supported + + self._published_announcements = set() + + self._publisher = None + + self._local_subscribers = [] # (servicename,cb,args,kwargs) tuples + self._subscribed_service_names = set() + self._subscriptions = set() # requests we've actually sent + + # _current_announcements remembers one announcement per + # (servicename,serverid) pair. Anything that arrives with the same + # pair will displace the previous one. This stores unpacked + # announcement dictionaries, which can be compared for equality to + # distinguish re-announcement from updates. It also provides memory + # for clients who subscribe after startup. 
+ self._current_announcements = {} + + self.encoding_parameters = None + + # hooks for unit tests + self._debug_counts = { + "inbound_message": 0, + "inbound_announcement": 0, + "wrong_service": 0, + "duplicate_announcement": 0, + "update": 0, + "new_announcement": 0, + "outbound_message": 0, + } + self._debug_outstanding = 0 + + def _debug_retired(self, res): + self._debug_outstanding -= 1 + return res + + def startService(self): + service.Service.startService(self) + self._introducer_error = None + rc = self._tub.connectTo(self.introducer_furl, self._got_introducer) + self._introducer_reconnector = rc + def connect_failed(failure): + self.log("Initial Introducer connection failed: perhaps it's down", + level=log.WEIRD, failure=failure, umid="c5MqUQ") + d = self._tub.getReference(self.introducer_furl) + d.addErrback(connect_failed) + + def _got_introducer(self, publisher): + self.log("connected to introducer, getting versions") + default = { "http://allmydata.org/tahoe/protocols/introducer/v1": + { }, + "application-version": "unknown: no get_version()", + } + d = rrefutil.add_version_to_remote_reference(publisher, default) + d.addCallback(self._got_versioned_introducer) + d.addErrback(self._got_error) + + def _got_error(self, f): + # TODO: for the introducer, perhaps this should halt the application + self._introducer_error = f # polled by tests + + def _got_versioned_introducer(self, publisher): + self.log("got introducer version: %s" % (publisher.version,)) + # we require a V1 introducer + needed = "http://allmydata.org/tahoe/protocols/introducer/v1" + if needed not in publisher.version: + raise InsufficientVersionError(needed, publisher.version) + self._publisher = publisher + publisher.notifyOnDisconnect(self._disconnected) + self._maybe_publish() + self._maybe_subscribe() + + def _disconnected(self): + self.log("bummer, we've lost our connection to the introducer") + self._publisher = None + self._subscriptions.clear() + + def log(self, *args, **kwargs): + 
if "facility" not in kwargs: + kwargs["facility"] = "tahoe.introducer" + return log.msg(*args, **kwargs) + + + def publish(self, furl, service_name, remoteinterface_name): + assert type(self._nickname_utf8) is str # we always send UTF-8 + ann = (furl, service_name, remoteinterface_name, + self._nickname_utf8, self._my_version, self._oldest_supported) + self._published_announcements.add(ann) + self._maybe_publish() + + def subscribe_to(self, service_name, cb, *args, **kwargs): + self._local_subscribers.append( (service_name,cb,args,kwargs) ) + self._subscribed_service_names.add(service_name) + self._maybe_subscribe() + for (servicename,nodeid),ann_d in self._current_announcements.items(): + if servicename == service_name: + eventually(cb, nodeid, ann_d) + + def _maybe_subscribe(self): + if not self._publisher: + self.log("want to subscribe, but no introducer yet", + level=log.NOISY) + return + for service_name in self._subscribed_service_names: + if service_name not in self._subscriptions: + # there is a race here, but the subscription desk ignores + # duplicate requests. + self._subscriptions.add(service_name) + self._debug_outstanding += 1 + d = self._publisher.callRemote("subscribe", self, service_name) + d.addBoth(self._debug_retired) + d.addErrback(rrefutil.trap_deadref) + d.addErrback(log.err, format="server errored during subscribe", + facility="tahoe.introducer", + level=log.WEIRD, umid="2uMScQ") + + def _maybe_publish(self): + if not self._publisher: + self.log("want to publish, but no introducer yet", level=log.NOISY) + return + # this re-publishes everything. 
The Introducer ignores duplicates + for ann in self._published_announcements: + self._debug_counts["outbound_message"] += 1 + self._debug_outstanding += 1 + d = self._publisher.callRemote("publish", ann) + d.addBoth(self._debug_retired) + d.addErrback(rrefutil.trap_deadref) + d.addErrback(log.err, + format="server errored during publish %(ann)s", + ann=ann, facility="tahoe.introducer", + level=log.WEIRD, umid="xs9pVQ") + + + + def remote_announce(self, announcements): + self.log("received %d announcements" % len(announcements)) + self._debug_counts["inbound_message"] += 1 + for ann in announcements: + try: + self._process_announcement(ann) + except: + log.err(format="unable to process announcement %(ann)s", + ann=ann) + # Don't let a corrupt announcement prevent us from processing + # the remaining ones. Don't return an error to the server, + # since they'd just ignore it anyways. + pass + + def _process_announcement(self, ann): + self._debug_counts["inbound_announcement"] += 1 + (furl, service_name, ri_name, nickname_utf8, ver, oldest) = ann + if service_name not in self._subscribed_service_names: + self.log("announcement for a service we don't care about [%s]" + % (service_name,), level=log.UNUSUAL, umid="dIpGNA") + self._debug_counts["wrong_service"] += 1 + return + self.log("announcement for [%s]: %s" % (service_name, ann), + umid="BoKEag") + assert type(furl) is str + assert type(service_name) is str + assert type(ri_name) is str + assert type(nickname_utf8) is str + nickname = nickname_utf8.decode("utf-8") + assert type(nickname) is unicode + assert type(ver) is str + assert type(oldest) is str + + nodeid = b32decode(SturdyRef(furl).tubID.upper()) + nodeid_s = idlib.shortnodeid_b2a(nodeid) + + ann_d = { "version": 0, + "service-name": service_name, + + "FURL": furl, + "nickname": nickname, + "app-versions": {}, # need #466 and v2 introducer + "my-version": ver, + "oldest-supported": oldest, + } + + index = (service_name, nodeid) + if 
self._current_announcements.get(index, None) == ann_d: + self.log("reannouncement for [%(service)s]:%(nodeid)s, ignoring", + service=service_name, nodeid=nodeid_s, + level=log.UNUSUAL, umid="B1MIdA") + self._debug_counts["duplicate_announcement"] += 1 + return + if index in self._current_announcements: + self._debug_counts["update"] += 1 + else: + self._debug_counts["new_announcement"] += 1 + + self._current_announcements[index] = ann_d + # note: we never forget an index, but we might update its value + + for (service_name2,cb,args,kwargs) in self._local_subscribers: + if service_name2 == service_name: + eventually(cb, nodeid, ann_d, *args, **kwargs) + + def remote_set_encoding_parameters(self, parameters): + self.encoding_parameters = parameters + + def connected_to_introducer(self): + return bool(self._publisher) + +class IntroducerService_v1(service.MultiService, Referenceable): + implements(RIIntroducerPublisherAndSubscriberService_v1) + name = "introducer" + VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v1": + { }, + "application-version": str(allmydata.__full_version__), + } + + def __init__(self, basedir="."): + service.MultiService.__init__(self) + self.introducer_url = None + # 'index' is (service_name, tubid) + self._announcements = {} # dict of index -> (announcement, timestamp) + self._subscribers = {} # dict of (rref->timestamp) dicts + self._debug_counts = {"inbound_message": 0, + "inbound_duplicate": 0, + "inbound_update": 0, + "outbound_message": 0, + "outbound_announcements": 0, + "inbound_subscribe": 0} + self._debug_outstanding = 0 + + def _debug_retired(self, res): + self._debug_outstanding -= 1 + return res + + def log(self, *args, **kwargs): + if "facility" not in kwargs: + kwargs["facility"] = "tahoe.introducer" + return log.msg(*args, **kwargs) + + def get_announcements(self): + return self._announcements + def get_subscribers(self): + return self._subscribers + + def remote_get_version(self): + return self.VERSION + + def 
remote_publish(self, announcement): + try: + self._publish(announcement) + except: + log.err(format="Introducer.remote_publish failed on %(ann)s", + ann=announcement, level=log.UNUSUAL, umid="620rWA") + raise + + def _publish(self, announcement): + self._debug_counts["inbound_message"] += 1 + self.log("introducer: announcement published: %s" % (announcement,) ) + (furl, service_name, ri_name, nickname_utf8, ver, oldest) = announcement + #print "PUB", service_name, nickname_utf8 + + nodeid = b32decode(SturdyRef(furl).tubID.upper()) + index = (service_name, nodeid) + + if index in self._announcements: + (old_announcement, timestamp) = self._announcements[index] + if old_announcement == announcement: + self.log("but we already knew it, ignoring", level=log.NOISY) + self._debug_counts["inbound_duplicate"] += 1 + return + else: + self.log("old announcement being updated", level=log.NOISY) + self._debug_counts["inbound_update"] += 1 + self._announcements[index] = (announcement, time.time()) + + for s in self._subscribers.get(service_name, []): + self._debug_counts["outbound_message"] += 1 + self._debug_counts["outbound_announcements"] += 1 + self._debug_outstanding += 1 + d = s.callRemote("announce", set([announcement])) + d.addBoth(self._debug_retired) + d.addErrback(rrefutil.trap_deadref) + d.addErrback(log.err, + format="subscriber errored on announcement %(ann)s", + ann=announcement, facility="tahoe.introducer", + level=log.UNUSUAL, umid="jfGMXQ") + + def remote_subscribe(self, subscriber, service_name): + self.log("introducer: subscription[%s] request at %s" % (service_name, + subscriber)) + self._debug_counts["inbound_subscribe"] += 1 + if service_name not in self._subscribers: + self._subscribers[service_name] = {} + subscribers = self._subscribers[service_name] + if subscriber in subscribers: + self.log("but they're already subscribed, ignoring", + level=log.UNUSUAL) + return + subscribers[subscriber] = time.time() + def _remove(): + self.log("introducer: 
unsubscribing[%s] %s" % (service_name, + subscriber)) + subscribers.pop(subscriber, None) + subscriber.notifyOnDisconnect(_remove) + + announcements = set( + [ ann + for (sn2,nodeid),(ann,when) in self._announcements.items() + if sn2 == service_name] ) + + self._debug_counts["outbound_message"] += 1 + self._debug_counts["outbound_announcements"] += len(announcements) + self._debug_outstanding += 1 + d = subscriber.callRemote("announce", announcements) + d.addBoth(self._debug_retired) + d.addErrback(rrefutil.trap_deadref) + d.addErrback(log.err, + format="subscriber errored during subscribe %(anns)s", + anns=announcements, facility="tahoe.introducer", + level=log.UNUSUAL, umid="1XChxA") diff --git a/src/allmydata/introducer/server.py b/src/allmydata/introducer/server.py index 117fcb5..7868e8f 100644 --- a/src/allmydata/introducer/server.py +++ b/src/allmydata/introducer/server.py @@ -1,14 +1,15 @@ import time, os.path -from base64 import b32decode from zope.interface import implements from twisted.application import service -from foolscap.api import Referenceable, SturdyRef +from foolscap.api import Referenceable import allmydata from allmydata import node -from allmydata.util import log, rrefutil +from allmydata.util import log, base32, idlib from allmydata.introducer.interfaces import \ - RIIntroducerPublisherAndSubscriberService + RIIntroducerPublisherAndSubscriberService_v2 +from allmydata.introducer.common import convert_announcement_v1_to_v2, \ + convert_announcement_v2_to_v1, unsign, make_index class IntroducerNode(node.Node): PORTNUMFILE = "introducer.port" @@ -30,22 +31,53 @@ class IntroducerNode(node.Node): def _publish(res): self.introducer_url = self.tub.registerReference(introducerservice, "introducer") - self.log(" introducer is at %s" % self.introducer_url) + self.log(" introducer is at %s" % self.introducer_url, + umid="qF2L9A") self.write_config("introducer.furl", self.introducer_url + "\n") d.addCallback(_publish) d.addErrback(log.err, 
facility="tahoe.init", level=log.BAD, umid="UaNs9A") def init_web(self, webport): - self.log("init_web(webport=%s)", args=(webport,)) + self.log("init_web(webport=%s)", args=(webport,), umid="2bUygA") from allmydata.webish import IntroducerWebishServer nodeurl_path = os.path.join(self.basedir, "node.url") ws = IntroducerWebishServer(self, webport, nodeurl_path) self.add_service(ws) +class SubscriberAdapter_v1: # for_v1 + """I wrap a RemoteReference that points at an old v1 subscriber, enabling + it to be treated like a v2 subscriber. + """ + + def __init__(self, original): + self.original = original + def __eq__(self, them): + return self.original == them + def __ne__(self, them): + return self.original != them + def __hash__(self): + return hash(self.original) + def getRemoteTubID(self): + return self.original.getRemoteTubID() + def getSturdyRef(self): + return self.original.getSturdyRef() + def getPeer(self): + return self.original.getPeer() + def callRemote(self, methname, *args, **kwargs): + m = getattr(self, "wrap_" + methname) + return m(*args, **kwargs) + def wrap_announce_v2(self, announcements): + anns_v1 = [convert_announcement_v2_to_v1(ann) for ann in announcements] + return self.original.callRemote("announce", set(anns_v1)) + def wrap_set_encoding_parameters(self, parameters): + return self.original.callRemote("set_encoding_parameters", parameters) + def notifyOnDisconnect(self, *args, **kwargs): + return self.original.notifyOnDisconnect(*args, **kwargs) + class IntroducerService(service.MultiService, Referenceable): - implements(RIIntroducerPublisherAndSubscriberService) + implements(RIIntroducerPublisherAndSubscriberService_v2) name = "introducer" VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v1": { }, @@ -56,93 +88,214 @@ class IntroducerService(service.MultiService, Referenceable): service.MultiService.__init__(self) self.introducer_url = None # 'index' is (service_name, tubid) - self._announcements = {} # dict of index -> 
(announcement, timestamp) - self._subscribers = {} # dict of (rref->timestamp) dicts + self._announcements = {} # dict of index -> + # (ann_s, canary, ann_d, timestamp) + + # ann_d is cleaned up (nickname is always unicode, servicename is + # always ascii, etc, even though simplejson.loads sometimes returns + # either) + + # self._subscribers is a dict mapping servicename to subscriptions + # 'subscriptions' is a dict mapping rref to a subscription + # 'subscription' is a tuple of (subscriber_info, timestamp) + # 'subscriber_info' is a dict, provided directly for v2 clients, or + # synthesized for v1 clients. The expected keys are: + # version, nickname, app-versions, my-version, oldest-supported + self._subscribers = {} + + # self._stub_client_announcements contains the information provided + # by v1 clients. We stash this so we can match it up with their + # subscriptions. + self._stub_client_announcements = {} # maps tubid to sinfo # for_v1 + self._debug_counts = {"inbound_message": 0, "inbound_duplicate": 0, "inbound_update": 0, "outbound_message": 0, "outbound_announcements": 0, "inbound_subscribe": 0} + self._debug_outstanding = 0 # also covers SubscriberAdapter_v1 + + def _debug_retired(self, res): + self._debug_outstanding -= 1 + return res def log(self, *args, **kwargs): if "facility" not in kwargs: - kwargs["facility"] = "tahoe.introducer" + kwargs["facility"] = "tahoe.introducer.server" return log.msg(*args, **kwargs) def get_announcements(self): return self._announcements def get_subscribers(self): - return self._subscribers + """Return a list of (service_name, when, subscriber_info, rref) for + all subscribers. 
subscriber_info is a dict with the following keys: + version, nickname, app-versions, my-version, oldest-supported""" + s = [] + for service_name, subscriptions in self._subscribers.items(): + for rref,(subscriber_info,when) in subscriptions.items(): + s.append( (service_name, when, subscriber_info, rref) ) + return s def remote_get_version(self): return self.VERSION - def remote_publish(self, announcement): + def remote_publish(self, ann_s): # for_v1 + lp = self.log("introducer: old (v1) announcement published: %s" + % (ann_s,), umid="6zGOIw") + ann_v2 = convert_announcement_v1_to_v2(ann_s) + return self.publish(ann_v2, None, lp) + + def remote_publish_v2(self, ann_s, canary): + lp = self.log("introducer: announcement (v2) published", umid="L2QXkQ") + return self.publish(ann_s, canary, lp) + + def publish(self, ann_s, canary, lp): try: - self._publish(announcement) + self._publish(ann_s, canary, lp) except: log.err(format="Introducer.remote_publish failed on %(ann)s", - ann=announcement, level=log.UNUSUAL, umid="620rWA") + ann=ann_s, + level=log.UNUSUAL, parent=lp, umid="620rWA") raise - def _publish(self, announcement): + def _publish(self, ann_s, canary, lp): self._debug_counts["inbound_message"] += 1 - self.log("introducer: announcement published: %s" % (announcement,) ) - (furl, service_name, ri_name, nickname_utf8, ver, oldest) = announcement + self.log("introducer: announcement published: %s" % (ann_s,), + umid="wKHgCw") + ann_d, key = unsign(ann_s) # might raise BadSignatureError + index = make_index(ann_d, key) - nodeid = b32decode(SturdyRef(furl).tubID.upper()) - index = (service_name, nodeid) + service_name = str(ann_d["service-name"]) + if service_name == "stub_client": # for_v1 + self._attach_stub_client(ann_d, index, lp) + return if index in self._announcements: - (old_announcement, timestamp) = self._announcements[index] - if old_announcement == announcement: - self.log("but we already knew it, ignoring", level=log.NOISY) + (old_ann_s, canary, ann_d, 
timestamp) = self._announcements[index] + if old_ann_s == ann_s: + self.log("but we already knew it, ignoring", level=log.NOISY, + umid="myxzLw") self._debug_counts["inbound_duplicate"] += 1 return else: - self.log("old announcement being updated", level=log.NOISY) + self.log("old announcement being updated", level=log.NOISY, + umid="304r9g") self._debug_counts["inbound_update"] += 1 - self._announcements[index] = (announcement, time.time()) + self._announcements[index] = (ann_s, canary, ann_d, time.time()) + #if canary: + # canary.notifyOnDisconnect ... + # use a CanaryWatcher? with cw.is_connected()? + # actually we just want foolscap to give rref.is_connected(), since + # this is only for the status display for s in self._subscribers.get(service_name, []): self._debug_counts["outbound_message"] += 1 self._debug_counts["outbound_announcements"] += 1 - d = s.callRemote("announce", set([announcement])) - d.addErrback(rrefutil.trap_deadref) + self._debug_outstanding += 1 + d = s.callRemote("announce_v2", set([ann_s])) + d.addBoth(self._debug_retired) d.addErrback(log.err, format="subscriber errored on announcement %(ann)s", - ann=announcement, facility="tahoe.introducer", + ann=ann_s, facility="tahoe.introducer", level=log.UNUSUAL, umid="jfGMXQ") - def remote_subscribe(self, subscriber, service_name): - self.log("introducer: subscription[%s] request at %s" % (service_name, - subscriber)) + def _attach_stub_client(self, ann_d, index, lp): + # There might be a v1 subscriber for whom this is a stub_client. + # We might have received the subscription before the stub_client + # announcement, in which case we now need to fix up the record in + # self._subscriptions . 
+ + # record it for later, in case the stub_client arrived before the + # subscription + subscriber_info = self._get_subscriber_info_from_ann_d(ann_d) + ann_tubid = index[1] + self._stub_client_announcements[ann_tubid] = subscriber_info + + lp2 = self.log("stub_client announcement, " + "looking for matching subscriber", + parent=lp, level=log.NOISY, umid="BTywDg") + + for sn in self._subscribers: + s = self._subscribers[sn] + for (subscriber, info) in s.items(): + # we correlate these by looking for a subscriber whose tubid + # matches this announcement + sub_tubid = base32.a2b(subscriber.getRemoteTubID()) # binary + if sub_tubid == ann_tubid: + self.log(format="found a match, nodeid=%(nodeid)s", + nodeid=idlib.nodeid_b2a(sub_tubid), + level=log.NOISY, parent=lp2, umid="xsWs1A") + # found a match. Does it need info? + if not info[0]: + self.log(format="replacing info", + level=log.NOISY, parent=lp2, umid="m5kxwA") + # yup + s[subscriber] = (subscriber_info, info[1]) + # and we don't remember or announce stub_clients beyond what we + # need to get the subscriber_info set up + + def _get_subscriber_info_from_ann_d(self, ann_d): # for_v1 + sinfo = { "version": ann_d["version"], + "nickname": ann_d["nickname"], + "app-versions": ann_d["app-versions"], + "my-version": ann_d["my-version"], + "oldest-supported": ann_d["oldest-supported"], + } + return sinfo + + def remote_subscribe(self, subscriber, service_name): # for_v1 + self.log("introducer: old (v1) subscription[%s] request at %s" + % (service_name, subscriber), umid="hJlGUg") + return self.add_subscriber(SubscriberAdapter_v1(subscriber), + service_name, None) + + def remote_subscribe_v2(self, subscriber, service_name, subscriber_info): + self.log("introducer: subscription[%s] request at %s" + % (service_name, subscriber), umid="U3uzLg") + return self.add_subscriber(subscriber, service_name, subscriber_info) + + def add_subscriber(self, subscriber, service_name, subscriber_info): 
self._debug_counts["inbound_subscribe"] += 1 if service_name not in self._subscribers: self._subscribers[service_name] = {} subscribers = self._subscribers[service_name] if subscriber in subscribers: self.log("but they're already subscribed, ignoring", - level=log.UNUSUAL) + level=log.UNUSUAL, umid="Sy9EfA") return - subscribers[subscriber] = time.time() + + if not subscriber_info: # for_v1 + # v1 clients don't provide subscriber_info, but they should + # publish a 'stub client' record which contains the same + # information. If we've already received this, it will be in + # self._stub_client_announcements + tubid_b32 = subscriber.getRemoteTubID() + tubid = base32.a2b(tubid_b32) + if tubid in self._stub_client_announcements: + subscriber_info = self._stub_client_announcements[tubid] + + subscribers[subscriber] = (subscriber_info, time.time()) def _remove(): self.log("introducer: unsubscribing[%s] %s" % (service_name, - subscriber)) + subscriber), + umid="vYGcJg") subscribers.pop(subscriber, None) subscriber.notifyOnDisconnect(_remove) - announcements = set( - [ ann - for (sn2,nodeid),(ann,when) in self._announcements.items() - if sn2 == service_name] ) - - self._debug_counts["outbound_message"] += 1 - self._debug_counts["outbound_announcements"] += len(announcements) - d = subscriber.callRemote("announce", announcements) - d.addErrback(rrefutil.trap_deadref) - d.addErrback(log.err, - format="subscriber errored during subscribe %(anns)s", - anns=announcements, facility="tahoe.introducer", - level=log.UNUSUAL, umid="mtZepQ") + # now tell them about any announcements they're interested in + announcements = set( [ ann_s + for idx,(ann_s,canary,ann_d,when) + in self._announcements.items() + if idx[0] == service_name] ) + if announcements: + self._debug_counts["outbound_message"] += 1 + self._debug_counts["outbound_announcements"] += len(announcements) + self._debug_outstanding += 1 + d = subscriber.callRemote("announce_v2", announcements) + 
d.addBoth(self._debug_retired) + d.addErrback(log.err, + format="subscriber errored during subscribe %(anns)s", + anns=announcements, facility="tahoe.introducer", + level=log.UNUSUAL, umid="mtZepQ") + return d diff --git a/src/allmydata/test/test_introducer.py b/src/allmydata/test/test_introducer.py index 9d0f50e..69d7f1d 100644 --- a/src/allmydata/test/test_introducer.py +++ b/src/allmydata/test/test_introducer.py @@ -9,11 +9,12 @@ from twisted.python import log from foolscap.api import Tub, Referenceable, fireEventually, flushEventualQueue from twisted.application import service from allmydata.interfaces import InsufficientVersionError -from allmydata.introducer.client import IntroducerClient +from allmydata.introducer.client import IntroducerClient, ClientAdapter_v1 from allmydata.introducer.server import IntroducerService +from allmydata.introducer import old # test compatibility with old introducer .tac files from allmydata.introducer import IntroducerNode -from allmydata.util import pollmixin +from allmydata.util import pollmixin, ecdsa import allmydata.test.common_util as testutil class LoggingMultiService(service.MultiService): @@ -47,14 +48,14 @@ class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin): def test_create(self): ic = IntroducerClient(None, "introducer.furl", u"my_nickname", - "my_version", "oldest_version") + "my_version", "oldest_version", {}) self.failUnless(isinstance(ic, IntroducerClient)) def test_listen(self): i = IntroducerService() i.setServiceParent(self.parent) - def test_duplicate(self): + def test_duplicate_publish(self): i = IntroducerService() self.failUnlessEqual(len(i.get_announcements()), 0) self.failUnlessEqual(len(i.get_subscribers()), 0) @@ -73,6 +74,152 @@ class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin): self.failUnlessEqual(len(i.get_announcements()), 2) self.failUnlessEqual(len(i.get_subscribers()), 0) + + +class Client(unittest.TestCase): + def test_duplicate_receive_v1(self): + ic 
= IntroducerClient(None, + "introducer.furl", u"my_nickname", + "my_version", "oldest_version", {}) + announcements = [] + ic.subscribe_to("storage", + lambda nodeid,ann_d: announcements.append(ann_d)) + furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra" + ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0") + ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0") + ca = ClientAdapter_v1(ic) + + ca.remote_announce([ann1]) + d = fireEventually() + def _then(ign): + self.failUnlessEqual(len(announcements), 1) + self.failUnlessEqual(announcements[0]["nickname"], u"nick1") + self.failUnlessEqual(announcements[0]["my-version"], "ver23") + self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 1) + self.failUnlessEqual(ic._debug_counts["new_announcement"], 1) + self.failUnlessEqual(ic._debug_counts["update"], 0) + self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 0) + # now send a duplicate announcement: this should not notify clients + ca.remote_announce([ann1]) + return fireEventually() + d.addCallback(_then) + def _then2(ign): + self.failUnlessEqual(len(announcements), 1) + self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 2) + self.failUnlessEqual(ic._debug_counts["new_announcement"], 1) + self.failUnlessEqual(ic._debug_counts["update"], 0) + self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 1) + # and a replacement announcement: same FURL, new other stuff. + # Clients should be notified. 
+ ca.remote_announce([ann1b]) + return fireEventually() + d.addCallback(_then2) + def _then3(ign): + self.failUnlessEqual(len(announcements), 2) + self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 3) + self.failUnlessEqual(ic._debug_counts["new_announcement"], 1) + self.failUnlessEqual(ic._debug_counts["update"], 1) + self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 1) + # test that the other stuff changed + self.failUnlessEqual(announcements[-1]["nickname"], u"nick1") + self.failUnlessEqual(announcements[-1]["my-version"], "ver24") + d.addCallback(_then3) + return d + + def test_duplicate_receive_v2(self): + ic1 = IntroducerClient(None, + "introducer.furl", u"my_nickname", + "ver23", "oldest_version", {}) + # we use a second client just to create a different-looking + # announcement + ic2 = IntroducerClient(None, + "introducer.furl", u"my_nickname", + "ver24","oldest_version",{}) + announcements = [] + def _received(nodeid, ann_d): + announcements.append( (nodeid, ann_d) ) + ic1.subscribe_to("storage", _received) + furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnp" + furl1a = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:7777/gydnp" + furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/ttwwoo" + + privkey = ecdsa.SigningKey.generate() + pubkey = privkey.get_verifying_key() + pubkey_hex = pubkey.to_string().encode("hex") + + # ann1: ic1, furl1 + # ann1a: ic1, furl1a (same SturdyRef, different connection hints) + # ann1b: ic2, furl1 + # ann2: ic2, furl2 + + self.ann1 = ic1.create_announcement(furl1, "storage", "RIStorage", + privkey) + self.ann1a = ic1.create_announcement(furl1a, "storage", "RIStorage", + privkey) + self.ann1b = ic2.create_announcement(furl1, "storage", "RIStorage", + privkey) + self.ann2 = ic2.create_announcement(furl2, "storage", "RIStorage", + privkey) + + ic1.remote_announce_v2([self.ann1]) # queues eventual-send + d = fireEventually() + def _then1(ign): + 
self.failUnlessEqual(len(announcements), 1) + nodeid,ann_d = announcements[0] + self.failUnlessEqual(nodeid.encode("hex"), pubkey_hex) + self.failUnlessEqual(ann_d["FURL"], furl1) + self.failUnlessEqual(ann_d["my-version"], "ver23") + d.addCallback(_then1) + + # now send a duplicate announcement. This should not fire the + # subscriber + d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1])) + d.addCallback(fireEventually) + def _then2(ign): + self.failUnlessEqual(len(announcements), 1) + d.addCallback(_then2) + + # and a replacement announcement: same FURL, new other stuff. The + # subscriber *should* be fired. + d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1b])) + d.addCallback(fireEventually) + def _then3(ign): + self.failUnlessEqual(len(announcements), 2) + nodeid,ann_d = announcements[-1] + self.failUnlessEqual(nodeid.encode("hex"), pubkey_hex) + self.failUnlessEqual(ann_d["FURL"], furl1) + self.failUnlessEqual(ann_d["my-version"], "ver24") + d.addCallback(_then3) + + # and a replacement announcement with a different FURL (it uses + # different connection hints) + d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1a])) + d.addCallback(fireEventually) + def _then4(ign): + self.failUnlessEqual(len(announcements), 3) + nodeid,ann_d = announcements[-1] + self.failUnlessEqual(nodeid.encode("hex"), pubkey_hex) + self.failUnlessEqual(ann_d["FURL"], furl1a) + self.failUnlessEqual(ann_d["my-version"], "ver23") + d.addCallback(_then4) + + # now add a new subscription, which should be called with the + # backlog. The introducer only records one announcement per index, so + # the backlog will only have the latest message. 
+ announcements2 = [] + def _received2(nodeid, ann_d): + announcements2.append( (nodeid, ann_d) ) + d.addCallback(lambda ign: ic1.subscribe_to("storage", _received2)) + d.addCallback(fireEventually) + def _then5(ign): + self.failUnlessEqual(len(announcements2), 1) + nodeid,ann_d = announcements2[-1] + self.failUnlessEqual(nodeid.encode("hex"), pubkey_hex) + self.failUnlessEqual(ann_d["FURL"], furl1a) + self.failUnlessEqual(ann_d["my-version"], "ver23") + d.addCallback(_then5) + return d + class SystemTestMixin(ServiceMixin, pollmixin.PollMixin): def create_tub(self, portnum=0): @@ -88,36 +235,37 @@ class SystemTestMixin(ServiceMixin, pollmixin.PollMixin): assert self.central_portnum == portnum tub.setLocation("localhost:%d" % self.central_portnum) +V1 = "v1"; V2 = "v2" class SystemTest(SystemTestMixin, unittest.TestCase): - def test_system(self): - self.basedir = "introducer/SystemTest/system" - os.makedirs(self.basedir) - return self.do_system_test(IntroducerService) - test_system.timeout = 480 # occasionally takes longer than 350s on "draco" - - def do_system_test(self, create_introducer): + def do_system_test(self, server_version): self.create_tub() - introducer = create_introducer() + if server_version == V1: + introducer = old.IntroducerService_v1() + else: + introducer = IntroducerService() introducer.setServiceParent(self.parent) iff = os.path.join(self.basedir, "introducer.furl") tub = self.central_tub ifurl = self.central_tub.registerReference(introducer, furlFile=iff) self.introducer_furl = ifurl - NUMCLIENTS = 5 - # we have 5 clients who publish themselves, and an extra one does - # which not. When the connections are fully established, all six nodes + # we have 5 clients who publish themselves as storage servers, and a + # sixth which does which not. All 6 clients subscriber to hear about + # storage. When the connections are fully established, all six nodes # should have 5 connections each. 
+ NUM_STORAGE = 5 + NUM_CLIENTS = 6 clients = [] tubs = {} received_announcements = {} - NUM_SERVERS = NUMCLIENTS subscribing_clients = [] publishing_clients = [] + privkeys = {} + expected_announcements = [0 for c in range(NUM_CLIENTS)] - for i in range(NUMCLIENTS+1): + for i in range(NUM_CLIENTS): tub = Tub() #tub.setOption("logLocalFailures", True) #tub.setOption("logRemoteFailures", True) @@ -128,62 +276,163 @@ class SystemTest(SystemTestMixin, unittest.TestCase): tub.setLocation("localhost:%d" % portnum) log.msg("creating client %d: %s" % (i, tub.getShortTubID())) - c = IntroducerClient(tub, self.introducer_furl, u"nickname-%d" % i, - "version", "oldest") + if i == 0: + c = old.IntroducerClient_v1(tub, self.introducer_furl, + u"nickname-%d" % i, + "version", "oldest") + else: + c = IntroducerClient(tub, self.introducer_furl, + u"nickname-%d" % i, + "version", "oldest", + {"component": "component-v1"}) received_announcements[c] = {} def got(serverid, ann_d, announcements): announcements[serverid] = ann_d c.subscribe_to("storage", got, received_announcements[c]) subscribing_clients.append(c) - - if i < NUMCLIENTS: - node_furl = tub.registerReference(Referenceable()) - c.publish(node_furl, "storage", "ri_name") + expected_announcements[i] += 1 # all expect a 'storage' announcement + + node_furl = tub.registerReference(Referenceable()) + if i < NUM_STORAGE: + if i == 1: + # sign the announcement + privkey = privkeys[c] = ecdsa.SigningKey.generate() + c.publish(node_furl, "storage", "ri_name", privkey) + else: + c.publish(node_furl, "storage", "ri_name") publishing_clients.append(c) - # the last one does not publish anything + else: + # the last one does not publish anything + pass + + if i == 0: + # users of the V1 client were required to publish a + # 'stub_client' record (somewhat after they published the + # 'storage' record), so the introducer could see their + # version. Match that behavior. 
+ c.publish(node_furl, "stub_client", "stub_ri_name") + + if i == 2: + # also publish something that nobody cares about + boring_furl = tub.registerReference(Referenceable()) + c.publish(boring_furl, "boring", "ri_name") c.setServiceParent(self.parent) clients.append(c) tubs[c] = tub - def _wait_for_all_connections(): - for c in subscribing_clients: - if len(received_announcements[c]) < NUM_SERVERS: + + def _wait_for_connected(ign): + def _connected(): + for c in clients: + if not c.connected_to_introducer(): + return False + return True + return self.poll(_connected) + + # we watch the clients to determine when the system has settled down. + # Then we can look inside the server to assert things about its + # state. + + def _wait_for_expected_announcements(ign): + def _got_expected_announcements(): + for i,c in enumerate(subscribing_clients): + if len(received_announcements[c]) < expected_announcements[i]: + return False + return True + return self.poll(_got_expected_announcements) + + # before shutting down any Tub, we'd like to know that there are no + # messages outstanding + + def _wait_until_idle(ign): + def _idle(): + for c in subscribing_clients + publishing_clients: + if c._debug_outstanding: + return False + if introducer._debug_outstanding: return False - return True - d = self.poll(_wait_for_all_connections) + return True + return self.poll(_idle) + + d = defer.succeed(None) + d.addCallback(_wait_for_connected) + d.addCallback(_wait_for_expected_announcements) + d.addCallback(_wait_until_idle) def _check1(res): log.msg("doing _check1") dc = introducer._debug_counts - self.failUnlessEqual(dc["inbound_message"], NUM_SERVERS) - self.failUnlessEqual(dc["inbound_duplicate"], 0) + if server_version == V1: + # each storage server publishes a record, and (after its + # 'subscribe' has been ACKed) also publishes a "stub_client". + # The non-storage client (which subscribes) also publishes a + # stub_client. There is also one "boring" service. 
The number + # of messages is higher, because the stub_clients aren't + # published until after we get the 'subscribe' ack (since we + # don't realize that we're dealing with a v1 server [which + # needs stub_clients] until then), and the act of publishing + # the stub_client causes us to re-send all previous + # announcements. + self.failUnlessEqual(dc["inbound_message"] - dc["inbound_duplicate"], + NUM_STORAGE + NUM_CLIENTS + 1) + else: + # each storage server publishes a record. There is also one + # "stub_client" and one "boring" + self.failUnlessEqual(dc["inbound_message"], NUM_STORAGE+2) + self.failUnlessEqual(dc["inbound_duplicate"], 0) self.failUnlessEqual(dc["inbound_update"], 0) - self.failUnless(dc["outbound_message"]) + self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS) + # the number of outbound messages is tricky.. I think it depends + # upon a race between the publish and the subscribe messages. + self.failUnless(dc["outbound_message"] > 0) + # each client subscribes to "storage", and each server publishes + self.failUnlessEqual(dc["outbound_announcements"], + NUM_STORAGE*NUM_CLIENTS) - for c in clients: - self.failUnless(c.connected_to_introducer()) for c in subscribing_clients: cdc = c._debug_counts self.failUnless(cdc["inbound_message"]) self.failUnlessEqual(cdc["inbound_announcement"], - NUM_SERVERS) + NUM_STORAGE) self.failUnlessEqual(cdc["wrong_service"], 0) self.failUnlessEqual(cdc["duplicate_announcement"], 0) self.failUnlessEqual(cdc["update"], 0) self.failUnlessEqual(cdc["new_announcement"], - NUM_SERVERS) + NUM_STORAGE) anns = received_announcements[c] - self.failUnlessEqual(len(anns), NUM_SERVERS) + self.failUnlessEqual(len(anns), NUM_STORAGE) nodeid0 = b32decode(tubs[clients[0]].tubID.upper()) ann_d = anns[nodeid0] nick = ann_d["nickname"] self.failUnlessEqual(type(nick), unicode) self.failUnlessEqual(nick, u"nickname-0") - for c in publishing_clients: - cdc = c._debug_counts - self.failUnlessEqual(cdc["outbound_message"], 1) + 
if server_version == V1: + for c in publishing_clients: + cdc = c._debug_counts + expected = 1 # storage + if c is clients[2]: + expected += 1 # boring + if c is not clients[0]: + # the v2 client tries to call publish_v2, which fails + # because the server is v1. It then re-sends + # everything it has so far, plus a stub_client record + expected = 2*expected + 1 + if c is clients[0]: + # we always tell v1 client to send stub_client + expected += 1 + self.failUnlessEqual(cdc["outbound_message"], expected) + else: + for c in publishing_clients: + cdc = c._debug_counts + expected = 1 + if c in [clients[0], # stub_client + clients[2], # boring + ]: + expected = 2 + self.failUnlessEqual(cdc["outbound_message"], expected) + log.msg("_check1 done") d.addCallback(_check1) # force an introducer reconnect, by shutting down the Tub it's using @@ -196,67 +445,52 @@ class SystemTest(SystemTestMixin, unittest.TestCase): d.addCallback(lambda _ign: log.msg("shutting down introducer's Tub")) d.addCallback(lambda _ign: self.central_tub.disownServiceParent()) - def _wait_for_introducer_loss(): - for c in clients: - if c.connected_to_introducer(): - return False - return True - d.addCallback(lambda res: self.poll(_wait_for_introducer_loss)) + def _wait_for_introducer_loss(ign): + def _introducer_lost(): + for c in clients: + if c.connected_to_introducer(): + return False + return True + return self.poll(_introducer_lost) + d.addCallback(_wait_for_introducer_loss) def _restart_introducer_tub(_ign): log.msg("restarting introducer's Tub") - - dc = introducer._debug_counts - self.expected_count = dc["inbound_message"] + NUM_SERVERS - self.expected_subscribe_count = dc["inbound_subscribe"] + NUMCLIENTS+1 - introducer._debug0 = dc["outbound_message"] - for c in subscribing_clients: - cdc = c._debug_counts - c._debug0 = cdc["inbound_message"] - + # reset counters + for i in range(NUM_CLIENTS): + c = subscribing_clients[i] + for k in c._debug_counts: + c._debug_counts[k] = 0 + for k in 
introducer._debug_counts: + introducer._debug_counts[k] = 0 + expected_announcements[i] += 1 # new 'storage' for everyone self.create_tub(self.central_portnum) newfurl = self.central_tub.registerReference(introducer, furlFile=iff) assert newfurl == self.introducer_furl d.addCallback(_restart_introducer_tub) - def _wait_for_introducer_reconnect(): - # wait until: - # all clients are connected - # the introducer has received publish messages from all of them - # the introducer has received subscribe messages from all of them - # the introducer has sent (duplicate) announcements to all of them - # all clients have received (duplicate) announcements - dc = introducer._debug_counts - for c in clients: - if not c.connected_to_introducer(): - return False - if dc["inbound_message"] < self.expected_count: - return False - if dc["inbound_subscribe"] < self.expected_subscribe_count: - return False - for c in subscribing_clients: - cdc = c._debug_counts - if cdc["inbound_message"] < c._debug0+1: - return False - return True - d.addCallback(lambda res: self.poll(_wait_for_introducer_reconnect)) + d.addCallback(_wait_for_connected) + d.addCallback(_wait_for_expected_announcements) + d.addCallback(_wait_until_idle) + d.addCallback(lambda _ign: log.msg(" reconnected")) def _check2(res): log.msg("doing _check2") # assert that the introducer sent out new messages, one per # subscriber dc = introducer._debug_counts - self.failUnlessEqual(dc["inbound_message"], 2*NUM_SERVERS) - self.failUnlessEqual(dc["inbound_duplicate"], NUM_SERVERS) - self.failUnlessEqual(dc["inbound_update"], 0) - self.failUnlessEqual(dc["outbound_message"], - introducer._debug0 + len(subscribing_clients)) - for c in clients: - self.failUnless(c.connected_to_introducer()) + self.failUnlessEqual(dc["outbound_announcements"], + NUM_STORAGE*NUM_CLIENTS) + self.failUnless(dc["outbound_message"] > 0) + self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS) for c in subscribing_clients: cdc = c._debug_counts - 
self.failUnlessEqual(cdc["duplicate_announcement"], NUM_SERVERS) + self.failUnlessEqual(cdc["inbound_message"], 1) + self.failUnlessEqual(cdc["inbound_announcement"], NUM_STORAGE) + self.failUnlessEqual(cdc["new_announcement"], 0) + self.failUnlessEqual(cdc["wrong_service"], 0) + self.failUnlessEqual(cdc["duplicate_announcement"], NUM_STORAGE) d.addCallback(_check2) # Then force an introducer restart, by shutting down the Tub, @@ -267,71 +501,216 @@ class SystemTest(SystemTestMixin, unittest.TestCase): d.addCallback(lambda _ign: log.msg("shutting down introducer")) d.addCallback(lambda _ign: self.central_tub.disownServiceParent()) - d.addCallback(lambda res: self.poll(_wait_for_introducer_loss)) + d.addCallback(_wait_for_introducer_loss) + d.addCallback(lambda _ign: log.msg("introducer lost")) def _restart_introducer(_ign): log.msg("restarting introducer") self.create_tub(self.central_portnum) - - for c in subscribing_clients: - # record some counters for later comparison. Stash the values - # on the client itself, because I'm lazy. 
- cdc = c._debug_counts - c._debug1 = cdc["inbound_announcement"] - c._debug2 = cdc["inbound_message"] - c._debug3 = cdc["new_announcement"] - newintroducer = create_introducer() - self.expected_message_count = NUM_SERVERS - self.expected_announcement_count = NUM_SERVERS*len(subscribing_clients) - self.expected_subscribe_count = len(subscribing_clients) - newfurl = self.central_tub.registerReference(newintroducer, + # reset counters + for i in range(NUM_CLIENTS): + c = subscribing_clients[i] + for k in c._debug_counts: + c._debug_counts[k] = 0 + expected_announcements[i] += 1 # new 'storage' for everyone + if server_version == V1: + introducer = old.IntroducerService_v1() + else: + introducer = IntroducerService() + newfurl = self.central_tub.registerReference(introducer, furlFile=iff) assert newfurl == self.introducer_furl d.addCallback(_restart_introducer) - def _wait_for_introducer_reconnect2(): - # wait until: - # all clients are connected - # the introducer has received publish messages from all of them - # the introducer has received subscribe messages from all of them - # the introducer has sent announcements for everybody to everybody - # all clients have received all the (duplicate) announcements - # at that point, the system should be quiescent - dc = introducer._debug_counts - for c in clients: - if not c.connected_to_introducer(): - return False - if dc["inbound_message"] < self.expected_message_count: - return False - if dc["outbound_announcements"] < self.expected_announcement_count: - return False - if dc["inbound_subscribe"] < self.expected_subscribe_count: - return False - for c in subscribing_clients: - cdc = c._debug_counts - if cdc["inbound_announcement"] < c._debug1+NUM_SERVERS: - return False - return True - d.addCallback(lambda res: self.poll(_wait_for_introducer_reconnect2)) + + d.addCallback(_wait_for_connected) + d.addCallback(_wait_for_expected_announcements) + d.addCallback(_wait_until_idle) def _check3(res): log.msg("doing _check3") - 
for c in clients: - self.failUnless(c.connected_to_introducer()) + dc = introducer._debug_counts + self.failUnlessEqual(dc["outbound_announcements"], + NUM_STORAGE*NUM_CLIENTS) + self.failUnless(dc["outbound_message"] > 0) + self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS) for c in subscribing_clients: cdc = c._debug_counts - self.failUnless(cdc["inbound_announcement"] > c._debug1) - self.failUnless(cdc["inbound_message"] > c._debug2) - # there should have been no new announcements - self.failUnlessEqual(cdc["new_announcement"], c._debug3) - # and the right number of duplicate ones. There were - # NUM_SERVERS from the servertub restart, and there should be - # another NUM_SERVERS now - self.failUnlessEqual(cdc["duplicate_announcement"], - 2*NUM_SERVERS) + self.failUnless(cdc["inbound_message"] > 0) + self.failUnlessEqual(cdc["inbound_announcement"], NUM_STORAGE) + self.failUnlessEqual(cdc["new_announcement"], 0) + self.failUnlessEqual(cdc["wrong_service"], 0) + self.failUnlessEqual(cdc["duplicate_announcement"], NUM_STORAGE) d.addCallback(_check3) return d + + def test_system_v2_server(self): + self.basedir = "introducer/SystemTest/system_v2_server" + os.makedirs(self.basedir) + return self.do_system_test(V2) + test_system_v2_server.timeout = 480 + # occasionally takes longer than 350s on "draco" + + def test_system_v1_server(self): + self.basedir = "introducer/SystemTest/system_v1_server" + os.makedirs(self.basedir) + return self.do_system_test(V1) + test_system_v1_server.timeout = 480 + # occasionally takes longer than 350s on "draco" + +from allmydata.util import base32 +class FakeRemoteReference: + def notifyOnDisconnect(self, *args, **kwargs): pass + def getRemoteTubID(self): return "62ubehyunnyhzs7r6vdonnm2hpi52w6y" + +class ClientInfo(unittest.TestCase): + def test_client_v2(self): + introducer = IntroducerService() + tub = introducer_furl = None + app_versions = {"whizzy": "fizzy"} + client_v2 = IntroducerClient(tub, introducer_furl, u"nick-v2", + 
"my_version", "oldest", app_versions) + #furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum" + #ann_s = client_v2.create_announcement(furl1, "storage", "RIStorage") + #introducer.remote_publish_v2(ann_s, Referenceable()) + subscriber = FakeRemoteReference() + introducer.remote_subscribe_v2(subscriber, "storage", + client_v2._my_subscriber_info) + s = introducer.get_subscribers() + self.failUnlessEqual(len(s), 1) + sn, when, si, rref = s[0] + self.failUnlessIdentical(rref, subscriber) + self.failUnlessEqual(sn, "storage") + self.failUnlessEqual(si["version"], 0) + self.failUnlessEqual(si["oldest-supported"], "oldest") + self.failUnlessEqual(si["app-versions"], app_versions) + self.failUnlessEqual(si["nickname"], u"nick-v2") + self.failUnlessEqual(si["my-version"], "my_version") + + def test_client_v1(self): + introducer = IntroducerService() + tub = introducer_furl = None + client_v1 = old.IntroducerClient_v1(tub, introducer_furl, u"nick-v1", + "my_version", "oldest") + subscriber = FakeRemoteReference() + introducer.remote_subscribe(subscriber, "storage") + # the v1 subscribe interface had no subscriber_info: that was usually + # sent in a separate stub_client pseudo-announcement + s = introducer.get_subscribers() + self.failUnlessEqual(len(s), 1) + sn, when, si, rref = s[0] + # rref will be a SubscriberAdapter_v1 around the real subscriber + self.failUnlessIdentical(rref.original, subscriber) + self.failUnlessEqual(si, None) # not known yet + self.failUnlessEqual(sn, "storage") + + # now submit the stub_client announcement + furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum" + ann = (furl1, "stub_client", "RIStubClient", + u"nick-v1".encode("utf-8"), "my_version", "oldest") + introducer.remote_publish(ann) + # the server should correlate the two + s = introducer.get_subscribers() + self.failUnlessEqual(len(s), 1) + sn, when, si, rref = s[0] + self.failUnlessIdentical(rref.original, subscriber) + self.failUnlessEqual(sn, 
"storage") + + self.failUnlessEqual(si["version"], 0) + self.failUnlessEqual(si["oldest-supported"], "oldest") + # v1 announcements do not contain app-versions + self.failUnlessEqual(si["app-versions"], {}) + self.failUnlessEqual(si["nickname"], u"nick-v1") + self.failUnlessEqual(si["my-version"], "my_version") + + # a subscription that arrives after the stub_client announcement + # should be correlated too + subscriber2 = FakeRemoteReference() + introducer.remote_subscribe(subscriber2, "thing2") + + s = introducer.get_subscribers() + subs = dict([(sn, (si,rref)) for sn, when, si, rref in s]) + self.failUnlessEqual(len(subs), 2) + (si,rref) = subs["thing2"] + self.failUnlessIdentical(rref.original, subscriber2) + self.failUnlessEqual(si["version"], 0) + self.failUnlessEqual(si["oldest-supported"], "oldest") + # v1 announcements do not contain app-versions + self.failUnlessEqual(si["app-versions"], {}) + self.failUnlessEqual(si["nickname"], u"nick-v1") + self.failUnlessEqual(si["my-version"], "my_version") + +class Announcements(unittest.TestCase): + def test_client_v2_unsigned(self): + introducer = IntroducerService() + tub = introducer_furl = None + app_versions = {"whizzy": "fizzy"} + client_v2 = IntroducerClient(tub, introducer_furl, u"nick-v2", + "my_version", "oldest", app_versions) + furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum" + serverid = base32.a2b("62ubehyunnyhzs7r6vdonnm2hpi52w6y") + ann_s0 = client_v2.create_announcement(furl1, "storage", "RIStorage") + canary0 = Referenceable() + introducer.remote_publish_v2(ann_s0, canary0) + a = introducer.get_announcements() + self.failUnlessEqual(len(a), 1) + (index, (ann_s, canary, ann_d, when)) = a.items()[0] + self.failUnlessIdentical(canary, canary0) + self.failUnlessEqual(index, ("storage", serverid)) + self.failUnlessEqual(ann_d["app-versions"], app_versions) + self.failUnlessEqual(ann_d["nickname"], u"nick-v2") + self.failUnlessEqual(ann_d["service-name"], "storage") + 
self.failUnlessEqual(ann_d["my-version"], "my_version") + self.failUnlessEqual(ann_d["FURL"], furl1) + + def test_client_v2_signed(self): + introducer = IntroducerService() + tub = introducer_furl = None + app_versions = {"whizzy": "fizzy"} + client_v2 = IntroducerClient(tub, introducer_furl, u"nick-v2", + "my_version", "oldest", app_versions) + furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum" + serverid = base32.a2b("62ubehyunnyhzs7r6vdonnm2hpi52w6y") + sk = ecdsa.SigningKey.generate() + pk = sk.get_verifying_key() + pks = pk.to_string() + ann_s0 = client_v2.create_announcement(furl1, "storage", "RIStorage", sk) + canary0 = Referenceable() + introducer.remote_publish_v2(ann_s0, canary0) + a = introducer.get_announcements() + self.failUnlessEqual(len(a), 1) + (index, (ann_s, canary, ann_d, when)) = a.items()[0] + self.failUnlessIdentical(canary, canary0) + self.failUnlessEqual(index, ("storage", pks)) # index is pubkey string + self.failUnlessEqual(ann_d["app-versions"], app_versions) + self.failUnlessEqual(ann_d["nickname"], u"nick-v2") + self.failUnlessEqual(ann_d["service-name"], "storage") + self.failUnlessEqual(ann_d["my-version"], "my_version") + self.failUnlessEqual(ann_d["FURL"], furl1) + + def test_client_v1(self): + introducer = IntroducerService() + tub = introducer_furl = None + + furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum" + serverid = base32.a2b("62ubehyunnyhzs7r6vdonnm2hpi52w6y") + ann = (furl1, "storage", "RIStorage", + u"nick-v1".encode("utf-8"), "my_version", "oldest") + introducer.remote_publish(ann) + + a = introducer.get_announcements() + self.failUnlessEqual(len(a), 1) + (index, (ann_s, canary, ann_d, when)) = a.items()[0] + self.failUnlessEqual(canary, None) + self.failUnlessEqual(index, ("storage", serverid)) + self.failUnlessEqual(ann_d["app-versions"], {}) + self.failUnlessEqual(ann_d["nickname"], u"nick-v1".encode("utf-8")) + self.failUnlessEqual(ann_d["service-name"], "storage") + 
self.failUnlessEqual(ann_d["my-version"], "my_version") + self.failUnlessEqual(ann_d["FURL"], furl1) + + class TooNewServer(IntroducerService): VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v999": { }, @@ -359,7 +738,7 @@ class NonV1Server(SystemTestMixin, unittest.TestCase): tub.setLocation("localhost:%d" % portnum) c = IntroducerClient(tub, self.introducer_furl, - u"nickname-client", "version", "oldest") + u"nickname-client", "version", "oldest", {}) announcements = {} def got(serverid, ann_d): announcements[serverid] = ann_d @@ -388,3 +767,13 @@ class DecodeFurl(unittest.TestCase): nodeid = b32decode(m.group(1).upper()) self.failUnlessEqual(nodeid, "\x9fM\xf2\x19\xcckU0\xbf\x03\r\x10\x99\xfb&\x9b-\xc7A\x1d") + +# add tests of StorageFarmBroker: if it receives duplicate announcements, it +# should leave the Reconnector in place, also if it receives +# same-FURL-different-misc, but if it receives same-nodeid-different-FURL, it +# should tear down the Reconnector and make a new one. 
This behavior used to +# live in the IntroducerClient, and thus used to be tested by test_introducer + +# copying more tests from old branch: + +# then also add Upgrade test diff --git a/src/allmydata/test/test_system.py b/src/allmydata/test/test_system.py index bf6af09..b7b3c22 100644 --- a/src/allmydata/test/test_system.py +++ b/src/allmydata/test/test_system.py @@ -778,7 +778,7 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase): newappverstr = "%s: %s" % (allmydata.__appname__, altverstr) self.failUnless((appverstr in res) or (newappverstr in res), (appverstr, newappverstr, res)) - self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res) + self.failUnless("Announcement Summary: storage: 5" in res) self.failUnless("Subscription Summary: storage: 5" in res) except unittest.FailTest: print @@ -795,9 +795,9 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase): self.failUnlessEqual(data["subscription_summary"], {"storage": 5}) self.failUnlessEqual(data["announcement_summary"], - {"storage": 5, "stub_client": 5}) + {"storage": 5}) self.failUnlessEqual(data["announcement_distinct_hosts"], - {"storage": 1, "stub_client": 1}) + {"storage": 1}) except unittest.FailTest: print print "GET %s?t=json output was:" % self.introweb_url diff --git a/src/allmydata/web/introweb.py b/src/allmydata/web/introweb.py index 28273bd..be950c4 100644 --- a/src/allmydata/web/introweb.py +++ b/src/allmydata/web/introweb.py @@ -29,15 +29,20 @@ class IntroducerRoot(rend.Page): def render_JSON(self, ctx): res = {} - clients = self.introducer_service.get_subscribers() - subscription_summary = dict([ (name, len(clients[name])) - for name in clients ]) - res["subscription_summary"] = subscription_summary + + counts = {} + subscribers = self.introducer_service.get_subscribers() + for (service_name, ign, ign, ign) in subscribers: + if service_name not in counts: + counts[service_name] = 0 + counts[service_name] += 1 + 
res["subscription_summary"] = counts announcement_summary = {} service_hosts = {} - for (ann,when) in self.introducer_service.get_announcements().values(): - (furl, service_name, ri_name, nickname, ver, oldest) = ann + for a in self.introducer_service.get_announcements().values(): + (_, _, ann_d, when) = a + service_name = ann_d["service-name"] if service_name not in announcement_summary: announcement_summary[service_name] = 0 announcement_summary[service_name] += 1 @@ -50,6 +55,7 @@ class IntroducerRoot(rend.Page): # enough: when multiple services are run on a single host, # they're usually either configured with the same addresses, # or setLocationAutomatically picks up the same interfaces. + furl = ann_d["FURL"] locations = SturdyRef(furl).getTubRef().getLocations() # list of tuples, ("ipv4", host, port) host = frozenset([hint[1] @@ -74,8 +80,9 @@ class IntroducerRoot(rend.Page): def render_announcement_summary(self, ctx, data): services = {} - for (ann,when) in self.introducer_service.get_announcements().values(): - (furl, service_name, ri_name, nickname, ver, oldest) = ann + for a in self.introducer_service.get_announcements().values(): + (_, _, ann_d, when) = a + service_name = ann_d["service-name"] if service_name not in services: services[service_name] = 0 services[service_name] += 1 @@ -85,64 +92,51 @@ class IntroducerRoot(rend.Page): for service_name in service_names]) def render_client_summary(self, ctx, data): + counts = {} clients = self.introducer_service.get_subscribers() - service_names = clients.keys() - service_names.sort() - return ", ".join(["%s: %d" % (service_name, len(clients[service_name])) - for service_name in service_names]) + for (service_name, ign, ign, ign) in clients: + if service_name not in counts: + counts[service_name] = 0 + counts[service_name] += 1 + return ", ".join([ "%s: %d" % (name, counts[name]) + for name in sorted(counts.keys()) ] ) def data_services(self, ctx, data): introsvc = self.introducer_service - ann = [(since,a) 
- for (a,since) in introsvc.get_announcements().values() - if a[1] != "stub_client"] - ann.sort(lambda a,b: cmp( (a[1][1], a), (b[1][1], b) ) ) + ann = [] + for a in introsvc.get_announcements().values(): + (_, _, ann_d, when) = a + if ann_d["service-name"] == "stub_client": + continue + ann.append( (when, ann_d) ) + ann.sort(key=lambda x: (x[1]["service-name"], x[1]["nickname"])) + # this used to be: + #ann.sort(lambda a,b: cmp( (a[1][1], a), (b[1][1], b) ) ) + # service_name was the primary key, then the whole tuple (starting + # with the furl) was the secondary key return ann - def render_service_row(self, ctx, (since,announcement)): - (furl, service_name, ri_name, nickname, ver, oldest) = announcement - sr = SturdyRef(furl) + def render_service_row(self, ctx, (since,ann_d)): + sr = SturdyRef(ann_d["FURL"]) nodeid = sr.tubID advertised = self.show_location_hints(sr) - ctx.fillSlots("peerid", "%s %s" % (nodeid, nickname)) + ctx.fillSlots("peerid", "%s %s" % (nodeid, ann_d["nickname"])) ctx.fillSlots("advertised", " ".join(advertised)) ctx.fillSlots("connected", "?") TIME_FORMAT = "%H:%M:%S %d-%b-%Y" ctx.fillSlots("announced", time.strftime(TIME_FORMAT, time.localtime(since))) - ctx.fillSlots("version", ver) - ctx.fillSlots("service_name", service_name) + ctx.fillSlots("version", ann_d["my-version"]) + ctx.fillSlots("service_name", ann_d["service-name"]) return ctx.tag def data_subscribers(self, ctx, data): - # use the "stub_client" announcements to get information per nodeid - clients = {} - for (ann,when) in self.introducer_service.get_announcements().values(): - if ann[1] != "stub_client": - continue - (furl, service_name, ri_name, nickname, ver, oldest) = ann - sr = SturdyRef(furl) - nodeid = sr.tubID - clients[nodeid] = ann - - # then we actually provide information per subscriber - s = [] - introsvc = self.introducer_service - for service_name, subscribers in introsvc.get_subscribers().items(): - for (rref, timestamp) in subscribers.items(): - sr = 
rref.getSturdyRef() - nodeid = sr.tubID - ann = clients.get(nodeid) - s.append( (service_name, rref, timestamp, ann) ) - s.sort() - return s + return self.introducer_service.get_subscribers() def render_subscriber_row(self, ctx, s): - (service_name, rref, since, ann) = s - nickname = "?" - version = "?" - if ann: - (furl, service_name_2, ri_name, nickname, version, oldest) = ann + (service_name, since, info, rref) = s + nickname = info.get("nickname", "?") + version = info.get("my-version", "?") sr = rref.getSturdyRef() # if the subscriber didn't do Tub.setLocation, nodeid will be None