Package conduit :: Package modules :: Package NetworkModule :: Module XMLRPCUtils
[hide private]

Source Code for Module conduit.modules.NetworkModule.XMLRPCUtils

  1  """ 
  2  Utility functions shared between the xml-rpc client and server 
  3   
  4  Copyright: John Stowers, 2006 
  5  License: GPLv2 
  6  """ 
  7  import socket 
  8  import select 
  9  import traceback 
 10  import threading 
 11  import cPickle 
 12  import xmlrpclib 
 13  import SimpleXMLRPCServer 
 14  import logging 
 15   
 16  #One log for the client 
 17  clog = logging.getLogger("modules.Network.C") 
 18  #One log for the server 
 19  slog = logging.getLogger("modules.Network.S") 
 20   
 21  import conduit.Exceptions as Exceptions 
 22  import conduit.dataproviders.DataProvider as DataProvider 
 23  import conduit.utils as Utils 
 24   
 25  XML_RPC_EASY_EXCEPTIONS = ( 
 26      "RefreshError", 
 27      "SyncronizeError", 
 28      "SyncronizeFatalError", 
 29      "StopSync" 
 30      ) 
 31       
def marshal_fault_to_exception(fault, **kwargs):
    """
    Raise the local exception that corresponds to an XML-RPC fault.

    This function never returns normally; it always raises.

    @param fault: The fault received from the remote side. Its faultCode
    selects the exception type and its faultString becomes the message.
    @param kwargs: Only consulted for a "SynchronizeConflictError" fault, in
    which case 'server' (an object with a get(LUID) method), 'fromDataLUID'
    and 'toData' must all be supplied.
    """
    code = fault.faultCode
    description = fault.faultString

    #Simple exceptions are rebuilt from nothing but their name and message
    if code in XML_RPC_EASY_EXCEPTIONS:
        raise getattr(Exceptions, code)(description)

    #A conflict needs both pieces of data, the fault only carries the
    #comparison, so the 'from' data is fetched through the supplied server
    if code == "SynchronizeConflictError":
        fromData = kwargs['server'].get(kwargs['fromDataLUID'])
        toData = kwargs['toData']
        raise Exceptions.SynchronizeConflictError(description, fromData, toData)

    #Anything else is reported generically with the remote description
    raise Exception("Remote Exception:\n%s" % description)
43
def marshal_exception_to_fault(exception):
    """
    Raise an xmlrpclib.Fault that describes the given exception.

    This function never returns normally; it always raises the fault.

    @param exception: The exception instance to convert. The name of its
    class decides how it is encoded:
      - a name listed in XML_RPC_EASY_EXCEPTIONS gives a fault whose code is
        that name and whose string is exception.message
      - "SynchronizeConflictError" gives a fault carrying only
        exception.comparison
      - anything else gives an "Exception" fault carrying the text of
        traceback.format_exc(), which formats the exception currently being
        handled
    """
    name = exception.__class__.__name__
    if name in XML_RPC_EASY_EXCEPTIONS:
        fault = xmlrpclib.Fault(name, exception.message)
    elif name == "SynchronizeConflictError":
        #only put the comparison in the fault, getting the other data
        #requires subsequent xmlrpc calls
        fault = xmlrpclib.Fault("SynchronizeConflictError", exception.comparison)
    else:
        fault = xmlrpclib.Fault("Exception", traceback.format_exc())
    raise fault
55
def pickle_obj_to_binary(obj):
    """
    Pickle obj and wrap the resulting string in an xmlrpclib.Binary so that
    it can be sent as an XML-RPC value.

    @param obj: Any object that cPickle can serialise
    @returns: An xmlrpclib.Binary whose data attribute holds the pickle
    """
    pickled = cPickle.dumps(obj)
    return xmlrpclib.Binary(pickled)
59
def unpickle_obj_from_binary(bin):
    """
    Inverse of pickle_obj_to_binary: rebuild the object from the pickle held
    in the data attribute of bin.

    @param bin: An object with a data attribute containing a pickle, such as
    an xmlrpclib.Binary
    @returns: The unpickled object
    """
    #NOTE(review): unpickling can execute arbitrary code. In this module the
    #payload comes back from a remote XML-RPC peer, so this is only safe if
    #that peer is trusted - confirm.
    return cPickle.loads(bin.data)
