# Presumably DummyFilter.match (the class header is outside this excerpt).
# It stands in for a compiled regexp's match(). Elsewhere, DummyFilter() is
# assigned to self.filter as the alternative to re.compile(options.filter),
# and self.filter.match(key) is called when scanning event keys. The body is
# not part of this excerpt, so its return value cannot be confirmed here.
13 def match(self, dummy):
# Build the CREATE TABLE statement text for table `name`.
#
# Structure of the statement:
#   - It starts with an `id int` column, immediately followed by `unique`
#     (callers pass 'unique' or '').
#   - Then comes one ", '<column>' <type>" clause per column name, where
#     `type` is the SQL type string (callers pass e.g. 'char(80)', 'text',
#     'double').
#
# `cols` maps column name -> row position; the emitters use cols[key] as a
# list index. So `sorted` is presumably filled to order the names by that
# position. The filling, closing and return lines are not in this excerpt.
#
# NOTE(review): `sorted` and `type` shadow builtins. Table and column names
# are interpolated into the SQL without escaping.
17 def create_table(self, cols, name, type, unique):
18 create = 'create table if not exists %s (id int %s' % (name, unique)
20 sorted = [None] * len(cols)
21 for x in cols.iterkeys():
24 create += (", '%s' %s" % (x, type))
# MySQL emitter. Rows are written as ';'-separated text lines into two named
# pipes: one for the event table, one for the context table. A
# `load data infile` statement issued through ex() reads each pipe into its
# table.
28 class EmitMysqlInfile(EmitBase):
# Mode applied to both FIFOs: owner read/write plus read for others.
# NOTE(review): "others" read is presumably needed because the MySQL server
# process, not this one, opens the file named in `load data infile`; confirm.
29 tmpfile_mode = stat.S_IREAD | stat.S_IROTH | stat.S_IWUSR
# Load file `fname` into table `tab` over a fresh connection. The statement
# is `load data infile ... fields terminated by ';'`, and `args` holds the
# MySQLdb.connect keyword arguments.
# The caller stores the return value as a pid and later passes it to
# os.waitpid. The lines not shown therefore presumably fork and run the
# load in a child process.
# NOTE(review): `fname` and `tab` are interpolated into the SQL unescaped.
31 def ex(self, args, tab, fname):
34 stmt = """load data infile '%s' into table %s fields terminated by ';'""" % (fname, tab)
35 conn = MySQLdb.connect(**args)
# Set up the emitter, in this order:
#   1. connect to MySQL;
#   2. create the two FIFOs;
#   3. drop and recreate both tables;
#   4. start one loader per table via ex();
#   5. open the FIFOs for writing.
# `tables` maps 'ctx' / 'ev' to table names. `ctxcols` / `evcols` map
# column name -> row position, and ctx()/ev() use them as list indices.
42 def __init__(self, options, tables, ctxcols, evcols):
47 args['passwd'] = options.password
# NOTE(review): this writes back into the caller's `options` object. The
# condition guarding it is not part of this excerpt.
49 options.host = 'localhost'
50 args['user'] = options.user
51 args['host'] = options.host
52 args['db'] = options.database
54 self.conn = MySQLdb.connect(**args)
55 self.ctxcols = ctxcols
57 self.options = options
58 self.ctxtab = tables['ctx']
59 self.evtab = tables['ev']
# FIFO paths are unique per process:
#   <tempdir>/statev_ev_<pid> and <tempdir>/statev_ctx_<pid>
61 params = (tempfile.gettempdir(), os.sep, os.getpid())
62 self.evfifo = '%s%sstatev_ev_%d' % params
63 self.ctxfifo = '%s%sstatev_ctx_%d' % params
65 os.mkfifo(self.evfifo)
66 os.mkfifo(self.ctxfifo)
# os.mkfifo masks its mode with the umask, so set the final mode explicitly.
68 os.chmod(self.evfifo, self.tmpfile_mode)
69 os.chmod(self.ctxfifo, self.tmpfile_mode)
71 c = self.conn.cursor()
# Existing tables are dropped unconditionally before being recreated.
72 c.execute('drop table if exists ' + self.evtab)
73 c.execute('drop table if exists ' + self.ctxtab)
74 table_ctx = self.create_table(self.ctxcols, self.ctxtab, 'char(80)', 'unique')
76 table_ev = self.create_table(self.evcols, self.evtab, 'double default null', '')
# One loader per FIFO. The returned pids are waited on during teardown.
83 self.pidev = self.ex(args, self.evtab, self.evfifo)
84 self.pidctx = self.ex(args, self.ctxtab, self.ctxfifo)
87 print "forked two mysql leechers: %d, %d" % (self.pidev, self.pidctx)
# NOTE(review): 'w+' opens the FIFO read/write. This is presumably chosen so
# that open() does not block waiting for the loader to open its end; confirm.
89 self.evfile = open(self.evfifo, 'w+t')
90 self.ctxfile = open(self.ctxfifo, 'w+t')
# Diagnostic output: each FIFO's path and its mode in octal.
93 print 'fifo: %s, %o' % (self.evfile.name, os.stat(self.evfile.name).st_mode)
94 print 'fifo: %s, %o' % (self.ctxfile.name, os.stat(self.ctxfile.name).st_mode)
# Write one event row to the event FIFO. The row is "<curr_id>;" followed by
# one ';'-separated field per event column.
# Each item's value is presumably stored at field[self.evcols[key]]; that
# assignment is not in this excerpt. Untouched fields stay '\N', which
# LOAD DATA reads as NULL.
# NOTE(review): '\N' relies on Python 2 keeping unknown escapes verbatim.
# Under Python 3 it is a SyntaxError, so use r'\N' when porting. Values
# appear to be written unescaped, so a value containing ';' would shift
# the later fields.
96 def ev(self, curr_id, evitems):
97 field = ['\N'] * len(self.evcols)
98 for key, val in evitems.iteritems():
99 index = self.evcols[key]
101 print >> self.evfile, ('%d;' % curr_id) + ';'.join(field)
# Write one context row to the context FIFO. The layout and the '\N'
# placeholder work as in ev(), with positions taken from self.ctxcols.
103 def ctx(self, curr_id, ctxitems):
104 field = ['\N'] * len(self.ctxcols)
105 for key, val in ctxitems.iteritems():
106 index = self.ctxcols[key]
108 print >> self.ctxfile, ('%d;' % curr_id) + ';'.join(field)
# Tail of the teardown method; its header and first lines are outside this
# excerpt. It waits for both loader children to exit, then removes the FIFOs.
114 os.waitpid(self.pidev, 0)
115 os.waitpid(self.pidctx, 0)
117 os.unlink(self.evfile.name)
118 os.unlink(self.ctxfile.name)
# SQLite emitter: inserts rows directly into an sqlite3 database file.
121 class EmitSqlite3(EmitBase):
# Requires options.database, the database file name. Any existing file at
# that path is deleted first. Then both tables and their id indexes are
# created, and the VALUES placeholder strings are precomputed.
122 def __init__(self, options, tables, ctxcols, evcols):
125 if options.database == None:
126 print "Have to specify database (file-)name for sqlite"
# Start from an empty database file.
129 if os.path.isfile(options.database):
130 os.unlink(options.database)
132 self.ctxtab = tables['ctx']
133 self.evtab = tables['ev']
134 self.conn = sqlite3.connect(options.database)
135 table_ctx = self.create_table(ctxcols, self.ctxtab, 'text', 'unique')
136 self.conn.execute(table_ctx)
# NOTE(review): both CREATE INDEX statements name the tables `ctx` and `ev`
# literally, instead of self.ctxtab / self.evtab. The tables are actually
# named options.prefix + 'ctx' and options.prefix + 'ev'. With a non-empty
# --prefix, and the database file freshly removed above, these statements
# presumably fail with "no such table". Switch them to the prefixed names.
137 self.conn.execute("CREATE INDEX IF NOT EXISTS ctxindex ON ctx(id)")
138 table_ev = self.create_table(evcols, self.evtab, 'double', '')
139 self.conn.execute(table_ev)
140 self.conn.execute("CREATE INDEX IF NOT EXISTS evindex ON ev(id)")
# ev()/ctx() insert (id, k columns) using self.quests[k]. So quests[k] must
# hold k+1 comma-separated '?' placeholders. The lines that build `q` are
# not in this excerpt.
142 n = max(len(ctxcols), len(evcols)) + 1
145 for i in xrange(0, n):
146 self.quests.append(','.join(q))
# Insert one event row: the id plus the columns named by evitems' keys.
# Values are bound as query parameters. keys() and values() of an unmodified
# dict come back in corresponding order, which keeps names and values
# aligned.
# NOTE(review): the table and column names are interpolated single-quoted,
# relying on SQLite's lenient identifier quoting. Only the values are
# parameterized.
149 def ev(self, curr_id, evitems):
152 for key in evitems.keys():
157 keys += "'%s'" % (key)
159 stmt = "insert into '%s' (id, %s) values (%s)" % (self.evtab, keys, self.quests[len(evitems)])
160 self.conn.execute(stmt, (curr_id,) + tuple(evitems.values()))
# Insert one context row; this is the same scheme as ev(), applied to the
# context table.
162 def ctx(self, curr_id, ctxitems):
165 for key in ctxitems.keys():
170 keys += "'%s'" % (key)
172 stmt = "insert into '%s' (id, %s) values (%s)" % (self.ctxtab, keys, self.quests[len(ctxitems)])
173 self.conn.execute(stmt, (curr_id,) + tuple(ctxitems.values()))
# Maps the --engine option value to the emitter class that gets instantiated.
179 engines = { 'sqlite3': EmitSqlite3, 'mysql': EmitMysqlInfile }
# First pass over the input: collect the table schema.
#
# Returns (ctxcols, evcols), each mapping a column name to its row position.
#   - A context key is assigned `ctxind` the first time it is seen.
#   - An event key is only considered when self.filter.match(key) succeeds.
#     Accepted event keys are also added to self.valid_keys, which
#     fill_tables() consults.
#
# The line parsing is only partly visible. line.index(';', 2) finds the
# first ';' at or after position 2, presumably the end of the key field
# after a one-character record type; confirm against the input format.
180 def find_heads(self):
187 self.valid_keys = set()
193 ind = line.index(';', 2)
195 if not ctxcols.has_key(key):
196 ctxcols[key] = ctxind
200 ind = line.index(';', 2)
202 if self.filter.match(key):
204 if not evcols.has_key(key):
205 self.valid_keys.add(key)
209 return (ctxcols, evcols)
# Presumably the body of the input() method that fill_tables() iterates; the
# method header is not in this excerpt. It yields the lines of all
# self.files in order. fileinput.hook_compressed transparently opens
# .gz and .bz2 files.
212 return fileinput.FileInput(files=self.files, openhook=fileinput.hook_compressed)
# Second pass over the input: emit the rows through self.emit.
#
# Each line is split on ';'. The branch conditions are not in this excerpt.
# From the visible statements it appears that:
#   - On a push, pending events are flushed for the previously pushed id.
#     The new id is then pushed on `idstack` and a context row is emitted
#     for it.
#   - On a pop, a key mismatch is reported, events are flushed for the
#     current id, and curr_id reverts to the top of `idstack`.
#   - Event values (field 2 of the line) whose key is in self.valid_keys are
#     accumulated in `evcols`.
#
# Inside this method, `evcols` / `ctxcols` are dicts of *values* for the row
# being built; they are passed to emit.ev() / emit.ctx(). They are not the
# name -> position maps returned by find_heads().
214 def fill_tables(self):
218 last_push_curr_id = 0
226 for line in self.input():
228 items = line.strip().split(';')
232 # flush the current events
234 self.emit.ev(last_push_curr_id, evcols)
242 last_push_curr_id = curr_id
244 idstack.append(curr_id)
247 self.emit.ctx(curr_id, ctxcols)
254 print "unmatched pop in line %d, push key %s, pop key: %s" % (lineno, key, popkey)
259 self.emit.ev(curr_id, evcols)
262 curr_id = idstack[-1]
268 if key in self.valid_keys:
270 evcols[key] = items[2]
# Progress in tenths of the event count (Python 2 integer division).
273 prec = curr_event * 10 / self.n_events
276 print '%10d / %10d' % (curr_event, self.n_events)
# Setup sequence; the enclosing method's header is outside this excerpt.
#   1. Parse command-line options.
#   2. Derive table names from --prefix.
#   3. Validate the input files.
#   4. Build the event-key filter.
#   5. Pick the emitter class.
#   6. Run the schema pass.
#   7. Create the emitter and start filling the tables.
279 parser = optparse.OptionParser('usage: %prog [options] <event file...>')
# NOTE(review): both visible emitters discard existing data unconditionally:
# EmitMysqlInfile drops its tables and EmitSqlite3 deletes the database
# file. Whether --clean is consulted is not visible in this excerpt.
280 parser.add_option("-c", "--clean", dest="clean", help="delete tables in advance", action="store_true", default=False)
281 parser.add_option("-v", "--verbose", dest="verbose", help="verbose messages", action="store_true", default=False)
282 parser.add_option("-f", "--filter", dest="filter", help="regexp to filter event keys", metavar="REGEXP")
283 parser.add_option("-u", "--user", dest="user", help="user", metavar="USER")
284 parser.add_option("-H", "--host", dest="host", help="host", metavar="HOST")
285 parser.add_option("-p", "--password", dest="password", help="password", metavar="PASSWORD")
286 parser.add_option("-d", "--db", dest="database", help="database", metavar="DB")
287 parser.add_option("-e", "--engine", dest="engine", help="engine", metavar="ENG", default='sqlite3')
288 parser.add_option("-P", "--prefix", dest="prefix", help="table prefix", metavar="PREFIX", default='')
289 (options, args) = parser.parse_args()
293 self.verbose = options.verbose
# Table names are the --prefix followed by 'ctx' / 'ev'.
296 tables['ctx'] = options.prefix + 'ctx'
297 tables['ev'] = options.prefix + 'ev'
# NOTE(review): `file` shadows the Python 2 builtin of the same name.
307 if not os.path.isfile(file):
308 print "cannot find input file %s" % (file, )
310 self.files.append(file)
312 if len(self.files) < 1:
313 print "no input file to process"
# DummyFilter() replaces the compiled --filter regexp. The two assignments
# are presumably the branches of an if/else that is not shown.
317 self.filter = re.compile(options.filter)
319 self.filter = DummyFilter()
321 if options.engine in self.engines:
322 engine = self.engines[options.engine]
324 print 'engine %s not found' % options.engine
325 print 'we offer: %s' % self.engines.keys()
329 print "determining schema..."
331 (ctxcols, evcols) = self.find_heads()
333 print "context schema:"
335 print "event schema:"
340 self.emit = engine(options, tables, ctxcols, evcols)
343 print "filling tables..."
# Script entry point; its body is not part of this excerpt.
349 if __name__ == "__main__":