Source code for infrahouse_core.aws.iam_group
"""
IAM Group resource wrapper.
Provides ``exists`` / ``delete()`` support with dependency-aware teardown
(detach policies, delete inline policies, remove all users, then delete the group).
"""
from __future__ import annotations
from logging import getLogger
from typing import TYPE_CHECKING
from botocore.exceptions import ClientError
from cached_property import cached_property_with_ttl
from infrahouse_core.aws.base import AWSResource
if TYPE_CHECKING:
from infrahouse_core.aws.iam_policy import IAMPolicy
from infrahouse_core.aws.iam_user import IAMUser
# Module-level logger, named after this module via ``__name__``.
LOG = getLogger(__name__)
[docs]
class IAMGroup(AWSResource):
    """Wrapper around an IAM group.

    Offers an existence check, listings of member users and attached managed
    policies (both cached for 10 seconds), single-item mutators, and a
    dependency-aware :meth:`delete`.

    :param group_name: Name of the IAM group.
    :param region: AWS region.
    :param role_arn: IAM role ARN for cross-account access.
    :param session: Optional session object passed through unchanged to
        :class:`AWSResource` (presumably a boto3 session -- confirm against
        the base class).
    """

    def __init__(self, group_name, region=None, role_arn=None, session=None):
        super().__init__(group_name, "iam", region=region, role_arn=role_arn, session=session)

    @property
    def group_name(self) -> str:
        """Return the name of the group.

        :rtype: str
        """
        return self._resource_id

    @property
    def exists(self) -> bool:
        """Return ``True`` if the group exists.

        :raises ClientError: For any IAM error other than ``NoSuchEntity``.
        """
        try:
            self._client.get_group(GroupName=self._resource_id)
        except ClientError as err:
            if err.response["Error"]["Code"] == "NoSuchEntity":
                return False
            raise
        else:
            return True

    def _invalidate_cached(self, name: str) -> None:
        """Forget the TTL-cached value of the property called *name*.

        ``cached_property_with_ttl`` keeps its ``(value, timestamp)`` pair in
        the instance ``__dict__`` under the property's name, so removing that
        key forces the next access to query IAM again. Does nothing when no
        value is cached.

        :param name: Attribute name of the cached property.
        :type name: str
        """
        vars(self).pop(name, None)

    # -- Users ---------------------------------------------------------------

    @cached_property_with_ttl(ttl=10)
    def users(self) -> list[IAMUser]:
        """Return users that belong to this group.

        The result is cached for 10 seconds; :meth:`remove_user` and
        :meth:`delete` drop the cached value so it is re-read afterwards.

        :return: List of :class:`IAMUser` instances.
        :rtype: list[IAMUser]
        """
        # pylint: disable-next=import-outside-toplevel
        from infrahouse_core.aws.iam_user import IAMUser

        result = []
        paginator = self._client.get_paginator("get_group")
        for page in paginator.paginate(GroupName=self._resource_id):
            for user in page["Users"]:
                result.append(
                    IAMUser(user["UserName"], region=self._region, role_arn=self._role_arn, session=self._session)
                )
        return result

    def remove_user(self, user: IAMUser) -> None:
        """Remove a user from this group.

        :param user: The IAM user to remove.
        :type user: IAMUser
        :raises ClientError: If the ``remove_user_from_group`` call fails.
        """
        self._client.remove_user_from_group(
            GroupName=self._resource_id,
            UserName=user.user_name,
        )
        # Membership just changed: do not keep serving the old list.
        self._invalidate_cached("users")
        LOG.debug("Removed user %s from group %s", user.user_name, self._resource_id)

    # -- Policy operations ---------------------------------------------------

    @cached_property_with_ttl(ttl=10)
    def managed_policies(self) -> list[IAMPolicy]:
        """Return managed policies attached to this group.

        The result is cached for 10 seconds; :meth:`detach_policy` and
        :meth:`delete` drop the cached value so it is re-read afterwards.

        :return: List of :class:`IAMPolicy` instances.
        :rtype: list[IAMPolicy]
        """
        # pylint: disable-next=import-outside-toplevel
        from infrahouse_core.aws.iam_policy import IAMPolicy

        policies = []
        paginator = self._client.get_paginator("list_attached_group_policies")
        for page in paginator.paginate(GroupName=self._resource_id):
            for policy in page["AttachedPolicies"]:
                policies.append(
                    IAMPolicy(policy["PolicyArn"], region=self._region, role_arn=self._role_arn, session=self._session)
                )
        return policies

    def detach_policy(self, policy: IAMPolicy) -> None:
        """Detach a managed policy from the group.

        :param policy: The managed policy to detach.
        :type policy: IAMPolicy
        :raises ClientError: If the ``detach_group_policy`` call fails.
        """
        self._client.detach_group_policy(
            GroupName=self._resource_id,
            PolicyArn=policy.policy_arn,
        )
        # Attachments just changed: do not keep serving the old list.
        self._invalidate_cached("managed_policies")
        LOG.debug("Detached policy %s from group %s", policy.policy_arn, self._resource_id)

    # -- Delete --------------------------------------------------------------

    def delete(self) -> None:
        """Delete the group after removing all dependencies.

        Teardown order:

        1. Detach all managed policies.
        2. Delete all inline policies.
        3. Remove all users from the group.
        4. Delete the group itself.

        Cached listings are discarded first so that the teardown works from
        the group's current state rather than from a listing up to 10 seconds
        old.

        Idempotent -- does nothing if the group does not exist.

        NOTE(review): a ``NoSuchEntity`` raised by *any* teardown step is
        logged as the group not existing and swallowed, not only one caused
        by a missing group.

        :raises ClientError: For any IAM error other than ``NoSuchEntity``.
        """
        # A stale entry would either trigger a spurious NoSuchEntity (handled
        # below as "group missing") or hide a dependency that blocks deletion.
        self._invalidate_cached("managed_policies")
        self._invalidate_cached("users")
        try:
            self._detach_managed_policies()
            self._delete_inline_policies()
            self._remove_all_users()
            self._client.delete_group(GroupName=self._resource_id)
            LOG.info("Deleted IAM group %s", self._resource_id)
        except ClientError as err:
            if err.response["Error"]["Code"] == "NoSuchEntity":
                LOG.info("IAM group %s does not exist.", self._resource_id)
            else:
                raise

    def _detach_managed_policies(self) -> None:
        """Detach all managed policies from the group.

        Iterates through all attached managed policies (via :attr:`managed_policies`)
        and calls :meth:`detach_policy` for each one.

        :raises ClientError: If the IAM API call to detach a policy fails.
            ``NoSuchEntity`` errors are not caught here; the caller is
            responsible for handling them.
        """
        # Copy first: detach_policy() invalidates the cached property.
        policies = list(self.managed_policies)
        if policies:
            LOG.info("Detaching %d managed policies from group %s", len(policies), self._resource_id)
        for policy in policies:
            self.detach_policy(policy)

    def _delete_inline_policies(self) -> None:
        """Delete all inline policies from the group.

        Lists all inline policy names via ``list_group_policies`` pagination,
        then deletes each one with ``delete_group_policy``.

        :raises ClientError: If the IAM API call to list or delete a policy fails.
            ``NoSuchEntity`` errors are not caught here; the caller is
            responsible for handling them.
        """
        policy_names = []
        paginator = self._client.get_paginator("list_group_policies")
        for page in paginator.paginate(GroupName=self._resource_id):
            policy_names.extend(page["PolicyNames"])
        if policy_names:
            LOG.info("Deleting %d inline policies from group %s", len(policy_names), self._resource_id)
        for policy_name in policy_names:
            self._client.delete_group_policy(
                GroupName=self._resource_id,
                PolicyName=policy_name,
            )
            LOG.debug("Deleted inline policy %s from group %s", policy_name, self._resource_id)

    def _remove_all_users(self) -> None:
        """Remove all users from the group.

        Iterates through all users (via :attr:`users`) and calls
        :meth:`remove_user` for each one.

        :raises ClientError: If the IAM API call to remove a user fails.
            ``NoSuchEntity`` errors are not caught here; the caller is
            responsible for handling them.
        """
        # Copy first: remove_user() invalidates the cached property.
        users = list(self.users)
        if users:
            LOG.info("Removing %d users from group %s", len(users), self._resource_id)
        for user in users:
            self.remove_user(user)