Source code for asyncssh.agent
# Copyright (c) 2016 by Ron Frederick <ronf@timeheart.net>.
# All rights reserved.
#
# This program and the accompanying materials are made available under
# the terms of the Eclipse Public License v1.0 which accompanies this
# distribution and is available at:
#
# http://www.eclipse.org/legal/epl-v10.html
#
# Contributors:
# Ron Frederick - initial implementation, API, and documentation
"""SSH agent client"""
import asyncio
import os
from .misc import ChannelOpenError
from .packet import Byte, String, UInt32, PacketDecodeError, SSHPacket
from .public_key import SSHKeyPair
# pylint: disable=bad-whitespace
# Generic agent replies
SSH_AGENT_FAILURE = 5
# Protocol 2 key operations
SSH2_AGENTC_REQUEST_IDENTITIES = 11
SSH2_AGENT_IDENTITIES_ANSWER = 12
SSH2_AGENTC_SIGN_REQUEST = 13
SSH2_AGENT_SIGN_RESPONSE = 14
# pylint: enable=bad-whitespace
class _SSHAgentKeyPair(SSHKeyPair):
"""Surrogate for a key managed by the SSH agent"""
def __init__(self, agent, public_data, comment):
self._agent = agent
packet = SSHPacket(public_data)
self.algorithm = packet.get_string()
self.public_data = public_data
self.comment = comment
@asyncio.coroutine
def sign(self, data):
"""Sign a block of data with this private key"""
return (yield from self._agent.sign(self.public_data, data))
[docs]class SSHAgentClient:
"""SSH agent client"""
def __init__(self, loop, agent_path):
self._loop = loop
self._agent_path = agent_path
self._reader = None
self._writer = None
self._lock = asyncio.Lock()
def _cleanup(self):
"""Clean up this SSH agent client"""
if self._writer:
self._writer.close()
self._reader = None
self._writer = None
@asyncio.coroutine
def connect(self):
"""Connect to the SSH agent"""
if isinstance(self._agent_path, str):
# pylint doesn't think open_unix_connection exists
# pylint: disable=no-member
self._reader, self._writer = \
yield from asyncio.open_unix_connection(self._agent_path,
loop=self._loop)
else:
self._reader, self._writer = \
yield from self._agent_path.open_agent_connection()
@asyncio.coroutine
def _make_request(self, msgtype, *args):
"""Send an SSH agent request"""
with (yield from self._lock):
try:
if not self._writer:
yield from self.connect()
payload = Byte(msgtype) + b''.join(args)
self._writer.write(UInt32(len(payload)) + payload)
resplen = yield from self._reader.readexactly(4)
resplen = int.from_bytes(resplen, 'big')
resp = yield from self._reader.readexactly(resplen)
resp = SSHPacket(resp)
resptype = resp.get_byte()
return resptype, resp
except (OSError, EOFError, PacketDecodeError) as exc:
self._cleanup()
raise ValueError(str(exc)) from None
@asyncio.coroutine
[docs] def get_keys(self):
"""Request the available client keys
This method is a coroutine which returns a list of client keys
available in the ssh-agent.
:returns: A list of :class:`SSHKeyPair` objects
"""
resptype, resp = \
yield from self._make_request(SSH2_AGENTC_REQUEST_IDENTITIES)
if resptype == SSH2_AGENT_IDENTITIES_ANSWER:
result = []
num_keys = resp.get_uint32()
for _ in range(num_keys):
key_blob = resp.get_string()
comment = resp.get_string()
result.append(_SSHAgentKeyPair(self, key_blob, comment))
resp.check_end()
return result
else:
raise ValueError('Unknown SSH agent response: %d' % resptype)
@asyncio.coroutine
def sign(self, key_blob, data):
"""Sign a block of data with this private key"""
resptype, resp = \
yield from self._make_request(SSH2_AGENTC_SIGN_REQUEST,
String(key_blob), String(data),
UInt32(0))
if resptype == SSH2_AGENT_SIGN_RESPONSE:
sig = resp.get_string()
resp.check_end()
return sig
elif resptype == SSH_AGENT_FAILURE:
raise ValueError('Unknown key passed to SSH agent')
else:
raise ValueError('Unknown SSH agent response: %d' % resptype)
[docs] def close(self):
"""Close the SSH agent connection
This method closes the connection to the ssh-agent. Any
attempts to use this :class:``SSHAgentClient`` or the key
pairs it previously returned will result in an error.
"""
self._cleanup()
@asyncio.coroutine
[docs]def connect_agent(agent_path=None, *, loop=None):
"""Make a connection to the SSH agent
This function attempts to connect to an ssh-agent process
listening on a UNIX domain socket at ``agent_path``. If not
provided, it will attempt to get the path from the ``SSH_AUTH_SOCK``
environment variable.
If the connection is successful, an ``SSHAgentClient`` object
is returned that has methods on it you can use to query the
ssh-agent. If no path is specified and the environment variable
is not set or the connection to the agent fails, this function
returns ``None``.
:param agent_path: (optional)
The path to use to contact the ssh-agent process, or the
:class:`SSHServerConnection` to forward the agent request
over.
:param loop: (optional)
The event loop to use when creating the connection. If not
specified, the default event loop is used.
:type agent_path: str or :class:`SSHServerConnection`
:returns: An :class:`SSHAgentClient` or ``None``
"""
if not loop:
loop = asyncio.get_event_loop()
if not agent_path:
agent_path = os.environ.get('SSH_AUTH_SOCK', None)
if not agent_path:
return None
agent = SSHAgentClient(loop, agent_path)
try:
yield from agent.connect()
return agent
except (OSError, ChannelOpenError):
return None