1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
"""\
Python gevent-based port forwarding server (comparable to the openssh -L
option) used for proxying the graphical elements of X2Go sessions.

"""
25 __NAME__ = "x2gofwtunnel-pylib"
26
27
28 import copy
29
30
31 import gevent
32 from gevent import select, socket
33 from gevent.server import StreamServer
34
35
36 import log
37 from defaults import X2GOCLIENT_OS as _X2GOCLIENT_OS
38
class X2GoFwServer(StreamServer):
    """\
    L{X2GoFwServer} implements a gevent C{StreamServer} based Paramiko/SSH port
    forwarding server.

    An L{X2GoFwServer} class object is used to tunnel graphical traffic
    through an external proxy command launched by a C{X2GoProxy*} backend.

    """
    def __init__(self, listener, remote_host, remote_port, ssh_transport, session_instance=None, session_name=None, logger=None, loglevel=log.loglevel_DEFAULT,):
        """\
        @param listener: listen on TCP/IP socket C{(<IP>, <Port>)}
        @type listener: C{tuple}
        @param remote_host: hostname or IP of remote host (in case of X2Go mostly 127.0.0.1)
        @type remote_host: C{str}
        @param remote_port: port of remote host
        @type remote_port: C{int}
        @param ssh_transport: a valid Paramiko/SSH transport object
        @type ssh_transport: C{obj}
        @param session_instance: the complete L{X2GoSession} instance of the X2Go session this port forwarding server belongs to.
            Note: for new L{X2GoSession} instances the object has the session name not yet set(!!!)
        @type session_instance: C{obj}
        @param session_name: the session name of the X2Go session this port forwarding server belongs to
        @type session_name: C{str}
        @param logger: you can pass an L{X2GoLogger} object to the
            L{X2GoFwServer} constructor
        @type logger: C{obj}
        @param loglevel: if no L{X2GoLogger} object has been supplied a new one will be
            constructed with the given loglevel
        @type loglevel: C{int}

        """
        if logger is None:
            self.logger = log.X2GoLogger(loglevel=loglevel)
        else:
            # work on a private copy, so that re-tagging below does not touch the caller's logger
            self.logger = copy.deepcopy(logger)
        self.logger.tag = __NAME__

        # SSH channel carrying the forwarded traffic (opened per incoming connection)
        self.chan = None
        # True while a tunnel is open and data is being relayed
        self.is_active = False
        # True once setting up the tunnel has failed
        self.failed = False
        # relay loop flag; setting it to False asks the relay loop to end
        self.keepalive = False
        self.listener = listener
        self.chain_host = remote_host
        self.chain_port = remote_port
        self.ssh_transport = ssh_transport
        self.session_name = session_name
        self.session_instance = session_instance

        # local end of the tunnel, set by the connection handler
        self.fw_socket = None

        StreamServer.__init__(self, self.listener, self.x2go_forward_tunnel_handle)

    def x2go_forward_tunnel_handle(self, fw_socket, address):
        """\
        Handle for SSH/Paramiko forwarding tunnel.

        Opens a C{direct-tcpip} channel on the SSH transport (retrying up to
        20 times) and then relays data between the local socket and that
        channel until one side closes, an error occurs or C{keepalive} is
        reset.

        @param fw_socket: local end of the forwarding tunnel
        @type fw_socket: C{obj}
        @param address: unused/ignored
        @type address: C{tuple}

        """
        self.fw_socket = fw_socket

        # disable Nagle's algorithm on the local end of the tunnel
        self.fw_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

        _success = False
        _count = 0
        _maxwait = 20

        while not _success and _count < _maxwait:

            # no point in retrying once the owning session has lost its connection
            if self.session_instance:
                if not self.session_instance.is_connected():
                    break

            _count += 1
            try:
                self.chan = self.ssh_transport.open_channel('direct-tcpip',
                                                            (self.chain_host, self.chain_port),
                                                            self.fw_socket.getpeername())
                chan_peername = self.chan.getpeername()
                _success = True
            except Exception as e:
                self.logger('incoming request to %s:%d failed on attempt %d of %d: %s' % (self.chain_host,
                                                                                          self.chain_port,
                                                                                          _count,
                                                                                          _maxwait,
                                                                                          repr(e)),
                            loglevel=log.loglevel_WARN)
                gevent.sleep(.4)

        if not _success:
            self.logger('incoming request to %s:%d failed after %d attempts' % (self.chain_host,
                                                                                self.chain_port,
                                                                                _count),
                        loglevel=log.loglevel_ERROR)
            if self.session_instance:
                self.session_instance.set_session_name(self.session_name)
                self.session_instance.HOOK_forwarding_tunnel_setup_failed(chain_host=self.chain_host, chain_port=self.chain_port)
            self.failed = True

        else:

            self.logger('connected! Tunnel open %r -> %r (on master connection %r -> %r)' % (
                        self.listener, (self.chain_host, self.chain_port),
                        self.fw_socket.getpeername(), chan_peername),
                        loglevel=log.loglevel_INFO)

            # the channel is open, so the tunnel counts as active from here on
            self.is_active = True

            self.keepalive = True
            try:
                while self.keepalive:
                    readable, _, _ = select.select([self.fw_socket, self.chan], [], [])
                    if fw_socket in readable:
                        data = fw_socket.recv(1024)
                        if not data:
                            break
                        # sendall(): a plain send() may transmit only part of the buffer
                        self.chan.sendall(data)
                    if self.chan in readable:
                        data = self.chan.recv(1024)
                        if not data:
                            break
                        fw_socket.sendall(data)
            except socket.error:
                # a broken connection simply ends the relay
                pass
            finally:
                # release channel and socket on every exit path, not only on a clean end of the loop
                self.close_channel()
                self.close_socket()

            self.is_active = False
            self.logger('Tunnel closed from %r' % (chan_peername,),
                        loglevel=log.loglevel_INFO)

    def close_channel(self):
        """\
        Close an open channel again.

        On Windows clients the channel's C{close()} call is skipped (the reason
        is not visible in this module); the channel reference is dropped in
        either case. An C{EOFError} raised while closing is ignored.

        """
        if self.chan is not None:
            try:
                if _X2GOCLIENT_OS != 'Windows':
                    self.chan.close()
                self.chan = None
            except EOFError:
                pass

    def close_socket(self):
        """\
        Close the forwarding tunnel's socket again.

        The SSH channel is closed first. Closing is retried up to 20 times
        (with a short pause in between) if a socket error occurs.

        """
        _success = False
        _count = 0
        _maxwait = 20

        while not _success and _count < _maxwait:
            _count += 1
            try:
                self.close_channel()
                if self.fw_socket is not None:
                    self.fw_socket.close()
                _success = True
            except socket.error:
                gevent.sleep(.2)
                self.logger('could not close fw_tunnel socket, try again (%s of %s)' % (_count, _maxwait), loglevel=log.loglevel_WARN)

        # judge by the outcome, not by the attempt counter: closing may succeed on the very last attempt
        if not _success:
            self.logger('forwarding tunnel to [%s]:%d could not be closed properly' % (self.chain_host, self.chain_port), loglevel=log.loglevel_WARN)

    def stop(self):
        """\
        Stop the forwarding tunnel.

        Closes the tunnel's channel and socket, then stops the underlying
        C{StreamServer}.

        """
        self.close_socket()
        StreamServer.stop(self)
