""" Profiles to represent individual test hosts or a user-provided inventory file. """
from __future__ import annotations
import abc
import dataclasses
import os
import tempfile
import time
import typing as t
from . io import (
write_text_file ,
)
from . config import (
CommonConfig ,
EnvironmentConfig ,
IntegrationConfig ,
TerminateMode ,
)
from . host_configs import (
ControllerConfig ,
ControllerHostConfig ,
DockerConfig ,
HostConfig ,
NetworkInventoryConfig ,
NetworkRemoteConfig ,
OriginConfig ,
PosixConfig ,
PosixRemoteConfig ,
PosixSshConfig ,
PythonConfig ,
RemoteConfig ,
VirtualPythonConfig ,
WindowsInventoryConfig ,
WindowsRemoteConfig ,
)
from . core_ci import (
AnsibleCoreCI ,
SshKey ,
VmResource ,
)
from . util import (
ApplicationError ,
SubprocessError ,
cache ,
display ,
get_type_map ,
sanitize_host_name ,
sorted_versions ,
InternalError ,
)
from . util_common import (
intercept_python ,
)
from . docker_util import (
docker_exec ,
docker_rm ,
get_docker_hostname ,
)
from . bootstrap import (
BootstrapDocker ,
BootstrapRemote ,
)
from . venv import (
get_virtual_python ,
)
from . ssh import (
SshConnectionDetail ,
)
from . ansible_util import (
ansible_environment ,
get_hosts ,
parse_inventory ,
)
from . containers import (
CleanupMode ,
HostType ,
get_container_database ,
run_support_container ,
)
from . connections import (
Connection ,
DockerConnection ,
LocalConnection ,
SshConnection ,
)
from . become import (
Become ,
Su ,
Sudo ,
)
# Type variables used to parameterize the generic profile base classes in this module.
# Each one is bound to a host config base type, so a profile subclass can only be specialized with a matching config.
TControllerHostConfig = t . TypeVar ( ' TControllerHostConfig ' , bound = ControllerHostConfig )
THostConfig = t . TypeVar ( ' THostConfig ' , bound = HostConfig )
TPosixConfig = t . TypeVar ( ' TPosixConfig ' , bound = PosixConfig )
TRemoteConfig = t . TypeVar ( ' TRemoteConfig ' , bound = RemoteConfig )
@dataclasses.dataclass(frozen=True)
class Inventory:
    """Immutable, minimal model of an Ansible inventory which can be rendered to a file."""
    # Mapping of group name -> host name -> host variables.
    host_groups: t.Dict[str, t.Dict[str, t.Dict[str, t.Union[str, int]]]]
    # Optional mapping of additional group name -> list of entries written under that group.
    extra_groups: t.Optional[t.Dict[str, t.List[str]]] = None

    @staticmethod
    def create_single_host(name, variables):  # type: (str, t.Dict[str, t.Union[str, int]]) -> Inventory
        """Build an inventory containing only the named host, placed in the `all` group with the given variables."""
        return Inventory(host_groups={'all': {name: variables}})

    def write(self, args, path):  # type: (CommonConfig, str) -> None
        """
        Render the inventory as text and write it to the given path.
        Nothing is written when `args.explain` is set, but the rendered text is always reported at verbosity 3.
        """
        # NOTE: Generating JSON inventory would be preferable, but hard-coded inventory filenames currently prevent it.
        #       The name `inventory` works for the POSIX integration tests, but `inventory.winrm` and `inventory.networking` will only parse in INI format.
        #       This could change if tests are updated to use the `INVENTORY_PATH` environment variable.
        #       Some tests also detect the test type from the suffix of the inventory filename, which would break if it were changed.
        sections = [' ']

        for group, hosts in self.host_groups.items():
            sections.append(f' [ { group } ] \n ')

            for host, variables in hosts.items():
                kvp = ' '.join([f' { key } = " { value } " ' for key, value in variables.items()])
                sections.append(f' { host } { kvp } \n ')

            sections.append(' \n ')

        for group, children in (self.extra_groups or {}).items():
            sections.append(f' [ { group } ] \n ')
            sections.extend(f' { child } \n ' for child in children)
            sections.append(' \n ')

        # surrounding whitespace is trimmed once, after all sections have been combined
        inventory_text = ''.join(sections).strip()

        if not args.explain:
            write_text_file(path, inventory_text + ' \n ')

        display.info(f' >>> Inventory \n { inventory_text } ', verbosity=3)
class HostProfile(t.Generic[THostConfig], metaclass=abc.ABCMeta):
    """Root of the host profile hierarchy, pairing a host config with the bookkeeping needed to manage that host."""

    def __init__(self,
                 *,
                 args,  # type: EnvironmentConfig
                 config,  # type: THostConfig
                 targets,  # type: t.Optional[t.List[HostConfig]]
                 ):  # type: (...) -> None
        self.args = args
        self.config = config

        # a profile given a non-empty list of targets is acting as the controller
        self.controller = bool(targets)
        self.targets = targets if targets else []

        # State that must be persisted across delegation.
        self.state = {}  # type: t.Dict[str, t.Any]

        # Cache that must not be persisted across delegation.
        self.cache = {}  # type: t.Dict[str, t.Any]

    def provision(self):  # type: () -> None
        """Provision the host. Runs before delegation. The base implementation does nothing."""

    def setup(self):  # type: () -> None
        """Perform out-of-band setup. Runs before delegation. The base implementation does nothing."""

    def deprovision(self):  # type: () -> None
        """Deprovision the host. Runs after delegation has completed. The base implementation does nothing."""

    def wait(self):  # type: () -> None
        """Wait until the instance is ready. Runs before delegation for the controller and after delegation for targets."""

    def configure(self):  # type: () -> None
        """Perform in-band configuration. Runs before delegation for the controller and after delegation for targets."""

    def __getstate__(self):
        """Return the instance attributes to serialize, leaving out the attribute names listed in the exclusion check below."""
        persisted = {}

        for key, value in self.__dict__.items():
            if key in (' args ', ' cache '):
                continue

            persisted[key] = value

        return persisted

    def __setstate__(self, state):
        """Restore the serialized attributes and start over with an empty cache."""
        self.__dict__.update(state)

        # args will be populated after the instances are restored
        self.cache = {}
class PosixProfile(HostProfile[TPosixConfig], metaclass=abc.ABCMeta):
    """Shared base for profiles of POSIX hosts."""

    @property
    def python(self):  # type: () -> PythonConfig
        """
        The Python used by this profile.
        The result is remembered in the profile state after the first request.
        A virtual python is created at that time, the first time it is requested.
        """
        remembered = self.state.get(' python ')

        if remembered:
            return remembered

        python = self.config.python

        if isinstance(python, VirtualPythonConfig):
            python = get_virtual_python(self.args, python)

        self.state[' python '] = python

        return python
class ControllerHostProfile(PosixProfile[TControllerHostConfig], metaclass=abc.ABCMeta):
    """Shared base for profiles of hosts which are able to act as the controller."""

    @abc.abstractmethod
    def get_origin_controller_connection(self):  # type: () -> Connection
        """Return the connection the origin uses to access this host in its role as controller."""

    @abc.abstractmethod
    def get_working_directory(self):  # type: () -> str
        """Return the working directory on this host."""
class SshTargetHostProfile(HostProfile[THostConfig], metaclass=abc.ABCMeta):
    """Shared base for profiles of hosts which can be reached over SSH."""

    @abc.abstractmethod
    def get_controller_target_connections(self):  # type: () -> t.List[SshConnection]
        """Return the SSH connection(s) the controller uses to access this host in its role as target."""
