Package conduit :: Module Synchronization
[hide private]

Source Code for Module conduit.Synchronization

  1  """ 
  2  Holds class used for the actual synchronisation phase 
  3   
  4  Copyright: John Stowers, 2006 
  5  License: GPLv2 
  6  """ 
  7   
  8  import traceback 
  9  import threading 
 10  import logging 
 11  log = logging.getLogger("Syncronization") 
 12   
 13   
 14  import conduit 
 15  import conduit.dataproviders.DataProvider as DataProvider 
 16  import conduit.Exceptions as Exceptions 
 17  import conduit.DeltaProvider as DeltaProvider 
 18   
 19  from conduit.Conflict import Conflict, CONFLICT_DELETE, CONFLICT_COPY_SOURCE_TO_SINK,CONFLICT_SKIP,CONFLICT_COPY_SINK_TO_SOURCE 
 20  from conduit.datatypes import DataType, Rid, COMPARISON_OLDER, COMPARISON_EQUAL, COMPARISON_NEWER, COMPARISON_UNKNOWN 
 21   
def put_data(source, sink, sourceData, sourceDataRid, overwrite):
    """
    Stores sourceData in sink and records the result in the mapping DB.

    @param source: Wrapper of the dataprovider the data came from
    @param sink: Wrapper of the dataprovider the data is put into
    @param sourceData: The data to put
    @param sourceDataRid: The Rid of sourceData on the source side
    @param overwrite: Passed straight through to sink.module.put()
    """
    mappingDB = conduit.GLOBALS.mappingDB

    #look up what we already know about this source data <-> sink pairing
    existing = mappingDB.get_mapping(
                    sourceUID=source.get_UID(),
                    dataLUID=sourceDataRid.get_UID(),
                    sinkUID=sink.get_UID()
                    )
    fromLUID = sourceDataRid.get_UID()
    toLUID = existing.get_sink_rid().get_UID()

    #hand the data to the sink
    message = "Putting data %s --> %s into %s" % (fromLUID, toLUID, sink.get_UID())
    log.info(message)
    newSinkRid = sink.module.put(sourceData, overwrite, toLUID)

    #remember both ends of the relationship
    existing.set_source_rid(sourceDataRid)
    existing.set_sink_rid(newSinkRid)
    mappingDB.save_mapping(existing)
47
def delete_data(source, sink, dataLUID):
    """
    Removes the data identified by dataLUID from sink, then drops the
    corresponding entry from the mapping DB.

    @param source: Wrapper of the source dataprovider (used for the mapping lookup)
    @param sink: Wrapper of the dataprovider the data is deleted from
    @param dataLUID: The LUID passed to sink.module.delete() and to the mapping lookup
    """
    message = "Deleting %s from %s" % (dataLUID, sink.get_UID())
    log.info(message)

    #delete from the dataprovider first, the mapping is only touched afterwards
    sink.module.delete(dataLUID)

    stale = conduit.GLOBALS.mappingDB.get_mapping(
                    sourceUID=source.get_UID(),
                    dataLUID=dataLUID,
                    sinkUID=sink.get_UID()
                    )
    conduit.GLOBALS.mappingDB.delete_mapping(stale)
