Package conduit :: Package modules :: Package NetworkModule :: Module Client
[hide private]

Source Code for Module conduit.modules.NetworkModule.Client

  1  """ 
  2  Contains classes for transmitting and receiving python objects over the network. 
  3   
  4  Copyright: John Stowers, 2006 
  5  License: GPLv2 
  6  """ 
  7  import xmlrpclib 
  8  import threading 
  9  import time 
 10  import gobject 
 11  import logging 
 12  import socket 
 13  log = logging.getLogger("modules.Network") 
 14   
 15  import Peers 
 16  import XMLRPCUtils 
 17   
 18  import conduit.dataproviders.DataProvider as DataProvider 
 19  import conduit.dataproviders.DataProviderCategory as DataProviderCategory 
 20   
21 -class NetworkClientFactory(DataProvider.DataProviderFactory):
22 """ 23 Responsible for making networked Conduit resources available to the user. This includes: 24 1) Monitoring Avahi events to detect other Conduit instances on the network 25 2) Discovering remote conduit capabilities (i.e. what dataproviders it has advertised) 26 3) Data transmission to/from remote conduit instances 27 """
28 - def __init__(self, **kwargs):
29 DataProvider.DataProviderFactory.__init__(self) 30 31 self.categories = {} 32 self.dataproviders = {} 33 self.peers = {} 34 try: 35 self.monitor = Peers.AvahiMonitor(self.host_available, self.host_removed) 36 except: 37 log.warn("Error starting client")
38
39 - def quit(self):
40 for p in self.peers.values(): 41 p.stop()
42
43 - def host_available(self, name, host, address, port, extra_info):
44 """ 45 Callback which is triggered when a dataprovider is advertised on 46 a remote conduit instance 47 """ 48 url = "http://%s" % host 49 log.debug("Remote host '%s' detected" % url) 50 51 if not self.peers.has_key(url): 52 #Create a category group for this host 53 self.categories[url] = DataProviderCategory.DataProviderCategory("On %s" % host, "computer", host) 54 # Create a dataproviders list for this host 55 self.dataproviders[url] = {} 56 # Request all dp's for this host. Because there is no 57 # avahi signal when the text entry in a avahi publish group 58 # is changed, we must poll detected peers.... 59 request = _PeerLister(url, port) 60 request.connect("complete", self.dataprovider_process) 61 request.start() 62 self.peers[url] = request
63
64 - def host_removed(self, url):
65 """ 66 Callback which is triggered when a host is no longer available 67 """ 68 log.debug("Remote host '%s' removed" % url) 69 if self.peers.has_key(url): 70 self.categories.remove(url) 71 for uid, dp in self.dataproviders[url].items(): 72 self.dataprovider_removed(dp) 73 self.dataproviders.remove(url) 74 self.peers.remove(url)
75
76 - def dataprovider_process(self, peerLister):
77 """ 78 """ 79 hostUrl = peerLister.url 80 currentSharedDps = self.dataproviders[hostUrl] 81 #A remote dps uid is the url + the original dp uid 82 remoteSharedDps = {} 83 for dpInfo in peerLister.data_out: 84 remoteUid = "%s-%s" % (hostUrl,dpInfo['uid']) 85 remoteSharedDps[remoteUid] = dpInfo 86 87 #log.debug("Processing Remote Dataprovider: URL:%s\tCurrent dps:%s\tRemote dps:%s" % (hostUrl,currentSharedDps,remoteSharedDps.keys())) 88 89 # loop through all dp's 90 for remoteUid,info in remoteSharedDps.items(): 91 if remoteUid not in currentSharedDps: 92 self.dataprovider_added(hostUrl, remoteUid, info) 93 94 for remoteUid in currentSharedDps.keys(): 95 if remoteUid not in remoteSharedDps: 96 self.dataprovider_removed(hostUrl, remoteUid)
97
98 - def dataprovider_create(self, hostUrl, uid, info):
99 # Each dataprovider is on its own port 100 dpUrl = "%s:%s/" % (hostUrl, info['dp_server_port']) 101 102 params = {} 103 for key, val in info.iteritems(): 104 params['_' + key + '_'] = val 105 106 params['hostUrl'] = hostUrl 107 params['url'] = dpUrl 108 params['uid'] = uid 109 110 # Actually create a new object type based on XMLRPCUtils.DataProviderClient 111 # but with the properties from the remote DataProvider 112 newdp = type(dpUrl, (XMLRPCUtils.DataProviderClient, ), params) 113 114 return newdp
115
116 - def dataprovider_added(self, hostUrl, uid, info):
117 """ 118 Enroll a dataprovider with Conduit's ModuleManager. 119 """ 120 newdp = self.dataprovider_create(hostUrl, uid, info) 121 122 # Register the new dataprovider with Conduit 123 key = self.emit_added( 124 klass=newdp, 125 initargs=(), #No init args, these are encoded as class params 126 category=self.categories[newdp.hostUrl] 127 ) 128 129 # Record the key so we can unregister the dp later (if needed) 130 self.dataproviders[hostUrl][newdp.uid] = key
131
132 - def dataprovider_removed(self, hostUrl, uid):
133 """ 134 Remove a dataprovider from ModuleManager 135 """ 136 self.emit_removed(self.dataproviders[hostUrl][uid]) 137 del(self.dataproviders[hostUrl][uid])
138
139 -class _PeerLister(threading.Thread, gobject.GObject):
140 """ 141 Connects to the remote dataprovider factory and queries 142 the shared dataproviders 143 """ 144 __gsignals__ = { 145 "complete": 146 (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, []) 147 } 148 149 FREQ = 5 150 SLEEP = 0.1 151
152 - def __init__(self, url, port):
153 threading.Thread.__init__(self) 154 gobject.GObject.__init__(self) 155 self.port = port 156 self.url = url 157 self.stopped = False 158 self._ticks = 0
159
160 - def stop(self):
161 self.stopped = True
162
163 - def run(self):
164 server = xmlrpclib.Server("%s:%s/" % (self.url,self.port)) 165 #Gross cancellable spinning loop... 166 while not self.stopped: 167 if self._ticks > (self.FREQ / self.SLEEP): 168 try: 169 self.data_out = server.list_shared_dataproviders() 170 gobject.idle_add(self.emit, "complete") 171 except: 172 #If the server has died or not started yet 173 pass 174 self._ticks = 0 175 else: 176 time.sleep(self.SLEEP) 177 self._ticks += 1
178