Skip to content

Commit

Permalink
bump version, Merge branch 'print-message-2'
Browse files Browse the repository at this point in the history
  • Loading branch information
casperdcl committed Apr 25, 2016
2 parents 17a6d03 + 91133a7 commit 36c5013
Show file tree
Hide file tree
Showing 4 changed files with 262 additions and 6 deletions.
39 changes: 39 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,21 @@ Returns
Cleanup and (if leave=False) close the progressbar.
"""
def clear(self):
"""
Clear current bar display
"""
def refresh(self):
"""
Force refresh the display of this bar
"""
def write(cls, s, file=sys.stdout, end="\n"):
"""
Print a message via tqdm (without overlap with bars)
"""
def trange(*args, **kwargs):
"""
A shortcut for tqdm(xrange(*args), **kwargs).
Expand Down Expand Up @@ -449,6 +464,30 @@ For manual control over positioning (e.g. for multi-threaded use),
you may specify `position=n` where `n=0` for the outermost bar,
`n=1` for the next, and so on.

Writing messages
~~~~~~~~~~~~~~~~~~~~
Since ``tqdm`` uses a simple printing mechanism to display progress bars,
you should not write any message in the terminal using ``print()``.

To write messages in the terminal without any collision with ``tqdm`` bar
display, a ``.write()`` method is provided:

.. code:: python
from tqdm import tqdm, trange
from time import sleep
bar = trange(10)
for i in bar:
# Print using tqdm class method .write()
sleep(0.1)
if not (i % 3):
tqdm.write("Done task %i" % i)
# Can also use bar.write()
By default, this will print to standard output ``sys.stdout``. but you can
specify any file-like object using the ``file`` argument. For example, this
can be used to redirect the messages writing to a log file or class.

How to make a good progress bar
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Expand Down
55 changes: 53 additions & 2 deletions tqdm/_tqdm.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,10 +251,13 @@ def format_meter(n, total, elapsed, ncols=None, prefix='',
n_fmt, unit, elapsed_str, rate_fmt)

def __new__(cls, *args, **kwargs):
# Create a new instance
instance = object.__new__(cls)
# Add to the list of instances
if "_instances" not in cls.__dict__:
cls._instances = WeakSet()
cls._instances.add(instance)
# Return the instance
return instance

@classmethod
Expand Down Expand Up @@ -282,6 +285,30 @@ def _decr_instances(cls, instance):
except KeyError:
pass

@classmethod
def write(cls, s, file=sys.stdout, end="\n"):
"""
Print a message via tqdm (without overlap with bars)
"""
# Clear all bars
inst_cleared = []
for inst in cls._instances:
# Clear instance if in the target output file
# or if write output + tqdm output are both either
# sys.stdout or sys.stderr (because both are mixed in terminal)
if inst.fp == file or \
(file in [sys.stdout, sys.stderr] and
inst.fp in [sys.stdout, sys.stderr]):
inst.clear()
inst_cleared.append(inst)
# Write the message
file.write(s)
file.write(end)
# Force refresh display of bars we cleared
for inst in inst_cleared:
inst.refresh()
# TODO: make list of all instances incl. absolutely positioned ones?

