-
Notifications
You must be signed in to change notification settings - Fork 7
/
tip.py
1608 lines (1314 loc) · 63.9 KB
/
tip.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python
#-*-coding: UTF-8 -*-
import gi
from gi.repository import Gtk,Gdk
gi.require_version('Vte', '2.91')
from gi.repository import Vte
from gi.repository import GLib,Pango
from terminatorlib.translation import _
import os
import sys
reload(sys)
sys.setdefaultencoding('utf8')
import datetime,time
import json
import re
import sqlite3
import uuid
import math
import cgi
from collections import OrderedDict
def adapt_datetime(ts):
    """Convert a datetime into a timestamp for storage in SQLite.

    Args:
        ts: A ``datetime.datetime``; its ``timetuple()`` is handed to
            ``time.mktime``.

    Returns:
        The value returned by ``time.mktime`` (a float number of seconds).
    """
    broken_down = ts.timetuple()
    return time.mktime(broken_down)


# Have sqlite3 run datetime parameters through adapt_datetime.
sqlite3.register_adapter(datetime.datetime, adapt_datetime)
def nowTime():
    """Return the current time as whole milliseconds since the epoch."""
    return int(round(time.time() * 1000))


# SQLite database file kept in the user's home directory.
db_file = os.path.join(os.path.expanduser('~'), '.terminator.db')
# Matches text that starts with two or more whitespace characters.
# Raw string: "\s" is an invalid escape sequence in an ordinary string literal.
start_blank = re.compile(r"^\s{2,}")
exclude_cmds = ["clear"]
SUGGESTION_NUM = 8  # maximum number of suggested commands shown in the tip box
# When the number of history records exceeds this, the first DELETE_NUM records
# are deleted. If performance problems show up, lower this value first and
# look for other solutions afterwards.
MAX_HIS_NUM = 8000
DELETE_NUM = 2000  # how many of the oldest records to delete at a time
# At most 800 distinct commands are kept in the statistics; the excess is
# deleted. If performance problems show up, lower this value first.
MAX_STAT_NUM = 800
DELETE_STAT_NUM = 200  # how many statistics entries to delete at a time
# How long an automatic tip stays open before it closes by itself
# (presumably milliseconds - confirm against the timer that uses it).
AUTO_TIP_WAIT = 3500
# When enabled the first suggestion is selected automatically. Drawback: if
# you want to run "ab" and "abc" got selected, Enter inputs "abc" and "ab"
# cannot be run.
AUTO_SELECT_ENABLE = False
# How long after auto-selecting the first suggestion the selection is dropped.
# Deprecated and unused: the nondeterminism caused frequent mistakes.
AUTO_SELECT_WAIT = 1500
DEBUG_ENABLE = False  # whether DEBUG log lines are printed
INTERVAL_LEVEL = [5000, 10000, 15000, 60000, 300000, 600000]
# VT100 control codes: after repeated experiments only the single-step form
# such as \033[D works; \033[15D (move 15 positions at once) is not supported,
# reason unknown.
cmd_pattern = re.compile(r'[^ ]+ (/)?')
space_pattern = re.compile(r'\s')  # used to test for any whitespace character
(CC_COL_COMMAND, CC_COL_COUNT) = range(0, 2)
def log_debug(msg):
    """Print a debug line tagged with this file's name and the caller's line.

    Nothing is printed unless the module-level DEBUG_ENABLE flag is truthy.
    """
    if not DEBUG_ENABLE:
        return
    # Text after the last '/' of __file__ (the whole string if there is none).
    source_name = __file__.rsplit('/', 1)[-1]
    caller_line = sys._getframe(1).f_lineno
    print("[DEBUG]: %s (%s:%d)" % (msg, source_name, caller_line))
def log_info(msg):
    """Print msg to stdout wrapped in the ANSI escape codes for green text."""
    green_text = '\033[32m' + msg + "\033[0m"
    print(green_text)
def timing(f):
    """Decorator that prints how long each call to ``f`` took, in milliseconds.

    Args:
        f: The function to time.

    Returns:
        A wrapper that forwards positional and keyword arguments to ``f``,
        prints "<name> function took <ms> ms" and returns f's result.
    """
    # Imported locally because functools is not imported at file level.
    from functools import wraps

    @wraps(f)  # keep f's __name__/__doc__ on the wrapper
    def wrap(*args, **kwargs):
        time1 = time.time()
        ret = f(*args, **kwargs)
        time2 = time.time()
        # __name__ exists on both Python 2 and 3 (func_name is Python 2 only).
        print('%s function took %0.5f ms' % (f.__name__, (time2 - time1) * 1000.0))
        return ret
    return wrap
class LRUCache(OrderedDict):
    """Bounded least-recently-used cache.

    Entries are kept in ``self.cache``, an OrderedDict ordered from least to
    most recently used. ``get`` and ``set`` both move the touched key to the
    most-recently-used end; ``set`` evicts the least recently used entry when
    the cache already holds ``size`` entries.
    """

    def __init__(self, size=128):
        super(LRUCache, self).__init__()
        # Must be a plain number: a trailing comma here would store a 1-tuple,
        # the length comparison in set() would never match and nothing would
        # ever be evicted.
        self.size = size
        self.cache = OrderedDict()

    def get(self, key, default=None):
        """Return the value cached for ``key`` or ``default`` (None) if absent.

        A hit marks the key as most recently used.
        """
        if key not in self.cache:
            return default
        val = self.cache.pop(key)
        self.cache[key] = val
        return val

    def set(self, key, val):
        """Store ``val`` under ``key`` as the most recently used entry."""
        if key in self.cache:
            # Drop the stale entry so the new value is stored and the key
            # moves to the most-recently-used end.
            self.cache.pop(key)
        elif self.cache and len(self.cache) >= self.size:
            # Full: evict the least recently used entry.
            self.cache.popitem(last=False)
        self.cache[key] = val