222
223
def start_forward_tunnel(local_host='127.0.0.1', local_port=22022,
                         remote_host='127.0.0.1', remote_port=22,
                         ssh_transport=None,
                         session_instance=None,
                         session_name=None,
                         logger=None, ):
    """\
    Set up a Paramiko/SSH port forwarding tunnel (like openssh -L option).

    The tunnel is used to transport X2Go graphics data through a proxy application like nxproxy.

    If starting the server raises a socket error, the returned server object
    is flagged with C{failed = True} and C{is_active = False} instead of the
    error being propagated.

    @param local_host: local starting point of the forwarding tunnel
    @type local_host: C{str}
    @param local_port: listen port of the local starting point
    @type local_port: C{int}
    @param remote_host: from the endpoint of the tunnel, connect to host C{<remote_host>}...
    @type remote_host: C{str}
    @param remote_port: ... on port C{<remote_port>}
    @type remote_port: C{int}
    @param ssh_transport: the Paramiko/SSH transport (i.e. the X2Go session's Paramiko/SSH transport object)
    @type ssh_transport: C{obj}
    @param session_instance: the L{X2GoSession} instance that initiates this tunnel
    @type session_instance: C{obj}
    @param session_name: the session name of the X2Go session this port forwarding server belongs to
    @type session_name: C{str}
    @param logger: an X2GoLogger object
    @type logger: C{obj}

    @return: returns an L{X2GoFwServer} instance
    @rtype: C{obj}

    """
    listen_on = (local_host, local_port)
    tunnel = X2GoFwServer(listen_on, remote_host, remote_port, ssh_transport,
                          session_instance=session_instance,
                          session_name=session_name,
                          logger=logger)
    try:
        tunnel.start()
    except socket.error:
        # the server could not be started; hand the object back flagged as failed
        tunnel.failed = True
        tunnel.is_active = False

    return tunnel
269
# NOTE(review): the `def` line of this function was missing from the reviewed
# text; the name is assumed to mirror start_forward_tunnel() -- confirm
# against the callers before merging.
def stop_forward_tunnel(fw_server):
    """\
    Tear down a given Paramiko/SSH port forwarding tunnel.

    Does nothing if C{fw_server} is C{None}.

    @param fw_server: an L{X2GoFwServer} instance as returned by the L{start_forward_tunnel()} function
    @type fw_server: C{obj}

    """
    if fw_server is None:
        return

    # ask the relay loop to end, pause briefly, then stop the server
    fw_server.keepalive = False
    gevent.sleep(.5)
    fw_server.stop()
282
283
284 if __name__ == '__main__':
285 pass
286