Package x2go :: Module rforward
[frames] | no frames]

Source Code for Module x2go.rforward

  1  #!/usr/bin/env python 
  2   
  3  # Copyright (C) 2010-2013 by Mike Gabriel <mike.gabriel@das-netzwerkteam.de> 
  4  # 
  5  # Python X2Go is free software; you can redistribute it and/or modify 
  6  # it under the terms of the GNU Affero General Public License as published by 
  7  # the Free Software Foundation; either version 3 of the License, or 
  8  # (at your option) any later version. 
  9  # 
 10  # Python X2Go is distributed in the hope that it will be useful, 
 11  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
 12  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 13  # GNU Affero General Public License for more details. 
 14  # 
 15  # You should have received a copy of the GNU Affero General Public License 
 16  # along with this program; if not, write to the 
 17  # Free Software Foundation, Inc., 
 18  # 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. 
 19   
 20  """\ 
 21  X2Go reverse SSH/Paramiko tunneling provides X2Go sound, X2Go printing and 
 22  X2Go sshfs for folder sharing and mounting remote devices in X2Go terminal 
 23  server sessions. 
 24   
 25  """ 
 26  __NAME__ = 'x2gorevtunnel-pylib' 
 27   
 28  # modules 
 29  import copy 
 30  import threading 
 31  import gevent 
 32  import paramiko 
 33   
 34  # gevent/greenlet 
 35  from gevent import select, socket, Timeout 
 36   
 37  # Python X2Go modules 
 38  import log 
 39   
 40   
def x2go_transport_tcp_handler(chan, origin, server):
    """\
    An X2Go customized TCP handler for the Paramiko/SSH C{Transport()} class.

    Incoming channels will be put into Paramiko's default accept queue. This corresponds to
    the default behaviour of Paramiko's C{Transport} class.

    However, additionally this handler function checks the server port of the incoming channel
    and detects if there are Paramiko/SSH reverse forwarding tunnels waiting for the incoming
    channels. The Paramiko/SSH reverse forwarding tunnels are initiated by an L{X2GoSession} instance
    (currently supported: reverse tunneling of audio data, reverse tunneling of SSH requests).

    If the server port of an incoming Paramiko/SSH channel matches the configured port of an L{X2GoRevFwTunnel}
    instance, this instance gets notified of the incoming channel and a new L{X2GoRevFwChannelThread} is
    started. This L{X2GoRevFwChannelThread} then takes care of the new channel's incoming data stream.

    The transport's C{reverse_tunnels} attribute is read as a dictionary of the form
    C{{session_name: {'snd': (port, tunnel), 'sshfs': (port, tunnel)}}}. Tunnel entries that are
    C{None} or missing are skipped.

    @param chan: the incoming Paramiko/SSH channel
    @type chan: C{paramiko.Channel} instance
    @param origin: C{(origin_addr, origin_port)} of the incoming connection (not evaluated here)
    @type origin: C{tuple}
    @param server: C{(server_addr, server_port)} the incoming connection arrived on
    @type server: C{tuple}

    """
    transport = chan.get_transport()
    transport._queue_incoming_channel(chan)

    server_port = int(server[1])

    # iterate over a snapshot, sessions may get registered while we are looping
    for tunnels in list(transport.reverse_tunnels.values()):

        # at most one tunnel per session gets notified, sound takes precedence over sshfs
        for tunnel_type in ('snd', 'sshfs'):
            tunnel = tunnels.get(tunnel_type)
            if tunnel is not None and int(tunnel[0]) == server_port:
                tunnel[1].notify()
                break