def split_word(str):
    """Split text into its runs of word characters.

    Args:
        str: The text to split. (The name shadows the builtin but is kept
            because it is the function's public parameter name.)

    Returns:
        A list of the non-empty pieces obtained by splitting on runs of
        non-word characters (regex ``\\W+``).
    """
    # Raw string: "\W" is an invalid escape sequence in an ordinary literal.
    return [word for word in re.split(r"\W+", str) if word != '']
# Cache for the simple two-string match below, keyed by "str1:str2".
str_match_cache = LRUCache(size=2000)


def match_str_by_words(str1, str2):
    """Return a similarity score for two strings, memoised in str_match_cache.

    The leading-part score is used when it is at least 0.5; otherwise the
    larger of the leading-part and trailing-part scores is used.
    """
    cache_key = str1 + ":" + str2
    cached = str_match_cache.get(cache_key)
    if cached is not None:
        return cached
    left_score = left_match_str_by_words(str1, str2)
    if left_score >= 0.5:
        best = left_score
    else:
        right_score = right_match_str_by_words(str1, str2)
        best = right_score if right_score >= left_score else left_score
    str_match_cache.set(cache_key, best)
    return best
def left_match_str(str1, str2):
    """Score how much of two strings is a shared leading run of characters.

    Returns:
        0 if either string is empty, otherwise
        ``2 * m / (len(str1) + len(str2))`` as a float, where ``m`` is the
        number of leading characters the strings have in common.
    """
    len1, len2 = len(str1), len(str2)
    if min(len1, len2) == 0:
        return 0
    shared = 0
    for ch1, ch2 in zip(str1, str2):
        if ch1 != ch2:
            break
        shared += 1
    return float(2 * shared) / (len1 + len2)
def left_match_str_by_words(str1, str2):
    """Score the shared leading part of two strings, word by word.

    If either string is shorter than 7 characters the character-level
    left_match_str is used instead. Otherwise both strings are tokenised with
    split_word and the score is ``2 * m / (n1 + n2)``, where ``m`` is the
    number of leading words in common and ``n1``/``n2`` are the word counts
    (0 if either string has no words).
    """
    if min(len(str1), len(str2)) < 7:
        return left_match_str(str1, str2)
    words1, words2 = split_word(str1), split_word(str2)
    if not words1 or not words2:
        return 0
    shared = 0
    for word1, word2 in zip(words1, words2):
        if word1 != word2:
            break
        shared += 1
    return float(2 * shared) / (len(words1) + len(words2))
def right_match_str(str1, str2):
    """Score how much of two strings is a shared trailing run of characters.

    Returns:
        0 if either string is empty, otherwise
        ``2 * m / (len(str1) + len(str2))`` as a float, where ``m`` is the
        number of trailing characters the strings have in common.
    """
    len1, len2 = len(str1), len(str2)
    if min(len1, len2) == 0:
        return 0
    shared = 0
    for ch1, ch2 in zip(reversed(str1), reversed(str2)):
        if ch1 != ch2:
            break
        shared += 1
    return float(2 * shared) / (len1 + len2)
def right_match_str_by_words(str1, str2):
    """Score the shared trailing part of two strings, word by word.

    If either string is shorter than 7 characters the character-level
    right_match_str is used instead. Otherwise both strings are tokenised with
    split_word and the score is ``2 * m / (n1 + n2)``, where ``m`` is the
    number of trailing words in common and ``n1``/``n2`` are the word counts
    (0 if either string has no words).
    """
    if min(len(str1), len(str2)) < 7:
        return right_match_str(str1, str2)
    words1, words2 = split_word(str1), split_word(str2)
    if not words1 or not words2:
        return 0
    shared = 0
    for word1, word2 in zip(reversed(words1), reversed(words2)):
        if word1 != word2:
            break
        shared += 1
    return float(2 * shared) / (len(words1) + len(words2))
# Cache for prefix_match_str, keyed by "str1:str2".
p_match_cache = LRUCache(size=2000)


def prefix_match_str(str1, str2):
    """Return the similarity of two prompt prefixes, memoised in p_match_cache."""
    cache_key = str1 + ":" + str2
    score = p_match_cache.get(cache_key)
    if score is None:
        # Prefixes are matched from the end backwards; working on word tokens
        # drops any special characters that may trail the prefix.
        score = right_match_str_by_words(str1, str2)
        p_match_cache.set(cache_key, score)
    return score
def precmd_match_str(str1, str2):
    """Score how alike two "previous command" strings are.

    Returns:
        0 when the whitespace-separated word counts differ or there is only
        one word. For two-word commands, 0.3 when cmd_pattern matches both and
        the matched text is identical, else 0. Otherwise 0.8 when
        get_common_cmd finds a common command, else 0.
    """
    word_count = len(str1.split())
    if word_count != len(str2.split()) or word_count == 1:
        return 0
    if word_count == 2:
        # Mainly distinguishes a second argument with and without a leading
        # "/", e.g. "cd /data" versus "cd data".
        first = cmd_pattern.match(str1)
        second = cmd_pattern.match(str2)
        if first and second:
            return 0.3 if first.group() == second.group() else 0
        return 0
    common_cmd, _back_size = get_common_cmd(str1, str2)
    return 0.8 if common_cmd != '' else 0
