"""Tools for creating command-line and web-based wizards from a tree of nodes.
"""
import os
import re
import ast
import json
import pprint
import fnmatch
import builtins
import textwrap
import collections.abc as cabc
import typing as tp
from xonsh.tools import to_bool, to_bool_or_break, backup_file, print_color
from xonsh.jsonutils import serialize_xonsh_json
#
# Nodes themselves
#
class Node:
    """Root class from which every wizard node derives.

    ``attrs`` names the attributes a node exposes, either as a tuple of
    names or as a single name given directly as a string.
    """

    attrs: tp.Union[tp.Tuple[str, ...], str] = ()

    def __str__(self):
        formatter = PrettyFormatter(self)
        return formatter.visit()

    def __repr__(self):
        # The repr is the string form collapsed onto a single line.
        text = str(self)
        return text.replace("\n", "")
class Wizard(Node):
    """Root node of a wizard tree; it owns the child nodes."""

    attrs = ("children", "path")

    def __init__(self, children, path=None):
        """
        Parameters
        ----------
        children : iterable of Node
            The nodes that make up this wizard.
        path : str or sequence of str, optional
            A path within the storage object.
        """
        self.path = path
        self.children = children
class Pass(Node):
    """A placeholder node that carries no data of its own."""
class Message(Node):
    """Holds a plain message that should be shown to the user."""

    # A single attribute name given as a str rather than a 1-tuple.
    attrs = "message"

    def __init__(self, message):
        """
        Parameters
        ----------
        message : str
            The text to report.
        """
        self.message = message
class Question(Node):
    """Poses a question and selects the follow-up node from the answer."""

    attrs = ("question", "responses", "converter", "path")

    def __init__(self, question, responses, converter=None, path=None):
        """
        Parameters
        ----------
        question : str
            The text of the question.
        responses : dict
            Maps each (possibly converted) answer to the Node to visit next.
        converter : callable, optional
            Turns the raw string typed by the user into the object that is
            used as the key into ``responses``.
        path : str or sequence of str, optional
            A path within the storage object.
        """
        self.path = path
        self.converter = converter
        self.responses = responses
        self.question = question
class While(Node):
    """Repeats a body of nodes for as long as a condition function is true.

    The condition function is called as ``cond(visitor=..., node=...)`` and
    its return value is evaluated for truthiness. ``beg`` is the value the
    loop index starts at.
    """

    attrs = ("cond", "body", "idxname", "beg", "path")

    def __init__(self, cond, body, idxname="idx", beg=0, path=None):
        """
        Parameters
        ----------
        cond : callable
            Decides whether the next iteration runs. It has the form
            ``cond(visitor=None, node=None)`` and must return an object
            that responds to ``__bool__``.
        body : sequence of nodes
            The nodes to execute on each iteration.
        idxname : str, optional
            The variable name for the loop index.
        beg : int, optional
            The first index value when evaluating path format strings.
        path : str or sequence of str, optional
            A path within the storage object.
        """
        self.path = path
        self.beg = beg
        self.idxname = idxname
        self.body = body
        self.cond = cond
#
# Helper nodes
#
class YesNo(Question):
    """A question whose only answers are yes and no."""

    def __init__(self, question, yes, no, path=None):
        """
        Parameters
        ----------
        question : str
            The text of the question.
        yes : Node
            Node to execute when the answer converts to True.
        no : Node
            Node to execute when the answer converts to False.
        path : str or sequence of str, optional
            A path within the storage object.
        """
        # to_bool turns the typed answer into the True/False key used below.
        super().__init__(
            question, {True: yes, False: no}, converter=to_bool, path=path
        )
class TrueFalse(Input):
    """Input node that yields a True or False value."""

    def __init__(self, prompt="yes or no [default: no]? ", path=None):
        """
        Parameters
        ----------
        prompt : str, optional
            Prompt text passed on to the Input node.
        path : str or sequence of str, optional
            A path within the storage object.
        """
        options = {
            "converter": to_bool,
            "show_conversion": False,
            "confirm": False,
        }
        super().__init__(prompt=prompt, path=path, **options)
