Source code for certbot.hooks

"""Facilities for implementing hooks that call shell commands."""
from __future__ import print_function

import logging
import os

from subprocess import Popen, PIPE

from certbot import errors
from certbot import util

from certbot.plugins import util as plug_util

logger = logging.getLogger(__name__)


[docs]def validate_hooks(config): """Check hook commands are executable.""" validate_hook(config.pre_hook, "pre") validate_hook(config.post_hook, "post") validate_hook(config.deploy_hook, "deploy") validate_hook(config.renew_hook, "renew")
[docs]def _prog(shell_cmd): """Extract the program run by a shell command. :param str shell_cmd: command to be executed :returns: basename of command or None if the command isn't found :rtype: str or None """ if not util.exe_exists(shell_cmd): plug_util.path_surgery(shell_cmd) if not util.exe_exists(shell_cmd): return None return os.path.basename(shell_cmd)
[docs]def validate_hook(shell_cmd, hook_name): """Check that a command provided as a hook is plausibly executable. :raises .errors.HookCommandNotFound: if the command is not found """ if shell_cmd: cmd = shell_cmd.split(None, 1)[0] if not _prog(cmd): path = os.environ["PATH"] if os.path.exists(cmd): msg = "{1}-hook command {0} exists, but is not executable.".format(cmd, hook_name) else: msg = "Unable to find {2}-hook command {0} in the PATH.\n(PATH is {1})".format( cmd, path, hook_name) raise errors.HookCommandNotFound(msg)
[docs]def pre_hook(config): "Run pre-hook if it's defined and hasn't been run." cmd = config.pre_hook if cmd and cmd not in pre_hook.already: logger.info("Running pre-hook command: %s", cmd) _run_hook(cmd) pre_hook.already.add(cmd) elif cmd: logger.info("Pre-hook command already run, skipping: %s", cmd)
pre_hook.already = set() # type: ignore
[docs]def post_hook(config): """Run post hook if defined. If the verb is renew, we might have more certs to renew, so we wait until run_saved_post_hooks() is called. """ cmd = config.post_hook # In the "renew" case, we save these up to run at the end if config.verb == "renew": if cmd and cmd not in post_hook.eventually: post_hook.eventually.append(cmd) # certonly / run elif cmd: logger.info("Running post-hook command: %s", cmd) _run_hook(cmd)
post_hook.eventually = [] # type: ignore
[docs]def run_saved_post_hooks(): """Run any post hooks that were saved up in the course of the 'renew' verb""" for cmd in post_hook.eventually: logger.info("Running post-hook command: %s", cmd) _run_hook(cmd)
[docs]def deploy_hook(config, domains, lineage_path): """Run post-issuance hook if defined. :param configuration.NamespaceConfig config: Certbot settings :param domains: domains in the obtained certificate :type domains: `list` of `str` :param str lineage_path: live directory path for the new cert """ if config.deploy_hook: renew_hook(config, domains, lineage_path)
[docs]def renew_hook(config, domains, lineage_path): """Run post-renewal hook if defined.""" if config.renew_hook: if not config.dry_run: os.environ["RENEWED_DOMAINS"] = " ".join(domains) os.environ["RENEWED_LINEAGE"] = lineage_path logger.info("Running deploy-hook command: %s", config.renew_hook) _run_hook(config.renew_hook) else: logger.warning( "Dry run: skipping deploy hook command: %s", config.renew_hook)
[docs]def _run_hook(shell_cmd): """Run a hook command. :returns: stderr if there was any""" err, _ = execute(shell_cmd) return err
[docs]def execute(shell_cmd): """Run a command. :returns: `tuple` (`str` stderr, `str` stdout)""" # universal_newlines causes Popen.communicate() # to return str objects instead of bytes in Python 3 cmd = Popen(shell_cmd, shell=True, stdout=PIPE, stderr=PIPE, universal_newlines=True) out, err = cmd.communicate() base_cmd = os.path.basename(shell_cmd.split(None, 1)[0]) if out: logger.info('Output from %s:\n%s', base_cmd, out) if cmd.returncode != 0: logger.error('Hook command "%s" returned error code %d', shell_cmd, cmd.returncode) if err: logger.error('Error output from %s:\n%s', base_cmd, err) return (err, out)