import os
import re
import shlex
import pickle
import hashlib
import pathlib
import builtins
import subprocess

import xonsh.lazyasd as xl
import xonsh.platform as xp

RE_DASHF = xl.LazyObject(lambda: re.compile(r'-F\s+(\w+)'),
                         globals(), 'RE_DASHF')

INITED = False

# Module-level caches: command name -> bash completion function, command
# name -> file defining that function, and an md5 of $BASH_COMPLETIONS used
# to detect when the user's completion settings change.
BASH_COMPLETE_HASH = None
BASH_COMPLETE_FUNCS = {}
BASH_COMPLETE_FILES = {}

CACHED_HASH = None
CACHED_FUNCS = None
CACHED_FILES = None
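
# The script below sources the completion file, fakes bash's completion
# environment (COMP_WORDS, COMP_LINE, COMP_POINT, COMP_CWORD), calls the
# completion function, and echoes each COMPREPLY entry on its own line so
# the candidates can be read back from stdout.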
BASH_COMPLETE_SCRIPT = """source "{filename}"
COMP_WORDS=({line})
COMP_LINE={comp_line}
COMP_POINT=${{#COMP_LINE}}
COMP_COUNT={end}
COMP_CWORD={n}
{func} {cmd} {prefix} {prev}
for ((i=0;i<${{#COMPREPLY[*]}};i++)) do echo ${{COMPREPLY[i]}}; done
"""


def update_bash_completion():
    """Updates the cache of bash completion functions and the files they
    were defined in, reloading everything if $BASH_COMPLETIONS changed or
    any completion script is newer than the cache file.
    """
    global BASH_COMPLETE_FUNCS, BASH_COMPLETE_FILES, BASH_COMPLETE_HASH
    global CACHED_FUNCS, CACHED_FILES, CACHED_HASH, INITED
    completers = builtins.__xonsh_env__.get('BASH_COMPLETIONS', ())
    BASH_COMPLETE_HASH = hashlib.md5(repr(completers).encode()).hexdigest()
    datadir = builtins.__xonsh_env__['XONSH_DATA_DIR']
    cachefname = os.path.join(datadir, 'bash_completion_cache')
    if not INITED:
        if os.path.isfile(cachefname):
            # load from cache
            with open(cachefname, 'rb') as cache:
                CACHED_HASH, CACHED_FUNCS, CACHED_FILES = pickle.load(cache)
                BASH_COMPLETE_HASH = CACHED_HASH
                BASH_COMPLETE_FUNCS = CACHED_FUNCS
                BASH_COMPLETE_FILES = CACHED_FILES
        else:
            # create initial cache
            _load_bash_complete_funcs()
            _load_bash_complete_files()
            CACHED_HASH = BASH_COMPLETE_HASH
            CACHED_FUNCS = BASH_COMPLETE_FUNCS
            CACHED_FILES = BASH_COMPLETE_FILES
            with open(cachefname, 'wb') as cache:
                val = (CACHED_HASH, CACHED_FUNCS, CACHED_FILES)
                pickle.dump(val, cache)
        INITED = True
    # the cache is stale if the cache file is missing, $BASH_COMPLETIONS
    # changed, or a completion script was modified after the cache was written
    invalid = ((not os.path.isfile(cachefname)) or
               BASH_COMPLETE_HASH != CACHED_HASH or
               _completions_time() > os.stat(cachefname).st_mtime)
    if invalid:
        # update the cache
        _load_bash_complete_funcs()
        _load_bash_complete_files()
        CACHED_HASH = BASH_COMPLETE_HASH
        CACHED_FUNCS = BASH_COMPLETE_FUNCS
        CACHED_FILES = BASH_COMPLETE_FILES
        with open(cachefname, 'wb') as cache:
            val = (CACHED_HASH, BASH_COMPLETE_FUNCS, BASH_COMPLETE_FILES)
            pickle.dump(val, cache)
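
# The cache is a pickled (hash, funcs, files) triple stored at
# $XONSH_DATA_DIR/bash_completion_cache -- typically somewhere like
# ~/.local/share/xonsh/ on Linux, though the exact location depends on how
# $XONSH_DATA_DIR is configured.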


def complete_from_bash(prefix, line, begidx, endidx, ctx):
    """Completes based on results from BASH completion."""
    update_bash_completion()
    splt = line.split()
    cmd = splt[0]
    func = BASH_COMPLETE_FUNCS.get(cmd, None)
    fnme = BASH_COMPLETE_FILES.get(cmd, None)
    if func is None or fnme is None:
        return set()
    # find which token is being completed and remember the word before it
    idx = n = 0
    prev = ''  # make sure prev is bound even if the first token matches
    for n, tok in enumerate(splt):
        if tok == prefix:
            idx = line.find(prefix, idx)
            if idx >= begidx:
                break
        prev = tok
    if len(prefix) == 0:
        prefix = '""'
        n += 1
    else:
        prefix = shlex.quote(prefix)
    script = BASH_COMPLETE_SCRIPT.format(
        filename=fnme, line=' '.join(shlex.quote(p) for p in splt),
        comp_line=shlex.quote(line), n=n, func=func, cmd=cmd,
        end=endidx + 1, prefix=prefix, prev=shlex.quote(prev))
    try:
        out = subprocess.check_output(
            [xp.bash_command()], input=script, universal_newlines=True,
            stderr=subprocess.PIPE, env=builtins.__xonsh_env__.detype())
    except (subprocess.CalledProcessError, FileNotFoundError):
        out = ''
    return set(out.splitlines())
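
# Illustrative call (not a canonical invocation -- the completer is normally
# registered with xonsh's completer machinery rather than called directly):
#
#     complete_from_bash('ch', 'git ch', 4, 6, ctx={})
#
# might return a set such as {'checkout', 'cherry', 'cherry-pick'} if git's
# bash completion script is listed in $BASH_COMPLETIONS.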


def _load_bash_complete_funcs():
    """Loads the mapping of command name -> bash completion function by
    sourcing the completion scripts and parsing ``complete -p`` output.
    """
    global BASH_COMPLETE_FUNCS
    BASH_COMPLETE_FUNCS = bcf = {}
    inp = _collect_completions_sources()
    if not inp:
        return
    inp.append('complete -p\n')
    out = _source_completions(inp)
    for line in out.splitlines():
        head, _, cmd = line.rpartition(' ')
        if len(cmd) == 0 or cmd == 'cd':
            continue
        m = RE_DASHF.search(head)
        if m is None:
            continue
        bcf[cmd] = m.group(1)
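
# ``complete -p`` prints one registration per line, for example (output will
# vary with the sourced scripts):
#
#     complete -o bashdefault -o default -o nospace -F _git git
#
# RE_DASHF captures the function name after ``-F`` (here ``_git``) and the
# final word is the command, yielding BASH_COMPLETE_FUNCS == {'git': '_git'}.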


def _load_bash_complete_files():
    """Loads the mapping of command name -> file defining its completion
    function, using ``declare -F`` with ``extdebug`` enabled.
    """
    global BASH_COMPLETE_FILES
    inp = _collect_completions_sources()
    if not inp:
        BASH_COMPLETE_FILES = {}
        return
    if BASH_COMPLETE_FUNCS:
        inp.append('shopt -s extdebug')
        bash_funcs = set(BASH_COMPLETE_FUNCS.values())
        inp.append('declare -F ' + ' '.join(bash_funcs))
        inp.append('shopt -u extdebug\n')
    out = _source_completions(inp)
    func_files = {}
    for line in out.splitlines():
        parts = line.split()
        if xp.ON_WINDOWS:
            # the reported file path may contain spaces, so drop the line
            # number and rejoin the remainder into a single path
            parts = [parts[0], ' '.join(parts[2:])]
        func_files[parts[0]] = parts[-1]
    BASH_COMPLETE_FILES = {
        cmd: func_files[func]
        for cmd, func in BASH_COMPLETE_FUNCS.items()
        if func in func_files
    }
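
# With ``extdebug`` on, ``declare -F <name>`` reports the function, the line
# it was defined on, and the defining file, e.g. (paths will differ):
#
#     _git 23 /usr/share/bash-completion/completions/git
#
# so func_files maps '_git' to that file path.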


def _source_completions(source):
    """Runs the given lines through bash and returns their combined stdout,
    or an empty string if bash cannot be found.
    """
    try:
        return subprocess.check_output(
            [xp.bash_command()], input='\n'.join(source),
            universal_newlines=True, env=builtins.__xonsh_env__.detype(),
            stderr=subprocess.DEVNULL)
    except FileNotFoundError:
        return ''


def _collect_completions_sources():
    """Builds the list of ``source`` commands for every completion script
    named in $BASH_COMPLETIONS; directories are expanded to their files.
    """
    sources = []
    completers = builtins.__xonsh_env__.get('BASH_COMPLETIONS', ())
    paths = (pathlib.Path(x) for x in completers)
    for path in paths:
        if path.is_file():
            sources.append('source "{}"'.format(path.as_posix()))
        elif path.is_dir():
            for _file in (x for x in path.glob('*') if x.is_file()):
                sources.append('source "{}"'.format(_file.as_posix()))
    return sources
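
# For example, $BASH_COMPLETIONS set to
# ['/usr/share/bash-completion/bash_completion'] (a common default on Linux)
# would produce ['source "/usr/share/bash-completion/bash_completion"'].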


def _completions_time():
    """Returns the most recent modification time among the $BASH_COMPLETIONS
    entries, or 0 if none of them exist.
    """
    compfiles = builtins.__xonsh_env__.get('BASH_COMPLETIONS', ())
    compfiles = [os.stat(x).st_mtime for x in compfiles if os.path.exists(x)]
    return max(compfiles) if compfiles else 0