19 """
20 SFTP file object
21 """
22
23 from __future__ import with_statement
24
25 from binascii import hexlify
26 from collections import deque
27 import socket
28 import threading
29 import time
30 from paramiko.common import DEBUG
31
32 from paramiko.file import BufferedFile
33 from paramiko.py3compat import long
34 from paramiko.sftp import CMD_CLOSE, CMD_READ, CMD_DATA, SFTPError, CMD_WRITE, \
35 CMD_STATUS, CMD_FSTAT, CMD_ATTRS, CMD_FSETSTAT, CMD_EXTENDED
36 from paramiko.sftp_attr import SFTPAttributes
37


class SFTPFile (BufferedFile):
    """
    Proxy object for a file on the remote server, in client mode SFTP.

    Instances of this class may be used as context managers in the same way
    that built-in Python file objects are.
    """

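    # Cap on the number of bytes sent in a single read/write request; many
    # SFTP servers reject data packets larger than this, so bigger reads and
    # writes are broken into chunks of at most this size.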
    MAX_REQUEST_SIZE = 32768

    def __init__(self, sftp, handle, mode='r', bufsize=-1):
        BufferedFile.__init__(self)
        self.sftp = sftp
        self.handle = handle
        BufferedFile._set_mode(self, mode, bufsize)
        self.pipelined = False
        self._prefetching = False
        self._prefetch_done = False
        self._prefetch_data = {}
        self._prefetch_extents = {}
        self._prefetch_lock = threading.Lock()
        self._saved_exception = None
        self._reqs = deque()

    def __del__(self):
        self._close(async=True)

    def close(self):
        """
        Close the file.
        """
        self._close(async=False)

    def _close(self, async=False):
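        # Allow double-close without raising, the way built-in file objects
        # do, but never send CMD_CLOSE twice: once the handle is closed, the
        # server may hand the same handle out for another file, and a second
        # close (close() is also called from __del__) could hit that file.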
        if self._closed:
            return
        self.sftp._log(DEBUG, 'close(%s)' % hexlify(self.handle))
        if self.pipelined:
            self.sftp._finish_responses(self)
        BufferedFile.close(self)
        try:
            if async:
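                # A close triggered by garbage collection may run on any
                # thread, so send the request without waiting for a response.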
                self.sftp._async_request(type(None), CMD_CLOSE, self.handle)
            else:
                self.sftp._request(CMD_CLOSE, self.handle)
        except EOFError:
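            # the transport may already be gone; a failed close is harmless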
            pass
        except (IOError, socket.error):
            pass

    def _data_in_prefetch_requests(self, offset, size):
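        # Return True if the byte range [offset, offset + size) is already
        # covered by outstanding prefetch requests (possibly split across
        # several of them), else False.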
        k = [x for x in list(self._prefetch_extents.values()) if x[0] <= offset]
        if len(k) == 0:
            return False
        k.sort(key=lambda x: x[0])
        buf_offset, buf_size = k[-1]
        if buf_offset + buf_size <= offset:
            # the closest pending request ends before this range begins
            return False
        if buf_offset + buf_size >= offset + size:
            # the pending request covers the whole range
            return True
        # the pending request covers only part of the range; check whether
        # other requests cover the remainder
        return self._data_in_prefetch_requests(buf_offset + buf_size, offset + size - buf_offset - buf_size)

    def _data_in_prefetch_buffers(self, offset):
        """
        if a block of data is present in the prefetch buffers, at the given
        offset, return the offset of the relevant prefetch buffer. otherwise,
        return None. this guarantees nothing about the number of bytes
        collected in the prefetch buffer so far.
        """
        k = [i for i in self._prefetch_data.keys() if i <= offset]
        if len(k) == 0:
            return None
        index = max(k)
        buf_offset = offset - index
        if buf_offset >= len(self._prefetch_data[index]):
            # the buffered block ends before the requested offset
            return None
        return index

    def _read_prefetch(self, size):
        """
        read data out of the prefetch buffer, if possible. if the data isn't
        in the buffer, return None. otherwise, behaves like a normal read.
        """
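        # Wait for prefetch responses until the data covering the current
        # position arrives, the prefetch finishes, or the file is closed.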
        while True:
            offset = self._data_in_prefetch_buffers(self._realpos)
            if offset is not None:
                break
            if self._prefetch_done or self._closed:
                break
            self.sftp._read_response()
            self._check_exception()
        if offset is None:
            self._prefetching = False
            return None
        prefetch = self._prefetch_data[offset]
        del self._prefetch_data[offset]

        buf_offset = self._realpos - offset
        if buf_offset > 0:
            self._prefetch_data[offset] = prefetch[:buf_offset]
            prefetch = prefetch[buf_offset:]
        if size < len(prefetch):
            self._prefetch_data[self._realpos + size] = prefetch[size:]
            prefetch = prefetch[:size]
        return prefetch

    def _read(self, size):
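        # Called by BufferedFile to fetch up to MAX_REQUEST_SIZE bytes from
        # the current position: served from the prefetch buffer when possible,
        # otherwise via a synchronous CMD_READ request.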
        size = min(size, self.MAX_REQUEST_SIZE)
        if self._prefetching:
            data = self._read_prefetch(size)
            if data is not None:
                return data
        t, msg = self.sftp._request(CMD_READ, self.handle, long(self._realpos), int(size))
        if t != CMD_DATA:
            raise SFTPError('Expected data')
        return msg.get_string()

    def _write(self, data):
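        # Called by BufferedFile to write buffered data.  Only the first
        # MAX_REQUEST_SIZE bytes are sent, and the number of bytes written is
        # returned so the caller can send the rest.  When pipelining, server
        # responses are drained only when more than 100 requests are
        # outstanding and the server has data waiting.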
        chunk = min(len(data), self.MAX_REQUEST_SIZE)
        self._reqs.append(self.sftp._async_request(type(None), CMD_WRITE, self.handle, long(self._realpos), data[:chunk]))
        if not self.pipelined or (len(self._reqs) > 100 and self.sftp.sock.recv_ready()):
            while len(self._reqs):
                req = self._reqs.popleft()
                t, msg = self.sftp._read_response(req)
                if t != CMD_STATUS:
                    raise SFTPError('Expected status')
        return chunk

    def settimeout(self, timeout):
        """
        Set a timeout on read/write operations on the underlying socket or
        ssh `.Channel`.

        :param float timeout:
            seconds to wait for a pending read/write operation before raising
            ``socket.timeout``, or ``None`` for no timeout

        .. seealso:: `.Channel.settimeout`
        """
        self.sftp.sock.settimeout(timeout)

    def gettimeout(self):
        """
        Returns the timeout in seconds (as a `float`) associated with the
        socket or ssh `.Channel` used for this file.

        .. seealso:: `.Channel.gettimeout`
        """
        return self.sftp.sock.gettimeout()

    def setblocking(self, blocking):
        """
        Set blocking or non-blocking mode on the underlying socket or ssh
        `.Channel`.

        :param int blocking:
            0 to set non-blocking mode; non-0 to set blocking mode.

        .. seealso:: `.Channel.setblocking`
        """
        self.sftp.sock.setblocking(blocking)

    def seek(self, offset, whence=0):
        self.flush()
        if whence == self.SEEK_SET:
            self._realpos = self._pos = offset
        elif whence == self.SEEK_CUR:
            self._pos += offset
            self._realpos = self._pos
        else:
            self._realpos = self._pos = self._get_size() + offset
        self._rbuffer = bytes()

    def stat(self):
        """
        Retrieve information about this file from the remote system. This is
        exactly like `.SFTPClient.stat`, except that it operates on an
        already-open file.

        :return: an `.SFTPAttributes` object containing attributes about this file.
        """
        t, msg = self.sftp._request(CMD_FSTAT, self.handle)
        if t != CMD_ATTRS:
            raise SFTPError('Expected attributes')
        return SFTPAttributes._from_msg(msg)

    def chmod(self, mode):
        """
        Change the mode (permissions) of this file. The permissions are
        unix-style and identical to those used by Python's `os.chmod`
        function.

        :param int mode: new permissions
        """
        self.sftp._log(DEBUG, 'chmod(%s, %r)' % (hexlify(self.handle), mode))
        attr = SFTPAttributes()
        attr.st_mode = mode
        self.sftp._request(CMD_FSETSTAT, self.handle, attr)

    def chown(self, uid, gid):
        """
        Change the owner (``uid``) and group (``gid``) of this file. As with
        Python's `os.chown` function, you must pass both arguments, so if you
        only want to change one, use `stat` first to retrieve the current
        owner and group.

        :param int uid: new owner's uid
        :param int gid: new group id
        """
        self.sftp._log(DEBUG, 'chown(%s, %r, %r)' % (hexlify(self.handle), uid, gid))
        attr = SFTPAttributes()
        attr.st_uid, attr.st_gid = uid, gid
        self.sftp._request(CMD_FSETSTAT, self.handle, attr)

    def utime(self, times):
        """
        Set the access and modified times of this file. If
        ``times`` is ``None``, then the file's access and modified times are set
        to the current time. Otherwise, ``times`` must be a 2-tuple of numbers,
        of the form ``(atime, mtime)``, which is used to set the access and
        modified times, respectively. This bizarre API is mimicked from Python
        for the sake of consistency -- I apologize.

        :param tuple times:
            ``None`` or a tuple of (access time, modified time) in standard
            internet epoch time (seconds since 01 January 1970 GMT)
        """
        if times is None:
            times = (time.time(), time.time())
        self.sftp._log(DEBUG, 'utime(%s, %r)' % (hexlify(self.handle), times))
        attr = SFTPAttributes()
        attr.st_atime, attr.st_mtime = times
        self.sftp._request(CMD_FSETSTAT, self.handle, attr)

    def truncate(self, size):
        """
        Change the size of this file. This usually extends
        or shrinks the size of the file, just like the ``truncate()`` method on
        Python file objects.

        :param size: the new size of the file
        :type size: int or long
        """
        self.sftp._log(DEBUG, 'truncate(%s, %r)' % (hexlify(self.handle), size))
        attr = SFTPAttributes()
        attr.st_size = size
        self.sftp._request(CMD_FSETSTAT, self.handle, attr)

    def check(self, hash_algorithm, offset=0, length=0, block_size=0):
        """
        Ask the server for a hash of a section of this file. This can be used
        to verify a successful upload or download, or for various rsync-like
        operations.

        The file is hashed from ``offset``, for ``length`` bytes. If ``length``
        is 0, the remainder of the file is hashed. Thus, if both ``offset``
        and ``length`` are zero, the entire file is hashed.

        Normally, ``block_size`` will be 0 (the default), and this method will
        return a byte string representing the requested hash (for example, a
        string of length 16 for MD5, or 20 for SHA-1). If a non-zero
        ``block_size`` is given, each chunk of the file (from ``offset`` to
        ``offset + length``) of ``block_size`` bytes is computed as a separate
        hash. The hash results are all concatenated and returned as a single
        string.

        For example, ``check('sha1', 0, 1024, 512)`` will return a string of
        length 40. The first 20 bytes will be the SHA-1 of the first 512 bytes
        of the file, and the last 20 bytes will be the SHA-1 of the next 512
        bytes.

        :param str hash_algorithm:
            the name of the hash algorithm to use (normally ``"sha1"`` or
            ``"md5"``)
        :param offset:
            offset into the file to begin hashing (0 means to start from the
            beginning)
        :type offset: int or long
        :param length:
            number of bytes to hash (0 means continue to the end of the file)
        :type length: int or long
        :param int block_size:
            number of bytes to hash per result (must not be less than 256; 0
            means to compute only one hash of the entire segment)
        :return:
            `str` of bytes representing the hash of each block, concatenated
            together

        :raises IOError: if the server doesn't support the "check-file"
            extension, or possibly doesn't support the hash algorithm
            requested

        .. note:: Many (most?) servers don't support this extension yet.

        .. versionadded:: 1.4
        """
        t, msg = self.sftp._request(CMD_EXTENDED, 'check-file', self.handle,
                                    hash_algorithm, long(offset), long(length), block_size)
        ext = msg.get_text()
        alg = msg.get_text()
        data = msg.get_remainder()
        return data

    def set_pipelined(self, pipelined=True):
        """
        Turn on/off the pipelining of write operations to this file. When
        pipelining is on, paramiko won't wait for the server response after
        each write operation. Instead, they're collected as they come in. At
        the first non-write operation (including `.close`), all remaining
        server responses are collected. This means that if there was an error
        with one of your later writes, an exception might be thrown from within
        `.close` instead of `.write`.

        By default, files are not pipelined.

        :param bool pipelined:
            ``True`` if pipelining should be turned on for this file; ``False``
            otherwise

        .. versionadded:: 1.5
        """
        self.pipelined = pipelined

    def prefetch(self):
        """
        Pre-fetch the remaining contents of this file in anticipation of future
        `.read` calls. If reading the entire file, pre-fetching can
        dramatically improve the download speed by avoiding roundtrip latency.
        The file's contents are incrementally buffered in a background thread.

        The prefetched data is stored in a buffer until read via the `.read`
        method. Once data has been read, it's removed from the buffer. The
        data may be read in a random order (using `.seek`); chunks of the
        buffer that haven't been read will continue to be buffered.

        .. versionadded:: 1.5.1
        """
        size = self.stat().st_size
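        # split the remainder of the file into MAX_REQUEST_SIZE pieces and
        # queue them all as asynchronous read requests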
        chunks = []
        n = self._realpos
        while n < size:
            chunk = min(self.MAX_REQUEST_SIZE, size - n)
            chunks.append((n, chunk))
            n += chunk
        if len(chunks) > 0:
            self._start_prefetch(chunks)

    def readv(self, chunks):
        """
        Read a set of blocks from the file by (offset, length). This is more
        efficient than doing a series of `.seek` and `.read` calls, since the
        prefetch machinery is used to retrieve all the requested blocks at
        once.

        :param chunks:
            a list of (offset, length) tuples indicating which sections of the
            file to read
        :type chunks: list(tuple(long, int))
        :return: a generator producing the requested blocks, in the same order
            as in ``chunks``

        .. versionadded:: 1.5.4
        """
        self.sftp._log(DEBUG, 'readv(%s, %r)' % (hexlify(self.handle), chunks))

        read_chunks = []
        for offset, size in chunks:
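            # skip blocks that are already buffered or already being prefetched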
            if self._data_in_prefetch_buffers(offset) or self._data_in_prefetch_requests(offset, size):
                continue

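            # break anything larger than MAX_REQUEST_SIZE into multiple requests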
            while size > 0:
                chunk_size = min(size, self.MAX_REQUEST_SIZE)
                read_chunks.append((offset, chunk_size))
                offset += chunk_size
                size -= chunk_size

        self._start_prefetch(read_chunks)
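        # with the prefetch queued up, each requested block can now be served
        # by a plain seek() + read()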
        for x in chunks:
            self.seek(x[0])
            yield self.read(x[1])
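
    ###  internals...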

    def _get_size(self):
        try:
            return self.stat().st_size
        except:
            return 0

    def _start_prefetch(self, chunks):
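        # mark the prefetch as in progress and issue the requests from a
        # daemon thread so this call returns immediately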
        self._prefetching = True
        self._prefetch_done = False

        t = threading.Thread(target=self._prefetch_thread, args=(chunks,))
        t.setDaemon(True)
        t.start()

    def _prefetch_thread(self, chunks):
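        # issue all of the read requests from this worker thread; there may be
        # a lot of them, and sending them can block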
        for offset, length in chunks:
            with self._prefetch_lock:
                num = self.sftp._async_request(self, CMD_READ, self.handle, long(offset), int(length))
                self._prefetch_extents[num] = (offset, length)

    def _async_response(self, t, msg, num):
        if t == CMD_STATUS:
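            # save the exception and re-raise it on the next file operation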
            try:
                self.sftp._convert_status(msg)
            except Exception as e:
                self._saved_exception = e
            return
        if t != CMD_DATA:
            raise SFTPError('Expected data')
        data = msg.get_string()
        with self._prefetch_lock:
            offset, length = self._prefetch_extents[num]
            self._prefetch_data[offset] = data
            del self._prefetch_extents[num]
            if len(self._prefetch_extents) == 0:
                self._prefetch_done = True

    def _check_exception(self):
        """if there's a saved exception, raise & clear it"""
        if self._saved_exception is not None:
            x = self._saved_exception
            self._saved_exception = None
            raise x

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        self.close()