def max_match_str(str1, strmaps, type=None):
    """Find the best match for ``str1`` among the keys of ``strmaps``.

    Args:
        str1: The string to look for (may be None).
        strmaps: Mapping of candidate string -> occurrence count.
        type: Matcher selection. 1 uses prefix_match_str, 2 uses
            precmd_match_str, anything else uses match_str_by_words (title
            matching). (The name shadows the builtin but is kept because it
            is part of the public signature.)

    Returns:
        A tuple ``(max_match, max_match_count, relation)``: the best score,
        the count stored for the first candidate reaching that score, and a
        relevance figure. ``relation`` is ``max_match_count / total`` when
        there are fewer than 5 candidates and something matched, otherwise
        ``1 / number_of_candidates``. An empty ``strmaps`` yields
        ``(0, 0, 0)``.
    """
    if not strmaps:
        # Nothing to compare against; also avoids dividing by zero below.
        return 0, 0, 0
    max_match = 0
    max_match_count = 0
    total = 0  # sum of all counts
    kinds = 0  # number of distinct candidates
    for str2, count in strmaps.items():
        total = total + count
        kinds = kinds + 1
        if str1 == str2:
            new_match = 1
        elif str1 is None or str2 is None:
            new_match = 0
        elif type == 1:
            new_match = prefix_match_str(str1, str2)
        elif type == 2:
            new_match = precmd_match_str(str1, str2)
        else:
            new_match = match_str_by_words(str1, str2)  # title matching
        if new_match > max_match:
            max_match = new_match
            max_match_count = count
        # No early break on a perfect match: the loop must finish so that
        # ``total`` covers every candidate.
    # Compute the relevance.
    relation = float(1) / kinds
    if kinds < 5 and max_match_count > 0:
        relation = float(max_match_count) / total
    return max_match, max_match_count, relation
def get_common_cmd(str1, str2):
    """Try to extract the part two commands have in common.

    Example: "git cherry-pick aaksdasd0102021 -n" and
    "git cherry-pick isidasudusaudau -n" share everything except the third
    word, so the result is the command with that word reduced to the two
    words' common prefix, plus 3. The number says how many columns the cursor
    must move back after the common command has been typed so that the
    differing part can be entered in the middle.

    Returns:
        ``(common_cmd, back_size)``. ``('', 0)`` is returned when either
        command contains a quote character, the word counts differ, there are
        fewer than three words, the first words differ, or the number of
        differing words is not exactly one.
    """
    if any(quote in text for text in (str1, str2) for quote in ('"', "'")):
        return '', 0
    words1 = str1.split()
    words2 = str2.split()
    if len(words1) != len(words2) or len(words1) < 3 or words1[0] != words2[0]:
        return '', 0
    differing = [pos for pos, (word1, word2) in enumerate(zip(words1, words2))
                 if word1 != word2]
    # Only handled when exactly one word differs.
    if len(differing) != 1:
        return '', 0
    diff_pos = differing[0]
    shared_prefix = os.path.commonprefix([words1[diff_pos], words2[diff_pos]])
    words1[diff_pos] = shared_prefix
    tail = " ".join(words1[diff_pos:])
    return " ".join(words1), len(tail) - len(shared_prefix)
def get_interval_level(interval, levels=None):
    """Map an interval to the index of the first threshold that contains it.

    Args:
        interval: The interval to classify; -1, 0 or None mean "no usable
            interval".
        levels: Ascending thresholds to classify against. Defaults to the
            module-level INTERVAL_LEVEL.

    Returns:
        The index of the first threshold that is >= ``interval``, or 10 when
        there is no usable interval or it exceeds every threshold. Without the
        final ``return 10`` the function returns None for such intervals.
    """
    if levels is None:
        levels = INTERVAL_LEVEL
    if interval == -1 or not interval:
        return 10
    for level, value in enumerate(levels):
        if interval <= value:
            return level
    return 10
def by_score(suggest_cmd):
    """Sort key: return the "score" entry of a suggestion dict."""
    score = suggest_cmd["score"]
    return score
class History():
    """Command history: per-command statistics in memory, rows in SQLite.

    Attributes:
        conn: sqlite3 connection to ``db_file``.
        new_history: history dicts accepted but not yet written to the table.
        last_history: the most recently accepted history dict.
        total_count: running count of history rows (loaded plus accepted).
        history_stat: command -> statistics dict, maintained by add_to_stat.
        all_common_cmds: list of dicts with keys cmd/back_size/count/time.
        last_append_time: nowTime() at the last flush to the database.
    """

    def __init__(self):
        self.new_history = []
        self.last_history = {"cmd": ""}
        self.total_count = 0
        self.history_stat = {}
        self.init_history()
        self.last_append_time = nowTime()

    def init_history(self):
        """Open the database, create missing tables and load their contents."""
        self.conn = sqlite3.connect(db_file)
        self.conn.text_factory = str
        cursor = self.conn.cursor()
        cursor.execute("SELECT count(*) AS cnt FROM sqlite_master WHERE type='table' AND name='history'")
        if cursor.fetchone()[0] == 0:
            # Create the history table.
            cursor.execute('''create table history(
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                session varchar(20),
                cmd TEXT,
                pre_cmd TEXT,
                prefix TEXT,
                window_title TEXT,
                time date,
                interval INTEGER
                )''')
        # Load the history.
        cursor.execute('select id,session,cmd,pre_cmd,prefix,window_title,time,interval from history')
        # Read every row before processing: add_to_stat() may DELETE and
        # VACUUM, and VACUUM fails while a SELECT is still in progress.
        for his in cursor.fetchall():
            history = {'cmd': his[2], 'pre_cmd': his[3], 'prefix': his[4],
                       'window_title': his[5], 'time': his[6], 'interval': his[7]}
            self.add_to_stat(history)
            self.total_count = self.total_count + 1
        self._trim_history(cursor)
        self._load_common_cmds()

    def _trim_history(self, cursor):
        """Delete the oldest DELETE_NUM rows once total_count reaches MAX_HIS_NUM.

        history_stat is not updated to match; that is accepted as harmless.
        """
        if self.total_count < MAX_HIS_NUM:
            return
        result = cursor.execute(
            "delete from history where id in "
            "(select id from history order by id limit ?)", (DELETE_NUM,))
        self.total_count = self.total_count - result.rowcount
        self.conn.commit()
        # Compact the database file.
        cursor.execute("VACUUM")

    def _load_common_cmds(self):
        """Create the common_cmd table if needed and load it, grouped by command."""
        cursor = self.conn.cursor()
        cursor.execute("SELECT count(*) AS cnt FROM sqlite_master WHERE type='table' AND name='common_cmd'")
        if cursor.fetchone()[0] == 0:
            # Create the common_cmd table.
            cursor.execute('''create table common_cmd(
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                cmd TEXT,
                back_size INTEGER,
                count INTEGER,
                time date
                )''')
        cursor.execute('select id,cmd,back_size,count,time from common_cmd order by id desc')
        common_cmds = cursor.fetchmany(size=1000)
        self.all_common_cmds = []
        all_size = len(common_cmds)
        if all_size == 0:
            return
        group_common_cmds = {}
        to_delete_ids = []
        for index, common_cmd in enumerate(common_cmds, 1):
            # Once more than 500 rows were read, rows from position 400 on
            # (newest first) are deleted; nothing is deleted again until the
            # count next exceeds 500.
            if index >= 400 and all_size > 500:
                to_delete_ids.append((common_cmd[0],))
                continue
            cmd_str = common_cmd[1]
            entry = group_common_cmds.setdefault(
                cmd_str, {'cmd': cmd_str, 'back_size': common_cmd[2],
                          'count': 0, 'time': common_cmd[4]})
            entry["count"] = entry["count"] + 1
            entry["time"] = common_cmd[4]
        if to_delete_ids:
            # Delete by primary key through this object's own connection.
            cursor.executemany("delete from common_cmd where id = ?", to_delete_ids)
            self.conn.commit()
        # A real list: append_common_cmd() appends to it.
        self.all_common_cmds = list(group_common_cmds.values())

    def add_to_stat(self, history):
        """Fold one history dict into history_stat, pruning when it grows too big.

        Args:
            history: dict with keys cmd, window_title, time, prefix, pre_cmd
                and interval.
        """
        cmd = history["cmd"]
        title = history["window_title"]
        prefix = history["prefix"]
        pre_cmd = history["pre_cmd"]
        l_interval = get_interval_level(history["interval"])
        stat = self.history_stat.setdefault(
            cmd, {"count": 0, "titles": {}, "prefixs": {}, "pre_cmds": {}})
        stat["count"] = stat["count"] + 1
        stat["titles"][title] = stat["titles"].get(title, 0) + 1
        stat["prefixs"][prefix] = stat["prefixs"].get(prefix, 0) + 1
        # Shape of pre_cmds:
        # {"pwd": {"1": 1}, "java -jar arthas-boot.jar": {"4": 1}}
        intervals = stat["pre_cmds"].setdefault(pre_cmd, {})
        intervals[l_interval] = intervals.get(l_interval, 0) + 1
        stat["last_time"] = history["time"]
        # Prune once the number of distinct commands reaches the maximum.
        if len(self.history_stat) >= MAX_STAT_NUM:
            self._prune_stats()

    def _prune_stats(self):
        """Drop the DELETE_STAT_NUM least-used commands from the table and history_stat."""
        all_stats = [{"cmd": cmd, "count": stat["count"], "last_time": stat["last_time"]}
                     for cmd, stat in self.history_stat.items()]
        # Lowest count first, oldest last_time first among equal counts.
        sort_stats = sorted(all_stats, key=lambda stat: (stat['count'], stat['last_time']))
        to_delete_cmds = [(stat["cmd"],) for stat in sort_stats[:DELETE_STAT_NUM]]
        log_debug(str(tuple(to_delete_cmds)))
        cursor = self.conn.cursor()
        cursor.executemany("delete from history where cmd = ?", to_delete_cmds)
        self.conn.commit()
        # Compact the database file.
        cursor.execute("VACUUM")
        # Re-read the total row count.
        cursor.execute("select count(*) from history")
        self.total_count = cursor.fetchone()[0]
        # Remove the same commands from history_stat.
        for del_cmd_tuple in to_delete_cmds:
            self.history_stat.pop(del_cmd_tuple[0], None)

    def append_to_histable(self):
        """Write the buffered new_history entries to the history table."""
        if len(self.new_history) == 0:
            return
        log_debug("write to history:" + str(self.new_history))
        his_list = []
        for his in self.new_history:
            his_list.append((his["session"], his["cmd"].encode('utf-8'),
                             his["pre_cmd"].encode('utf-8'), his["prefix"].encode('utf-8'),
                             his.get("window_title", '').encode('utf-8'),
                             his["time"], his["interval"]))
        cursor = self.conn.cursor()
        cursor.executemany('''INSERT INTO history(session,cmd,pre_cmd,prefix,window_title,time,interval)
            VALUES(?,?,?,?,?,?,?)''', his_list)
        self.conn.commit()
        self._trim_history(cursor)
        self.new_history = []
        self.last_append_time = nowTime()

    def add_history(self, history):
        """Accept one executed command; flush to the database when due.

        Commands of two characters or fewer, and a command identical to the
        previously accepted one, are ignored. The buffer is flushed once it
        holds 5 entries or the last flush was more than 120 seconds ago.
        """
        if len(history["cmd"]) <= 2:
            return
        if self.last_history["cmd"] == history["cmd"]:
            return
        self.last_history = history
        self.total_count = self.total_count + 1
        self.new_history.append(history)
        self.add_to_stat(history)
        now = nowTime()
        # Elapsed time is now - last_append_time; the reverse subtraction is
        # never positive, so the time-based flush would never trigger.
        if len(self.new_history) >= 5 or now - self.last_append_time > 120 * 1000:
            self.append_to_histable()

    def append_common_cmd(self, common_cmd, back_size):
        """Record a use of a common command: update all_common_cmds, then insert a row.

        Args:
            common_cmd: The common command text.
            back_size: Cursor back-off stored with the command; when the
                command is already known the stored value is used instead.
        """
        exist_before = False
        for _common in self.all_common_cmds:
            if _common["cmd"] == common_cmd:
                _common["count"] = _common["count"] + 1
                _common["time"] = nowTime()
                back_size = _common["back_size"]  # used for the inserted row
                exist_before = True
                break
        # back_size should never still be None at this point.
        if back_size is None:
            log_info("exception...")
            return
        if not exist_before:
            self.all_common_cmds.append(
                {'cmd': common_cmd, 'back_size': back_size, 'count': 1, 'time': nowTime()})
        log_debug("add common_cmd:" + common_cmd)
        # Add one new common_cmd history row.
        cursor = self.conn.cursor()
        cursor.execute('''INSERT INTO common_cmd(cmd,back_size,count,time)
            VALUES(?,?,?,?)''', (common_cmd, back_size, 1, nowTime()))
        self.conn.commit()

    def get_lfu_cmds(self, input=''):
        """Return up to 10 least-used commands (count below 5) containing ``input``.

        Rarely used commands are likely mistakes that the user may want to
        delete. Results are dicts with keys command/count/last_time, ordered
        by (count, last_time).
        """
        lfu_cmds = [{"command": cmd, "count": stat["count"], "last_time": stat["last_time"]}
                    for cmd, stat in self.history_stat.items()
                    if stat["count"] < 5 and input in cmd]
        sorted_lfu_cmds = sorted(lfu_cmds, key=lambda item: (item['count'], item['last_time']))
        return sorted_lfu_cmds[:10]

    def delete_cmd(self, del_cmd):
        """Delete every history row for ``del_cmd`` and forget its statistics."""
        cursor = self.conn.cursor()
        cursor.execute("delete from history where cmd = ?", (del_cmd,))
        self.conn.commit()
        # pop() rather than del: the command may have no statistics entry.
        self.history_stat.pop(del_cmd, None)
# Module-level History instance created at import time; History.__init__ opens
# the SQLite database. The Tip methods below record commands through it.
his_recorder = History()
class ListBoxRowWithData(Gtk.ListBoxRow):
    """List row that shows one suggestion and carries the data needed to apply it.

    The label renders ``data[:start1]`` and ``data[end1:start2]`` in blue and
    ``data[start1:end1]`` and ``data[start2:]`` in the default colour.

    Args:
        data: The full suggested command text.
        back_len: Stored on the row for the 'row-activated' handler.
        start1, end1, start2: Slice boundaries into ``data`` (see above).
        pattern: Optional object stored on the row; None by default.
        back_size: Stored on the row for the 'row-activated' handler.
    """

    def __init__(self, data, back_len, start1, end1, start2,
                 pattern=None, back_size=0):
        # super() must name this class: naming Gtk.ListBoxRow starts the
        # lookup after it and skips Gtk.ListBoxRow's own initialiser.
        super(ListBoxRowWithData, self).__init__()
        self.data = data
        self.back_len = back_len
        self.start1 = start1
        self.end1 = end1
        self.start2 = start2
        self.pattern = pattern
        self.back_size = back_size
        label = Gtk.Label()
        # Every slice is escaped before being embedded in the markup.
        label.set_markup(
            '<span foreground="blue">' + cgi.escape(data[:start1]) + '</span>'
            + cgi.escape(data[start1:end1])
            + '<span foreground="blue">' + cgi.escape(data[end1:start2]) + '</span>'
            + cgi.escape(data[start2:]))
        label.set_xalign(0)
        label.set_margin_start(5)
        label.set_margin_end(3)
        label.set_margin_top(3)
        label.set_margin_bottom(3)
        label.set_width_chars(15)
        label.set_max_width_chars(100)
        label.set_line_wrap(False)
        # Over-long text is cut with an ellipsis at the end.
        label.set_ellipsize(Pango.EllipsizeMode.END)
        self.add(label)