class RemoteProfile(SshTargetHostProfile[TRemoteConfig], metaclass=abc.ABCMeta):
    """Shared base for profiles of remote instances managed through AnsibleCoreCI."""

    @property
    def core_ci_state(self):  # type: () -> t.Optional[t.Dict[str, str]]
        """The Ansible Core CI state saved in the profile state, if any, otherwise None."""
        return self.state.get(' core_ci ')

    @core_ci_state.setter
    def core_ci_state(self, value):  # type: (t.Dict[str, str]) -> None
        """Save the given Ansible Core CI state in the profile state."""
        self.state[' core_ci '] = value

    def provision(self):  # type: () -> None
        """Provision the host before delegation by starting an instance and saving its state."""
        instance = self.create_core_ci(load=True)
        self.core_ci = instance
        instance.start()

        self.core_ci_state = instance.save()

    def deprovision(self):  # type: () -> None
        """Deprovision the host after delegation has completed, when permitted by the remote terminate mode."""
        mode = self.args.remote_terminate

        if mode == TerminateMode.ALWAYS:
            self.delete_instance()
        elif mode == TerminateMode.SUCCESS and self.args.success:
            self.delete_instance()

    @property
    def core_ci(self):  # type: () -> t.Optional[AnsibleCoreCI]
        """The AnsibleCoreCI instance held in the profile cache, if any, otherwise None."""
        return self.cache.get(' core_ci ')

    @core_ci.setter
    def core_ci(self, value):  # type: (AnsibleCoreCI) -> None
        """Hold the given AnsibleCoreCI instance in the profile cache."""
        self.cache[' core_ci '] = value

    def get_instance(self):  # type: () -> t.Optional[AnsibleCoreCI]
        """Return the current AnsibleCoreCI instance, restoring it from the saved state when it is not cached yet."""
        if self.core_ci:
            return self.core_ci

        saved_state = self.core_ci_state

        if saved_state:
            instance = self.create_core_ci(load=False)
            self.core_ci = instance
            instance.load(saved_state)

        return self.core_ci

    def delete_instance(self):
        """Stop the AnsibleCoreCI VM instance, if there is one."""
        core_ci = self.get_instance()

        if core_ci:
            core_ci.stop()

        # otherwise the instance has not been provisioned and there is nothing to stop

    def wait_for_instance(self):  # type: () -> AnsibleCoreCI
        """Wait for the AnsibleCoreCI VM instance to become ready and return it."""
        instance = self.get_instance()
        instance.wait()

        return instance

    def create_core_ci(self, load):  # type: (bool) -> AnsibleCoreCI
        """
        Create and return an AnsibleCoreCI instance described by the profile config.
        Raises InternalError when the config does not specify an arch.
        """
        if not self.config.arch:
            raise InternalError(f' No arch specified for config: { self . config } ')

        resource = VmResource(
            platform=self.config.platform,
            version=self.config.version,
            architecture=self.config.arch,
            provider=self.config.provider,
            tag=' controller ' if self.controller else ' target ',
        )

        return AnsibleCoreCI(args=self.args, resource=resource, load=load)
class ControllerProfile(SshTargetHostProfile[ControllerConfig], PosixProfile[ControllerConfig]):
    """Profile used when the controller itself is the target."""

    def get_controller_target_connections(self):  # type: () -> t.List[SshConnection]
        """Return a single SSH connection from the controller to itself, using the controller python as the interpreter."""
        identity_file = SshKey(self.args).key
        interpreter = self.args.controller_python.path

        detail = SshConnectionDetail(
            name=' localhost ',
            host=' localhost ',
            port=None,
            user=' root ',
            identity_file=identity_file,
            python_interpreter=interpreter,
        )

        return [SshConnection(self.args, detail)]
