-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathgit.py
231 lines (176 loc) · 7.27 KB
/
git.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
# SPDX-FileCopyrightText: 2020–2022 Felix Gruber <[email protected]>
#
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import annotations
from abc import ABCMeta, abstractmethod
from collections.abc import Iterable, Sequence
from dataclasses import dataclass
import os
from pathlib import Path
import re
import subprocess
from typing import Optional, Union
class BaseGit(metaclass=ABCMeta):
@abstractmethod
def working_directory_is_clean(self) -> bool: pass
@abstractmethod
def has_staged_files(self) -> bool: pass
@abstractmethod
def current_branch(self) -> str: pass
@abstractmethod
def change_branch(self, branch: str) -> None: pass
@abstractmethod
def add_file(self, file: Union[Path, str]) -> None: pass
@abstractmethod
def add_files(self, files: Iterable[Union[Path, str]]) -> None: pass
@abstractmethod
def add_files_to_annex(self, files: Sequence[Union[Path, str]]) -> None:
pass
@abstractmethod
def commit(self, message: str) -> None: pass
@abstractmethod
def merge(self, other_branch: str) -> None: pass
@abstractmethod
def reset_index_and_working_directory(self) -> None: pass
class Git(BaseGit):
def __init__(self,
work_tree: Union[Path, str],
git_dir: Union[Path, str]):
self.git_command = ['git', f'--work-tree={work_tree}',
f'--git-dir={git_dir}']
self._check_for_annex(git_dir)
@classmethod
def create(cls, work_tree: Union[Path, str]) -> Git:
"""Create a new git repository."""
work_tree = Path(work_tree)
git_dir = work_tree / '.git'
git = cls(work_tree=work_tree, git_dir=git_dir)
git._run_git_command(['init', str(work_tree)])
git._check_for_annex(git_dir)
return git
def has_annex(self) -> bool:
return self._has_annex
def _check_for_annex(self, git_dir: Union[Path, str]) -> None:
# was git annex initialized in the Git repository?
self._has_annex = Path(git_dir, 'annex').exists()
def _run_git_command(self,
args: list[str],
input: Optional[str] = None) -> str:
return subprocess.run(self.git_command + args,
capture_output=True, encoding='UTF8',
input=input, check=True).stdout
def working_directory_is_clean(self) -> bool:
return not self._has_files_with_any_of_these_status_flags('MADRCU')
def has_staged_files(self) -> bool:
return self._has_files_with_any_of_these_status_flags('ADRC')
def _has_files_with_any_of_these_status_flags(self,
flags: Iterable[str]) -> bool:
status = self._run_git_command(['status', '--porcelain'])
change_flags = set(flags)
for line in status.split('\n')[:-1]:
stat = line.split()[0]
if not change_flags.isdisjoint(stat):
return True
return False
def current_branch(self) -> str:
return self._run_git_command(['branch', '--show-current']).strip()
def change_branch(self, branch: str) -> None:
try:
self._run_git_command(['switch', branch])
except subprocess.CalledProcessError as e:
m = re.match(r'fatal: invalid reference: (.*)\n$', e.stderr)
if m is None:
raise e
assert m.group(1) == branch
raise GitError(
f'Trying to switch to non-existent branch: {branch}.'
) from None
def create_branch(self,
branch: str,
start_point: Optional[str] = None,
*,
switch: bool = False,
) -> None:
start = [] if start_point is None else [start_point]
if not switch:
self._run_git_command(['branch', branch, *start])
else:
self._run_git_command(['switch', '-c', branch, *start])
def add_file(self, file: Union[Path, str]) -> None:
self._run_git_command(['add', str(file)])
def add_files(self, files: Iterable[Union[Path, str]]) -> None:
self._run_git_command(['add', *(str(file) for file in files)])
def add_files_to_annex(self, files: Sequence[Union[Path, str]]) -> None:
if not (self.has_annex() and len(files) > 0):
return
self._run_git_command(['annex', 'add', '--force-large',
*(str(file) for file in files)])
def commit(self, message: str) -> None:
try:
self._run_git_command(['commit', '--file=-'], input=message)
except subprocess.CalledProcessError as e:
if len(status_lines := e.output.split('\n')) >= 2:
error_line = status_lines[-2]
if (error_line.startswith('nothing added to commit')
or error_line == 'nothing to commit, working tree clean'):
raise GitEmptyCommitError(
'Trying to commit without any added files.'
) from None
raise
def merge(self, other_branch: str, message: Optional[str] = None) -> None:
try:
if message is None:
self._run_git_command(['merge', '--no-edit', other_branch])
else:
self._run_git_command(['merge', '--file=-', other_branch],
input=message)
except subprocess.CalledProcessError as e:
assert 'CONFLICT' in e.stdout
raise GitMergeConflictError.from_stdout(e.stdout) from None
def reset_index_and_working_directory(self) -> None:
self._run_git_command(['reset', '--hard', 'HEAD'])
class GitError(Exception):
pass
@dataclass(frozen=True)
class Conflict:
name: str
type: str
class GitMergeConflictError(GitError):
def __init__(self, conflicts: list[Conflict]):
self.conflicts = conflicts
@classmethod
def from_stdout(cls, stdout: str) -> GitMergeConflictError:
ms = re.finditer(r'CONFLICT \(([^\)]*)\): '
r'Merge conflict in ([^\n]*)\n',
stdout)
return cls([Conflict(name=m.group(2),
type=m.group(1))
for m in ms])
def __str__(self) -> str:
return ('Merge conflict in the following files:\n'
+ '\n'.join(f'({c.type}) {c.name}' for c in self.conflicts))
class GitEmptyCommitError(GitError):
pass
class FakeGit(BaseGit):
def __init__(self) -> None:
pass
def working_directory_is_clean(self) -> bool:
return True
def has_staged_files(self) -> bool:
return False
def current_branch(self) -> str:
return 'not using Git'
def change_branch(self, branch: str) -> None:
pass
def add_file(self, file: Union[Path, str]) -> None:
pass
def add_files(self, files: Iterable[Union[Path, str]]) -> None:
pass
def add_files_to_annex(self, files: Sequence[Union[Path, str]]) -> None:
pass
def commit(self, message: str) -> None:
pass
def merge(self, other_branch: str) -> None:
pass
def reset_index_and_working_directory(self) -> None:
pass