63
class StoppableXMLRPCServer(SimpleXMLRPCServer.SimpleXMLRPCServer):
    """
    A variant of SimpleXMLRPCServer that can be stopped. From
    http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/520583

    serve() keeps handling requests until stop() sets the closed flag. While
    waiting for a connection the flag is re-checked after every 0.2 second
    poll, so a stop request is noticed promptly.
    """
    #Standard SocketServer option: sets SO_REUSEADDR on the listening socket
    allow_reuse_address = True

    def __init__(self, host, port):
        """
        Create the server listening on (host, port), with request logging
        disabled and None allowed as an XML-RPC value.
        """
        SimpleXMLRPCServer.SimpleXMLRPCServer.__init__(
                                self,
                                addr=(host, port),
                                logRequests=False,
                                allow_none=True
                                )
        #Set by stop(), polled by serve() and get_request()
        self.closed = False

    def serve(self):
        """
        Handle requests one at a time until stop() has been called, then
        close the listening socket.
        """
        self.socket.setblocking(0)
        try:
            while not self.closed:
                self.handle_request()
        finally:
            #Release the listening socket, otherwise it stays open (and the
            #port stays bound) for as long as this object is alive
            self.server_close()

    def get_request(self):
        """
        Wait for an incoming connection and accept it.

        @returns: The (sock, addr) pair from accept()
        @raises socket.error: When the server has been stopped. Raising,
        rather than returning None, matters because the caller unpacks the
        result into two names; the stdlib handle_request() treats a
        socket.error from here as "no request".
        """
        while not self.closed:
            try:
                readable, _writable, _errored = select.select([self.socket], [], [], 0.2)
            except select.error:
                #Occurs sometimes at start up, race condition, ignore
                continue
            if not readable:
                #Poll timed out, go round again to re-check the closed flag
                continue
            try:
                return self.socket.accept()
            except socket.error:
                #Occurs at shutdown, raise to stop serving. Otherwise the
                #pending connection went away again, so keep waiting.
                if self.closed:
                    raise
        raise socket.error("XML-RPC server has been stopped")

    def start(self):
        """
        Run serve() in a new thread.
        """
        threading.Thread(target=self.serve).start()

    def stop(self):
        """
        Ask the serving loop to finish. Only sets a flag; the loop notices
        it the next time it checks.
        """
        self.closed = True
106
107 -class DataProviderClient(DataProvider.TwoWay):
108 """ 109 Provides the Client portion of dataprovider proxying. 110 """ 111 _configurable_ = False
112 - def __init__(self, *args):
113 DataProvider.TwoWay.__init__(self) 114 clog.info("Connecting to remote DP on %s" % self.url) 115 #Add use_datetime arg for >= python 2.5 116 self.server = xmlrpclib.Server( 117 self.url, 118 allow_none=True)
119 120 @Utils.log_function_call(clog)
121 - def refresh(self):
122 DataProvider.TwoWay.refresh(self) 123 try: 124 self.server.refresh() 125 except xmlrpclib.Fault, f: 126 marshal_fault_to_exception(f)
127 128 @Utils.log_function_call(clog)
129 - def get_all(self):
130 DataProvider.TwoWay.get_all(self) 131 try: 132 return self.server.get_all() 133 except xmlrpclib.Fault, f: 134 marshal_fault_to_exception(f)
135 136 @Utils.log_function_call(clog)
137 - def get(self, LUID):
138 DataProvider.TwoWay.get(self, LUID) 139 try: 140 binaryData = self.server.get(LUID) 141 return unpickle_obj_from_binary(binaryData) 142 except xmlrpclib.Fault, f: 143 marshal_fault_to_exception(f)
144 145 @Utils.log_function_call(clog)
146 - def put(self, data, overwrite=False, LUID=None):
147 DataProvider.TwoWay.put(self, data, overwrite, LUID) 148 binaryData = pickle_obj_to_binary(data) 149 try: 150 binaryRid = self.server.put(binaryData, overwrite, LUID) 151 return unpickle_obj_from_binary(binaryRid) 152 except xmlrpclib.Fault, f: 153 #Supply additional info because the conflict exception 154 #includes details of the conflict 155 #FIXME: Check from and to isnt backwards... 156 marshal_fault_to_exception( 157 f, 158 server=self, 159 fromDataLUID=LUID, 160 toData=data 161 )
162 163 @Utils.log_function_call(clog)
164 - def delete(self, LUID):
165 DataProvider.TwoWay.delete(self, LUID) 166 try: 167 return self.server.delete(LUID) 168 except xmlrpclib.Fault, f: 169 marshal_fault_to_exception(f)
170 171 @Utils.log_function_call(clog)
172 - def finish(self, aborted, error, conflict):