1 """
2 Holds class used for the actual synchronisation phase
3
4 Copyright: John Stowers, 2006
5 License: GPLv2
6 """
7
8 import traceback
9 import threading
10 import logging
11 log = logging.getLogger("Syncronization")
12
13
14 import conduit
15 import conduit.dataproviders.DataProvider as DataProvider
16 import conduit.Exceptions as Exceptions
17 import conduit.DeltaProvider as DeltaProvider
18
19 from conduit.Conflict import Conflict, CONFLICT_DELETE, CONFLICT_COPY_SOURCE_TO_SINK,CONFLICT_SKIP,CONFLICT_COPY_SINK_TO_SOURCE
20 from conduit.datatypes import DataType, Rid, COMPARISON_OLDER, COMPARISON_EQUAL, COMPARISON_NEWER, COMPARISON_UNKNOWN
21
22 -def put_data(source, sink, sourceData, sourceDataRid, overwrite):
47
60
62 """
63 Given a dictionary of relationships this class synchronizes
64 the relevant sinks and sources. If there is a conflict then this is
65 handled by the conflictResolver
66 """
68 """
69 Constructor.
70
71 Creates a dictionary of syncWorkers indexed by conduit
72 """
73 self.syncWorkers = {}
74 self.typeConverter = typeConverter
75
77 log.warn("Conduit already in queue (alive: %s)" % self.syncWorkers[cond].isAlive())
78
79 if self.syncWorkers[cond].isAlive():
80 log.warn("Cancelling thread")
81 self.syncWorkers[cond].cancel()
82 self.syncWorkers[cond].join()
83
85 if cond in self.syncWorkers:
86 log.debug("Deleting worker: %s" % worker)
87 del(self.syncWorkers[cond])
88
90 log.info("Setting global cancel flag")
91 conduit.GLOBALS.cancelled = False
92
93 log.debug("Starting worker: %s" % worker)
94 cond.connect("sync-completed", self._on_sync_completed, worker)
95 self.syncWorkers[cond] = worker
96 self.syncWorkers[cond].start()
97
99 """
100 Returns true if any conduit is currently undergoing a sync
101 """
102 for cond in self.syncWorkers:
103 if self.syncWorkers[cond].isAlive():
104 return True
105 return False
106
108 """
109 Returns true if cond is currently undergoing sync, refresh etc
110 """
111 return cond in self.syncWorkers and self.syncWorkers[cond].isAlive()
112
114 """
115 Cancels all threads and also joins() them. Will block
116 """
117 for c in self.syncWorkers:
118 self._cancel_sync_thread(c)
119
def join_one(self, cond, timeout=None):
    """
    Blocks until the thread associated with the supplied conduit finishes.

    If no worker is registered for cond (for example because it has
    already been removed from syncWorkers) this returns immediately
    instead of raising KeyError.

    @param cond: The conduit whose worker thread should be joined
    @param timeout: Passed straight to Thread.join(); None blocks until
        the thread finishes
    """
    # Look the worker up exactly once. A caller's "cond in syncWorkers"
    # check followed by indexing here can race with the worker being
    # removed from the dict by another thread.
    worker = self.syncWorkers.get(cond)
    if worker is None:
        return
    log.info("Waiting for thread to finish")
    worker.join(timeout)