class DockerProfile(ControllerHostProfile[DockerConfig], SshTargetHostProfile[DockerConfig]):
    """Host profile for a docker instance, usable as either the controller or a target."""

    @property
    def container_name(self):  # type: () -> t.Optional[str]
        """Return the container name stored in the profile state, if any, otherwise None."""
        return self.state.get(' container_name ')

    @container_name.setter
    def container_name(self, value):  # type: (str) -> None
        """Store the given container name in the profile state."""
        self.state[' container_name '] = value

    def provision(self):  # type: () -> None
        """Provision the host before delegation by running its container and recording the container name."""
        container = run_support_container(
            args=self.args,
            context=' __test_hosts__ ',
            image=self.config.image,
            name=f' ansible-test- { " controller " if self . controller else " target " } - { self . args . session_name } ',
            ports=[22],
            publish_ports=not self.controller,  # connections to the controller over SSH are not required
            options=self.get_docker_run_options(),
            cleanup=CleanupMode.NO,
        )

        if not container:
            return  # no container was returned, so there is no name to record

        self.container_name = container.name

    def setup(self):  # type: () -> None
        """Perform out-of-band setup before delegation by running the bootstrap script inside the container."""
        bootstrapper = BootstrapDocker(
            controller=self.controller,
            python_versions=[self.python.version],
            ssh_key=SshKey(self.args),
        )

        setup_sh = bootstrapper.get_script()
        # the script's first line, minus its first two characters, is used as the command to run
        # NOTE(review): presumably this strips a `#!` prefix from a shebang line - confirm against the bootstrap script
        shell = setup_sh.splitlines()[0][2:]

        docker_exec(self.args, self.container_name, [shell], data=setup_sh, capture=False)

    def deprovision(self):  # type: () -> None
        """Deprovision the host after delegation has completed, when permitted by the docker terminate mode."""
        if not self.container_name:
            return  # provision was never called or did not succeed, so there is no container to remove

        if self.args.docker_terminate == TerminateMode.ALWAYS or (self.args.docker_terminate == TerminateMode.SUCCESS and self.args.success):
            docker_rm(self.args, self.container_name)

    def wait(self):  # type: () -> None
        """
        Wait for the instance to be ready. Executed before delegation for the controller and after delegation for targets.
        Only targets are checked, by repeatedly running a command over SSH until it succeeds.
        A failure whose message contains the permission denied marker is re-raised immediately.
        Raises ApplicationError if the container does not respond before the attempts are exhausted.
        """
        if self.controller:
            return  # nothing is checked when this profile is the controller

        con = self.get_controller_target_connections()[0]

        for dummy in range(1, 60):
            try:
                con.run([' id '], capture=True)
            except SubprocessError as ex:
                if ' Permission denied ' in ex.message:
                    raise  # retrying will not help when access is being refused

                time.sleep(1)
            else:
                return

        # Every attempt failed. Report the timeout instead of silently treating the container as ready,
        # which matches how the remote profiles in this module report an unresponsive instance.
        raise ApplicationError(f'Timeout waiting for {self.config.name} container {self.container_name}.')

    def get_controller_target_connections(self):  # type: () -> t.List[SshConnection]
        """Return SSH connection(s) for accessing the host as a target from the controller, using the address recorded in the container database."""
        containers = get_container_database(self.args)
        access = containers.data[HostType.control][' __test_hosts__ '][self.container_name]

        host = access.host_ip
        port = dict(access.port_map())[22]  # look up the mapping for container port 22

        settings = SshConnectionDetail(
            name=self.config.name,
            user=' root ',
            host=host,
            port=port,
            identity_file=SshKey(self.args).key,
            python_interpreter=self.python.path,
        )

        return [SshConnection(self.args, settings)]

    def get_origin_controller_connection(self):  # type: () -> DockerConnection
        """Return a connection for accessing the host as a controller from the origin."""
        return DockerConnection(self.args, self.container_name)

    def get_working_directory(self):  # type: () -> str
        """Return the working directory for the host."""
        return ' /root '

    def get_docker_run_options(self):  # type: () -> t.List[str]
        """Return a list of options needed to run the container, derived from the profile config and the docker environment."""
        options = [
            ' --volume ', ' /sys/fs/cgroup:/sys/fs/cgroup:ro ',
            f' --privileged= { str ( self . config . privileged ) . lower ( ) } ',
        ]

        if self.config.memory:
            # the same value is used for both limits
            options.extend([
                f' --memory= { self . config . memory } ',
                f' --memory-swap= { self . config . memory } ',
            ])

        if self.config.seccomp != ' default ':
            options.extend([' --security-opt ', f' seccomp= { self . config . seccomp } '])

        docker_socket = ' /var/run/docker.sock '

        # share the docker socket when docker is not on localhost, or when the socket path exists locally
        if get_docker_hostname() != ' localhost ' or os.path.exists(docker_socket):
            options.extend([' --volume ', f' { docker_socket } : { docker_socket } '])

        return options
class NetworkInventoryProfile(HostProfile[NetworkInventoryConfig]):
    """Profile for hosts described by a network inventory."""