class TrueFalseBreak(Input):
    """Input node that yields a True, False, or 'break' value."""

    def __init__(self, prompt="yes, no, or break [default: no]? ", path=None):
        """
        Parameters
        ----------
        prompt : str, optional
            Prompt text passed on to the Input node.
        path : str or sequence of str, optional
            A path within the storage object.
        """
        options = {
            "converter": to_bool_or_break,
            "show_conversion": False,
            "confirm": False,
        }
        super().__init__(prompt=prompt, path=path, **options)
class StoreNonEmpty(Input):
    """Input node that only stores what the user typed when it is not empty.

    This is achieved by wrapping the supplied converter function.
    """

    def __init__(
        self,
        prompt=">>> ",
        converter=None,
        show_conversion=False,
        confirm=False,
        retry=False,
        path=None,
        store_raw=False,
    ):
        def nonempty_converter(x):
            """Maps empty input to Unstorable and converts everything else."""
            if len(x) == 0:
                return Unstorable
            if converter is None:
                return x
            # The converter always runs so that invalid input is rejected,
            # even when the raw string is what ends up being stored.
            converted = converter(x)
            return x if store_raw else converted

        super().__init__(
            prompt=prompt,
            converter=nonempty_converter,
            show_conversion=show_conversion,
            confirm=confirm,
            retry=retry,
            path=path,
        )
class StateFile(Input):
    """Base node for representing the state as a file, stored under either a
    default or a user-supplied file name. It is unlikely to be useful on its
    own.
    """

    attrs: tp.Tuple[str, ...] = ("default_file", "check", "ask_filename")

    def __init__(self, default_file=None, check=True, ask_filename=True):
        """
        Parameters
        ----------
        default_file : str, optional
            The default filename to save the file as.
        check : bool, optional
            Whether to print the current state and ask if it should be
            saved/loaded prior to asking for the file name and saving the
            file, default=True.
        ask_filename : bool, optional
            Whether to ask for the filename (if ``False``, always use the
            default filename)
        """
        self._df = None
        super().__init__(prompt="filename: ", converter=None, confirm=False, path=None)
        self.check = check
        self.ask_filename = ask_filename
        # Assigned after the base initializer because the property setter
        # rewrites ``self.prompt``.
        self.default_file = default_file

    @property
    def default_file(self):
        """The default filename; assigning it also refreshes the prompt."""
        return self._df

    @default_file.setter
    def default_file(self, val):
        self._df = val
        hint = "" if val is None else " [default={0!r}]".format(val)
        self.prompt = "filename" + hint + ": "
class SaveJSON(StateFile):
    """Node requesting that the state be saved as a JSON file, under either
    a default or a user-supplied file name.
    """
class LoadJSON(StateFile):
    """Node requesting that the state be loaded from a JSON file, under
    either a default or a user-supplied file name.
    """
