import os
import re
import sys
from inspect import cleandoc
from typing import Callable, Dict, List, Optional, TextIO, Tuple, Union
import sh
from requests import Response
from . import automation_server, api_support, artifact_collector, reporter, code_report_collector
from .error_state import HasErrorState
from .output import HasOutput, Output
from .project_directory import ProjectDirectory
from .structure_handler import HasStructure, RunningStepBase
from .. import configuration_support
from ..lib import utils
from ..lib.ci_exception import CiException, CriticalCiException
from ..lib.gravity import Dependency
from ..lib.utils import make_block
# Public names of this module: only these are exported by a star-import.
__all__ = [
    "Launcher",
    "check_if_env_set"
]
def make_command(name: str) -> sh.Command:
    """Wrap `name` into an ``sh.Command`` object.

    :param name: command name or path to an executable
    :return: the ``sh.Command`` created for `name`
    :raises CiException: if ``sh`` reports ``CommandNotFound`` for `name`
    """
    try:
        command = sh.Command(name)
    except sh.CommandNotFound as not_found:
        # Re-raise as a project exception, keeping the original one as the cause
        raise CiException(f"No such file or command as '{name}'") from not_found
    else:
        return command
def check_if_env_set(configuration: configuration_support.Step) -> bool:  # TODO move to configuration
    """
    Predicate function for :func:`universum.configuration_support.Configuration.filter`,
    used to decide whether this particular configuration should be executed in this
    particular environment. For more information see :ref:`filtering`

    The `if_env_set` value is a list of conditions separated by ``&&``; all of them
    must hold. Each condition is one of:

    * ``NAME == value`` -- the variable must be set and equal to `value`
    * ``NAME != value`` -- the variable must be unset or differ from `value`
    * ``NAME`` -- the variable must be set to one of the positive values
      (``True``, ``true``, ``Yes``, ``yes``, ``Y``, ``y``)

    Empty conditions (e.g. produced by a trailing ``&&``) are ignored.

    >>> from universum.configuration_support import Configuration
    >>> c = Configuration([dict(if_env_set="MY_VAR != some value")])
    >>> check_if_env_set(c[0])
    True

    >>> c = Configuration([dict(if_env_set="MY_VAR != some value && OTHER_VAR")])
    >>> check_if_env_set(c[0])
    False

    >>> c = Configuration([dict(if_env_set="MY_VAR == some value")])
    >>> os.environ["MY_VAR"] = "some value"
    >>> check_if_env_set(c[0])
    True

    :param configuration: :class:`~universum.configuration_support.Step` object
    :return: True if environment satisfies described requirements; False otherwise
    """
    if not configuration.if_env_set:
        return True

    # Compiled once per call instead of being looked up for every condition
    condition_pattern = re.compile(r"\s*([A-Za-z_]\w*)\s*(!=|==)\s*(.*?)\s*$")
    positive_values = ("True", "true", "Yes", "yes", "Y", "y")

    for condition in configuration.if_env_set.split("&&"):
        bare_name = condition.strip()
        if not bare_name:
            continue

        match = condition_pattern.match(condition)
        if match is None:
            # No operator: the variable is obligatory and must hold a positive value.
            # An unset (None) or empty variable is not in the list, so it fails too.
            if os.getenv(bare_name) not in positive_values:
                return False
            continue

        name, operator, value = match.groups()
        # An unset variable yields None, which never equals 'value':
        # it fails "==" and satisfies "!=", exactly as required
        is_equal = os.getenv(name) == value
        if operator == "==" and not is_equal:
            return False
        if operator == "!=" and is_equal:
            return False

    return True
def check_str_match(string: str, include_substrings: List[str], exclude_substrings: List[str]) -> bool:
    """Tell whether `string` passes the 'include'/'exclude' substring filters.

    A string passes when it contains none of the 'exclude' substrings and,
    if any 'include' substrings are given, at least one of them.

    >>> check_str_match("step 1", [], [])
    True

    >>> check_str_match("step 1", ["step 1"], [])
    True

    >>> check_str_match("step 1", ["step 1"], ["step 1"])
    False

    >>> check_str_match("step 1", [], ["step 1"])
    False

    >>> check_str_match("step 1", ["step "], ["1"])
    False

    :rtype: bool
    """
    # Exclusion always wins, regardless of what is included
    if any(unwanted in string for unwanted in exclude_substrings):
        return False
    # With no 'include' filters everything not excluded is accepted
    if not include_substrings:
        return True
    return any(wanted in string for wanted in include_substrings)
