From 58a8518c2fa0264e086e67983061434cdab48452 Mon Sep 17 00:00:00 2001 From: Sebastian Hack Date: Sat, 8 Sep 2007 16:13:24 +0000 Subject: [PATCH] Rewrote anything adaption to new statev [r15723] --- ir/be/test/statev_sql.py | 401 ++++++++++++++++++++++++++++----------- 1 file changed, 287 insertions(+), 114 deletions(-) diff --git a/ir/be/test/statev_sql.py b/ir/be/test/statev_sql.py index 36b25fce1..78fa7f2f2 100755 --- a/ir/be/test/statev_sql.py +++ b/ir/be/test/statev_sql.py @@ -1,128 +1,301 @@ #! /usr/bin/env python import sys -import fileinput import os import re -import sha +import time +import stat +import profile import sqlite3 +import MySQLdb +import fileinput +import tempfile +import optparse + +class DummyFilter: + def match(self, dummy): + return True + +class EmitBase: + def create_table(self, cols, name, type, unique): + create = 'create table if not exists %s (id int %s' % (name, unique) + + sorted = [None] * len(cols) + for x in cols.iterkeys(): + sorted[cols[x]] = x + for x in sorted: + create += (', %s %s' % (x, type)) + create += ');' + return create + +class EmitMysqlInfile(EmitBase): + tmpfile_mode = stat.S_IREAD | stat.S_IROTH | stat.S_IWUSR + + def ex(self, args, tab, fname): + res = os.fork() + if res == 0: + stmt = """load data infile '%s' into table %s fields terminated by ';'""" % (fname, tab) + conn = MySQLdb.connect(**args) + c = conn.cursor() + c.execute(stmt) + conn.commit() + sys.exit(0) + return res + + def __init__(self, options, ctxcols, evcols): + args = dict() + if options.password: + args['passwd'] = options.password + if not options.host: + options.host = 'localhost' + args['user'] = options.user + args['host'] = options.host + args['db'] = options.database + + self.conn = MySQLdb.connect(**args) + self.ctxcols = ctxcols + self.evcols = evcols + self.options = options + + params = (tempfile.gettempdir(), os.sep, os.getpid()) + self.evfifo = '%s%sstatev_ev_%d' % params + self.ctxfifo = '%s%sstatev_ctx_%d' % params + + os.mkfifo(self.evfifo) + os.mkfifo(self.ctxfifo) + + os.chmod(self.evfifo, self.tmpfile_mode) + os.chmod(self.ctxfifo, self.tmpfile_mode) + + c = self.conn.cursor() + c.execute('drop table if exists ev') + c.execute('drop table if exists ctx') + c.execute(self.create_table(self.ctxcols, 'ctx', 'char(80)', 'unique')) + c.execute(self.create_table(self.evcols, 'ev', 'double default null', '')) + self.conn.commit() + + if options.verbose: + print 'go for gold' + + self.pidev = self.ex(args, 'ev', self.evfifo) + self.pidctx = self.ex(args, 'ctx', self.ctxfifo) + + if options.verbose: + print "forked two mysql leechers: %d, %d" % (self.pidev, self.pidctx) + + self.evfile = open(self.evfifo, 'w+t') + self.ctxfile = open(self.ctxfifo, 'w+t') + + if options.verbose: + print 'fifo: %s, %o' % (self.evfile.name, os.stat(self.evfile.name).st_mode) + print 'fifo: %s, %o' % (self.ctxfile.name, os.stat(self.ctxfile.name).st_mode) + + def ev(self, curr_id, evitems): + field = ['\N'] * len(self.evcols) + for key, val in evitems.iteritems(): + index = self.evcols[key] + field[index] = val + print >> self.evfile, ('%d;' % curr_id) + ';'.join(field) + + def ctx(self, curr_id, ctxitems): + field = ['\N'] * len(self.ctxcols) + for key, val in ctxitems.iteritems(): + index = self.ctxcols[key] + field[index] = val + print >> self.ctxfile, ('%d;' % curr_id) + ';'.join(field) + + def commit(self): + self.evfile.close() + self.ctxfile.close() + + os.waitpid(self.pidev, 0) + os.waitpid(self.pidctx, 0) + + os.unlink(self.evfile.name) + os.unlink(self.ctxfile.name) + + +class EmitSqlite3(EmitBase): + def __init__(self, options, ctxcols, evcols): + if os.path.isfile(options.database): + os.unlink(options.database) + + self.conn = sqlite3.connect(options.database) + self.conn.execute(self.create_table(ctxcols, 'ctx', 'text', 'unique')) + self.conn.execute(self.create_table(evcols, 'ev', 'double', '')) + + q = [] + self.quests = [] + for i in xrange(0, max(len(ctxcols), len(evcols))): + self.quests.append(','.join(q)) + q.append('?') + + def ev(self, curr_id, evitems): + keys = ','.join(evitems.keys()) + stmt = 'insert into ev (id, %s) values (%s)' % (keys, self.quests[len(evitems) + 1]) + self.conn.execute(stmt, (curr_id,) + tuple(evitems.values())) -from optparse import OptionParser - -def ev_create_stmt(heads): - create = 'create table if not exists ev (id int, time int, timeev int' - for x in heads: - create += (', %s real' % (x,)) - create += ');' - return create - -def ctx_create_stmt(heads): - create = 'create table if not exists ctx (id int unique on conflict ignore' - for x in heads: - create += (', %s text' % (x,)) - create += ');' - return create - -def find_heads(file): - ctx_heads = set() - ev_heads = set() - for line in fileinput.input(file): - items = re.split('\s+', line) - if items[0] == 'E': - ev_heads.add(items[2]) - elif items[0] == 'P': - ctx_heads.add(items[2]) - return (ev_heads, ctx_heads) - -def fill_tables(conn, file): - curr_id = 0 - ids = dict() - valstack = [] - keystack = [] - for line in fileinput.input(file): - items = re.split('\s+', line) - op = items[0] - id = items[1] - - if op == 'P': - key = items[2] - val = items[3] - keystack.append(key) - valstack.append(val) - - dig = sha.new() - for i in xrange(0,len(keystack)): - dig.update(".") - dig.update(keystack[i]) - dig.update("=") - dig.update(valstack[i]) - - hash = dig.digest() - if hash in ids: - id = ids[hash] + def ctx(self, curr_id, ctxitems): + keys = ','.join(ctxitems.keys()) + stmt = 'insert into ctx (id, %s) values (%s)' % (keys, self.quests[len(ctxitems) + 1]) + self.conn.execute(stmt, (curr_id,) + tuple(ctxitems.values())) + + def commit(self): + self.conn.commit() + +class Conv: + engines = { 'sqlite3': EmitSqlite3, 'mysql': EmitMysqlInfile } + def find_heads(self): + n_ev = 0 + ctxind = 0 + evind = 0 + ctxcols = dict() + evcols = dict() + + self.valid_keys = set() + + for line in self.input(): + heads = None + if line[0] == 'P': + ind = line.index(';', 2) + key = line[2:ind] + if not key in ctxcols: + ctxcols[key] = ctxind + ctxind += 1 + + elif line[0] == 'E': + ind = line.index(';', 2) + key = line[2:ind] + if self.filter.match(key): + self.n_events += 1 + if not key in evcols: + self.valid_keys.add(key) + evcols[key] = evind + evind += 1 + + return (ctxcols, evcols) + + def input(self): + return fileinput.FileInput(files=self.files, openhook=fileinput.hook_compressed) + + def fill_tables(self): + ids = 0 + curr_id = 0 + keystack = [] + idstack = [] + curr_event = 0 + last_prec = -1 + evcols = dict() + ctxcols = dict() + + for line in self.input(): + items = line.strip().split(';') + op = items[0] + + if op == 'P': + # flush the current events + if len(evcols): + self.emit.ev(curr_id, evcols) + evcols.clear() + + # push the key + key = items[1] + val = items[2] + keystack.append(key) + curr_id = ids + ids += 1 + idstack.append(curr_id) + ctxcols[key] = val + + self.emit.ctx(curr_id, ctxcols) + + elif op == 'O': + key = keystack.pop() + idstack.pop() + if len(idstack) > 0: + if len(evcols) > 0: + self.emit.ev(curr_id, evcols) + evcols.clear() + del ctxcols[key] + curr_id = idstack[-1] + else: + curr_id = -1 + + elif op == 'E': + key = items[1] + if key in self.valid_keys: + curr_event += 1 + evcols[key] = items[2] + + if self.verbose: + prec = curr_event * 10 / self.n_events + if prec > last_prec: + last_prec = prec + print '%10d / %10d' % (curr_event, self.n_events) + + def __init__(self): + parser = optparse.OptionParser('usage: %prog [options] ') + parser.add_option("-c", "--clean", dest="clean", help="delete tables in advance", action="store_true", default=False) + parser.add_option("-v", "--verbose", dest="verbose", help="verbose messages", action="store_true", default=False) + parser.add_option("-f", "--filter", dest="filter", help="regexp to filter event keys", metavar="REGEXP") + parser.add_option("-u", "--user", dest="user", help="user", metavar="USER") + parser.add_option("-H", "--host", dest="host", help="host", metavar="HOST") + parser.add_option("-p", "--password", dest="password", help="password", metavar="PASSWORD") + parser.add_option("-d", "--db", dest="database", help="database", metavar="DB") + parser.add_option("-e", "--engine", dest="engine", help="eingine", metavar="ENG", default='sqlite3') + (options, args) = parser.parse_args() + + self.n_events = 0 + self.stmts = dict() + self.verbose = options.verbose + + if len(args) < 1: + parser.print_help() + sys.exit(1) + + self.files = [] + files = args + + for file in files: + if not os.path.isfile(file): + print "cannot find input file %s" % (file, ) else: - id = curr_id - ids[hash] = curr_id - curr_id = curr_id + 1 - - stmt = 'insert into ctx (id' - for x in keystack: - stmt += ', ' + x - stmt += ') values (' + str(id) - for x in valstack: - stmt += ', \'' + x + '\'' - stmt += ');' - conn.execute(stmt) - - elif op == 'O': - keystack.pop() - valstack.pop() - - elif op == 'E': - key = items[2] - val = items[3] - time = items[4] - timeev = items[5] - t = (id, val, time, timeev) - stmt = 'insert into ev (id, %s, time, timeev) values (?, ?, ?, ?)' % (key,) - conn.execute(stmt, (id, val, time, timeev)) - - -def main(): - parser = OptionParser('usage: %prog [options] ') - parser.add_option("-c", "--clean", action="store_true", help="clean existing data base before adding data") - (options, args) = parser.parse_args() - - if len(args) < 2: - parser.print_help() - sys.exit(1) - - db = args[0] - file = args[1] - - if not os.path.isfile(file): - print "cannot find input file %s" % (file, ) - sys.exit(2) - - have_to_clean = 0 - if os.path.isfile(db): - if options.clean: - have_to_clean = 1 - else: - print "database %s already exists (use different name or use -c)" % (db, ) + self.files.append(file) + + if len(self.files) < 1: + print "no input file to process" sys.exit(3) - (ev_heads, ctx_heads) = find_heads(file) + if options.filter: + self.filter = re.compile(options.filter) + else: + self.filter = DummyFilter() + + if options.engine in self.engines: + engine = self.engines[options.engine] + else: + print 'engine %s not found' % options.engine + print 'we offer: %s' % self.engines.keys() + sys.exit(0) + + if options.verbose: + print "determining schema..." + + (ctxcols, evcols) = self.find_heads() + if options.verbose: + print "context schema:" + print ctxcols + print "event schema:" + print evcols - conn = sqlite3.connect(db) - if have_to_clean: - conn.execute("drop table ctx") - conn.execute("drop table ev") + self.emit = engine(options, ctxcols, evcols) - conn.execute(ev_create_stmt(ev_heads)) - conn.execute(ctx_create_stmt(ctx_heads)) - fill_tables(conn, file) - conn.commit() + if options.verbose: + print "filling tables..." + self.fill_tables() + if options.verbose: + print "comitting..." + self.emit.commit() if __name__ == "__main__": - main() + Conv() -- 2.20.1