60
class SyncManager:
    """
    Given a dictionary of relationships this class synchronizes
    the relevant sinks and sources. If there is a conflict then this is
    handled by the conflictResolver

    Running workers are kept in self.syncWorkers, indexed by conduit. An
    entry is removed again by _on_sync_completed, which is connected to the
    conduit's "sync-completed" signal. Because the workers emit that signal
    themselves, the dictionary can shrink while another thread is looking at
    it; every method that walks the dictionary therefore works on a snapshot.
    """
    def __init__ (self, typeConverter):
        """
        Constructor.

        Creates a dictionary of syncWorkers indexed by conduit

        @param typeConverter: Handed to every SyncWorker this manager creates
        """
        self.syncWorkers = {}
        self.typeConverter = typeConverter

    def _cancel_sync_thread(self, cond):
        """
        Cancels and joins the worker registered for cond (if any). Blocks
        until a live worker has finished.
        """
        #fetch once; the entry may be removed by _on_sync_completed at any time
        worker = self.syncWorkers.get(cond)
        if worker is None:
            return

        log.warning("Conduit already in queue (alive: %s)" % worker.is_alive())
        #If the thread is alive then cancel it
        if worker.is_alive():
            log.warning("Cancelling thread")
            worker.cancel()
            worker.join() #Will block

    def _on_sync_completed(self, cond, abort, error, conflict, worker):
        """
        "sync-completed" handler. Forgets the worker registered for cond.
        """
        #pop() instead of test-then-delete so two handlers cannot race each other
        if self.syncWorkers.pop(cond, None) is not None:
            log.debug("Deleting worker: %s" % worker)

    def _start_worker_thread(self, cond, worker):
        """
        Registers worker for cond and starts it.

        NOTE(review): a new "sync-completed" handler is connected on every
        call and none is disconnected in this class - confirm that handlers
        do not pile up on long lived conduits.
        """
        log.info("Setting global cancel flag")
        conduit.GLOBALS.cancelled = False

        log.debug("Starting worker: %s" % worker)
        cond.connect("sync-completed", self._on_sync_completed, worker)
        self.syncWorkers[cond] = worker
        worker.start()

    def _join_running_worker(self, cond, message):
        """
        If a worker is already registered for cond, logs message and waits
        for that worker to finish.
        """
        if cond in self.syncWorkers:
            log.info(message)
            self.join_one(cond)

    def is_busy(self):
        """
        Returns true if any conduit is currently undergoing a sync
        """
        return any(worker.is_alive() for worker in list(self.syncWorkers.values()))

    def sync_in_progress(self, cond):
        """
        Returns true if cond is currently undergoing sync, refresh etc
        """
        worker = self.syncWorkers.get(cond)
        return worker is not None and worker.is_alive()

    def cancel_all(self):
        """
        Cancels all threads and also joins() them. Will block
        """
        #snapshot: joined workers remove themselves from the dictionary
        for c in list(self.syncWorkers):
            self._cancel_sync_thread(c)

    def join_one(self, cond, timeout=None):
        """
        Blocks until the thread associated with the supplied conduit finishes.
        Returns immediately if no thread is registered for the conduit (it
        may have completed in the meantime).
        """
        log.info("Waiting for thread to finish")
        worker = self.syncWorkers.get(cond)
        if worker is not None:
            worker.join(timeout)

    def join_all(self, timeout=None):
        """
        Joins all threads. This function will block the calling thread.
        The timeout applies to each thread individually.
        """
        for worker in list(self.syncWorkers.values()):
            worker.join(timeout)

    def run_blocking_dataprovider_function_calls(self, dataprovider, callback, *functions):
        """
        Runs functions in a BlockingFunctionCallWorker on the conduit that
        contains dataprovider. callback is connected to that conduit's
        "sync-completed" signal. Nothing is started if the dataprovider
        cannot be found or its conduit already has a worker.
        """
        #need to get the conduit associated with this dataprovider because the sync-completed
        #signal is emitted from the conduit object
        conds = []
        conds.extend(conduit.GLOBALS.app.guiSyncSet.get_all_conduits())
        conds.extend(conduit.GLOBALS.app.dbusSyncSet.get_all_conduits())
        for c in conds:
            for dpw in c.get_all_dataproviders():
                if dataprovider == dpw.module:
                    #found it!
                    if c not in self.syncWorkers:
                        #connect the supplied callback
                        c.connect("sync-completed",callback)
                        #start the thread
                        bfcw = BlockingFunctionCallWorker(c, *functions)
                        self._start_worker_thread(c, bfcw)
                        return

        log.info("Could not create BlockingFunctionCallWorker")

    def refresh_dataprovider(self, cond, dataproviderWrapper):
        """
        Refreshes a single dataprovider of cond in a new thread, waiting
        first for any worker already registered for cond.
        """
        self._join_running_worker(cond, "Refresh dataproviderWrapper already in progress")

        threadedWorker = RefreshDataProviderWorker(cond, dataproviderWrapper)
        self._start_worker_thread(cond, threadedWorker)

    def refresh_conduit(self, cond):
        """
        Refreshes (but does not sync) cond in a new thread, waiting first
        for any worker already registered for cond.
        """
        self._join_running_worker(cond, "Refresh already in progress")

        threadedWorker = SyncWorker(self.typeConverter, cond, False)
        self._start_worker_thread(cond, threadedWorker)

    def sync_conduit(self, cond):
        """
        Synchronizes cond in a new thread, waiting first for any worker
        already registered for cond.
        """
        self._join_running_worker(cond, "Sync already in progress")

        threadedWorker = SyncWorker(self.typeConverter, cond, True)
        self._start_worker_thread(cond, threadedWorker)

    def did_sync_abort(self, cond):
        """
        Returns True if the supplied conduit aborted (the sync did not complete
        due to an unhandled exception, a SynchronizeFatalError or the conduit was
        unsyncable (source did not refresh, etc)

        @raise KeyError: If no worker is registered for cond
        """
        return self.syncWorkers[cond].aborted

    def did_sync_error(self, cond):
        """
        Returns True if the supplied conduit raised a non fatal
        SynchronizeError during sync

        @raise KeyError: If no worker is registered for cond
        """
        return self.syncWorkers[cond].did_sync_error()

    def did_sync_conflict(self, cond):
        """
        Returns True if the supplied conduit encountered a conflict during processing

        @raise KeyError: If no worker is registered for cond
        """
        return self.syncWorkers[cond].did_sync_conflict()