class NetworkRemoteProfile(RemoteProfile[NetworkRemoteConfig]):
    """Profile for a remote network instance."""

    def wait(self):  # type: () -> None
        """Wait until the instance is ready. Runs before delegation for the controller and after delegation for targets."""
        self.wait_until_ready()

    def get_inventory_variables(self):  # type: () -> t.Dict[str, t.Optional[t.Union[str, int]]]
        """Return the inventory variables needed to access this host, after waiting for the instance."""
        core_ci = self.wait_for_instance()
        connection = core_ci.connection

        if self.config.collection:
            network_os = f' { self . config . collection } . { self . config . platform } '
        else:
            network_os = self.config.platform

        variables = {
            'ansible_connection': self.config.connection,
            'ansible_pipelining': ' yes ',
            'ansible_host': connection.hostname,
            'ansible_port': connection.port,
            'ansible_user': connection.username,
            'ansible_ssh_private_key_file': core_ci.ssh_key.key,
            'ansible_network_os': network_os,
        }  # type: t.Dict[str, t.Optional[t.Union[str, int]]]

        return variables

    def wait_until_ready(self):  # type: () -> None
        """
        Wait until the host answers an Ansible module request.
        Raises ApplicationError when the host has not answered after all attempts.
        """
        core_ci = self.wait_for_instance()

        if not isinstance(self.args, IntegrationConfig):
            return  # skip extended checks unless we're running integration tests

        host_name = sanitize_host_name(self.config.name)
        inventory = Inventory.create_single_host(host_name, self.get_inventory_variables())
        env = ansible_environment(self.args)
        module_name = f' { self . config . collection + " . " if self . config . collection else " " } { self . config . platform } _command '

        # the loop stays inside the `with` block so the temporary inventory file exists for every attempt
        with tempfile.NamedTemporaryFile() as inventory_file:
            inventory.write(self.args, inventory_file.name)

            cmd = [' ansible ', ' -m ', module_name, ' -a ', ' commands=? ', ' -i ', inventory_file.name, ' all ']

            for dummy in range(1, 90):
                try:
                    intercept_python(self.args, self.args.controller_python, cmd, env, capture=True)
                except SubprocessError as ex:
                    display.warning(str(ex))
                    time.sleep(10)
                else:
                    return

            raise ApplicationError(f' Timeout waiting for { self . config . name } instance { core_ci . instance_id } . ')

    def get_controller_target_connections(self):  # type: () -> t.List[SshConnection]
        """Return the SSH connection(s) the controller uses to access this host in its role as target."""
        core_ci = self.wait_for_instance()

        detail = SshConnectionDetail(
            name=core_ci.name,
            host=core_ci.connection.hostname,
            port=core_ci.connection.port,
            user=core_ci.connection.username,
            identity_file=core_ci.ssh_key.key,
        )

        return [SshConnection(self.args, detail)]
class OriginProfile(ControllerHostProfile[OriginConfig]):
    """Profile for the origin host."""

    def get_origin_controller_connection(self):  # type: () -> LocalConnection
        """Return a local connection, used to access the origin in its role as controller."""
        connection = LocalConnection(self.args)
        return connection

    def get_working_directory(self):  # type: () -> str
        """Return the current working directory of this process as the working directory for the host."""
        working_directory = os.getcwd()
        return working_directory
