# -*- coding: utf-8 -*-
"""The xonsh built-ins.
Note that this module is named 'built_ins' so as not to be confused with the
special Python builtins module.
"""
import atexit
import builtins
from collections import Sequence
from contextlib import contextmanager
import inspect
from glob import iglob
import os
import re
import shlex
import signal
from subprocess import Popen, PIPE, STDOUT, CalledProcessError
import sys
import tempfile
import time
from xonsh.aliases import Aliases, make_default_aliases
from xonsh.environ import Env, default_env, locate_binary
from xonsh.foreign_shells import load_foreign_aliases
from xonsh.history import History
from xonsh.inspectors import Inspector
from xonsh.jobs import add_job, wait_for_active_job
from xonsh.platform import ON_POSIX, ON_WINDOWS
from xonsh.proc import (ProcProxy, SimpleProcProxy, ForegroundProcProxy,
SimpleForegroundProcProxy, TeePTYProc,
CompletedCommand, HiddenCompletedCommand)
from xonsh.tools import (
suggest_commands, XonshError, expandvars, CommandsCache, globpath,
iglobpath
)
# The active xonsh environment. Assigned by load_builtins() and reset to None
# by unload_builtins().
ENV = None
# True while the xonsh names are installed on the Python ``builtins`` module.
BUILTINS_LOADED = False
# Shared inspector used by helper() and superhelper().
INSPECTOR = Inspector()
# Signals for which load_builtins() installs a self-resetting handler that
# flushes the history.
AT_EXIT_SIGNALS = (signal.SIGABRT, signal.SIGFPE, signal.SIGILL,
                   signal.SIGSEGV, signal.SIGTERM)
# Messages printed by run_subproc() when the last process reports that it
# was ended by one of these signals.
SIGNAL_MESSAGES = {
    signal.SIGABRT: 'Aborted',
    signal.SIGFPE: 'Floating point exception',
    signal.SIGILL: 'Illegal instructions',
    signal.SIGTERM: 'Terminated',
    signal.SIGSEGV: 'Segmentation fault'
}
if ON_POSIX:
    # These signal names are only referenced when running on POSIX.
    AT_EXIT_SIGNALS += (signal.SIGTSTP, signal.SIGQUIT, signal.SIGHUP)
    SIGNAL_MESSAGES.update({
        signal.SIGQUIT: 'Quit',
        signal.SIGHUP: 'Hangup',
        signal.SIGKILL: 'Killed'
    })
def resetting_signal_handle(sig, f):
    """Sets a new signal handle that will automatically restore the old value
    once the new handle is finished.

    Parameters
    ----------
    sig : int
        The signal number to install the handler for.
    f : callable
        Called as ``f(signum, frame)`` when the signal arrives.

    After ``f`` returns, the previously installed handler is put back and,
    for any non-zero ``sig``, the process exits via ``sys.exit(sig)``.
    """
    previous = signal.getsignal(sig)

    def one_shot(s=None, frame=None):
        f(s, frame)
        # getsignal() yields None when the prior handler was not installed
        # from Python; signal.signal() would reject None, so skip the restore.
        if previous is not None:
            signal.signal(sig, previous)
        if sig != 0:
            sys.exit(sig)

    signal.signal(sig, one_shot)
def helper(x, name=''):
    """Prints help about, and then returns that variable.

    The information is produced by the module-level inspector at detail
    level 0; ``name`` is passed along as the object's name.
    """
    INSPECTOR.pinfo(x, oname=name, detail_level=0)
    return x
def superhelper(x, name=''):
    """Prints more detailed help about, and then returns that variable.

    Same as ``helper()`` except that the module-level inspector is asked for
    detail level 1 instead of 0.
    """
    INSPECTOR.pinfo(x, oname=name, detail_level=1)
    return x
def expand_path(s):
    """Takes a string path and expands ~ to home and environment vars.

    Environment variables are only expanded when the current environment has
    a truthy ``EXPAND_ENV_VARS`` entry; ``~`` expansion always happens.
    """
    if ENV.get('EXPAND_ENV_VARS'):
        s = expandvars(s)
    return os.path.expanduser(s)
def reglob(path, parts=None, i=None):
    """Regular expression-based globbing.

    Parameters
    ----------
    path : str
        On the initial call, the regex path to match. On recursive calls,
        the directory currently being listed.
    parts : list of str, optional
        The path split on ``os.sep``; supplied only by recursive calls.
    i : int, optional
        Index into ``parts`` of the component being matched; supplied only
        by recursive calls.

    Returns
    -------
    list of str
        Matching paths, in sorted directory-listing order.
    """
    if parts is None:
        # Initial call: split the path into components and start the
        # recursion from the root (absolute paths) or from '.'.
        path = os.path.normpath(path)
        drive, tail = os.path.splitdrive(path)
        parts = tail.split(os.sep)
        start = os.sep if os.path.isabs(path) else '.'
        start = os.path.join(drive, start)
        return reglob(start, parts, i=0)
    base = subdir = path
    if i == 0:
        if not os.path.isabs(base):
            # Relative search: candidates are built without a prefix.
            base = ''
        elif len(parts) > 1:
            # Absolute path: parts[0] is the empty string before the root.
            i += 1
    regex = os.path.join(base, parts[i])
    if ON_WINDOWS:
        # currently unable to access regex backslash sequences
        # on Windows due to paths using \.
        regex = regex.replace('\\', '\\\\')
    regex = re.compile(regex)
    files = sorted(os.listdir(subdir))
    paths = []
    i1 = i + 1
    if i1 == len(parts):
        # Last component: collect every entry whose full path matches.
        for f in files:
            p = os.path.join(base, f)
            if regex.fullmatch(p) is not None:
                paths.append(p)
    else:
        # Intermediate component: descend into matching directories only.
        for f in files:
            p = os.path.join(base, f)
            if regex.fullmatch(p) is None or not os.path.isdir(p):
                continue
            paths += reglob(p, parts=parts, i=i1)
    return paths
def regexpath(s, pymode=False):
    """Takes a regular expression string and returns a list of file
    paths that match the regex.

    When nothing matches, an empty list is returned if ``pymode`` is true;
    otherwise a one-element list holding the expanded input string.
    """
    s = expand_path(s)
    matches = reglob(s)
    if len(matches) != 0:
        return matches
    return [] if pymode else [s]
# Matches a '#!' line; group 1 captures the text after optional spaces/tabs.
RE_SHEBANG = re.compile(r'#![ \t]*(.+?)$')
def _is_binary(fname, limit=80):
    """Returns True if a NUL byte appears on the first line of the file,
    looking at no more than ``limit`` bytes.

    Reaching a newline, the end of the file, or the byte limit without
    seeing a NUL byte yields False.
    """
    if limit <= 0:
        return False
    with open(fname, 'rb') as f:
        # One bounded read instead of ``limit`` single-byte reads.
        head = f.read(limit)
    first_line = head.split(b'\n', 1)[0]
    return b'\0' in first_line
def _un_shebang(x):
    """Translates one token of a shebang line into a list of command words.

    ``/usr/bin/env`` is dropped (empty list), paths under ``/usr/bin``,
    ``/usr/local/bin`` or ``/bin`` are reduced to their base name, anything
    ending in ``python`` or ``python.exe`` becomes ``python``, and ``xonsh``
    is replaced by ``python -m xonsh.main``.
    """
    if x == '/usr/bin/env':
        return []
    elif x.startswith(('/usr/bin', '/usr/local/bin', '/bin')):
        x = os.path.basename(x)
    elif x.endswith(('python', 'python.exe')):
        x = 'python'
    if x == 'xonsh':
        return ['python', '-m', 'xonsh.main']
    return [x]
def get_script_subproc_command(fname, args):
    """
    Given the name of a script outside the path, returns a list representing
    an appropriate subprocess command to execute the script.  Raises
    PermissionError if the script is not executable.

    Parameters
    ----------
    fname : str
        Path of the file to run.
    args : list of str
        Arguments appended after the file name.

    Returns
    -------
    list of str
        ``[fname] + args`` when the file can be run directly, otherwise the
        interpreter words taken from the shebang line (``xonsh`` when there
        is none) followed by ``fname`` and ``args``.
    """
    # make sure file is executable
    if not os.access(fname, os.X_OK):
        raise PermissionError
    if ON_POSIX and not os.access(fname, os.R_OK):
        # on some systems, some important programs (e.g. sudo) will have
        # execute permissions but not read/write permissions. This enables
        # things with the SUID set to be run. Needs to come before _is_binary()
        # is called, because that function tries to read the file.
        return [fname] + args
    elif _is_binary(fname):
        # if the file is a binary, we should call it directly
        return [fname] + args
    if ON_WINDOWS:
        # Windows can execute various filetypes directly
        # as given in PATHEXT
        _, ext = os.path.splitext(fname)
        if ext.upper() in builtins.__xonsh_env__.get('PATHEXT'):
            return [fname] + args
    # find interpreter
    with open(fname, 'rb') as f:
        first_line = f.readline().decode().strip()
    m = RE_SHEBANG.match(first_line)
    # xonsh is the default interpreter
    if m is None:
        interp = ['xonsh']
    else:
        interp = m.group(1).strip()
        if len(interp) > 0:
            interp = shlex.split(interp)
        else:
            interp = ['xonsh']
    if ON_WINDOWS:
        # Rewrite POSIX-style interpreter paths token by token.
        o = []
        for i in interp:
            o.extend(_un_shebang(i))
        interp = o
    return interp + [fname] + args
def _subproc_pre():
    """Pre-exec hook handed to ``Popen`` by run_subproc() on POSIX.

    Puts the calling process into its own process group and installs a
    SIGTSTP handler that calls ``signal.pause()``.
    """
    os.setpgrp()
    signal.signal(signal.SIGTSTP, lambda n, f: signal.pause())
# One side of a redirect token: o/out, e/err, a/all, or an optional '&'
# followed by an optional digit. Raw strings keep '\d' from being an
# invalid string escape.
_REDIR_NAME = r"(o(?:ut)?|e(?:rr)?|a(?:ll)?|&?\d?)"
# A whole redirect token: <source><operator><destination>.
_REDIR_REGEX = re.compile(r"{r}(>?>|<){r}$".format(r=_REDIR_NAME))
# Redirect operator -> mode string passed to open().
_MODES = {'>>': 'a', '>': 'w', '<': 'r'}
_WRITE_MODES = frozenset({'w', 'a'})
# Source spellings that select both streams, stderr, and stdout respectively.
_REDIR_ALL = frozenset({'&', 'a', 'all'})
_REDIR_ERR = frozenset({'2', 'e', 'err'})
_REDIR_OUT = frozenset({'', '1', 'o', 'out'})
# Every "<stderr name>><stdout name>" spelling, e.g. '2>1' or 'err>out'.
_E2O_MAP = frozenset({'{}>{}'.format(e, o)
                      for e in _REDIR_ERR
                      for o in _REDIR_OUT
                      if o != ''})
def _is_redirect(x):
    """Returns a truthy value if ``x`` is a string matching the redirect
    token pattern.

    The result is ``False`` for non-strings, and otherwise whatever
    ``_REDIR_REGEX.match`` returns (a match object or ``None``).
    """
    return isinstance(x, str) and _REDIR_REGEX.match(x)
def _open(fname, mode):
    """Opens ``fname`` with ``mode`` for use as a redirect target.

    Integers are treated as file descriptors and returned unchanged. Any
    failure to open the file is re-raised as a ``XonshError`` with a
    shell-style message.
    """
    # file descriptors
    if isinstance(fname, int):
        return fname
    try:
        return open(fname, mode)
    except PermissionError:
        raise XonshError('xonsh: {0}: permission denied'.format(fname))
    except FileNotFoundError:
        raise XonshError('xonsh: {0}: no such file or directory'.format(fname))
    except Exception:
        raise XonshError('xonsh: {0}: unable to open file'.format(fname))
def _redirect_io(streams, r, loc=None):
    """Records the redirect described by token ``r`` in ``streams``.

    Parameters
    ----------
    streams : dict
        Updated in place. Keys are ``'stdin'``, ``'stdout'`` and
        ``'stderr'``; values are ``(location, mode, file)`` tuples.
    r : str
        The redirect token, e.g. ``'>'``, ``'2>>'``, ``'e>o'`` or ``'2>&1'``.
    loc : str or int, optional
        The redirect target that followed the token, if any.

    Raises
    ------
    XonshError
        If the token is not understood or a stream is redirected twice.
    """
    # special case of redirecting stderr to stdout
    if r.replace('&', '') in _E2O_MAP:
        if 'stderr' in streams:
            raise XonshError('Multiple redirects for stderr')
        streams['stderr'] = ('<stdout>', 'a', STDOUT)
        return
    orig, mode, dest = _REDIR_REGEX.match(r).groups()
    # redirect to fd
    if dest.startswith('&'):
        try:
            dest = int(dest[1:])
            if loc is None:
                # The descriptor number becomes the target location.
                loc, dest = dest, ''
            else:
                e = 'Unrecognized redirection command: {}'.format(r)
                raise XonshError(e)
        except (ValueError, XonshError):
            raise
        except Exception:
            pass
    mode = _MODES.get(mode, None)
    if mode == 'r':
        if len(orig) > 0 or len(dest) > 0:
            raise XonshError('Unrecognized redirection command: {}'.format(r))
        elif 'stdin' in streams:
            raise XonshError('Multiple inputs for stdin')
        else:
            streams['stdin'] = (loc, 'r', _open(loc, mode))
    elif mode in _WRITE_MODES:
        if orig in _REDIR_ALL:
            if 'stderr' in streams:
                raise XonshError('Multiple redirects for stderr')
            elif 'stdout' in streams:
                raise XonshError('Multiple redirects for stdout')
            elif len(dest) > 0:
                e = 'Unrecognized redirection command: {}'.format(r)
                raise XonshError(e)
            targets = ['stdout', 'stderr']
        elif orig in _REDIR_ERR:
            if 'stderr' in streams:
                raise XonshError('Multiple redirects for stderr')
            elif len(dest) > 0:
                e = 'Unrecognized redirection command: {}'.format(r)
                raise XonshError(e)
            targets = ['stderr']
        elif orig in _REDIR_OUT:
            if 'stdout' in streams:
                raise XonshError('Multiple redirects for stdout')
            elif len(dest) > 0:
                e = 'Unrecognized redirection command: {}'.format(r)
                raise XonshError(e)
            targets = ['stdout']
        else:
            raise XonshError('Unrecognized redirection command: {}'.format(r))
        # All selected streams share the one opened file object.
        f = _open(loc, mode)
        for t in targets:
            streams[t] = (loc, mode, f)
    else:
        raise XonshError('Unrecognized redirection command: {}'.format(r))
def run_subproc(cmds, captured=False):
    """Runs a subprocess, in its many forms. This takes a list of 'commands,'
    which may be a list of command line arguments or a string, representing
    a special connecting character.  For example::

        $ ls | grep wakka

    is represented by the following cmds::

        [['ls'], '|', ['grep', 'wakka']]

    Lastly, the captured argument affects only the last real command.

    Parameters
    ----------
    cmds : sequence
        Commands and connecting strings as described above. A trailing
        ``'&'`` runs the pipeline in the background.
    captured : False or str, optional
        ``False`` captures nothing; ``'stdout'`` returns the decoded stdout;
        ``'object'`` returns a ``CompletedCommand``; any other value returns
        a ``HiddenCompletedCommand``.

    Returns
    -------
    None, str, CompletedCommand or HiddenCompletedCommand
        ``None`` for background or uncaptured runs, otherwise as selected by
        ``captured``.

    Raises
    ------
    XonshError
        For bad redirects, permission problems and commands not found.
    CalledProcessError
        If ``RAISE_SUBPROC_ERROR`` is set in the environment and the last
        (non-proxy) process returned a positive exit status.
    """
    global ENV
    background = False
    procinfo = {}
    if cmds[-1] == '&':
        background = True
        cmds = cmds[:-1]
    write_target = None
    last_cmd = len(cmds) - 1
    procs = []
    prev_proc = None
    _capture_streams = captured in {'stdout', 'object'}
    for ix, cmd in enumerate(cmds):
        starttime = time.time()
        procinfo['args'] = list(cmd)
        stdin = None
        stderr = None
        if isinstance(cmd, str):
            # connecting characters such as '|' start no process themselves
            continue
        streams = {}
        # peel redirect tokens (and their targets) off the command
        while True:
            if len(cmd) >= 3 and _is_redirect(cmd[-2]):
                _redirect_io(streams, cmd[-2], cmd[-1])
                cmd = cmd[:-2]
            elif len(cmd) >= 2 and _is_redirect(cmd[-1]):
                _redirect_io(streams, cmd[-1])
                cmd = cmd[:-1]
            elif len(cmd) >= 3 and cmd[0] == '<':
                _redirect_io(streams, cmd[0], cmd[1])
                cmd = cmd[2:]
            else:
                break
        # set standard input
        if 'stdin' in streams:
            if prev_proc is not None:
                raise XonshError('Multiple inputs for stdin')
            stdin = streams['stdin'][-1]
            procinfo['stdin_redirect'] = streams['stdin'][:-1]
        elif prev_proc is not None:
            stdin = prev_proc.stdout
        # set standard output
        _stdout_name = None
        _stderr_name = None
        if 'stdout' in streams:
            if ix != last_cmd:
                raise XonshError('Multiple redirects for stdout')
            stdout = streams['stdout'][-1]
            procinfo['stdout_redirect'] = streams['stdout'][:-1]
        elif ix != last_cmd:
            stdout = PIPE
        elif _capture_streams:
            _nstdout = stdout = tempfile.NamedTemporaryFile(delete=False)
            _stdout_name = stdout.name
        elif builtins.__xonsh_stdout_uncaptured__ is not None:
            stdout = builtins.__xonsh_stdout_uncaptured__
        else:
            stdout = None
        # set standard error
        if 'stderr' in streams:
            stderr = streams['stderr'][-1]
            procinfo['stderr_redirect'] = streams['stderr'][:-1]
        elif captured == 'object' and ix == last_cmd:
            _nstderr = stderr = tempfile.NamedTemporaryFile(delete=False)
            _stderr_name = stderr.name
        elif builtins.__xonsh_stderr_uncaptured__ is not None:
            stderr = builtins.__xonsh_stderr_uncaptured__
        uninew = (ix == last_cmd) and (not _capture_streams)
        alias = builtins.aliases.get(cmd[0], None)
        procinfo['alias'] = alias
        # AUTO_CD: a lone directory name that is not a command means 'cd dir'
        if (alias is None and
                builtins.__xonsh_env__.get('AUTO_CD') and
                len(cmd) == 1 and
                os.path.isdir(cmd[0]) and
                locate_binary(cmd[0]) is None):
            cmd.insert(0, 'cd')
            alias = builtins.aliases.get('cd', None)
        if callable(alias):
            aliased_cmd = alias
        else:
            if alias is not None:
                cmd = alias + cmd[1:]
            n = locate_binary(cmd[0])
            if n is None:
                aliased_cmd = cmd
            else:
                try:
                    aliased_cmd = get_script_subproc_command(n, cmd[1:])
                except PermissionError:
                    e = 'xonsh: subprocess mode: permission denied: {0}'
                    raise XonshError(e.format(cmd[0]))
        _stdin_file = None
        if (stdin is not None and
                ENV.get('XONSH_STORE_STDIN') and
                captured == 'object' and
                'cat' in builtins.__xonsh_commands_cache__ and
                'tee' in builtins.__xonsh_commands_cache__):
            # route stdin through 'cat | tee <tempfile>' so a copy is kept
            _stdin_file = tempfile.NamedTemporaryFile()
            cproc = Popen(['cat'],
                          stdin=stdin,
                          stdout=PIPE)
            tproc = Popen(['tee', _stdin_file.name],
                          stdin=cproc.stdout,
                          stdout=PIPE)
            stdin = tproc.stdout
        if callable(aliased_cmd):
            prev_is_proxy = True
            bgable = getattr(aliased_cmd, '__xonsh_backgroundable__', True)
            numargs = len(inspect.signature(aliased_cmd).parameters)
            if numargs == 2:
                cls = SimpleProcProxy if bgable else SimpleForegroundProcProxy
            elif numargs == 4:
                cls = ProcProxy if bgable else ForegroundProcProxy
            else:
                e = 'Expected callable with 2 or 4 arguments, not {}'
                raise XonshError(e.format(numargs))
            proc = cls(aliased_cmd, cmd[1:],
                       stdin, stdout, stderr,
                       universal_newlines=uninew)
        else:
            prev_is_proxy = False
            usetee = ((stdout is None) and
                      (not background) and
                      ENV.get('XONSH_STORE_STDOUT', False))
            cls = TeePTYProc if usetee else Popen
            subproc_kwargs = {}
            if ON_POSIX and cls is Popen:
                subproc_kwargs['preexec_fn'] = _subproc_pre
            try:
                proc = cls(aliased_cmd,
                           universal_newlines=uninew,
                           env=ENV.detype(),
                           stdin=stdin,
                           stdout=stdout,
                           stderr=stderr,
                           **subproc_kwargs)
            except PermissionError:
                e = 'xonsh: subprocess mode: permission denied: {0}'
                raise XonshError(e.format(aliased_cmd[0]))
            except FileNotFoundError:
                cmd = aliased_cmd[0]
                e = 'xonsh: subprocess mode: command not found: {0}'.format(cmd)
                sug = suggest_commands(cmd, ENV, builtins.aliases)
                if len(sug.strip()) > 0:
                    # reuse the suggestions instead of computing them twice
                    e += '\n' + sug
                raise XonshError(e)
        procs.append(proc)
        prev_proc = proc
    for proc in procs[:-1]:
        try:
            proc.stdout.close()
        except OSError:
            pass
    if not prev_is_proxy:
        add_job({
            'cmds': cmds,
            'pids': [i.pid for i in procs],
            'obj': prev_proc,
            'bg': background
        })
    if (ENV.get('XONSH_INTERACTIVE') and
            not ENV.get('XONSH_STORE_STDOUT') and
            not _capture_streams):
        # set title here to get current command running
        try:
            builtins.__xonsh_shell__.settitle()
        except AttributeError:
            pass
    if background:
        return
    if prev_is_proxy:
        prev_proc.wait()
    wait_for_active_job()
    hist = builtins.__xonsh_history__
    hist.last_cmd_rtn = prev_proc.returncode
    # get output
    output = b''
    if write_target is None:
        if _stdout_name is not None:
            with open(_stdout_name, 'rb') as stdoutfile:
                output = stdoutfile.read()
            try:
                _nstdout.close()
            except Exception:
                # best effort: the temporary file is unlinked below anyway
                pass
            os.unlink(_stdout_name)
        elif prev_proc.stdout not in (None, sys.stdout):
            output = prev_proc.stdout.read()
        if _capture_streams:
            # to get proper encoding from Popen, we have to
            # use a byte stream and then implement universal_newlines here
            output = output.decode(encoding=ENV.get('XONSH_ENCODING'),
                                   errors=ENV.get('XONSH_ENCODING_ERRORS'))
            output = output.replace('\r\n', '\n')
        else:
            hist.last_cmd_out = output
        if captured == 'object':  # get stderr as well
            named = _stderr_name is not None
            unnamed = prev_proc.stderr not in {None, sys.stderr}
            if named:
                with open(_stderr_name, 'rb') as stderrfile:
                    errout = stderrfile.read()
                try:
                    _nstderr.close()
                except Exception:
                    # best effort: the temporary file is unlinked below anyway
                    pass
                os.unlink(_stderr_name)
            elif unnamed:
                errout = prev_proc.stderr.read()
            if named or unnamed:
                errout = errout.decode(encoding=ENV.get('XONSH_ENCODING'),
                                       errors=ENV.get('XONSH_ENCODING_ERRORS'))
                errout = errout.replace('\r\n', '\n')
                procinfo['stderr'] = errout
    if getattr(prev_proc, 'signal', None):
        sig, core = prev_proc.signal
        sig_str = SIGNAL_MESSAGES.get(sig)
        if sig_str:
            if core:
                sig_str += ' (core dumped)'
            print(sig_str, file=sys.stderr)
    if (not prev_is_proxy and
            hist.last_cmd_rtn is not None and
            hist.last_cmd_rtn > 0 and
            ENV.get('RAISE_SUBPROC_ERROR')):
        raise CalledProcessError(hist.last_cmd_rtn, aliased_cmd, output=output)
    if captured == 'stdout':
        return output
    elif captured is not False:
        procinfo['pid'] = prev_proc.pid
        procinfo['returncode'] = prev_proc.returncode
        procinfo['timestamp'] = (starttime, time.time())
        if captured == 'object':
            procinfo['stdout'] = output
            if _stdin_file is not None:
                _stdin_file.seek(0)
                procinfo['stdin'] = _stdin_file.read().decode()
                _stdin_file.close()
            return CompletedCommand(**procinfo)
        else:
            return HiddenCompletedCommand(**procinfo)
def subproc_captured_stdout(*cmds):
    """Runs a subprocess, capturing the output. Returns the stdout
    that was produced as a str.
    """
    return run_subproc(cmds, captured='stdout')
def subproc_captured_inject(*cmds):
    """Runs a subprocess, capturing the output. Returns a list of
    whitespace-separated strings in the stdout that was produced.
    """
    # str.split() with no separator never yields padded or empty items, so
    # no further stripping is needed.
    return run_subproc(cmds, captured='stdout').split()
def subproc_captured_object(*cmds):
    """
    Runs a subprocess, capturing the output. Returns an instance of
    ``CompletedCommand`` representing the completed command.
    """
    return run_subproc(cmds, captured='object')
def subproc_captured_hiddenobject(*cmds):
    """
    Runs a subprocess, capturing the output. Returns an instance of
    ``HiddenCompletedCommand`` representing the completed command.
    """
    return run_subproc(cmds, captured='hiddenobject')
def subproc_uncaptured(*cmds):
    """Runs a subprocess, without capturing the output.

    Passes through the result of ``run_subproc(cmds, captured=False)``,
    which has no return value for uncaptured runs (i.e. ``None``).
    """
    return run_subproc(cmds, captured=False)
def ensure_list_of_strs(x):
    """Ensures that x is a list of strings.

    A single string becomes a one-element list, any other sequence is
    converted element by element with ``str()``, and every other object is
    wrapped as ``[str(x)]``.
    """
    # Imported from collections.abc here: the ``collections.Sequence`` alias
    # was removed in Python 3.10.
    from collections.abc import Sequence
    if isinstance(x, str):
        return [x]
    if isinstance(x, Sequence):
        return [i if isinstance(i, str) else str(i) for i in x]
    return [str(x)]
def load_builtins(execer=None, config=None, login=False, ctx=None):
    """Loads the xonsh builtins into the Python builtins. Sets the
    BUILTINS_LOADED variable to True.

    Parameters
    ----------
    execer : object, optional
        Supplies ``eval``, ``exec`` and ``compile`` for the public ``evalx``,
        ``execx`` and ``compilex`` builtins; these are ``None`` without it.
    config : optional
        Passed on to ``default_env``.
    login : bool, optional
        Passed on to ``default_env``; when true, foreign shell aliases are
        loaded as well.
    ctx : optional
        Accepted but not used by this function.
    """
    global BUILTINS_LOADED, ENV
    # private built-ins
    builtins.__xonsh_config__ = {}
    builtins.__xonsh_env__ = ENV = Env(default_env(config=config, login=login))
    builtins.__xonsh_help__ = helper
    builtins.__xonsh_superhelp__ = superhelper
    builtins.__xonsh_regexpath__ = regexpath
    builtins.__xonsh_glob__ = globpath
    builtins.__xonsh_expand_path__ = expand_path
    builtins.__xonsh_exit__ = False
    builtins.__xonsh_stdout_uncaptured__ = None
    builtins.__xonsh_stderr_uncaptured__ = None
    # stash Python's own exit/quit so unload_builtins() can put them back
    if hasattr(builtins, 'exit'):
        builtins.__xonsh_pyexit__ = builtins.exit
        del builtins.exit
    if hasattr(builtins, 'quit'):
        builtins.__xonsh_pyquit__ = builtins.quit
        del builtins.quit
    builtins.__xonsh_subproc_captured_stdout__ = subproc_captured_stdout
    builtins.__xonsh_subproc_captured_inject__ = subproc_captured_inject
    builtins.__xonsh_subproc_captured_object__ = subproc_captured_object
    builtins.__xonsh_subproc_captured_hiddenobject__ = subproc_captured_hiddenobject
    builtins.__xonsh_subproc_uncaptured__ = subproc_uncaptured
    builtins.__xonsh_execer__ = execer
    builtins.__xonsh_commands_cache__ = CommandsCache()
    builtins.__xonsh_all_jobs__ = {}
    builtins.__xonsh_ensure_list_of_strs__ = ensure_list_of_strs
    # public built-ins
    builtins.evalx = None if execer is None else execer.eval
    builtins.execx = None if execer is None else execer.exec
    builtins.compilex = None if execer is None else execer.compile
    # The default aliases are created only now because they rely on
    # locate_binary, which in turn relies on __xonsh_env__ set above.
    builtins.default_aliases = builtins.aliases = Aliases(make_default_aliases())
    if login:
        builtins.aliases.update(load_foreign_aliases(issue_warning=False))
    # history needs to be started after env and aliases
    # would be nice to actually include non-detyped versions.
    builtins.__xonsh_history__ = History(env=ENV.detype(),
                                         ts=[time.time(), None], locked=True)
    # flush the history at interpreter exit and on terminating signals
    atexit.register(_lastflush)
    for sig in AT_EXIT_SIGNALS:
        resetting_signal_handle(sig, _lastflush)
    BUILTINS_LOADED = True
def _lastflush(s=None, f=None):
    """Flushes the xonsh history one last time.

    Usable both as an ``atexit`` callback (no arguments) and as a signal
    handler (``s`` and ``f`` receive the signal number and frame; neither
    is used). Does nothing when no history is installed, which is the case
    after ``unload_builtins()`` has removed it.
    """
    history = getattr(builtins, '__xonsh_history__', None)
    if history is not None:
        history.flush(at_exit=True)
def unload_builtins():
    """Removes the xonsh builtins from the Python builtins, if the
    BUILTINS_LOADED is True, sets BUILTINS_LOADED to False, and returns.

    Regardless of BUILTINS_LOADED, the environment replacement is undone
    (when an environment is set) and Python's original ``exit``/``quit``
    are restored if they were stashed.
    """
    global BUILTINS_LOADED, ENV
    if ENV is not None:
        ENV.undo_replace_env()
        ENV = None
    if hasattr(builtins, '__xonsh_pyexit__'):
        builtins.exit = builtins.__xonsh_pyexit__
    if hasattr(builtins, '__xonsh_pyquit__'):
        builtins.quit = builtins.__xonsh_pyquit__
    if not BUILTINS_LOADED:
        return
    # NOTE(review): 'aliases' is set by load_builtins() but is not listed
    # here - confirm whether leaving it installed is intended.
    names = ['__xonsh_config__',
             '__xonsh_env__',
             '__xonsh_ctx__',
             '__xonsh_help__',
             '__xonsh_superhelp__',
             '__xonsh_regexpath__',
             '__xonsh_glob__',
             '__xonsh_expand_path__',
             '__xonsh_exit__',
             '__xonsh_stdout_uncaptured__',
             '__xonsh_stderr_uncaptured__',
             '__xonsh_pyexit__',
             '__xonsh_pyquit__',
             '__xonsh_subproc_captured_stdout__',
             '__xonsh_subproc_captured_inject__',
             '__xonsh_subproc_captured_object__',
             '__xonsh_subproc_captured_hiddenobject__',
             '__xonsh_subproc_uncaptured__',
             '__xonsh_execer__',
             '__xonsh_commands_cache__',
             'evalx',
             'execx',
             'compilex',
             'default_aliases',
             '__xonsh_all_jobs__',
             '__xonsh_ensure_list_of_strs__',
             '__xonsh_history__',
             ]
    for name in names:
        # some names may never have been set, so only delete what exists
        if hasattr(builtins, name):
            delattr(builtins, name)
    BUILTINS_LOADED = False
@contextmanager
def xonsh_builtins(execer=None):
    """A context manager for using the xonsh builtins only in a limited
    scope. Likely useful in testing.

    The builtins are loaded on entry with the given ``execer`` and unloaded
    on exit, including when the body of the ``with`` block raises.
    """
    load_builtins(execer=execer)
    try:
        yield
    finally:
        # always unload so an exception cannot leave the builtins installed
        unload_builtins()