Package common :: Module shellutils
[frames] | no frames]

Source Code for Module common.shellutils

  1  # copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved. 
  2  # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr 
  3  # 
  4  # This file is part of logilab-common. 
  5  # 
  6  # logilab-common is free software: you can redistribute it and/or modify it under 
  7  # the terms of the GNU Lesser General Public License as published by the Free 
  8  # Software Foundation, either version 2.1 of the License, or (at your option) any 
  9  # later version. 
 10  # 
 11  # logilab-common is distributed in the hope that it will be useful, but WITHOUT 
 12  # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 
 13  # FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more 
 14  # details. 
 15  # 
 16  # You should have received a copy of the GNU Lesser General Public License along 
 17  # with logilab-common.  If not, see <http://www.gnu.org/licenses/>. 
 18  """shell/term utilities, useful to write some python scripts instead of shell 
 19  scripts. 
 20  """ 
 21  __docformat__ = "restructuredtext en" 
 22   
 23  import os 
 24  import glob 
 25  import shutil 
 26  import stat 
 27  import sys 
 28  import tempfile 
 29  import time 
 30  import fnmatch 
 31  import errno 
 32  import string 
 33  import random 
 34  import subprocess 
 35  from os.path import exists, isdir, islink, basename, join 
 36   
 37  from logilab.common import STD_BLACKLIST, _handle_blacklist 
 38  from logilab.common.compat import raw_input 
 39  from logilab.common.compat import str_to_bytes 
 40  from logilab.common.deprecation import deprecated 
 41   
 42  try: 
 43      from logilab.common.proc import ProcInfo, NoSuchProcess 
 44  except ImportError: 
45 # windows platform 46 - class NoSuchProcess(Exception): pass
47
48 - def ProcInfo(pid):
49 raise NoSuchProcess()
50
class tempdir(object):
    """Context manager creating a temporary directory and deleting it on exit.

    The value bound by ``with tempdir() as path`` is the path of the freshly
    created directory; it is also kept on the instance as ``self.path``.
    """

    def __enter__(self):
        created = self.path = tempfile.mkdtemp()
        return created

    def __exit__(self, exctype, value, traceback):
        # the whole tree is removed whether or not the body raised
        shutil.rmtree(self.path)
        # false whenever a traceback is supplied, so the exception propagates
        finished_cleanly = traceback is None
        return finished_cleanly
62
class pushd(object):
    """Context manager changing the current working directory.

    On entry the process moves to `directory` (which is also the value bound
    by ``as``); on exit it moves back to the directory that was current when
    the block was entered.
    """

    def __init__(self, directory):
        self.directory = directory

    def __enter__(self):
        target = self.directory
        # remember where we came from so __exit__ can go back
        self.cwd = os.getcwd()
        os.chdir(target)
        return target

    def __exit__(self, exctype, value, traceback):
        previous = self.cwd
        os.chdir(previous)
75
def chown(path, login=None, group=None):
    """Call `os.chown` on `path`, accepting a user login or group name.

    `login` and `group` may each be a numeric id (anything `int()` accepts)
    or a name, which is then looked up through the `pwd` / `grp` modules.
    An omitted (None) value is passed as -1, i.e. left unchanged.

    Note: you must own the file to chown it (or be root). Otherwise OSError is raised.
    """
    uid = gid = -1
    if login is not None:
        try:
            uid = int(login)
        except ValueError:
            # not a number: resolve the login name
            import pwd # Platforms: Unix
            uid = pwd.getpwnam(login).pw_uid
    if group is not None:
        try:
            gid = int(group)
        except ValueError:
            # not a number: resolve the group name
            import grp
            gid = grp.getgrnam(group).gr_gid
    os.chown(path, uid, gid)
100
def mv(source, destination, _action=shutil.move):
    """A shell-like mv, supporting wildcards.

    `source` is expanded with `glob.glob`. Several matches are each moved
    into `destination`, which must then be a directory. A single match is
    moved to `destination`, or inside it when `destination` is an existing
    directory. OSError is raised when nothing matches or when the move fails.
    `_action` is the callable doing the actual transfer.
    """
    matches = glob.glob(source)
    if not matches:
        raise OSError('No file matching %s' % source)
    if len(matches) > 1:
        assert isdir(destination)
        for path in matches:
            _action(path, join(destination, basename(path)))
        return
    # exactly one match
    source = matches[0]
    if exists(destination) and isdir(destination):
        destination = join(destination, basename(source))
    try:
        _action(source, destination)
    except OSError as ex:
        raise OSError('Unable to move %r to %r (%s)' % (
            source, destination, ex))
121
def rm(*files):
    """A shell-like rm, supporting wildcards.

    Every argument is expanded with `glob.glob`. Real directories are removed
    recursively; plain files and symbolic links (including links to
    directories) are removed with `os.remove`.
    """
    for pattern in files:
        for path in glob.glob(pattern):
            if not islink(path) and isdir(path):
                shutil.rmtree(path)
            else:
                os.remove(path)
133
def cp(source, destination):
    """A shell-like cp, supporting wildcards.

    Delegates to `mv` with `shutil.copy` as the transfer action, so the
    wildcard and destination handling are those of `mv`.
    """
    copy_action = shutil.copy
    mv(source, destination, copy_action)
138
def find(directory, exts, exclude=False, blacklist=STD_BLACKLIST):
    """Recursively find files ending with the given extensions from the directory.

    :type directory: str
    :param directory:
      directory where the search should start

    :type exts: basestring or list or tuple
    :param exts:
      extension or list of extensions to search

    :type exclude: boolean
    :param exclude:
      if this argument is True, returning files NOT ending with the given
      extensions

    :type blacklist: list or tuple
    :param blacklist:
      optional list of files or directory to ignore, default to the value of
      `logilab.common.STD_BLACKLIST`

    :rtype: list
    :return:
      the list of all matching files
    """
    if isinstance(exts, str):
        exts = (exts,)
    # str.endswith accepts a tuple of suffixes: one call per file name
    suffixes = tuple(exts)
    # a file is kept when "it has one of the suffixes" equals "we want those"
    keep_matching = not exclude
    files = []
    for dirpath, dirnames, filenames in os.walk(directory):
        # project helper applied to the walk lists; presumably prunes
        # blacklisted entries in place -- confirm in logilab.common
        _handle_blacklist(blacklist, dirnames, filenames)
        # don't append files if the directory is blacklisted
        if basename(dirpath) in blacklist:
            continue
        files.extend(join(dirpath, fname) for fname in filenames
                     if fname.endswith(suffixes) == keep_matching)
    return files
def globfind(directory, pattern, blacklist=STD_BLACKLIST):
    """Recursively finds files matching glob `pattern` under `directory`.

    This is an alternative to `logilab.common.shellutils.find`.

    :type directory: str
    :param directory:
      directory where the search should start

    :type pattern: basestring
    :param pattern:
      the glob pattern (e.g *.py, foo*.py, etc.)

    :type blacklist: list or tuple
    :param blacklist:
      optional list of files or directory to ignore, default to the value of
      `logilab.common.STD_BLACKLIST`

    :rtype: iterator
    :return:
      iterator over the list of all matching files
    """
    for root, subdirs, names in os.walk(directory):
        _handle_blacklist(blacklist, subdirs, names)
        matching = fnmatch.filter(names, pattern)
        for name in matching:
            yield join(root, name)
215
def unzip(archive, destdir):
    """Extract every member of the zip file `archive` below `destdir`.

    `destdir` is created when it does not exist. Directories needed by a
    member are created on demand, so archives without explicit directory
    entries, or with entries for directories that already exist, are
    handled.

    :raise ValueError:
      if a member name would be written outside `destdir` (absolute path or
      `..` components)
    """
    import zipfile
    if not exists(destdir):
        os.mkdir(destdir)
    root = os.path.abspath(destdir)
    # prefix every legitimate target must start with
    root_prefix = root.rstrip(os.sep) + os.sep
    zfobj = zipfile.ZipFile(archive)
    try:
        for name in zfobj.namelist():
            target = os.path.abspath(join(root, name))
            if target != root and not target.startswith(root_prefix):
                # member names come from the archive: refuse path traversal
                raise ValueError('%r would be extracted outside of %r'
                                 % (name, destdir))
            if name.endswith('/'):
                # directory entry
                if not isdir(target):
                    os.makedirs(target)
            else:
                parent = os.path.dirname(target)
                if not isdir(parent):
                    os.makedirs(parent)
                with open(target, 'wb') as outfile:
                    outfile.write(zfobj.read(name))
    finally:
        # release the archive handle even when extraction fails
        zfobj.close()
228
@deprecated('Use subprocess.Popen instead')
class Execute:
    """This is a deadlock safe version of popen2 (no stdin), that returns
    an object with errorlevel, out and err.

    After construction the instance exposes:

    * `out`: everything the command wrote on its standard output
    * `err`: everything the command wrote on its standard error
    * `status`: the return code of the command as reported by
      `subprocess.Popen.returncode`
    """

    def __init__(self, command):
        # `command` is handed to the shell as is (shell=True)
        cmd = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
        # communicate() drains both pipes, which is what avoids the deadlock
        self.out, self.err = cmd.communicate()
        # returncode is already the decoded exit code; running it through
        # os.WEXITSTATUS (meant for raw wait() statuses) turned every exit
        # code below 256 into 0 and is unavailable on Windows
        self.status = cmd.returncode
239
def acquire_lock(lock_file, max_try=10, delay=10, max_delay=3600):
    """Acquire a lock represented by a file on the file system

    The lock file is created exclusively and the pid of the current process
    is written into it.

    If the process written in lock file doesn't exist anymore, we remove the
    lock file immediately
    If age of the lock_file is greater than max_delay, then we raise a UserWarning

    :param lock_file: path of the file used as lock
    :param max_try: number of attempts (its absolute value is used)
    :param delay: seconds slept after each failed attempt
    :param max_delay: age in seconds above which an existing lock is reported

    :return: True once the lock file has been created
    :raise UserWarning: the lock is held for more than `max_delay` seconds
    :raise Exception: the lock could not be acquired after `max_try` attempts
    """
    count = abs(max_try)
    while count:
        try:
            fd = os.open(lock_file, os.O_EXCL | os.O_RDWR | os.O_CREAT)
            try:
                os.write(fd, str_to_bytes(str(os.getpid())))
            finally:
                # never leak the descriptor, even when the write fails
                os.close(fd)
            return True
        except OSError as e:
            if e.errno == errno.EEXIST:
                try:
                    # read the owner pid, closing the file right away
                    with open(lock_file, "r") as stream:
                        pid = int(stream.readline())
                    pi = ProcInfo(pid)
                    age = (time.time() - os.stat(lock_file)[stat.ST_MTIME])
                    if age / max_delay > 1 :
                        raise UserWarning("Command '%s' (pid %s) has locked the "
                                          "file '%s' for %s minutes"
                                          % (pi.name(), pid, lock_file, age/60))
                except UserWarning:
                    raise
                except NoSuchProcess:
                    # stale lock: its owner is gone
                    os.remove(lock_file)
                except Exception:
                    # The try block is not essential. can be skipped.
                    # Note: ProcInfo object is only available for linux
                    # process information are not accessible...
                    # or lock_file is no more present...
                    pass
            else:
                # any other OS error is not a lock conflict
                raise
        count -= 1
        time.sleep(delay)
    else:
        raise Exception('Unable to acquire %s' % lock_file)
282
def release_lock(lock_file):
    """Release a file-system lock by deleting the file `lock_file`."""
    os.unlink(lock_file)
286
class ProgressBar(object):
    """A simple text progression bar.

    The bar is `size` characters wide and is drawn on `stream`, optionally
    preceded by `title`. `nbops` is the number of operations that corresponds
    to a full bar. An optional text can be displayed after the bar through
    the `text` property.
    """

    def __init__(self, nbops, size=20, stream=sys.stdout, title=''):
        # the format string rewrites the current line ('\r') and left
        # justifies the '=' signs inside the brackets
        if title:
            self._fstr = '\r%s [%%-%ss]' % (title, int(size))
        else:
            self._fstr = '\r[%%-%ss]' % int(size)
        self._stream = stream
        self._total = nbops
        self._size = size
        self._current = 0
        self._progress = 0
        self._current_text = None
        self._last_text_write_size = 0

    def _get_text(self):
        return self._current_text

    def _set_text(self, text=None):
        # only redraw when the text really changes
        if text != self._current_text:
            self._current_text = text
            self.refresh()

    def _del_text(self):
        self.text = None

    text = property(_get_text, _set_text, _del_text)

    def update(self, offset=1, exact=False):
        """Move FORWARD to new cursor position (cursor will never go backward).

        :offset: number of operations, in the same unit as ``nbops``

        :exact:

          - False: offset is relative to the current position
          - True: offset is an absolute position

        """
        if exact:
            self._current = offset
        else:
            self._current += offset

        if self._total:
            progress = int((float(self._current)/float(self._total))*self._size)
        else:
            # nothing to do at all: show a full bar instead of dividing by 0
            progress = int(self._size)
        if progress > self._progress:
            self._progress = progress
            self.refresh()

    def refresh(self):
        """Refresh the progression bar display."""
        self._stream.write(self._fstr % ('=' * min(self._progress, self._size)) )
        if self._last_text_write_size or self._current_text:
            # pad to the width of the previous text so a shorter text
            # overwrites what is left of the longer one
            template = ' %%-%is' % (self._last_text_write_size)
            text = self._current_text
            if text is None:
                text = ''
            self._stream.write(template % text)
            self._last_text_write_size = len(text.rstrip())
        self._stream.flush()

    def finish(self):
        """Terminate the bar line by writing a newline on the stream."""
        self._stream.write('\n')
        self._stream.flush()
353
class DummyProgressBar(object):
    """Progress bar that displays nothing.

    It offers the `refresh`, `update` and `finish` methods and the `text`
    attribute of `ProgressBar`, so it can be used in its place when no
    output is wanted.
    """
    # `text` is the only attribute instances can hold
    __slots__ = ('text',)

    def refresh(self):
        pass

    def update(self, offset=1, exact=False):
        # same signature as ProgressBar.update so callers can pass an offset
        pass

    def finish(self):
        pass
_MARKER = object()


class progress(object):
    """Context manager giving a progress bar to the ``with`` block.

    When `enabled` is true a `ProgressBar` is built from those of `nbops`,
    `size`, `stream` and `title` that were explicitly given; otherwise a
    `DummyProgressBar` is used. The bar's `finish` method is called on exit.
    """

    def __init__(self, nbops=_MARKER, size=_MARKER, stream=_MARKER, title=_MARKER, enabled=True):
        self.nbops = nbops
        self.size = size
        self.stream = stream
        self.title = title
        self.enabled = enabled

    def __enter__(self):
        if not self.enabled:
            self.pb = DummyProgressBar()
            return self.pb
        # forward only the options the caller actually supplied, so that
        # ProgressBar keeps its own defaults for the others
        supplied = {}
        for option in ('nbops', 'size', 'stream', 'title'):
            given = getattr(self, option)
            if given is _MARKER:
                continue
            supplied[option] = given
        self.pb = ProgressBar(**supplied)
        return self.pb

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.pb.finish()
390
class RawInput(object):
    """Ask questions on a terminal and validate the answer.

    `input` is the callable used to read an answer from a prompt (defaults to
    `raw_input`); `printer` is the callable used to report invalid answers
    (defaults to `print`).
    """

    def __init__(self, input=None, printer=None):
        self._input = input or raw_input
        self._print = printer

    def ask(self, question, options, default):
        """Ask `question` and return the chosen element of `options`.

        An answer matches an option when the option starts with it, case
        insensitively; an empty answer selects `default`. After three
        invalid or ambiguous answers an Exception is raised.
        """
        assert default in options
        known = []
        labels = []
        for option in options:
            # the default option is advertised with an upper-cased initial
            initial = option[0]
            if option == default:
                label = initial.upper()
            else:
                label = initial.lower()
            if len(option) > 1:
                label += '(%s)' % option[1:].lower()
            known.append(option)
            labels.append(label)
        prompt = "%s [%s]: " % (question, '/'.join(labels))
        for _attempt in range(3):
            answer = self._input(prompt).strip().lower()
            if not answer:
                return default
            possible = [option for option in known
                        if option.lower().startswith(answer)]
            if len(possible) == 1:
                return possible[0]
            if possible:
                msg = ('%s is an ambiguous answer, do you mean %s ?' % (
                    answer, ' or '.join(possible)))
            else:
                msg = '%s is not an option.' % answer
            if self._print:
                self._print(msg)
            else:
                print(msg)
        raise Exception('unable to get a sensible answer')

    def confirm(self, question, default_is_yes=True):
        """Ask a yes/no `question`, return True when the answer is yes."""
        default = 'y' if default_is_yes else 'n'
        return self.ask(question, ('y', 'n'), default) == 'y'


ASK = RawInput()
def getlogin():
    """Return the login name of the user running the process.

    avoid using os.getlogin() because of strange tty / stdin problems
    (man 3 getlogin)
    Another solution would be to use $LOGNAME, $USER or $USERNAME
    """
    if sys.platform == 'win32':
        return os.environ['USERNAME']
    # first field of the password database entry of the current uid
    import pwd # Platforms: Unix
    entry = pwd.getpwuid(os.getuid())
    return entry[0]
449
def generate_password(length=8, vocab=string.ascii_letters + string.digits):
    """Return a random password of `length` characters taken from `vocab`.

    Characters are drawn with `random.SystemRandom`, i.e. from the operating
    system's randomness source, rather than from the seedable default
    generator of the `random` module whose output is predictable.
    """
    chooser = random.SystemRandom()
    return ''.join(chooser.choice(vocab) for _ in range(length))
456