126
128 """
129 Joins all threads. This function will block the calling thread
130 """
131 for c in self.syncWorkers:
132 self.syncWorkers[c].join(timeout)
133
153
161
163 if cond in self.syncWorkers:
164 log.info("Refresh already in progress")
165 self.join_one(cond)
166
167 threadedWorker = SyncWorker(self.typeConverter, cond, False)
168 self._start_worker_thread(cond, threadedWorker)
169
171 if cond in self.syncWorkers:
172 log.info("Sync already in progress")
173 self.join_one(cond)
174
175 threadedWorker = SyncWorker(self.typeConverter, cond, True)
176 self._start_worker_thread(cond, threadedWorker)
177
179 """
180 Returns True if the supplied conduit aborted (the sync did not complete
181 due to an unhandled exception, a SynchronizeFatalError or the conduit was
182 unsyncable (source did not refresh, etc)
183 """
184 return self.syncWorkers[cond].aborted
185
187 """
188 Returns True if the supplied conduit raised a non fatal
189 SynchronizeError during sync
190 """
191 return self.syncWorkers[cond].did_sync_error()
192
194 """
195 Returns True if the supplied conduit encountered a conflict during processing
196 """
197 return self.syncWorkers[cond].did_sync_conflict()
198
200 """
201 Aa python thread, Base class for refresh and syncronization
202 operations
203 """
204 CONFIGURE_STATE = 0
205 REFRESH_STATE = 1
206 SYNC_STATE = 2
207 DONE_STATE = 3
208
210 threading.Thread.__init__(self)
211
212
213
214 self.cancelled = False
215
216
217 self.aborted = False
218
219
220
221 self.sinkErrors = {}
222
223
224 self.state = self.CONFIGURE_STATE
225
227 """
228 Returns all the data from the source to the sink. If the dataprovider
229 implements get_changes() then this is called. Otherwise the dataprovider
230 is proxied using DeltaProvider
231
232 @returns: added, modified, deleted
233 """
234 try:
235 added, modified, deleted = source.module.get_changes()
236 except NotImplementedError:
237 delta = DeltaProvider.DeltaProvider(source, sink)
238 added, modified, deleted = delta.get_changes()
239
240 log.debug("%s Changes: New %s items\n%s" % (source.get_UID(), len(added), added))
241 log.debug("%s Changes: Modified %s items\n%s" % (source.get_UID(), len(modified), modified))
242 log.debug("%s Changes: Deleted %s items\n%s" % (source.get_UID(), len(deleted), deleted))
243
244
245 return added[:], modified[:], deleted[:]
246
248 """
249 Cancels the sync thread. Does not do so immediately but as soon as
250 possible.
251 """
252 self.cancelled = True
253
261
265
267 """
268 Class designed to be operated within a thread used to perform the
269 synchronization operation. Inherits from GObject because it uses
270 signals to communcate with the main GUI.
271
272 Operates on a per Conduit basis, so a single SyncWorker may synchronize
273 one source with many sinks within a single conduit
274 """
275
276 PROGRESS_UPDATE_THRESHOLD = 5.0/100
277
def __init__(self, typeConverter, cond, do_sync):
    """
    Prepares a worker thread for the supplied conduit.

    @param typeConverter: Stored on the worker for later use
    @param cond: The conduit to operate on; its datasource and datasinks
        are cached on the worker
    @param do_sync: If True the worker synchronizes after refreshing,
        otherwise it only refreshes
    """
    _ThreadedWorker.__init__(self)
    self.cond = cond
    self.typeConverter = typeConverter
    self.do_sync = do_sync
    self.source, self.sinks = cond.datasource, cond.datasinks

    # Bookkeeping used to throttle sync-progress signal emission
    self._progressUIDs = []
    self._progress = 0

    # A two way conduit has exactly one partner sink, a one way conduit
    # may fan out to a list of sinks
    if cond.is_two_way():
        threadName = "%s <--> %s" % (self.source, self.sinks[0])
    else:
        threadName = "%s |--> %s" % (self.source, self.sinks)
    self.setName(threadName)
296 """
297 Emits progress signals, if the elapsed progress since the last
298 call to this function is greater that 5%. This is necessary because
299 otherwise we starve the main loop with too frequent progress
300 events
301 """
302 self._progressUIDs.append(dataUID)
303 if (progress - self._progress) > self.PROGRESS_UPDATE_THRESHOLD or progress == 1.0:
304 self._progress = progress
305 self.cond.emit("sync-progress", self._progress, self._progressUIDs)
306 self._progressUIDs = []
307
309 """
310 Gets the data from source. Handles exceptions, etc.
311
312 @returns: The data that was got or None
313 """
314 data = None
315 try:
316 data = source.module.get(uid)
317 except Exceptions.SyncronizeError, err:
318 log.warn("%s\n%s" % (err, traceback.format_exc()))
319 self.sinkErrors[sink] = DataProvider.STATUS_DONE_SYNC_ERROR
320 return data
321
def _put_data(self, source, sink, sourceData, sourceDataRid):
    """
    Puts sourceData from source into sink, handling the exceptions that
    put_data may raise. Data already in the sink is never overwritten
    here (put_data is called with overwrite=False); conflicts are passed
    on to _apply_conflict_policy instead.

    @param source: The dataprovider wrapper the data came from
    @param sink: The dataprovider wrapper to put the data into
    @param sourceData: The (already converted) data to put, or None
    @param sourceDataRid: The Rid of the data as read from the source
    @returns: True if the data was successfully put, False otherwise
    """
    # Identity test: "!= None" may dispatch to DataType comparison overloads
    if sourceData is None:
        log.info("Could not put data: Was None")
        return False

    try:
        put_data(source, sink, sourceData, sourceDataRid, False)
        return True
    except Exceptions.SyncronizeError as err:
        # Non fatal: remember the error against this sink and carry on
        log.warn("%s\n%s" % (err, traceback.format_exc()))
        self.sinkErrors[sink] = DataProvider.STATUS_DONE_SYNC_ERROR
    except Exceptions.SynchronizeConflictError as err:
        comp = err.comparison
        if comp == COMPARISON_EQUAL:
            # Both sides already hold equal data, nothing to resolve
            log.info("Skipping %s (Equal)" % sourceData)
        else:
            # NOTE(review): internal sanity check only; it is stripped
            # when python runs with -O
            assert err.fromData == sourceData
            self._apply_conflict_policy(
                source, sink, comp,
                sourceData, sourceDataRid,
                err.toData, err.toData.get_rid())
    return False
