Source code for infrahouse_core.aws.dynamodb
"""
Module for DynamoDB class.
"""
import contextlib
import uuid
from logging import getLogger
from time import sleep, time
from typing import Union

from botocore.exceptions import ClientError

from infrahouse_core.aws import get_client, get_resource
from infrahouse_core.aws.exceptions import IHItemNotFound
# Module-level logger, named after this module via ``__name__``.
LOG = getLogger(__name__)
[docs]
class DynamoDBTable:
    """
    DynamoDB table wrapper with distributed locking support.

    :param table_name: DynamoDB table name. It must exist.
    :type table_name: str
    :param region: AWS region
    :type region: str
    :param role_arn: IAM role ARN to assume for cross-account access.
        It is only passed along when no ``session`` is supplied.
    :type role_arn: str
    :param session: Optional session object (presumably a ``boto3.Session``).
        When given, the DynamoDB client and table resource are created from
        ``session.client`` / ``session.resource`` instead of the
        ``get_client`` / ``get_resource`` helpers.
    """

    # Item attribute that stores the unique token of whoever holds a lock.
    _LOCK_OWNER_ATTR = "owner_token"

    def __init__(
        self,
        table_name: str,
        region: Union[str, None] = None,
        role_arn: Union[str, None] = None,
        session=None,
    ):
        self._table_name = table_name
        self._region = region
        self._role_arn = role_arn
        self._session = session
        # Both handles are created lazily on first use and then cached.
        self._table_instance = None
        self._client_instance = None

    @property
    def exists(self) -> bool:
        """
        Check whether the DynamoDB table currently exists.

        :return: ``True`` if the table exists, ``False`` otherwise.
        :raises ClientError: For any error other than ``ResourceNotFoundException``.
        """
        try:
            self._dynamodb_client.describe_table(TableName=self._table_name)
        except ClientError as err:
            if err.response["Error"]["Code"] == "ResourceNotFoundException":
                return False
            raise
        return True

    def delete(self) -> None:
        """
        Delete the DynamoDB table.

        Idempotent -- does nothing if the table does not exist.

        :raises ClientError: For any error other than ``ResourceNotFoundException``.
        """
        try:
            self._dynamodb_client.delete_table(TableName=self._table_name)
        except ClientError as err:
            if err.response["Error"]["Code"] != "ResourceNotFoundException":
                raise
            LOG.info("DynamoDB table %s does not exist.", self._table_name)
        else:
            LOG.info("Deleted DynamoDB table %s", self._table_name)

    def delete_item(self, **kwargs) -> None:
        """Delete record from the table.

        :param kwargs: Arguments passed unchanged to the table resource's ``delete_item()``.
        """
        self._table().delete_item(**kwargs)

    def get_item(self, **kwargs) -> dict:
        """Get a record from the table.

        :param kwargs: Arguments passed to boto3 DynamoDB get_item().
            Key (required): Primary key of the item to retrieve.
        :return: The item attributes as a dictionary.
        :raises IHItemNotFound: If the item does not exist.
        """
        response = self._table().get_item(**kwargs)
        item = response.get("Item")
        if item is None:
            raise IHItemNotFound(f"Item not found in '{self._table_name}': {kwargs.get('Key')}")
        return item

    @contextlib.contextmanager
    def lock(
        self,
        lock_name: str,
        timeout: int = 30,
        ttl: Union[int, None] = 300,
        key_name: str = "ResourceId",
    ):
        """Global exclusive lock context manager.

        This function attempts to acquire a lock on a specific resource in the
        DynamoDB table using a conditional put operation. If the lock is acquired,
        the code within the 'with' block will execute. The lock is released after
        the block execution.

        If a lock exists but has expired (based on TTL), it will be automatically
        overwritten, allowing recovery from crashed processes that left stale locks.

        Every acquisition writes a unique owner token into the lock item, and the
        release only deletes the item if that token is still present. If the lock
        expired and was taken over by another process while the 'with' block was
        running, the other process's lock is left untouched and a warning is logged.

        At least one acquisition attempt is always made, even with ``timeout=0``.

        :param lock_name: The name of the lock (resource) to be acquired.
        :param timeout: Maximum time in seconds to attempt acquiring the lock.
        :param ttl: Lock expiration time in seconds. If a process crashes while holding
            the lock, other processes can acquire it after this time. Set to None to
            disable TTL (not recommended). Default is 300 seconds (5 minutes).
        :param key_name: The partition key name in the DynamoDB table (default: "ResourceId").
        :raises RuntimeError: If the lock cannot be acquired within the timeout.
        :raises ClientError: If an unexpected error occurs while trying to acquire
            or release the lock.

        Example::

            table = DynamoDBTable("my-locks-table", region="us-east-1")
            with table.lock("deploy-my-service", timeout=60, ttl=600):
                # Only one process can hold this lock at a time.
                run_deployment()
        """
        owner_attr = self._LOCK_OWNER_ATTR
        # Unique per acquisition, so the release can tell "our" lock from a
        # lock that somebody else obtained after ours expired.
        owner_token = uuid.uuid4().hex
        deadline = time() + timeout

        while True:
            # One timestamp per attempt keeps expires_at and :now consistent.
            now = int(time())
            try:
                if ttl is not None:
                    # With TTL: acquire if lock doesn't exist OR if it's expired
                    self.put_item(
                        Item={key_name: lock_name, "expires_at": now + ttl, owner_attr: owner_token},
                        ConditionExpression="attribute_not_exists(#r) OR #e < :now",
                        ExpressionAttributeNames={"#r": key_name, "#e": "expires_at"},
                        ExpressionAttributeValues={":now": now},
                    )
                else:
                    # Without TTL: only acquire if lock doesn't exist
                    self.put_item(
                        Item={key_name: lock_name, owner_attr: owner_token},
                        ConditionExpression="attribute_not_exists(#r)",
                        ExpressionAttributeNames={"#r": key_name},
                    )
            except ClientError as err:
                if err.response["Error"]["Code"] != "ConditionalCheckFailedException":
                    raise
                # The deadline is checked only after a failed attempt, so a
                # free lock is always acquired regardless of the timeout value.
                if time() >= deadline:
                    raise RuntimeError(f"Failed to acquire lock '{lock_name}' after {timeout} seconds") from err
                LOG.debug("Lock %s busy, waiting...", lock_name)
                sleep(1)
            else:
                LOG.info("Lock acquired: %s", lock_name)
                break

        try:
            yield
        finally:
            try:
                # Conditional delete: never remove a lock we no longer own.
                self.delete_item(
                    Key={key_name: lock_name},
                    ConditionExpression="#o = :owner",
                    ExpressionAttributeNames={"#o": owner_attr},
                    ExpressionAttributeValues={":owner": owner_token},
                )
            except ClientError as err:
                if err.response["Error"]["Code"] != "ConditionalCheckFailedException":
                    raise
                LOG.warning(
                    "Lock %s is no longer held by this owner (expired or removed); not releasing it.",
                    lock_name,
                )
            else:
                LOG.info("Lock released: %s", lock_name)

    def put_item(self, **kwargs) -> None:
        """Add record to the table.

        :param kwargs: Arguments passed unchanged to the table resource's ``put_item()``.
        """
        self._table().put_item(**kwargs)

    @property
    def _dynamodb_client(self):
        """Lazy-loaded low-level DynamoDB client (for describe/delete table operations)."""
        if self._client_instance is None:
            if self._session is not None:
                self._client_instance = self._session.client("dynamodb", region_name=self._region)
            else:
                self._client_instance = get_client("dynamodb", role_arn=self._role_arn, region=self._region)
            LOG.debug("Created DynamoDB client for %s", self._table_name)
        return self._client_instance

    def _table(self):
        """Return the lazily created, cached table resource used for item operations."""
        if self._table_instance is None:
            if self._session is not None:
                resource = self._session.resource("dynamodb", region_name=self._region)
            else:
                resource = get_resource("dynamodb", role_arn=self._role_arn, region=self._region)
            self._table_instance = resource.Table(self._table_name)
            LOG.debug("Created DynamoDB table resource for %s", self._table_name)
        return self._table_instance