Skip to content
Snippets Groups Projects
Commit 15c8f45e authored by kaiyou's avatar kaiyou
Browse files

Add end to end authentication tests

parent faaf72e9
No related branches found
No related tags found
No related merge requests found
import pytest
import tempfile
import subprocess
import random
import os
import time
import requests
@pytest.fixture
# Generic usefule fixtures
@pytest.fixture
def temp():
"""Write data to a temporary file and return the handle
This is a fixture instead of a utility function so that
temporary files have a test-scoped lifetime
"""
handles = []
def fun(data):
handle = tempfile.NamedTemporaryFile(delete=False)
if type(data) is str:
data = data.encode("utf8")
handle.write(data)
handle.flush()
handles.append(handle)
return handle.name
yield fun
del handles # Useless but explicit
# Test application instance
@pytest.fixture(scope="module")
def app(username, password):
"""Run an isolated application instance and return the URL"""
data = tempfile.TemporaryDirectory()
port = 5000 + random.randint(1, 999) # Port = 5XXX
url = f"http://localhost:{port}"
env = os.environ
env.update(FLASK_APP="hiboo", SQLALCHEMY_DATABASE_URI=f"sqlite:///{data.name}/hiboo.db")
subprocess.run(["flask", "db", "upgrade"], env=env)
subprocess.run(["flask", "user", "create", username, password], env=env)
subprocess.run(["flask", "user", "promote", username], env=env)
proc = subprocess.Popen(["flask", "run", "-p", str(port)], env=env)
# Wait for the server to be ready
for _ in range(30):
try:
assert requests.get(url).status_code == 200
except Exception as e:
print(e)
time.sleep(1)
else:
yield url
break
proc.terminate()
proc.wait()
del data
@pytest.fixture(scope="session")
def username():
"""Default username for tests"""
return "admin"
@pytest.fixture
@pytest.fixture(scope="session")
def password():
"""Default password for tests"""
return "admin"
@pytest.fixture
def service_name():
"""A randomly generated service name"""
return "Test service " + random.randbytes(3).hex()
# Test webserver
@pytest.fixture
def httpd(temp):
"""Test httpd server"""
proc = []
def start(config):
print(config)
proc.append(subprocess.Popen(["httpd", "-DFOREGROUND", "-f", temp(config)]))
time.sleep(1) # Sleep so apache can start
yield start
for pid in proc:
pid.terminate()
pid.wait()
@pytest.fixture
def httpd_minimal():
"""Minimal httpd config"""
rootDirectory = tempfile.TemporaryDirectory()
with open(os.path.join(rootDirectory.name, "index.html"), "w") as handle:
handle.write("Hello world!")
yield f"""
ServerName localhost
ServerRoot /usr/lib/httpd
PidFile /tmp/apache.pid
LoadModule mpm_event_module modules/mod_mpm_event.so
LoadModule unixd_module modules/mod_unixd.so
LoadModule env_module modules/mod_env.so
LoadModule dir_module modules/mod_dir.so
LoadModule log_config_module modules/mod_log_config.so
LoadModule authn_core_module modules/mod_authn_core.so
LoadModule authz_core_module modules/mod_authz_core.so
LoadModule authz_user_module modules/mod_authz_user.so
LogLevel debug
ErrorLog /dev/stderr
TransferLog /dev/stdout
Listen 127.0.0.1:8123
DocumentRoot {rootDirectory.name}
DirectoryIndex index.html
"""
del rootDirectory
@pytest.fixture
def httpd_saml(httpd_minimal):
return (httpd_minimal + """
LoadModule auth_mellon_module modules/mod_auth_mellon.so
<Location />
Require valid-user
AuthType "Mellon"
MellonEnable "auth"
MellonEndpointPath "/mellon"
MellonSPPrivateKeyFile {key}
MellonSPCertFile {cert}
MellonIdPMetadataFile {metadata}
SetEnv MELLON_DISABLE_SAMESITE 1
</Location>
""")
@pytest.fixture
def httpd_oidc(httpd_minimal):
return (httpd_minimal + """
LoadModule auth_openidc_module modules/mod_auth_openidc.so
OIDCProviderMetadataURL {metadata}
OIDCClientID {client_id}
OIDCClientSecret {client_secret}
OIDCRedirectURI http://localhost:8123/redirect_uri
OIDCCryptoPassphrase changeme
OIDCScope "openid email profile"
<Location />
Require valid-user
AuthType "openid-connect"
</Location>
""")
from playwright import sync_api as pw
import requests
def test_has_login_button(page: pw.Page):
"""Test that homepage has a login button when not logged in"""
page.goto("/")
pw.expect(page.get_by_role("button", name="Sign in")).to_be_visible()
def test_login(page: pw.Page, username: str, password: str):
"""Test that logs in as default test user"""
page.goto("/")
def do_login(page, username, password):
"""Logs into hiboo"""
page.get_by_label("Username").fill(username)
page.get_by_label("Password").fill(password)
page.get_by_role("button", name="Sign in").click()
def test_login(app, context, username, password):
"""Test that logs in as default test user"""
page = context.new_page()
page.goto(app)
do_login(page, username, password)
pw.expect(page.get_by_text("Sign out")).to_be_visible()
def test_saml_auth(app, context, username, password, service_name, httpd, httpd_saml, temp):
# First create a SAML service
page = context.new_page()
page.goto(app + "/service/create/saml")
do_login(page, username, password)
page.get_by_label("Service name").fill(service_name)
page.get_by_label("Provider").fill("test")
page.get_by_label("Description").fill("test")
page.get_by_label("Profile policy").select_option("open")
page.get_by_label("Maximum profile count").fill("10")
page.get_by_label("SP entity id").fill("http://localhost:8123/mellon/metadata")
page.get_by_label("SP ACS").fill("http://localhost:8123/mellon/postResponse")
page.get_by_role("button", name="submit").click()
# Then access the service and extract useful data
page.get_by_text(service_name).click()
httpd(httpd_saml.format(
metadata=temp(requests.get(page.get_by_label("SAML Metadata").text_content()).content),
key=temp(page.get_by_label("SP private key").text_content()),
cert=temp(page.get_by_label("SP certificate").text_content())
))
# Finally log into the service provider, validate a new profile and get in
page.goto("http://localhost:8123/")
page.get_by_role("button", name="Sign up").click()
pw.expect(page.get_by_text("Hello world")).to_be_visible()
def test_oidc_auth(app, context, username, password, service_name, httpd, httpd_oidc, temp):
# First create an OIDC service
page = context.new_page()
page.goto(app + "/service/create/oidc")
do_login(page, username, password)
page.get_by_label("Service name").fill(service_name)
page.get_by_label("Provider").fill("test")
page.get_by_label("Description").fill("test")
page.get_by_label("Profile policy").select_option("open")
page.get_by_label("Maximum profile count").fill("10")
page.get_by_label("Redirect URI").fill("http://localhost:8123/redirect_uri")
page.get_by_label("OpenID Connect grant type").select_option("authorization_code")
page.get_by_label("Allowed response types").select_option("code")
page.get_by_role("button", name="submit").click()
# Then access the service and extract useful data
page.get_by_text(service_name).click()
httpd(httpd_oidc.format(
metadata=page.get_by_label("OIDC discovery endpoint").text_content(),
client_id=page.get_by_label("Client ID").text_content(),
client_secret=page.get_by_label("Client secret").text_content()
))
# Finally log into the client app, validate a new profile and get in
page.goto("http://localhost:8123/")
page.get_by_role("button", name="Sign up").click()
pw.expect(page.get_by_text("Hello world")).to_be_visible()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment