Source code for infrahouse_core.aws.cloudfront_cache_policy

"""
CloudFront Cache Policy resource wrapper.

Provides ``exists`` / ``delete()`` support for CloudFront cache policies.
"""

from __future__ import annotations

from logging import getLogger

from botocore.exceptions import ClientError

from infrahouse_core.aws.base import AWSResource

LOG = getLogger(__name__)


[docs] class CloudFrontCachePolicy(AWSResource): """Wrapper around a CloudFront cache policy. :param policy_id: The cache policy ID. :param region: AWS region. :param role_arn: IAM role ARN for cross-account access. """ def __init__(self, policy_id, region=None, role_arn=None, session=None): super().__init__(policy_id, "cloudfront", region=region, role_arn=role_arn, session=session) @property def policy_id(self) -> str: """Return the cache policy ID. :rtype: str """ return self._resource_id @property def exists(self) -> bool: """Return ``True`` if the cache policy exists. Returns ``False`` if the API raises ``NoSuchCachePolicy``. """ try: self._client.get_cache_policy(Id=self._resource_id) return True except ClientError as err: if err.response["Error"]["Code"] == "NoSuchCachePolicy": return False raise # -- Delete --------------------------------------------------------------
[docs] def delete(self) -> None: """Delete the cache policy. Fetches the current ETag and issues the delete call. Idempotent -- does nothing if the policy does not exist. .. note:: ``CachePolicyInUse`` is **not** caught and will propagate to the caller. The policy must be detached from all distributions first. """ try: response = self._client.get_cache_policy(Id=self._resource_id) etag = response["ETag"] except ClientError as err: if err.response["Error"]["Code"] == "NoSuchCachePolicy": LOG.info("CloudFront cache policy %s does not exist.", self._resource_id) return raise self._client.delete_cache_policy(Id=self._resource_id, IfMatch=etag) LOG.info("Deleted CloudFront cache policy %s", self._resource_id)