class _ThreadedWorker(threading.Thread):
    """
    A python thread. Base class for refresh and synchronization
    operations
    """
    #States of the worker state machine
    CONFIGURE_STATE = 0
    REFRESH_STATE = 1
    SYNC_STATE = 2
    DONE_STATE = 3

    def __init__(self):
        threading.Thread.__init__(self)

        #Python threads are not cancellable. Hopefully this will be fixed
        #in Python 3000
        self.cancelled = False

        #true if the sync aborts via an unhandled exception
        self.aborted = False
        #Keep track of any conflicts, non fatal errors (or trapped exceptions)
        #in the sync process, indexed by sink. Kept on the instance (not in a
        #local) because these may also occur in a data conversion.
        #Needed so that the correct status is shown on the GUI at the end of the sync process
        self.sinkErrors = {}

        #Start at the beginning
        self.state = self.CONFIGURE_STATE

    def _get_changes(self, source, sink):
        """
        Returns all the data from the source to the sink. If the dataprovider
        implements get_changes() then this is called. Otherwise the dataprovider
        is proxied using DeltaProvider

        @returns: added, modified, deleted
        """
        try:
            added, modified, deleted = source.module.get_changes()
        except NotImplementedError:
            delta = DeltaProvider.DeltaProvider(source, sink)
            added, modified, deleted = delta.get_changes()

        log.debug("%s Changes: New %s items\n%s" % (source.get_UID(), len(added), added))
        log.debug("%s Changes: Modified %s items\n%s" % (source.get_UID(), len(modified), modified))
        log.debug("%s Changes: Deleted %s items\n%s" % (source.get_UID(), len(deleted), deleted))

        #FIXME: Copy the lists because they are modified in place somewhere...
        return added[:], modified[:], deleted[:]

    def cancel(self):
        """
        Cancels the sync thread. Does not do so immediately but as soon as
        possible.
        """
        self.cancelled = True

    def did_sync_error(self):
        """
        Returns True if any sink recorded a status other than a conflict.
        """
        #conflicts do not specifically count as errors so ignore them
        return any(
                status != DataProvider.STATUS_DONE_SYNC_CONFLICT
                for status in self.sinkErrors.values()
                )

    def did_sync_conflict(self):
        """
        Returns True if any sink recorded a conflict.
        """
        return DataProvider.STATUS_DONE_SYNC_CONFLICT in self.sinkErrors.values()
265
class SyncWorker(_ThreadedWorker):
    """
    Class designed to be operated within a thread used to perform the
    synchronization operation. Communicates with the main GUI through
    signals, which are emitted on the conduit (self.cond).

    Operates on a per Conduit basis, so a single SyncWorker may synchronize
    one source with many sinks within a single conduit
    """

    #Minimum increase in progress (as a fraction of 1.0) before another
    #sync-progress signal is emitted
    PROGRESS_UPDATE_THRESHOLD = 5.0/100

    def __init__(self, typeConverter, cond, do_sync):
        """
        @param typeConverter: Used to convert data between the source and sink types
        @param cond: The conduit to operate on
        @param do_sync: If True the conduit is refreshed and synchronized,
        otherwise it is only refreshed
        """
        _ThreadedWorker.__init__(self)
        self.typeConverter = typeConverter
        self.cond = cond
        self.source = cond.datasource
        self.sinks = cond.datasinks
        self.do_sync = do_sync

        #progress at the time of the last emitted signal, and the UIDs
        #processed since then
        self._progress = 0
        self._progressUIDs = []


        #the thread name shows the direction of the sync
        if self.cond.is_two_way():
            self.setName("%s <--> %s" % (self.source, self.sinks[0]))
        else:
            self.setName("%s |--> %s" % (self.source, self.sinks))