347
365
367 """
368 Applies user policy when data has been deleted from source.
369 sourceDataLUID is the original UID of the data that has been deleted
370 sinkDataLUID is the uid of the data in sink that should now be deleted
371 """
372 if self.cond.get_policy("deleted") == "skip":
373 log.debug("Deleted Policy: Skipping")
374 elif self.cond.get_policy("deleted") == "ask":
375 log.debug("Deleted Policy: Ask")
376
377
378 self.sinkErrors[sinkWrapper] = DataProvider.STATUS_DONE_SYNC_CONFLICT
379
380
381 if self.source == sourceWrapper:
382
383
384
385 validResolveChoices = (CONFLICT_DELETE, CONFLICT_SKIP)
386 else:
387
388
389 validResolveChoices = (CONFLICT_DELETE, CONFLICT_SKIP)
390
391 sourceData = DeletedData(sourceDataLUID)
392 sinkData = DeletedData(sinkDataLUID)
393 c = Conflict(
394 self.cond,
395 sourceWrapper,
396 sourceData,
397 sourceData.get_rid(),
398 sinkWrapper,
399 sinkData,
400 sinkData.get_rid(),
401 validResolveChoices,
402 True
403 )
404 self.cond.emit_conflict(c)
405
406 elif self.cond.get_policy("deleted") == "replace":
407 log.debug("Deleted Policy: Delete")
408
409 self.sinkErrors[sinkWrapper] = DataProvider.STATUS_DONE_SYNC_CONFLICT
410 delete_data(sourceWrapper, sinkWrapper, sinkDataLUID)
411
def _apply_conflict_policy(self, sourceWrapper, sinkWrapper, comparison, fromData, fromDataRid, toData, toDataRid):
    """
    Applies the conduit's "conflict" policy after a put() has failed.
    Depending on the policy the conflict is skipped, emitted up to the
    GUI as a Conflict, or resolved by forcing the put (overwrite=True).
    Unrecognised policy values do nothing.

    @param sourceWrapper: Dataprovider wrapper the data is being put from
    @param sinkWrapper: Dataprovider wrapper the data is being put to
    @param comparison: The comparison that led to the conflict (not used
        by this method; kept for interface compatibility)
    @param fromData: The data being put
    @param fromDataRid: The Rid of fromData
    @param toData: The conflicting data already present in the sink
    @param toDataRid: The Rid of toData
    """
    # Read the policy once rather than once per branch
    policy = self.cond.get_policy("conflict")

    if policy == "skip":
        log.debug("Conflict Policy: Skipping")
    elif policy == "ask":
        log.debug("Conflict Policy: Ask")
        self.sinkErrors[sinkWrapper] = DataProvider.STATUS_DONE_SYNC_CONFLICT

        # Copying sink->source is only offered when the source's
        # module_type is "twoway" or "sink" (presumably: when it can
        # itself accept data)
        if sourceWrapper.module_type in ("twoway", "sink"):
            avail = (CONFLICT_SKIP, CONFLICT_COPY_SOURCE_TO_SINK, CONFLICT_COPY_SINK_TO_SOURCE)
        else:
            avail = (CONFLICT_SKIP, CONFLICT_COPY_SOURCE_TO_SINK)

        c = Conflict(
            self.cond,
            sourceWrapper,
            fromData,
            fromDataRid,
            sinkWrapper,
            toData,
            toDataRid,
            avail,
            False
        )
        self.cond.emit_conflict(c)
    elif policy == "replace":
        log.debug("Conflict Policy: Replace")
        self.sinkErrors[sinkWrapper] = DataProvider.STATUS_DONE_SYNC_CONFLICT

        try:
            put_data(sourceWrapper, sinkWrapper, fromData, fromDataRid, True)
        except Exception:
            # A failed forced put is logged but not fatal. Catch Exception
            # instead of using a bare except so that KeyboardInterrupt and
            # SystemExit still propagate.
            log.warn("Forced Put Failed\n%s" % traceback.format_exc())
