"""
IAM Policy resource wrapper.
Provides ``exists`` / ``delete()`` support with dependency-aware teardown
(detach from all entities, delete non-default versions, then delete the policy).
"""
from __future__ import annotations
from logging import getLogger
from typing import TYPE_CHECKING
from botocore.exceptions import ClientError
from infrahouse_core.aws.base import AWSResource
if TYPE_CHECKING:
from infrahouse_core.aws.iam_group import IAMGroup
from infrahouse_core.aws.iam_role import IAMRole
from infrahouse_core.aws.iam_user import IAMUser
LOG = getLogger(__name__)
[docs]
class IAMPolicy(AWSResource):
"""Wrapper around an IAM managed policy.
:param policy_arn: ARN of the IAM policy.
:param region: AWS region.
:param role_arn: IAM role ARN for cross-account access.
"""
def __init__(self, policy_arn, region=None, role_arn=None, session=None):
super().__init__(policy_arn, "iam", region=region, role_arn=role_arn, session=session)
self._attached_roles = None
self._attached_users = None
self._attached_groups = None
@property
def policy_arn(self) -> str:
"""Return the ARN of the policy.
:rtype: str
"""
return self._resource_id
@property
def is_aws_managed(self) -> bool:
"""Return ``True`` if this is an AWS-managed policy.
AWS-managed policies (e.g. ``arn:aws:iam::aws:policy/ReadOnlyAccess``)
cannot be deleted or modified.
:rtype: bool
"""
return ":iam::aws:policy/" in self._resource_id
@property
def exists(self) -> bool:
"""Return ``True`` if the policy exists."""
try:
self._client.get_policy(PolicyArn=self._resource_id)
return True
except ClientError as err:
if err.response["Error"]["Code"] == "NoSuchEntity":
return False
raise
# -- Attached entities ---------------------------------------------------
def _fetch_attached_entities(self) -> None:
"""Fetch all attached roles, users, and groups in a single pagination.
Uses ``list_entities_for_policy`` to retrieve all three entity types
in one paginated call, then stores the results in ``_attached_roles``,
``_attached_users``, and ``_attached_groups``. Call
:meth:`_reset_attached_entities` to invalidate the cache.
:raises ClientError: If the IAM API call to list entities fails.
"""
# pylint: disable=import-outside-toplevel
from infrahouse_core.aws.iam_group import IAMGroup
from infrahouse_core.aws.iam_role import IAMRole
from infrahouse_core.aws.iam_user import IAMUser
roles, users, groups = [], [], []
paginator = self._client.get_paginator("list_entities_for_policy")
for page in paginator.paginate(PolicyArn=self._resource_id):
for role in page.get("PolicyRoles", []):
roles.append(
IAMRole(role["RoleName"], region=self._region, role_arn=self._role_arn, session=self._session)
)
for user in page.get("PolicyUsers", []):
users.append(
IAMUser(user["UserName"], region=self._region, role_arn=self._role_arn, session=self._session)
)
for group in page.get("PolicyGroups", []):
groups.append(
IAMGroup(group["GroupName"], region=self._region, role_arn=self._role_arn, session=self._session)
)
self._attached_roles = roles
self._attached_users = users
self._attached_groups = groups
def _reset_attached_entities(self) -> None:
"""Invalidate the cached attached-entity lists."""
self._attached_roles = None
self._attached_users = None
self._attached_groups = None
@property
def attached_roles(self) -> list[IAMRole]:
"""Return roles that have this policy attached.
:return: List of :class:`IAMRole` instances.
:rtype: list[IAMRole]
"""
if self._attached_roles is None:
self._fetch_attached_entities()
return self._attached_roles
@property
def attached_users(self) -> list[IAMUser]:
"""Return users that have this policy attached.
:return: List of :class:`IAMUser` instances.
:rtype: list[IAMUser]
"""
if self._attached_users is None:
self._fetch_attached_entities()
return self._attached_users
@property
def attached_groups(self) -> list[IAMGroup]:
"""Return groups that have this policy attached.
:return: List of :class:`IAMGroup` instances.
:rtype: list[IAMGroup]
"""
if self._attached_groups is None:
self._fetch_attached_entities()
return self._attached_groups
# -- Delete --------------------------------------------------------------
[docs]
def delete(self) -> None:
"""Delete the policy after detaching from all entities and removing non-default versions.
Teardown order:
1. Detach from all IAM roles, users, and groups.
2. Delete all non-default policy versions.
3. Delete the policy itself.
AWS-managed policies cannot be deleted and are silently skipped.
Idempotent -- does nothing if the policy does not exist.
"""
if self.is_aws_managed:
LOG.info("Skipping deletion of AWS-managed policy %s", self._resource_id)
return
try:
self._detach_from_all_entities()
self._delete_non_default_versions()
self._client.delete_policy(PolicyArn=self._resource_id)
LOG.info("Deleted IAM policy %s", self._resource_id)
except ClientError as err:
if err.response["Error"]["Code"] == "NoSuchEntity":
LOG.info("IAM policy %s does not exist.", self._resource_id)
else:
raise
def _detach_from_all_entities(self) -> None:
"""Detach the policy from all roles, users, and groups.
Delegates to each entity's own ``detach_policy()`` method
(:meth:`~IAMRole.detach_policy`, :meth:`~IAMUser.detach_policy`,
:meth:`~IAMGroup.detach_policy`). Invalidates the attached-entity
cache after all detach operations complete.
:raises ClientError: If the IAM API call to detach a policy fails.
``NoSuchEntity`` errors are not caught here; the caller is
responsible for handling them.
"""
roles, users, groups = self.attached_roles, self.attached_users, self.attached_groups
total = len(roles) + len(users) + len(groups)
if total:
LOG.info(
"Detaching policy %s from %d entities (%d roles, %d users, %d groups)",
self._resource_id,
total,
len(roles),
len(users),
len(groups),
)
for role in roles:
role.detach_policy(self)
for user in users:
user.detach_policy(self)
for group in groups:
group.detach_policy(self)
self._reset_attached_entities()
def _delete_non_default_versions(self) -> None:
"""Delete all non-default policy versions.
Lists all versions via ``list_policy_versions`` pagination, then
deletes each non-default version with ``delete_policy_version``.
The default version cannot be deleted directly; it is removed
automatically when the policy itself is deleted.
:raises ClientError: If the IAM API call to list or delete versions fails.
``NoSuchEntity`` errors are not caught here; the caller is
responsible for handling them.
"""
non_default = []
paginator = self._client.get_paginator("list_policy_versions")
for page in paginator.paginate(PolicyArn=self._resource_id):
for version in page["Versions"]:
if not version["IsDefaultVersion"]:
non_default.append(version["VersionId"])
if non_default:
LOG.info("Deleting %d non-default versions of policy %s", len(non_default), self._resource_id)
for version_id in non_default:
self._client.delete_policy_version(
PolicyArn=self._resource_id,
VersionId=version_id,
)
LOG.debug("Deleted policy version %s of %s", version_id, self._resource_id)