294
295 - def _emit_progress(self, progress, dataUID):
296 """ 297 Emits progress signals, if the elapsed progress since the last 298 call to this function is greater that 5%. This is necessary because 299 otherwise we starve the main loop with too frequent progress 300 events 301 """ 302 self._progressUIDs.append(dataUID) 303 if (progress - self._progress) > self.PROGRESS_UPDATE_THRESHOLD or progress == 1.0: 304 self._progress = progress 305 self.cond.emit("sync-progress", self._progress, self._progressUIDs) 306 self._progressUIDs = []
307
308 - def _get_data(self, source, sink, uid):
309 """ 310 Gets the data from source. Handles exceptions, etc. 311 312 @returns: The data that was got or None 313 """ 314 data = None 315 try: 316 data = source.module.get(uid) 317 except Exceptions.SyncronizeError, err: 318 log.warn("%s\n%s" % (err, traceback.format_exc())) 319 self.sinkErrors[sink] = DataProvider.STATUS_DONE_SYNC_ERROR 320 return data
321
322 - def _put_data(self, source, sink, sourceData, sourceDataRid):
323 """ 324 Handles exceptions when putting data from source to sink. Default is 325 not to overwrite 326 327 @returns: True if the data was successfully put 328 """ 329 if sourceData != None: 330 try: 331 put_data(source, sink, sourceData, sourceDataRid, False) 332 return True 333 except Exceptions.SyncronizeError, err: 334 log.warn("%s\n%s" % (err, traceback.format_exc())) 335 self.sinkErrors[sink] = DataProvider.STATUS_DONE_SYNC_ERROR 336 except Exceptions.SynchronizeConflictError, err: 337 comp = err.comparison 338 if comp == COMPARISON_EQUAL: 339 log.info("Skipping %s (Equal)" % sourceData) 340 else: 341 assert(err.fromData == sourceData) 342 self._apply_conflict_policy(source, sink, err.comparison, sourceData, sourceDataRid, err.toData, err.toData.get_rid()) 343 else: 344 log.info("Could not put data: Was None") 345 346 return False
347
348 - def _convert_data(self, source, sink, data):
349 """ 350 Converts data into a format acceptable for sink, handling exceptions, etc. 351 """ 352 newdata = None 353 try: 354 newdata = self.typeConverter.convert(source.get_output_type(), sink.get_input_type(), data) 355 except Exceptions.ConversionDoesntExistError, err: 356 log.warn("Error performing conversion:\n%s" % err) 357 self.sinkErrors[sink] = DataProvider.STATUS_DONE_SYNC_SKIPPED 358 except Exceptions.ConversionError, err: 359 log.warn("Error performing conversion:\n%s" % err) 360 self.sinkErrors[sink] = DataProvider.STATUS_DONE_SYNC_ERROR 361 except Exception: 362 log.critical("UNKNOWN CONVERSION ERROR\n%s" % traceback.format_exc()) 363 self.sinkErrors[sink] = DataProvider.STATUS_DONE_SYNC_ERROR 364 return newdata
365
366 - def _apply_deleted_policy(self, sourceWrapper, sourceDataLUID, sinkWrapper, sinkDataLUID):
367 """ 368 Applies user policy when data has been deleted from source. 369 sourceDataLUID is the original UID of the data that has been deleted 370 sinkDataLUID is the uid of the data in sink that should now be deleted 371 """ 372 if self.cond.get_policy("deleted") == "skip": 373 log.debug("Deleted Policy: Skipping") 374 elif self.cond.get_policy("deleted") == "ask": 375 log.debug("Deleted Policy: Ask") 376 377 #FIXME: Delete should be handled differently from conflict 378 self.sinkErrors[sinkWrapper] = DataProvider.STATUS_DONE_SYNC_CONFLICT 379 380 #check if the source is visually on the left of the sink 381 if self.source == sourceWrapper: 382 #it is on the left 383 #dont support copying back yet 384 #(CONFLICT_COPY_SINK_TO_SOURCE,CONFLICT_SKIP) 385 validResolveChoices = (CONFLICT_DELETE, CONFLICT_SKIP) 386 else: 387 #dont support copying back yet 388 #(CONFLICT_SKIP,CONFLICT_COPY_SINK_TO_SOURCE) 389 validResolveChoices = (CONFLICT_DELETE, CONFLICT_SKIP) 390 391 sourceData = DeletedData(sourceDataLUID) 392 sinkData = DeletedData(sinkDataLUID) 393 c = Conflict( 394 self.cond, #the conduit this conflict belongs to 395 sourceWrapper, #datasource wrapper 396 sourceData, #from data 397 sourceData.get_rid(), #from data rid 398 sinkWrapper, #datasink wrapper 399 sinkData, #to data 400 sinkData.get_rid(), #to data rid 401 validResolveChoices, #valid resolve choices 402 True #This conflict is a deletion 403 ) 404 self.cond.emit_conflict(c) 405 406 elif self.cond.get_policy("deleted") == "replace": 407 log.debug("Deleted Policy: Delete") 408 #FIXME: Delete should be handled differently from conflict 409 self.sinkErrors[sinkWrapper] = DataProvider.STATUS_DONE_SYNC_CONFLICT 410 delete_data(sourceWrapper, sinkWrapper, sinkDataLUID)
411
412 - def _apply_conflict_policy(self, sourceWrapper, sinkWrapper, comparison, fromData, fromDataRid, toData, toDataRid):
413 """ 414 Applies user policy when a put() has failed. This may mean emitting 415 the conflict up to the GUI or skipping altogether 416 """ 417 if self.cond.get_policy("conflict") == "skip": 418 log.debug("Conflict Policy: Skipping") 419 elif self.cond.get_policy("conflict") == "ask": 420 log.debug("Conflict Policy: Ask") 421 self.sinkErrors[sinkWrapper] = DataProvider.STATUS_DONE_SYNC_CONFLICT 422 423 if sourceWrapper.module_type in ["twoway", "sink"]: 424 #in twoway case the user can copy back 425 avail = (CONFLICT_SKIP,CONFLICT_COPY_SOURCE_TO_SINK,CONFLICT_COPY_SINK_TO_SOURCE) 426 else: 427 avail = (CONFLICT_SKIP,CONFLICT_COPY_SOURCE_TO_SINK) 428 429 c = Conflict( 430 self.cond, 431 sourceWrapper, 432 fromData, 433 fromDataRid, 434 sinkWrapper, 435 toData, 436 toDataRid, 437 avail, 438 False 439 ) 440 self.cond.emit_conflict(c) 441 442 elif self.cond.get_policy("conflict") == "replace": 443 log.debug("Conflict Policy: Replace") 444 self.sinkErrors[sinkWrapper] = DataProvider.STATUS_DONE_SYNC_CONFLICT 445 446 try: 447 put_data(sourceWrapper, sinkWrapper, fromData, fromDataRid, True) 448 except: 449 log.warn("Forced Put Failed\n%s" % traceback.format_exc())
450
451 - def check_thread_not_cancelled(self, dataprovidersToCancel):
452 """ 453 Checks if the thread has been scheduled to be cancelled. If it has 454 then this function sets the status of the dataproviders to indicate 455 that they were stopped through a cancel operation. 456 """ 457 if self.cancelled: 458 for s in dataprovidersToCancel: 459 s.module.set_status(DataProvider.STATUS_DONE_SYNC_CANCELLED) 460 raise Exceptions.StopSync(self.state)
461
462 - def one_way_sync(self, source, sink):
463 """ 464 Transfers numItems of data from source to sink. 465 """ 466 log.info("Synchronizing %s |--> %s " % (source, sink)) 467 468 #get all the data 469 added, modified, deleted = self._get_changes(source, sink) 470 471 #handle deleted data 472 for d in deleted: 473 matchingUID = conduit.GLOBALS.mappingDB.get_matching_UID(source.get_UID(), d, sink.get_UID()) 474 if matchingUID != None: 475 self._apply_deleted_policy(source, d, sink, matchingUID) 476 477 #one way sync treats added and modifed the same. Both get transferred 478 items = added + modified 479 numItems = len(items) 480 idx = 0 481 for i in items: 482 idx += 1.0 483 self.check_thread_not_cancelled([source, sink]) 484 485 #transfer the data 486 data = self._get_data(source, sink, i) 487 if data != None: 488 log.debug("1WAY PUT: %s (%s) -----> %s" % (source.name,data.get_UID(),sink.name)) 489 dataRid = data.get_rid() 490 data = self._convert_data(source, sink, data) 491 self._put_data(source, sink, data, dataRid) 492 493 #work out the percent complete 494 done = idx/(numItems*len(self.sinks)) + \ 495 float(self.sinks.index(sink))/len(self.sinks) 496 self._emit_progress(done, i)
497
498 - def two_way_sync(self, source, sink):
499 """ 500 Performs a two way sync from source to sink and back. 501 """ 502 def modified_and_deleted(dp1, modified, dp2, deleted): 503 found = [] 504 for i in modified[:]: 505 matchingUID = conduit.GLOBALS.mappingDB.get_matching_UID(dp1.get_UID(), i, dp2.get_UID()) 506 if deleted.count(matchingUID) != 0: 507 log.debug("2WAY MOD+DEL: %s v %s" % (i, matchingUID)) 508 deleted.remove(matchingUID) 509 modified.remove(i) 510 found += [(dp2, matchingUID, dp1)] 511 return found
512 513 log.info("Synchronizing (Two Way) %s <--> %s " % (source, sink)) 514 #Need to do all the analysis before we touch the mapping db 515 toput = [] # (sourcedp, dataUID, sinkdp) 516 todelete = [] # (sourcedp, dataUID, sinkdp) 517 tocomp = [] # (dp1, data1UID, dp2, data2UID) 518 519 #PHASE ONE: CALCULATE WHAT NEEDS TO BE DONE 520 #get all the datauids 521 sourceAdded, sourceModified, sourceDeleted = self._get_changes(source, sink) 522 sinkAdded, sinkModified, sinkDeleted = self._get_changes(sink, source) 523 524 #added data can be put right away 525 toput += [(source, i, sink) for i in sourceAdded] 526 toput += [(sink, i, source) for i in sinkAdded] 527 528 #check first for data that had been simulatainously modified and deleted 529 todelete += modified_and_deleted(source, sourceModified, sink, sinkDeleted) 530 todelete += modified_and_deleted(sink, sinkModified, source, sourceDeleted) 531 532 #as can deleted data 533 todelete += [(source, i, sink) for i in sourceDeleted] 534 todelete += [(sink, i, source) for i in sinkDeleted] 535 536 #modified is a bit harder because we need to check if both side have 537 #been modified at the same time. First find items in both lists and seperate 538 #them out as they need to be compared. 
539 for i in sourceModified[:]: 540 matchingUID = conduit.GLOBALS.mappingDB.get_matching_UID(source.get_UID(), i, sink.get_UID()) 541 if sinkModified.count(matchingUID) != 0: 542 log.warn("2WAY BOTH MODIFIED: %s v %s" % (i, matchingUID)) 543 sourceModified.remove(i) 544 sinkModified.remove(matchingUID) 545 tocomp.append( (source, i, sink, matchingUID) ) 546 547 #all that remains in the original lists are to be put 548 toput += [(source, i, sink) for i in sourceModified] 549 toput += [(sink, i, source) for i in sinkModified] 550 551 total = len(toput) + len(todelete) + len(tocomp) 552 cnt = 0 553 554 #PHASE TWO: TRANSFER DATA 555 for sourcedp, dataUID, sinkdp in todelete: 556 matchingUID = conduit.GLOBALS.mappingDB.get_matching_UID(sourcedp.get_UID(), dataUID, sinkdp.get_UID()) 557 log.debug("2WAY DEL: %s (%s)" % (sinkdp.name, matchingUID)) 558 if matchingUID != None: 559 self._apply_deleted_policy(sourcedp, dataUID, sinkdp, matchingUID) 560 561 #progress 562 cnt = cnt+1 563 self._emit_progress(float(cnt)/total, dataUID) 564 565 for sourcedp, dataUID, sinkdp in toput: 566 data = self._get_data(sourcedp, sinkdp, dataUID) 567 if data != None: 568 log.debug("2WAY PUT: %s (%s) -----> %s" % (sourcedp.name,dataUID,sinkdp.name)) 569 dataRid = data.get_rid() 570 data = self._convert_data(sourcedp, sinkdp, data) 571 self._put_data(sourcedp, sinkdp, data, dataRid) 572 573 cnt = cnt+1 574 self._emit_progress(float(cnt)/total, dataUID) 575 576 #FIXME: rename dp1 -> sourcedp1 and dp2 -> sinkdp2 because when both 577 #data is modified we might as well choost source -> sink as the comparison direction 578 for dp1, data1UID, dp2, data2UID in tocomp: 579 data1 = self._get_data(dp1, dp2, data1UID) 580 data1Rid = data1.get_rid() 581 data2 = self._get_data(dp2, dp1, data2UID) 582 data2Rid = data2.get_rid() 583 584 #Only need to convert one data to the other type 585 #choose to convert the source data for no reason other than convention 586 data1 = self._convert_data(dp1, dp2, data1) 
587 588 log.debug("2WAY CMP: %s v %s" % (data1, data2)) 589 590 #compare the data 591 if data1 != None and data2 != None: 592 comparison = data1.compare(data2) 593 if comparison == conduit.datatypes.COMPARISON_OLDER: 594 self._apply_conflict_policy(dp2, dp1, COMPARISON_UNKNOWN, data2, data2Rid, data1, data1Rid) 595 else: 596 self._apply_conflict_policy(dp1, dp2, COMPARISON_UNKNOWN, data1, data1Rid, data2, data2Rid) 597 598 cnt = cnt+1 599 self._emit_progress(float(cnt)/total, data1UID)
600 601
602 - def run(self):
603 """ 604 The main syncronisation state machine. 605 606 Takes the conduit through the refresh->get,put,get,put->done 607 steps, setting its status at the appropriate time and performing 608 nicely in the case of errors. 609 610 It is also threaded so remember 611 1. Syncronization should not block the GUI 612 2. Due to pygtk/gtk single threadedness do not attempt to 613 communicate with the gui in any way other than signals, which 614 since Glib 2.10 are threadsafe. 615 616 If any error occurs during sync raise a L{conduit.Exceptions.StopSync} 617 exception otherwise exit normally 618 619 @raise Exceptions.StopSync: Raises a L{conduit.Exceptions.StopSync} 620 exception if the synchronisation state machine does not complete, in 621 some way, without success. 622 """ 623 try: 624 log.debug("Sync %s beginning. Slow: %s, Twoway: %s" % ( 625 self, 626 self.cond.do_slow_sync(), 627 self.cond.is_two_way() 628 )) 629 #Variable to exit the loop 630 finished = False 631 #Keep track of those sinks that didnt refresh ok 632 sinkDidntRefreshOK = {} 633 sinkDidntConfigureOK = {} 634 635 #Error handling is a bit complex because we need to send 636 #signals back to the gui for display, and because some errors 637 #are not fatal. If there is an error, set the 638 #'working' statuses immediately (Sync, Refresh) and set the 639 #Negative status (error, conflict, etc) at the end so they remain 640 #on the GUI and the user can see them. 641 #UNLESS the error is Fatal (causes us to throw a stopsync exceptiion) 642 #in which case set the error status immediately. 
643 self.cond.emit("sync-started") 644 while not finished: 645 self.check_thread_not_cancelled([self.source] + self.sinks) 646 log.debug("Syncworker state %s" % self.state) 647 648 #Check dps have been configured 649 if self.state is self.CONFIGURE_STATE: 650 if not self.source.module.is_configured( 651 isSource=True, 652 isTwoWay=self.cond.is_two_way()): 653 self.source.module.set_status(DataProvider.STATUS_DONE_SYNC_NOT_CONFIGURED) 654 #Cannot continue if source not configured 655 raise Exceptions.StopSync(self.state) 656 657 for sink in self.sinks: 658 if not sink.module.is_configured( 659 isSource=False, 660 isTwoWay=self.cond.is_two_way()): 661 sinkDidntConfigureOK[sink] = True 662 self.sinkErrors[sink] = DataProvider.STATUS_DONE_SYNC_NOT_CONFIGURED 663 #Need to have at least one successfully configured sink 664 if len(sinkDidntConfigureOK) < len(self.sinks): 665 #If this thread is a sync thread do a sync 666 self.state = self.REFRESH_STATE 667 else: 668 #We are finished 669 log.warn("Not enough configured datasinks") 670 self.aborted = True 671 self.state = self.DONE_STATE 672 673 #refresh state 674 elif self.state is self.REFRESH_STATE: 675 log.debug("Source Status = %s" % self.source.module.get_status()) 676 #Refresh the source 677 try: 678 self.source.module.refresh() 679 self.source.module.set_status(DataProvider.STATUS_DONE_REFRESH_OK) 680 except Exceptions.RefreshError: 681 self.source.module.set_status(DataProvider.STATUS_DONE_REFRESH_ERROR) 682 log.warn("RefreshError: %s" % self.source) 683 #Cannot continue with no source data 684 raise Exceptions.StopSync(self.state) 685 except Exception: 686 log.critical("UNKNOWN REFRESH ERROR: %s\n%s" % (self.source,traceback.format_exc())) 687 self.source.module.set_status(DataProvider.STATUS_DONE_REFRESH_ERROR) 688 #Cannot continue with no source data 689 raise Exceptions.StopSync(self.state) 690 691 #Refresh all the sinks. 
At least one must refresh successfully 692 for sink in self.sinks: 693 self.check_thread_not_cancelled([self.source, sink]) 694 if sink not in sinkDidntConfigureOK: 695 try: 696 sink.module.refresh() 697 sink.module.set_status(DataProvider.STATUS_DONE_REFRESH_OK) 698 except Exceptions.RefreshError: 699 log.warn("RefreshError: %s" % sink) 700 sinkDidntRefreshOK[sink] = True 701 self.sinkErrors[sink] = DataProvider.STATUS_DONE_REFRESH_ERROR 702 except Exception: 703 log.critical("UNKNOWN REFRESH ERROR: %s\n%s" % (sink,traceback.format_exc())) 704 sinkDidntRefreshOK[sink] = True 705 self.sinkErrors[sink] = DataProvider.STATUS_DONE_REFRESH_ERROR 706 707 #Need to have at least one successfully refreshed sink 708 if len(sinkDidntRefreshOK) < len(self.sinks): 709 #If this thread is a sync thread do a sync 710 if self.do_sync: 711 self.state = self.SYNC_STATE 712 else: 713 #This must be a refresh thread so we are done 714 self.state = self.DONE_STATE 715 else: 716 #We are finished 717 log.info("Not enough sinks refreshed") 718 self.aborted = True 719 self.state = self.DONE_STATE 720 721 #synchronize state 722 elif self.state is self.SYNC_STATE: 723 for sink in self.sinks: 724 self.check_thread_not_cancelled([self.source, sink]) 725 #only sync with those sinks that refresh'd OK 726 if sink not in sinkDidntRefreshOK: 727 try: 728 #now perform a one or two way sync depending on the user prefs 729 #and the capabilities of the dataprovider 730 if self.cond.is_two_way(): 731 #two way 732 self.two_way_sync(self.source, sink) 733 else: 734 #one way 735 self.one_way_sync(self.source, sink) 736 except Exceptions.SyncronizeFatalError, err: 737 log.warn("%s\n%s" % (err, traceback.format_exc())) 738 sink.module.set_status(DataProvider.STATUS_DONE_SYNC_ERROR) 739 self.source.module.set_status(DataProvider.STATUS_DONE_SYNC_ERROR) 740 #cannot continue with this source, sink pair 741 continue 742 except Exception: 743 log.critical("UNKNOWN SYNCHRONIZATION ERROR\n%s" % 
traceback.format_exc()) 744 sink.module.set_status(DataProvider.STATUS_DONE_SYNC_ERROR) 745 self.source.module.set_status(DataProvider.STATUS_DONE_SYNC_ERROR) 746 #cannot continue with this source, sink pair 747 continue 748 749 #Done go clean up 750 self.state = self.DONE_STATE 751 752 #Done successfully go home without raising exception 753 elif self.state is self.DONE_STATE: 754 #Now go back and check for errors, so that we can tell the GUI 755 #First update those sinks which had no errors 756 for sink in self.sinks: 757 if sink not in self.sinkErrors: 758 #Tell the gui if things went OK. 759 if self.do_sync: 760 sink.module.set_status(DataProvider.STATUS_DONE_SYNC_OK) 761 else: 762 sink.module.set_status(DataProvider.STATUS_DONE_REFRESH_OK) 763 #Then those sinks which had some error 764 for sink in self.sinkErrors: 765 sink.module.set_status(self.sinkErrors[sink]) 766 767 #It is safe to put this call here because all other source related 768 #Errors raise a StopSync exception and the thread exits 769 if self.do_sync: 770 self.source.module.set_status(DataProvider.STATUS_DONE_SYNC_OK) 771 else: 772 self.source.module.set_status(DataProvider.STATUS_DONE_REFRESH_OK) 773 774 #Exit thread 775 finished = True 776 777 except Exceptions.StopSync: 778 log.warn("Sync Aborted") 779 self.aborted = True 780 781 #Post sync cleanup and notification of sync success 782 error = self.did_sync_error() 783 conflict = self.did_sync_conflict() 784 for s in [self.source] + self.sinks: 785 s.module.finish(self.aborted, error, conflict) 786 conduit.GLOBALS.mappingDB.save() 787 self.cond.emit("sync-completed", self.aborted, error, conflict)
788
class RefreshDataProviderWorker(_ThreadedWorker):
    """
    Refreshes a single dataprovider, handling any errors, etc
    """

    def __init__(self, cond, dataproviderWrapper):
        """
        @param cond: The conduit the dataprovider belongs to. The
        sync-started and sync-completed signals are emitted on it
        @param dataproviderWrapper: The dp to refresh
        """
        _ThreadedWorker.__init__(self)
        self.dataproviderWrapper = dataproviderWrapper
        self.cond = cond

        self.setName("%s" % self.dataproviderWrapper)

    def run(self):
        """
        The main refresh state machine.

        Takes the conduit through the init->is_configured->refresh
        steps, setting its status at the appropriate time and performing
        nicely in the case of errors.

        The worker is marked as aborted if the dataprovider is not
        configured, fails to refresh, or an unexpected exception occurs.
        No exception leaves the refresh steps, so "sync-completed" is
        emitted in every one of these cases.
        """
        module = self.dataproviderWrapper.module
        try:
            log.debug("Refresh %s beginning" % self)
            self.cond.emit("sync-started")

            #the dataprovider in position 0 is treated as the source
            if not module.is_configured(
                                isSource=self.cond.get_dataprovider_position(self.dataproviderWrapper)[0]==0,
                                isTwoWay=self.cond.is_two_way()):
                module.set_status(DataProvider.STATUS_DONE_SYNC_NOT_CONFIGURED)
                #Cannot continue if the dataprovider is not configured
                raise Exceptions.StopSync(self.state)

            self.state = self.REFRESH_STATE
            try:
                module.refresh()
                module.set_status(DataProvider.STATUS_DONE_REFRESH_OK)
            except Exceptions.RefreshError:
                module.set_status(DataProvider.STATUS_DONE_REFRESH_ERROR)
                log.warning("RefreshError: %s" % self.dataproviderWrapper)
                #Cannot continue with no data
                raise Exceptions.StopSync(self.state)
            except Exception:
                module.set_status(DataProvider.STATUS_DONE_REFRESH_ERROR)
                log.critical("UNKNOWN REFRESH ERROR: %s\n%s" % (self.dataproviderWrapper,traceback.format_exc()))
                #Cannot continue with no data
                raise Exceptions.StopSync(self.state)

        except Exceptions.StopSync:
            log.warning("Sync Aborted")
            self.aborted = True
        except Exception:
            #anything unexpected also aborts the refresh, but must not stop
            #the sync-completed signal below from being emitted
            log.critical("UNHANDLED EXCEPTION DURING REFRESH\n%s" % traceback.format_exc())
            self.aborted = True

        conduit.GLOBALS.mappingDB.save()
        self.cond.emit("sync-completed", self.aborted, self.did_sync_error(), self.did_sync_conflict())
844
845 -class BlockingFunctionCallWorker(_ThreadedWorker):
846 """ 847 Calls the provided (blocking) function in a new thread. When 848 the function returns a sync-completed signal is sent 849 """
850 - def __init__(self, cond, *functions):