16 def match(self, dummy):
20 def create_table(self, cols, name, type, unique):
21 create = 'create table if not exists %s (id int %s' % (name, unique)
23 sorted = [None] * len(cols)
24 for x in cols.iterkeys():
27 create += (', %s %s' % (x, type))
31 class EmitMysqlInfile(EmitBase):
32 tmpfile_mode = stat.S_IREAD | stat.S_IROTH | stat.S_IWUSR
34 def ex(self, args, tab, fname):
37 stmt = """load data infile '%s' into table %s fields terminated by ';'""" % (fname, tab)
38 conn = MySQLdb.connect(**args)
45 def __init__(self, options, ctxcols, evcols):
48 args['passwd'] = options.password
50 options.host = 'localhost'
51 args['user'] = options.user
52 args['host'] = options.host
53 args['db'] = options.database
55 self.conn = MySQLdb.connect(**args)
56 self.ctxcols = ctxcols
58 self.options = options
60 params = (tempfile.gettempdir(), os.sep, os.getpid())
61 self.evfifo = '%s%sstatev_ev_%d' % params
62 self.ctxfifo = '%s%sstatev_ctx_%d' % params
64 os.mkfifo(self.evfifo)
65 os.mkfifo(self.ctxfifo)
67 os.chmod(self.evfifo, self.tmpfile_mode)
68 os.chmod(self.ctxfifo, self.tmpfile_mode)
70 c = self.conn.cursor()
71 c.execute('drop table if exists ev')
72 c.execute('drop table if exists ctx')
73 c.execute(self.create_table(self.ctxcols, 'ctx', 'char(80)', 'unique'))
74 c.execute(self.create_table(self.evcols, 'ev', 'double default null', ''))
80 self.pidev = self.ex(args, 'ev', self.evfifo)
81 self.pidctx = self.ex(args, 'ctx', self.ctxfifo)
84 print "forked two mysql leechers: %d, %d" % (self.pidev, self.pidctx)
86 self.evfile = open(self.evfifo, 'w+t')
87 self.ctxfile = open(self.ctxfifo, 'w+t')
90 print 'fifo: %s, %o' % (self.evfile.name, os.stat(self.evfile.name).st_mode)
91 print 'fifo: %s, %o' % (self.ctxfile.name, os.stat(self.ctxfile.name).st_mode)
93 def ev(self, curr_id, evitems):
94 field = ['\N'] * len(self.evcols)
95 for key, val in evitems.iteritems():
96 index = self.evcols[key]
98 print >> self.evfile, ('%d;' % curr_id) + ';'.join(field)
100 def ctx(self, curr_id, ctxitems):
101 field = ['\N'] * len(self.ctxcols)
102 for key, val in ctxitems.iteritems():
103 index = self.ctxcols[key]
105 print >> self.ctxfile, ('%d;' % curr_id) + ';'.join(field)
111 os.waitpid(self.pidev, 0)
112 os.waitpid(self.pidctx, 0)
114 os.unlink(self.evfile.name)
115 os.unlink(self.ctxfile.name)
118 class EmitSqlite3(EmitBase):
119 def __init__(self, options, ctxcols, evcols):
120 if os.path.isfile(options.database):
121 os.unlink(options.database)
123 self.conn = sqlite3.connect(options.database)
124 self.conn.execute(self.create_table(ctxcols, 'ctx', 'text', 'unique'))
125 self.conn.execute(self.create_table(evcols, 'ev', 'double', ''))
127 n = max(len(ctxcols), len(evcols)) + 1
130 for i in xrange(0, n):
131 self.quests.append(','.join(q))
def ev(self, curr_id, evitems):
    """Insert one row into the ``ev`` table.

    :param curr_id: value stored in the ``id`` column.
    :param evitems: mapping of ``ev`` column name -> value. May be empty,
        in which case a row holding only ``id`` is inserted (the previous
        version produced the invalid SQL ``insert into ev (id, ) ...``).
    """
    pairs = list(evitems.items())
    # Column names cannot be bound as SQL parameters, so they are spliced
    # into the statement text; each key must name an existing column of ev.
    columns = ['id'] + [key for key, _ in pairs]
    params = [curr_id] + [val for _, val in pairs]
    # One '?' per bound value, derived here instead of from a precomputed
    # table so the count always matches the parameter tuple.
    stmt = 'insert into ev (%s) values (%s)' % (
        ','.join(columns), ','.join(['?'] * len(params)))
    self.conn.execute(stmt, tuple(params))
