Package conduit :: Package modules :: Package NetworkModule :: Module XMLRPCUtils
[hide private]

Source Code for Module conduit.modules.NetworkModule.XMLRPCUtils

  1  """ 
  2  Utilty functions shared between the xml-rpc client and server 
  3   
  4  Copyright: John Stowers, 2006 
  5  License: GPLv2 
  6  """ 
  7  import socket 
  8  import select 
  9  import traceback 
 10  import threading 
 11  import cPickle 
 12  import xmlrpclib 
 13  import SimpleXMLRPCServer 
 14  import logging 
 15   
 16  #One log for the client 
 17  clog = logging.getLogger("modules.Network.C") 
 18  #One log for the server 
 19  slog = logging.getLogger("modules.Network.S") 
 20   
 21  import conduit.Exceptions as Exceptions 
 22  import conduit.dataproviders.DataProvider as DataProvider 
 23  import conduit.utils as Utils 
 24   
 25  XML_RPC_EASY_EXCEPTIONS = ( 
 26      "RefreshError", 
 27      "SyncronizeError", 
 28      "SyncronizeFatalError", 
 29      "StopSync" 
 30      ) 
 31       
32 -def marshal_fault_to_exception(fault, **kwargs):
33 if fault.faultCode in XML_RPC_EASY_EXCEPTIONS: 34 klass = getattr(Exceptions,fault.faultCode) 35 #exception.message = fault.faultString 36 raise klass(fault.faultString) 37 elif fault.faultCode == "SynchronizeConflictError": 38 fromData = kwargs['server'].get(kwargs['fromDataLUID']) 39 toData = kwargs['toData'] 40 raise Exceptions.SynchronizeConflictError(fault.faultString, fromData, toData) 41 else: 42 raise Exception("Remote Exception:\n%s" % fault.faultString)
43
44 -def marshal_exception_to_fault(exception):
45 klassName = exception.__class__.__name__ 46 if klassName in XML_RPC_EASY_EXCEPTIONS: 47 #exception.message = fault.faultString 48 raise xmlrpclib.Fault(klassName, exception.message) 49 elif klassName == "SynchronizeConflictError": 50 #only put the comparison in the fault, getting the other data 51 #requires subsequent xmlrpc calls 52 raise xmlrpclib.Fault("SynchronizeConflictError", exception.comparison) 53 else: 54 raise xmlrpclib.Fault("Exception",traceback.format_exc())
55
56 -def pickle_obj_to_binary(obj):
57 bin = xmlrpclib.Binary(cPickle.dumps(obj)) 58 return bin
59
60 -def unpickle_obj_from_binary(bin):
61 obj = cPickle.loads(bin.data) 62 return obj
63
64 -class StoppableXMLRPCServer(SimpleXMLRPCServer.SimpleXMLRPCServer):
65 """ 66 A variant of SimpleXMLRPCServer that can be stopped. From 67 http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/520583 68 """ 69 allow_reuse_address = True
70 - def __init__( self, host, port):
71 SimpleXMLRPCServer.SimpleXMLRPCServer.__init__(self, 72 addr=(host,port), 73 logRequests=False, 74 allow_none=True 75 ) 76 self.closed = False
77
78 - def serve(self):
79 self.socket.setblocking(0) 80 while not self.closed: 81 self.handle_request()
82
83 - def get_request(self):
84 inputObjects = [] 85 while not inputObjects and not self.closed: 86 try: 87 inputObjects, outputObjects, errorObjects = select.select([self.socket], [], [], 0.2) 88 sock, addr = self.socket.accept() 89 return (sock, addr) 90 except socket.timeout: 91 if self.closed: 92 raise 93 except socket.error: 94 #Occurs at shutdown, raise to stop serving 95 if self.closed: 96 raise 97 except select.error: 98 #Occurs sometimes at start up, race condition, ignore 99 pass
100
101 - def start(self):
102 threading.Thread(target=self.serve).start()
103
104 - def stop(self):
105 self.closed = True
106
107 -class DataProviderClient(DataProvider.TwoWay):
108 """ 109 Provides the Client portion of dataprovider proxying. 110 """ 111 _configurable_ = False
112 - def __init__(self, *args):
113 DataProvider.TwoWay.__init__(self) 114 clog.info("Connecting to remote DP on %s" % self.url) 115 #Add use_datetime arg for >= python 2.5 116 self.server = xmlrpclib.Server( 117 self.url, 118 allow_none=True)
119 120 @Utils.log_function_call(clog)
121 - def refresh(self):
122 DataProvider.TwoWay.refresh(self) 123 try: 124 self.server.refresh() 125 except xmlrpclib.Fault, f: 126 marshal_fault_to_exception(f)
127 128 @Utils.log_function_call(clog)
129 - def get_all(self):
130 DataProvider.TwoWay.get_all(self) 131 try: 132 return self.server.get_all() 133 except xmlrpclib.Fault, f: 134 marshal_fault_to_exception(f)
135 136 @Utils.log_function_call(clog)
137 - def get(self, LUID):
138 DataProvider.TwoWay.get(self, LUID) 139 try: 140 binaryData = self.server.get(LUID) 141 return unpickle_obj_from_binary(binaryData) 142 except xmlrpclib.Fault, f: 143 marshal_fault_to_exception(f)
144 145 @Utils.log_function_call(clog)
146 - def put(self, data, overwrite=False, LUID=None):
147 DataProvider.TwoWay.put(self, data, overwrite, LUID) 148 binaryData = pickle_obj_to_binary(data) 149 try: 150 binaryRid = self.server.put(binaryData, overwrite, LUID) 151 return unpickle_obj_from_binary(binaryRid) 152 except xmlrpclib.Fault, f: 153 #Supply additional info because the conflict exception 154 #includes details of the conflict 155 #FIXME: Check from and to isnt backwards... 156 marshal_fault_to_exception( 157 f, 158 server=self, 159 fromDataLUID=LUID, 160 toData=data 161 )
162 163 @Utils.log_function_call(clog)
164 - def delete(self, LUID):
165 DataProvider.TwoWay.delete(self, LUID) 166 try: 167 return self.server.delete(LUID) 168 except xmlrpclib.Fault, f: 169 marshal_fault_to_exception(f)
170 171 @Utils.log_function_call(clog)
172 - def finish(self, aborted, error, conflict):
173 DataProvider.TwoWay.finish(self) 174 try: 175 self.server.finish(aborted, error, conflict) 176 except xmlrpclib.Fault, f: 177 marshal_fault_to_exception(f)
178 179 @Utils.log_function_call(clog)
180 - def get_UID(self):
181 return self.uid
182 183 @Utils.log_function_call(clog)
184 - def set_status(self, newStatus):
185 self.server.set_status(newStatus)
186 187 @Utils.log_function_call(clog)
188 - def get_status(self):
189 return self.server.get_status()
190
191 - def get_name(self):
192 return "Remote %s" % self._name_
193
194 -class DataproviderServer(StoppableXMLRPCServer):
195 """ 196 Wraps a dataproviderwrapper in order to pickle args 197 and deal with exceptions in the sync process 198 """
199 - def __init__(self, wrapper, port):
200 StoppableXMLRPCServer.__init__(self,'',port) 201 slog.info("Starting server for %s on port %s" % (wrapper,port)) 202 self.port = port 203 self.dpw = wrapper 204 205 #Additional functions not part of the normal dp api 206 self.register_function(self.get_info) 207 208 #register individual functions, not the whole object, 209 #because in some cases we need to pickle function arguments 210 #and deal with exceptions 211 self.register_function(self.refresh) 212 self.register_function(self.get_all) 213 self.register_function(self.get) 214 self.register_function(self.put) 215 self.register_function(self.delete) 216 self.register_function(self.finish) 217 218 #These functions will never throw exceptions so register them in 219 #the module directly 220 self.register_function(self.dpw.module.set_status) 221 self.register_function(self.dpw.module.get_status)
222 223
224 - def get_info(self):
225 """ 226 Return information about this dataprovider 227 (so that client can show correct icon, name, description etc) 228 """ 229 return {"uid": self.dpw.get_UID(), 230 "name": self.dpw.name, 231 "description": self.dpw.description, 232 "icon": self.dpw.icon_name, 233 "module_type": self.dpw.module_type, 234 "in_type": self.dpw.in_type, 235 "out_type": self.dpw.out_type, 236 "dp_server_port": self.port 237 }
238 239 @Utils.log_function_call(slog)
240 - def refresh(self):
241 try: 242 self.dpw.module.refresh() 243 except Exception, e: 244 return marshal_exception_to_fault(e)
245 246 @Utils.log_function_call(slog)
247 - def get_all(self):
248 try: 249 return self.dpw.module.get_all() 250 except Exception, e: 251 return marshal_exception_to_fault(e)
252 253 @Utils.log_function_call(slog)
254 - def get(self, LUID):
255 try: 256 return pickle_obj_to_binary(self.dpw.module.get(LUID)) 257 except Exception, e: 258 return marshal_exception_to_fault(e)
259 260 @Utils.log_function_call(slog)
261 - def put(self, binaryData, overwrite, LUID):
262 data = unpickle_obj_from_binary(binaryData) 263 try: 264 rid = self.dpw.module.put(data, overwrite, LUID) 265 return pickle_obj_to_binary(rid) 266 except Exception, e: 267 return marshal_exception_to_fault(e)
268 269 @Utils.log_function_call(slog)
270 - def delete(self, LUID):
271 try: 272 self.dpw.module.delete(LUID) 273 except Exception, e: 274 return marshal_exception_to_fault(e)
275 276 @Utils.log_function_call(slog)
277 - def finish(self, aborted, error, conflict):
278 try: 279 self.dpw.module.finish(aborted, error, conflict) 280 except Exception, e: 281 return marshal_exception_to_fault(e)
282