1 """
2 Contains classes for transmitting and receiving python objects over the network.
3
4 Copyright: John Stowers, 2006
5 License: GPLv2
6 """
7 import xmlrpclib
8 import threading
9 import time
10 import gobject
11 import logging
12 import socket
13 log = logging.getLogger("modules.Network")
14
15 import Peers
16 import XMLRPCUtils
17
18 import conduit.dataproviders.DataProvider as DataProvider
19 import conduit.dataproviders.DataProviderCategory as DataProviderCategory
20
22 """
23 Responsible for making networked Conduit resources available to the user. This includes:
24 1) Monitoring Avahi events to detect other Conduit instances on the network
25 2) Discovering remote conduit capabilities (i.e. what dataproviders it has advertised)
26 3) Data transmission to/from remote conduit instances
27 """
38
42
44 """
45 Callback which is triggered when a dataprovider is advertised on
46 a remote conduit instance
47 """
48 url = "http://%s" % host
49 log.debug("Remote host '%s' detected" % url)
50
51 if not self.peers.has_key(url):
52
53 self.categories[url] = DataProviderCategory.DataProviderCategory("On %s" % host, "computer", host)
54
55 self.dataproviders[url] = {}
56
57
58
59 request = _PeerLister(url, port)
60 request.connect("complete", self.dataprovider_process)
61 request.start()
62 self.peers[url] = request
63
65 """
66 Callback which is triggered when a host is no longer available
67 """
68 log.debug("Remote host '%s' removed" % url)
69 if self.peers.has_key(url):
70 self.categories.remove(url)
71 for uid, dp in self.dataproviders[url].items():
72 self.dataprovider_removed(dp)
73 self.dataproviders.remove(url)
74 self.peers.remove(url)
75
77 """
78 """
79 hostUrl = peerLister.url
80 currentSharedDps = self.dataproviders[hostUrl]
81
82 remoteSharedDps = {}
83 for dpInfo in peerLister.data_out:
84 remoteUid = "%s-%s" % (hostUrl,dpInfo['uid'])
85 remoteSharedDps[remoteUid] = dpInfo
86
87
88
89
90 for remoteUid,info in remoteSharedDps.items():
91 if remoteUid not in currentSharedDps:
92 self.dataprovider_added(hostUrl, remoteUid, info)
93
94 for remoteUid in currentSharedDps.keys():
95 if remoteUid not in remoteSharedDps:
96 self.dataprovider_removed(hostUrl, remoteUid)
97
99
100 dpUrl = "%s:%s/" % (hostUrl, info['dp_server_port'])
101
102 params = {}
103 for key, val in info.iteritems():
104 params['_' + key + '_'] = val
105
106 params['hostUrl'] = hostUrl
107 params['url'] = dpUrl
108 params['uid'] = uid
109
110
111
112 newdp = type(dpUrl, (XMLRPCUtils.DataProviderClient, ), params)
113
114 return newdp
115
117 """
118 Enroll a dataprovider with Conduit's ModuleManager.
119 """
120 newdp = self.dataprovider_create(hostUrl, uid, info)
121
122
123 key = self.emit_added(
124 klass=newdp,
125 initargs=(),
126 category=self.categories[newdp.hostUrl]
127 )
128
129
130 self.dataproviders[hostUrl][newdp.uid] = key
131
138
140 """
141 Connects to the remote dataprovider factory and queries
142 the shared dataproviders
143 """
144 __gsignals__ = {
145 "complete":
146 (gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, [])
147 }
148
149 FREQ = 5
150 SLEEP = 0.1
151
153 threading.Thread.__init__(self)
154 gobject.GObject.__init__(self)
155 self.port = port
156 self.url = url
157 self.stopped = False
158 self._ticks = 0
159
162
164 server = xmlrpclib.Server("%s:%s/" % (self.url,self.port))
165
166 while not self.stopped:
167 if self._ticks > (self.FREQ / self.SLEEP):
168 try:
169 self.data_out = server.list_shared_dataproviders()
170 gobject.idle_add(self.emit, "complete")
171 except:
172
173 pass
174 self._ticks = 0
175 else:
176 time.sleep(self.SLEEP)
177 self._ticks += 1
178