1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 """shell/term utilities, useful to write some python scripts instead of shell
19 scripts.
20 """
21 __docformat__ = "restructuredtext en"
22
23 import os
24 import glob
25 import shutil
26 import stat
27 import sys
28 import tempfile
29 import time
30 import fnmatch
31 import errno
32 import string
33 import random
34 import subprocess
35 from os.path import exists, isdir, islink, basename, join
36
37 from logilab.common import STD_BLACKLIST, _handle_blacklist
38 from logilab.common.compat import raw_input
39 from logilab.common.compat import str_to_bytes
40 from logilab.common.deprecation import deprecated
41
42 try:
43 from logilab.common.proc import ProcInfo, NoSuchProcess
44 except ImportError:
47
50
53
55 self.path = tempfile.mkdtemp()
56 return self.path
57
58 - def __exit__(self, exctype, value, traceback):
59
60 shutil.rmtree(self.path)
61 return traceback is None
62
66 self.directory = directory
67
69 self.cwd = os.getcwd()
70 os.chdir(self.directory)
71 return self.directory
72
73 - def __exit__(self, exctype, value, traceback):
75
76
77 -def chown(path, login=None, group=None):
78 """Same as `os.chown` function but accepting user login or group name as
79 argument. If login or group is omitted, it's left unchanged.
80
81 Note: you must own the file to chown it (or be root). Otherwise OSError is raised.
82 """
83 if login is None:
84 uid = -1
85 else:
86 try:
87 uid = int(login)
88 except ValueError:
89 import pwd
90 uid = pwd.getpwnam(login).pw_uid
91 if group is None:
92 gid = -1
93 else:
94 try:
95 gid = int(group)
96 except ValueError:
97 import grp
98 gid = grp.getgrnam(group).gr_gid
99 os.chown(path, uid, gid)
100
101 -def mv(source, destination, _action=shutil.move):
102 """A shell-like mv, supporting wildcards.
103 """
104 sources = glob.glob(source)
105 if len(sources) > 1:
106 assert isdir(destination)
107 for filename in sources:
108 _action(filename, join(destination, basename(filename)))
109 else:
110 try:
111 source = sources[0]
112 except IndexError:
113 raise OSError('No file matching %s' % source)
114 if isdir(destination) and exists(destination):
115 destination = join(destination, basename(source))
116 try:
117 _action(source, destination)
118 except OSError as ex:
119 raise OSError('Unable to move %r to %r (%s)' % (
120 source, destination, ex))
121
123 """A shell-like rm, supporting wildcards.
124 """
125 for wfile in files:
126 for filename in glob.glob(wfile):
127 if islink(filename):
128 os.remove(filename)
129 elif isdir(filename):
130 shutil.rmtree(filename)
131 else:
132 os.remove(filename)
133
134 -def cp(source, destination):
135 """A shell-like cp, supporting wildcards.
136 """
137 mv(source, destination, _action=shutil.copy)
138
140 """Recursively find files ending with the given extensions from the directory.
141
142 :type directory: str
143 :param directory:
144 directory where the search should start
145
146 :type exts: basestring or list or tuple
147 :param exts:
148 extensions or lists or extensions to search
149
150 :type exclude: boolean
151 :param exts:
152 if this argument is True, returning files NOT ending with the given
153 extensions
154
155 :type blacklist: list or tuple
156 :param blacklist:
157 optional list of files or directory to ignore, default to the value of
158 `logilab.common.STD_BLACKLIST`
159
160 :rtype: list
161 :return:
162 the list of all matching files
163 """
164 if isinstance(exts, str):
165 exts = (exts,)
166 if exclude:
167 def match(filename, exts):
168 for ext in exts:
169 if filename.endswith(ext):
170 return False
171 return True
172 else:
173 def match(filename, exts):
174 for ext in exts:
175 if filename.endswith(ext):
176 return True
177 return False
178 files = []
179 for dirpath, dirnames, filenames in os.walk(directory):
180 _handle_blacklist(blacklist, dirnames, filenames)
181
182 dirname = basename(dirpath)
183 if dirname in blacklist:
184 continue
185 files.extend([join(dirpath, f) for f in filenames if match(f, exts)])
186 return files
187
190 """Recursively finds files matching glob `pattern` under `directory`.
191
192 This is an alternative to `logilab.common.shellutils.find`.
193
194 :type directory: str
195 :param directory:
196 directory where the search should start
197
198 :type pattern: basestring
199 :param pattern:
200 the glob pattern (e.g *.py, foo*.py, etc.)
201
202 :type blacklist: list or tuple
203 :param blacklist:
204 optional list of files or directory to ignore, default to the value of
205 `logilab.common.STD_BLACKLIST`
206
207 :rtype: iterator
208 :return:
209 iterator over the list of all matching files
210 """
211 for curdir, dirnames, filenames in os.walk(directory):
212 _handle_blacklist(blacklist, dirnames, filenames)
213 for fname in fnmatch.filter(filenames, pattern):
214 yield join(curdir, fname)
215
216 -def unzip(archive, destdir):
217 import zipfile
218 if not exists(destdir):
219 os.mkdir(destdir)
220 zfobj = zipfile.ZipFile(archive)
221 for name in zfobj.namelist():
222 if name.endswith('/'):
223 os.mkdir(join(destdir, name))
224 else:
225 outfile = open(join(destdir, name), 'wb')
226 outfile.write(zfobj.read(name))
227 outfile.close()
228
229 @deprecated('Use subprocess.Popen instead')
230 -class Execute:
231 """This is a deadlock safe version of popen2 (no stdin), that returns
232 an object with errorlevel, out and err.
233 """
234
236 cmd = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
237 self.out, self.err = cmd.communicate()
238 self.status = os.WEXITSTATUS(cmd.returncode)
239
240
241 -def acquire_lock(lock_file, max_try=10, delay=10, max_delay=3600):
242 """Acquire a lock represented by a file on the file system
243
244 If the process written in lock file doesn't exist anymore, we remove the
245 lock file immediately
246 If age of the lock_file is greater than max_delay, then we raise a UserWarning
247 """
248 count = abs(max_try)
249 while count:
250 try:
251 fd = os.open(lock_file, os.O_EXCL | os.O_RDWR | os.O_CREAT)
252 os.write(fd, str_to_bytes(str(os.getpid())) )
253 os.close(fd)
254 return True
255 except OSError as e:
256 if e.errno == errno.EEXIST:
257 try:
258 fd = open(lock_file, "r")
259 pid = int(fd.readline())
260 pi = ProcInfo(pid)
261 age = (time.time() - os.stat(lock_file)[stat.ST_MTIME])
262 if age / max_delay > 1 :
263 raise UserWarning("Command '%s' (pid %s) has locked the "
264 "file '%s' for %s minutes"
265 % (pi.name(), pid, lock_file, age/60))
266 except UserWarning:
267 raise
268 except NoSuchProcess:
269 os.remove(lock_file)
270 except Exception:
271
272
273
274
275 pass
276 else:
277 raise
278 count -= 1
279 time.sleep(delay)
280 else:
281 raise Exception('Unable to acquire %s' % lock_file)
282
284 """Release a lock represented by a file on the file system."""
285 os.remove(lock_file)
286
289 """A simple text progression bar."""
290
291 - def __init__(self, nbops, size=20, stream=sys.stdout, title=''):
292 if title:
293 self._fstr = '\r%s [%%-%ss]' % (title, int(size))
294 else:
295 self._fstr = '\r[%%-%ss]' % int(size)
296 self._stream = stream
297 self._total = nbops
298 self._size = size
299 self._current = 0
300 self._progress = 0
301 self._current_text = None
302 self._last_text_write_size = 0
303
304 - def _get_text(self):
305 return self._current_text
306
307 - def _set_text(self, text=None):
308 if text != self._current_text:
309 self._current_text = text
310 self.refresh()
311
312 - def _del_text(self):
314
315 text = property(_get_text, _set_text, _del_text)
316
317 - def update(self, offset=1, exact=False):
318 """Move FORWARD to new cursor position (cursor will never go backward).
319
320 :offset: fraction of ``size``
321
322 :exact:
323
324 - False: offset relative to current cursor position if True
325 - True: offset as an asbsolute position
326
327 """
328 if exact:
329 self._current = offset
330 else:
331 self._current += offset
332
333 progress = int((float(self._current)/float(self._total))*self._size)
334 if progress > self._progress:
335 self._progress = progress
336 self.refresh()
337
339 """Refresh the progression bar display."""
340 self._stream.write(self._fstr % ('=' * min(self._progress, self._size)) )
341 if self._last_text_write_size or self._current_text:
342 template = ' %%-%is' % (self._last_text_write_size)
343 text = self._current_text
344 if text is None:
345 text = ''
346 self._stream.write(template % text)
347 self._last_text_write_size = len(text.rstrip())
348 self._stream.flush()
349
351 self._stream.write('\n')
352 self._stream.flush()
353
364
365
366 _MARKER = object()
368
370 self.nbops = nbops
371 self.size = size
372 self.stream = stream
373 self.title = title
374 self.enabled = enabled
375
377 if self.enabled:
378 kwargs = {}
379 for attr in ('nbops', 'size', 'stream', 'title'):
380 value = getattr(self, attr)
381 if value is not _MARKER:
382 kwargs[attr] = value
383 self.pb = ProgressBar(**kwargs)
384 else:
385 self.pb = DummyProgressBar()
386 return self.pb
387
388 - def __exit__(self, exc_type, exc_val, exc_tb):
390
435
436 ASK = RawInput()
440 """avoid using os.getlogin() because of strange tty / stdin problems
441 (man 3 getlogin)
442 Another solution would be to use $LOGNAME, $USER or $USERNAME
443 """
444 if sys.platform != 'win32':
445 import pwd
446 return pwd.getpwuid(os.getuid())[0]
447 else:
448 return os.environ['USERNAME']
449
451 """dumb password generation function"""
452 pwd = ''
453 for i in range(length):
454 pwd += random.choice(vocab)
455 return pwd
456