-
Notifications
You must be signed in to change notification settings - Fork 19
/
portlib.py
132 lines (114 loc) · 3.94 KB
/
portlib.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import json
import re
import subprocess
# Path of the shadowsocks configuration file; the code in this module reads
# and rewrites its "port_password" mapping.
ss_path = "/etc/ssadmin/shadowsocks.json"
# Path of the per-port record file: a JSON object keyed by port whose values
# carry the "used", "limit" and "useable" fields handled in this module.
port_path = "/etc/ssadmin/port.json"
# Path of user.json (presumably user records; not referenced by the code
# visible in this module).
user_path = "/etc/ssadmin/user.json"
# Command prefix that inserts an OUTPUT-chain rule matching a TCP source port;
# callers append the port number.
add_iptable_pre = "iptables -I OUTPUT -p tcp --sport "
# Command prefix that deletes such a rule; callers append the port number.
del_iptable_pre = "iptables -D OUTPUT -p tcp --sport "
# Lists the rule counters and keeps the lines that mention a source port.
# NOTE(review): the grep pattern is unquoted, and as a regex "spt*" matches
# "sp" followed by any number of "t" -- confirm this is the intended filter.
query_port_traffic = "iptables -nvL|grep spt*"
# Same listing with -x added (exact counter values instead of rounded ones);
# this is the variant getPortTraffic() parses.
query_port_traffic_detail = "iptables -nvxL|grep spt*"
def checkUsedAndLimit():
    """Disable every port whose recorded traffic exceeds its limit.

    Reads the per-port records from ``port_path`` and the shadowsocks
    configuration from ``ss_path``. Each port whose ``used`` value is greater
    than its ``limit`` value (both default to 0 when missing) is flagged with
    ``"useable": False`` and removed from the ``port_password`` mapping of
    the shadowsocks configuration. Both files are rewritten only when at
    least one port is over its limit.

    :return: None
    """
    # json.load parses the whole document, so pretty-printed (multi-line)
    # files work as well as the single-line form this module writes.
    with open(port_path, "r", encoding="utf8") as fr:
        port_info_dict = dict(json.load(fr))
    with open(ss_path, "r", encoding="utf8") as fr:
        ss_dict = dict(json.load(fr))

    isUpdate = False
    for port, port_info in port_info_dict.items():
        if int(port_info.get("used", 0)) <= int(port_info.get("limit", 0)):
            continue
        isUpdate = True
        # Flag the record in port.json.
        port_info["useable"] = False
        # Drop the port from the shadowsocks configuration. A port that was
        # already removed by an earlier run (or never configured) is simply
        # skipped instead of raising KeyError.
        ss_port = ss_dict.get("port_password", {})
        ss_port.pop(port, None)
        ss_dict["port_password"] = ss_port

    if isUpdate:
        with open(ss_path, "w", encoding="utf8") as fw:
            fw.write(json.dumps(ss_dict))
        with open(port_path, "w", encoding="utf8") as fw:
            fw.write(json.dumps(port_info_dict))
def checkSpeedLimit():
    """Check per-port speed limits.

    Placeholder: no check is implemented, so calling it does nothing.

    :return: None
    """
    return None
def _parse_port_traffic(listing):
    """Extract per-port traffic, in whole MB, from an iptables rule listing.

    :param listing: raw text of the listing command, one rule per line
    :return: dict mapping the port (as a string) to the MB counted for it
    """
    traffic = {}
    for line in listing.splitlines():
        fields = line.split()
        # A usable rule line carries the byte counter in column 1 and
        # "spt:<port>" in column 9. Blank output, error messages and rules
        # with another layout are skipped instead of raising.
        if len(fields) < 10 or not fields[9].startswith("spt:"):
            continue
        try:
            byte_count = float(fields[1])
        except ValueError:
            continue
        port = fields[9].replace("spt:", "")
        # Bytes -> MB, truncated toward zero.
        # NOTE(review): the original comment asked for rounding up, but the
        # code has always truncated; that behavior is kept here.
        traffic[port] = traffic.get(port, 0) + int(byte_count / 1000 / 1000)
    return traffic


def getPortTraffic():
    """Add the traffic currently counted by iptables to ``port.json``.

    Runs ``query_port_traffic_detail``, adds each port's counted MB to the
    ``used`` field of its record in ``port_path`` (a record is created for a
    port that has none), writes the file back, and finally deletes and
    re-adds every port rule.

    :return: None
    """
    # json.load parses the whole document, not just its first line.
    with open(port_path, "r", encoding="utf8") as fr:
        port_info_dict = dict(json.load(fr))

    listing = subprocess.getoutput(query_port_traffic_detail)
    for port, used_mb in _parse_port_traffic(listing).items():
        port_info = port_info_dict.get(port, {})
        port_info["used"] = int(port_info.get("used", 0)) + used_mb
        port_info_dict[port] = port_info

    with open(port_path, "w", encoding="utf8") as fw:
        fw.write(json.dumps(port_info_dict))

    # Delete every rule and add it again so counting starts over.
    delAllPortRule()
    addAllPortRule()
def delAllPortRule():
    """Delete the iptables rule of every port listed in ``port.json``.

    :return: None
    """
    # json.load parses the whole document, not just its first line.
    with open(port_path, "r", encoding="utf8") as fr:
        port_info_dict = dict(json.load(fr))
    for port in port_info_dict:
        # Reuse the single-port helper so the command is built in one place.
        delOnePortRule(port)
def addAllPortRule():
    """Add an iptables rule for every port listed in ``port.json``.

    :return: None
    """
    # json.load parses the whole document, not just its first line.
    with open(port_path, "r", encoding="utf8") as fr:
        port_info_dict = dict(json.load(fr))
    for port in port_info_dict:
        # Reuse the single-port helper so the command is built in one place.
        addOnePortRule(port)
def delOnePortRule(port):
    """Delete the iptables rule that matches TCP source port ``port``.

    :param port: port number, appended to ``del_iptable_pre`` to form the command
    :return: None; the command's output is discarded
    """
    subprocess.getoutput(del_iptable_pre + "{}".format(port))
def addOnePortRule(port):
    """Insert an iptables rule that matches TCP source port ``port``.

    :param port: port number, appended to ``add_iptable_pre`` to form the command
    :return: None; the command's output is discarded
    """
    subprocess.getoutput(add_iptable_pre + "{}".format(port))
# Script entry point: running this file directly performs one traffic
# accounting pass.
if __name__ == "__main__":
    getPortTraffic()