# Copyright (c) 2013-2015 by Ron Frederick <ronf@timeheart.net>.
# All rights reserved.
#
# This program and the accompanying materials are made available under
# the terms of the Eclipse Public License v1.0 which accompanies this
# distribution and is available at:
#
# http://www.eclipse.org/legal/epl-v10.html
#
# Contributors:
# Ron Frederick - initial implementation, API, and documentation
"""Miscellaneous utility classes and functions"""
import ipaddress
import socket
from random import SystemRandom
from .constants import DEFAULT_LANG
# Define a version of randrange which is based on SystemRandom(), so that
# we get back numbers suitable for cryptographic use.
_random = SystemRandom()
randrange = _random.randrange
def all_ints(seq):
"""Return if a sequence contains all integers"""
return all(isinstance(i, int) for i in seq)
def mod_inverse(x, m):
"""Compute the modular inverse (x^-1) modulo m"""
# pylint: disable=invalid-name
a, b, c, d = m, x % m, 0, 1
while b:
q, r = divmod(a, b)
a, b, c, d = b, r, d, c - q*d
if a == 1:
return c if c >= 0 else c + m
else:
raise ValueError('%d has no inverse mod %d' % (x, m))
def _normalize_scoped_ip(addr):
"""Normalize scoped IP address
The ipaddress module doesn't handle scoped addresses properly,
so we strip off the CIDR suffix here and normalize scoped IP
addresses using socket.inet_pton before we pass them into
ipaddress.
"""
for family in (socket.AF_INET, socket.AF_INET6):
try:
return socket.inet_ntop(family, socket.inet_pton(family, addr))
except (ValueError, socket.error):
pass
return addr
def ip_address(addr):
"""Wrapper for ipaddress.ip_address which supports scoped addresses"""
return ipaddress.ip_address(_normalize_scoped_ip(addr))
def ip_network(addr):
"""Wrapper for ipaddress.ip_network which supports scoped addresses"""
idx = addr.find('/')
if idx >= 0:
addr, mask = addr[:idx], addr[idx:]
else:
mask = ''
return ipaddress.ip_network(_normalize_scoped_ip(addr) + mask)
class Error(Exception):
"""General SSH error"""
def __init__(self, errtype, code, reason, lang):
super().__init__('%s Error: %s' % (errtype, reason))
self.code = code
self.reason = reason
self.lang = lang
[docs]class DisconnectError(Error):
"""SSH disconnect error
This exception is raised when a serious error occurs which causes
the SSH connection to be disconnected. Exception codes should be
taken from :ref:`disconnect reason codes <DisconnectReasons>`.
:param integer code:
Disconnect reason, taken from :ref:`disconnect reason
codes <DisconnectReasons>`
:param string reason:
A human-readable reason for the disconnect
:param string lang:
The language the reason is in
"""
def __init__(self, code, reason, lang=DEFAULT_LANG):
super().__init__('Disconnect', code, reason, lang)
[docs]class ChannelOpenError(Error):
"""SSH channel open error
This exception is raised by connection handlers to report
channel open failures.
:param integer code:
Channel open failure reason, taken from :ref:`channel open
failure reason codes <ChannelOpenFailureReasons>`
:param string reason:
A human-readable reason for the channel open failure
:param string lang:
The language the reason is in
"""
def __init__(self, code, reason, lang=DEFAULT_LANG):
super().__init__('Channel Open', code, reason, lang)
[docs]class BreakReceived(Exception):
"""SSH break request received
This exception is raised on an SSH server stdin stream when the
client sends a break on the channel.
:param integer msec:
The duration of the break in milliseconds
"""
def __init__(self, msec):
super().__init__('Break for %s msec' % msec)
self.msec = msec
[docs]class SignalReceived(Exception):
"""SSH signal request received
This exception is raised on an SSH server stdin stream when the
client sends a signal on the channel.
:param string signal:
The name of the signal sent by the client
"""
def __init__(self, signal):
super().__init__('Signal: %s' % signal)
self.signal = signal
[docs]class TerminalSizeChanged(Exception):
"""SSH terminal size change notification received
This exception is raised on an SSH server stdin stream when the
client sends a terminal size change on the channel.
:param integer width:
The new terminal width
:param integer height:
The new terminal height
:param integer pixwidth:
The new terminal width in pixels
:param integer pixheight:
The new terminal height in pixels
"""
def __init__(self, width, height, pixwidth, pixheight):
super().__init__('Terminal size change: (%s, %s, %s, %s)' %
(width, height, pixwidth, pixheight))
self.width = width
self.height = height
self.pixwidth = pixwidth
self.pixheight = pixheight