# Copyright 2014 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).
import contextlib
import os
import random
import subprocess
import tempfile
import zipfile
from textwrap import dedent
from .common import safe_mkdir, safe_rmtree
from .compatibility import nested
from .installer import EggInstaller, Packager
from .pex_builder import PEXBuilder
from .util import DistributionHelper
@contextlib.contextmanager
def temporary_dir():
    """Context manager that yields the path of a freshly created temporary directory.

    The directory is created with ``tempfile.mkdtemp`` and is passed to
    ``safe_rmtree`` when the ``with`` block exits, whether it exits normally
    or through an exception.
    """
    scratch_dir = tempfile.mkdtemp()
    try:
        yield scratch_dir
    finally:
        # Runs on both normal exit and exceptions raised inside the with block.
        safe_rmtree(scratch_dir)
def random_bytes(length):
    """Return ``length`` bytes of random lowercase ASCII letters.

    Letters are drawn one at a time from the module-level ``random`` generator,
    so the result is reproducible after ``random.seed``.

    :param length: number of bytes to generate
    :returns: a bytes object of ``length`` characters in the range ``a``-``z``
    """
    lowest, highest = ord('a'), ord('z')
    letters = [chr(random.randint(lowest, highest)) for _ in range(length)]
    return ''.join(letters).encode('utf-8')
@contextlib.contextmanager
def temporary_content(content_map, interp=None, seed=31337):
    """Write content to a temporary directory and yield that directory's path.

    :param content_map: mapping of relative filename => int or string.  If the
      value is an int, that many random bytes are written.  Otherwise the string
      is %-interpolated with ``interp`` and written UTF-8 encoded.
    :param interp: mapping used for the %-interpolation of string contents;
      defaults to an empty dict
    :param seed: value passed to ``random.seed`` before any file is written, so
      the random file contents are reproducible
    :returns: context manager yielding the temporary directory path; the
      directory comes from ``temporary_dir`` and is removed when the block exits
    """
    random.seed(seed)
    interp = interp or {}
    with temporary_dir() as td:
        for filename, size_or_content in content_map.items():
            path = os.path.join(td, filename)
            # Filenames may contain subdirectories; create them first.
            safe_mkdir(os.path.dirname(path))
            with open(path, 'wb') as fp:
                if isinstance(size_or_content, int):
                    fp.write(random_bytes(size_or_content))
                else:
                    fp.write((size_or_content % interp).encode('utf-8'))
        yield td
def yield_files(directory):
    """Recursively walk ``directory`` and yield a pair for every file found.

    :param directory: root of the tree to walk
    :returns: generator of ``(path, relative_path)`` tuples, where ``path`` is
      the walked directory joined with the file name and ``relative_path`` is
      that same path expressed relative to ``directory``
    """
    for parent, _subdirs, names in os.walk(directory):
        for name in names:
            full_path = os.path.join(parent, name)
            yield full_path, os.path.relpath(full_path, directory)
def write_zipfile(directory, dest, reverse=False):
    """Archive every file under ``directory`` into a new zip file at ``dest``.

    :param directory: root of the tree whose files are archived
    :param dest: path of the zip file to create
    :param reverse: when true, add the entries in descending rather than
      ascending sorted order
    :returns: ``dest``
    """
    archive = zipfile.ZipFile(dest, 'w')
    with contextlib.closing(archive) as zf:
        # Sorting makes the entry order independent of the walk order.
        entries = sorted(yield_files(directory), reverse=reverse)
        for source_path, archive_name in entries:
            zf.write(source_path, arcname=archive_name)
    return dest
