Package conduit :: Module Conduit
[hide private]

Source Code for Module conduit.Conduit

  1  """ 
  2  Represents a conduit (The joining of one source to one or more sinks) 
  3   
  4  Copyright: John Stowers, 2006 
  5  License: GPLv2 
  6  """ 
  7  import gobject 
  8  import logging 
  9  log = logging.getLogger("Conduit") 
 10   
 11  import conduit 
 12  import conduit.utils as Utils 
 13   
 14  CONFLICT_POLICY_NAMES = ("conflict", "deleted") 
 15  CONFLICT_POLICY_VALUES = ("ask","skip","replace") 
 16  CONFLICT_POLICY_VALUE_ICONS = { 
 17      "conflict_ask"      :   "conduit-conflict-ask", 
 18      "conflict_skip"     :   "conduit-conflict-skip", 
 19      "conflict_replace"  :   "conduit-conflict-right", 
 20      "deleted_ask"       :   "conduit-conflict-ask", 
 21      "deleted_skip"      :   "conduit-conflict-skip", 
 22      "deleted_replace"   :   "conduit-conflict-delete" 
 23  } 
 24       
25 -class Conduit(gobject.GObject):
26 """ 27 Model of a Conduit, which is a one-to-many bridge of DataSources to 28 DataSinks. 29 30 @ivar datasource: The DataSource to synchronize from 31 @type datasource: L{conduit.Module.ModuleWrapper} 32 @ivar datasinks: List of DataSinks to synchronize to 33 @type datasinks: L{conduit.Module.ModuleWrapper}[] 34 """ 35 __gsignals__ = { 36 #Fired when a new instantiatable DP becomes available. It is described via 37 #a wrapper because we do not actually instantiate it till later - to save memory 38 "dataprovider-added" : ( 39 gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, [ 40 gobject.TYPE_PYOBJECT]), # The DataProvider that was added to this ConduitModel 41 "dataprovider-removed" : ( 42 gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, [ 43 gobject.TYPE_PYOBJECT]), # The DataProvider that was removed from this ConduitModel 44 "dataprovider-changed" : ( 45 gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, [ 46 gobject.TYPE_PYOBJECT, # The old DP 47 gobject.TYPE_PYOBJECT]), # The new DP 48 "parameters-changed" : ( 49 gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, []), 50 "sync-conflict": ( 51 gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, [ 52 gobject.TYPE_PYOBJECT]), #Conflict object 53 "sync-completed": ( 54 gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, [ 55 gobject.TYPE_BOOLEAN, #True if there was a fatal error 56 gobject.TYPE_BOOLEAN, #True if there was a non fatal error 57 gobject.TYPE_BOOLEAN]), #True if there was a conflict 58 "sync-started": ( 59 gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, []), 60 "sync-progress": ( 61 gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, [ 62 gobject.TYPE_FLOAT, #percent complete 63 gobject.TYPE_PYOBJECT]) #list of successfully completed UIDs 64 } 65
66 - def __init__(self, syncManager, uid=""):
67 """ 68 Makes and empty conduit ready to hold one datasource and many 69 datasinks 70 """ 71 gobject.GObject.__init__(self) 72 73 self.syncManager = syncManager 74 75 if uid == "": 76 self.uid = Utils.uuid_string() 77 else: 78 self.uid = uid 79 80 #a conduit can hold one datasource and many datasinks (wrappers) 81 self.datasource = None 82 self.datasinks = [] 83 self.twoWaySyncEnabled = False 84 self.slowSyncEnabled = False 85 self.autoSyncEnabled = False 86 self.conflictPolicy = "" 87 self.deletedPolicy = "" 88 89 #set conduits to have the default conflict/deleted policy 90 for policyName in CONFLICT_POLICY_NAMES: 91 policyValue = conduit.GLOBALS.settings.get("default_policy_%s" % policyName) 92 self.set_policy(policyName,policyValue) 93 94 self._conflicts = {}
95
96 - def _parameters_changed(self):
97 self.emit("parameters-changed")
98
99 - def _change_detected(self, arg):
100 #Dont trigger a sync if we are already synchronising 101 if not self.is_busy() and self.do_auto_sync(): 102 log.debug("Triggering an auto sync...") 103 self.sync()
104
105 - def emit(self, *args):
106 """ 107 Override the gobject signal emission so that all signals are emitted 108 from the main loop on an idle handler 109 """ 110 gobject.idle_add(gobject.GObject.emit,self,*args)
111
112 - def add_dataprovider(self, dataprovider_wrapper, trySourceFirst=True):
113 """ 114 Adds a dataprovider to the conduit. 115 116 @param dataprovider_wrapper: The L{conduit.Module.ModuleWrapper} 117 containing a L{conduit.DataProvider.DataProviderBase} to add 118 @type dataprovider_wrapper: L{conduit.Module.ModuleWrapper} 119 """ 120 if dataprovider_wrapper.module_type == "source": 121 #only one source is allowed 122 if self.datasource == None: 123 self.datasource = dataprovider_wrapper 124 else: 125 log.warn("Only one datasource allowed per conduit") 126 return False 127 128 elif dataprovider_wrapper.module_type == "sink": 129 #only one sink of each kind is allowed 130 if dataprovider_wrapper in self.datasinks: 131 log.warn("This datasink already present in this conduit") 132 return False 133 else: 134 #temp reference for drawing the connector line 135 self.datasinks.append(dataprovider_wrapper) 136 137 elif dataprovider_wrapper.module_type == "twoway": 138 if self.datasource == None: 139 if trySourceFirst: 140 log.debug("Adding twoway dataprovider into source position") 141 self.datasource = dataprovider_wrapper 142 else: 143 log.debug("Adding twoway dataprovider into sink position") 144 self.datasinks.append(dataprovider_wrapper) 145 else: 146 log.debug("Adding twoway dataprovider into sink position") 147 self.datasinks.append(dataprovider_wrapper) 148 #Datasinks go on the right 149 else: 150 log.warn("Only sinks, sources or twoway dataproviders may be added") 151 return False 152 153 if dataprovider_wrapper.module != None: 154 dataprovider_wrapper.module.connect("change-detected", self._change_detected) 155 156 self.emit("dataprovider-added", dataprovider_wrapper) 157 return True
158
159 - def get_dataprovider_position(self, dataproviderWrapper):
160 """ 161 Returns the dp position, 162 Source = 0,0 163 Sink = 1, index 164 """ 165 if dataproviderWrapper == self.datasource: 166 return 0, 0 167 elif dataproviderWrapper in self.datasinks: 168 return 1, self.datasinks.index(dataproviderWrapper) 169 else: 170 return -1, -1
171
172 - def is_busy(self):
173 """ 174 Returns True if the conduit is currenlty performing a synchronisation 175 operaton on one or more of its contained DataProviders 176 """ 177 return self.syncManager.sync_in_progress(self)
178
179 - def can_sync(self):
180 """ 181 Returns True if this conduit can be synchronized. It must have a 182 source and a sync, that are not pending 183 """ 184 return self.datasource != None \ 185 and len(self.datasinks) > 0 \ 186 and not self.datasource.is_pending() \ 187 and not self.datasinks[0].is_pending()
188
189 - def get_dataproviders_by_key(self, key):
190 """ 191 Use list comprehension to return all dp's with a given key 192 193 @returns: A list of dataproviders with a given key 194 """ 195 return [dp for dp in [self.datasource] + self.datasinks if dp != None and dp.get_key()==key]
196
197 - def get_all_dataproviders(self):
198 """ 199 @returns: A list of dataproviders with a given key 200 """ 201 return [dp for dp in [self.datasource] + self.datasinks if dp != None]
202
203 - def is_empty(self):
204 """ 205 @returns: True if the conduit contains no dataproviders 206 """ 207 return self.datasource == None and len(self.datasinks) == 0
208
209 - def delete_dataprovider(self, dataprovider):
210 """ 211 Deletes dataprovider 212 """ 213 self.emit("dataprovider-removed", dataprovider) 214 215 #needed to close the db in file dataproviders 216 if dataprovider.module != None: 217 dataprovider.module.uninitialize() 218 219 #Sources and sinks are stored seperately so must be deleted from different 220 #places. Lucky there is only one source or this would be harder.... 221 if dataprovider == self.datasource: 222 del(self.datasource) 223 self.datasource = None 224 return True 225 elif dataprovider in self.datasinks: 226 i = self.datasinks.index(dataprovider) 227 del(self.datasinks[i]) 228 return True 229 else: 230 log.warn("Could not remove %s" % dataprovider) 231 return False
232
233 - def can_do_two_way_sync(self):
234 """ 235 Checks if the conduit is eleigable for two way sync, which is true 236 if it has one source and once sink. Two way doesnt make sense in 237 any other case 238 """ 239 if self.datasource != None and len(self.datasinks) == 1: 240 return self.datasource.module_type == "twoway" and self.datasinks[0].module_type == "twoway" 241 return False
242
243 - def enable_two_way_sync(self):
244 log.debug("Enabling Two Way Sync") 245 self.twoWaySyncEnabled = True 246 self._parameters_changed()
247
248 - def disable_two_way_sync(self):
249 log.debug("Disabling Two Way Sync") 250 self.twoWaySyncEnabled = False 251 self._parameters_changed()
252
253 - def is_two_way(self):
254 return self.can_do_two_way_sync() and self.twoWaySyncEnabled
255
256 - def enable_slow_sync(self):
257 log.debug("Enabling Slow Sync") 258 self.slowSyncEnabled = True 259 self._parameters_changed()
260
261 - def disable_slow_sync(self):
262 log.debug("Disabling Slow Sync") 263 self.slowSyncEnabled = False 264 self._parameters_changed()
265
266 - def do_slow_sync(self):
267 return self.slowSyncEnabled
268
269 - def enable_auto_sync(self):
270 log.debug("Enabling Auto Sync") 271 self.autoSyncEnabled = True 272 self._parameters_changed()
273
274 - def disable_auto_sync(self):
275 log.debug("Disabling Auto Sync") 276 self.autoSyncEnabled = False 277 self._parameters_changed()
278
279 - def do_auto_sync(self):
280 return self.autoSyncEnabled
281
282 - def get_policy(self, policy):
283 if policy not in CONFLICT_POLICY_NAMES: 284 raise Exception("Unknown policy: %s" % policy) 285 if policy == "conflict": 286 return self.conflictPolicy 287 else: 288 return self.deletedPolicy
289
290 - def set_policy(self, policy, value):
291 if policy not in CONFLICT_POLICY_NAMES: 292 raise Exception("Unknown policy: %s" % policy) 293 if value not in CONFLICT_POLICY_VALUES: 294 raise Exception("Unknown policy value: %s" % policy) 295 if policy == "conflict": 296 self.conflictPolicy = value 297 else: 298 self.deletedPolicy = value
299
300 - def change_dataprovider(self, oldDpw, newDpw):
301 """ 302 called when dpw becomes unavailable. 303 """ 304 x,y = self.get_dataprovider_position(oldDpw) 305 self.delete_dataprovider(oldDpw) 306 self.add_dataprovider( 307 dataprovider_wrapper=newDpw, 308 trySourceFirst=(x==0) 309 ) 310 if newDpw.module != None: 311 newDpw.module.connect("change-detected", self._change_detected) 312 313 self.emit("dataprovider-changed", oldDpw, newDpw)
314
315 - def refresh_dataprovider(self, dp, block=False):
316 if dp in self.get_all_dataproviders(): 317 self.syncManager.refresh_dataprovider(self, dp) 318 if block == True: 319 self.syncManager.join_one(self) 320 else: 321 log.warn("Could not refresh dataprovider: %s" % dp)
322
323 - def refresh(self, block=False):
324 if self.datasource is not None and len(self.datasinks) > 0: 325 self.syncManager.refresh_conduit(self) 326 if block == True: 327 self.syncManager.join_one(self) 328 else: 329 log.info("Conduit must have a datasource and a datasink")
330
331 - def sync(self, block=False):
332 if self.datasource is not None and len(self.datasinks) > 0: 333 self.syncManager.sync_conduit(self) 334 if block == True: 335 self.syncManager.join_one(self) 336 else: 337 log.info("Conduit must have a datasource and a datasink")
338
339 - def emit_conflict(self, conflict):
340 hc = hash(conflict) 341 if hc not in self._conflicts: 342 self._conflicts[hc] = conflict 343 self.emit("sync-conflict", conflict)
344
345 - def resolved_conflict(self, conflict):
346 try: 347 hc = hash(conflict) 348 del(self._conflicts[hc]) 349 except KeyError: 350 log.warn("Unknown conflict")
351