Source code for asyncssh.auth_keys

# Copyright (c) 2015 by Ron Frederick <ronf@timeheart.net>.
# All rights reserved.
#
# This program and the accompanying materials are made available under
# the terms of the Eclipse Public License v1.0 which accompanies this
# distribution and is available at:
#
#     http://www.eclipse.org/legal/epl-v10.html
#
# Contributors:
#     Ron Frederick - initial implementation, API, and documentation

"""Parser for SSH known_hosts files"""

import socket

from .misc import ip_address
from .pattern import HostPatternList, WildcardPatternList
from .public_key import import_public_key, KeyImportError


class _SSHAuthorizedKeyEntry:
    """An entry in an SSH authorized_keys list"""

    def __init__(self, line):
        self.options = {}

        try:
            self.key = import_public_key(line)
            return
        except KeyImportError:
            pass

        line = self._parse_options(line)
        self.key = import_public_key(line)

    def _set_string(self, option, value):
        """Set an option with a string value"""

        self.options[option] = value

    def _add_environment(self, option, value):
        """Add an environment key/value pair"""

        if value.startswith('=') or '=' not in value:
            raise ValueError('Invalid environment entry in authorized_keys')

        name, value = value.split('=', 1)
        self.options.setdefault(option, {})[name] = value

    def _add_from(self, option, value):
        """Add a from host pattern"""

        self.options.setdefault(option, []).append(HostPatternList(value))

    def _add_permitopen(self, option, value):
        """Add a permitopen host/port pair"""

        try:
            host, port = value.rsplit(':', 1)

            if host.startswith('[') and host.endswith(']'):
                host = host[1:-1]

            port = None if port == '*' else int(port)
        except:
            raise ValueError('Illegal permitopen value: %s' % value) from None

        self.options.setdefault(option, set()).add((host, port))

    def _add_principals(self, option, value):
        """Add a principals wildcard pattern list"""

        self.options.setdefault(option, []).append(WildcardPatternList(value))

    _handlers = {
        'command':     _set_string,
        'environment': _add_environment,
        'from':        _add_from,
        'permitopen':  _add_permitopen,
        'principals':  _add_principals
    }

    def _add_option(self):
        """Add an option value"""

        if self._option.startswith('='):
            raise ValueError('Missing option name in authorized_keys')

        if '=' in self._option:
            option, value = self._option.split('=', 1)

            handler = self._handlers.get(option)
            if handler:
                handler(self, option, value)
            else:
                self.options.setdefault(option, []).append(value)
        else:
            self.options[self._option] = True

    def _parse_options(self, line):
        """Parse options in this entry"""

        self._option = ''

        idx = 0
        quoted = False
        escaped = False

        for idx, ch in enumerate(line):
            if escaped:
                self._option += ch
                escaped = False
            elif ch == '\\':
                escaped = True
            elif ch == '"':
                quoted = not quoted
            elif quoted:
                self._option += ch
            elif ch in ' \t':
                break
            elif ch == ',':
                self._add_option()
                self._option = ''
            else:
                self._option += ch

        self._add_option()

        if quoted:
            raise ValueError('Unbalanced quote in authorized_keys')
        elif escaped:
            raise ValueError('Unbalanced backslash in authorized_keys')

        return line[idx:].strip()


[docs]class SSHAuthorizedKeys: """An SSH authorized keys list""" def __init__(self, data): self._user_entries = [] self._ca_entries = [] for line in data.splitlines(): line = line.strip() if not line or line.startswith('#'): continue try: entry = _SSHAuthorizedKeyEntry(line) except KeyImportError: continue if 'cert-authority' in entry.options: self._ca_entries.append(entry) else: self._user_entries.append(entry) def validate(self, key, client_addr, cert_principals=None, ca=False): """Return whether a public key or CA is valid for authentication""" for entry in self._ca_entries if ca else self._user_entries: if entry.key != key: continue from_patterns = entry.options.get('from') if from_patterns is not None: client_host, _ = socket.getnameinfo((client_addr, 0), socket.NI_NUMERICSERV) client_ip = ip_address(client_addr) if not all(pattern.matches(client_host, client_addr, client_ip) for pattern in from_patterns): continue principal_patterns = entry.options.get('principals') if cert_principals is not None and principal_patterns is not None: if not all(any(pattern.matches(principal) for principal in cert_principals) for pattern in principal_patterns): continue return entry.options return None
[docs]def import_authorized_keys(data): """Import SSH authorized keys This function imports public keys and associated options in OpenSSH authorized keys format. :param string data: The key data to import. :returns: An :class:`SSHAuthorizedKeys` object """ return SSHAuthorizedKeys(data)
[docs]def read_authorized_keys(filename): """Read SSH authorized keys from a file This function reads public keys and associated options in OpenSSH authorized_keys format from a file. :param string filename: The file to read the keys from. :returns: An :class:`SSHAuthorizedKeys` object """ with open(filename, 'r') as f: return import_authorized_keys(f.read())