Source code for infrahouse_core.orchestrator.raft_cluster

"""Module for OrchestratorRaftCluster — ties Raft membership to an ASG."""

from logging import getLogger

from infrahouse_core.aws.asg import ASG
from infrahouse_core.orchestrator.exceptions import (
    IHRaftLeaderNotFound,
    IHRaftPeerError,
)
from infrahouse_core.orchestrator.raft_node import OrchestratorRaftNode

# Module-level logger, named after this module so callers can tune it by name.
LOG = getLogger(name=__name__)


class OrchestratorRaftCluster:
    """Ties MySQL Orchestrator Raft membership to an AWS Auto Scaling Group.

    Creates one :class:`OrchestratorRaftNode` per live ASG instance and
    provides methods to add/remove peers and reconcile the full Raft peer
    list against the live ASG membership.

    Commands are executed on instances via SSM, so the caller does not need
    direct network access to the Orchestrator HTTP port.

    :param asg_name: Name of the Auto Scaling Group running Orchestrator.
    :type asg_name: str
    :param region: AWS region.
    :type region: str
    :param role_arn: IAM role ARN for cross-account access.
    :type role_arn: str
    :param session: Pre-configured ``boto3.Session``.
    :param http_port: Orchestrator HTTP API port.
    :type http_port: int
    :param raft_port: Raft protocol port.
    :type raft_port: int
    """

    def __init__(  # pylint: disable=too-many-arguments,too-many-positional-arguments
        self,
        asg_name,
        region=None,
        role_arn=None,
        session=None,
        http_port=3000,
        raft_port=10008,
    ):
        self._asg_name = asg_name
        self._region = region
        self._role_arn = role_arn
        self._session = session
        self._http_port = http_port
        self._raft_port = raft_port
        # Created on first access by the ``_asg`` property.
        self._asg_instance = None

    @property
    def _asg(self):
        """Lazily create and return the underlying :class:`ASG` instance."""
        if self._asg_instance is None:
            self._asg_instance = ASG(
                self._asg_name,
                region=self._region,
                role_arn=self._role_arn,
                session=self._session,
            )
        return self._asg_instance

    # Lifecycle states whose instances are never queried or treated as live.
    # NOTE(review): Terminating*, Detaching and Standby states are NOT listed,
    # so such instances count as live and reconcile() will keep (or re-add)
    # them as Raft peers -- confirm this is intended.
    _SKIP_LIFECYCLE_STATES = frozenset(
        {
            "Pending",
            "Pending:Wait",
            "Pending:Proceed",
            "Terminated",
        }
    )

    @staticmethod
    def _peer_host(addr):
        """Return the host part of a Raft address such as ``"10.1.100.195:10008"``.

        An address without a port is returned unchanged. Assumes an IPv4
        address or hostname; an IPv6 literal would be truncated at its first
        colon.

        :rtype: str
        """
        return addr.split(":")[0]

    @property
    def nodes(self):
        """Return an :class:`OrchestratorRaftNode` for each queryable ASG instance.

        Instances in early lifecycle states (``Pending``, ``Pending:Wait``,
        etc.) are excluded because they have no SSM agent or Orchestrator
        running yet. Every access re-reads the ASG instance list.

        :rtype: list[OrchestratorRaftNode]
        """
        return [
            OrchestratorRaftNode(
                instance,
                http_port=self._http_port,
                raft_port=self._raft_port,
            )
            for instance in self._asg.instances
            if instance.lifecycle_state not in self._SKIP_LIFECYCLE_STATES
        ]

    def _node_lookup(self, nodes):
        """Build a dict mapping both private_ip and hostname to each node.

        Raft may identify peers by IP address (e.g. ``"10.1.100.195"``) or by
        short hostname (e.g. ``"ip-10-1-100-195"``). This lookup supports both.

        :param nodes: Nodes to index.
        :type nodes: list[OrchestratorRaftNode]
        :rtype: dict[str, OrchestratorRaftNode]
        """
        lookup = {}
        for node in nodes:
            ip = node.private_ip
            if ip is not None:
                lookup[ip] = node
            lookup[node.hostname] = node
        return lookup

    def _find_leader(self, nodes, lookup=None):
        """Find the Raft leader among an already-fetched snapshot of *nodes*.

        Asks each node for its view of the leader until one reports a leader
        that is itself in *nodes*. Unreachable nodes are skipped.

        :param nodes: Snapshot of live nodes (as returned by :attr:`nodes`).
        :type nodes: list[OrchestratorRaftNode]
        :param lookup: Optional pre-built result of :meth:`_node_lookup` for
            *nodes*; built here when omitted.
        :type lookup: dict[str, OrchestratorRaftNode] or None
        :rtype: OrchestratorRaftNode
        :raises IHRaftLeaderNotFound: If no node reports a leader in *nodes*.
        """
        if lookup is None:
            lookup = self._node_lookup(nodes)
        for node in nodes:
            try:
                leader_addr = node.raft_leader
            except (IHRaftPeerError, TimeoutError, RuntimeError):
                LOG.warning("Could not reach node %s, skipping.", node.hostname)
                continue
            if leader_addr is None:
                continue
            leader_host = self._peer_host(leader_addr)
            candidate = lookup.get(leader_host)
            if candidate is not None:
                LOG.info("Found Raft leader at %s", leader_host)
                return candidate
            # The reported leader is not a live ASG instance; ask the next node.
            LOG.warning(
                "Leader %s is not in the current ASG instance list.",
                leader_addr,
            )
        raise IHRaftLeaderNotFound(
            f"No Raft leader found among ASG instances for {self._asg_name}"
        )

    @property
    def leader(self):
        """Return the :class:`OrchestratorRaftNode` that is the current Raft leader.

        Queries each live ASG instance until one responds with a non-nil
        leader. All mutation operations (add/remove peer) should be sent to
        the leader.

        :rtype: OrchestratorRaftNode
        :raises IHRaftLeaderNotFound: If no node reports a leader.
        """
        return self._find_leader(self.nodes)

    @property
    def peers(self):
        """Return the current Raft peer list from the leader as node objects.

        Live ASG instances are returned as full nodes. Stale peers (terminated
        instances no longer in the ASG) are returned as instance-less nodes
        created via :meth:`OrchestratorRaftNode.from_peer_addr`. The leader
        and the live-node lookup come from the same ASG snapshot.

        :rtype: list[OrchestratorRaftNode]
        :raises IHRaftLeaderNotFound: If no leader is reachable.
        """
        nodes = self.nodes
        lookup = self._node_lookup(nodes)
        result = []
        for addr in self._find_leader(nodes, lookup).raft_peers:
            node = lookup.get(self._peer_host(addr))
            # Build an instance-less node only for peers that are not live.
            if node is None:
                node = OrchestratorRaftNode.from_peer_addr(addr)
            result.append(node)
        return result

    def add_peer(self, node):
        """Add a peer to the Raft cluster.

        The leader is re-resolved on every call.

        :param node: The node to add.
        :type node: OrchestratorRaftNode
        :raises IHRaftLeaderNotFound: If no leader is reachable.
        :raises IHRaftPeerError: If the add operation fails.
        """
        self.leader.add_peer(node)

    def remove_peer(self, node):
        """Remove a peer from the Raft cluster.

        The leader is re-resolved on every call.

        :param node: The node to remove.
        :type node: OrchestratorRaftNode
        :raises IHRaftLeaderNotFound: If no leader is reachable.
        :raises IHRaftPeerError: If the remove operation fails.
        """
        self.leader.remove_peer(node)

    def reconcile(self):
        """Reconcile the Raft peer list against the live ASG membership.

        1. Take one snapshot of the live ASG nodes and find the leader in it.
        2. Collect the hosts from the leader's Raft peer list.
        3. Remove stale peers (in Raft but not in ASG) by their Raft address,
           in sorted host order.
        4. Add missing peers (in ASG but not in Raft) via their node objects.

        :raises IHRaftLeaderNotFound: If no leader is reachable.
        :raises IHRaftPeerError: If any add or remove operation fails.
        """
        # One snapshot for both leader discovery and the live-node set, so the
        # two cannot disagree about ASG membership.
        live_nodes = self.nodes
        leader = self._find_leader(live_nodes)
        LOG.info("Reconciling Raft peers via leader %s", leader.hostname)
        LOG.debug("Live ASG instances: %s", [node.hostname for node in live_nodes])

        # Map Raft host -> full Raft address (host:port) as reported by the leader.
        raft_peer_addrs = {self._peer_host(addr): addr for addr in leader.raft_peers}
        LOG.debug("Current Raft peer hosts: %s", set(raft_peer_addrs))

        live_keys = {node.private_ip for node in live_nodes} | {
            node.hostname for node in live_nodes
        }
        stale_hosts = sorted(set(raft_peer_addrs) - live_keys)
        missing_nodes = [
            node
            for node in live_nodes
            if node.private_ip not in raft_peer_addrs
            and node.hostname not in raft_peer_addrs
        ]

        # Mutations go through add_peer/remove_peer, which re-resolve the
        # leader for each call.
        for host in stale_hosts:
            LOG.info("Removing stale Raft peer %s", host)
            self.remove_peer(OrchestratorRaftNode.from_peer_addr(raft_peer_addrs[host]))
        for node in missing_nodes:
            LOG.info("Adding missing Raft peer %s", node.hostname)
            self.add_peer(node)
        LOG.info(
            "Raft reconcile complete: removed=%d added=%d",
            len(stale_hosts),
            len(missing_nodes),
        )