-
Notifications
You must be signed in to change notification settings - Fork 0
/
git-forks
220 lines (187 loc) · 8.99 KB
/
git-forks
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
#!/usr/bin/env python
# This script is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This script is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this script. If not, see <http://www.gnu.org/licenses/>.
import os
import re
import json
import argparse
import subprocess
import urllib.request
# This expression is intended to match the following patterns:
# [email protected]:owner/project
# git://example.com/owner/project
# https://example.com/owner/project
# It captures the domain, and the owner/project path
apiPattern = "^(?:https|git)(?:://|@)(.+?)(?:/|:)(.+)\.git$"
## git Utility functions
def git_getlocalremotes():
remotes = {}
try:
ret = subprocess.check_output(["git", "remote", "--verbose"], stdin=open(os.devnull,"w"))
except subprocess.CalledProcessError as e:
print("An error has occurred acquiring remotes.")
return None
else:
for line in ret.decode("utf-8").splitlines():
remote = line.split()
if remote[2] == "(fetch)":
remotes[remote[0]] = {"url": remote[1]}
return remotes
def git_addremote(rname, rurl):
ret = subprocess.call(["git", "remote", "add", rname, rurl])
if ret != 0:
print("An error has occurred adding remote `{}` ({}). Aborting.".format(rname, rurl))
return False
return True
## Forks functions
def deduceAPI(URI):
matches = re.search(apiPattern, URI)
if matches:
return matches.group(1)
return None
def deduceRepoInfo(URI):
matches = re.search(apiPattern, URI)
if matches:
return matches.group(2).split("/")
return None
def getForkList(upstream=None, remotes=None):
print("Checking remote named `{}` found at {} for forks...".format(upstream, remotes[upstream]["url"]))
forkList = {}
apiName = deduceAPI(remotes[upstream]["url"])
try:
if apiName == "github.com":
user, repo = deduceRepoInfo(remotes[upstream]["url"])
print("Finding all GitHub forks of {}'s repository named `{}`...".format(user, repo))
with urllib.request.urlopen("https://api.github.com/repos/{}/{}".format(user, repo)) as ghinfo:
ghinfojson = json.loads(ghinfo.read().decode('utf-8'))
currentPage = "{}?per_page=100".format(ghinfojson["forks_url"])
donePaging = False
while not donePaging:
with urllib.request.urlopen(currentPage) as ghforks:
ghforkjson = json.loads(ghforks.read().decode('utf-8'))
for fork in ghforkjson:
forkList[fork["owner"]["login"]] = {"url": fork["git_url"]}
# If pagination exists, parse subsequent pages
pageLinks = None
if "Link" in ghforks.info().keys():
pagePattern = "<(.*?)>; rel=\"(.*?)\""
pageLinks = {link[1]:link[0] for link in re.findall(pagePattern, ghforks.info().get("Link"))}
if pageLinks and "next" in pageLinks.keys():
currentPage = pageLinks["next"]
else:
donePaging = True
elif apiName == "bitbucket.org":
user, repo = deduceRepoInfo(remotes[upstream]["url"])
print("Finding all Bitbucket forks of {}'s repository named `{}`...".format(user, repo))
with urllib.request.urlopen("https://bitbucket.org/api/2.0/repositories/{}/{}".format(user, repo)) as hostinfo:
hostinfojson = json.loads(hostinfo.read().decode('utf-8'))
with urllib.request.urlopen("{}".format(hostinfojson["links"]["forks"]["href"], repo)) as hostforks:
hostforksjson = json.loads(hostforks.read().decode('utf-8'))
for fork in hostforksjson["values"]:
if fork["scm"] == "git":
for protocol in fork["links"]["clone"]:
if protocol["name"] == "https":
forkList[fork["owner"]["username"]] = {"url": protocol["href"]}
else:
print("API protocol unrecognized for '{}'. Please file a bug report to request support.".format(apiName))
except urllib.error.HTTPError as e:
print("Unable to complete API request: {}".format(e))
return forkList
## Command line functions
def command_add(args):
if not args.owners and not args.all:
print("Nothing to add. Please specify an owner or use --all. See --help for more information.")
return
remotes = git_getlocalremotes()
if not remotes:
print("No remotes found. Please add at least one remote before attempting to add its forks.")
return
upstream = args.upstream
if upstream not in remotes:
print("Specified remote `{}` does not exist.".format(upstream))
return
forkList = getForkList(upstream, remotes)
if not forkList:
print("No forks of remote `{}` found.".format(upstream))
return
if args.owners and args.all:
print("Redunant owner(s) provided: adding all forks.")
args.owners = None
if args.owners and args.excludes:
print("Explicit owner(s) provided: ignoring --exclude.")
queuedRemotes = []
if args.owners:
for owner in args.owners:
if owner in forkList.keys():
queuedRemotes.append(owner)
else:
print("Fork owned by {} not found. Skipping...".format(owner))
elif args.all:
queuedRemotes = [fork for fork in forkList]
if args.excludes:
for exclude in args.excludes:
if exclude in queuedRemotes:
queuedRemotes.remove(exclude)
addedRemotes = 0
existingRemoteURIs = [remotes[remote]["url"] for remote in remotes]
for remote in queuedRemotes:
if forkList[remote]["url"] not in existingRemoteURIs:
if git_addremote(remote, forkList[remote]["url"]):
addedRemotes += 1
print("{} forks out of {} added as remotes.".format(addedRemotes, len(forkList)))
def command_list(args):
upstream = args.upstream
remotes = git_getlocalremotes()
if not remotes:
print("No remotes found. Please add at least one remote before attempting to list its forks.")
return
if upstream in remotes:
forkList = getForkList(upstream, remotes)
try:
maxLength = len(max(forkList, key=len))
except ValueError:
print("No forks of remote `{}` found.".format(upstream))
else:
if not args.nolocal:
print("\nRemotes already tracked locally are marked with '*':\n")
existingRemoteURIs = [remotes[remote]["url"] for remote in remotes]
for fork in forkList:
isLocal = True if forkList[fork]["url"] in existingRemoteURIs else False
if not (isLocal and args.nolocal):
print("{:2}{:{width}}\t{}".format("*" if isLocal else "", fork, forkList[fork]["url"], width=maxLength))
else:
print("Specified remote `{}` does not exist.".format(upstream))
def main():
parser = argparse.ArgumentParser(description="A utility for interfacing with remote forks.", prog="git forks")
subparsers = parser.add_subparsers()
# Shared options
shared_arguments = [
dict(args=["-u", "--upstream"], kwargs=dict(dest="upstream", default="origin", help="Use specified remote as the root repository when finding forks. (default: origin)"))
]
# 'Add' command options
parser_add = subparsers.add_parser("add")
parser_add.add_argument(dest="owners", default=None, nargs="*", help="Name of repository owner(s) to add as remote.")
parser_add.add_argument("-x", "--exclude", dest="excludes", default=None, nargs="*", help="Name of repository owner(s) to exclude when using --all.")
parser_add.add_argument("-a", "--all", dest="all", action="store_true", help="Add all forks of the specified remote.")
for arg in shared_arguments:
parser_add.add_argument(*arg["args"], **arg["kwargs"])
parser_add.set_defaults(func=command_add)
# 'List' command options
parser_list = subparsers.add_parser("list")
parser_list.add_argument("-n", "--nonlocal", dest="nolocal", action="store_true", help="Show only forks which are not local remotes.")
for arg in shared_arguments:
parser_list.add_argument(*arg["args"], **arg["kwargs"])
parser_list.set_defaults(func=command_list)
args = parser.parse_args()
args.func(args)
if __name__ == '__main__': main()