Package conduit :: Package utils :: Module Bluetooth
[hide private]

Source Code for Module conduit.utils.Bluetooth

 1  import threading 
 2  import logging 
 3  log = logging.getLogger("utils.Bluetooth") 
 4   
 5  try: 
 6      import bluetooth 
 7      BLUETOOTH_AVAILABLE = True 
 8   
9 - class _DeviceDiscovererFilter(bluetooth.DeviceDiscoverer):
10 - def __init__(self):
11 bluetooth.DeviceDiscoverer.__init__(self) 12 self._found = {}
13
14 - def device_discovered(self, address, device_class, name):
15 if not name: 16 name = bluetooth.lookup_name(address) 17 log.debug("Bluetooth Device Discovered: %s@%s" % (name,address)) 18 self._found[address] = (name, device_class)
19
20 - def inquiry_complete(self):
21 log.debug("Bluetooth Search Complete")
22
23 - def get_devices(self):
24 return self._found
25 26 except: 27 BLUETOOTH_AVAILABLE = False 28
29 - class _DeviceDiscovererFilter:
30 - def get_devices(self):
31 return {}
32 33 import conduit.utils.Thread as Thread 34 import conduit.utils.Singleton as Singleton 35
def is_computer_class(device_class):
    """Return True if *device_class* has the "computer" major device class.

    The major device class occupies the five bits 8-12 of the Bluetooth
    class-of-device value, so the mask is 0x1f00. A narrower 0xf00 mask
    would drop bit 12 and misreport values such as 0x1100 as a computer.
    """
    major_class = (device_class & 0x1f00) >> 8
    return major_class == 1
39
def is_phone_class(device_class):
    """Return True if *device_class* has the "phone" major device class.

    The major device class occupies the five bits 8-12 of the Bluetooth
    class-of-device value, so the mask is 0x1f00. A narrower 0xf00 mask
    would drop bit 12 and misreport values such as 0x1200 as a phone.
    """
    major_class = (device_class & 0x1f00) >> 8
    return major_class == 2
43
class BluetoothSearcher(Singleton.Singleton, Thread.PauseCancelThread):
    """Background thread that scans for bluetooth devices and notifies watchers.

    The thread starts itself on construction. Each pass of ``run`` performs
    one inquiry and then reports every known device to the registered
    callbacks whose class-check function accepts the device class.
    """

    def __init__(self):
        Thread.PauseCancelThread.__init__(self)
        self._disc = _DeviceDiscovererFilter()
        # Maps callback -> class-check function; guarded by _cblock.
        self._cbs = {}
        self._cblock = threading.Lock()

        self.setDaemon(False)
        self.start()

    def watch_for_devices(self, cb, class_check_func=is_phone_class):
        """Register *cb* to be called as ``cb(address, name)`` for matching devices.

        *class_check_func* receives the device class and returns a true
        value for devices the callback is interested in. Registering a
        callback that is already registered leaves the existing entry alone.
        """
        with self._cblock:
            if cb not in self._cbs:
                self._cbs[cb] = class_check_func

    def call_callbacks(self, address, name, device_class):
        """Invoke every registered callback whose check accepts *device_class*."""
        # Take a snapshot under the lock and call outside it: the lock is
        # always released even if a callback raises, and a callback may
        # itself call watch_for_devices without deadlocking on the
        # non-reentrant lock.
        with self._cblock:
            registered = list(self._cbs.items())

        for cb, class_check_func in registered:
            if class_check_func(device_class):
                cb(address, name)

    def get_devices(self):
        """Return the discoverer's dict of address -> (name, device_class)."""
        return self._disc.get_devices()

    def run(self):
        """Scan repeatedly until the thread is cancelled."""
        while not self.is_cancelled():
            log.debug("Scanning..")

            self._disc.find_devices()
            self._disc.process_inquiry()
            devices = self._disc.get_devices()
            for address, info in devices.items():
                # info is the (name, device_class) pair stored by the discoverer
                self.call_callbacks(address, info[0], info[1])

            # pause() comes from the thread base class; presumably it blocks
            # until the thread is resumed or cancelled -- confirm there.
            self.pause()
83