Source code for infrahouse_core.aws.iam_role

"""
IAM Role resource wrapper.

Provides ``exists`` / ``delete()`` support with dependency-aware teardown
(detach policies, remove from instance profiles, then delete the role).
"""

from __future__ import annotations

from logging import getLogger
from typing import TYPE_CHECKING

from botocore.exceptions import ClientError
from cached_property import cached_property_with_ttl

from infrahouse_core.aws.base import AWSResource
from infrahouse_core.aws.iam_policy import IAMPolicy

if TYPE_CHECKING:
    from infrahouse_core.aws.iam_instance_profile import IAMInstanceProfile

LOG = getLogger(__name__)


[docs] class IAMRole(AWSResource): """Wrapper around an IAM role. :param role_name: Name of the IAM role. :param region: AWS region. :param role_arn: IAM role ARN for cross-account access. """ def __init__(self, role_name, region=None, role_arn=None, session=None): super().__init__(role_name, "iam", region=region, role_arn=role_arn, session=session) @property def role_name(self) -> str: """Return the name of the role. :rtype: str """ return self._resource_id @property def exists(self) -> bool: """Return ``True`` if the role exists.""" try: self._client.get_role(RoleName=self._resource_id) return True except ClientError as err: if err.response["Error"]["Code"] == "NoSuchEntity": return False raise
[docs] def delete(self) -> None: """Delete the role after detaching all policies and instance profiles. Teardown order: 1. Detach all managed policies. 2. Delete all inline policies. 3. Remove role from all instance profiles. 4. Delete the role itself. Idempotent -- does nothing if the role does not exist. """ try: self._detach_managed_policies() self._delete_inline_policies() self._remove_from_instance_profiles() self._client.delete_role(RoleName=self._resource_id) LOG.info("Deleted IAM role %s", self._resource_id) except ClientError as err: if err.response["Error"]["Code"] == "NoSuchEntity": LOG.info("IAM role %s does not exist.", self._resource_id) else: raise
[docs] @cached_property_with_ttl(ttl=10) def managed_policies(self) -> list[IAMPolicy]: """Return a list of managed policies attached to the role. :return: List of :class:`IAMPolicy` instances. :rtype: list[IAMPolicy] """ policies = [] paginator = self._client.get_paginator("list_attached_role_policies") for page in paginator.paginate(RoleName=self._resource_id): for policy in page["AttachedPolicies"]: policies.append( IAMPolicy(policy["PolicyArn"], region=self._region, role_arn=self._role_arn, session=self._session) ) return policies
[docs] def detach_policy(self, policy: IAMPolicy) -> None: """Detach a managed policy from the role. :param policy: The managed policy to detach. :type policy: IAMPolicy """ self._client.detach_role_policy( RoleName=self._resource_id, PolicyArn=policy.policy_arn, ) LOG.debug("Detached policy %s from role %s", policy.policy_arn, self._resource_id)
def _detach_managed_policies(self) -> None: """Detach all managed policies from the role. Iterates through all attached managed policies (via :attr:`managed_policies`) and calls :meth:`detach_policy` for each one. :raises ClientError: If the IAM API call to detach a policy fails. ``NoSuchEntity`` errors are not caught here; the caller is responsible for handling them. """ policies = list(self.managed_policies) if policies: LOG.info("Detaching %d managed policies from role %s", len(policies), self._resource_id) for policy in policies: self.detach_policy(policy) def _delete_inline_policies(self) -> None: """Delete all inline policies from the role. Lists all inline policy names via ``list_role_policies`` pagination, then deletes each one with ``delete_role_policy``. :raises ClientError: If the IAM API call to list or delete a policy fails. ``NoSuchEntity`` errors are not caught here; the caller is responsible for handling them. """ policy_names = [] paginator = self._client.get_paginator("list_role_policies") for page in paginator.paginate(RoleName=self._resource_id): policy_names.extend(page["PolicyNames"]) if policy_names: LOG.info("Deleting %d inline policies from role %s", len(policy_names), self._resource_id) for policy_name in policy_names: self._client.delete_role_policy( RoleName=self._resource_id, PolicyName=policy_name, ) LOG.debug("Deleted inline policy %s from role %s", policy_name, self._resource_id)
[docs] @cached_property_with_ttl(ttl=10) def instance_profiles(self) -> list[IAMInstanceProfile]: """Return instance profiles that have this role attached. :return: List of :class:`IAMInstanceProfile` instances. :rtype: list[IAMInstanceProfile] """ # pylint: disable-next=import-outside-toplevel from infrahouse_core.aws.iam_instance_profile import IAMInstanceProfile profiles = [] paginator = self._client.get_paginator("list_instance_profiles_for_role") for page in paginator.paginate(RoleName=self._resource_id): for profile in page["InstanceProfiles"]: profiles.append( IAMInstanceProfile( profile["InstanceProfileName"], region=self._region, role_arn=self._role_arn, session=self._session, ) ) return profiles
def _remove_from_instance_profiles(self) -> None: """Remove the role from all instance profiles. Iterates through all instance profiles (via :attr:`instance_profiles`) and calls :meth:`~IAMInstanceProfile.remove_role` on each one. :raises ClientError: If the IAM API call to remove the role fails. ``NoSuchEntity`` errors are not caught here; the caller is responsible for handling them. """ profiles = list(self.instance_profiles) if profiles: LOG.info("Removing role %s from %d instance profiles", self._resource_id, len(profiles)) for profile in profiles: profile.remove_role()