def get_match_patterns(filters: Union[str, List[str]]) -> Tuple[List[str], List[str]]:
    """Split 'filters' into the lists of 'include' and 'exclude' patterns.

    Filters are given either as one ':'-separated string or as a list of such
    strings. A pattern prefixed with '!' is an 'exclude' pattern (stored without
    the prefix); empty patterns are dropped.

    >>> get_match_patterns("")
    ([], [])

    >>> get_match_patterns(":")
    ([], [])

    >>> get_match_patterns(":!")
    ([], [])

    >>> get_match_patterns("f:")
    (['f'], [])

    >>> get_match_patterns("f:!f 1")
    (['f'], ['f 1'])

    >>> get_match_patterns("f:!f 1:f 2:!f 3")
    (['f', 'f 2'], ['f 1', 'f 3'])

    >>> get_match_patterns(["f", "!f 1"])
    (['f'], ['f 1'])
    """
    if isinstance(filters, str):
        joined = filters
    elif filters:
        # A list of filter strings is treated as one ':'-separated string
        joined = ":".join(filters)
    else:
        joined = ""

    patterns = joined.split(':')
    include: List[str] = [pattern for pattern in patterns
                          if pattern and not pattern.startswith('!')]
    # A lone '!' carries no pattern and is skipped
    exclude: List[str] = [pattern[1:] for pattern in patterns
                          if pattern.startswith('!') and len(pattern) > 1]
    return include, exclude
