Source code for infrahouse_core.orchestrator.raft_node
"""Module for OrchestratorRaftNode — wraps a single Orchestrator node's HTTP API via SSM."""
import json
from logging import getLogger
from cached_property import cached_property_with_ttl
from infrahouse_core.orchestrator.exceptions import IHRaftPeerError
LOG = getLogger(__name__)
[docs]
class OrchestratorRaftNode:
"""Wraps the HTTP API of a single MySQL Orchestrator node.
Commands are executed on the instance via SSM (``execute_command``),
so the caller does not need direct network access to the Orchestrator
HTTP port.
A node may also represent a stale Raft peer whose EC2 instance no longer
exists. Use :meth:`from_peer_addr` to create such a node. Stale nodes
expose :attr:`hostname` and :attr:`peer_addr` but cannot execute API calls.
:param instance: An ASG instance running Orchestrator, or ``None`` for stale peers.
:type instance: infrahouse_core.aws.asg_instance.ASGInstance or None
:param http_port: Orchestrator HTTP API port.
:type http_port: int
:param raft_port: Raft protocol port.
:type raft_port: int
"""
def __init__(self, instance=None, http_port=3000, raft_port=10008, hostname=None):
self._instance = instance
self._http_port = http_port
self._raft_port = raft_port
self._hostname = hostname
[docs]
@classmethod
def from_peer_addr(cls, peer_addr):
"""Create a node from a Raft peer address string.
Useful for representing stale peers that no longer have a live EC2
instance.
:param peer_addr: Raft peer address, e.g. ``"ip-10-1-100-195:10008"``.
:type peer_addr: str
:rtype: OrchestratorRaftNode
"""
hostname, port_str = peer_addr.split(":")
return cls(hostname=hostname, raft_port=int(port_str))
@property
def private_ip(self):
"""Return the private IP address of the underlying EC2 instance.
:raises AttributeError: If the node has no live instance (stale peer).
"""
return self._instance.private_ip
@property
def hostname(self):
"""Return the short private hostname of the underlying EC2 instance.
This is what Orchestrator uses as the Raft node identifier,
e.g. ``"ip-10-1-100-195"``.
"""
if self._hostname is not None:
return self._hostname
return self._instance.hostname
@property
def instance(self):
"""Return the underlying ASGInstance, or ``None`` for stale peers."""
return self._instance
@property
def peer_addr(self):
"""Return the Raft peer address for this node in ``hostname:raft_port`` form.
:rtype: str
"""
return f"{self.hostname}:{self._raft_port}"
[docs]
@cached_property_with_ttl(ttl=10)
def raft_peers(self):
"""Retrieve the current Raft peer list from this node.
:return: List of peer addresses, e.g. ``["ip-10-1-100-195:10008", ...]``.
:rtype: list[str]
:raises IHRaftPeerError: If the command fails.
"""
return self._api_get("/api/raft-peers")
[docs]
@cached_property_with_ttl(ttl=10)
def raft_leader(self):
"""Retrieve the current Raft leader address as seen by this node.
:return: Leader address (``"hostname:raft_port"``), or ``None`` if no leader
is currently elected.
:rtype: str or None
:raises IHRaftPeerError: If the command fails.
"""
leader = self._api_get("/api/raft-leader")
if not leader or leader == "nil":
return None
return leader
[docs]
@cached_property_with_ttl(ttl=10)
def raft_health(self):
"""Retrieve the Raft health status from this node.
:return: Raft health payload as returned by Orchestrator.
:rtype: dict
:raises IHRaftPeerError: If the command fails.
"""
return self._api_get("/api/raft-health")
@property
def is_leader(self):
"""Return ``True`` if this node believes itself to be the Raft leader.
:rtype: bool
"""
return self.raft_leader == self.peer_addr # pylint: disable=comparison-with-callable
[docs]
def add_peer(self, peer):
"""Add a peer to this node's Raft cluster.
:param peer: The node to add.
:type peer: OrchestratorRaftNode
:raises IHRaftPeerError: If Orchestrator reports a failure.
"""
addr = peer.peer_addr
LOG.info("Adding Raft peer %s via %s", addr, self.hostname)
result = self._api_get(f"/api/raft-add-peer/{addr}")
self._check_raft_response(result, f"add peer {addr}")
[docs]
def remove_peer(self, peer):
"""Remove a peer from this node's Raft cluster.
:param peer: The node to remove.
:type peer: OrchestratorRaftNode
:raises IHRaftPeerError: If Orchestrator reports a failure.
"""
addr = peer.peer_addr
LOG.info("Removing Raft peer %s via %s", addr, self.hostname)
result = self._api_get(f"/api/raft-remove-peer/{addr}")
self._check_raft_response(result, f"remove peer {addr}")
def _api_get(self, path):
"""Run ``curl`` on the instance via SSM and return the parsed JSON response.
:param path: API path, e.g. ``"/api/raft-peers"``.
:type path: str
:return: Parsed JSON response.
:raises IHRaftPeerError: If the curl command fails (non-zero exit code).
"""
url = f"http://localhost:{self._http_port}{path}"
exit_code, stdout, stderr = self._instance.execute_command(f"curl -sf {url}")
if exit_code != 0:
raise IHRaftPeerError(f"curl {url} on {self.hostname} failed (exit {exit_code}): {stderr}")
return json.loads(stdout)
@staticmethod
def _check_raft_response(result, operation):
"""Inspect the parsed Orchestrator response for application-level errors.
Orchestrator returns HTTP 200 even when the operation failed; the
``Code`` field in the body signals success or failure.
:raises IHRaftPeerError: If ``result["Code"]`` equals ``"ERROR"``.
"""
if isinstance(result, dict) and result.get("Code") == "ERROR":
message = result.get("Message", "unknown error")
raise IHRaftPeerError(f"Orchestrator {operation} failed: {message}")