def __init__(self, iterable=None, desc=None, total=None, leave=True,
file=sys.stderr, ncols=None, mininterval=0.1,
maxinterval=10.0, miniters=None, ascii=None, disable=False,
Expand Down Expand Up @@ -598,8 +625,8 @@ def __iter__(self):
(1 - smoothing) * miniters

# Store old values for next call
last_print_n = n
last_print_t = cur_t
self.n = self.last_print_n = last_print_n = n
self.last_print_t = last_print_t = cur_t

# Closing the progress bar.
# Update some internal variables for close().
Expand Down Expand Up @@ -758,6 +785,30 @@ def set_description(self, desc=None):
def moveto(self, n):
self.fp.write(_unicode('\n' * n + _term_move_up() * -n))

def clear(self, nomove=False):
"""
Clear current bar display
"""
if not nomove:
self.moveto(self.pos)
# clear up the bar (can't rely on sp(''))
self.fp.write('\r')
self.fp.write(' ' * (self.ncols if self.ncols else 10))
self.fp.write('\r') # place cursor back at the beginning of line
if not nomove:
self.moveto(-self.pos)

def refresh(self):
"""
Force refresh the display of this bar
"""
self.moveto(self.pos)
# clear up this line's content (whatever there was)
self.clear(nomove=True)
# Print current/last bar state
self.fp.write(self.__repr__())
self.moveto(-self.pos)


def trange(*args, **kwargs):
"""
Expand Down
2 changes: 1 addition & 1 deletion tqdm/_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
__all__ = ["__version__"]

# major, minor, patch, -extra
version_info = 4, 4, 3
version_info = 4, 5, 0

# Nice string for the version
__version__ = '.'.join(map(str, version_info))
Expand Down
172 changes: 169 additions & 3 deletions tqdm/tests/tests_tqdm.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def closing(arg):
RE_rate = re.compile(r'(\d+\.\d+)it/s')
RE_ctrlchr = re.compile("(%s)" % '|'.join(CTRLCHR)) # Match control chars
RE_ctrlchr_excl = re.compile('|'.join(CTRLCHR)) # Match and exclude ctrl chars
RE_pos = re.compile(r'((\x1b\[A|\r|\n)+((pos\d+) bar:\s+\d+%|\s{3,6})?)') # NOQA


class DiscreteTimer(object):
Expand Down Expand Up @@ -134,6 +135,51 @@ def progressbar_rate(bar_str):
return float(RE_rate.search(bar_str).group(1))


def squash_ctrlchars(s):
""" Apply control characters in a string just like a terminal display """
# List of supported control codes
ctrlcodes = [r'\r', r'\n', r'\x1b\[A']

# Init variables
curline = 0 # current line in our fake terminal
lines = [''] # state of our fake terminal

# Split input string by control codes
RE_ctrl = re.compile("(%s)" % ("|".join(ctrlcodes)), flags=re.DOTALL)
s_split = RE_ctrl.split(s)
s_split = filter(None, s_split) # filter out empty splits

# For each control character or message
for nextctrl in s_split:
# If it's a control character, apply it
if nextctrl == '\r':
# Carriage return
# Go to the beginning of the line
# simplified here: we just empty the string
lines[curline] = ''
elif nextctrl == '\n':
# Newline
# Go to the next line
if curline < (len(lines) - 1):
# If already exists, just move cursor
curline += 1
else:
# Else the new line is created
lines.append('')
curline += 1
elif nextctrl == '\x1b[A':
# Move cursor up
if curline > 0:
curline -= 1
else:
raise ValueError("Cannot go up, anymore!")
# Else, it is a message, we print it on current line
else:
lines[curline] += nextctrl

return lines


def test_format_interval():
""" Test time interval format """
format_interval = tqdm.format_interval
Expand Down Expand Up @@ -810,9 +856,6 @@ def test_position():
if nt_and_no_colorama:
raise SkipTest

# Use regexp because the it rates can change
RE_pos = re.compile(r'((\x1b\[A|\r|\n)+((pos\d+) bar:\s+\d+%|\s{3,6})?)') # NOQA

# Artificially test nested loop printing
# Without leave
our_file = StringIO()
Expand Down Expand Up @@ -1029,3 +1072,126 @@ def test_repr():
with closing(StringIO()) as our_file:
with tqdm(total=10, ascii=True, file=our_file) as t:
assert str(t) == ' 0%| | 0/10 [00:00<?, ?it/s]'


@with_setup(pretest, posttest)
def test_clear():
""" Test clearing bar display """
with closing(StringIO()) as our_file:
t1 = tqdm(total=10, file=our_file, desc='pos0 bar',
bar_format='{l_bar}')
t2 = trange(10, file=our_file, desc='pos1 bar',
bar_format='{l_bar}')
before = squash_ctrlchars(our_file.getvalue())
t2.clear()
t1.clear()
after = squash_ctrlchars(our_file.getvalue())
t1.close()
t2.close()
assert before == ['pos0 bar: 0%|', 'pos1 bar: 0%|']
assert after == ['', '']


@with_setup(pretest, posttest)
def test_refresh():
""" Test refresh bar display """
with closing(StringIO()) as our_file:
t1 = tqdm(total=10, file=our_file, desc='pos0 bar',
bar_format='{l_bar}', mininterval=999, miniters=999)
t2 = tqdm(total=10, file=our_file, desc='pos1 bar',
bar_format='{l_bar}', mininterval=999, miniters=999)
t1.update()
t2.update()
before = squash_ctrlchars(our_file.getvalue())
t1.refresh()
t2.refresh()
after = squash_ctrlchars(our_file.getvalue())
t1.close()
t2.close()

# Check that refreshing indeed forced the display to use realtime state
assert before == [u'pos0 bar: 0%|', u'pos1 bar: 0%|']
assert after == [u'pos0 bar: 10%|', u'pos1 bar: 10%|']


@with_setup(pretest, posttest)
def test_write():
""" Test write messages """
s = "Hello world"
with closing(StringIO()) as our_file:
# Change format to keep only left part w/o bar and it/s rate
t1 = tqdm(total=10, file=our_file, desc='pos0 bar',
bar_format='{l_bar}', mininterval=0, miniters=1)
t2 = trange(10, file=our_file, desc='pos1 bar', bar_format='{l_bar}',
mininterval=0, miniters=1)
t3 = tqdm(total=10, file=our_file, desc='pos2 bar',
bar_format='{l_bar}', mininterval=0, miniters=1)
t1.update()
t2.update()
t3.update()
before = our_file.getvalue()

# Write msg and see if bars are correctly redrawn below the msg
t1.write(s, file=our_file) # call as an instance method
tqdm.write(s, file=our_file) # call as a class method
after = our_file.getvalue()

t1.close()
t2.close()
t3.close()

before_squashed = squash_ctrlchars(before)
after_squashed = squash_ctrlchars(after)

assert after_squashed == [s, s] + before_squashed

# Check that no bar clearing if different file
with closing(StringIO()) as our_file_bar:
with closing(StringIO()) as our_file_write:
t1 = tqdm(total=10, file=our_file_bar, desc='pos0 bar',
bar_format='{l_bar}', mininterval=0, miniters=1)

t1.update()
before_bar = our_file_bar.getvalue()

tqdm.write(s, file=our_file_write)

after_bar = our_file_bar.getvalue()
t1.close()

assert before_bar == after_bar

# Test stdout/stderr anti-mixup strategy
# Backup stdout/stderr
stde = sys.stderr
stdo = sys.stdout
# Mock stdout/stderr
with closing(StringIO()) as our_stderr:
with closing(StringIO()) as our_stdout:
sys.stderr = our_stderr
sys.stdout = our_stdout
t1 = tqdm(total=10, file=sys.stderr, desc='pos0 bar',
bar_format='{l_bar}', mininterval=0, miniters=1)

t1.update()
before_err = sys.stderr.getvalue()
before_out = sys.stdout.getvalue()

tqdm.write(s, file=sys.stdout)
after_err = sys.stderr.getvalue()
after_out = sys.stdout.getvalue()

t1.close()

assert before_err == '\rpos0 bar: 0%|\rpos0 bar: 10%|'
assert before_out == ''
after_err_res = [m[0] for m in RE_pos.findall(after_err)]
assert after_err_res == [u'\rpos0 bar: 0%',
u'\rpos0 bar: 10%',
u'\r ',
u'\r\r ',
u'\rpos0 bar: 10%']
assert after_out == s+'\n'
# Restore stdout and stderr
sys.stderr = stde
sys.stdout = stdo

0 comments on commit 36c5013

Please sign in to comment.