Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support runtest.py to work on Windows when no_pty=True #640

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
211 changes: 152 additions & 59 deletions runtest.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
#!/usr/bin/env python

from __future__ import print_function
import os, sys, re
import argparse, time
import signal, atexit

from subprocess import Popen, STDOUT, PIPE
from select import select

# Pseudo-TTY and terminal manipulation
import pty, array, fcntl, termios

IS_PY_3 = sys.version_info[0] == 3

if os.name == 'posix':
from select import select
else:
if IS_PY_3:
import threading, queue
from subprocess import TimeoutExpired
else:
import threading
import Queue as queue

debug_file = None
log_file = None

Expand Down Expand Up @@ -83,80 +87,160 @@ def __init__(self, args, no_pty=False, line_break="\n"):
env['TERM'] = 'dumb'
env['INPUTRC'] = '/dev/null'
env['PERL_RL'] = 'false'
if no_pty:
self.p = Popen(args, bufsize=0,
stdin=PIPE, stdout=PIPE, stderr=STDOUT,
preexec_fn=os.setsid,
env=env)
self.stdin = self.p.stdin
self.stdout = self.p.stdout
if os.name == 'posix':
if no_pty:
self.p = Popen(args, bufsize=0,
stdin=PIPE, stdout=PIPE, stderr=STDOUT,
preexec_fn=os.setsid,
env=env)
self.stdin = self.p.stdin
self.stdout = self.p.stdout
else:
# Pseudo-TTY and terminal manipulation
import pty, array, fcntl, termios

# provide tty to get 'interactive' readline to work
master, slave = pty.openpty()

# Set terminal size large so that readline will not send
# ANSI/VT escape codes when the lines are long.
buf = array.array('h', [100, 200, 0, 0])
fcntl.ioctl(master, termios.TIOCSWINSZ, buf, True)

self.p = Popen(args, bufsize=0,
stdin=slave, stdout=slave, stderr=STDOUT,
preexec_fn=os.setsid,
env=env)
# Now close slave so that we will get an exception from
# read when the child exits early
# http://stackoverflow.com/questions/11165521
os.close(slave)
self.stdin = os.fdopen(master, 'r+b', 0)
self.stdout = self.stdin
elif os.name == 'nt':
if no_pty:
from subprocess import CREATE_NEW_PROCESS_GROUP
self.p = Popen(args, bufsize=0,
stdin=PIPE, stdout=PIPE, stderr=STDOUT,
creationflags=CREATE_NEW_PROCESS_GROUP,
env=env)
self.stdin = self.p.stdin
self.stdout = self.p.stdout
else:
raise ValueError('pty not supported on os.name="{}"'.format(os.name))
else:
# provide tty to get 'interactive' readline to work
master, slave = pty.openpty()

# Set terminal size large so that readline will not send
# ANSI/VT escape codes when the lines are long.
buf = array.array('h', [100, 200, 0, 0])
fcntl.ioctl(master, termios.TIOCSWINSZ, buf, True)

self.p = Popen(args, bufsize=0,
stdin=slave, stdout=slave, stderr=STDOUT,
preexec_fn=os.setsid,
env=env)
# Now close slave so that we will get an exception from
# read when the child exits early
# http://stackoverflow.com/questions/11165521
os.close(slave)
self.stdin = os.fdopen(master, 'r+b', 0)
self.stdout = self.stdin
if no_pty:
self.p = Popen(args, bufsize=0,
stdin=PIPE, stdout=PIPE, stderr=STDOUT,
env=env)
self.stdin = self.p.stdin
self.stdout = self.p.stdout
else:
raise ValueError('pty not supported on os.name="{}"'.format(os.name))

#print "started"
self.buf = ""
self.last_prompt = ""

self.line_break = line_break

def read_to_prompt(self, prompts, timeout):
if os.name == 'posix':
self.q = None
self.t = None
else:
self.q = queue.Queue()
self.t = threading.Thread(target=self._reader, args=())
self.t.daemon = True
self.t.start()

def _reader(self):
try:
f = self.stdout
ok = True
while ok:
try:
new_data = f.read(1)
if len(new_data) == 0: # EOF
ok = False
except Exception as e:
# catch the read exception and send it to queue
ok = False
new_data = e
self.q.put(new_data)
except:
pass

