Package conduit :: Module Database
[hide private]

Source Code for Module conduit.Database

  1  """ 
  2  Sqlite DB Abstraction layer and threadsafe wrapping around it. 
  3  Copyright (C) John Stowers 2007 <john.stowers@gmail.com> 
  4   
  5  GenericDB: 
  6  SQL based on http://vwdude.com/dropbox/pystore/ 
  7  Copyright (C) Christian Hergert 2007 <christian.hergert@gmail.com> 
  8   
  9  ThreadSafeGenericDB: 
 10  Wrapper based on http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/526618 
 11  Copyright (C) Louis RIVIERE 2007 
 12   
 13  You may redistribute it and/or modify it under the terms of the 
 14  GNU General Public License, as published by the Free Software 
 15  Foundation; either version 2 of the License, or (at your option) 
 16  any later version. 
 17    
 18  main.c is distributed in the hope that it will be useful, 
 19  but WITHOUT ANY WARRANTY; without even the implied warranty of 
 20  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
 21  See the GNU General Public License for more details. 
 22    
 23  You should have received a copy of the GNU General Public License 
 24  along with main.c.  If not, write to: 
 25   The Free Software Foundation, Inc., 
 26   51 Franklin Street, Fifth Floor 
 27   Boston, MA  02110-1301, USA. 
 28  """ 
 29  import gobject 
 30  import logging 
 31  log = logging.getLogger("Database") 
 32   
 33  #for generic db 
 34  try: 
 35      from sqlite3 import dbapi2 as sqlite 
 36  except ImportError: 
 37      from pysqlite2 import dbapi2 as sqlite 
 38   
 39  #for threadsafe db 
 40  from threading import Thread 
 41  from Queue import Queue 
 42   
 43  #for lru decorator 
 44  from collections import deque 
 45   
46 -def lru_cache(maxsize):
47 """ 48 Decorator applying a least-recently-used cache with the given maximum size. 49 50 Arguments to the cached function must be hashable. 51 Cache performance statistics stored in f.hits and f.misses. 52 """ 53 if maxsize == 0: 54 decorating_function = lambda x: x 55 else: 56 def decorating_function(f): 57 cache = {} # mapping of args to results 58 queue = deque() # order that keys have been accessed 59 refcount = {} # number of times each key is in the access queue 60 def wrapper(*args): 61 # localize variable access (ugly but fast) 62 _cache=cache; _len=len; _refcount=refcount; _maxsize=maxsize 63 queue_append=queue.append; queue_popleft = queue.popleft 64 65 # get cache entry or compute if not found 66 try: 67 result = _cache[args] 68 wrapper.hits += 1 69 except KeyError: 70 result = _cache[args] = f(*args) 71 wrapper.misses += 1 72 73 # record that this key was recently accessed 74 queue_append(args) 75 _refcount[args] = _refcount.get(args, 0) + 1 76 77 # Purge least recently accessed cache contents 78 while _len(_cache) > _maxsize: 79 k = queue_popleft() 80 _refcount[k] -= 1 81 if not _refcount[k]: 82 del _cache[k] 83 del _refcount[k] 84 85 # Periodically compact the queue by duplicate keys 86 if _len(queue) > _maxsize * 4: 87 for i in [None] * _len(queue): 88 k = queue_popleft() 89 if _refcount[k] == 1: 90 queue_append(k) 91 else: 92 _refcount[k] -= 1 93 assert len(queue) == len(cache) == len(refcount) == sum(refcount.itervalues()) 94 95 return result
96 wrapper.__doc__ = f.__doc__ 97 wrapper.__name__ = f.__name__ 98 wrapper.hits = wrapper.misses = 0 99 return wrapper 100 101 return decorating_function 102
103 -class GenericDB(gobject.GObject):
104 """ 105 GenericDB abstraction layer. 106 Supports select, update, delete, etc 107 """ 108 __gsignals__ = { 109 "row-inserted" : ( 110 gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, [ 111 gobject.TYPE_INT]), #row oid 112 "row-modified" : ( 113 gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, [ 114 gobject.TYPE_INT]), #row oid 115 "row-deleted" : ( 116 gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, [ 117 gobject.TYPE_INT]) #row oid 118 } 119 DEBUG = False
120 - def __init__(self, filename=":memory:", **kwargs):
121 gobject.GObject.__init__(self) 122 #dictionary of field names, key is table name 123 self.tables = {} 124 self.filename = filename 125 self.options = kwargs 126 127 self._open() 128 self._get_tables()
129
130 - def _open(self):
131 #Open the DB and set options 132 if self.options.get("detect_types",False): 133 self.db = sqlite.connect(self.filename, detect_types=sqlite.PARSE_DECLTYPES) 134 else: 135 self.db = sqlite.connect(self.filename) 136 self.db.isolation_level = self.options.get("isolation_level",None) 137 if self.options.get("row_by_name",False) == True: 138 self.db.row_factory = sqlite.Row 139 self.cur = self.db.cursor()
140
141 - def _get_tables(self):
142 #get the field names for all tables 143 for name, in self.cur.execute("SELECT name FROM sqlite_master WHERE type='table' and name != 'sqlite_sequence'"): 144 self.tables[str(name)] = [row[1] for row in self.cur.execute("PRAGMA table_info('%s')" % name) if row[1] != 'oid']
145
146 - def _build_insert_sql(self, table, *values):
147 assert(self.tables.has_key(table)) 148 assert(len(values) == len(self.get_fields(table))) 149 sql = "INSERT INTO %s(oid" % table 150 for f in self.get_fields(table): 151 sql = sql + ", %s" % f 152 sql = sql + ") VALUES (" 153 154 #add None to values so that oid autoincrements 155 values = (None,) + values 156 #add ? for each value (including oid), strip last , 157 sql = sql + ("?, "*len(values))[0:-2] + ")" 158 return sql,values
159
160 - def _build_update_sql(self, table, oid, *values, **kwargs):
161 assert(self.tables.has_key(table)) 162 if len(kwargs) > 0: 163 values = kwargs.values() 164 fields = kwargs.keys() 165 else: 166 fields = self.get_fields(table) 167 168 assert( len(values) == len(fields) ) 169 sql = "UPDATE %s SET " % table 170 for f in fields: 171 sql = sql + "%s=?," % f 172 #strip trailing , 173 sql = sql[0:-1] + " " 174 sql = sql + "WHERE oid = %s" % oid 175 return sql, values
176
177 - def _build_create_sql(self, table, fields, fieldtypes):
178 #the default type is TEXT 179 if len(fieldtypes) == 0: 180 fieldtypes = ('TEXT',) * len(fields) 181 182 sql = "CREATE TABLE %s (oid INTEGER PRIMARY KEY AUTOINCREMENT" % table 183 for i in range(0,len(fields)): 184 sql = sql + ", %s %s" % (fields[i],fieldtypes[i]) 185 sql = sql + ")" 186 return sql
187
188 - def execute(self, sql, args=()):
189 if self.DEBUG: log.debug(sql) 190 self.cur.execute(sql, args)
191
192 - def select(self, sql, args=()):
193 self.execute(sql, args) 194 for raw in self.cur: 195 yield raw
196
197 - def select_one(self, sql, args=()):
198 for i in self.select(sql, args): 199 return i
200
201 - def create(self, table, fields=(), fieldtypes=()):
202 sql = self._build_create_sql(table, fields, fieldtypes) 203 self.execute(sql) 204 205 #save the field names 206 self.tables[table] = fields
207
208 - def insert(self, table, values=()):
209 sql,values = self._build_insert_sql(table, *values) 210 self.execute(sql, values) 211 self.emit("row-inserted",int(self.cur.lastrowid)) 212 return self.cur.lastrowid
213
214 - def update(self, table, oid, values=(), **kwargs):
215 sql, values = self._build_update_sql(table, oid, *values, **kwargs) 216 self.execute(sql, values) 217 self.emit("row-modified", int(oid))
218
219 - def delete(self, table, oid):
220 assert(self.tables.has_key(table)) 221 self.emit("row-deleted", int(oid)) 222 sql = "DELETE from %s where oid=?" % table 223 self.execute(sql,(oid,))
224
225 - def save(self):
226 self.db.commit()
227
228 - def close(self):
229 self.cur.close() 230 self.db.close()
231
232 - def debug(self, width=70, printoid=True):
233 for table in self.tables: 234 fields = self.get_fields(table) 235 #Decide whether to print the oid or not 236 if printoid: 237 fields = ('oid',) + tuple(fields) 238 fieldIndices = range(0, len(fields)) 239 else: 240 fieldIndices = range(1, len(fields)) 241 242 MAX_WIDTH = width 243 FIELD_MAX_WIDTH = MAX_WIDTH/len(fields) 244 245 # Print a header. 246 padding = '-'*MAX_WIDTH 247 print padding + "\nTABLE: %s\n" % table + padding 248 249 for field in fields: 250 print field.ljust(FIELD_MAX_WIDTH) , 251 print "\n" + padding 252 253 # For each row, print the value of each field left-justified within 254 # the maximum possible width of that field. 255 for row in self.select("SELECT * from %s" % table): 256 for fieldIndex in fieldIndices: 257 fieldValue = str(row[fieldIndex]) 258 print fieldValue.ljust(FIELD_MAX_WIDTH) , 259 260 print
261
262 - def get_fields(self, table):
263 """ 264 Returns the number of fields in the table EXCLUDING oid 265 """ 266 assert(self.tables.has_key(table)) 267 return self.tables[table]
268
269 - def get_tables(self):
270 return self.tables.keys()
271
272 -class ThreadSafeGenericDB(Thread, GenericDB):
273 """ 274 Threadsafe wrapper around GenericDB Abstraction layer. Serializes all requests 275 into one thread using a queue 276 """
277 - def __init__(self, filename=":memory:", **kwargs):
278 GenericDB.__init__(self,filename,**kwargs) 279 Thread.__init__(self) 280 self.reqs=Queue() 281 self.stopped = False 282 self.start()
283
284 - def _open(self):
285 #open the db in the thread where it is used 286 pass
287
288 - def _get_tables(self):
289 db = sqlite.connect(self.filename) 290 cur = db.cursor() 291 #get the field names for all tables 292 for name, in cur.execute("SELECT name FROM sqlite_master WHERE type='table' and name != 'sqlite_sequence'"): 293 self.tables[str(name)] = [row[1] for row in cur.execute("PRAGMA table_info('%s')" % name) if row[1] != 'oid']
294
295 - def run(self):
296 GenericDB._open(self) 297 while not self.stopped: 298 req, args, res, operation = self.reqs.get() 299 if req=='--stop--': 300 self.stopped = True 301 elif req=='--save--': 302 self.db.commit() 303 else: 304 self.cur.execute(req, args) 305 306 #res is used to return a result to the caller 307 #in a blocking way 308 if res: 309 if operation == "SELECT": 310 for rec in self.cur: 311 res.put(rec) 312 res.put('--no more--') 313 elif operation == "INSERT": 314 res.put(self.cur.lastrowid) 315 else: 316 assert(False) 317 318 self.cur.close() 319 self.db.close()
320
321 - def execute(self, req, args=(), res=None, operation=""):
322 if self.DEBUG: log.debug(req) 323 if not self.stopped: 324 self.reqs.put((req, args, res, operation))
325
326 - def select(self, req, args=()):
327 res=Queue() 328 self.execute(req, args, res, "SELECT") 329 while not self.stopped: 330 rec=res.get() 331 if rec=='--no more--': break 332 yield rec
333
334 - def close(self):
335 self.execute('--stop--')
336
337 - def save(self):
338 self.execute('--save--')
339
340 - def insert(self, table, values=()):
341 sql,values = self._build_insert_sql(table, *values) 342 res=Queue() 343 self.execute(sql, values, res, "INSERT") 344 while not self.stopped: 345 newId = res.get() 346 self.emit("row-inserted",int(newId)) 347 return newId
348