# Source code for infrahouse_core.aws.ecs_service
"""
ECS Service resource wrapper.
Provides ``exists`` / ``delete()`` support plus service status queries.
"""
from __future__ import annotations
from logging import getLogger
from botocore.exceptions import ClientError
from infrahouse_core.aws.base import AWSResource
LOG = getLogger(__name__)
# [docs]
class ECSService(AWSResource):
    """Wrapper around an ECS service.

    Every status property (``status``, ``desired_count``, ...) issues its own
    ``describe_services`` call, so values read back-to-back may come from
    different points in time.

    :param cluster_name: Name or ARN of the ECS cluster.
    :param service_name: Name of the ECS service.
    :param region: AWS region.
    :param role_arn: IAM role ARN for cross-account access.
    :param session: Pre-configured ``boto3.Session``.
    """

    # ECS error codes that mean "there is no live service to act on".
    _NOT_FOUND_ERROR_CODES = (
        "ServiceNotFoundException",
        "ServiceNotActiveException",
        "ClusterNotFoundException",
    )

    def __init__(  # pylint: disable=too-many-arguments,too-many-positional-arguments
        self,
        cluster_name: str,
        service_name: str,
        region: str | None = None,
        role_arn: str | None = None,
        session=None,
    ):
        # The service name doubles as the generic resource id of the base class.
        super().__init__(service_name, "ecs", region=region, role_arn=role_arn, session=session)
        self._cluster_name = cluster_name

    @property
    def cluster_name(self) -> str:
        """Return the cluster name.

        :rtype: str
        """
        return self._cluster_name

    @property
    def service_name(self) -> str:
        """Return the service name.

        :rtype: str
        """
        return self._resource_id

    @property
    def exists(self) -> bool:
        """Return ``True`` if the service exists and is ``ACTIVE``.

        A missing cluster is reported as ``False`` rather than raised, which
        matches how :meth:`delete` treats ``ClusterNotFoundException``.

        :raises ClientError: On any other AWS API error.
        :rtype: bool
        """
        try:
            service = self._find_service()
        except ClientError as err:
            if err.response["Error"]["Code"] == "ClusterNotFoundException":
                return False
            raise
        return service is not None and service.get("status") == "ACTIVE"

    @property
    def status(self) -> str:
        """Return the service status string (``ACTIVE``, ``DRAINING``, ``INACTIVE``).

        :raises RuntimeError: If the service is not found.
        :rtype: str
        """
        return self._describe()["status"]

    @property
    def task_definition_arn(self) -> str:
        """Return the ARN of the active task definition.

        :raises RuntimeError: If the service is not found.
        :rtype: str
        """
        return self._describe()["taskDefinition"]

    @property
    def desired_count(self) -> int:
        """Return the current desired count.

        :raises RuntimeError: If the service is not found.
        :rtype: int
        """
        return self._describe()["desiredCount"]

    @property
    def running_count(self) -> int:
        """Return the current running count.

        :raises RuntimeError: If the service is not found.
        :rtype: int
        """
        return self._describe()["runningCount"]

    @property
    def is_steady_state(self) -> bool:
        """Return ``True`` if the service has reached steady state.

        Steady state means ``runningCount == desiredCount`` and all
        deployments have ``rolloutState == "COMPLETED"``. A deployment that
        carries no ``rolloutState`` key at all is treated as not completed.

        :raises RuntimeError: If the service is not found.
        :rtype: bool
        """
        desc = self._describe()
        if desc["runningCount"] != desc["desiredCount"]:
            return False
        return all(
            deployment.get("rolloutState") == "COMPLETED"
            for deployment in desc.get("deployments", [])
        )

    def delete(self) -> None:
        """Delete the service.

        Sets ``desiredCount`` to 0 (best effort) then force-deletes the service.
        Idempotent -- does nothing if the service does not exist.

        :raises ClientError: On AWS API errors other than the "not found" family.
        """
        try:
            self._scale_to_zero()
            self._client.delete_service(
                cluster=self._cluster_name,
                service=self._resource_id,
                force=True,
            )
        except ClientError as err:
            if err.response["Error"]["Code"] not in self._NOT_FOUND_ERROR_CODES:
                raise
            LOG.info(
                "ECS service %s in cluster %s does not exist.",
                self._resource_id,
                self._cluster_name,
            )
        else:
            LOG.info("Deleted ECS service %s in cluster %s", self._resource_id, self._cluster_name)

    def _scale_to_zero(self) -> None:
        """Ask ECS to scale the service down to zero tasks.

        The scale-down is only a courtesy before the forced delete, so an
        ``InvalidParameterException`` (e.g. a service using the DAEMON
        scheduling strategy, which does not accept ``desiredCount``) is logged
        and ignored. Any other error propagates to the caller.
        """
        try:
            self._client.update_service(
                cluster=self._cluster_name,
                service=self._resource_id,
                desiredCount=0,
            )
        except ClientError as err:
            if err.response["Error"]["Code"] != "InvalidParameterException":
                raise
            LOG.warning(
                "Could not set desiredCount=0 for ECS service %s in cluster %s (%s); "
                "proceeding with force delete.",
                self._resource_id,
                self._cluster_name,
                err,
            )

    def _find_service(self) -> dict | None:
        """Return the service description dict, or ``None`` if ECS lists no such service.

        :rtype: dict | None
        """
        response = self._client.describe_services(
            cluster=self._cluster_name,
            services=[self._resource_id],
        )
        services = response.get("services", [])
        return services[0] if services else None

    def _describe(self) -> dict:
        """Return the service description dict.

        :raises RuntimeError: If the service is not found.
        :rtype: dict
        """
        service = self._find_service()
        if service is None:
            raise RuntimeError(f"ECS service {self._resource_id} not found in cluster {self._cluster_name}")
        return service