def read_to_prompt(self, prompts, timeout, search_prefix=''):
end_time = time.time() + timeout
while time.time() < end_time:
[outs,_,_] = select([self.stdout], [], [], 1)
if self.stdout in outs:
while True:
current_timeout = max(end_time - time.time(), 0.)
if current_timeout == 0.:
break
if os.name == 'posix':
[outs,_,_] = select([self.stdout], [], [], 1)
if self.stdout not in outs:
continue
new_data = self.stdout.read(1)
new_data = new_data.decode("utf-8") if IS_PY_3 else new_data
#print("new_data: '%s'" % new_data)
debug(new_data)
# Perform newline cleanup
self.buf += new_data.replace("\r", "")
for prompt in prompts:
regexp = re.compile(prompt)
match = regexp.search(self.buf)
if match:
end = match.end()
buf = self.buf[0:match.start()]
self.buf = self.buf[end:]
self.last_prompt = prompt
return buf
else:
try:
new_data = self.q.get(timeout=current_timeout)
except queue.Empty:
break
if isinstance(new_data, Exception):
raise new_data
if len(new_data) == 0: # EOF
break
new_data = new_data.decode("utf-8") if IS_PY_3 else new_data
#print("new_data: '%s'" % new_data)
debug(new_data)
# Perform newline cleanup
self.buf += new_data.replace("\r", "")
for prompt in prompts:
regexp = re.compile(prompt)
match = regexp.search(search_prefix + self.buf)
if match:
start = match.start() - len(search_prefix)
end = match.end() - len(search_prefix)
buf = self.buf[0:start]
self.buf = self.buf[end:]
self.last_prompt = prompt
return buf
# MAYBE we should distinguish EOF from TIMEOUT,
# return None for both cases currently
return None

def writeline(self, str):
def _to_bytes(s):
return bytes(s, "utf-8") if IS_PY_3 else s

self.stdin.write(_to_bytes(str.replace('\r', '\x16\r') + self.line_break))
if os.name == 'posix':
self.stdin.write(_to_bytes(str.replace('\r', '\x16\r') + self.line_break))
else:
self.stdin.write(_to_bytes(str + self.line_break))

def cleanup(self):
#print "cleaning up"
if self.p:
try:
os.killpg(self.p.pid, signal.SIGTERM)
if os.name == 'posix':
os.killpg(self.p.pid, signal.SIGTERM)
elif os.name == 'nt':
self.p.send_signal(signal.CTRL_BREAK_EVENT)
else:
self.p.terminate()
if IS_PY_3:
try:
self.p.communicate(timeout=1.0)
except TimeoutExpired:
self.p.kill()
except OSError:
pass
self.p = None
self.stdin = None
self.stdout = None

class TestReader:
def __init__(self, test_file):
self.line_num = 0
f = open(test_file, newline='') if IS_PY_3 else open(test_file)
f = open(test_file)
self.data = f.read().split('\n')
self.soft = False
self.deferrable = False
Expand Down Expand Up @@ -267,6 +351,14 @@ def assert_prompt(runner, prompts, timeout):
class TestTimeout(Exception):
pass

def has_any_match(expects, res):
success = False
for expect in expects:
success = re.search(expect, res, re.S)
if success:
break
return success

while t.next():
if args.deferrable == False and t.deferrable:
log(t.deferrable)
Expand All @@ -287,23 +379,24 @@ class TestTimeout(Exception):
# The repeated form is to get around an occasional OS X issue
# where the form is repeated.
# https://github.com/kanaka/mal/issues/30
expects = [".*%s%s%s" % (sep, t.out, re.escape(t.ret)),
".*%s.*%s%s%s" % (sep, sep, t.out, re.escape(t.ret))]
expects = ["%s%s" % (t.out, re.escape(t.ret)), # for Windows, WSL
".*%s%s%s" % (sep, t.out, re.escape(t.ret)), # for Linux, OS X
".*%s.*%s%s%s" % (sep, sep, t.out, re.escape(t.ret))] # for OS X

r.writeline(t.form)
try:
test_cnt += 1
# Search with prepending prefix '\n' for avoiding hangs on Windows
res = r.read_to_prompt(['\r\n[^\s()<>]+> ', '\n[^\s()<>]+> '],
timeout=args.test_timeout)
timeout=args.test_timeout, search_prefix='\n')
#print "%s,%s,%s" % (idx, repr(p.before), repr(p.after))
if (res == None):
log(" -> TIMEOUT (line %d)" % t.line_num)
raise TestTimeout("TIMEOUT (line %d)" % t.line_num)
elif (t.ret == "" and t.out == ""):
log(" -> SUCCESS (result ignored)")
pass_cnt += 1
elif (re.search(expects[0], res, re.S) or
re.search(expects[1], res, re.S)):
elif has_any_match(expects, res):
log(" -> SUCCESS")
pass_cnt += 1
else:
Expand Down