Source code for infrahouse_core.aws.dynamodb

"""
Module for DynamoDB class.
"""

import contextlib
from logging import getLogger
from time import sleep, time
from typing import Union

from botocore.exceptions import ClientError

from infrahouse_core.aws import get_client, get_resource
from infrahouse_core.aws.exceptions import IHItemNotFound

# Module-level logger, named after this module.
LOG = getLogger(__name__)


class DynamoDBTable:
    """
    DynamoDB table wrapper with distributed locking support.

    :param table_name: DynamoDB table name. It must exist.
    :type table_name: str
    :param region: AWS region
    :type region: str
    :param role_arn: IAM role ARN to assume for cross-account access.
    :type role_arn: str
    :param session: Optional pre-built session object. When given, the client and
        resource are created with ``session.client(...)`` / ``session.resource(...)``
        and ``role_arn`` is not used. Presumably a ``boto3.Session`` -- confirm
        against callers.
    """

    def __init__(
        self,
        table_name: str,
        region: Union[str, None] = None,
        role_arn: Union[str, None] = None,
        session=None,
    ):
        self._table_name = table_name
        self._region = region
        self._role_arn = role_arn
        self._session = session
        # Both handles are created lazily on first use.
        self._table_instance = None
        self._client_instance = None

    @property
    def exists(self) -> bool:
        """
        Check whether the DynamoDB table currently exists.

        :return: ``True`` if the table exists, ``False`` otherwise.
        :raises ClientError: On any error other than ``ResourceNotFoundException``.
        """
        try:
            self._dynamodb_client.describe_table(TableName=self._table_name)
            return True
        except ClientError as err:
            if err.response["Error"]["Code"] == "ResourceNotFoundException":
                return False
            raise

    def delete(self) -> None:
        """
        Delete the DynamoDB table.

        Idempotent -- does nothing if the table does not exist.

        :raises ClientError: On any error other than ``ResourceNotFoundException``.
        """
        try:
            self._dynamodb_client.delete_table(TableName=self._table_name)
            LOG.info("Deleted DynamoDB table %s", self._table_name)
        except ClientError as err:
            if err.response["Error"]["Code"] == "ResourceNotFoundException":
                LOG.info("DynamoDB table %s does not exist.", self._table_name)
            else:
                raise

    def delete_item(self, **kwargs):
        """
        Delete record from the table.

        :param kwargs: Arguments passed unchanged to the table resource's ``delete_item()``.
        """
        self._table().delete_item(**kwargs)

    def get_item(self, **kwargs) -> dict:
        """
        Get a record from the table.

        :param kwargs: Arguments passed to boto3 DynamoDB get_item().
            Key (required): Primary key of the item to retrieve.
        :return: The item attributes as a dictionary.
        :raises IHItemNotFound: If the item does not exist.
        """
        response = self._table().get_item(**kwargs)
        item = response.get("Item")
        if item is None:
            raise IHItemNotFound(f"Item not found in '{self._table_name}': {kwargs.get('Key')}")
        return item

    @contextlib.contextmanager
    def lock(
        self,
        lock_name: str,
        timeout: int = 30,
        ttl: Union[int, None] = 300,
        key_name: str = "ResourceId",
    ):
        """
        Global exclusive lock context manager.

        This function attempts to acquire a lock on a specific resource in the DynamoDB
        table using a conditional put operation. If the lock is acquired, the code within
        the 'with' block will execute. The lock is released after the block execution.

        If a lock exists but has expired (based on TTL), it will be automatically
        overwritten, allowing recovery from crashed processes that left stale locks.

        On release, a lock taken with a TTL is deleted only if the stored ``expires_at``
        still equals the value written at acquisition. If the lock expired while the
        block was running and another process took it over, it is left untouched and
        a warning is logged instead.

        :param lock_name: The name of the lock (resource) to be acquired.
        :param timeout: Maximum time in seconds to attempt acquiring the lock.
        :param ttl: Lock expiration time in seconds. If a process crashes while holding
            the lock, other processes can acquire it after this time.
            Set to None to disable TTL (not recommended). Default is 300 seconds (5 minutes).
        :param key_name: The partition key name in the DynamoDB table (default: "ResourceId").
        :raises RuntimeError: If the lock cannot be acquired within the timeout.
        :raises ClientError: If an unexpected error occurs while trying to acquire
            or release the lock.

        Example::

            table = DynamoDBTable("my-locks-table", region="us-east-1")
            with table.lock("deploy-my-service", timeout=60, ttl=600):
                # Only one process can hold this lock at a time.
                run_deployment()
        """
        start = time()
        # Value of "expires_at" written by this holder; stays None when TTL is disabled.
        expires_at = None
        while True:
            if time() > start + timeout:
                raise RuntimeError(f"Failed to acquire lock '{lock_name}' after {timeout} seconds")
            try:
                if ttl is not None:
                    # With TTL: acquire if lock doesn't exist OR if it's expired.
                    # Read the clock once so the stored expiry and the comparison
                    # value come from the same instant.
                    now = int(time())
                    expires_at = now + ttl
                    self.put_item(
                        Item={key_name: lock_name, "expires_at": expires_at},
                        ConditionExpression="attribute_not_exists(#r) OR #e < :now",
                        ExpressionAttributeNames={"#r": key_name, "#e": "expires_at"},
                        ExpressionAttributeValues={":now": now},
                    )
                else:
                    # Without TTL: only acquire if lock doesn't exist
                    self.put_item(
                        Item={key_name: lock_name},
                        ConditionExpression="attribute_not_exists(#r)",
                        ExpressionAttributeNames={"#r": key_name},
                    )
                LOG.info("Lock acquired: %s", lock_name)
                break
            except ClientError as e:
                if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
                    LOG.debug("Lock busy, waiting...")
                    sleep(1)
                else:
                    raise
        try:
            yield
        finally:
            self._release_lock(lock_name, key_name, expires_at)

    def _release_lock(self, lock_name: str, key_name: str, expires_at: Union[int, None]) -> None:
        """
        Delete the lock item, but only if this holder still owns it.

        :param lock_name: The name of the lock to release.
        :param key_name: The partition key name in the DynamoDB table.
        :param expires_at: The ``expires_at`` value written when the lock was acquired,
            or ``None`` if the lock was taken without a TTL.
        :raises ClientError: On any error other than a failed ownership condition.
        """
        key = {key_name: lock_name}
        if expires_at is None:
            # A lock without TTL cannot be taken over, so delete unconditionally.
            self.delete_item(Key=key)
            LOG.info("Lock released: %s", lock_name)
            return
        try:
            # A new owner always writes a later expiry, so matching on our own
            # value guarantees we never delete somebody else's lock.
            self.delete_item(
                Key=key,
                ConditionExpression="#e = :mine",
                ExpressionAttributeNames={"#e": "expires_at"},
                ExpressionAttributeValues={":mine": expires_at},
            )
        except ClientError as err:
            if err.response["Error"]["Code"] != "ConditionalCheckFailedException":
                raise
            LOG.warning(
                "Lock '%s' is no longer held by this process (expired or taken over); not releasing it.",
                lock_name,
            )
        else:
            LOG.info("Lock released: %s", lock_name)

    def put_item(self, **kwargs):
        """
        Add record to the table.

        :param kwargs: Arguments passed unchanged to the table resource's ``put_item()``.
        """
        self._table().put_item(**kwargs)

    @property
    def _dynamodb_client(self):
        """Lazy-loaded low-level DynamoDB client (for describe/delete table operations)."""
        if self._client_instance is None:
            if self._session is not None:
                self._client_instance = self._session.client("dynamodb", region_name=self._region)
            else:
                self._client_instance = get_client("dynamodb", role_arn=self._role_arn, region=self._region)
            LOG.debug("Created DynamoDB client for %s", self._table_name)
        return self._client_instance

    def _table(self):
        """Lazy-loaded DynamoDB table resource (for item-level operations)."""
        if self._table_instance is None:
            if self._session is not None:
                resource = self._session.resource("dynamodb", region_name=self._region)
            else:
                resource = get_resource("dynamodb", role_arn=self._role_arn, region=self._region)
            self._table_instance = resource.Table(self._table_name)
            LOG.debug("Created DynamoDB table resource for %s", self._table_name)
        return self._table_instance