class FileInserter(StateFile):
    """Node for inserting the state into a file in between a prefix and suffix.
    The state is converted according to some dumper rules.
    """

    attrs = ("prefix", "suffix", "dump_rules", "default_file", "check", "ask_filename")

    def __init__(
        self,
        prefix,
        suffix,
        dump_rules,
        default_file=None,
        check=True,
        ask_filename=True,
    ):
        r"""
        Parameters
        ----------
        prefix : str
            Starting unique string in file to find and begin the insertion at,
            e.g. '# XONSH WIZARD START\n'
        suffix : str
            Ending unique string to find in the file and end the replacement at,
            e.g. '\n# XONSH WIZARD END'
        dump_rules : dict of strs to functions
            This is a dictionary that maps the path-like match strings to
            functions that take the flat path and the value as arguments and
            convert the state value at a path to a string. The keys here may
            use wildcards (as seen in the standard library fnmatch module).
            For example::

                dump_rules = {
                    '/path/to/exact': lambda path, x: str(x),
                    '/otherpath/*': lambda path, x: x,
                    '*ending': lambda path, x: repr(x),
                    '/': None,
                }

            If a wildcard is not used in a path, then that rule will be used
            on an exact match. If wildcards are used, the deepest and longest
            match is used. If None is given instead of the function, it means
            to skip generating that key.
        default_file : str, optional
            The default filename to save the file as.
        check : bool, optional
            Whether to print the current state and ask if it should be
            saved/loaded prior to asking for the file name and saving the
            file, default=True.
        ask_filename : bool, optional
            Whether to ask for the filename (if ``False``, always use the
            default filename)
        """
        self._dr = None
        super().__init__(
            default_file=default_file, check=check, ask_filename=ask_filename
        )
        self.prefix = prefix
        self.suffix = suffix
        # The setter populates both the compiled rules and ``string_rules``.
        self.dump_rules = dump_rules

    @property
    def dump_rules(self):
        """Mapping of compiled regular expressions to dumper functions.

        Assigning a mapping of glob-style strings to functions compiles each
        key with ``fnmatch.translate`` and also records the uncompiled
        mapping as ``string_rules`` for exact-match lookups.
        """
        return self._dr

    @dump_rules.setter
    def dump_rules(self, value):
        compiled = {
            re.compile(fnmatch.translate(key)): func for key, func in value.items()
        }
        self._dr = compiled
        # Keep the exact-match table in step with the compiled rules so that
        # find_rule() never consults rules from a previous assignment.
        self.string_rules = value

    @staticmethod
    def _find_rule_key(x):
        """Key function for ranking ``(match length, regex, func)`` entries:
        by match length first, then by the length of the regex pattern.
        """
        return (x[0], len(x[1].pattern))

    def find_rule(self, path):
        """For a path, find the key and conversion function that should be
        used to dump a value.

        Returns
        -------
        key : str or compiled regex
            ``path`` itself on an exact match or when nothing matched,
            otherwise the compiled pattern of the winning wildcard rule.
        func : callable or None
            The dumper function; None when there is no rule for ``path`` or
            the matching rule asks for the path to be skipped.
        """
        # An exact string match always wins over wildcard rules.
        if path in self.string_rules:
            return path, self.string_rules[path]
        candidates = []
        for rule, func in self.dump_rules.items():
            m = rule.match(path)
            if m is None:
                continue
            start, stop = m.span()
            candidates.append((stop - start, rule, func))
        if not candidates:
            # No dump rule function for path
            return path, None
        # max() returns the first of several equally ranked entries, which
        # is the same entry a stable descending sort would put first.
        _, rule, func = max(candidates, key=self._find_rule_key)
        return rule, func

    def dumps(self, flat):
        """Dumps a flat mapping of (string path keys, values) pairs and returns
        a formatted string: the prefix, one line per dumped path in sorted
        path order, and the suffix, newline-joined with a trailing newline.
        """
        lines = [self.prefix]
        for path, value in sorted(flat.items()):
            _, func = self.find_rule(path)
            if func is not None:
                lines.append(func(path, value))
        lines.append(self.suffix)
        return "\n".join(lines) + "\n"
def create_truefalse_cond(prompt="yes or no [default: no]? ", path=None):
    """Builds a basic condition function, suitable for While nodes and other
    places that take a condition.

    Each time the returned function is called it creates a fresh TrueFalse
    node from the ``prompt`` and ``path`` given here, visits it with the
    supplied visitor, and returns the result of that visit.
    """

    def truefalse_cond(visitor, node=None):
        """Prompts the user for a true/false condition."""
        return visitor.visit(TrueFalse(prompt=prompt, path=path))

    return truefalse_cond
#
# Tools for trees of nodes.
#
def _lowername(cls):
    """Returns the lower-cased name of a class."""
    return cls.__name__.lower()


