Package conduit :: Package modules :: Package PhoneModule :: Module PhoneModule
[hide private]

Source Code for Module conduit.modules.PhoneModule.PhoneModule

  1  import logging 
  2  log = logging.getLogger("modules.Phone") 
  3   
  4  import conduit.dataproviders.File as FileDataProvider 
  5  import conduit.dataproviders.DataProvider as DataProvider 
  6  import conduit.dataproviders.DataProviderCategory as DataProviderCategory 
  7  import conduit.utils as Utils 
  8  import conduit.Exceptions as Exceptions 
  9   
 10  try: 
 11      import bluetooth 
 12      MODULES = { 
 13      "PhoneFactory" : { "type": "dataprovider-factory" }, 
 14      } 
 15  except ImportError: 
 16      MODULES = {} 
 17      log.info("Phone support disabled (bluez/python-bluetooth not installed)") 
 18   
 19  Utils.dataprovider_add_dir_to_path(__file__, "") 
 20  import Gammu 
 21   
22 -class PhoneFactory(DataProvider.DataProviderFactory):
23 """ 24 Looks for phones connected via bluetooth 25 """
26 - def __init__(self, **kwargs):
27 DataProvider.DataProviderFactory.__init__(self, **kwargs) 28 self._cats = {} 29 self._phones = {} 30 31 #Scan multiple interfaces at once 32 self.threads = [] 33 if Gammu.GAMMU_SUPPORTED: 34 self.threads.append( 35 Gammu.Bluetooth(self._found_phone_callback) 36 #Gammu.Cable 37 )
38 #else: 39 # phonetooth based scan 40
41 - def _found_phone_callback(self, address, name, driver, info):
42 #Get/create the named phone category 43 if name not in self._cats: 44 self._cats[name] = DataProviderCategory.DataProviderCategory( 45 name, 46 "phone", 47 address) 48 category = self._cats[name] 49 50 #create the klass for controlling the phone 51 klass = None 52 if driver == "test": 53 self.emit_added( 54 klass=Test, 55 initargs=(address,), 56 category=category 57 ) 58 elif driver == "bluetooth": 59 #check it supports obex file transfer class for file dps 60 done = False 61 for i in ObexFileDataProvider.SUPPORTED_BLUETOOTH_CLASSES: 62 if done: break 63 for service in info.get('services',()): 64 if i in service['service-classes']: 65 self.emit_added( 66 klass=ObexFileDataProvider, 67 initargs=(address,), 68 category=category 69 ) 70 done = True 71 break 72 73 #check that gammu found a working connection to the phone 74 if info.get('connection',None): 75 if Gammu.GAMMU_SUPPORTED: 76 self.emit_added( 77 klass=Gammu.GammuDataProvider, 78 initargs=(address,info['connection']), 79 category=category 80 ) 81 #else phonetooth based contacts 82 # 83 84 else: 85 log.warn("No driver supports %s@%s" % (driver,address))
86
87 - def probe(self):
88 log.info("Starting Scan Threads") 89 for t in self.threads: 90 t.start()
91
92 - def quit(self):
93 log.info("Stopping Scan Threads") 94 for t in self.threads: 95 t.cancel()
96
97 -class ObexFileDataProvider(FileDataProvider.FolderTwoWay):
98 99 _name_ = "Pictures" 100 _configurable_ = False 101 102 #FIXME: Does gnomevfs-obexftp support obexpush also? 103 SUPPORTED_BLUETOOTH_CLASSES = ( 104 bluetooth.OBEX_FILETRANS_CLASS, 105 ) 106
107 - def __init__(self, address, *args):
108 FileDataProvider.FolderTwoWay.__init__( 109 self, 110 folder= "obex://[%s]" % address, 111 folderGroupName="Test", 112 includeHidden=False, 113 compareIgnoreMtime=False, 114 followSymlinks=False 115 ) 116 self.address = address
117 #FIXME: In the land of GIO, I think I need to gio-mount this 118 #location before I can do anything with it... 119
120 - def get_UID(self):
121 return self.address
122
123 -class Test(DataProvider.DataSource):
124 _name_ = "Test Phone" 125 _description_ = "Test Phone" 126 _module_type_ = "source" 127 _configurable_ = False
128 - def __init__(self, address, *args):
130
131 - def get_UID(self):
132 return ""
133