def ctx(self, curr_id, ctxitems):
    """Insert one row into the ``ctx`` table.

    :param curr_id: value stored in the ``id`` column.
    :param ctxitems: mapping of ``ctx`` column name -> value. May be empty,
        in which case a row holding only ``id`` is inserted (the previous
        version produced the invalid SQL ``insert into ctx (id, ) ...``).
    """
    pairs = list(ctxitems.items())
    # Column names cannot be bound as SQL parameters, so they are spliced
    # into the statement text; each key must name an existing column of ctx.
    columns = ['id'] + [key for key, _ in pairs]
    params = [curr_id] + [val for _, val in pairs]
    # One '?' per bound value, derived here instead of from a precomputed
    # table so the count always matches the parameter tuple.
    stmt = 'insert into ctx (%s) values (%s)' % (
        ','.join(columns), ','.join(['?'] * len(params)))
    self.conn.execute(stmt, tuple(params))
148 engines = { 'sqlite3': EmitSqlite3, 'mysql': EmitMysqlInfile }
149 def find_heads(self):
156 self.valid_keys = set()
158 for line in self.input():
161 ind = line.index(';', 2)
163 if not key in ctxcols:
164 ctxcols[key] = ctxind
168 ind = line.index(';', 2)
170 if self.filter.match(key):
172 if not key in evcols:
173 self.valid_keys.add(key)
177 return (ctxcols, evcols)
180 return fileinput.FileInput(files=self.files, openhook=fileinput.hook_compressed)
182 def fill_tables(self):
193 for line in self.input():
195 items = line.strip().split(';')
199 # flush the current events
201 self.emit.ev(curr_id, evcols)
210 idstack.append(curr_id)
213 self.emit.ctx(curr_id, ctxcols)
220 print "unmatched pop in line %d, push key %s, pop key: %s" % (lineno, key, popkey)
225 self.emit.ev(curr_id, evcols)
228 curr_id = idstack[-1]
234 if key in self.valid_keys:
236 evcols[key] = items[2]
239 prec = curr_event * 10 / self.n_events
242 print '%10d / %10d' % (curr_event, self.n_events)
245 parser = optparse.OptionParser('usage: %prog [options] <event file...>')
246 parser.add_option("-c", "--clean", dest="clean", help="delete tables in advance", action="store_true", default=False)
247 parser.add_option("-v", "--verbose", dest="verbose", help="verbose messages", action="store_true", default=False)
248 parser.add_option("-f", "--filter", dest="filter", help="regexp to filter event keys", metavar="REGEXP")
249 parser.add_option("-u", "--user", dest="user", help="user", metavar="USER")
250 parser.add_option("-H", "--host", dest="host", help="host", metavar="HOST")
251 parser.add_option("-p", "--password", dest="password", help="password", metavar="PASSWORD")
252 parser.add_option("-d", "--db", dest="database", help="database", metavar="DB")
253 parser.add_option("-e", "--engine", dest="engine", help="eingine", metavar="ENG", default='sqlite3')
254 (options, args) = parser.parse_args()
258 self.verbose = options.verbose
268 if not os.path.isfile(file):
269 print "cannot find input file %s" % (file, )
271 self.files.append(file)
273 if len(self.files) < 1:
274 print "no input file to process"
278 self.filter = re.compile(options.filter)
280 self.filter = DummyFilter()
282 if options.engine in self.engines:
283 engine = self.engines[options.engine]
285 print 'engine %s not found' % options.engine
286 print 'we offer: %s' % self.engines.keys()
290 print "determining schema..."
292 (ctxcols, evcols) = self.find_heads()
294 print "context schema:"
296 print "event schema:"
299 self.emit = engine(options, ctxcols, evcols)
302 print "filling tables..."
308 if __name__ == "__main__":