class Tip(Gtk.Window):
def __init__(self):
# Create the per-terminal state dict and the CSS provider, then build the
# popup through init_tip_window().
# NOTE(review): Gtk.Window.__init__ is not called here although Tip subclasses
# Gtk.Window; the popup is a separate Gtk.Window built in init_tip_window -
# confirm this is intended.
self.recorder = {}
self.terminal = None
self.style_context = Gtk.StyleContext()
self.provider = Gtk.CssProvider()
# CSS for the widget named "listbox" (that name is set in init_tip_window).
self.provider.load_from_data("""
#listbox {
font-weight: 500;
}
"""
)
self.init_tip_window()
def init_tip_window(self):
# Build the popup window and its list box, register the row-activation
# handler, and store both widgets plus some bookkeeping flags on self.
# NOTE(review): indentation is missing in this copy of the file, so the exact
# nesting of the statements in on_row_activated cannot be confirmed from here.
tip_window = Gtk.Window (Gtk.WindowType.POPUP)
tip_window.set_position(Gtk.WindowPosition.CENTER_ON_PARENT)
#tip_window.set_transient_for(self.parent)
# Register the CSS provider created in __init__ for the popup's screen.
self.style_context.add_provider_for_screen(tip_window.get_screen(), self.provider, Gtk.STYLE_PROVIDER_PRIORITY_APPLICATION )
#listBox start
listbox = Gtk.ListBox()
listbox.set_selection_mode(Gtk.SelectionMode.NONE)
listbox.unselect_all()
listbox.set_name("listbox")
# Handler for 'row-activated': writes parts of the row's text to
# self.terminal and then hides the suggestion list.
def on_row_activated(listbox_widget, row):
log_debug(row.data)
back_len = row.back_len
start1 = row.start1
end1 = row.end1
start2 = row.start2
pattern = row.pattern
back_size = row.back_size
feed_cmd = row.data
# For compatibility with tools like zsh-autosuggestions: proactively send the
# control code equivalent to the Down-arrow key to dismiss zsh's own suggestion.
if self.recorder[self.terminal].get("shell_auto_suggestion", False):
down_control_seq = "\033[B"
self.terminal.feed_child(down_control_seq,len(down_control_seq))
# "\033[D" / "\033[C" are sent back_len times each; every sequence is
# 3 characters long, hence the 3 * back_len length argument.
if back_len > 0:
back = "\033[D"
self.terminal.feed_child(back * back_len, 3 * back_len)
self.terminal.feed_child(feed_cmd[start1:end1], len(feed_cmd[start1:end1]))
forward = "\033[C"
self.terminal.feed_child(forward * back_len, 3 * back_len)
self.terminal.feed_child(feed_cmd[start2:], len(feed_cmd[start2:]))
if pattern is not None :
if back_size > 0:
back = "\033[D"
self.terminal.feed_child(back * back_size, 3 * back_size)
# Remembered here and read back in record_history.
self.recorder[self.terminal]["pattern"] = pattern
self.recorder[self.terminal]["selected_suggestion"] = row.data
self.hide_suggestion_list()
listbox.connect('row-activated', on_row_activated)
#listBox end
self.listbox = listbox
tip_window.add(listbox)
tip_window.connect("key-press-event", self.tip_key_press)
self.tip_window = tip_window
self.tip_create_time = nowTime()
self.wait_unselect = False
self.wait_autoclose = False
def start_record(self, terminal):
    """Start tracking ``terminal``: create its state record and hook its signals.

    The state dict is stored in ``self.recorder[terminal]``. 'contents-changed'
    is connected to record_history and 'child-exited' to exit_record; both
    handler ids are kept in the state dict.
    """
    log_debug("liangyong record")
    # start
    _col, start_row = terminal.get_cursor_position()
    state = {
        "handler_id": 0,
        "min_col": 99999,
        "row": start_row,
        "line_start_row": start_row,
        "pre_cmd": "",
        "session": str(uuid.uuid1()),
        "hidden": False,
    }
    self.recorder[terminal] = state
    # Hook up handling of the contents-changed event.
    log_debug("liangyong record contents-changed")
    state["handler_id"] = terminal.connect('contents-changed', self.record_history)
    log_debug("liangyong record contents-changed childexited")
    state["exit_id"] = terminal.connect('child-exited', self.exit_record)
def record_history(self, terminal):
# Handler connected to the terminal's 'contents-changed' signal in start_record.
# When the cursor row differs from the saved row it decides whether that was a
# line wrap or a finished command; for a finished command the previous input is
# passed to his_recorder.add_history and the per-row state is re-initialised.
# It then refreshes row_content / cmd_content / window_title in
# self.recorder[terminal] and shows or hides the suggestion popup.
# NOTE(review): indentation is missing in this copy of the file, so the exact
# nesting of the branches below cannot be confirmed from here.
log_debug("contents-changed")
(col, row) = terminal.get_cursor_position()
log_debug(str(col) + ":" + str(row))
if col == 0 and row == 0 :
log_debug("this is the terminal start, return")
return
last_saved_row = self.recorder[terminal]["row"]
last_commited = self.recorder[terminal].get("commit_char",'')
last_min_col = self.recorder[terminal].get("min_col",9999)
line_wrap = False #whether this content change is a line wrap caused by line length
auto_show = False
if row != last_saved_row:
log_debug("row changed")
log_debug(str(row) + ":" + str(last_saved_row))
last_row_content = self.recorder[terminal].get("row_content","")
last_cmd = self.recorder[terminal].get("cmd_content","")
last_title = self.recorder[terminal].get("window_title","")
last_min_col = self.recorder[terminal]["min_col"] # smallest cursor column seen
log_debug(last_min_col)
init_content = self.recorder[terminal].get("init_content","")
#Check whether this change is a wrap caused by line length, or a drop in the row count caused by length
#Only treated as a wrap when row is greater than the previous row, there has been an operation, and it is within three rows, because leaving vi can cause the same kind of change
if row > last_saved_row and row - last_saved_row < 3 and self.recorder[terminal].get("has_operation",False):
content_between_row = self.get_text_content_with_last(terminal,last_saved_row, 0, row, terminal.get_column_count())
log_debug(content_between_row)
if content_between_row != '' and "\n" not in content_between_row:
line_wrap = True
elif last_saved_row - row == 1: #used to span rows and no longer does after deleting backwards?
line_wrap = True
log_debug(line_wrap)
if not line_wrap: # need to record the last command and initialise the new row
# get output
invi = self.recorder[terminal].get("invi",False)
last_output = ''
begin_row = last_saved_row + 1
if row -1 >= begin_row and not invi: # nothing is recorded while in vi mode
if row-21 > begin_row:
begin_row = row-21
last_output,_ = self.get_text_content(terminal, begin_row, 0, row-1, terminal.get_column_count())
# last_output could be used for suggestions
log_debug(last_output)
log_debug("print last content")
log_debug(last_row_content)
log_debug(init_content)
log_debug(last_title)
log_debug(last_cmd)
log_debug(last_min_col)
log_debug(last_commited)
#Record the previous command-line input
last_cmd = last_cmd.strip()
line_start_row = self.recorder[terminal]["line_start_row"]
invi = self.recorder[terminal].get("invi",False)
if (last_min_col > 0 or line_start_row != last_saved_row) and not invi and init_content != last_row_content :
if start_blank.match(last_row_content):
log_debug("not record because row_content :" + last_row_content)
elif last_cmd != '' and len(last_cmd) > 1 and not last_cmd in exclude_cmds :
last_cmd = self.special_handle(last_cmd)
pre_cmd = self.recorder[terminal]["pre_cmd"]
index = last_row_content.find(last_cmd)
prefix = last_row_content[0:index].strip()
session = self.recorder[terminal]["session"]
#Record the time interval between the two commands
now = nowTime()
pre_time = self.recorder[terminal].get("pre_time",0)
interval = now - pre_time
if interval >= 600000 :
interval = -1
history = {"time":now,"prefix":prefix,"cmd":last_cmd,"window_title":last_title,"pre_cmd": pre_cmd,"session":session,"interval":interval}
log_debug(history)
his_recorder.add_history(history)
self.recorder[terminal]["pre_cmd"] = last_cmd
self.recorder[terminal]["pre_time"] = now
# Try to extract the common part
select_cmd = self.recorder[terminal].get("selected_suggestion",'')
select_pattern = self.recorder[terminal].get("pattern",None)
if select_cmd != '' and select_cmd != last_cmd and select_pattern is None:
log_debug("need try to get common")
common_cmd, back_size = get_common_cmd(last_cmd, select_cmd)
# an empty common_cmd means there is no common part
if common_cmd != '':
his_recorder.append_common_cmd(common_cmd,back_size)
elif select_pattern is not None and select_pattern.match(last_cmd):
his_recorder.append_common_cmd(select_cmd, None)
self.recorder[terminal]["selected_suggestion"] = ''
self.recorder[terminal]["pattern"] = None
else:
log_debug("not record because of last_cmd:" + last_cmd)
# Initialise the variables that belong to the new row
self.init_new_row(terminal,col,row)
auto_show = True
self.recorder[terminal]["row"] = row
if (last_min_col > col and self.recorder[terminal]["line_start_row"] == row):
self.recorder[terminal]["min_col"] = col
#Or it started at 0 and moved to the current position without any operation: covers the case where the terminal is slow and only shows up properly on screen after a moment
elif last_min_col == 0 and not self.recorder[terminal]["has_operation"] :
self.recorder[terminal]["min_col"] = col
auto_show=True
min_col = self.recorder[terminal]["min_col"]
line_start_row = self.recorder[terminal]["line_start_row"]
log_debug(last_commited)
log_debug((line_start_row,row))
log_debug(self.recorder[terminal].get("invi",False))
# To work around the row-spanning problem one extra row is fetched. This only solves commands that span one extra row; good enough for now, to be revisited later
# The scenario fixed here: the cursor moved to the start of the second row and, being slow, has not yet reached its normal position
row_content,_ = self.get_text_content(terminal,line_start_row, 0, row+1, terminal.get_column_count())
lf_index = row_content.find("\n")
if lf_index != -1:
row_content = row_content[:lf_index]
# Must be fetched again instead of being derived from row_content, otherwise it goes wrong when the preceding PS1 (e.g. zsh's) contains special characters
cmd_content,cmd_attrs = self.get_text_content(terminal,line_start_row, min_col, row+1, terminal.get_column_count())
lf_index = cmd_content.find("\n")
if lf_index != -1:
cmd_content,cmd_attrs = cmd_content[:lf_index],cmd_attrs[:lf_index]
# Tools such as redis-cli and zsh may automatically show hint text after the input; try to discard that trailing part here
cmd_content = self.check_shell_auto_suggestion(terminal,cmd_content,cmd_attrs)
self.recorder[terminal]["row_content"] = row_content
self.recorder[terminal]["cmd_content"] = cmd_content
self.recorder[terminal]["window_title"] = self.get_window_title(terminal)
# The logic below decides whether suggestions are shown
# If in word-level suggestion mode, show word-level suggestions
if terminal.is_focus():
# Suggest when the command does not span rows, the last input was a single character and we are not in vi
if last_commited != '' and line_start_row == row and \
not self.recorder[terminal].get("invi",False) and \
not self.recorder[terminal]["hidden"]:
self.show_tip_window(terminal)
elif auto_show and min_col > 0:
self.auto_suggestion(terminal)
else:
self.hide_suggestion_list()
def check_shell_auto_suggestion(self,terminal,cmd_content,cmd_attrs):
# Cut cmd_content at the first character whose foreground colour differs from
# the first character's, and record in
# self.recorder[terminal]["shell_auto_suggestion"] whether such a cut happened.
# Returns the (possibly shortened) cmd_content.
# cmd_attrs is expected to hold one attribute object (with a .fore colour)
# per character; nothing is compared unless both lengths agree.
# NOTE(review): indentation is missing in this copy of the file, so it cannot
# be confirmed from here which `if` the trailing `else` belongs to.
if len(cmd_attrs) > 0 and len(cmd_content) == len(cmd_attrs):
first_fore = cmd_attrs[0].fore
index = 0
#log_debug((first_fore.blue,first_fore.green,first_fore.red))
for attr in cmd_attrs:
cur_for = attr.fore
#log_debug((cur_for.blue,cur_for.green,cur_for.red,attr.column))
#Compare by colour: tools like redis-cli and zsh draw their hints in a lighter colour through VT100 control codes, so the hint's colour differs from the input before it
if cur_for.blue != first_fore.blue or cur_for.green != first_fore.green or cur_for.red != first_fore.red:
#log_debug("color check break")
break
index += 1
if index < len(cmd_content):
log_debug("may be have auto prompt string, such as redis-cli,zsh")
self.recorder[terminal]["shell_auto_suggestion"] = True
cmd_content = cmd_content[:index]
else:
self.recorder[terminal]["shell_auto_suggestion"] = False
return cmd_content
def init_new_row(self, terminal, col, row):
    """Reset the per-row state of ``terminal`` for a new input row.

    Reads the current text of ``row``, stores it as both the initial and the
    current row content, resets the remaining per-row fields and finally
    re-evaluates the "in vi" flag.
    """
    log_debug("init_new_row")
    new_row_content, _attrs = self.get_text_content(
        terminal, row, 0, row, terminal.get_column_count())
    self.recorder[terminal].update({
        "min_col": col,
        "commit_char": '',
        # whether there has been an operation (e.g. a move) on the current line
        "has_operation": False,
        "init_content": new_row_content,
        "row_content": new_row_content,
        "line_start_row": row,
        "shell_auto_suggestion": False,
        "selected_suggestion": '',
        "pattern": None,
        "hidden": False,
    })
    self.check_if_invi(terminal)
