Source code for infrahouse_core.aws.cloudfront_distribution

"""
CloudFront Distribution resource wrapper.

Provides ``exists`` / ``delete()`` support for Amazon CloudFront
distributions.  Deletion is a multi-step process:

1. Disable the distribution (if enabled).
2. Wait for the distribution to reach the ``Deployed`` state.
3. Delete the distribution using the current ETag.
"""

from __future__ import annotations

import time
from logging import getLogger

from botocore.exceptions import ClientError

from infrahouse_core.aws.base import AWSResource
from infrahouse_core.timeout import timeout

LOG = getLogger(__name__)

# How long to sleep between status polls while waiting for deployment.
_POLL_INTERVAL_SECONDS = 30

# Maximum time (seconds) to wait for deployment before giving up.
_MAX_WAIT_SECONDS = 1800  # 30 minutes


[docs] class CloudFrontDistribution(AWSResource): """Wrapper around an Amazon CloudFront distribution. :param distribution_id: The CloudFront distribution ID (e.g. ``E1A2B3C4D5E6F7``). :param region: AWS region. :param role_arn: IAM role ARN for cross-account access. """ def __init__(self, distribution_id, region=None, role_arn=None, session=None): super().__init__(distribution_id, "cloudfront", region=region, role_arn=role_arn, session=session) @property def distribution_id(self) -> str: """Return the distribution ID. :rtype: str """ return self._resource_id @property def exists(self) -> bool: """Return ``True`` if the distribution exists. Returns ``False`` if the API raises ``NoSuchDistribution``. """ try: self._client.get_distribution(Id=self._resource_id) return True except ClientError as err: if err.response["Error"]["Code"] == "NoSuchDistribution": return False raise # -- Enable / Disable -----------------------------------------------------
[docs] def enable(self) -> None: """Enable the distribution. No-op if the distribution is already enabled. :raises ClientError: For unexpected AWS API errors. """ self._set_enabled(True)
[docs] def disable(self) -> None: """Disable the distribution. No-op if the distribution is already disabled. :raises ClientError: For unexpected AWS API errors. """ self._set_enabled(False)
def _set_enabled(self, enabled: bool) -> None: """Set the ``Enabled`` flag on the distribution config. :param enabled: ``True`` to enable, ``False`` to disable. """ config_response = self._client.get_distribution_config(Id=self._resource_id) config = config_response["DistributionConfig"] etag = config_response["ETag"] if config["Enabled"] is enabled: LOG.info( "CloudFront distribution %s is already %s.", self._resource_id, "enabled" if enabled else "disabled", ) return config["Enabled"] = enabled self._client.update_distribution( DistributionConfig=config, Id=self._resource_id, IfMatch=etag, ) LOG.info( "%s CloudFront distribution %s", "Enabled" if enabled else "Disabled", self._resource_id, ) # -- Delete --------------------------------------------------------------
[docs] def delete(self) -> None: """Disable (if needed), wait for deployment, then delete the distribution. Idempotent -- does nothing if the distribution does not exist. :raises TimeoutError: If the distribution does not reach ``Deployed`` status within the allowed wait time. :raises ClientError: For unexpected AWS API errors or if the distribution cannot be deleted (e.g. still in use). """ try: self.disable() except ClientError as err: if err.response["Error"]["Code"] == "NoSuchDistribution": LOG.info("CloudFront distribution %s does not exist.", self._resource_id) return raise # Wait for the distribution to be fully deployed. self._wait_until_deployed() # Fetch the latest ETag and delete. # # Re-fetch because the ETag may have changed after the status # transition from InProgress → Deployed. config_response = self._client.get_distribution_config(Id=self._resource_id) etag = config_response["ETag"] self._client.delete_distribution(Id=self._resource_id, IfMatch=etag) LOG.info("Deleted CloudFront distribution %s", self._resource_id)
def _wait_until_deployed(self) -> None: """Poll ``get_distribution`` until the status is ``Deployed``. :raises TimeoutError: If the status does not reach ``Deployed`` within :data:`_MAX_WAIT_SECONDS`. """ with timeout(_MAX_WAIT_SECONDS): while True: response = self._client.get_distribution(Id=self._resource_id) status = response["Distribution"]["Status"] if status == "Deployed": return LOG.debug( "Distribution %s status: %s — waiting %ds", self._resource_id, status, _POLL_INTERVAL_SECONDS, ) time.sleep(_POLL_INTERVAL_SECONDS)