452 """
453 Checks if the thread has been scheduled to be cancelled. If it has
454 then this function sets the status of the dataproviders to indicate
455 that they were stopped through a cancel operation.
456 """
457 if self.cancelled:
458 for s in dataprovidersToCancel:
459 s.module.set_status(DataProvider.STATUS_DONE_SYNC_CANCELLED)
460 raise Exceptions.StopSync(self.state)
461
463 """
464 Transfers numItems of data from source to sink.
465 """
466 log.info("Synchronizing %s |--> %s " % (source, sink))
467
468
469 added, modified, deleted = self._get_changes(source, sink)
470
471
472 for d in deleted:
473 matchingUID = conduit.GLOBALS.mappingDB.get_matching_UID(source.get_UID(), d, sink.get_UID())
474 if matchingUID != None:
475 self._apply_deleted_policy(source, d, sink, matchingUID)
476
477
478 items = added + modified
479 numItems = len(items)
480 idx = 0
481 for i in items:
482 idx += 1.0
483 self.check_thread_not_cancelled([source, sink])
484
485
486 data = self._get_data(source, sink, i)
487 if data != None:
488 log.debug("1WAY PUT: %s (%s) -----> %s" % (source.name,data.get_UID(),sink.name))
489 dataRid = data.get_rid()
490 data = self._convert_data(source, sink, data)
491 self._put_data(source, sink, data, dataRid)
492
493
494 done = idx/(numItems*len(self.sinks)) + \
495 float(self.sinks.index(sink))/len(self.sinks)
496 self._emit_progress(done, i)
497
499 """
500 Performs a two way sync from source to sink and back.
501 """
502 def modified_and_deleted(dp1, modified, dp2, deleted):
503 found = []
504 for i in modified[:]:
505 matchingUID = conduit.GLOBALS.mappingDB.get_matching_UID(dp1.get_UID(), i, dp2.get_UID())
506 if deleted.count(matchingUID) != 0:
507 log.debug("2WAY MOD+DEL: %s v %s" % (i, matchingUID))
508 deleted.remove(matchingUID)
509 modified.remove(i)
510 found += [(dp2, matchingUID, dp1)]
511 return found
512
513 log.info("Synchronizing (Two Way) %s <--> %s " % (source, sink))
514
515 toput = []
516 todelete = []
517 tocomp = []
518
519
520
521 sourceAdded, sourceModified, sourceDeleted = self._get_changes(source, sink)
522 sinkAdded, sinkModified, sinkDeleted = self._get_changes(sink, source)
523
524
525 toput += [(source, i, sink) for i in sourceAdded]
526 toput += [(sink, i, source) for i in sinkAdded]
527
528
529 todelete += modified_and_deleted(source, sourceModified, sink, sinkDeleted)
530 todelete += modified_and_deleted(sink, sinkModified, source, sourceDeleted)
531
532
533 todelete += [(source, i, sink) for i in sourceDeleted]
534 todelete += [(sink, i, source) for i in sinkDeleted]
535
536
537
538
539 for i in sourceModified[:]:
540 matchingUID = conduit.GLOBALS.mappingDB.get_matching_UID(source.get_UID(), i, sink.get_UID())
541 if sinkModified.count(matchingUID) != 0:
542 log.warn("2WAY BOTH MODIFIED: %s v %s" % (i, matchingUID))
543 sourceModified.remove(i)
544 sinkModified.remove(matchingUID)
545 tocomp.append( (source, i, sink, matchingUID) )
546
547
548 toput += [(source, i, sink) for i in sourceModified]
549 toput += [(sink, i, source) for i in sinkModified]
550
551 total = len(toput) + len(todelete) + len(tocomp)
552 cnt = 0
553
554
555 for sourcedp, dataUID, sinkdp in todelete:
556 matchingUID = conduit.GLOBALS.mappingDB.get_matching_UID(sourcedp.get_UID(), dataUID, sinkdp.get_UID())
557 log.debug("2WAY DEL: %s (%s)" % (sinkdp.name, matchingUID))
558 if matchingUID != None:
559 self._apply_deleted_policy(sourcedp, dataUID, sinkdp, matchingUID)
560
561
562 cnt = cnt+1
563 self._emit_progress(float(cnt)/total, dataUID)
564
565 for sourcedp, dataUID, sinkdp in toput:
566 data = self._get_data(sourcedp, sinkdp, dataUID)
567 if data != None:
568 log.debug("2WAY PUT: %s (%s) -----> %s" % (sourcedp.name,dataUID,sinkdp.name))
569 dataRid = data.get_rid()
570 data = self._convert_data(sourcedp, sinkdp, data)
571 self._put_data(sourcedp, sinkdp, data, dataRid)
572
573 cnt = cnt+1
574 self._emit_progress(float(cnt)/total, dataUID)
575
576
577
578 for dp1, data1UID, dp2, data2UID in tocomp:
579 data1 = self._get_data(dp1, dp2, data1UID)
580 data1Rid = data1.get_rid()
581 data2 = self._get_data(dp2, dp1, data2UID)
582 data2Rid = data2.get_rid()
583
584
585
586 data1 = self._convert_data(dp1, dp2, data1)
587
588 log.debug("2WAY CMP: %s v %s" % (data1, data2))
589
590
591 if data1 != None and data2 != None:
592 comparison = data1.compare(data2)
593 if comparison == conduit.datatypes.COMPARISON_OLDER:
594 self._apply_conflict_policy(dp2, dp1, COMPARISON_UNKNOWN, data2, data2Rid, data1, data1Rid)
595 else:
596 self._apply_conflict_policy(dp1, dp2, COMPARISON_UNKNOWN, data1, data1Rid, data2, data2Rid)
597
598 cnt = cnt+1
599 self._emit_progress(float(cnt)/total, data1UID)
600
601
603 """
604 The main syncronisation state machine.
605
606 Takes the conduit through the refresh->get,put,get,put->done
607 steps, setting its status at the appropriate time and performing
608 nicely in the case of errors.
609
610 It is also threaded so remember
611 1. Syncronization should not block the GUI
612 2. Due to pygtk/gtk single threadedness do not attempt to
613 communicate with the gui in any way other than signals, which
614 since Glib 2.10 are threadsafe.
615
616 If any error occurs during sync raise a L{conduit.Exceptions.StopSync}
617 exception otherwise exit normally
618
619 @raise Exceptions.StopSync: Raises a L{conduit.Exceptions.StopSync}
620 exception if the synchronisation state machine does not complete, in
621 some way, without success.
622 """
623 try:
624 log.debug("Sync %s beginning. Slow: %s, Twoway: %s" % (
625 self,
626 self.cond.do_slow_sync(),
627 self.cond.is_two_way()
628 ))
629
630 finished = False
631
632 sinkDidntRefreshOK = {}
633 sinkDidntConfigureOK = {}
634
635
636
637
638
639
640
641
642
643 self.cond.emit("sync-started")
644 while not finished:
645 self.check_thread_not_cancelled([self.source] + self.sinks)
646 log.debug("Syncworker state %s" % self.state)
647
648
649 if self.state is self.CONFIGURE_STATE:
650 if not self.source.module.is_configured(
651 isSource=True,
652 isTwoWay=self.cond.is_two_way()):
653 self.source.module.set_status(DataProvider.STATUS_DONE_SYNC_NOT_CONFIGURED)
654
655 raise Exceptions.StopSync(self.state)
656
657 for sink in self.sinks:
658 if not sink.module.is_configured(
659 isSource=False,
660 isTwoWay=self.cond.is_two_way()):
661 sinkDidntConfigureOK[sink] = True
662 self.sinkErrors[sink] = DataProvider.STATUS_DONE_SYNC_NOT_CONFIGURED
663
664 if len(sinkDidntConfigureOK) < len(self.sinks):
665
666 self.state = self.REFRESH_STATE
667 else:
668
669 log.warn("Not enough configured datasinks")
670 self.aborted = True
671 self.state = self.DONE_STATE
672
673
674 elif self.state is self.REFRESH_STATE:
675 log.debug("Source Status = %s" % self.source.module.get_status())
676
677 try:
678 self.source.module.refresh()
679 self.source.module.set_status(DataProvider.STATUS_DONE_REFRESH_OK)
680 except Exceptions.RefreshError:
681 self.source.module.set_status(DataProvider.STATUS_DONE_REFRESH_ERROR)
682 log.warn("RefreshError: %s" % self.source)
683
684 raise Exceptions.StopSync(self.state)
685 except Exception:
686 log.critical("UNKNOWN REFRESH ERROR: %s\n%s" % (self.source,traceback.format_exc()))
687 self.source.module.set_status(DataProvider.STATUS_DONE_REFRESH_ERROR)
688
689 raise Exceptions.StopSync(self.state)
690
691
692 for sink in self.sinks:
693 self.check_thread_not_cancelled([self.source, sink])
694 if sink not in sinkDidntConfigureOK:
695 try:
696 sink.module.refresh()
697 sink.module.set_status(DataProvider.STATUS_DONE_REFRESH_OK)
698 except Exceptions.RefreshError:
699 log.warn("RefreshError: %s" % sink)
700 sinkDidntRefreshOK[sink] = True
701 self.sinkErrors[sink] = DataProvider.STATUS_DONE_REFRESH_ERROR
702 except Exception:
703 log.critical("UNKNOWN REFRESH ERROR: %s\n%s" % (sink,traceback.format_exc()))
704 sinkDidntRefreshOK[sink] = True
705 self.sinkErrors[sink] = DataProvider.STATUS_DONE_REFRESH_ERROR
706
707
708 if len(sinkDidntRefreshOK) < len(self.sinks):
709
710 if self.do_sync:
711 self.state = self.SYNC_STATE
712 else:
713
714 self.state = self.DONE_STATE
715 else:
716
717 log.info("Not enough sinks refreshed")
718 self.aborted = True
719 self.state = self.DONE_STATE
720
721
722 elif self.state is self.SYNC_STATE:
723 for sink in self.sinks:
724 self.check_thread_not_cancelled([self.source, sink])
725
726 if sink not in sinkDidntRefreshOK:
727 try:
728
729
730 if self.cond.is_two_way():
731
732 self.two_way_sync(self.source, sink)
733 else:
734
735 self.one_way_sync(self.source, sink)
736 except Exceptions.SyncronizeFatalError, err:
737 log.warn("%s\n%s" % (err, traceback.format_exc()))
738 sink.module.set_status(DataProvider.STATUS_DONE_SYNC_ERROR)
739 self.source.module.set_status(DataProvider.STATUS_DONE_SYNC_ERROR)
740
741 continue
742 except Exception:
743 log.critical("UNKNOWN SYNCHRONIZATION ERROR\n%s" % traceback.format_exc())
744 sink.module.set_status(DataProvider.STATUS_DONE_SYNC_ERROR)
745 self.source.module.set_status(DataProvider.STATUS_DONE_SYNC_ERROR)
746
747 continue
748
749
750 self.state = self.DONE_STATE
751
752
753 elif self.state is self.DONE_STATE:
754
755
756 for sink in self.sinks:
757 if sink not in self.sinkErrors:
758
759 if self.do_sync:
760 sink.module.set_status(DataProvider.STATUS_DONE_SYNC_OK)
761 else:
762 sink.module.set_status(DataProvider.STATUS_DONE_REFRESH_OK)
763
764 for sink in self.sinkErrors:
765 sink.module.set_status(self.sinkErrors[sink])
766
767
768
769 if self.do_sync:
770 self.source.module.set_status(DataProvider.STATUS_DONE_SYNC_OK)
771 else:
772 self.source.module.set_status(DataProvider.STATUS_DONE_REFRESH_OK)
773
774
775 finished = True
776
777 except Exceptions.StopSync:
778 log.warn("Sync Aborted")
779 self.aborted = True
780
781
782 error = self.did_sync_error()
783 conflict = self.did_sync_conflict()
784 for s in [self.source] + self.sinks:
785 s.module.finish(self.aborted, error, conflict)
786 conduit.GLOBALS.mappingDB.save()
787 self.cond.emit("sync-completed", self.aborted, error, conflict)
788
790 """
791 Refreshes a single dataprovider, handling any errors, etc
792 """
793
def __init__(self, cond, dataproviderWrapper):
    """
    Sets up a worker thread that refreshes a single dataprovider.

    @param cond: The conduit the dataprovider belongs to
    @param dataproviderWrapper: The dp to refresh
    """
    _ThreadedWorker.__init__(self)
    self.cond = cond
    self.dataproviderWrapper = dataproviderWrapper

    # The thread is named after the dataprovider it refreshes
    self.setName("%s" % dataproviderWrapper)
803
844
846 """
847 Calls the provided (blocking) function in a new thread. When
848 the function returns a sync-completed signal is sent
849 """