def check_if_invi(self, terminal):
    """Guess whether ``terminal`` is inside a vi session and store the result.

    This is a heuristic and may be wrong, but no better way was found: the
    terminal counts as "in vi" when the vertical adjustment's lower bound is
    non-zero, equals its current value, and the page size equals the
    terminal's row count. The result goes to
    ``self.recorder[terminal]["invi"]``.
    """
    adj = terminal.get_vadjustment()
    lower = adj.get_lower()
    _upper = adj.get_upper()
    value = adj.get_value()
    p_size = adj.get_page_size()
    in_vi = lower != 0 and lower == value and p_size == terminal.get_row_count()
    if in_vi:
        log_debug("now in vi")
    self.recorder[terminal]["invi"] = in_vi
# Don't get the last char if is '\n'
def get_text_content(self, terminal, start_row, start_col, end_row, end_col):
    """Return the text and attributes of a terminal range, minus a final newline.

    Returns:
        ``(content, attrs)`` taken from ``terminal.get_text_range``. When the
        text ends with "\\n", that last character and the last attribute entry
        are both dropped.
    """
    text_range = terminal.get_text_range(
        start_row, start_col, end_row, end_col, lambda *a: True)
    content, attrs = text_range[0], text_range[1]
    if not content.endswith("\n"):
        return content, attrs
    return content[:-1], attrs[:-1]
def get_text_content_with_last(self, terminal, start_row, start_col, end_row, end_col):
    """Return the text of a terminal range unchanged, keeping any final newline."""
    return terminal.get_text_range(
        start_row, start_col, end_row, end_col, lambda *a: True)[0]
# title有可能取空
def get_window_title(self, terminal):
    """Return the terminal's window title, or '' when the terminal reports None."""
    title = terminal.get_window_title()
    return '' if title is None else title
#当前仅对cd 进行特殊处理,统一去掉最后的/
def special_handle(self, last_cmd):
    """Normalise a command before it is recorded.

    Trailing slashes are removed from ``cd`` commands so that "cd data/" and
    "cd data" are recorded as the same command. Commands without quote
    characters have runs of whitespace collapsed to a single space.

    Args:
        last_cmd: The command text as typed.

    Returns:
        The normalised command text.
    """
    if last_cmd.startswith("cd ") and last_cmd.endswith("/"):
        stripped = last_cmd.rstrip("/")
        # Keep the slashes when the path consists of nothing else: stripping
        # them would turn "cd /" into "cd", a different command.
        if stripped.strip() != "cd":
            last_cmd = stripped
    # Ordinary commands: collapse runs of whitespace into one space.
    if '"' not in last_cmd and "'" not in last_cmd:
        last_cmd = " ".join(last_cmd.split())
    return last_cmd
def exit_record(self, terminal, status):
    """Handler for the terminal's 'child-exited' signal: stop recording it.

    Flushes buffered history to the database, disconnects the
    'contents-changed' handler and forgets the terminal's state. Once no
    terminal is being recorded any more, the database connection is closed.

    Args:
        terminal: The terminal whose child process exited.
        status: Exit status delivered with the signal (unused).
    """
    log_debug("exit_record")
    his_recorder.append_to_histable()
    if terminal in self.recorder:
        handler_id = self.recorder[terminal].get("handler_id", 0)
        if handler_id != 0:
            terminal.disconnect(handler_id)
        del self.recorder[terminal]
    # Close the connection once every window has been closed.
    if len(self.recorder) == 0:
        try:
            his_recorder.conn.close()
        except Exception as exc:
            # Best effort: closing must not break terminal shutdown. Unlike a
            # bare except, this lets KeyboardInterrupt/SystemExit through.
            log_debug("close slite exception: %s" % exc)
        else:
            log_debug("close sqlite success")
#上一个命令完成后,自动提示下一个命令并选中第一个
def auto_suggestion(self,terminal):
log_debug("auto_suggestion")
# 将要添加的提示
list_add = []
pre_cmd = self.recorder[terminal]["pre_cmd"]
row_content = self.recorder[terminal]["row_content"]
min_col = self.recorder[terminal]["min_col"]
prefix = row_content[0:min_col].strip()
log_debug(prefix)
log_debug(pre_cmd)
min_interval = 2 #间隔小于15秒的
for cmd, stat in his_recorder.history_stat.items():
#当长度小于2时,不需要
if len(cmd) <= 2:
continue
#根据 pre_cmd prefix 两者来判断
prefixs = stat["prefixs"]
pre_cmds = stat["pre_cmds"]
titles = stat["titles"]
prefix_count = 0
for _prefix,count in prefixs.items():
if _prefix == prefix:
prefix_count = count
#如果等于0,则没必要继续了
if prefix_count ==0:
continue
# 自动提示是在上一个命令执行后,自动展示 所以有必要判断历史命令间隔,小于一定时间间隔的才有意义
precmd_count = 0
# pre_cmds 转化为 {"cmd":total_count} 的结构 再去匹配
to_match_precmds = {}
for precmd, intervals in pre_cmds.items():
_precmd_count = 0
_valid_count = 0
for interval,_count in intervals.items():
# 间隔没有意义的不管了 interval的值参见 get_interval_level
if interval >= 10:
continue
_precmd_count = _precmd_count + _count
if interval <= min_interval:
_valid_count = _valid_count + _count
to_match_precmds[precmd] = {"count":_precmd_count, "valid_count": _valid_count}
for _precmd, item in to_match_precmds.items():
if _precmd == pre_cmd:
precmd_count = item["valid_count"]
if precmd_count == 0: