return True
class EmitBase:
    """Table-creation logic shared by the SQL backends.

    Subclasses are expected to provide ``execute(query, *args)`` and to
    populate ``types`` with the backend's SQL type strings for the logical
    kinds "text", "data" and "bool".
    """

    # Not referenced by the code visible in this section.
    ctx_field_ids = {}
    ev_field_ids = {}
    # Logical type name -> SQL type string.
    # NOTE(review): this dict lives on the class, so a backend that writes
    # through ``self.types[...]`` modifies it for every EmitBase subclass.
    types = {}

    def create_table(self, cols, name, defaulttype, keytype, extra=""):
        """Build and execute a ``create table if not exists`` statement.

        cols        -- column keys; either a sequence, or a mapping of
                       key -> column index (then columns are emitted in
                       index order).  A leading '$' forces the "text"
                       type, a leading '?' forces the "bool" type; the
                       marker is not part of the column name.
        name        -- table name
        defaulttype -- logical type (key into ``self.types``) for columns
                       without a marker
        keytype     -- SQL type/constraints of the leading `id` column
        extra       -- raw SQL appended after the last column definition
        """
        if isinstance(cols, dict):
            # Row values are laid out by column index, so the table has to
            # follow that order and not the dict's iteration order.
            cols = sorted(cols, key=cols.get)

        parts = [
            "create table if not exists `%s` (\n" % name,
            "\t`id` %s\n" % keytype,
        ]
        for col in cols:
            if col.startswith('$'):
                colname = col[1:]
                coltype = self.types["text"]
            elif col.startswith('?'):
                colname = col[1:]
                coltype = self.types["bool"]
            else:
                colname = col
                coltype = self.types[defaulttype]
            parts.append("\t,`%s` %s\n" % (colname, coltype))
        parts.append(extra)
        parts.append(");")
        self.execute("".join(parts))
+
# Abstraction for mysql sql connection and sql syntax
class EmitMysql(EmitBase):
    """MySQL backend: creates the context/event tables and inserts rows.

    Context rows get their id from an auto_increment column; event rows
    carry the id of the context they belong to.
    """

    tmpfile_mode = stat.S_IREAD | stat.S_IROTH | stat.S_IWUSR

    def execute(self, query, *args):
        """Run `query` on the shared cursor and commit immediately."""
        self.cursor.execute(query, *args)
        self.conn.commit()

    def connect(self, options):
        """Open the MySQL connection described by `options`.

        The database name is always passed; user, host and password are
        forwarded only when they were given on the command line, so the
        driver defaults still apply otherwise.
        """
        import MySQLdb
        args = dict()
        args['db'] = options.database
        for optname, kwarg in (('user', 'user'),
                               ('host', 'host'),
                               ('password', 'passwd')):
            value = getattr(options, optname, None)
            if value:
                args[kwarg] = value
        self.conn = MySQLdb.connect(**args)
        self.options = options
        self.cursor = self.conn.cursor()

    def _column_names(self, cols):
        """Return the quoted SQL column names for `cols`.

        A key -> index mapping is ordered by index, because row values are
        laid out by index.  The '$'/'?' type markers are stripped, matching
        the names create_table() gives the columns.
        """
        if isinstance(cols, dict):
            cols = sorted(cols, key=cols.get)
        names = []
        for col in cols:
            if col[:1] in ('$', '?'):
                col = col[1:]
            names.append("`%s`" % col)
        return names

    def __init__(self, options, ctxcols, evcols):
        """Connect, (re)create the tables and prepare the insert statements.

        options -- parsed command line options (database, prefix, update, ...)
        ctxcols -- context column keys
        evcols  -- event column keys
        """
        self.connect(options)

        # Per-instance mapping, so the dict shared on the base class is not
        # modified for other backends.
        self.types = {
            "text": "varchar(80) default null",
            "data": "double default null",
            "bool": "bool",
        }

        self.ctxtab = options.prefix + "ctx"
        self.evtab = options.prefix + "ev"

        if not options.update:
            self.execute('drop table if exists `%s`' % self.evtab)
            self.execute('drop table if exists `%s`' % self.ctxtab)

        self.create_table(ctxcols, self.ctxtab, "text", "int auto_increment", extra = ", PRIMARY KEY (`id`)")
        self.create_table(evcols, self.evtab, "data", "int not null", extra = ", INDEX(`id`)")

        # Name the columns explicitly so the inserts do not depend on the
        # physical column order of the tables.
        evnames = ["`id`"] + self._column_names(evcols)
        self.evinsert = "insert into `%s` (%s) values (%s)" % (
            self.evtab, ", ".join(evnames), ",".join(['%s'] * len(evnames)))

        ctxnames = self._column_names(ctxcols)
        self.ctxinsert = "insert into `%s` (%s) values (%s)" % (
            self.ctxtab, ", ".join(ctxnames), ",".join(['%s'] * len(ctxnames)))

    def ev(self, curr_id, evitems):
        """Insert one event row (values in column-index order) for context `curr_id`."""
        self.execute(self.evinsert, (curr_id,) + tuple(evitems))

    def ctx(self, ctxitems):
        """Insert one context row and return its generated id."""
        # execute() already commits, so no second commit is needed here.
        self.execute(self.ctxinsert, tuple(ctxitems))
        return self.cursor.lastrowid

    def commit(self):
        """Commit the current transaction."""
        self.conn.commit()
# Abstraction for sqlite3 databases and sql syntax
class EmitSqlite3(EmitBase):
    """SQLite backend: creates the context/event tables and inserts rows."""

    def execute(self, query, *args):
        """Run `query` on the shared cursor (no implicit commit)."""
        self.cursor.execute(query, *args)

    def _column_names(self, cols):
        """Return the quoted SQL column names for `cols`.

        A key -> index mapping is ordered by index, because row values are
        laid out by index.  The '$'/'?' type markers are stripped, matching
        the names create_table() gives the columns.
        """
        if isinstance(cols, dict):
            cols = sorted(cols, key=cols.get)
        names = []
        for col in cols:
            if col[:1] in ('$', '?'):
                col = col[1:]
            names.append("`%s`" % col)
        return names

    def __init__(self, options, ctxcols, evcols):
        """Open the database file, create the tables and prepare the inserts.

        options -- parsed command line options; `database` is the file name
                   and is mandatory.  Without `update` an existing file is
                   deleted first.
        ctxcols -- context column keys
        evcols  -- event column keys
        """
        import sqlite3
        if options.database is None:
            print("Have to specify database (file-)name for sqlite")
            sys.exit(1)

        if not options.update and os.path.isfile(options.database):
            os.unlink(options.database)

        self.conn = sqlite3.connect(options.database)
        self.cursor = self.conn.cursor()

        # Per-instance mapping, so the dict shared on the base class is not
        # modified for other backends.
        self.types = {
            "data": "double",
            "text": "text",
            "bool": "int",
        }
        self.ctxtab = options.prefix + "ctx"
        self.evtab = options.prefix + "ev"

        # The DDL is idempotent ("if not exists"), so it is run in update
        # mode as well; otherwise --update against a fresh file would fail
        # on the first insert.  The ctx id is the primary key and needs no
        # separate index.
        self.create_table(ctxcols, self.ctxtab, "text", "integer primary key")
        self.create_table(evcols, self.evtab, "data", "int")
        self.execute("CREATE INDEX IF NOT EXISTS `%sindex` ON `%s`(id)"
                     % (self.evtab, self.evtab))

        # Name the columns explicitly so the inserts do not depend on the
        # physical column order of the tables.
        evnames = ["`id`"] + self._column_names(evcols)
        self.evinsert = "insert into `%s` (%s) values (%s)" % (
            self.evtab, ", ".join(evnames), ",".join(["?"] * len(evnames)))

        ctxnames = self._column_names(ctxcols)
        self.ctxinsert = "insert into `%s` (%s) values (%s)" % (
            self.ctxtab, ", ".join(ctxnames), ",".join(["?"] * len(ctxnames)))

        self.nextid = 0

    def ev(self, curr_id, evitems):
        """Insert one event row (values in column-index order) for context `curr_id`."""
        self.execute(self.evinsert, (curr_id,) + tuple(evitems))

    def ctx(self, ctxitems):
        """Insert one context row, commit, and return its row id."""
        # The counter is kept up to date, but the id handed back is the one
        # assigned by the database.
        self.nextid += 1
        self.execute(self.ctxinsert, tuple(ctxitems))
        self.conn.commit()
        return self.cursor.lastrowid

    def commit(self):
        """Commit the current transaction."""
        self.conn.commit()