class PosixRemoteProfile(ControllerHostProfile[PosixRemoteConfig], RemoteProfile[PosixRemoteConfig]):
    """Profile for a remote POSIX instance."""

    def wait(self):  # type: () -> None
        """Wait until the instance is ready. Runs before delegation for the controller and after delegation for targets."""
        self.wait_until_ready()

    def configure(self):  # type: () -> None
        """Perform in-band configuration by running the bootstrap script on the instance. Runs before delegation for the controller and after delegation for targets."""
        # a target uses a single python version, but a controller may include additional versions for targets running on the controller
        own_versions = [self.python.version]
        target_versions = [target.python.version for target in self.targets if isinstance(target, ControllerConfig)]
        python_versions = sorted_versions(list(set(own_versions + target_versions)))

        core_ci = self.wait_for_instance()
        pwd = self.wait_until_ready()

        display.info(f' Remote working directory: { pwd } ', verbosity=1)

        bootstrapper = BootstrapRemote(
            controller=self.controller,
            platform=self.config.platform,
            platform_version=self.config.version,
            python_versions=python_versions,
            ssh_key=core_ci.ssh_key,
        )

        setup_sh = bootstrapper.get_script()
        # the script's first line, minus its first two characters, is used as the command to run
        # NOTE(review): presumably this strips a `#!` prefix from a shebang line - confirm against the bootstrap script
        shell = setup_sh.splitlines()[0][2:]

        self.get_origin_controller_connection().run([shell], data=setup_sh, capture=False)

    def get_ssh_connection(self):  # type: () -> SshConnection
        """
        Return an SSH connection for accessing the host, with a become method chosen for the platform when needed.
        Raises NotImplementedError when the user is not the root user checked below and the platform has no become support.
        """
        core_ci = self.wait_for_instance()

        settings = SshConnectionDetail(
            name=core_ci.name,
            user=core_ci.connection.username,
            host=core_ci.connection.hostname,
            port=core_ci.connection.port,
            identity_file=core_ci.ssh_key.key,
            python_interpreter=self.python.path,
        )

        # become implementation to use for each supported platform
        become_types = {
            ' freebsd ': Su,
            ' macos ': Sudo,
            ' rhel ': Sudo,
            ' ubuntu ': Sudo,
        }

        if settings.user == ' root ':
            become = None  # type: t.Optional[Become]
        else:
            become_type = become_types.get(self.config.platform)

            if become_type is None:
                raise NotImplementedError(f' Become support has not been implemented for platform " { self . config . platform } " and user " { settings . user } " is not root. ')

            become = become_type()

        return SshConnection(self.args, settings, become)

    def wait_until_ready(self):  # type: () -> str
        """
        Wait until the instance answers over SSH, then return its current working directory.
        A failure whose message contains the permission denied marker is re-raised immediately.
        Raises ApplicationError when the instance has not answered after all attempts.
        """
        core_ci = self.wait_for_instance()

        for dummy in range(1, 90):
            try:
                working_directory = self.get_working_directory()
            except SubprocessError as ex:
                if ' Permission denied ' in ex.message:
                    raise

                time.sleep(10)
            else:
                return working_directory

        raise ApplicationError(f' Timeout waiting for { self . config . name } instance { core_ci . instance_id } . ')

    def get_controller_target_connections(self):  # type: () -> t.List[SshConnection]
        """Return the SSH connection(s) the controller uses to access this host in its role as target."""
        connection = self.get_ssh_connection()
        return [connection]

    def get_origin_controller_connection(self):  # type: () -> SshConnection
        """Return the connection the origin uses to access this host in its role as controller."""
        return self.get_ssh_connection()

    def get_working_directory(self):  # type: () -> str
        """
        Return the working directory for the host, asking the host for it once and caching the answer.
        Nothing is cached in explain mode, where a placeholder is returned instead.
        """
        if self.pwd:
            return self.pwd

        ssh = self.get_origin_controller_connection()
        stdout = ssh.run([' pwd '], capture=True)[0]

        if self.args.explain:
            return ' /pwd '

        # only the last line of the output is used as the directory
        pwd = stdout.strip().splitlines()[-1]

        if not pwd.startswith(' / '):
            raise Exception(f' Unexpected current working directory " { pwd } " from " pwd " command output: \n { stdout . strip ( ) } ')

        self.pwd = pwd

        return self.pwd

    @property
    def pwd(self):  # type: () -> t.Optional[str]
        """The pwd held in the profile cache, if any, otherwise None."""
        return self.cache.get(' pwd ')

    @pwd.setter
    def pwd(self, value):  # type: (str) -> None
        """Hold the given pwd in the profile cache."""
        self.cache[' pwd '] = value
class PosixSshProfile(SshTargetHostProfile[PosixSshConfig], PosixProfile[PosixSshConfig]):
    """Profile for a POSIX host reached over SSH."""

    def get_controller_target_connections(self):  # type: () -> t.List[SshConnection]
        """Return a single SSH connection from the controller to the host described by the profile config."""
        detail = SshConnectionDetail(
            name=' target ',
            user=self.config.user,
            host=self.config.host,
            port=self.config.port,
            identity_file=SshKey(self.args).key,
            python_interpreter=self.python.path,
        )

        connections = [SshConnection(self.args, detail)]

        return connections
class WindowsInventoryProfile(SshTargetHostProfile[WindowsInventoryConfig]):
    """Profile for hosts described by a Windows inventory."""

    def get_controller_target_connections(self):  # type: () -> t.List[SshConnection]
        """Return one SSH connection per host found in the inventory, reporting the generated details when there are any."""
        inventory = parse_inventory(self.args, self.config.path)
        hosts = get_hosts(inventory, ' windows ')
        identity_file = SshKey(self.args).key

        settings = []

        for name, config in hosts.items():
            settings.append(SshConnectionDetail(
                name=name,
                host=config[' ansible_host '],
                port=22,
                user=config[' ansible_user '],
                identity_file=identity_file,
                shell_type=' powershell ',
            ))

        if settings:
            details = ' \n '.join(f' { ssh . name } { ssh . user } @ { ssh . host } : { ssh . port } ' for ssh in settings)
            display.info(f' Generated SSH connection details from inventory: \n { details } ', verbosity=1)

        connections = []

        for setting in settings:
            connections.append(SshConnection(self.args, setting))

        return connections
