-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathdemo-1-blivet
executable file
·126 lines (94 loc) · 3.95 KB
/
demo-1-blivet
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
#!/usr/bin/python3
import argparse
import os
import sys
from blivet import Blivet
from blivet.devices import LUKSDevice
from blivet.formats import get_format
from blivet.size import Size
EXTENT_SIZE = Size("8 MiB")
MAX_SWAP_PORTION = 0.1
MAX_SWAP_SIZE = Size("1 GiB")
SWAP_LV_NAME = "swap"
DATA_LV_NAME = "data"
SWAP_LABEL = "demoswap"
DATA_LABEL = "demodata"
LUKS_PASSPHRASE = "seekrit"
def get_vg_name():
demo_name = os.path.basename(sys.argv[0]).split(".")[0]
return demo_name.replace("-", "_")
def get_blivet_obj(device_paths):
""" Return a blivet.Blivet instance with disk filter appropriate for this demo. """
blivet_obj = Blivet()
blivet_obj.exclusive_disks = [os.path.basename(d) for d in device_paths]
blivet_obj.encryption_passphrase = LUKS_PASSPHRASE
blivet_obj.reset()
return blivet_obj
def find_devices(device_paths, blivet_obj):
""" Return a list of blivet.devices.StorageDevice instances for the specified paths. """
devices = []
for path in device_paths:
device = blivet_obj.devicetree.resolve_device(path)
if device is None:
sys.stderr.write("ERROR: failed to locate device '%s'\n" % path)
sys.stderr.write("Here's what we did find:\n%s" % str(blivet_obj.devicetree))
raise RuntimeError("failed to look up device %s" % path)
devices.append(device)
return devices
def cleanup(device_paths):
blivet_obj = get_blivet_obj(device_paths)
devices = find_devices(device_paths, blivet_obj)
for device in devices:
# wipe existing metadata from the device
blivet_obj.devicetree.recursive_remove(device, remove_device=False)
# write all of the changes to disk
blivet_obj.do_it()
print(str(blivet_obj.devicetree))
def demo_1(device_paths):
blivet_obj = get_blivet_obj(device_paths)
devices = find_devices(device_paths, blivet_obj)
for device in devices:
# wipe existing metadata from the device
blivet_obj.devicetree.recursive_remove(device, remove_device=False)
# format it as a PV
blivet_obj.format_device(device, get_format("lvmpv", device=device.path))
# Now create the VG
vg = blivet_obj.new_vg(name=get_vg_name(), pe_size=EXTENT_SIZE, parents=devices)
blivet_obj.create_device(vg)
# swap LV
swap_size = min(MAX_SWAP_SIZE, vg.free_space * MAX_SWAP_PORTION)
swap_lv = blivet_obj.new_lv(name=SWAP_LV_NAME,
parents=[vg],
size=swap_size,
fmt_type="swap",
fmt_args={"label": SWAP_LABEL})
blivet_obj.create_device(swap_lv)
# data LV
data_lv = blivet_obj.new_lv(name=DATA_LV_NAME,
parents=[vg],
size=vg.free_space,
fmt_type="luks",
fmt_args={"passphrase": LUKS_PASSPHRASE})
blivet_obj.create_device(data_lv)
luks_dev = LUKSDevice("luks-%s" % data_lv.name,
parents=data_lv,
fmt=get_format("xfs", label=DATA_LABEL))
blivet_obj.create_device(luks_dev)
# write all of the changes to disk
blivet_obj.do_it()
print(str(blivet_obj.devicetree))
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("devices", nargs='+', help="A list of block device nodes to use.")
parser.add_argument("--cleanup", action='store_true')
args = parser.parse_args(sys.argv[1:])
if not all(os.path.exists(dev) for dev in args.devices):
sys.stderr.write("ERROR: devices must be full paths to writable device nodes (disk or loop)\n")
sys.exit(1)
ans = input("About to remove all existing metadata from %s. Is this okay? [y/N] " % (", ".join(args.devices),))
if not ans.lower().startswith('y'):
sys.exit(0)
if args.cleanup:
cleanup(args.devices)
else:
demo_1(args.devices)