forked from baidu/tera
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcluster_setup.py
107 lines (95 loc) · 3.18 KB
/
cluster_setup.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import json
import argparse
import paramiko
import traceback
import socket
import time
import os
import zk
import hdfs
import tera
class SSH(object):
    """Small wrapper around a paramiko SSH client for launching remote commands."""

    def __init__(self):
        """Create the underlying client.

        Host keys are loaded from the system known-hosts files, and hosts
        that are missing from them are accepted automatically
        (``paramiko.AutoAddPolicy``).
        """
        self.s = paramiko.SSHClient()
        self.s.load_system_host_keys()
        self.s.set_missing_host_key_policy(paramiko.AutoAddPolicy())

    def run_cmd(self, ip, cmd):
        """Connect to ``ip``, start ``cmd`` there and close the connection.

        This is best-effort: a failure to connect or to start the command is
        reported with a traceback on stderr and is not re-raised.

        Args:
            ip: Host to connect to.
            cmd: Command line to execute on the remote host.

        Returns:
            The ``(stdin, stdout, stderr)`` triple from ``exec_command``, or
            ``(None, None, None)`` when the command could not be started.
        """
        # Pre-bind the results so a failed connect/exec does not turn into an
        # UnboundLocalError at the return statement.
        stdin = stdout = stderr = None
        try:
            self.s.connect(ip)
            stdin, stdout, stderr = self.s.exec_command(cmd)
        except Exception:
            traceback.print_exc()
        finally:
            # Always release the connection, even when exec_command raised.
            # NOTE(review): the connection is closed without waiting for the
            # command to finish, so the returned streams may no longer be
            # readable -- confirm no caller relies on them.
            self.s.close()
        return stdin, stdout, stderr
def parse_input(argv=None):
    """Parse the command-line options of the cluster setup script.

    Args:
        argv: Optional list of argument strings. ``None`` (the default)
            makes argparse read ``sys.argv[1:]``, which is what the script
            entry point relies on.

    Returns:
        An ``argparse.Namespace`` with the attributes ``conf``, ``docker``,
        ``zk``, ``hdfs`` and ``tera``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--conf', type=str, help='A file describes the zk cluster')
    parser.add_argument('--docker', type=str, default='lylei/tera:latest', help='ID of the docker image')
    # One boolean switch per service that can be launched.
    for service in ('zk', 'hdfs', 'tera'):
        parser.add_argument('--' + service, action='store_true', help='Launch ' + service)
    return parser.parse_args(argv)
def config(args):
    """Build the zk, hdfs and tera cluster descriptions from the parsed options.

    Without ``--conf`` a default setup on the local host is used: the local
    IP address, the value 1 for each of ``zk``, ``hdfs`` and ``tera``, and the
    user's home directory as log prefix. Otherwise the settings are read from
    the JSON file named by ``args.conf``, which must provide the keys ``ip``
    (a ``:``-separated list of addresses), ``zk``, ``hdfs`` and ``tera``;
    ``log_prefix`` is optional and defaults to the user's home directory.

    Every instance of each populated cluster is printed.

    Args:
        args: Parsed command-line options; only ``args.conf`` is read.

    Returns:
        The tuple ``(zk_cluster, hdfs_cluster, tera_cluster)``.

    Raises:
        SystemExit: With status 1 when ``populate_hdfs_cluster()`` returns
            ``False``.
    """
    home = os.path.expanduser('~')
    if args.conf is None:
        settings = {
            "hdfs": 1,
            "ip": socket.gethostbyname(socket.gethostname()),
            "tera": 1,
            "zk": 1,
            "log_prefix": home,
        }
    else:
        # Close the file deterministically instead of leaking the handle.
        with open(args.conf, 'r') as conf_file:
            settings = json.load(conf_file)

    ip_list = settings['ip'].split(':')
    log_prefix = settings.get('log_prefix', home)

    zk_cluster = zk.ZkCluster(ip_list, settings['zk'], log_prefix)
    zk_cluster.populate_zk_cluster()
    for zk_instance in zk_cluster.cluster:
        print(zk_instance.to_string())

    hdfs_cluster = hdfs.HdfsCluster(ip_list, settings['hdfs'], log_prefix)
    # Only an explicit False aborts; any other return value continues.
    if hdfs_cluster.populate_hdfs_cluster() is False:
        raise SystemExit(1)
    for hdfs_instance in hdfs_cluster.cluster:
        print(hdfs_instance.to_string())

    tera_cluster = tera.TeraCluster(ip_list, settings['tera'], log_prefix)
    tera_cluster.populate_tera_cluster()
    for tera_instance in tera_cluster.cluster:
        print(tera_instance.to_string())

    return zk_cluster, hdfs_cluster, tera_cluster
def start_zk(args, zk_cluster, s):
    """Launch every zk instance of ``zk_cluster`` on its host.

    The launch is skipped when another service (``--hdfs`` or ``--tera``) was
    requested but ``--zk`` was not. With no service flag at all, zk is
    launched.

    Args:
        args: Parsed options; reads ``zk``, ``hdfs``, ``tera`` and ``docker``.
        zk_cluster: Cluster description providing ``cluster`` (the instances)
            and ``ip_zk``.
        s: Object with a ``run_cmd(ip, cmd)`` method used to run each command.
    """
    if (args.hdfs or args.tera) and not args.zk:
        return
    # The joined address list is the same for every instance; build it once.
    zk_ips = ' '.join(zk_cluster.ip_zk)
    for zk_instance in zk_cluster.cluster:
        cmd = zk_instance.to_cmd(zk_ips, args.docker)
        print(cmd)
        s.run_cmd(zk_instance.ip, cmd)
def start_hdfs(args, hdfs_cluster, s):
    """Launch every hdfs instance of ``hdfs_cluster`` on its host.

    The launch is skipped when another service (``--zk`` or ``--tera``) was
    requested but ``--hdfs`` was not. With no service flag at all, hdfs is
    launched.

    Args:
        args: Parsed options; reads ``zk``, ``hdfs``, ``tera`` and ``docker``.
        hdfs_cluster: Cluster description providing ``cluster`` (the
            instances), ``master_ip`` and ``slave_ip``.
        s: Object with a ``run_cmd(ip, cmd)`` method used to run each command.
    """
    if (args.zk or args.tera) and not args.hdfs:
        return
    # The joined slave list is the same for every instance; build it once.
    slave_ips = ' '.join(hdfs_cluster.slave_ip)
    for hdfs_instance in hdfs_cluster.cluster:
        cmd = hdfs_instance.to_cmd(args.docker, hdfs_cluster.master_ip, slave_ips)
        print(cmd)
        s.run_cmd(hdfs_instance.ip, cmd)
def start_tera(args, tera_cluster, zk_cluster, hdfs_cluster, s):
    """Launch every tera instance of ``tera_cluster`` on its host.

    The launch is skipped when another service (``--zk`` or ``--hdfs``) was
    requested but ``--tera`` was not. With no service flag at all, tera is
    launched. Every instance is launched; instances whose ``mode`` is
    ``'master'`` are launched after a 5 second pause.

    Args:
        args: Parsed options; reads ``zk``, ``hdfs``, ``tera`` and ``docker``.
        tera_cluster: Cluster description providing ``cluster`` (the
            instances).
        zk_cluster: Cluster description providing ``ip_tera``.
        hdfs_cluster: Cluster description providing ``master_ip`` and
            ``slave_ip``.
        s: Object with a ``run_cmd(ip, cmd)`` method used to run each command.
    """
    if (args.zk or args.hdfs) and not args.tera:
        return
    # These joined lists are the same for every instance; build them once.
    zk_ips = ','.join(zk_cluster.ip_tera)
    slave_ips = ':'.join(hdfs_cluster.slave_ip)
    for tera_instance in tera_cluster.cluster:
        cmd = tera_instance.to_cmd(args.docker, zk_ips, hdfs_cluster.master_ip, slave_ips)
        print(cmd)
        if tera_instance.mode == 'master':
            # NOTE(review): the pause presumably gives previously launched
            # services time to come up before the master starts -- confirm.
            time.sleep(5)
        s.run_cmd(tera_instance.ip, cmd)
def main():
    """Script entry point: parse options, build the clusters, launch them.

    The services are launched in the order zk, hdfs, tera; each launch
    function decides from the options whether it actually runs.
    """
    options = parse_input()
    zk_cluster, hdfs_cluster, tera_cluster = config(options)
    # A single SSH helper is shared by all three launch steps.
    ssh = SSH()
    start_zk(options, zk_cluster, ssh)
    start_hdfs(options, hdfs_cluster, ssh)
    start_tera(options, tera_cluster, zk_cluster, hdfs_cluster, ssh)


# Run only when executed as a script, not when imported.
if __name__ == '__main__':
    main()