class WindowsRemoteProfile(RemoteProfile[WindowsRemoteConfig]):
    """Profile for a remote Windows instance."""

    def wait(self):  # type: () -> None
        """Wait until the instance is ready. Runs before delegation for the controller and after delegation for targets."""
        self.wait_until_ready()

    def get_inventory_variables(self):  # type: () -> t.Dict[str, t.Optional[t.Union[str, int]]]
        """Return the inventory variables needed to access this host, after waiting for the instance."""
        core_ci = self.wait_for_instance()
        connection = core_ci.connection

        variables = {
            'ansible_connection': ' winrm ',
            'ansible_pipelining': ' yes ',
            'ansible_winrm_server_cert_validation': ' ignore ',
            'ansible_host': connection.hostname,
            'ansible_port': connection.port,
            'ansible_user': connection.username,
            'ansible_password': connection.password,
            'ansible_ssh_private_key_file': core_ci.ssh_key.key,
        }  # type: t.Dict[str, t.Optional[t.Union[str, int]]]

        # HACK: force 2016 to use NTLM + HTTP message encryption
        if self.config.version == ' 2016 ':
            variables['ansible_winrm_transport'] = ' ntlm '
            variables['ansible_winrm_scheme'] = ' http '
            variables['ansible_port'] = ' 5985 '

        return variables

    def wait_until_ready(self):  # type: () -> None
        """
        Wait until the host answers an Ansible module request.
        Raises ApplicationError when the host has not answered after all attempts.
        """
        core_ci = self.wait_for_instance()

        if not isinstance(self.args, IntegrationConfig):
            return  # skip extended checks unless we're running integration tests

        host_name = sanitize_host_name(self.config.name)
        inventory = Inventory.create_single_host(host_name, self.get_inventory_variables())
        env = ansible_environment(self.args)
        module_name = ' ansible.windows.win_ping '

        # the loop stays inside the `with` block so the temporary inventory file exists for every attempt
        with tempfile.NamedTemporaryFile() as inventory_file:
            inventory.write(self.args, inventory_file.name)

            cmd = [' ansible ', ' -m ', module_name, ' -i ', inventory_file.name, ' all ']

            for dummy in range(1, 120):
                try:
                    intercept_python(self.args, self.args.controller_python, cmd, env, capture=True)
                except SubprocessError as ex:
                    display.warning(str(ex))
                    time.sleep(10)
                else:
                    return

            raise ApplicationError(f' Timeout waiting for { self . config . name } instance { core_ci . instance_id } . ')

    def get_controller_target_connections(self):  # type: () -> t.List[SshConnection]
        """Return the SSH connection(s) the controller uses to access this host in its role as target."""
        core_ci = self.wait_for_instance()

        detail = SshConnectionDetail(
            name=core_ci.name,
            host=core_ci.connection.hostname,
            port=22,
            user=core_ci.connection.username,
            identity_file=core_ci.ssh_key.key,
            shell_type=' powershell ',
        )

        return [SshConnection(self.args, detail)]
@cache
def get_config_profile_type_map():  # type: () -> t.Dict[t.Type[HostConfig], t.Type[HostProfile]]
    """Build the mapping from HostConfig types to HostProfile types. The `cache` decorator is applied to the result."""
    type_map = get_type_map(HostProfile, HostConfig)
    return type_map
def create_host_profile(
        args,  # type: EnvironmentConfig
        config,  # type: HostConfig
        controller,  # type: bool
):  # type: (...) -> HostProfile
    """
    Instantiate the host profile type registered for the type of the given host configuration.
    The profile receives `args.targets` when it is created for the controller, otherwise no targets.
    """
    if controller:
        targets = args.targets
    else:
        targets = None

    profile_type = get_config_profile_type_map()[type(config)]

    return profile_type(args=args, config=config, targets=targets)