class Visitor:
    """Super-class for all classes that should walk over a tree of nodes.
    This implements the visit() method.
    """

    def __init__(self, tree=None):
        self.tree = tree

    def visit(self, node=None):
        """Walks over a node. If no node is provided, the tree is used.

        The node's class hierarchy is searched in method resolution order
        for the first class ``C`` for which this visitor has a callable
        ``visit_<lower-cased name of C>`` attribute; that method is called
        with the node and its result returned.

        Raises
        ------
        RuntimeError
            If neither a node nor a tree is available.
        AttributeError
            If no matching visitor method exists.
        """
        if node is None:
            node = self.tree
        if node is None:
            raise RuntimeError("no node or tree given!")
        for klass in type.mro(node.__class__):
            meth = getattr(self, "visit_" + _lowername(klass), None)
            if callable(meth):
                return meth(node)
        msg = "could not find valid visitor method for {0} on {1}"
        nodename = node.__class__.__name__
        selfname = self.__class__.__name__
        raise AttributeError(msg.format(nodename, selfname))
def ensure_str_or_int(x):
    """Coerces a value to a str or an int.

    Ints pass through untouched. Anything else is turned into a string and
    parsed as a Python literal; text that is not a valid literal is kept as
    the string itself.

    Raises
    ------
    ValueError
        If the text parses to a literal that is neither an int nor a str.
    """
    if isinstance(x, int):
        return x
    text = x if isinstance(x, str) else str(x)
    try:
        value = ast.literal_eval(text)
    except (ValueError, SyntaxError):
        value = text
    if isinstance(value, (int, str)):
        return value
    raise ValueError("{0!r} could not be converted to int or str".format(value))
def canon_path(path, indices=None):
    """Returns the canonical form of a path, which is a tuple of str or ints.

    A non-string path is treated as a sequence of components and converted
    element by element. A string path is first formatted with ``indices``
    (when given), then stripped of one leading and one trailing '/', and
    finally split on '/'. An empty path yields the empty tuple.
    """
    if not isinstance(path, str):
        return tuple(ensure_str_or_int(part) for part in path)
    if indices is not None:
        path = path.format(**indices)
    if path.startswith("/"):
        path = path[1:]
    if path.endswith("/"):
        path = path[:-1]
    if len(path) == 0:
        return ()
    return tuple(ensure_str_or_int(part) for part in path.split("/"))
class UnstorableType:
    """Type of the value returned when no input was given or the input was
    skipped, i.e. when there is nothing to store. It is normally used via
    the Unstorable singleton.
    """

    _inst: tp.Optional["UnstorableType"] = None

    def __new__(cls, *args, **kwargs):
        # Create the instance on first use and hand the same one back after.
        inst = cls._inst
        if inst is None:
            inst = cls._inst = super().__new__(cls, *args, **kwargs)
        return inst


Unstorable = UnstorableType()
class StateVisitor(Visitor):
    """This class visits the nodes and stores the results in a top-level
    dict of data according to the state path of the node. If the node
    does not have a path, or the visit returned Unstorable, the storage is
    skipped. This class can be optionally initialized with an existing state.
    """

    def __init__(self, tree=None, state=None, indices=None):
        """
        Parameters
        ----------
        tree : Node, optional
            Tree of nodes to start the visitor with.
        state : dict, optional
            Existing state to store results into; a new dict by default.
        indices : dict, optional
            Values used to format string paths; a new dict by default.
        """
        super().__init__(tree=tree)
        self.state = {} if state is None else state
        self.indices = {} if indices is None else indices

    def visit(self, node=None):
        """Visits a node (or the tree when no node is given) and stores the
        result at the node's path, if it has one.

        A callable ``path`` attribute is called as
        ``path(visitor=self, node=node, val=result)`` to obtain the path.
        """
        # The node is resolved here, not only in the base class, because it
        # is needed below to look up the path.
        if node is None:
            node = self.tree
        if node is None:
            raise RuntimeError("no node or tree given!")
        rtn = super().visit(node)
        path = getattr(node, "path", None)
        if callable(path):
            path = path(visitor=self, node=node, val=rtn)
        if path is not None and rtn is not Unstorable:
            self.store(path, rtn, indices=self.indices)
        return rtn

    def store(self, path, val, indices=None):
        """Stores a value at the path location, creating intermediate dicts
        and lists and padding lists as needed.
        """
        path = canon_path(path, indices=indices)
        loc = self.state
        # Walk every component except the last. The type of the *next*
        # component decides which container to create: a str key calls for
        # a dict, an int index for a list.
        for p, n in zip(path[:-1], path[1:]):
            if isinstance(p, str) and p not in loc:
                loc[p] = {} if isinstance(n, str) else []
            elif isinstance(p, int) and abs(p) + (p >= 0) > len(loc):
                # Index p needs a length of p + 1 when non-negative and of
                # abs(p) when negative; extend the list by the shortfall.
                i = abs(p) + (p >= 0) - len(loc)
                if isinstance(n, str):
                    ex = [{} for _ in range(i)]
                else:
                    ex = [[] for _ in range(i)]
                loc.extend(ex)
            loc = loc[p]
        p = path[-1]
        if isinstance(p, int) and abs(p) + (p >= 0) > len(loc):
            # Pad with None so that the final index becomes assignable.
            i = abs(p) + (p >= 0) - len(loc)
            ex = [None] * i
            loc.extend(ex)
        loc[p] = val

    def flatten(self, path="/", value=None, flat=None):
        """Returns a dict version of the store whose keys are paths.
        Note that list and dict entries will always end in '/', allowing
        disambiguation in dump_rules.

        Parameters
        ----------
        path : str, optional
            Path under which ``value`` is recorded.
        value : object, optional
            The value to flatten; the whole state when None.
        flat : dict, optional
            Mapping to add the entries to; a new dict when None.
        """
        value = self.state if value is None else value
        flat = {} if flat is None else flat
        self._flatten_into(path, value, flat)
        return flat

    def _flatten_into(self, path, value, flat):
        """Recursively records ``value`` and its children in ``flat``."""
        # Nested values are handled here rather than through flatten() so
        # that a None entry (e.g. the padding store() adds to lists) is
        # recorded as a leaf instead of being replaced by the whole state,
        # which would recurse without end.
        if isinstance(value, cabc.Mapping):
            path = path if path.endswith("/") else path + "/"
            flat[path] = value
            for k, v in value.items():
                self._flatten_into(path + k, v, flat)
        elif isinstance(value, (str, bytes)):
            # Strings are sequences too, but are treated as leaves.
            flat[path] = value
        elif isinstance(value, cabc.Sequence):
            path = path if path.endswith("/") else path + "/"
            flat[path] = value
            for i, v in enumerate(value):
                self._flatten_into(path + str(i), v, flat)
        else:
            flat[path] = value
# Colored prompt suffixes for yes/no and yes/no/break questions.
YN = "{GREEN}yes{RESET} or {RED}no{RESET} [default: no]? "
YNB = "{GREEN}yes{RESET}, {RED}no{RESET}, or {YELLOW}break{RESET} [default: no]? "
class PromptVisitor(StateVisitor):
    """Visits the nodes in the tree via a command-line prompt."""

    def __init__(self, tree=None, state=None, **kwargs):
        """
        Parameters
        ----------
        tree : Node, optional
            Tree of nodes to start visitor with.
        state : dict, optional
            Initial state to begin with.
        kwargs : optional
            Options that are passed through to the prompt via the shell's
            singleline() method. See BaseShell for more details.
        """
        super().__init__(tree=tree, state=state)
        self.env = builtins.__xonsh__.env
        self.shell = builtins.__xonsh__.shell.shell
        self.shell_kwargs = kwargs

    def _confirm(self, question):
        """Asks a true/false question ending in the YN suffix and returns
        the result of visiting the TrueFalse node.
        """
        return self.visit(TrueFalse(prompt=question + YN))

    def _resolve_filename(self, node):
        """Returns the file name for a StateFile-like node: the user's answer
        when ``node.ask_filename`` is set and the answer is non-empty,
        otherwise ``node.default_file``.
        """
        fname = None
        if node.ask_filename:
            # visit_input is not defined in this part of the file; it is
            # expected to prompt the user and return the entered value.
            fname = self.visit_input(node)
        if fname is None or len(fname) == 0:
            fname = node.default_file
        return fname

    def visit_wizard(self, node):
        """Visits each child of the wizard in order."""
        for child in node.children:
            self.visit(child)

    def visit_pass(self, node):
        """Does nothing."""
        pass

    def visit_message(self, node):
        """Prints the node's message with color support."""
        print_color(node.message)

    def visit_question(self, node):
        """Prompts with the question, converts the answer if a converter is
        given, visits the node registered for that answer, and returns the
        answer.
        """
        self.env["PROMPT"] = node.question
        r = self.shell.singleline(**self.shell_kwargs)
        if callable(node.converter):
            r = node.converter(r)
        self.visit(node.responses[r])
        return r

    def visit_while(self, node):
        """Visits the body repeatedly while the condition holds and returns
        a list with one list of body results per iteration.
        """
        rtns = []
        # Remember any outer value of the index so it can be restored.
        origidx = self.indices.get(node.idxname, None)
        self.indices[node.idxname] = idx = node.beg
        while node.cond(visitor=self, node=node):
            rtns.append([self.visit(child) for child in node.body])
            idx += 1
            self.indices[node.idxname] = idx
        if origidx is None:
            del self.indices[node.idxname]
        else:
            self.indices[node.idxname] = origidx
        return rtns

    def visit_savejson(self, node):
        """Writes the state to a JSON file and returns the file name, or
        Unstorable when the user declines to save.
        """
        jstate = json.dumps(
            self.state, indent=1, sort_keys=True, default=serialize_xonsh_json
        )
        if node.check:
            msg = "The current state is:\n\n{0}\n"
            print(msg.format(textwrap.indent(jstate, " ")))
            if not self._confirm("Would you like to save this state, "):
                return Unstorable
        fname = self._resolve_filename(node)
        if os.path.isfile(fname):
            backup_file(fname)
        else:
            # A bare file name has an empty dirname, and os.makedirs("")
            # raises, so only create the directory when there is one.
            dname = os.path.dirname(fname)
            if dname:
                os.makedirs(dname, exist_ok=True)
        with open(fname, "w") as f:
            f.write(jstate)
        return fname

    def visit_loadjson(self, node):
        """Replaces the state with the contents of a JSON file, if that file
        exists, and returns the file name, or Unstorable when the user
        declines to load.
        """
        if node.check:
            if not self._confirm("Would you like to load an existing file, "):
                return Unstorable
        # Honors node.ask_filename, like the save and insert visitors do.
        fname = self._resolve_filename(node)
        if os.path.isfile(fname):
            with open(fname, "r") as f:
                self.state = json.load(f)
            print_color("{{GREEN}}{0!r} loaded.{{RESET}}".format(fname))
        else:
            print_color(
                "{{RED}}{0!r} could not be found, continuing.{{RESET}}".format(fname)
            )
        return fname

    def visit_fileinserter(self, node):
        """Inserts the dumped state into a file between the node's prefix
        and suffix, replacing any previous insertion, and returns the file
        name, or Unstorable when the user declines to write.
        """
        # perform the dumping operation.
        new = node.dumps(self.flatten())
        # check if we should write this out
        if node.check:
            msg = "The current state to insert is:\n\n{0}\n"
            print(msg.format(textwrap.indent(new, " ")))
            if not self._confirm("Would you like to write out the current state, "):
                return Unstorable
        # get and backup the file.
        fname = self._resolve_filename(node)
        if os.path.isfile(fname):
            with open(fname, "r") as f:
                s = f.read()
            # Keep what precedes the prefix and what follows the suffix;
            # everything in between is replaced by the new dump.
            before, _, s = s.partition(node.prefix)
            _, _, after = s.partition(node.suffix)
            backup_file(fname)
        else:
            before = after = ""
            dname = os.path.dirname(fname)
            if dname:
                os.makedirs(dname, exist_ok=True)
        # write out the file
        with open(fname, "w") as f:
            f.write(before + new + after)
        return fname