Source code for infrahouse_core.aws.rds_cluster
"""
RDS Cluster resource wrapper.
Provides ``exists`` / ``delete()`` support for RDS DB clusters.
Deletion removes all cluster member instances first, then the cluster itself.
"""
from __future__ import annotations
from logging import getLogger
from botocore.exceptions import ClientError
from infrahouse_core.aws.base import AWSResource
LOG = getLogger(__name__)
[docs]
class RDSCluster(AWSResource):
"""Wrapper around an RDS DB cluster.
:param db_cluster_id: DB cluster identifier.
:param region: AWS region.
:param role_arn: IAM role ARN for cross-account access.
"""
def __init__(self, db_cluster_id, region=None, role_arn=None, session=None):
super().__init__(db_cluster_id, "rds", region=region, role_arn=role_arn, session=session)
@property
def db_cluster_id(self) -> str:
"""Return the DB cluster identifier.
:rtype: str
"""
return self._resource_id
@property
def exists(self) -> bool:
"""Return ``True`` if the DB cluster exists."""
try:
self._client.describe_db_clusters(DBClusterIdentifier=self._resource_id)
return True
except ClientError as err:
if err.response["Error"]["Code"] == "DBClusterNotFoundFault":
return False
raise
[docs]
def delete(self) -> None:
"""Delete the cluster and all its member instances.
Deletes all cluster member DB instances first (skipping final
snapshots), then deletes the cluster itself.
Idempotent -- does nothing if the cluster does not exist.
"""
# Get cluster members before deletion
try:
response = self._client.describe_db_clusters(DBClusterIdentifier=self._resource_id)
members = response["DBClusters"][0].get("DBClusterMembers", [])
except ClientError as err:
if err.response["Error"]["Code"] == "DBClusterNotFoundFault":
LOG.info("RDS cluster %s does not exist.", self._resource_id)
return
raise
# Delete member instances first
for member in members:
instance_id = member["DBInstanceIdentifier"]
try:
self._client.delete_db_instance(
DBInstanceIdentifier=instance_id,
SkipFinalSnapshot=True,
)
LOG.info("Deleted RDS cluster member instance %s", instance_id)
except ClientError as err:
if err.response["Error"]["Code"] == "DBInstanceNotFound":
LOG.info("RDS cluster member instance %s does not exist.", instance_id)
else:
raise
# Delete the cluster
try:
self._client.delete_db_cluster(
DBClusterIdentifier=self._resource_id,
SkipFinalSnapshot=True,
)
LOG.info("Deleted RDS cluster %s", self._resource_id)
except ClientError as err:
if err.response["Error"]["Code"] == "DBClusterNotFoundFault":
LOG.info("RDS cluster %s does not exist.", self._resource_id)
else:
raise