Source code for infrahouse_core.aws.iam_user
"""
IAM User resource wrapper.
Provides ``exists`` / ``delete()`` support with dependency-aware teardown
(detach policies, delete inline policies, remove from groups,
delete access keys, then delete the user).
"""
from __future__ import annotations
from logging import getLogger
from typing import TYPE_CHECKING
from botocore.exceptions import ClientError
from cached_property import cached_property_with_ttl
from infrahouse_core.aws.base import AWSResource
if TYPE_CHECKING:
from infrahouse_core.aws.iam_group import IAMGroup
from infrahouse_core.aws.iam_policy import IAMPolicy
LOG = getLogger(__name__)
[docs]
class IAMUser(AWSResource):
"""Wrapper around an IAM user.
:param user_name: Name of the IAM user.
:param region: AWS region.
:param role_arn: IAM role ARN for cross-account access.
"""
def __init__(self, user_name, region=None, role_arn=None, session=None):
super().__init__(user_name, "iam", region=region, role_arn=role_arn, session=session)
@property
def user_name(self) -> str:
"""Return the name of the user.
:rtype: str
"""
return self._resource_id
@property
def exists(self) -> bool:
"""Return ``True`` if the user exists."""
try:
self._client.get_user(UserName=self._resource_id)
return True
except ClientError as err:
if err.response["Error"]["Code"] == "NoSuchEntity":
return False
raise
# -- Groups --------------------------------------------------------------
[docs]
@cached_property_with_ttl(ttl=10)
def groups(self) -> list[IAMGroup]:
"""Return groups that this user belongs to.
:return: List of :class:`IAMGroup` instances.
:rtype: list[IAMGroup]
"""
# pylint: disable-next=import-outside-toplevel
from infrahouse_core.aws.iam_group import IAMGroup
result = []
paginator = self._client.get_paginator("list_groups_for_user")
for page in paginator.paginate(UserName=self._resource_id):
for group in page["Groups"]:
result.append(
IAMGroup(group["GroupName"], region=self._region, role_arn=self._role_arn, session=self._session)
)
return result
# -- Policy operations ---------------------------------------------------
[docs]
@cached_property_with_ttl(ttl=10)
def managed_policies(self) -> list[IAMPolicy]:
"""Return managed policies attached to this user.
:return: List of :class:`IAMPolicy` instances.
:rtype: list[IAMPolicy]
"""
# pylint: disable-next=import-outside-toplevel
from infrahouse_core.aws.iam_policy import IAMPolicy
policies = []
paginator = self._client.get_paginator("list_attached_user_policies")
for page in paginator.paginate(UserName=self._resource_id):
for policy in page["AttachedPolicies"]:
policies.append(
IAMPolicy(policy["PolicyArn"], region=self._region, role_arn=self._role_arn, session=self._session)
)
return policies
[docs]
def detach_policy(self, policy: IAMPolicy) -> None:
"""Detach a managed policy from the user.
:param policy: The managed policy to detach.
:type policy: IAMPolicy
"""
self._client.detach_user_policy(
UserName=self._resource_id,
PolicyArn=policy.policy_arn,
)
LOG.debug("Detached policy %s from user %s", policy.policy_arn, self._resource_id)
# -- Delete --------------------------------------------------------------
[docs]
def delete(self) -> None:
"""Delete the user after removing all dependencies.
Teardown order:
1. Detach all managed policies.
2. Delete all inline policies.
3. Remove from all groups.
4. Delete all access keys.
5. Delete the user itself.
Idempotent -- does nothing if the user does not exist.
"""
try:
self._detach_managed_policies()
self._delete_inline_policies()
self._remove_from_groups()
self._delete_access_keys()
self._client.delete_user(UserName=self._resource_id)
LOG.info("Deleted IAM user %s", self._resource_id)
except ClientError as err:
if err.response["Error"]["Code"] == "NoSuchEntity":
LOG.info("IAM user %s does not exist.", self._resource_id)
else:
raise
def _detach_managed_policies(self) -> None:
"""Detach all managed policies from the user.
Iterates through all attached managed policies (via :attr:`managed_policies`)
and calls :meth:`detach_policy` for each one.
:raises ClientError: If the IAM API call to detach a policy fails.
``NoSuchEntity`` errors are not caught here; the caller is
responsible for handling them.
"""
policies = list(self.managed_policies)
if policies:
LOG.info("Detaching %d managed policies from user %s", len(policies), self._resource_id)
for policy in policies:
self.detach_policy(policy)
def _delete_inline_policies(self) -> None:
"""Delete all inline policies from the user.
Lists all inline policy names via ``list_user_policies`` pagination,
then deletes each one with ``delete_user_policy``.
:raises ClientError: If the IAM API call to list or delete a policy fails.
``NoSuchEntity`` errors are not caught here; the caller is
responsible for handling them.
"""
policy_names = []
paginator = self._client.get_paginator("list_user_policies")
for page in paginator.paginate(UserName=self._resource_id):
policy_names.extend(page["PolicyNames"])
if policy_names:
LOG.info("Deleting %d inline policies from user %s", len(policy_names), self._resource_id)
for policy_name in policy_names:
self._client.delete_user_policy(
UserName=self._resource_id,
PolicyName=policy_name,
)
LOG.debug("Deleted inline policy %s from user %s", policy_name, self._resource_id)
def _remove_from_groups(self) -> None:
"""Remove the user from all groups.
Iterates through all groups (via :attr:`groups`) and calls
:meth:`~IAMGroup.remove_user` on each one.
:raises ClientError: If the IAM API call to remove the user fails.
``NoSuchEntity`` errors are not caught here; the caller is
responsible for handling them.
"""
groups = list(self.groups)
if groups:
LOG.info("Removing user %s from %d groups", self._resource_id, len(groups))
for group in groups:
group.remove_user(self)
def _delete_access_keys(self) -> None:
"""Delete all access keys for the user.
Lists all access keys via ``list_access_keys`` pagination, then
deletes each one with ``delete_access_key``.
:raises ClientError: If the IAM API call to list or delete keys fails.
``NoSuchEntity`` errors are not caught here; the caller is
responsible for handling them.
"""
keys = []
paginator = self._client.get_paginator("list_access_keys")
for page in paginator.paginate(UserName=self._resource_id):
keys.extend(page["AccessKeyMetadata"])
if keys:
LOG.info("Deleting %d access keys from user %s", len(keys), self._resource_id)
for key in keys:
self._client.delete_access_key(
UserName=self._resource_id,
AccessKeyId=key["AccessKeyId"],
)
LOG.debug("Deleted access key %s from user %s", key["AccessKeyId"], self._resource_id)