13 def match(self, dummy):
17 def create_table(self, cols, name, type, unique):
18 create = 'create table if not exists %s (id int %s' % (name, unique)
20 sorted = [None] * len(cols)
21 for x in cols.iterkeys():
24 create += (", `%s` %s" % (x, type))
28 class EmitMysqlInfile(EmitBase):
29 tmpfile_mode = stat.S_IREAD | stat.S_IROTH | stat.S_IWUSR
31 def execute(self, query):
32 c = self.conn.cursor()
37 def __init__(self, options, tables, ctxcols, evcols):
42 args['passwd'] = options.password
44 options.host = 'localhost'
45 args['user'] = options.user
46 args['host'] = options.host
47 args['db'] = options.database
49 self.conn = MySQLdb.connect(**args)
50 self.ctxcols = ctxcols
52 self.options = options
53 self.ctxtab = tables['ctx']
54 self.evtab = tables['ev']
56 params = (tempfile.gettempdir(), os.sep, os.getpid())
57 self.evfifo = '%s%sstatev_ev_%d' % params
58 self.ctxfifo = '%s%sstatev_ctx_%d' % params
60 os.mkfifo(self.evfifo)
61 os.mkfifo(self.ctxfifo)
63 os.chmod(self.evfifo, self.tmpfile_mode)
64 os.chmod(self.ctxfifo, self.tmpfile_mode)
66 self.execute('drop table if exists ' + self.evtab)
67 self.execute('drop table if exists ' + self.ctxtab)
68 table_ctx = self.create_table(self.ctxcols, self.ctxtab, 'char(80)', '')
69 self.execute(table_ctx)
70 table_ev = self.create_table(self.evcols, self.evtab, 'double default null', '')
71 self.execute(table_ev)
76 n = max(len(ctxcols), len(evcols)) + 1
79 for i in xrange(0, n):
80 self.quests.append(','.join(q))
83 def ev(self, curr_id, evitems):
86 for key in evitems.keys():
91 keys += "`%s`" % (key)
93 stmt = "insert into `%s` (id, %s) values (%s)" % (self.evtab, keys, self.quests[len(evitems)+1])
94 c = self.conn.cursor()
95 c.execute(stmt, (curr_id,) + tuple(evitems.values()))
97 def ctx(self, curr_id, ctxitems):
100 for key in ctxitems.keys():
105 keys += "`%s`" % (key)
107 stmt = "insert into `%s` (id, %s) values (%s)" % (self.ctxtab, keys, self.quests[len(ctxitems)+1])
108 c = self.conn.cursor()
109 c.execute(stmt, (curr_id,) + tuple(ctxitems.values()))
114 class EmitSqlite3(EmitBase):
115 def __init__(self, options, tables, ctxcols, evcols):
118 if options.database == None:
119 print "Have to specify database (file-)name for sqlite"
122 if os.path.isfile(options.database):
123 os.unlink(options.database)
125 self.ctxtab = tables['ctx']
126 self.evtab = tables['ev']
127 self.conn = sqlite3.connect(options.database)
128 table_ctx = self.create_table(ctxcols, self.ctxtab, 'text', 'unique')
129 self.conn.execute(table_ctx)
130 self.conn.execute("CREATE INDEX IF NOT EXISTS ctxindex ON ctx(id)")
131 table_ev = self.create_table(evcols, self.evtab, 'double', '')
132 self.conn.execute(table_ev)
133 self.conn.execute("CREATE INDEX IF NOT EXISTS evindex ON ev(id)")
135 n = max(len(ctxcols), len(evcols)) + 1
138 for i in xrange(0, n):
139 self.quests.append(','.join(q))
142 def ev(self, curr_id, evitems):
145 for key in evitems.keys():
150 keys += "`%s`" % (key)
152 stmt = "insert into `%s` (id, %s) values (%s)" % (self.evtab, keys, self.quests[len(evitems)])
153 self.conn.execute(stmt, (curr_id,) + tuple(evitems.values()))
155 def ctx(self, curr_id, ctxitems):
158 for key in ctxitems.keys():
163 keys += "`%s`" % (key)
165 stmt = "insert into `%s` (id, %s) values (%s)" % (self.ctxtab, keys, self.quests[len(ctxitems)])
166 self.conn.execute(stmt, (curr_id,) + tuple(ctxitems.values()))
172 engines = { 'sqlite3': EmitSqlite3, 'mysql': EmitMysqlInfile }
173 def find_heads(self):
180 self.valid_keys = set()
186 ind = line.index(';', 2)
188 if not ctxcols.has_key(key):
189 ctxcols[key] = ctxind
193 ind = line.index(';', 2)
195 if self.filter.match(key):
197 if not evcols.has_key(key):
198 self.valid_keys.add(key)
202 return (ctxcols, evcols)
205 return fileinput.FileInput(files=self.files, openhook=fileinput.hook_compressed)
207 def fill_tables(self):
211 last_push_curr_id = 0
219 for line in self.input():
221 items = line.strip().split(';')
225 # flush the current events
227 self.emit.ev(last_push_curr_id, evcols)
235 last_push_curr_id = curr_id
237 idstack.append(curr_id)
240 self.emit.ctx(curr_id, ctxcols)
247 print "unmatched pop in line %d, push key %s, pop key: %s" % (lineno, key, popkey)
252 self.emit.ev(curr_id, evcols)
255 curr_id = idstack[-1]
261 if key in self.valid_keys:
263 evcols[key] = items[2]
266 prec = curr_event * 10 / self.n_events
269 print '%10d / %10d' % (curr_event, self.n_events)
272 parser = optparse.OptionParser('usage: %prog [options] <event file...>')
273 parser.add_option("-c", "--clean", dest="clean", help="delete tables in advance", action="store_true", default=False)
274 parser.add_option("-v", "--verbose", dest="verbose", help="verbose messages", action="store_true", default=False)
275 parser.add_option("-f", "--filter", dest="filter", help="regexp to filter event keys", metavar="REGEXP")
276 parser.add_option("-u", "--user", dest="user", help="user", metavar="USER")
277 parser.add_option("-H", "--host", dest="host", help="host", metavar="HOST")
278 parser.add_option("-p", "--password", dest="password", help="password", metavar="PASSWORD")
279 parser.add_option("-d", "--db", dest="database", help="database", metavar="DB")
280 parser.add_option("-e", "--engine", dest="engine", help="engine", metavar="ENG", default='sqlite3')
281 parser.add_option("-P", "--prefix", dest="prefix", help="table prefix", metavar="PREFIX", default='')
282 (options, args) = parser.parse_args()
286 self.verbose = options.verbose
289 tables['ctx'] = options.prefix + 'ctx'
290 tables['ev'] = options.prefix + 'ev'
300 if not os.path.isfile(file):
301 print "cannot find input file %s" % (file, )
303 self.files.append(file)
305 if len(self.files) < 1:
306 print "no input file to process"
310 self.filter = re.compile(options.filter)
312 self.filter = DummyFilter()
314 if options.engine in self.engines:
315 engine = self.engines[options.engine]
317 print 'engine %s not found' % options.engine
318 print 'we offer: %s' % self.engines.keys()
322 print "determining schema..."
324 (ctxcols, evcols) = self.find_heads()
326 print "context schema:"
328 print "event schema:"
333 self.emit = engine(options, tables, ctxcols, evcols)
336 print "filling tables..."
342 if __name__ == "__main__":