class Conv:
- engines = { 'sqlite3': EmitSqlite3, 'mysql': EmitMysqlInfile }
+ engines = { 'sqlite3': EmitSqlite3, 'mysql': EmitMysql }
+
+ # Pass that determines event and context types
def find_heads(self):
n_ev = 0
ctxind = 0
evind = 0
ctxcols = dict()
evcols = dict()
+ linenr = 0
self.valid_keys = set()
inp = self.input()
for line in inp:
- if line[0] == 'P':
- ind = line.index(';', 2)
- key = line[2:ind]
- if not ctxcols.has_key(key):
- ctxcols[key] = ctxind
- ctxind += 1
-
- elif line[0] == 'E':
- ind = line.index(';', 2)
- key = line[2:ind]
- if self.filter.match(key):
- self.n_events += 1
+ linenr += 1
+ fields = line.strip().split(";")
+ if fields[0] == 'P':
+ if (len(fields)-1) % 2 != 0:
+ print "%s: Invalid number of fields after 'P'" % linenr
+
+ for i in range(1,len(fields),2):
+ key = fields[i]
+ if not ctxcols.has_key(key):
+ ctxcols[key] = ctxind
+ ctxind += 1
+
+ elif fields[0] == 'E':
+ if (len(fields)-1) % 2 != 0:
+ print "%s: Invalid number of fields after 'E'" % linenr
+
+ self.n_events += 1
+ for i in range(1,len(fields),2):
+ key = fields[i]
+ if not self.filter.match(key):
+ continue
+
if not evcols.has_key(key):
self.valid_keys.add(key)
evcols[key] = evind
evind += 1
+ self.ctxcols = ctxcols
+ self.evcols = evcols
return (ctxcols, evcols)
def input(self):
return fileinput.FileInput(files=self.files, openhook=fileinput.hook_compressed)
+ def flush_events(self, id):
+ isnull = True
+ for e in self.evvals:
+ if e != None:
+ isnull = False
+ break
+ if isnull:
+ return
+
+ self.emit.ev(id, self.evvals)
+ self.evvals = [None] * len(self.evvals)
+
+ def flush_ctx(self):
+ if not self.pushpending:
+ return
+ self.pushpending = False
+ self.curr_id = self.emit.ctx(self.ctxvals)
+
def fill_tables(self):
lineno = 0
ids = 0
- curr_id = 0
- last_push_curr_id = 0
+ self.curr_id = -1
keystack = []
idstack = []
curr_event = 0
last_prec = -1
- evcols = dict()
- ctxcols = dict()
+ self.pushpending = False
+ self.ctxvals = [None] * len(self.ctxcols)
+ self.evvals = [None] * len(self.evcols)
for line in self.input():
lineno += 1
- items = line.strip().split(';')
- op = items[0]
+ items = line.strip().split(';')
+ op = items[0]
+ # Push context command
if op == 'P':
- # flush the current events
- if len(evcols):
- self.emit.ev(last_push_curr_id, evcols)
- evcols.clear()
-
- # push the key
- key = items[1]
- val = items[2]
- keystack.append(key)
- curr_id = ids
- last_push_curr_id = curr_id
- ids += 1
- idstack.append(curr_id)
- ctxcols[key] = val
-
- self.emit.ctx(curr_id, ctxcols)
+ self.flush_events(self.curr_id)
+
+ # push the keys
+ for p in range(1,len(items),2):
+ key = items[p]
+ val = items[p+1]
+ keystack.append(key)
+ idstack.append(self.curr_id)
+
+ keyidx = self.ctxcols[key]
+ assert self.ctxvals[keyidx] == None
+ self.ctxvals[keyidx] = val
+ self.pushpending = True
+
+ # Pop context command
elif op == 'O':
- popkey = items[1]
- key = keystack.pop()
-
- if popkey != key:
- print "unmatched pop in line %d, push key %s, pop key: %s" % (lineno, key, popkey)
-
- idstack.pop()
- if len(idstack) > 0:
- if len(evcols) > 0:
- self.emit.ev(curr_id, evcols)
- evcols.clear()
- del ctxcols[key]
- curr_id = idstack[-1]
- else:
- curr_id = -1
+
+ # For now... we could optimize this
+ self.flush_ctx()
+
+ # We process fields in reverse order to makes O's match the
+ # order of previous P's
+ for p in range(len(items)-1,0,-1):
+ self.flush_events(self.curr_id)
+
+ popkey = items[p]
+ key = keystack.pop()
+ self.curr_id = idstack.pop()
+
+ if popkey != key:
+ print "unmatched pop in line %d, push key %s, pop key: %s" % (lineno, key, popkey)
+
+ keyidx = self.ctxcols[key]
+ assert self.ctxvals[keyidx] != None
+ self.ctxvals[keyidx] = None
elif op == 'E':
- key = items[1]
- if key in self.valid_keys:
- curr_event += 1
- evcols[key] = items[2]
+ curr_event += 1
+
+ self.flush_ctx()
- if self.verbose:
- prec = curr_event * 10 / self.n_events
- if prec > last_prec:
- last_prec = prec
- print '%10d / %10d' % (curr_event, self.n_events)
+ # Show that we make progress
+ if self.verbose:
+ prec = curr_event * 10 / self.n_events
+ if prec > last_prec:
+ last_prec = prec
+ print '%10d / %10d' % (curr_event, self.n_events)
+
+ for p in range(1,len(items),2):
+ key = items[p]
+ if key not in self.evcols:
+ continue
+
+ keyidx = self.evcols[key]
+ if self.evvals[keyidx] != None:
+ self.flush_events(self.curr_id)
+
+ value = items[p+1]
+ self.evvals[keyidx] = value
def __init__(self):
parser = optparse.OptionParser('usage: %prog [options] <event file...>')
- parser.add_option("-c", "--clean", dest="clean", help="delete tables in advance", action="store_true", default=False)
+ parser.add_option("", "--update", dest="update", help="update database instead of dropping all existing values", action="store_true", default=False)
parser.add_option("-v", "--verbose", dest="verbose", help="verbose messages", action="store_true", default=False)
parser.add_option("-f", "--filter", dest="filter", help="regexp to filter event keys", metavar="REGEXP")
parser.add_option("-u", "--user", dest="user", help="user", metavar="USER")
parser.add_option("-H", "--host", dest="host", help="host", metavar="HOST")
parser.add_option("-p", "--password", dest="password", help="password", metavar="PASSWORD")
parser.add_option("-d", "--db", dest="database", help="database", metavar="DB")
- parser.add_option("-e", "--engine", dest="engine", help="engine", metavar="ENG", default='sqlite3')
+ parser.add_option("-e", "--engine", dest="engine", help="engine (sqlite3, mysql)", metavar="ENG", default='sqlite3')
parser.add_option("-P", "--prefix", dest="prefix", help="table prefix", metavar="PREFIX", default='')
(options, args) = parser.parse_args()
self.stmts = dict()
self.verbose = options.verbose
- tables = dict()
- tables['ctx'] = options.prefix + 'ctx'
- tables['ev'] = options.prefix + 'ev'
-
if len(args) < 1:
parser.print_help()
sys.exit(1)
print ctxcols
print "event schema:"
print evcols
- print "tables:"
- print tables
- self.emit = engine(options, tables, ctxcols, evcols)
+ self.emit = engine(options, ctxcols, evcols)
if options.verbose:
print "filling tables..."