class RunningStep(RunningStepBase):
    """A single configuration step executed as an external command via ``sh``.

    Output of the command goes either to the step log file (when one is given)
    or to the `out` logger; for background steps without a log file the output
    is postponed until :meth:`finalize`.
    """

    # TODO: change to non-singleton module and get all dependencies by ourselves
    def __init__(self, item: configuration_support.Step,
                 out: Output,
                 send_tag: Callable[[str], Response],
                 log_file: Optional[TextIO],
                 working_directory: str,
                 additional_environment: Dict[str, str],
                 background: bool,
                 artifact_collector_obj: artifact_collector.ArtifactCollector) -> None:
        super().__init__()
        self.configuration: configuration_support.Step = item
        self.out: Output = out
        self.send_tag = send_tag
        self.file: Optional[TextIO] = log_file
        self.working_directory: str = working_directory

        # Process environment, later entries override earlier ones:
        # current environment < step environment < additional environment
        step_environment: Dict[str, str] = os.environ.copy()
        step_environment.update(item.environment)
        step_environment.update(additional_environment)
        self.environment: Dict[str, str] = step_environment

        # Both are assigned later: 'cmd' in prepare_command(), 'process' in start()
        self.cmd: sh.Command
        self.process: sh.RunningCommand

        self._is_background = background
        self._postponed_out: List[Tuple[Callable[[str], None], str]] = []
        self._needs_finalization: bool = True
        self._error: Optional[str] = None
        self.artifact_collector = artifact_collector_obj

    def prepare_command(self) -> bool:  # FIXME: refactor
        """Create ``self.cmd`` for the step command.

        The command is first looked up by its name as is; on failure it is
        looked up once more as a path relative to the working directory.

        :return: False if the step has no command, True otherwise
        :raises CiException: if the command is not found in either way
        """
        command = self.configuration.command
        if not command:
            self.out.log("No 'command' found. Nothing to execute")
            return False

        executable: str = utils.strip_path_start(command[0])
        try:
            self.cmd = make_command(executable)
        except CiException:
            # Second attempt: resolve against the step working directory
            executable = os.path.abspath(os.path.join(self.working_directory, executable))
            self.cmd = make_command(executable)
        return True

    def start(self):
        """Launch the step command and log the command line that was run.

        A step without a command is marked as not requiring finalization;
        a command preparation failure is stored as the step error.
        """
        self._error = None
        try:
            command_is_ready = self.prepare_command()
        except CiException as ex:
            self._error = str(ex)
            return
        if not command_is_ready:
            self._needs_finalization = False
            return

        self._postponed_out = []
        arguments = self.configuration.command[1:]
        self.process = self.cmd(*arguments,
                                _iter=True,
                                _bg_exc=False,
                                _cwd=self.working_directory,
                                _env=self.environment,
                                _bg=self._is_background,
                                _out=self.handle_stdout,
                                _err=self.handle_stderr)

        executed_line = utils.trim_and_convert_to_unicode(self.process.ran)
        self.out.log_external_command(executed_line)
        if self.file:
            self.file.write("$ " + executed_line + "\n")

    def handle_stdout(self, line: str = "") -> None:
        """Process one line of the command standard output."""
        self._route_output_line(line, "", self.out.log_stdout)

    def handle_stderr(self, line: str) -> None:
        """Process one line of the command standard error output."""
        self._route_output_line(line, "stderr: ", self.out.log_stderr)

    def _route_output_line(self, line: str, file_prefix: str, log_function: Callable[[str], None]) -> None:
        """Write the line to the log file, postpone it or log it right away."""
        line = utils.trim_and_convert_to_unicode(line)
        if self.file:
            self.file.write(file_prefix + line + "\n")
        elif self._is_background:
            # Stored until finalize(), see _handle_postponed_out()
            self._postponed_out.append((log_function, line))
        else:
            log_function(line)

    def finalize(self) -> None:
        """Wait for the command to finish and store its failure text, if any.

        Afterwards, in any case: the build tag (if one applies) is sent,
        an empty line is passed to stdout handling, the log file is closed
        and the step stops being treated as a background one.
        """
        self._error = None
        if not self._needs_finalization:
            if self._is_background:
                self._is_background = False
                self.out.log("Nothing was executed: this background step had no command")
            return

        try:
            failure = ""
            try:
                self.process.wait()
            except sh.ErrorReturnCode as error:
                failure = f"Module sh got exit code {error.exit_code}\n"
                if error.stderr:
                    failure += utils.trim_and_convert_to_unicode(error.stderr) + "\n"
            except Exception as error:
                failure = str(error) + '\n'

            self._handle_postponed_out()
            if failure:
                failure = utils.trim_and_convert_to_unicode(failure)
                if self.file:
                    self.file.write(failure + "\n")
                self._error = failure
        finally:
            build_tag: Optional[str] = self._get_teamcity_build_tag()
            if build_tag:
                self._assign_teamcity_build_tag(build_tag)
            self.handle_stdout()
            if self.file:
                self.file.close()
            self._is_background = False

    def get_error(self) -> Optional[str]:
        """Return the stored error text; None if no error is stored."""
        return self._error

    def collect_artifacts(self) -> None:
        """Pass the step 'artifacts' and 'report_artifacts' to the artifact collector."""
        configuration = self.configuration
        self.artifact_collector.collect_step_artifacts(configuration.artifacts,
                                                       configuration.report_artifacts)

    def _handle_postponed_out(self) -> None:
        """Log all postponed output lines in the order they were received."""
        for log_function, line in self._postponed_out:
            log_function(line)
        self._postponed_out = []

    def _get_teamcity_build_tag(self) -> Optional[str]:
        """Choose 'fail_tag' or 'pass_tag' of the step depending on the stored error."""
        if self.configuration.is_conditional:
            return None  # conditional steps always succeed, no sense to set a tag
        if self._error:
            return self.configuration.fail_tag
        # can be also None if not set for current Configuration
        return self.configuration.pass_tag

    def _assign_teamcity_build_tag(self, tag: str) -> None:
        """Send the tag via `send_tag` and log the outcome."""
        response: Response = self.send_tag(tag)
        if response.status_code == 200:
            self.out.log("Tag '" + tag + "' added to build.")
        else:
            self.out.log_error(response.text)
class Launcher(ProjectDirectory, HasOutput, HasStructure, HasErrorState):
    """Loads the project configuration and executes its steps as external commands.

    The configuration file is executed as Python code, its ``configs`` variable
    is filtered by :func:`check_if_env_set` and by the step name filters, and the
    remaining steps are run through the step structure, each one as a
    :class:`RunningStep`.
    """
    artifacts_factory = Dependency(artifact_collector.ArtifactCollector)
    api_support_factory = Dependency(api_support.ApiSupport)
    reporter_factory = Dependency(reporter.Reporter)
    server_factory = Dependency(automation_server.AutomationServerForHostingBuild)
    code_report_collector_factory = Dependency(code_report_collector.CodeReportCollector)

    @staticmethod
    def define_arguments(argument_parser):
        """Register command line options of this module in `argument_parser`."""
        output_parser = argument_parser.get_or_create_group("Output")

        output_parser.add_argument("--out", "-o", dest="output", choices=["console", "file"],
                                   help="Define whether to print build logs to console or file. "
                                        "Log file names are generated based on the names of build steps. "
                                        "By default, logs are printed to console when the build is launched on "
                                        "Jenkins or TeamCity agent")

        parser = argument_parser.get_or_create_group("Configuration execution",
                                                     "External command launching and reporting parameters")

        parser.add_argument("--config", "-cfg", dest="config_path", metavar="CONFIG_PATH",
                            help="Path to project configuration file (example: -cfg=my/project/my_conf.py). "
                                 "Default is ``.universum.py``")

        parser.add_argument("--filter", "-f", dest="step_filter", action='append', metavar="STEP_FILTER",
                            help="Filter steps to execute. A single filter or a set of filters separated by ':'. "
                                 "Exclude using '!' symbol before filter. "
                                 "Example: -f='str1:!not str2' OR -f='str1' -f='!not str2'. "
                                 "See online documentation for more details")

        # Deprecated aliases, stored to the same destinations as the options above
        parser.add_hidden_argument("--launcher-output", "-lo", dest="output", choices=["console", "file"],
                                   help="Deprecated option. Please use '--out' instead", is_hidden=True)
        # The replacement named in the help is the option defined above ('--config')
        parser.add_hidden_argument("--launcher-config-path", "-lcp", dest="config_path", is_hidden=True,
                                   help="Deprecated option. Please use '--config' instead")

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        # Assigned in process_project_configs()
        self.source_project_configs: configuration_support.Configuration
        self.project_config: configuration_support.Configuration = configuration_support.Configuration()

        # Output mode is a plain string: "console" or "file".
        # When not passed explicitly it is chosen by the detected environment.
        output: Optional[str] = self.settings.output
        if output is None:
            output = "file" if utils.detect_environment() == "terminal" else "console"
        self.output: str = output

        self.config_path = self.settings.config_path or ".universum.py"

        self.artifact_collector = self.artifacts_factory()
        self.api_support = self.api_support_factory()
        self.reporter = self.reporter_factory()
        self.server = self.server_factory()
        self.code_report_collector = self.code_report_collector_factory()
        self.include_patterns, self.exclude_patterns = get_match_patterns(self.settings.step_filter)

    @make_block("Processing project configs")
    def process_project_configs(self) -> configuration_support.Configuration:
        """Execute the configuration file and build the filtered project configuration.

        The unfiltered configuration is dumped to the ``CONFIGS_DUMP.txt`` artifact.

        :return: configuration left after environment and step name filtering
        :raises CriticalCiException: if the configuration file can not be read or
            executed, does not define ``configs``, if no steps are left after
            filtering, or if a conditional step has child configurations
        """
        config_path = utils.parse_path(self.config_path, self.settings.project_root)
        configuration_support.set_project_root(self.settings.project_root)
        # NOTE(review): the raw 'settings.config_path' is passed here, not the defaulted
        # 'self.config_path' -- confirm this is intended when the option is not set
        configuration_support.set_config_path(self.settings.config_path)
        config_globals: Dict[str, configuration_support.Configuration] = {}

        # Make both the package parent directory and the config directory importable
        # from the configuration file
        sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
        sys.path.append(os.path.dirname(config_path))

        try:
            with open(config_path, encoding="utf-8") as config_file:
                exec(config_file.read(), config_globals)  # pylint: disable=exec-used

            self.source_project_configs = config_globals["configs"]
            dump_file: TextIO = self.artifact_collector.create_text_file("CONFIGS_DUMP.txt")
            try:
                dump_file.write(self.source_project_configs.dump())
            finally:
                # Close the artifact even if dumping or writing fails
                dump_file.close()

            config = self.source_project_configs.filter(check_if_env_set)
            self.project_config = config.filter(
                lambda cfg: check_str_match(cfg.name, self.include_patterns, self.exclude_patterns))

        except IOError as e:
            # Common indentation of the lines below is removed by cleandoc()
            text = f"""{e}\n
                Possible reasons of this error:\n
                * There is no Universum configuration file in project repository\n
                * Config path, passed to the script ('{self.config_path}'),
                does not lead to actual Universum configuration file location\n
                * Some problems occurred while downloading or copying the repository
                """
            raise CriticalCiException(cleandoc(text)) from e
        except KeyError as e:
            text = "KeyError: " + str(e) + '\n'
            text += "Possible reason of this error: variable 'configs' is not defined in Universum configuration file"
            raise CriticalCiException(text) from e
        except Exception as e:
            ex_traceback = sys.exc_info()[2]
            text = "Exception while processing Universum configuration file:\n" + \
                   utils.format_traceback(e, ex_traceback) + \
                   "\nTry to execute ``configs.dump()`` to make sure no exceptions occur in that case."
            raise CriticalCiException(text) from e

        if not self.project_config:
            text = "Project configs are empty, abort"
            if self.include_patterns or self.exclude_patterns:
                text += "\nRecheck filters applied:\n" + \
                        f"\tInclude patterns: {self.include_patterns}\n" + \
                        f"\tExclude patterns: {self.exclude_patterns}"
            raise CriticalCiException(text)

        if self._is_conditional_step_with_children_present(self.project_config):
            raise CriticalCiException("Conditional steps with child configurations are not supported")
        self._warn_if_critical_conditional_steps_present(self.project_config)

        return self.project_config

    def create_process(self, item: configuration_support.Step) -> RunningStep:
        """Create a :class:`RunningStep` for the step.

        In "file" output mode a ``<step name>_log.txt`` artifact is created
        and passed to the step as its log file.
        """
        working_directory = utils.parse_path(utils.strip_path_start(item.directory.rstrip("/")),
                                             self.settings.project_root)

        log_file: Optional[TextIO] = None
        if self.output == "file":
            log_file = self.artifact_collector.create_text_file(item.name + "_log.txt")
            self.out.log("Execution log is redirected to file")

        additional_environment = self.api_support.get_environment_settings()
        return RunningStep(item, self.out, self.server.add_build_tag, log_file, working_directory,
                           additional_environment, item.background, self.artifact_collector)

    def launch_custom_configs(self, custom_configs: configuration_support.Configuration) -> None:
        """Execute the steps of the passed configuration."""
        self.structure.execute_step_structure(custom_configs, self.create_process)

    @make_block("Executing build steps")
    def launch_project(self) -> None:
        """Execute the steps of the project configuration, adding the current block to the report."""
        self.reporter.add_block_to_report(self.structure.get_current_block())
        self.structure.execute_step_structure(self.project_config, self.create_process)

    # TODO: implement support of conditional step with children
    #  https://github.com/Samsung/Universum/issues/709
    @staticmethod
    def _is_conditional_step_with_children_present(
            configuration: Optional[configuration_support.Configuration]) -> bool:
        """Tell whether any conditional step in the configuration tree has children.

        The tree is walked through 'children', 'if_succeeded' and 'if_failed' of every step.
        """
        if not configuration:
            return False
        for step in configuration.configs:
            if step.is_conditional and step.children:
                return True
            if Launcher._is_conditional_step_with_children_present(step.children) or \
                    Launcher._is_conditional_step_with_children_present(step.if_succeeded) or \
                    Launcher._is_conditional_step_with_children_present(step.if_failed):
                return True
        return False

    def _warn_if_critical_conditional_steps_present(
            self, configuration: Optional[configuration_support.Configuration]) -> None:
        """Log a warning listing all conditional steps that have the 'critical' flag set."""
        step_names: List[str] = Launcher._get_critical_conditional_step_names_recursively(configuration)
        if not step_names:
            return
        step_names = [f'\t* "{name}"' for name in step_names]
        step_names_str: str = "\n".join(step_names)
        self.out.log(f"WARNING: 'critical' flag will be ignored for conditional steps: \n{step_names_str}\n "
                     "Set it to the 'if_failed' branch step instead")

    @staticmethod
    def _get_critical_conditional_step_names_recursively(
            configuration: Optional[configuration_support.Configuration]) -> List[str]:
        """Collect names of the steps that are both conditional and critical.

        The tree is walked through 'children', 'if_succeeded' and 'if_failed' of every step.
        """
        step_names: List[str] = []
        if not configuration:
            return step_names
        for step in configuration.configs:
            if step.is_conditional and step.critical:
                step_names.append(step.name)
            for nested in (step.children, step.if_succeeded, step.if_failed):
                step_names.extend(Launcher._get_critical_conditional_step_names_recursively(nested))
        return step_names