# Files of a small setuptools project, keyed by path relative to the project root.
# The values follow the temporary_content() convention: an int means "write that
# many random bytes", while a string is %-interpolated before being written.
# setup.py expects the interpolation keys project_name, zip_safe and
# install_requires.
PROJECT_CONTENT = {
'setup.py': dedent('''
from setuptools import setup
setup(
name=%(project_name)r,
version='0.0.0',
zip_safe=%(zip_safe)r,
packages=['my_package'],
scripts=[
'scripts/hello_world',
'scripts/shell_script',
],
package_data={'my_package': ['package_data/*.dat']},
install_requires=%(install_requires)r,
)
'''),
# The two scripts listed under ``scripts`` in setup.py.
'scripts/hello_world': '#!/usr/bin/env python\nprint("hello world!")\n',
'scripts/shell_script': '#!/usr/bin/env bash\necho hello world\n',
# An int value of 0 produces an empty file.
'my_package/__init__.py': 0,
'my_package/my_module.py': 'def do_something():\n print("hello world!")\n',
# Random-byte resources matched by the package_data glob in setup.py.
'my_package/package_data/resource1.dat': 1000,
'my_package/package_data/resource2.dat': 1000,
}
@contextlib.contextmanager
def make_installer(name='my_project', installer_impl=EggInstaller, zip_safe=True,
                   install_reqs=None):
    """Context manager yielding an installer built over a temporary copy of PROJECT_CONTENT.

    :param name: project name interpolated into the generated setup.py
    :param installer_impl: callable invoked with the project directory; its
      result is what gets yielded
    :param zip_safe: value interpolated as setup.py's ``zip_safe``
    :param install_reqs: value interpolated as ``install_requires``; a falsy
      value becomes an empty list
    """
    requirements = install_reqs or []
    substitutions = {
        'project_name': name,
        'zip_safe': zip_safe,
        'install_requires': requirements,
    }
    with temporary_content(PROJECT_CONTENT, interp=substitutions) as project_dir:
        yield installer_impl(project_dir)
@contextlib.contextmanager
def make_source_dir(name='my_project', install_reqs=None):
    """Context manager yielding a temporary directory populated from PROJECT_CONTENT.

    :param name: project name interpolated into the generated setup.py
    :param install_reqs: value interpolated as ``install_requires``; a falsy
      value becomes an empty list

    ``zip_safe`` is always interpolated as ``True``.
    """
    substitutions = dict(
        project_name=name,
        zip_safe=True,
        install_requires=install_reqs or [],
    )
    with temporary_content(PROJECT_CONTENT, interp=substitutions) as source_dir:
        yield source_dir
def make_sdist(name='my_project', zip_safe=True, install_reqs=None):
    """Return the result of ``Packager.sdist()`` for a temporary sample project.

    :param name: project name forwarded to ``make_installer``
    :param zip_safe: ``zip_safe`` setting forwarded to ``make_installer``
    :param install_reqs: requirements forwarded to ``make_installer``
    :returns: whatever the packager's ``sdist()`` call returns
    """
    installer_kwargs = dict(
        name=name,
        installer_impl=Packager,
        zip_safe=zip_safe,
        install_reqs=install_reqs,
    )
    with make_installer(**installer_kwargs) as packager:
        return packager.sdist()
@contextlib.contextmanager
def make_bdist(name='my_project', installer_impl=EggInstaller, zipped=False, zip_safe=True):
    """Context manager yielding a distribution built from a temporary sample project.

    :param name: project name forwarded to ``make_installer``
    :param installer_impl: installer class forwarded to ``make_installer``
    :param zipped: when true, load the distribution straight from the location
      returned by ``bdist()``; otherwise extract that archive into a temporary
      directory first and load it from there
    :param zip_safe: ``zip_safe`` setting forwarded to ``make_installer``
    """
    with make_installer(name=name, installer_impl=installer_impl,
                        zip_safe=zip_safe) as installer:
        built = installer.bdist()
        if zipped:
            yield DistributionHelper.distribution_from_path(built)
            return
        with temporary_dir() as scratch:
            unpacked = os.path.join(scratch, os.path.basename(built))
            with contextlib.closing(zipfile.ZipFile(built)) as archive:
                archive.extractall(unpacked)
            # Yield inside the with block so the extracted tree outlives the caller's use.
            yield DistributionHelper.distribution_from_path(unpacked)
# Python source that write_simple_pex() passes to PEXBuilder as ``preamble`` when
# coverage is requested.  It starts coverage measurement if the ``coverage``
# package can be imported and does nothing otherwise.  The try/except bodies must
# stay indented: this text has to be valid Python on its own.
COVERAGE_PREAMBLE = """
try:
  from coverage import coverage
  cov = coverage(auto_data=True, data_suffix=True)
  cov.start()
except ImportError:
  pass
"""
def write_simple_pex(td, exe_contents, dists=None, coverage=False):
    """Prepare a frozen PEXBuilder in ``td`` whose entry point is ``exe_contents``.

    The entry point is written to ``exe.py`` inside ``td``.  No pex file is
    produced here; the caller does that with the returned builder.

    :param td: temporary directory path
    :param exe_contents: entry point python file
    :type exe_contents: string
    :param dists: distributions to include, typically sdists or bdists; each
      one's ``location`` is added with ``add_egg``
    :param coverage: include coverage header
    :returns: the frozen PEXBuilder
    """
    dists = dists or []
    exe_path = os.path.join(td, 'exe.py')

    with open(exe_path, 'w') as fp:
        fp.write(exe_contents)

    pb = PEXBuilder(path=td, preamble=COVERAGE_PREAMBLE if coverage else None)

    for dist in dists:
        pb.add_egg(dist.location)

    pb.set_executable(exe_path)
    pb.freeze()

    return pb
# TODO(wickman) Why not PEX.run?
def run_simple_pex(pex, args=(), env=None):
    """Run ``pex`` as a subprocess and capture its output.

    :param pex: path of the executable to run
    :param args: additional command-line arguments
    :param env: environment mapping for the child process, or None to inherit
      the current environment
    :returns: tuple of (combined stdout and stderr as bytes, return code)
    """
    po = subprocess.Popen(
        [pex] + list(args),
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        env=env)
    # communicate() drains the pipe while waiting.  Calling wait() first would
    # deadlock once the child fills the OS pipe buffer, and would leave the
    # pipe open afterwards.
    output, _ = po.communicate()
    return output, po.returncode
def run_simple_pex_test(body, args=(), env=None, dists=None, coverage=False):
    """Build a throwaway pex whose entry point is ``body``, run it and return the result.

    :param body: python source used as the pex entry point
    :param args: command-line arguments passed to the pex
    :param env: environment mapping for the pex process
    :param dists: distributions forwarded to ``write_simple_pex``
    :param coverage: forwarded to ``write_simple_pex``
    :returns: the value returned by ``run_simple_pex`` for the built pex
    """
    # One directory holds the builder's working tree, the other the built pex.
    with nested(temporary_dir(), temporary_dir()) as (build_dir, output_dir):
        builder = write_simple_pex(build_dir, body, dists=dists, coverage=coverage)
        pex_path = os.path.join(output_dir, 'app.pex')
        builder.build(pex_path)
        return run_simple_pex(pex_path, args=args, env=env)
def _iter_filter(data_dict):
    """Yield the entries of ``data_dict`` that belong to the pex bootstrap, re-keyed.

    Only filenames containing ``/<PEXBuilder.BOOTSTRAP_DIR>/_pex/`` are kept.
    Everything up to and including that fragment is replaced with ``pex/``.

    :param data_dict: mapping of filename => coverage records
    :returns: generator of ``(rewritten_filename, records)`` tuples
    """
    fragment = '/%s/_pex/' % PEXBuilder.BOOTSTRAP_DIR
    for filename, records in data_dict.items():
        try:
            bi = filename.index(fragment)
        except ValueError:
            # Not a bootstrap file; leave it out of the combined data.
            continue
        # rewrite to look like root source
        yield ('pex/' + filename[bi + len(fragment):], records)
def combine_pex_coverage(coverage_file_iter):
    """Merge the pex-related data from several coverage files into one.

    Each file is read, its line and arc data are passed through
    ``_iter_filter``, and the results are added to a single ``CoverageData``
    created with basename ``.coverage_combined``, which is then written out.

    :param coverage_file_iter: iterable of coverage data file names
    :returns: the ``filename`` attribute of the combined data object
    """
    from coverage.data import CoverageData

    merged = CoverageData(basename='.coverage_combined')

    for coverage_file in coverage_file_iter:
        source = CoverageData(basename=coverage_file)
        source.read()
        line_data = dict(_iter_filter(source.line_data()))
        merged.add_line_data(line_data)
        arc_data = dict(_iter_filter(source.arc_data()))
        merged.add_arc_data(arc_data)

    merged.write()
    return merged.filename