"""Utilities for all Certbot."""
import argparse
import collections
# distutils.version under virtualenv confuses pylint
# For more info, see: https://github.com/PyCQA/pylint/issues/73
import distutils.version # pylint: disable=import-error,no-name-in-module
import errno
import logging
import os
import platform
import re
import six
import socket
import stat
import subprocess
import sys
import configargparse
from certbot import errors
# Module-level logger used by the helpers in this file.
logger = logging.getLogger(__name__)


# Named record with the fields ``file`` and ``pem``.
Key = collections.namedtuple("Key", ["file", "pem"])
# Named record with the fields ``file``, ``data`` and ``form``;
# ``form`` names the type of ``data``: "pem" or "der".
CSR = collections.namedtuple("CSR", ["file", "data", "form"])


# ANSI SGR (Select Graphic Rendition) escape sequences.
ANSI_SGR_BOLD = "\033[1m"    # bold / increased intensity
ANSI_SGR_RED = "\033[31m"    # red foreground
ANSI_SGR_RESET = "\033[0m"   # back to the default output format
def run_script(params):
    """Run the script with the given params.

    :param list params: List of parameters to pass to Popen

    :returns: ``(stdout, stderr)`` as captured from the finished process
    :rtype: tuple

    :raises errors.SubprocessError: if the command cannot be started or
        if it exits with a non-zero return code

    """
    try:
        proc = subprocess.Popen(params,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
    except (OSError, ValueError):
        # Popen itself failed, so there is no process output to report.
        msg = "Unable to run the command: %s" % " ".join(params)
        logger.error(msg)
        raise errors.SubprocessError(msg)

    stdout, stderr = proc.communicate()

    if proc.returncode != 0:
        # Include everything the process printed so the failure can be
        # diagnosed from the log and from the raised error.
        msg = "Error while running %s.\n%s\n%s" % (
            " ".join(params), stdout, stderr)
        logger.error(msg)
        raise errors.SubprocessError(msg)

    return stdout, stderr
def exe_exists(exe):
    """Determine whether path/name refers to an executable.

    A value containing a directory component is checked as given; a bare
    name is looked up in every directory of the ``PATH`` environment
    variable (``os.defpath`` is used when ``PATH`` is not set).

    :param str exe: Executable path or name

    :returns: If exe is a valid executable
    :rtype: bool

    """
    def is_exe(path):
        """Determine if path is an exe."""
        return os.path.isfile(path) and os.access(path, os.X_OK)

    directory, _ = os.path.split(exe)
    if directory:
        return is_exe(exe)

    # A missing PATH must not blow up with KeyError; fall back to the
    # platform's default search path instead.
    search_path = os.environ.get("PATH", os.defpath)
    return any(is_exe(os.path.join(entry, exe))
               for entry in search_path.split(os.pathsep))
def make_or_verify_dir(directory, mode=0o755, uid=0, strict=False):
    """Make sure directory exists with proper permissions.

    :param str directory: Path to a directory.
    :param int mode: Directory mode.
    :param int uid: Directory owner.
    :param bool strict: If true, an already existing directory is
        checked against `mode` and `uid`; otherwise an existing
        directory is accepted as is.

    :raises .errors.Error: if a directory already exists,
        but has wrong permissions or owner (only when `strict` is set)

    :raises OSError: if invalid or inaccessible file names and
        paths, or other arguments that have the correct type,
        but are not accepted by the operating system.

    """
    try:
        os.makedirs(directory, mode)
    except OSError as exception:
        # Anything other than "already exists" is a real failure.
        if exception.errno != errno.EEXIST:
            raise
        if strict and not check_permissions(directory, mode, uid):
            raise errors.Error(
                "%s exists, but it should be owned by user %d with "
                "permissions %s" % (directory, uid, oct(mode)))
def check_permissions(filepath, mode, uid=0):
    """Check file or directory permissions.

    :param str filepath: Path to the tested file (or directory).
    :param int mode: Expected file mode.
    :param int uid: Expected file owner.

    :returns: True if `mode` and `uid` match, False otherwise.
    :rtype: bool

    """
    file_stat = os.stat(filepath)
    # S_IMODE keeps only the permission bits, dropping the file-type bits.
    return stat.S_IMODE(file_stat.st_mode) == mode and file_stat.st_uid == uid
def safe_open(path, mode="w", chmod=None, buffering=None):
    """Safely open a file.

    The file is created with ``O_CREAT | O_EXCL``, so the call fails
    instead of opening a file that already exists.

    :param str path: Path to a file.
    :param str mode: Same as `mode` for `open`.
    :param int chmod: Same as `mode` for `os.open`, uses Python defaults
        if ``None``.
    :param int buffering: Same as `bufsize` for `os.fdopen`, uses Python
        defaults if ``None``.

    :returns: file object wrapping the newly created file

    :raises OSError: if the file cannot be created, including when it
        already exists (``errno.EEXIST``)

    """
    # pylint: disable=star-args
    # Only forward the optional arguments that were actually supplied so
    # that os.open/os.fdopen apply their own defaults otherwise.
    open_args = () if chmod is None else (chmod,)
    fdopen_args = () if buffering is None else (buffering,)
    return os.fdopen(
        os.open(path, os.O_CREAT | os.O_EXCL | os.O_RDWR, *open_args),
        mode, *fdopen_args)
def _unique_file(path, filename_pat, count, mode):
    """Create the first not-yet-existing file named by `filename_pat`.

    :param str path: directory in which the file is created
    :param callable filename_pat: maps a counter value to a file name
    :param int count: first counter value to try
    :param int mode: file mode passed to `safe_open` as `chmod`

    :returns: tuple of file object and absolute file name

    :raises OSError: for any failure other than the name being taken

    """
    attempt = count
    while True:
        candidate = os.path.join(path, filename_pat(attempt))
        try:
            handle = safe_open(candidate, chmod=mode)
        except OSError as err:
            # A name that is already taken just means "try the next one";
            # every other error is passed on to the caller.
            if err.errno != errno.EEXIST:
                raise
        else:
            return handle, os.path.abspath(candidate)
        attempt += 1
def unique_file(path, mode=0o777):
    """Safely finds a unique file.

    The file name is prefixed with a zero-padded four digit counter
    (starting at 0) that is incremented until an unused name is found.

    :param str path: path/filename.ext
    :param int mode: File mode

    :returns: tuple of file object and file name

    """
    directory, tail = os.path.split(path)
    return _unique_file(
        directory, filename_pat=(lambda count: "%04d_%s" % (count, tail)),
        count=0, mode=mode)
def unique_lineage_name(path, filename, mode=0o777):
    """Safely finds a unique file using lineage convention.

    ``<filename>.conf`` is tried first; if that name is taken,
    ``<filename>-NNNN.conf`` is tried with a counter starting at 1.

    :param str path: directory path
    :param str filename: proposed filename
    :param int mode: file mode

    :returns: tuple of file object and file name (which may be modified
        from the requested one by appending digits to ensure uniqueness)

    :raises OSError: if writing files fails for an unanticipated reason,
        such as a full disk or a lack of permission to write to
        specified location.

    """
    preferred_path = os.path.join(path, "%s.conf" % (filename))
    try:
        return safe_open(preferred_path, chmod=mode), preferred_path
    except OSError as err:
        # Only "already exists" falls through to the numbered names.
        if err.errno != errno.EEXIST:
            raise
    return _unique_file(
        path, filename_pat=(lambda count: "%s-%04d.conf" % (filename, count)),
        count=1, mode=mode)
def safely_remove(path):
    """Remove a file that may not exist.

    :param str path: Path of the file to remove.

    :raises OSError: if removal fails for any reason other than the
        file not existing

    """
    try:
        os.remove(path)
    except OSError as err:
        # A missing file is the one failure that is deliberately ignored.
        if err.errno != errno.ENOENT:
            raise
def get_os_info(filepath="/etc/os-release"):
    """
    Get OS name and version

    :param str filepath: File path of os-release file
    :returns: (os_name, os_version)
    :rtype: `tuple` of `str`
    """
    if os.path.isfile(filepath):
        # Systemd os-release parsing might be viable
        os_name, os_version = get_systemd_os_info(filepath=filepath)
        # An empty name means the file had no usable ID entry.
        if os_name:
            return (os_name, os_version)

    # Fallback to platform module
    return get_python_os_info()
def get_os_info_ua(filepath="/etc/os-release"):
    """
    Get OS name and version string for User Agent

    :param str filepath: File path of os-release file
    :returns: os_ua
    :rtype: `str`
    """
    if os.path.isfile(filepath):
        # Prefer PRETTY_NAME and fall back to NAME when it is absent/empty.
        os_ua = _get_systemd_os_release_var("PRETTY_NAME", filepath=filepath)
        if not os_ua:
            os_ua = _get_systemd_os_release_var("NAME", filepath=filepath)
        if os_ua:
            return os_ua

    # Fallback: "<name> <version>" as reported by the platform module
    return " ".join(get_python_os_info())
def get_systemd_os_info(filepath="/etc/os-release"):
    """
    Parse systemd /etc/os-release for distribution information

    The name is taken from the ``ID`` entry and the version from the
    ``VERSION_ID`` entry; a missing entry yields an empty string.

    :param str filepath: File path of os-release file
    :returns: (os_name, os_version)
    :rtype: `tuple` of `str`
    """
    os_name = _get_systemd_os_release_var("ID", filepath=filepath)
    os_version = _get_systemd_os_release_var("VERSION_ID", filepath=filepath)

    return (os_name, os_version)
def _get_systemd_os_release_var(varname, filepath="/etc/os-release"):
    """
    Get single value from systemd /etc/os-release

    :param str varname: Name of variable to fetch
    :param str filepath: File path of os-release file
    :returns: requested value, or an empty string if the file does not
        exist or contains no line starting with ``<varname>=``
    :rtype: `str`
    """
    var_string = varname + "="
    if not os.path.isfile(filepath):
        return ""
    with open(filepath, 'r') as fh:
        # Stream the file and stop at the first matching line instead of
        # loading every line into memory first.
        for line in fh:
            stripped = line.strip()
            if stripped.startswith(var_string):
                # Return the value of var, normalized
                return _normalize_string(stripped[len(var_string):])
    return ""
def _normalize_string(orig):
    """
    Helper function for _get_systemd_os_release_var() to remove quotes
    and whitespaces

    Every single and double quote character is removed (not only the
    enclosing ones), then leading and trailing whitespace is stripped.

    :param str orig: raw value
    :returns: value without quote characters and surrounding whitespace
    :rtype: `str`
    """
    return orig.replace('"', '').replace("'", "").strip()
def get_python_os_info():
    """
    Get Operating System type/distribution and major version
    using python platform module

    :returns: (os_name, os_version)
    :rtype: `tuple` of `str`
    """
    info = platform.system_alias(
        platform.system(),
        platform.release(),
        platform.version()
    )
    os_type, os_ver, _ = info
    os_type = os_type.lower()
    if os_type.startswith('linux'):
        # platform.linux_distribution() no longer exists on Python >= 3.8;
        # when it is unavailable keep the values reported by
        # platform.system()/platform.release() above.
        linux_distribution = getattr(platform, "linux_distribution", None)
        info = linux_distribution() if linux_distribution else ('', '', '')
        # On arch, platform.linux_distribution() is reportedly ('','',''),
        # so handle it defensively
        if info[0]:
            os_type = info[0]
        if info[1]:
            os_ver = info[1]
    elif os_type.startswith('darwin'):
        # universal_newlines makes the output text on Python 3 as well, so
        # stripping the trailing newline works on both major versions.
        os_ver = subprocess.Popen(
            ["sw_vers", "-productVersion"],
            stdout=subprocess.PIPE,
            universal_newlines=True
        ).communicate()[0].rstrip('\n')
    elif os_type.startswith('freebsd'):
        # eg "9.3-RC3-p1"
        os_ver = os_ver.partition("-")[0]
        os_ver = os_ver.partition(".")[0]
    elif platform.win32_ver()[1]:
        os_ver = platform.win32_ver()[1]
    else:
        # Cases known to fall here: Cygwin python
        os_ver = ''
    return os_type, os_ver
# Conservative whitelist of characters for an email address.  The pattern
# alone still accepts a leading period and consecutive periods, so
# safe_email() rejects those separately in addition to the regex.
EMAIL_REGEX = re.compile("[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+$")


def safe_email(email):
    """Scrub email address before using it.

    :param str email: Email address to check.

    :returns: True if `email` matches `EMAIL_REGEX`, does not start with
        a period and contains no two consecutive periods; False otherwise.
    :rtype: bool

    """
    if EMAIL_REGEX.match(email) is not None:
        return not email.startswith(".") and ".." not in email
    else:
        # Logger.warn is a deprecated alias; use Logger.warning.
        logger.warning("Invalid email address: %s.", email)
        return False
def add_deprecated_argument(add_argument, argument_name, nargs):
    """Adds a deprecated argument with the name argument_name.

    Deprecated arguments are not shown in the help. If they are used on
    the command line, a warning is shown stating that the argument is
    deprecated and no other action is taken.

    :param callable add_argument: Function that adds arguments to an
        argument parser/group.
    :param str argument_name: Name of deprecated argument.
    :param nargs: Value for nargs when adding the argument to argparse.

    """
    class ShowWarning(argparse.Action):
        """Action to print a warning to stderr when an argument is used."""
        def __call__(self, unused1, unused2, unused3, option_string=None):
            sys.stderr.write(
                "Use of {0} is deprecated.\n".format(option_string))

    # Register the action with configargparse's set of action types that
    # do not need a value.
    configargparse.ACTION_TYPES_THAT_DONT_NEED_A_VALUE.add(ShowWarning)
    add_argument(argument_name, action=ShowWarning,
                 help=argparse.SUPPRESS, nargs=nargs)
def enforce_domain_sanity(domain):
    """Method which validates domain value and errors out if
    the requirements are not met.

    :param domain: Domain to check
    :type domain: `str` or `unicode`
    :raises ConfigurationError: for invalid domains and cases where Let's
        Encrypt currently will not issue certificates

    :returns: The domain cast to `str`, with ASCII-only contents,
        lower-cased and without a trailing dot
    :rtype: str
    """
    # Check if there's a wildcard domain
    if domain.startswith("*."):
        raise errors.ConfigurationError(
            "Wildcard domains are not supported: {0}".format(domain))
    # Punycode
    if "xn--" in domain:
        raise errors.ConfigurationError(
            "Punycode domains are not presently supported: {0}".format(domain))

    # Unicode
    try:
        ascii_domain = domain.encode('ascii').lower()
    except UnicodeError:
        error_fmt = (u"Internationalized domain names "
                     "are not presently supported: {0}")
        if isinstance(domain, six.text_type):
            raise errors.ConfigurationError(error_fmt.format(domain))
        else:
            raise errors.ConfigurationError(str(error_fmt).format(domain))

    # encode() yields bytes on Python 3; convert back to the native `str`
    # type so the string operations below work and the documented return
    # type holds on both Python 2 and Python 3.
    if not isinstance(ascii_domain, str):
        ascii_domain = ascii_domain.decode('ascii')
    domain = ascii_domain

    # Remove trailing dot
    domain = domain[:-1] if domain.endswith('.') else domain

    # Explain separately that IP addresses aren't allowed (apart from not
    # being FQDNs) because hope springs eternal concerning this point
    try:
        socket.inet_aton(domain)
    except socket.error:
        # It wasn't an IP address, so that's good
        pass
    else:
        raise errors.ConfigurationError(
            "Requested name {0} is an IP address. The Let's Encrypt "
            "certificate authority will not issue certificates for a "
            "bare IP address.".format(domain))

    # FQDN checks from
    # http://www.mkyong.com/regular-expressions/domain-name-regular-expression-example/
    #  Characters used, domain parts < 63 chars, tld > 1 < 64 chars
    #  first and last char is not "-"
    fqdn = re.compile("^((?!-)[A-Za-z0-9-]{1,63}(?<!-)\\.)+[A-Za-z]{2,63}$")
    if not fqdn.match(domain):
        raise errors.ConfigurationError("Requested domain {0} is not a FQDN"
                                        .format(domain))
    return domain
def get_strict_version(normalized):
    """Converts a normalized version to a strict version.

    Every ``.dev`` in the input is replaced by ``a`` before parsing.

    :param str normalized: normalized version string

    :returns: An equivalent strict version
    :rtype: distutils.version.StrictVersion

    """
    # strict version ending with "a" and a number designates a pre-release
    # pylint: disable=no-member
    return distutils.version.StrictVersion(normalized.replace(".dev", "a"))