71 72
class X2GoRevFwTunnel(threading.Thread):
    """\
    L{X2GoRevFwTunnel} class objects are used to reversely tunnel
    X2Go audio, X2Go printing and X2Go folder sharing / device mounting
    through Paramiko/SSH.

    """
    def __init__(self, server_port, remote_host, remote_port, ssh_transport, session_instance=None, logger=None, loglevel=log.loglevel_DEFAULT):
        """\
        Setup a reverse tunnel through Paramiko/SSH.

        After the reverse tunnel has been set up with L{X2GoRevFwTunnel.start()} it waits
        for notification from L{X2GoRevFwTunnel.notify()} to accept incoming channels. This
        notification (L{X2GoRevFwTunnel.notify()} gets called from within the transport's
        TCP handler function L{x2go_transport_tcp_handler} of the L{X2GoSession} instance.

        @param server_port: the TCP/IP port on the X2Go server (starting point of the tunnel),
            normally some number above 30000
        @type server_port: int
        @param remote_host: the target address for reversely tunneled traffic. With X2Go this should
            always be set to the localhost (IPv4) address.
        @type remote_host: str
        @param remote_port: the TCP/IP port on the X2Go client (end point of the tunnel),
            normally an application's standard port (22 for SSH, 4713 for pulse audio, etc.)
        @type remote_port: int
        @param ssh_transport: the L{X2GoSession}'s Paramiko/SSH transport instance
        @type ssh_transport: C{paramiko.Transport} instance
        @param session_instance: optional session object; if given, its
            C{HOOK_rforward_request_denied()} method gets called when the server denies
            the port forwarding request
        @type session_instance: presumably an L{X2GoSession} instance
        @param logger: you can pass an L{X2GoLogger} object to the
            L{X2GoRevFwTunnel} constructor
        @type logger: L{X2GoLogger} instance
        @param loglevel: if no L{X2GoLogger} object has been supplied a new one will be
            constructed with the given loglevel
        @type loglevel: int

        """
        if logger is None:
            self.logger = log.X2GoLogger(loglevel=loglevel)
        else:
            self.logger = copy.deepcopy(logger)
        self.logger.tag = __NAME__

        self.server_port = server_port
        self.remote_host = remote_host
        self.remote_port = remote_port
        self.ssh_transport = ssh_transport
        self.session_instance = session_instance

        self.open_channels = {}
        self.incoming_channel = threading.Condition()

        threading.Thread.__init__(self)
        self.daemon = True
        self._accept_channels = True
        # set here (and not in run()), so that a stop_thread() call issued before
        # run() has started does not get overwritten
        self._keepalive = True

    def __del__(self):
        """\
        Class destructor.

        """
        self.stop_thread()
        self.cancel_port_forward('', self.server_port)

    def cancel_port_forward(self, address, port):
        """\
        Cancel a port forwarding request. This cancellation request is sent to the server and
        on the server the port forwarding should be unregistered.

        The request is best effort: it is given up after 10 seconds and failures are
        only logged, never raised.

        @param address: remote server address
        @type address: C{str}
        @param port: remote port
        @type port: C{int}

        """
        timeout = Timeout(10)
        timeout.start()
        try:
            self.ssh_transport.global_request('cancel-tcpip-forward', (address, port), wait=True)
        except (Exception, Timeout) as e:
            # gevent's Timeout is not derived from Exception, so it is named explicitly
            self.logger('cancelling port forward [%s]:%s failed: %r' % (address, port, e), loglevel=log.loglevel_DEBUG)
        finally:
            timeout.cancel()

    def pause(self):
        """\
        Prevent acceptance of new incoming connections through the Paramiko/SSH
        reverse forwarding tunnel. Also, any active connection on this L{X2GoRevFwTunnel}
        instance will be closed immediately, if this method is called.

        """
        if self._accept_channels:
            self.cancel_port_forward('', self.server_port)
            self._accept_channels = False
            self.logger('paused thread: %s' % repr(self), loglevel=log.loglevel_DEBUG)

    def resume(self):
        """\
        Resume operation of the Paramiko/SSH reverse forwarding tunnel
        and continue accepting new incoming connections.

        """
        if not self._accept_channels:
            self._accept_channels = True
            self._requested_port = self.ssh_transport.request_port_forward('', self.server_port, handler=x2go_transport_tcp_handler)
            self.logger('resumed thread: %s' % repr(self), loglevel=log.loglevel_DEBUG)

    def notify(self):
        """\
        Notify an L{X2GoRevFwTunnel} instance of an incoming Paramiko/SSH channel.

        If an incoming reverse tunnel channel appropriate for this instance has
        been detected, this method gets called from the L{X2GoSession}'s transport
        TCP handler.

        The sent notification will trigger a C{thread.Condition()} waiting for notification
        in L{X2GoRevFwTunnel.run()}.

        """
        with self.incoming_channel:
            self.logger('notifying thread of incoming channel: %s' % repr(self), loglevel=log.loglevel_DEBUG)
            self.incoming_channel.notify()

    def stop_thread(self):
        """\
        Stops this L{X2GoRevFwTunnel} thread completely.

        """
        self.pause()
        self._keepalive = False
        self.logger('stopping thread: %s' % repr(self), loglevel=log.loglevel_DEBUG)
        # wake up run(), so that it notices the cleared keepalive flag
        self.notify()

    def _request_port_forwarding(self):
        """\
        Ask the server to forward C{server_port} to us.

        If the first request fails, any foregoing forwarding on C{server_port} gets
        cancelled and the request is repeated once. If the second request fails as well,
        the session instance's C{HOOK_rforward_request_denied()} is called (or, without a
        session instance, a warning is logged).

        """
        try:
            self._requested_port = self.ssh_transport.request_port_forward('', self.server_port, handler=x2go_transport_tcp_handler)
        except paramiko.SSHException:
            # if port forward request fails, we try to tell the server to cancel all foregoing port forward requests on
            # self.server_port
            self.cancel_port_forward('', self.server_port)
            gevent.sleep(1)
            try:
                self._requested_port = self.ssh_transport.request_port_forward('', self.server_port, handler=x2go_transport_tcp_handler)
            except paramiko.SSHException as e:
                if self.session_instance:
                    self.session_instance.HOOK_rforward_request_denied(server_port=self.server_port)
                else:
                    self.logger('Encountered SSHException: %s (for reverse TCP port forward with local destination port %s)' % (str(e), self.server_port), loglevel=log.loglevel_WARN)

    def run(self):
        """\
        This method gets run once an L{X2GoRevFwTunnel} has been started with its
        L{start()} method. Use L{X2GoRevFwTunnel}.stop_thread() to stop the
        reverse forwarding tunnel again. You can also temporarily lock the tunnel
        down with L{X2GoRevFwTunnel.pause()} and L{X2GoRevFwTunnel.resume()}).

        L{X2GoRevFwTunnel.run()} waits for notifications of an appropriate incoming
        Paramiko/SSH channel (issued by L{X2GoRevFwTunnel.notify()}). Appropriate in
        this context means, that its start point on the X2Go server matches the class's
        property C{server_port}.

        Once a new incoming channel gets announced by the L{notify()} method, a new
        L{X2GoRevFwChannelThread} instance will be initialized. As a data stream handler,
        the function L{x2go_rev_forward_channel_handler()} will be used.

        The channel will last till the connection gets dropped on the X2Go server side or
        until the tunnel gets paused by an L{X2GoRevFwTunnel.pause()} call or stopped via the
        L{X2GoRevFwTunnel.stop_thread()} method.

        """
        # nothing to request if the tunnel has been stopped before it got started
        if self._keepalive:
            self._request_port_forwarding()

        while self._keepalive:

            _chan = None
            with self.incoming_channel:

                self.logger('waiting for incoming data channel on X2Go server port: [127.0.0.1]:%s' % self.server_port, loglevel=log.loglevel_DEBUG)
                # re-check under the lock: stop_thread() clears the flag before it notifies,
                # so a stop request issued just before this point must not be slept through
                if self._keepalive:
                    self.incoming_channel.wait()

                if self._keepalive:
                    self.logger('detected incoming data channel on X2Go server port: [127.0.0.1]:%s' % self.server_port, loglevel=log.loglevel_DEBUG)
                    _chan = self.ssh_transport.accept()
                    self.logger('data channel %s for server port [127.0.0.1]:%s is up' % (_chan, self.server_port), loglevel=log.loglevel_DEBUG)
                else:
                    self.logger('closing down rev forwarding tunnel on remote end [127.0.0.1]:%s' % self.server_port, loglevel=log.loglevel_DEBUG)

            # accept() may hand back None, there is no channel to serve then
            if _chan is None:
                continue

            if self._accept_channels and self._keepalive:
                _new_chan_thread = X2GoRevFwChannelThread(_chan, (self.remote_host, self.remote_port),
                                                          target=x2go_rev_forward_channel_handler,
                                                          kwargs={
                                                              'chan': _chan,
                                                              'addr': self.remote_host,
                                                              'port': self.remote_port,
                                                              'parent_thread': self,
                                                              'logger': self.logger,
                                                          }
                                                         )
                _new_chan_thread.start()
                self.open_channels['[%s]:%s' % _chan.origin_addr] = _new_chan_thread
            else:
                # tunnel got paused or stopped meanwhile, do not leave the channel dangling
                _chan.close()
