#! /usr/bin/env python
# mixpanelS3 -- executable script, forked from drjasondavis/Mixpanel-Puller.
import argparse
import sys
import lib.mixpanel_data_puller as puller
import uuid
import subprocess
import datetime
import os
class Runner:
    """Pull Mixpanel data one day at a time and upload it to S3 with s3cmd.

    Typical use is ``parse_args(argv)`` followed by
    ``pull_data_for_date_range()``.  Every external command goes through
    ``run_command`` so that a dry run only prints what would be executed.
    """

    def get_date(self, date_delta):
        """Return the local date ``date_delta`` days from now as 'YYYY-MM-DD'.

        A negative ``date_delta`` yields a date in the past.
        """
        target = datetime.datetime.now() + datetime.timedelta(days=date_delta)
        return target.strftime("%Y-%m-%d")

    def create_base_args(self, parser):
        """Register the command-line options on ``parser``.

        ``--bucket``, ``--apikey`` and ``--apisecret`` default to the
        S3_BUCKET, MIXPANEL_KEY and MIXPANEL_SECRET environment variables
        and are required only when the matching variable is not set.
        """
        def argument_with_environ_default(arg_name, env_arg, help_text):
            env_value = os.environ.get(env_arg)
            if env_value is None:
                parser.add_argument(arg_name, required=True, help=help_text)
            else:
                parser.add_argument(arg_name, default=env_value,
                                    help=help_text)

        argument_with_environ_default('--bucket', 'S3_BUCKET', 'S3 bucket')
        argument_with_environ_default('--apikey', 'MIXPANEL_KEY',
                                      'mixpanel api key')
        argument_with_environ_default('--apisecret', 'MIXPANEL_SECRET',
                                      'mixpanel api secret')
        parser.add_argument('--startdate', required=True,
                            type=puller.parse_date,
                            help='start date')
        # The default is yesterday's date as a string; argparse runs string
        # defaults through ``type`` just like user-supplied values.
        parser.add_argument('--enddate',
                            default=self.get_date(-1),
                            required=False, type=puller.parse_date,
                            help='end date')
        parser.add_argument('--tmpdir', default="/tmp",
                            help='Temporary directory')
        parser.add_argument('--destdir', default='mixpanel-archive',
                            help='Destination directory')
        # run_command() consults ``args.dry``; without this option the
        # attribute never existed and every command raised AttributeError.
        try:
            parser.add_argument('--dry', action='store_true', default=False,
                                help='Print commands without running them')
        except argparse.ArgumentError:
            # A customised parser already defines --dry; keep that one.
            pass

    def parse_args(self, argv):
        """Parse ``argv`` and derive the S3 locations used by the run.

        ``argv`` follows the ``sys.argv`` convention (element 0 is the
        program name and is skipped).  Passing ``None`` falls back to
        argparse's default of reading ``sys.argv``.

        Sets ``self.args``, ``self.bucket`` (always ending in '/'),
        ``self.input_bucket``, ``self.code_bucket``, ``self.emr_code_dir``
        and ``self.output_bucket``.
        """
        parser = self.create_parser()
        self.create_base_args(parser)
        self.args = parser.parse_args(
            None if argv is None else list(argv)[1:])
        self.bucket = self.args.bucket
        if not self.bucket:
            # e.g. S3_BUCKET exported as an empty string
            parser.error("--bucket must not be empty")
        if not self.bucket.endswith('/'):
            self.bucket += '/'
        self.input_bucket = "%sinput/" % self.bucket
        self.code_bucket = "%scode/" % self.bucket
        self.emr_code_dir = "/mnt/code/"
        self.output_bucket = "%s%s/" % (self.bucket, self.args.destdir)

    def rm(self, filename):
        """Delete ``filename`` with ``rm -f`` (via ``run_command``)."""
        self.run_command(['rm', '-f', filename])

    def gzip(self, filename):
        """Compress ``filename`` with ``gzip`` and return the '.gz' name."""
        gz_filename = "%s.gz" % filename
        self.run_command(['gzip', filename])
        return gz_filename

    def run_command(self, cmd):
        """Print ``cmd`` (a sequence of strings) and execute it.

        Nothing is executed when ``self.args.dry`` is true.

        Raises:
            Exception: if the command exits with a non-zero status.
        """
        print("Running: %s" % " ".join(cmd))
        # Tolerate argument namespaces that do not carry a ``dry`` flag.
        if getattr(getattr(self, 'args', None), 'dry', False):
            return
        exit_code = subprocess.call(cmd)
        if exit_code != 0:
            raise Exception("Error: Exit code %d found for command: %s"
                            % (exit_code, cmd))

    def put_s3_string_iter(self, string_iter, s3_filename, zip=False):
        """Write the strings from ``string_iter`` to a temp file and upload it.

        The temp file lives in ``self.args.tmpdir``.  With ``zip`` true the
        file is gzipped first and '.gz' is appended to ``s3_filename``.
        The temp file is removed afterwards, even if compression or the
        upload fails.
        """
        tmp_file = os.path.join(self.args.tmpdir, "%s.txt" % uuid.uuid1())
        try:
            with open(tmp_file, 'w') as handle:
                handle.writelines(string_iter)
            if zip:
                tmp_file = self.gzip(tmp_file)
                s3_filename = "%s.gz" % s3_filename
            print("Writing to s3")
            self.put_s3_file(tmp_file, s3_filename)
        finally:
            self.rm(tmp_file)

    def put_s3_string(self, string, s3_filename, zip=False):
        """Upload a single ``string`` as the contents of ``s3_filename``."""
        # Pass an actual iterable; the previous code handed over a generator
        # *function*, which cannot be iterated.
        self.put_s3_string_iter([string], s3_filename, zip)

    def put_s3_file(self, filename, bucket):
        """Upload ``filename`` to ``bucket`` using ``s3cmd put -r``.

        ``bucket`` may be given with or without the 's3://' prefix.
        """
        if bucket.startswith("s3://"):
            destination = bucket
        else:
            destination = "s3://%s" % bucket
        # Build the argv list directly so paths containing whitespace are
        # not split into several arguments.
        self.run_command(["s3cmd", "put", "-r", filename, destination])

    def date_iter(self, start_date, end_date):
        """Yield each day from ``start_date`` through ``end_date`` inclusive.

        Yields nothing when ``start_date`` is after ``end_date``.
        """
        one_day = datetime.timedelta(days=1)
        current = start_date
        while current <= end_date:
            yield current
            current += one_day

    def create_parser(self):
        """Return the bare ``ArgumentParser``; options are added separately."""
        parser = argparse.ArgumentParser(
            description='Retrieve data from Mixpanel. ' +
            'Will use the MIXPANEL_KEY, MIXPANEL_SECRET ' +
            'and S3_BUCKET environment variables if they are set.')
        return parser

    def pull_data(self, date):
        """Return ``puller.pull`` for the single day ``date``.

        ``date`` is passed as both range arguments together with the
        configured API key and secret.
        """
        return puller.pull(date, date, self.args.apikey, self.args.apisecret)

    def pull_data_for_date_range(self):
        """Pull and upload one gzipped S3 object per day in the range.

        The range is ``self.args.startdate`` .. ``self.args.enddate``; each
        day is stored at ``<output_bucket><date>.gz``.
        """
        start_date, end_date = self.args.startdate, self.args.enddate
        for date in self.date_iter(start_date, end_date):
            date = puller.stringify_date(date)
            print("Pulling data for %s" % date)
            data_iter = self.pull_data(date)
            s3_output_file = "%s%s" % (self.output_bucket, date)
            self.put_s3_string_iter(data_iter, s3_output_file, zip=True)
# Fail fast at start-up when the s3cmd tool is missing.  This is an explicit
# check rather than an `assert` so it still runs under `python -O`, and a
# missing executable produces the message below instead of a subprocess error.
try:
    _s3cmd_banner = subprocess.check_output(["s3cmd", "--version"])
except (OSError, subprocess.CalledProcessError):
    _s3cmd_banner = b""
# check_output returns bytes on Python 3; decode before comparing with text.
if not _s3cmd_banner.decode("utf-8", "replace").startswith("s3cmd version"):
    # AssertionError is kept so the failure type matches the former assert.
    raise AssertionError("This script requires s3cmd tools to be installed")
def run(argv):
    """Build a Runner, hand it ``argv``, and pull the requested date range."""
    job = Runner()
    job.parse_args(argv)
    job.pull_data_for_date_range()


if __name__ == '__main__':
    # Script entry point: forward the process argument vector to run().
    run(sys.argv)