%shebang% # This script must retain compatibility with a wide variety of Python versions # since it is run for every py_binary target. Currently we guarantee support # going back to Python 2.7, and try to support even Python 2.6 on a best-effort # basis. We might abandon 2.6 support once users have the ability to control the # above shebang string via the Python toolchain (#8685). from __future__ import absolute_import from __future__ import division from __future__ import print_function import sys # The Python interpreter unconditionally prepends the directory containing this # script (following symlinks) to the import path. This is the cause of #9239, # and is a special case of #7091. We therefore explicitly delete that entry. # TODO(#7091): Remove this hack when no longer necessary. del sys.path[0] import os import subprocess import uuid def IsRunningFromZip(): return %is_zipfile% if IsRunningFromZip(): import shutil import tempfile import zipfile else: import re # Return True if running on Windows def IsWindows(): return os.name == 'nt' def GetWindowsPathWithUNCPrefix(path): """Adds UNC prefix after getting a normalized absolute Windows path. No-op for non-Windows platforms or if running under python2. """ path = path.strip() # No need to add prefix for non-Windows platforms. # And \\?\ doesn't work in python 2 or on mingw if not IsWindows() or sys.version_info[0] < 3: return path # Starting in Windows 10, version 1607(OS build 14393), MAX_PATH limitations have been # removed from common Win32 file and directory functions. # Related doc: https://docs.microsoft.com/en-us/windows/win32/fileio/maximum-file-path-limitation?tabs=cmd#enable-long-paths-in-windows-10-version-1607-and-later import platform if platform.win32_ver()[1] >= '10.0.14393': return path # import sysconfig only now to maintain python 2.6 compatibility import sysconfig if sysconfig.get_platform() == 'mingw': return path # Lets start the unicode fun unicode_prefix = '\\\\?\\' if path.startswith(unicode_prefix): return path # os.path.abspath returns a normalized absolute path return unicode_prefix + os.path.abspath(path) def HasWindowsExecutableExtension(path): return path.endswith('.exe') or path.endswith('.com') or path.endswith('.bat') PYTHON_BINARY = '%python_binary%' if IsWindows() and not HasWindowsExecutableExtension(PYTHON_BINARY): PYTHON_BINARY = PYTHON_BINARY + '.exe' def SearchPath(name): """Finds a file in a given search path.""" search_path = os.getenv('PATH', os.defpath).split(os.pathsep) for directory in search_path: if directory: path = os.path.join(directory, name) if os.path.isfile(path) and os.access(path, os.X_OK): return path return None def FindPythonBinary(module_space): """Finds the real Python binary if it's not a normal absolute path.""" return FindBinary(module_space, PYTHON_BINARY) def PrintVerbose(*args): if os.environ.get("RULES_PYTHON_BOOTSTRAP_VERBOSE"): print("bootstrap:", *args, file=sys.stderr, flush=True) def PrintVerboseCoverage(*args): """Print output if VERBOSE_COVERAGE is non-empty in the environment.""" if os.environ.get("VERBOSE_COVERAGE"): print(*args, file=sys.stderr) def IsVerboseCoverage(): """Returns True if VERBOSE_COVERAGE is non-empty in the environment.""" return os.environ.get("VERBOSE_COVERAGE") def FindCoverageEntryPoint(module_space): cov_tool = '%coverage_tool%' if cov_tool: PrintVerboseCoverage('Using toolchain coverage_tool %r' % cov_tool) else: cov_tool = os.environ.get('PYTHON_COVERAGE') if cov_tool: PrintVerboseCoverage('PYTHON_COVERAGE: %r' % cov_tool) if cov_tool: return FindBinary(module_space, cov_tool) return None def FindBinary(module_space, bin_name): """Finds the real binary if it's not a normal absolute path.""" if not bin_name: return None if bin_name.startswith("//"): # Case 1: Path is a label. Not supported yet. raise AssertionError( "Bazel does not support execution of Python interpreters via labels yet" ) elif os.path.isabs(bin_name): # Case 2: Absolute path. return bin_name # Use normpath() to convert slashes to os.sep on Windows. elif os.sep in os.path.normpath(bin_name): # Case 3: Path is relative to the repo root. return os.path.join(module_space, bin_name) else: # Case 4: Path has to be looked up in the search path. return SearchPath(bin_name) def CreatePythonPathEntries(python_imports, module_space): parts = python_imports.split(':') return [module_space] + ['%s/%s' % (module_space, path) for path in parts] def FindModuleSpace(main_rel_path): """Finds the runfiles tree.""" # When the calling process used the runfiles manifest to resolve the # location of this stub script, the path may be expanded. This means # argv[0] may no longer point to a location inside the runfiles # directory. We should therefore respect RUNFILES_DIR and # RUNFILES_MANIFEST_FILE set by the caller. runfiles_dir = os.environ.get('RUNFILES_DIR', None) if not runfiles_dir: runfiles_manifest_file = os.environ.get('RUNFILES_MANIFEST_FILE', '') if (runfiles_manifest_file.endswith('.runfiles_manifest') or runfiles_manifest_file.endswith('.runfiles/MANIFEST')): runfiles_dir = runfiles_manifest_file[:-9] # Be defensive: the runfiles dir should contain our main entry point. If # it doesn't, then it must not be our runfiles directory. if runfiles_dir and os.path.exists(os.path.join(runfiles_dir, main_rel_path)): return runfiles_dir stub_filename = sys.argv[0] if not os.path.isabs(stub_filename): stub_filename = os.path.join(os.getcwd(), stub_filename) while True: module_space = stub_filename + ('.exe' if IsWindows() else '') + '.runfiles' if os.path.isdir(module_space): return module_space runfiles_pattern = r'(.*\.runfiles)' + (r'\\' if IsWindows() else '/') + '.*' matchobj = re.match(runfiles_pattern, stub_filename) if matchobj: return matchobj.group(1) if not os.path.islink(stub_filename): break target = os.readlink(stub_filename) if os.path.isabs(target): stub_filename = target else: stub_filename = os.path.join(os.path.dirname(stub_filename), target) raise AssertionError('Cannot find .runfiles directory for %s' % sys.argv[0]) def ExtractZip(zip_path, dest_dir): """Extracts the contents of a zip file, preserving the unix file mode bits. These include the permission bits, and in particular, the executable bit. Ideally the zipfile module should set these bits, but it doesn't. See: https://bugs.python.org/issue15795. Args: zip_path: The path to the zip file to extract dest_dir: The path to the destination directory """ zip_path = GetWindowsPathWithUNCPrefix(zip_path) dest_dir = GetWindowsPathWithUNCPrefix(dest_dir) with zipfile.ZipFile(zip_path) as zf: for info in zf.infolist(): zf.extract(info, dest_dir) # UNC-prefixed paths must be absolute/normalized. See # https://docs.microsoft.com/en-us/windows/desktop/fileio/naming-a-file#maximum-path-length-limitation file_path = os.path.abspath(os.path.join(dest_dir, info.filename)) # The Unix st_mode bits (see "man 7 inode") are stored in the upper 16 # bits of external_attr. Of those, we set the lower 12 bits, which are the # file mode bits (since the file type bits can't be set by chmod anyway). attrs = info.external_attr >> 16 if attrs != 0: # Rumor has it these can be 0 for zips created on Windows. os.chmod(file_path, attrs & 0o7777) # Create the runfiles tree by extracting the zip file def CreateModuleSpace(): temp_dir = tempfile.mkdtemp('', 'Bazel.runfiles_') ExtractZip(os.path.dirname(__file__), temp_dir) # IMPORTANT: Later code does `rm -fr` on dirname(module_space) -- it's # important that deletion code be in sync with this directory structure return os.path.join(temp_dir, 'runfiles') # Returns repository roots to add to the import path. def GetRepositoriesImports(module_space, import_all): if import_all: repo_dirs = [os.path.join(module_space, d) for d in os.listdir(module_space)] repo_dirs.sort() return [d for d in repo_dirs if os.path.isdir(d)] return [os.path.join(module_space, '%workspace_name%')] def RunfilesEnvvar(module_space): """Finds the runfiles manifest or the runfiles directory. Returns: A tuple of (var_name, var_value) where var_name is either 'RUNFILES_DIR' or 'RUNFILES_MANIFEST_FILE' and var_value is the path to that directory or file, or (None, None) if runfiles couldn't be found. """ # If this binary is the data-dependency of another one, the other sets # RUNFILES_MANIFEST_FILE or RUNFILES_DIR for our sake. runfiles = os.environ.get('RUNFILES_MANIFEST_FILE', None) if runfiles: return ('RUNFILES_MANIFEST_FILE', runfiles) runfiles = os.environ.get('RUNFILES_DIR', None) if runfiles: return ('RUNFILES_DIR', runfiles) # If running from a zip, there's no manifest file. if IsRunningFromZip(): return ('RUNFILES_DIR', module_space) # Look for the runfiles "output" manifest, argv[0] + ".runfiles_manifest" runfiles = module_space + '_manifest' if os.path.exists(runfiles): return ('RUNFILES_MANIFEST_FILE', runfiles) # Look for the runfiles "input" manifest, argv[0] + ".runfiles/MANIFEST" # Normally .runfiles_manifest and MANIFEST are both present, but the # former will be missing for zip-based builds or if someone copies the # runfiles tree elsewhere. runfiles = os.path.join(module_space, 'MANIFEST') if os.path.exists(runfiles): return ('RUNFILES_MANIFEST_FILE', runfiles) # If running in a sandbox and no environment variables are set, then # Look for the runfiles next to the binary. if module_space.endswith('.runfiles') and os.path.isdir(module_space): return ('RUNFILES_DIR', module_space) return (None, None) def Deduplicate(items): """Efficiently filter out duplicates, keeping the first element only.""" seen = set() for it in items: if it not in seen: seen.add(it) yield it def InstrumentedFilePaths(): """Yields tuples of realpath of each instrumented file with the relative path.""" manifest_filename = os.environ.get('COVERAGE_MANIFEST') if not manifest_filename: return with open(manifest_filename, "r") as manifest: for line in manifest: filename = line.strip() if not filename: continue try: realpath = os.path.realpath(filename) except OSError: print( "Could not find instrumented file {}".format(filename), file=sys.stderr) continue if realpath != filename: PrintVerboseCoverage("Fixing up {} -> {}".format(realpath, filename)) yield (realpath, filename) def UnresolveSymlinks(output_filename): # type: (str) -> None """Replace realpath of instrumented files with the relative path in the lcov output. Though we are asking coveragepy to use relative file names, currently ignore that for purposes of generating the lcov report (and other reports which are not the XML report), so we need to go and fix up the report. This function is a workaround for that issue. Once that issue is fixed upstream and the updated version is widely in use, this should be removed. See https://github.com/nedbat/coveragepy/issues/963. """ substitutions = list(InstrumentedFilePaths()) if substitutions: unfixed_file = output_filename + '.tmp' os.rename(output_filename, unfixed_file) with open(unfixed_file, "r") as unfixed: with open(output_filename, "w") as output_file: for line in unfixed: if line.startswith('SF:'): for (realpath, filename) in substitutions: line = line.replace(realpath, filename) output_file.write(line) os.unlink(unfixed_file) def ExecuteFile(python_program, main_filename, args, env, module_space, coverage_entrypoint, workspace, delete_module_space): # type: (str, str, list[str], dict[str, str], str, str|None, str|None) -> ... """Executes the given Python file using the various environment settings. This will not return, and acts much like os.execv, except is much more restricted, and handles Bazel-related edge cases. Args: python_program: (str) Path to the Python binary to use for execution main_filename: (str) The Python file to execute args: (list[str]) Additional args to pass to the Python file env: (dict[str, str]) A dict of environment variables to set for the execution module_space: (str) Path to the module space/runfiles tree directory coverage_entrypoint: (str|None) Path to the coverage tool entry point file. workspace: (str|None) Name of the workspace to execute in. This is expected to be a directory under the runfiles tree. delete_module_space: (bool), True if the module space should be deleted after a successful (exit code zero) program run, False if not. """ # We want to use os.execv instead of subprocess.call, which causes # problems with signal passing (making it difficult to kill # Bazel). However, these conditions force us to run via # subprocess.call instead: # # - On Windows, os.execv doesn't handle arguments with spaces # correctly, and it actually starts a subprocess just like # subprocess.call. # - When running in a workspace or zip file, we need to clean up the # workspace after the process finishes so control must return here. # - If we may need to emit a host config warning after execution, we # can't execv because we need control to return here. This only # happens for targets built in the host config. # - For coverage targets, at least coveragepy requires running in # two invocations, which also requires control to return here. # if not (IsWindows() or workspace or coverage_entrypoint or delete_module_space): _RunExecv(python_program, main_filename, args, env) if coverage_entrypoint is not None: ret_code = _RunForCoverage(python_program, main_filename, args, env, coverage_entrypoint, workspace) else: ret_code = subprocess.call( [python_program, main_filename] + args, env=env, cwd=workspace ) if delete_module_space: # NOTE: dirname() is called because CreateModuleSpace() creates a # sub-directory within a temporary directory, and we want to remove the # whole temporary directory. shutil.rmtree(os.path.dirname(module_space), True) sys.exit(ret_code) def _RunExecv(python_program, main_filename, args, env): # type: (str, str, list[str], dict[str, str]) -> ... """Executes the given Python file using the various environment settings.""" os.environ.update(env) PrintVerbose("RunExecv: environ:", os.environ) argv = [python_program, main_filename] + args PrintVerbose("RunExecv: argv:", python_program, argv) os.execv(python_program, argv) def _RunForCoverage(python_program, main_filename, args, env, coverage_entrypoint, workspace): # type: (str, str, list[str], dict[str, str], str, str|None) -> int """Collects coverage infomration for the given Python file. Args: python_program: (str) Path to the Python binary to use for execution main_filename: (str) The Python file to execute args: (list[str]) Additional args to pass to the Python file env: (dict[str, str]) A dict of environment variables to set for the execution coverage_entrypoint: (str|None) Path to the coverage entry point to execute with. workspace: (str|None) Name of the workspace to execute in. This is expected to be a directory under the runfiles tree, and will recursively delete the runfiles directory if set. """ # We need for coveragepy to use relative paths. This can only be configured unique_id = uuid.uuid4() rcfile_name = os.path.join(os.environ['COVERAGE_DIR'], ".coveragerc_{}".format(unique_id)) with open(rcfile_name, "w") as rcfile: rcfile.write('''[run] relative_files = True ''') PrintVerboseCoverage('Coverage entrypoint:', coverage_entrypoint) # First run the target Python file via coveragepy to create a .coverage # database file, from which we can later export lcov. ret_code = subprocess.call( [ python_program, coverage_entrypoint, "run", "--rcfile=" + rcfile_name, "--append", "--branch", main_filename ] + args, env=env, cwd=workspace ) output_filename = os.path.join(os.environ['COVERAGE_DIR'], 'pylcov.dat') PrintVerboseCoverage('Converting coveragepy database to lcov:', output_filename) # Run coveragepy again to convert its .coverage database file into lcov. # Under normal conditions running lcov outputs to stdout/stderr, which causes problems for `coverage`. params = [python_program, coverage_entrypoint, "lcov", "--rcfile=" + rcfile_name, "-o", output_filename, "--quiet"] kparams = {"env": env, "cwd": workspace, "stdout": subprocess.DEVNULL, "stderr": subprocess.DEVNULL} if IsVerboseCoverage(): # reconnect stdout/stderr to lcov generation. Should be useful for debugging `coverage` issues. params.remove("--quiet") kparams['stdout'] = sys.stderr kparams['stderr'] = sys.stderr ret_code = subprocess.call( params, **kparams ) or ret_code try: os.unlink(rcfile_name) except OSError as err: # It's possible that the profiled program might execute another Python # binary through a wrapper that would then delete the rcfile. Not much # we can do about that, besides ignore the failure here. PrintVerboseCoverage('Error removing temporary coverage rc file:', err) if os.path.isfile(output_filename): UnresolveSymlinks(output_filename) return ret_code def Main(): args = sys.argv[1:] new_env = {} # The main Python source file. # The magic string percent-main-percent is replaced with the runfiles-relative # filename of the main file of the Python binary in BazelPythonSemantics.java. main_rel_path = '%main%' if IsWindows(): main_rel_path = main_rel_path.replace('/', os.sep) if IsRunningFromZip(): module_space = CreateModuleSpace() delete_module_space = True else: module_space = FindModuleSpace(main_rel_path) delete_module_space = False python_imports = '%imports%' python_path_entries = CreatePythonPathEntries(python_imports, module_space) python_path_entries += GetRepositoriesImports(module_space, %import_all%) # Remove duplicates to avoid overly long PYTHONPATH (#10977). Preserve order, # keep first occurrence only. python_path_entries = [ GetWindowsPathWithUNCPrefix(d) for d in python_path_entries ] old_python_path = os.environ.get('PYTHONPATH') if old_python_path: python_path_entries += old_python_path.split(os.pathsep) python_path = os.pathsep.join(Deduplicate(python_path_entries)) if IsWindows(): python_path = python_path.replace('/', os.sep) new_env['PYTHONPATH'] = python_path runfiles_envkey, runfiles_envvalue = RunfilesEnvvar(module_space) if runfiles_envkey: new_env[runfiles_envkey] = runfiles_envvalue # Don't prepend a potentially unsafe path to sys.path # See: https://docs.python.org/3.11/using/cmdline.html#envvar-PYTHONSAFEPATH new_env['PYTHONSAFEPATH'] = '1' main_filename = os.path.join(module_space, main_rel_path) main_filename = GetWindowsPathWithUNCPrefix(main_filename) assert os.path.exists(main_filename), \ 'Cannot exec() %r: file not found.' % main_filename assert os.access(main_filename, os.R_OK), \ 'Cannot exec() %r: file not readable.' % main_filename program = python_program = FindPythonBinary(module_space) if python_program is None: raise AssertionError('Could not find python binary: ' + PYTHON_BINARY) # COVERAGE_DIR is set if coverage is enabled and instrumentation is configured # for something, though it could be another program executing this one or # one executed by this one (e.g. an extension module). if os.environ.get('COVERAGE_DIR'): cov_tool = FindCoverageEntryPoint(module_space) if cov_tool is None: PrintVerboseCoverage('Coverage was enabled, but python coverage tool was not configured.') else: # Inhibit infinite recursion: if 'PYTHON_COVERAGE' in os.environ: del os.environ['PYTHON_COVERAGE'] if not os.path.exists(cov_tool): raise EnvironmentError( 'Python coverage tool %r not found. ' 'Try running with VERBOSE_COVERAGE=1 to collect more information.' % cov_tool ) # coverage library expects sys.path[0] to contain the library, and replaces # it with the directory of the program it starts. Our actual sys.path[0] is # the runfiles directory, which must not be replaced. # CoverageScript.do_execute() undoes this sys.path[0] setting. # # Update sys.path such that python finds the coverage package. The coverage # entry point is coverage.coverage_main, so we need to do twice the dirname. python_path_entries = new_env['PYTHONPATH'].split(os.pathsep) python_path_entries.append(os.path.dirname(os.path.dirname(cov_tool))) new_env['PYTHONPATH'] = os.pathsep.join(Deduplicate(python_path_entries)) else: cov_tool = None # Some older Python versions on macOS (namely Python 3.7) may unintentionally # leave this environment variable set after starting the interpreter, which # causes problems with Python subprocesses correctly locating sys.executable, # which subsequently causes failure to launch on Python 3.11 and later. if '__PYVENV_LAUNCHER__' in os.environ: del os.environ['__PYVENV_LAUNCHER__'] new_env.update((key, val) for key, val in os.environ.items() if key not in new_env) workspace = None if IsRunningFromZip(): # If RUN_UNDER_RUNFILES equals 1, it means we need to # change directory to the right runfiles directory. # (So that the data files are accessible) if os.environ.get('RUN_UNDER_RUNFILES') == '1': workspace = os.path.join(module_space, '%workspace_name%') try: sys.stdout.flush() # NOTE: ExecuteFile may call execve() and lines after this will never run. ExecuteFile( python_program, main_filename, args, new_env, module_space, cov_tool, workspace, delete_module_space = delete_module_space, ) except EnvironmentError: # This works from Python 2.4 all the way to 3.x. e = sys.exc_info()[1] # This exception occurs when os.execv() fails for some reason. if not getattr(e, 'filename', None): e.filename = program # Add info to error message raise if __name__ == '__main__': Main()