272 273
def x2go_rev_forward_channel_handler(chan=None, addr='', port=0, parent_thread=None, logger=None, ):
    """\
    Handle the data stream of a requested channel that got set up by a L{X2GoRevFwTunnel} (Paramiko/SSH
    reverse forwarding tunnel).

    The channel (and the corresponding connections) close either ...

        - ... if the connecting application closes the connection and thus, drops
        the channel, or
        - ... if the L{X2GoRevFwTunnel} parent thread gets paused. The call
        of L{X2GoRevFwTunnel.pause()} on the instance can be used to shut down all incoming
        tunneled SSH connections associated to this L{X2GoRevFwTunnel} instance
        from within a Python X2Go application.

    The channel and the forwarding socket are closed in any case, also if connecting to
    the end point fails or if an error occurs while data is being relayed.

    @param chan: channel
    @type chan: C{class}
    @param addr: bind address
    @type addr: C{str}
    @param port: bind port
    @type port: C{int}
    @param parent_thread: the calling L{X2GoRevFwTunnel} instance, its C{_accept_channels}
        flag is polled to detect a paused tunnel; if C{None}, the channel only ends when one
        side closes the connection
    @type parent_thread: L{X2GoRevFwTunnel} instance
    @param logger: you can pass an L{X2GoLogger} object to the
        L{X2GoRevFwTunnel} constructor
    @type logger: L{X2GoLogger} instance

    """
    if logger is None:
        # must accept the loglevel keyword, all log calls below pass it by name
        def _dummy_logger(msg, loglevel=None):
            pass
        logger = _dummy_logger

    fw_socket = socket.socket()
    try:
        fw_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

        try:
            fw_socket.connect((addr, port))
        except Exception as e:
            logger('Reverse forwarding request to %s:%d failed: %r' % (addr, port, e), loglevel=log.loglevel_INFO)
            return

        logger('Connected!  Reverse tunnel open %r -> %r -> %r' % (chan.origin_addr,
                                                                   chan.getpeername(), (addr, port)),
               loglevel=log.loglevel_INFO)

        while parent_thread is None or parent_thread._accept_channels:
            # wake up once a second, so that a paused tunnel is noticed on idle connections, too
            r, w, x = select.select([fw_socket, chan], [], [], 1.0)
            if fw_socket in r:
                data = fw_socket.recv(1024)
                if len(data) == 0:
                    break
                # sendall(): a plain send() may transmit only a part of the data
                chan.sendall(data)
            if chan in r:
                data = chan.recv(1024)
                if len(data) == 0:
                    break
                fw_socket.sendall(data)
    finally:
        chan.close()
        fw_socket.close()

    logger('Reverse tunnel %s closed from %r' % (chan, chan.origin_addr,), loglevel=log.loglevel_INFO)
class X2GoRevFwChannelThread(threading.Thread):
    """\
    Starts a thread for each incoming Paramiko/SSH data channel through the reverse
    forwarding tunnel.

    """
    def __init__(self, channel, remote=None, **kwargs):
        """\
        Initializes a reverse forwarding channel thread.

        All further keyword arguments are handed over to C{threading.Thread.__init__()}.
        The thread is always created as a daemon thread.

        @param channel: incoming Paramiko/SSH channel from the L{X2GoSession}'s transport
            accept queue
        @type channel: class
        @param remote: tuple (addr, port) that specifies the data endpoint of the channel;
            if omitted, C{remote_host} and C{remote_port} are set to C{None}
        @type remote: C{tuple(str, int)}

        """
        self.channel = channel
        # define the end point attributes in any case, so that reading them never fails
        self.remote_host = None
        self.remote_port = None
        if remote is not None:
            self.remote_host = remote[0]
            self.remote_port = remote[1]
        threading.Thread.__init__(self, **kwargs)
        self.daemon = True
358