@ -57,6 +57,8 @@ def get_test_galaxy_api(url, version, token_ins=None, token_value=None):
token_value = token_value or " my token "
token_ins = token_ins or GalaxyToken ( token_value )
api = GalaxyAPI ( None , " test " , url )
# Warning, this doesn't test g_connect() because _availabe_api_versions is set here. That means
# that urls for v2 servers have to append '/api/' themselves in the input data.
api . _available_api_versions = { version : ' %s ' % version }
api . token = token_ins
@ -332,16 +334,16 @@ def test_publish_failure(api_version, collection_url, response, expected, collec
api . publish_collection ( collection_artifact )
@pytest.mark.parametrize ( ' api_version, token_type, token_ins, import_uri, full_import_uri' , [
( ' v2' , ' Token ' , GalaxyToken ( ' my token ' ) ,
@pytest.mark.parametrize ( ' server_url, api_version, token_type, token_ins, import_uri, full_import_uri' , [
( ' https://galaxy.server.com/api' , ' v2' , ' Token ' , GalaxyToken ( ' my token ' ) ,
' 1234 ' ,
' https://galaxy.server.com/api/v2/collection-imports/1234 ' ) ,
( ' v3' , ' Bearer ' , KeycloakToken ( auth_url = ' https://api.test/ ' ) ,
( ' https://galaxy.server.com/api/automation-hub/' , ' v3' , ' Bearer ' , KeycloakToken ( auth_url = ' https://api.test/ ' ) ,
' 1234 ' ,
' https://galaxy.server.com/api/automation-hub/v3/imports/collections/1234/ ' ) ,
] )
def test_wait_import_task ( api_version, token_type , token_ins , import_uri , full_import_uri , monkeypatch ) :
api = get_test_galaxy_api ( ' https://galaxy.server.com/api/ ' , api_version , token_ins = token_ins )
def test_wait_import_task ( server_url, api_version, token_type , token_ins , import_uri , full_import_uri , monkeypatch ) :
api = get_test_galaxy_api ( server_url , api_version , token_ins = token_ins )
if token_ins :
mock_token_get = MagicMock ( )
@ -365,16 +367,16 @@ def test_wait_import_task(api_version, token_type, token_ins, import_uri, full_i
assert mock_display . mock_calls [ 0 ] [ 1 ] [ 0 ] == ' Waiting until Galaxy import task %s has completed ' % full_import_uri
@pytest.mark.parametrize ( ' api_version, token_type, token_ins, import_uri, full_import_uri' , [
( ' v2' , ' Token ' , GalaxyToken ( ' my token ' ) ,
@pytest.mark.parametrize ( ' server_url, api_version, token_type, token_ins, import_uri, full_import_uri' , [
( ' https://galaxy.server.com/api/' , ' v2' , ' Token ' , GalaxyToken ( ' my token ' ) ,
' 1234 ' ,
' https://galaxy.server.com/api/v2/collection-imports/1234 ' ) ,
( ' v3' , ' Bearer ' , KeycloakToken ( auth_url = ' https://api.test/ ' ) ,
( ' https://galaxy.server.com/api/automation-hub' , ' v3' , ' Bearer ' , KeycloakToken ( auth_url = ' https://api.test/ ' ) ,
' 1234 ' ,
' https://galaxy.server.com/api/automation-hub/v3/imports/collections/1234/ ' ) ,
] )
def test_wait_import_task_multiple_requests ( api_version, token_type , token_ins , import_uri , full_import_uri , monkeypatch ) :
api = get_test_galaxy_api ( ' https://galaxy.server.com/api/ ' , api_version , token_ins = token_ins )
def test_wait_import_task_multiple_requests ( server_url, api_version, token_type , token_ins , import_uri , full_import_uri , monkeypatch ) :
api = get_test_galaxy_api ( server_url , api_version , token_ins = token_ins )
if token_ins :
mock_token_get = MagicMock ( )
@ -412,16 +414,16 @@ def test_wait_import_task_multiple_requests(api_version, token_type, token_ins,
' Galaxy import process has a status of test, wait 2 seconds before trying again '
@pytest.mark.parametrize ( ' api_version, token_type, token_ins, import_uri, full_import_uri,' , [
( ' v2' , ' Token ' , GalaxyToken ( ' my token ' ) ,
@pytest.mark.parametrize ( ' server_url, api_version, token_type, token_ins, import_uri, full_import_uri,' , [
( ' https://galaxy.server.com/api/' , ' v2' , ' Token ' , GalaxyToken ( ' my token ' ) ,
' 1234 ' ,
' https://galaxy.server.com/api/v2/collection-imports/1234 ' ) ,
( ' v3' , ' Bearer ' , KeycloakToken ( auth_url = ' https://api.test/ ' ) ,
( ' https://galaxy.server.com/api/automation-hub/' , ' v3' , ' Bearer ' , KeycloakToken ( auth_url = ' https://api.test/ ' ) ,
' 1234 ' ,
' https://galaxy.server.com/api/automation-hub/v3/imports/collections/1234/ ' ) ,
] )
def test_wait_import_task_with_failure ( api_version, token_type , token_ins , import_uri , full_import_uri , monkeypatch ) :
api = get_test_galaxy_api ( ' https://galaxy.server.com/api/ ' , api_version , token_ins = token_ins )
def test_wait_import_task_with_failure ( server_url, api_version, token_type , token_ins , import_uri , full_import_uri , monkeypatch ) :
api = get_test_galaxy_api ( server_url , api_version , token_ins = token_ins )
if token_ins :
mock_token_get = MagicMock ( )
@ -489,16 +491,16 @@ def test_wait_import_task_with_failure(api_version, token_type, token_ins, impor
assert mock_err . mock_calls [ 0 ] [ 1 ] [ 0 ] == u ' Galaxy import error message: Somé error '
@pytest.mark.parametrize ( ' api_version, token_type, token_ins, import_uri, full_import_uri' , [
( ' v2' , ' Token ' , GalaxyToken ( ' my_token ' ) ,
@pytest.mark.parametrize ( ' server_url, api_version, token_type, token_ins, import_uri, full_import_uri' , [
( ' https://galaxy.server.com/api/' , ' v2' , ' Token ' , GalaxyToken ( ' my_token ' ) ,
' 1234 ' ,
' https://galaxy.server.com/api/v2/collection-imports/1234 ' ) ,
( ' v3' , ' Bearer ' , KeycloakToken ( auth_url = ' https://api.test/ ' ) ,
( ' https://galaxy.server.com/api/automation-hub/' , ' v3' , ' Bearer ' , KeycloakToken ( auth_url = ' https://api.test/ ' ) ,
' 1234 ' ,
' https://galaxy.server.com/api/automation-hub/v3/imports/collections/1234/ ' ) ,
] )
def test_wait_import_task_with_failure_no_error ( api_version, token_type , token_ins , import_uri , full_import_uri , monkeypatch ) :
api = get_test_galaxy_api ( ' https://galaxy.server.com/api/ ' , api_version , token_ins = token_ins )
def test_wait_import_task_with_failure_no_error ( server_url, api_version, token_type , token_ins , import_uri , full_import_uri , monkeypatch ) :
api = get_test_galaxy_api ( server_url , api_version , token_ins = token_ins )
if token_ins :
mock_token_get = MagicMock ( )
@ -562,16 +564,16 @@ def test_wait_import_task_with_failure_no_error(api_version, token_type, token_i
assert mock_err . mock_calls [ 0 ] [ 1 ] [ 0 ] == u ' Galaxy import error message: Somé error '
@pytest.mark.parametrize ( ' api_version, token_type, token_ins, import_uri, full_import_uri' , [
( ' v2' , ' Token ' , GalaxyToken ( ' my token ' ) ,
@pytest.mark.parametrize ( ' server_url, api_version, token_type, token_ins, import_uri, full_import_uri' , [
( ' https://galaxy.server.com/api' , ' v2' , ' Token ' , GalaxyToken ( ' my token ' ) ,
' 1234 ' ,
' https://galaxy.server.com/api/v2/collection-imports/1234 ' ) ,
( ' v3' , ' Bearer ' , KeycloakToken ( auth_url = ' https://api.test/ ' ) ,
( ' https://galaxy.server.com/api/automation-hub' , ' v3' , ' Bearer ' , KeycloakToken ( auth_url = ' https://api.test/ ' ) ,
' 1234 ' ,
' https://galaxy.server.com/api/automation-hub/v3/imports/collections/1234/ ' ) ,
] )
def test_wait_import_task_timeout ( api_version, token_type , token_ins , import_uri , full_import_uri , monkeypatch ) :
api = get_test_galaxy_api ( ' https://galaxy.server.com/api/ ' , api_version , token_ins = token_ins )
def test_wait_import_task_timeout ( server_url, api_version, token_type , token_ins , import_uri , full_import_uri , monkeypatch ) :
api = get_test_galaxy_api ( server_url , api_version , token_ins = token_ins )
if token_ins :
mock_token_get = MagicMock ( )