# Source code for infrahouse_core.aws.emr_serverless_application
"""
EMR Serverless Application resource wrapper.
Provides ``exists`` / ``delete()`` support. An EMR Serverless application
must be in ``CREATED`` or ``STOPPED`` state before it can be deleted. If the
application is ``STARTED``, ``delete()`` will stop it first.
"""
from __future__ import annotations
from logging import getLogger
from time import sleep
from botocore.exceptions import ClientError
from infrahouse_core.aws.base import AWSResource
from infrahouse_core.timeout import timeout
# Module-level logger, named after this module.
LOG = getLogger(__name__)
# Seconds passed to ``sleep()`` between state polls while waiting for an
# application to stop.
_STOP_POLL_INTERVAL = 5
# Limit handed to ``timeout()`` around the stop-wait loop
# (presumably seconds -- confirm against infrahouse_core.timeout).
_STOP_TIMEOUT = 120
[docs]
class EMRServerlessApplication(AWSResource):
    """Wrapper around an EMR Serverless application.

    Provides an ``exists`` check and an idempotent ``delete()``. A
    ``TERMINATED`` application is treated as non-existent by both.

    :param application_id: Application identifier.
    :param region: AWS region.
    :param role_arn: IAM role ARN for cross-account access.
    :param session: Optional session object, forwarded unchanged to
        ``AWSResource``.
    """

    def __init__(self, application_id, region=None, role_arn=None, session=None):
        super().__init__(application_id, "emr-serverless", region=region, role_arn=role_arn, session=session)

    @property
    def application_id(self) -> str:
        """Return the application identifier.

        :rtype: str
        """
        return self._resource_id

    @property
    def exists(self) -> bool:
        """Return ``True`` if the application exists and is not terminated.

        Returns ``False`` if the application is not found or its state is
        ``TERMINATED``. Any other ``ClientError`` is re-raised.

        :rtype: bool
        """
        try:
            response = self._client.get_application(applicationId=self._resource_id)
        except ClientError as err:
            if err.response["Error"]["Code"] == "ResourceNotFoundException":
                return False
            raise
        return response["application"]["state"] != "TERMINATED"

    def delete(self) -> None:
        """Delete the application.

        - Not found or ``TERMINATED``: nothing to do (idempotent, consistent
          with :attr:`exists`).
        - ``STARTED``: the application is stopped first, then deleted once it
          reaches ``STOPPED``.
        - ``STOPPING``: a stop is already in progress, so only wait for
          ``STOPPED`` and then delete.
        - Any other state is passed straight to ``delete_application``; a
          resulting ``ClientError`` propagates to the caller.

        :raises ClientError: For any AWS error other than
            ``ResourceNotFoundException``.
        """
        try:
            response = self._client.get_application(applicationId=self._resource_id)
        except ClientError as err:
            if err.response["Error"]["Code"] == "ResourceNotFoundException":
                LOG.info("EMR Serverless application %s does not exist.", self._resource_id)
                return
            raise

        state = response["application"]["state"]

        # ``exists`` reports a TERMINATED application as gone, so deleting it
        # again must be a no-op rather than another API call.
        if state == "TERMINATED":
            LOG.info("EMR Serverless application %s is already terminated.", self._resource_id)
            return

        if state == "STARTED":
            LOG.info("Stopping EMR Serverless application %s before deletion", self._resource_id)
            self._client.stop_application(applicationId=self._resource_id)

        # Whether we just requested the stop or one was already under way,
        # the application has to reach STOPPED before it can be deleted.
        if state in ("STARTED", "STOPPING"):
            self._wait_for_stopped()

        try:
            self._client.delete_application(applicationId=self._resource_id)
            LOG.info("Deleted EMR Serverless application %s", self._resource_id)
        except ClientError as err:
            # The application may have disappeared between the state check
            # and the delete call; treat that as success.
            if err.response["Error"]["Code"] == "ResourceNotFoundException":
                LOG.info("EMR Serverless application %s does not exist.", self._resource_id)
            else:
                raise

    def _wait_for_stopped(self) -> None:
        """Poll until the application reaches ``STOPPED`` state.

        Polls ``get_application`` every ``_STOP_POLL_INTERVAL`` seconds inside
        a ``timeout(_STOP_TIMEOUT)`` block; what happens on expiry is defined
        by ``infrahouse_core.timeout.timeout`` (presumably it raises -- confirm).
        """
        with timeout(_STOP_TIMEOUT):
            while True:
                response = self._client.get_application(applicationId=self._resource_id)
                state = response["application"]["state"]
                if state == "STOPPED":
                    return
                LOG.debug("Application %s state: %s -- waiting %ds", self._resource_id, state, _STOP_POLL_INTERVAL)
                sleep(_STOP_POLL_INTERVAL)