Source code for keystone.tests.test_keystoneclient_sql

# Copyright 2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import os
import uuid

from keystoneclient.contrib.ec2 import utils as ec2_utils

from keystone import config
from keystone import tests
from keystone.tests import test_keystoneclient


CONF = config.CONF


[docs]class KcMasterSqlTestCase(test_keystoneclient.KcMasterTestCase):
[docs] def config_files(self): config_files = super(KcMasterSqlTestCase, self).config_files() config_files.append(tests.dirs.tests_conf('backend_sql.conf')) return config_files
[docs] def setUp(self): super(KcMasterSqlTestCase, self).setUp() self.default_client = self.get_client() self.addCleanup(self.cleanup_instance('default_client'))
[docs] def test_endpoint_crud(self): from keystoneclient import exceptions as client_exceptions client = self.get_client(admin=True) service = client.services.create(name=uuid.uuid4().hex, service_type=uuid.uuid4().hex, description=uuid.uuid4().hex) endpoint_region = uuid.uuid4().hex invalid_service_id = uuid.uuid4().hex endpoint_publicurl = uuid.uuid4().hex endpoint_internalurl = uuid.uuid4().hex endpoint_adminurl = uuid.uuid4().hex # a non-existent service ID should trigger a 404 self.assertRaises(client_exceptions.NotFound, client.endpoints.create, region=endpoint_region, service_id=invalid_service_id, publicurl=endpoint_publicurl, adminurl=endpoint_adminurl, internalurl=endpoint_internalurl) endpoint = client.endpoints.create(region=endpoint_region, service_id=service.id, publicurl=endpoint_publicurl, adminurl=endpoint_adminurl, internalurl=endpoint_internalurl) self.assertEqual(endpoint.region, endpoint_region) self.assertEqual(endpoint.service_id, service.id) self.assertEqual(endpoint.publicurl, endpoint_publicurl) self.assertEqual(endpoint.internalurl, endpoint_internalurl) self.assertEqual(endpoint.adminurl, endpoint_adminurl) client.endpoints.delete(id=endpoint.id) self.assertRaises(client_exceptions.NotFound, client.endpoints.delete, id=endpoint.id)
def _send_ec2_auth_request(self, credentials, client=None): if not client: client = self.default_client url = '%s/ec2tokens' % self.default_client.auth_url (resp, token) = client.request( url=url, method='POST', body={'credentials': credentials}) return resp, token def _generate_default_user_ec2_credentials(self): cred = self. default_client.ec2.create( user_id=self.user_foo['id'], tenant_id=self.tenant_bar['id']) return self._generate_user_ec2_credentials(cred.access, cred.secret) def _generate_user_ec2_credentials(self, access, secret): signer = ec2_utils.Ec2Signer(secret) credentials = {'params': {'SignatureVersion': '2'}, 'access': access, 'verb': 'GET', 'host': 'localhost', 'path': '/service/cloud'} signature = signer.generate(credentials) return credentials, signature
[docs] def test_ec2_auth_success(self): credentials, signature = self._generate_default_user_ec2_credentials() credentials['signature'] = signature resp, token = self._send_ec2_auth_request(credentials) self.assertEqual(resp.status_code, 200) self.assertIn('access', token)
[docs] def test_ec2_auth_success_trust(self): # Add "other" role user_foo and create trust delegating it to user_two self.assignment_api.add_role_to_user_and_project( self.user_foo['id'], self.tenant_bar['id'], self.role_other['id']) trust_id = 'atrust123' trust = {'trustor_user_id': self.user_foo['id'], 'trustee_user_id': self.user_two['id'], 'project_id': self.tenant_bar['id'], 'impersonation': True} roles = [self.role_other] self.trust_api.create_trust(trust_id, trust, roles) # Create a client for user_two, scoped to the trust client = self.get_client(self.user_two) ret = client.authenticate(trust_id=trust_id, tenant_id=self.tenant_bar['id']) self.assertTrue(ret) self.assertTrue(client.auth_ref.trust_scoped) self.assertEqual(trust_id, client.auth_ref.trust_id) # Create an ec2 keypair using the trust client impersonating user_foo cred = client.ec2.create(user_id=self.user_foo['id'], tenant_id=self.tenant_bar['id']) credentials, signature = self._generate_user_ec2_credentials( cred.access, cred.secret) credentials['signature'] = signature resp, token = self._send_ec2_auth_request(credentials) self.assertEqual(resp.status_code, 200) self.assertEqual(trust_id, token['access']['trust']['id']) #TODO(shardy) we really want to check the roles and trustee # but because of where the stubbing happens we don't seem to # hit the necessary code in controllers.py _authenticate_token # so although all is OK via a real request, it incorrect in # this test..
[docs] def test_ec2_auth_failure(self): from keystoneclient import exceptions as client_exceptions credentials, signature = self._generate_default_user_ec2_credentials() credentials['signature'] = uuid.uuid4().hex self.assertRaises(client_exceptions.Unauthorized, self._send_ec2_auth_request, credentials)
[docs] def test_ec2_credential_crud(self): creds = self.default_client.ec2.list(user_id=self.user_foo['id']) self.assertEqual(creds, []) cred = self.default_client.ec2.create(user_id=self.user_foo['id'], tenant_id=self.tenant_bar['id']) creds = self.default_client.ec2.list(user_id=self.user_foo['id']) self.assertEqual(creds, [cred]) got = self.default_client.ec2.get(user_id=self.user_foo['id'], access=cred.access) self.assertEqual(cred, got) self.default_client.ec2.delete(user_id=self.user_foo['id'], access=cred.access) creds = self.default_client.ec2.list(user_id=self.user_foo['id']) self.assertEqual(creds, [])
[docs] def test_ec2_credential_crud_non_admin(self): na_client = self.get_client(self.user_two) creds = na_client.ec2.list(user_id=self.user_two['id']) self.assertEqual(creds, []) cred = na_client.ec2.create(user_id=self.user_two['id'], tenant_id=self.tenant_baz['id']) creds = na_client.ec2.list(user_id=self.user_two['id']) self.assertEqual(creds, [cred]) got = na_client.ec2.get(user_id=self.user_two['id'], access=cred.access) self.assertEqual(cred, got) na_client.ec2.delete(user_id=self.user_two['id'], access=cred.access) creds = na_client.ec2.list(user_id=self.user_two['id']) self.assertEqual(creds, [])
[docs] def test_ec2_list_credentials(self): cred_1 = self.default_client.ec2.create( user_id=self.user_foo['id'], tenant_id=self.tenant_bar['id']) cred_2 = self.default_client.ec2.create( user_id=self.user_foo['id'], tenant_id=self.tenant_service['id']) cred_3 = self.default_client.ec2.create( user_id=self.user_foo['id'], tenant_id=self.tenant_mtu['id']) two = self.get_client(self.user_two) cred_4 = two.ec2.create(user_id=self.user_two['id'], tenant_id=self.tenant_bar['id']) creds = self.default_client.ec2.list(user_id=self.user_foo['id']) self.assertEqual(len(creds), 3) self.assertEqual(sorted([cred_1, cred_2, cred_3], key=lambda x: x.access), sorted(creds, key=lambda x: x.access)) self.assertNotIn(cred_4, creds)
[docs] def test_ec2_credentials_create_404(self): from keystoneclient import exceptions as client_exceptions self.assertRaises(client_exceptions.NotFound, self.default_client.ec2.create, user_id=uuid.uuid4().hex, tenant_id=self.tenant_bar['id']) self.assertRaises(client_exceptions.NotFound, self.default_client.ec2.create, user_id=self.user_foo['id'], tenant_id=uuid.uuid4().hex)
[docs] def test_ec2_credentials_delete_404(self): from keystoneclient import exceptions as client_exceptions self.assertRaises(client_exceptions.NotFound, self.default_client.ec2.delete, user_id=uuid.uuid4().hex, access=uuid.uuid4().hex)
[docs] def test_ec2_credentials_get_404(self): from keystoneclient import exceptions as client_exceptions self.assertRaises(client_exceptions.NotFound, self.default_client.ec2.get, user_id=uuid.uuid4().hex, access=uuid.uuid4().hex)
[docs] def test_ec2_credentials_list_404(self): from keystoneclient import exceptions as client_exceptions self.assertRaises(client_exceptions.NotFound, self.default_client.ec2.list, user_id=uuid.uuid4().hex)
[docs] def test_ec2_credentials_list_user_forbidden(self): from keystoneclient import exceptions as client_exceptions two = self.get_client(self.user_two) self.assertRaises(client_exceptions.Forbidden, two.ec2.list, user_id=self.user_foo['id'])
[docs] def test_ec2_credentials_get_user_forbidden(self): from keystoneclient import exceptions as client_exceptions cred = self.default_client.ec2.create(user_id=self.user_foo['id'], tenant_id=self.tenant_bar['id']) two = self.get_client(self.user_two) self.assertRaises(client_exceptions.Forbidden, two.ec2.get, user_id=self.user_foo['id'], access=cred.access) self.default_client.ec2.delete(user_id=self.user_foo['id'], access=cred.access)
[docs] def test_ec2_credentials_delete_user_forbidden(self): from keystoneclient import exceptions as client_exceptions cred = self.default_client.ec2.create(user_id=self.user_foo['id'], tenant_id=self.tenant_bar['id']) two = self.get_client(self.user_two) self.assertRaises(client_exceptions.Forbidden, two.ec2.delete, user_id=self.user_foo['id'], access=cred.access) self.default_client.ec2.delete(user_id=self.user_foo['id'], access=cred.access)
[docs] def test_endpoint_create_404(self): from keystoneclient import exceptions as client_exceptions client = self.get_client(admin=True) self.assertRaises(client_exceptions.NotFound, client.endpoints.create, region=uuid.uuid4().hex, service_id=uuid.uuid4().hex, publicurl=uuid.uuid4().hex, adminurl=uuid.uuid4().hex, internalurl=uuid.uuid4().hex)
[docs] def test_endpoint_delete_404(self): from keystoneclient import exceptions as client_exceptions client = self.get_client(admin=True) self.assertRaises(client_exceptions.NotFound, client.endpoints.delete, id=uuid.uuid4().hex)
[docs] def test_policy_crud(self): # FIXME(dolph): this test was written prior to the v3 implementation of # the client and essentially refers to a non-existent # policy manager in the v2 client. this test needs to be # moved to a test suite running against the v3 api self.skipTest('Written prior to v3 client; needs refactor') from keystoneclient import exceptions as client_exceptions client = self.get_client(admin=True) policy_blob = uuid.uuid4().hex policy_type = uuid.uuid4().hex service = client.services.create( name=uuid.uuid4().hex, service_type=uuid.uuid4().hex, description=uuid.uuid4().hex) endpoint = client.endpoints.create( service_id=service.id, region=uuid.uuid4().hex, adminurl=uuid.uuid4().hex, internalurl=uuid.uuid4().hex, publicurl=uuid.uuid4().hex) # create policy = client.policies.create( blob=policy_blob, type=policy_type, endpoint=endpoint.id) self.assertEqual(policy_blob, policy.policy) self.assertEqual(policy_type, policy.type) self.assertEqual(endpoint.id, policy.endpoint_id) policy = client.policies.get(policy=policy.id) self.assertEqual(policy_blob, policy.policy) self.assertEqual(policy_type, policy.type) self.assertEqual(endpoint.id, policy.endpoint_id) endpoints = [x for x in client.endpoints.list() if x.id == endpoint.id] endpoint = endpoints[0] self.assertEqual(policy_blob, policy.policy) self.assertEqual(policy_type, policy.type) self.assertEqual(endpoint.id, policy.endpoint_id) # update policy_blob = uuid.uuid4().hex policy_type = uuid.uuid4().hex endpoint = client.endpoints.create( service_id=service.id, region=uuid.uuid4().hex, adminurl=uuid.uuid4().hex, internalurl=uuid.uuid4().hex, publicurl=uuid.uuid4().hex) policy = client.policies.update( policy=policy.id, blob=policy_blob, type=policy_type, endpoint=endpoint.id) policy = client.policies.get(policy=policy.id) self.assertEqual(policy_blob, policy.policy) self.assertEqual(policy_type, policy.type) self.assertEqual(endpoint.id, policy.endpoint_id) # delete client.policies.delete(policy=policy.id) self.assertRaises( client_exceptions.NotFound, client.policies.get, policy=policy.id) policies = [x for x in client.policies.list() if x.id == policy.id] self.assertEqual(len(policies), 0)
[docs]class KcOptTestCase(KcMasterSqlTestCase): # Set KSCTEST_PATH to the keystoneclient directory, then run this test. # # For example, to test your local keystoneclient, # # KSCTEST_PATH=/opt/stack/python-keystoneclient \ # tox -e py27 test_keystoneclient_sql.KcOptTestCase
[docs] def setUp(self): self.checkout_info = os.environ.get('KSCTEST_PATH') if not self.checkout_info: self.skip('Set KSCTEST_PATH env to test with local client') super(KcOptTestCase, self).setUp()