integrate docker and implement state tracking for server lifecycle management

This commit is contained in:
2026-02-02 17:00:32 +01:00
parent 84b56a163a
commit 753d9e1e64
10 changed files with 756 additions and 93 deletions

View File

@@ -1,5 +1,5 @@
use std::error::Error;
use std::fs;
use anyhow::Result;
use serde::Deserialize;
#[derive(Debug, Deserialize, Clone)]
@@ -11,10 +11,26 @@ pub struct Config {
pub rcon_password: String,
pub idle_timeout_secs: u64,
pub polling_interval_millis: u64,
pub container_name: String,
#[serde(default = "default_startup_timeout_secs")]
pub startup_timeout_secs: u64,
#[serde(default = "default_rcon_retry_interval_secs")]
pub rcon_retry_interval_secs: u64,
}
/// Serde fallback for `Config::startup_timeout_secs` when the key is absent
/// from the configuration file: ten minutes, expressed in seconds.
fn default_startup_timeout_secs() -> u64 {
    const TEN_MINUTES_IN_SECS: u64 = 10 * 60;
    TEN_MINUTES_IN_SECS
}
/// Serde fallback for `Config::rcon_retry_interval_secs` when the key is
/// absent from the configuration file: one second.
fn default_rcon_retry_interval_secs() -> u64 {
    const ONE_SECOND: u64 = 1;
    ONE_SECOND
}
impl Config {
pub fn load(path: &str) -> Result<Self, Box<dyn Error>> {
pub fn load(path: &str) -> Result<Self> {
let content = fs::read_to_string(path)?;
let config: Config = toml::from_str(&content)?;
Ok(config)

174
src/docker.rs Normal file
View File

@@ -0,0 +1,174 @@
use std::time::Duration;
use anyhow::{Result, bail};
use bollard::Docker;
use tokio::sync::mpsc;
use tokio::time::Instant;
use crate::config::Config;
use crate::rcon;
use crate::state::{ServerState, SharedServerState};
/// Coarse container status produced by `DockerManager::get_container_status`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ContainerStatus {
/// The inspect response carried a state whose `running` flag was `true`.
Running,
/// The container exists, but its `running` flag was `false` or missing.
Stopped,
/// The Docker daemon answered the inspect request with HTTP 404.
NotFound,
}
pub struct DockerManager {
docker: Docker,
container_name: String,
}
impl DockerManager {
pub async fn new(container_name: String) -> Result<Self> {
let docker = Docker::connect_with_socket_defaults()?;
Ok(Self {
docker,
container_name,
})
}
pub async fn get_container_status(&self) -> Result<ContainerStatus> {
match self
.docker
.inspect_container(&self.container_name, None)
.await
{
Ok(details) => {
let Some(state) = details.state else {
bail!("No state in container details");
};
let running = state.running.unwrap_or(false);
if running {
Ok(ContainerStatus::Running)
} else {
Ok(ContainerStatus::Stopped)
}
}
Err(bollard::errors::Error::DockerResponseServerError {
status_code: 404, ..
}) => Ok(ContainerStatus::NotFound),
Err(e) => Err(e.into()),
}
}
pub async fn start_container(&self) -> Result<()> {
println!("Starting Docker container: {}", self.container_name);
self.docker
.start_container(&self.container_name, None)
.await?;
Ok(())
}
}
/// Keeps the shared [`ServerState`] in sync with the Docker container and
/// starts the container when a player tries to join.
///
/// On entry the container is inspected once:
/// * running: the state becomes `Starting`, so the RCON probe below decides
///   when the server is actually ready;
/// * stopped: the state becomes `Stopped` and the manager waits for a player;
/// * not found, or the inspect call failed: the function returns an error.
///
/// Afterwards the loop reacts to two events:
/// * a message on `player_connect_rx` starts the container if the state is
///   `Stopped` or `Unknown`;
/// * a 500 ms timer advances the `Starting`, `Running` and `Stopping` states
///   from the container status and RCON reachability.
///
/// # Errors
/// Returns an error if the Docker client cannot be created, if the initial
/// inspect fails, or if the container does not exist at startup. Once the
/// loop is entered the function does not return.
pub async fn run_docker_lifecycle_manager(
    config: Config,
    state: SharedServerState,
    mut player_connect_rx: mpsc::Receiver<()>,
) -> Result<()> {
    let docker = DockerManager::new(config.container_name.clone()).await?;

    // Set only when this manager itself started the container. A container
    // that was already running at launch also passes through `Starting`, but
    // its "startup" would be measured as ~0s and drag the average ETA down.
    let mut startup_is_measurable = false;

    match docker.get_container_status().await {
        Ok(ContainerStatus::Running) => {
            println!(
                "Container '{}' is already running, checking for RCON availability...",
                config.container_name
            );
            state.transition_to_starting().await;
        }
        Ok(ContainerStatus::Stopped) => {
            println!(
                "Container '{}' is stopped, waiting for player connection...",
                config.container_name
            );
            state.transition_to_stopped().await;
        }
        Ok(ContainerStatus::NotFound) => {
            eprintln!("ERROR: Container '{}' not found!", config.container_name);
            bail!("Container not found");
        }
        Err(e) => {
            eprintln!("ERROR: Failed to connect to Docker: {}", e);
            return Err(e);
        }
    }

    let startup_timeout = Duration::from_secs(config.startup_timeout_secs);
    let stop_timeout = Duration::from_secs(30);

    loop {
        tokio::select! {
            Some(_) = player_connect_rx.recv() => {
                let current_state = state.get().await;
                if matches!(current_state, ServerState::Stopped | ServerState::Unknown) {
                    println!("Player connection detected, starting container...");
                    if let Err(e) = docker.start_container().await {
                        eprintln!("Failed to start container: {}", e);
                    } else {
                        startup_is_measurable = true;
                        state.transition_to_starting().await;
                    }
                } else {
                    println!("Player connection detected, but server is already in state {:?}", current_state);
                }
            }
            _ = tokio::time::sleep(Duration::from_millis(500)) => {
                match state.get().await {
                    ServerState::Starting { started_at } => {
                        match docker.get_container_status().await {
                            Ok(ContainerStatus::Stopped) => {
                                eprintln!("Container stopped unexpectedly during startup (crashed/exited)");
                                state.transition_to_stopped().await;
                                continue;
                            }
                            Ok(ContainerStatus::NotFound) => {
                                eprintln!("Container disappeared during startup");
                                state.transition_to_stopped().await;
                                continue;
                            }
                            Err(e) => {
                                // Best effort: still try RCON below.
                                eprintln!("Failed to check container status: {}", e);
                            }
                            Ok(ContainerStatus::Running) => {}
                        }

                        // A successful RCON login is the readiness signal.
                        let rcon_available = rcon::connect_rcon(&config.rcon_addr, &config.rcon_password)
                            .await
                            .is_ok();
                        let elapsed = Instant::now() - started_at;

                        if rcon_available {
                            println!("RCON connection established, server is ready!");
                            if startup_is_measurable {
                                state.record_startup_duration(elapsed.as_secs()).await;
                                startup_is_measurable = false;
                            }
                            state.transition_to_running().await;
                        } else if elapsed > startup_timeout {
                            eprintln!(
                                "Server start timeout ({}s), transitioning back to Stopped",
                                startup_timeout.as_secs()
                            );
                            state.transition_to_stopped().await;
                        }
                    }
                    ServerState::Running { .. } => {
                        // Without this check a crash or an external `docker stop`
                        // would leave the state at `Running` forever, and the proxy
                        // would keep routing players to a dead backend.
                        match docker.get_container_status().await {
                            Ok(ContainerStatus::Stopped) => {
                                eprintln!("Container stopped unexpectedly while the server was running");
                                state.transition_to_stopped().await;
                            }
                            Ok(ContainerStatus::NotFound) => {
                                eprintln!("Container disappeared while the server was running");
                                state.transition_to_stopped().await;
                            }
                            Ok(ContainerStatus::Running) => {}
                            Err(e) => {
                                eprintln!("Failed to check container status: {}", e);
                            }
                        }
                    }
                    ServerState::Stopping { stop_requested_at } => {
                        match docker.get_container_status().await {
                            Ok(ContainerStatus::Stopped) => {
                                println!("Container stopped successfully");
                                state.transition_to_stopped().await;
                            }
                            _ => {
                                if Instant::now() - stop_requested_at > stop_timeout {
                                    eprintln!("Container stop timeout, forcing transition to Stopped");
                                    state.transition_to_stopped().await;
                                }
                            }
                        }
                    }
                    // Nothing to poll for: a player connection is what moves
                    // these states forward.
                    ServerState::Unknown | ServerState::Stopped => {}
                }
            }
        }
    }
}

View File

@@ -1,29 +1,57 @@
mod config;
mod motd;
mod docker;
mod monitor;
mod motd;
mod proxy;
mod rcon;
mod state;
use std::error::Error;
use anyhow::Result;
use tokio::main;
use tokio::sync::mpsc;
use config::Config;
use state::SharedServerState;
#[main]
async fn main() -> Result<(), Box<dyn Error>> {
async fn main() -> Result<()> {
let config = Config::load("config.toml")?;
let shared_state = SharedServerState::new();
let (player_connect_tx, player_connect_rx) = mpsc::channel::<()>(10);
let motd_config = config.clone();
let motd_state = shared_state.clone();
let motd_tx = player_connect_tx.clone();
tokio::spawn(async move {
motd::create_motd_server(&motd_config).run();
motd::create_motd_server(&motd_config, motd_state, Some(motd_tx)).run();
});
let monitor_config = config.clone();
let docker_config = config.clone();
let docker_state = shared_state.clone();
tokio::spawn(async move {
if let Err(e) = monitor::run_idle_monitor(monitor_config).await {
eprintln!("Idle monitor error: {e}");
if let Err(e) =
docker::run_docker_lifecycle_manager(docker_config, docker_state, player_connect_rx)
.await
{
eprintln!("Docker lifecycle manager error: {}", e);
}
});
proxy::run_proxy(config.listen_addr, config.server_addr).await
let monitor_config = config.clone();
let monitor_state = shared_state.clone();
tokio::spawn(async move {
if let Err(e) = monitor::run_idle_monitor(monitor_config, monitor_state).await {
eprintln!("Idle monitor error: {}", e);
}
});
proxy::run_proxy(
config.listen_addr,
config.server_addr,
config.motd_server_addr,
shared_state,
)
.await
}

View File

@@ -1,43 +1,73 @@
use std::error::Error;
use std::time::Duration;
use anyhow::Result;
use tokio::time::Instant;
use crate::config::Config;
use crate::rcon;
use crate::state::{ServerState, SharedServerState};
/// Returns the first whitespace-separated token of `s` that parses as a
/// `u32`, or `None` when no token does.
fn parse_players_online(s: &str) -> Option<u32> {
    for token in s.split_whitespace() {
        if let Ok(count) = token.parse::<u32>() {
            return Some(count);
        }
    }
    None
}
pub async fn run_idle_monitor(config: Config) -> Result<(), Box<dyn Error>> {
let mut conn = rcon::connect_rcon(&config.rcon_addr, &config.rcon_password).await?;
let mut idle = false;
pub async fn run_idle_monitor(
config: Config,
state: SharedServerState,
) -> Result<()> {
let mut last_online = Instant::now();
let idle_timeout = Duration::from_secs(config.idle_timeout_secs);
let polling_interval = Duration::from_millis(config.polling_interval_millis);
let mut backoff = Duration::from_secs(config.rcon_retry_interval_secs);
let max_backoff = Duration::from_secs(30);
loop {
let players_cmd_output = conn.cmd("list").await?;
let players_number = parse_players_online(&players_cmd_output).unwrap_or(0);
if players_number > 0 {
last_online = Instant::now();
}
println!("{players_number} {idle}");
if Instant::now() - last_online > idle_timeout {
if !idle {
idle = true;
println!("Stopping the server");
conn.cmd("stop").await?;
}
} else {
idle = false;
}
tokio::time::sleep(polling_interval).await;
let current_state = state.get().await;
if !matches!(current_state, ServerState::Running { .. }) {
last_online = Instant::now();
backoff = Duration::from_secs(config.rcon_retry_interval_secs);
continue;
}
let conn_result = rcon::connect_rcon(&config.rcon_addr, &config.rcon_password).await;
let mut conn = if let Ok(c) = conn_result {
backoff = Duration::from_secs(config.rcon_retry_interval_secs);
c
} else {
eprintln!("RCON connection failed, retrying in {:?}", backoff);
tokio::time::sleep(backoff).await;
backoff = (backoff * 2).min(max_backoff);
continue;
};
match conn.cmd("list").await {
Ok(output) => {
let players_number = parse_players_online(&output).unwrap_or(0);
if players_number > 0 {
last_online = Instant::now();
}
if Instant::now() - last_online > idle_timeout {
println!("Idle timeout reached, stopping server");
state.transition_to_stopping().await;
if let Err(e) = conn.cmd("stop").await {
eprintln!("Failed to send stop command: {}", e);
}
last_online = Instant::now();
}
}
Err(e) => {
eprintln!("RCON command failed: {}", e);
}
}
}
}

View File

@@ -1,4 +1,5 @@
use std::net::SocketAddr;
use tokio::sync::mpsc;
use valence::network::{
async_trait, BroadcastToLan, CleanupFn, ConnectionMode, HandshakeData, ServerListPing,
};
@@ -6,8 +7,12 @@ use valence::prelude::*;
use valence::MINECRAFT_VERSION;
use crate::config::Config;
use crate::state::{ServerState, SharedServerState};
struct MotdCallbacks;
struct MotdCallbacks {
state: SharedServerState,
player_connect_tx: Option<mpsc::Sender<()>>,
}
#[async_trait]
impl NetworkCallbacks for MotdCallbacks {
@@ -17,19 +22,65 @@ impl NetworkCallbacks for MotdCallbacks {
_remote_addr: SocketAddr,
handshake_data: &HandshakeData,
) -> ServerListPing {
let current_state = self.state.get().await;
let (description, eta_secs) = match current_state {
ServerState::Stopped | ServerState::Unknown => {
let avg_time = self.state.get_average_startup_time().await;
(
"Serwer sobie ".into_text()
+ "śpi".into_text().color(Color::rgb(250, 50, 50))
+ "!\n"
+ "Dołącz aby go obudzić! :3 "
.into_text()
.color(Color::rgb(255, 150, 230)),
avg_time,
)
}
ServerState::Starting { started_at } => {
let elapsed = (tokio::time::Instant::now() - started_at).as_secs();
let avg_time = self.state.get_average_startup_time().await;
let remaining = avg_time.map(|avg| avg.saturating_sub(elapsed));
(
"Serwer się ".into_text()
+ "budzi UwU".into_text().color(Color::rgb(255, 200, 50))
+ "...\n"
+ "Poczekaj chwileczkę!!! :3 "
.into_text()
.color(Color::rgb(150, 255, 150)),
remaining,
)
}
ServerState::Running { .. } => {
(
"Serwer jest ".into_text()
+ "obudzony".into_text().color(Color::rgb(50, 250, 50))
+ "!\n",
None,
)
}
ServerState::Stopping { .. } => (
"Serwer idzie sobie ".into_text()
+ "spać".into_text().color(Color::rgb(250, 150, 50))
+ "...\n",
Some(30),
),
};
let description = if let Some(secs) = eta_secs {
description
+ format!("{}s", secs)
.into_text()
.color(Color::rgb(80, 80, 80))
} else {
description
};
ServerListPing::Respond {
online_players: 0,
max_players: 0,
player_sample: vec![],
description: "Serwer jest ".into_text()
+ "wyłączony".into_text().color(Color::rgb(250, 50, 50))
+ "!\n"
+ "Dołącz aby uruchomić serwer! "
.into_text()
.color(Color::rgb(255, 150, 230))
+ format!("{}s", 20)
.into_text()
.color(Color::rgb(80, 80, 80)),
description,
favicon_png: include_bytes!("../assets/icon.png"),
version_name: MINECRAFT_VERSION.to_string(),
protocol: handshake_data.protocol_version,
@@ -37,7 +88,7 @@ impl NetworkCallbacks for MotdCallbacks {
}
async fn broadcast_to_lan(&self, _shared: &SharedNetworkState) -> BroadcastToLan {
BroadcastToLan::Enabled("Hello Valence!".into())
BroadcastToLan::Disabled
}
async fn login(
@@ -45,15 +96,55 @@ impl NetworkCallbacks for MotdCallbacks {
_shared: &SharedNetworkState,
_info: &NewClientInfo,
) -> Result<CleanupFn, Text> {
Err("You are not meant to join this example".color(Color::rgb(250, 30, 21)))
let current_state = self.state.get().await;
match current_state {
ServerState::Stopped | ServerState::Unknown => {
if let Some(tx) = &self.player_connect_tx {
let _ = tx.send(()).await;
}
if let Some(avg_secs) = self.state.get_average_startup_time().await {
Err(format!("Serwer się słodko budzi, poczekaj jeszcze ~{}s... :>", avg_secs)
.color(Color::rgb(255, 200, 50)))
} else {
Err("Serwer się słodko budzi, poczekaj chwilę... :>"
.color(Color::rgb(255, 200, 50)))
}
}
ServerState::Starting { started_at } => {
let elapsed = (tokio::time::Instant::now() - started_at).as_secs();
if let Some(avg) = self.state.get_average_startup_time().await {
let remaining = avg.saturating_sub(elapsed);
Err(format!("Serwer się słodko budzi, poczekaj jeszcze ~{}s... :>", remaining)
.color(Color::rgb(255, 150, 50)))
} else {
Err("Serwer się słodko budzi, poczekaj chwilę... :>"
.color(Color::rgb(255, 150, 50)))
}
}
ServerState::Stopping { .. } => Err("Serwer idzie spać, poczekaj... >:c"
.color(Color::rgb(250, 150, 50))),
ServerState::Running { .. } => {
Err("Połącz się z głównym serwerem".color(Color::rgb(50, 250, 50)))
}
}
}
}
pub fn create_motd_server(config: &Config) -> App {
pub fn create_motd_server(
config: &Config,
state: SharedServerState,
player_connect_tx: Option<mpsc::Sender<()>>,
) -> App {
let mut app = App::new();
app.insert_resource(NetworkSettings {
connection_mode: ConnectionMode::Offline,
callbacks: MotdCallbacks.into(),
callbacks: MotdCallbacks {
state,
player_connect_tx,
}
.into(),
address: config.motd_server_addr.parse().unwrap(),
..Default::default()
})

View File

@@ -1,25 +1,48 @@
use std::error::Error;
use anyhow::Result;
use tokio::io::copy_bidirectional;
use tokio::net::{TcpListener, TcpStream};
pub async fn run_proxy(listen_addr: String, server_addr: String) -> Result<(), Box<dyn Error>> {
use crate::state::{ServerState, SharedServerState};
pub async fn run_proxy(
listen_addr: String,
server_addr: String,
motd_server_addr: String,
state: SharedServerState,
) -> Result<()> {
println!("Listening on {}", listen_addr);
println!("Proxying to {}", server_addr);
println!("Server backend: {}", server_addr);
println!("MOTD backend: {}", motd_server_addr);
let listener = TcpListener::bind(&listen_addr).await?;
while let Ok((mut inbound, _)) = listener.accept().await {
while let Ok((mut inbound, client_addr)) = listener.accept().await {
let server_addr = server_addr.clone();
let motd_server_addr = motd_server_addr.clone();
let state = state.clone();
tokio::spawn(async move {
match TcpStream::connect(&server_addr).await {
let current_state = state.get().await;
let backend = match current_state {
ServerState::Running { .. } => {
println!("Client {} -> server (Running)", client_addr);
server_addr
}
_ => {
println!("Client {} -> MOTD (state: {:?})", client_addr, current_state);
motd_server_addr
}
};
match TcpStream::connect(&backend).await {
Ok(mut outbound) => {
if let Err(e) = copy_bidirectional(&mut inbound, &mut outbound).await {
println!("Failed to transfer; error={e}");
eprintln!("Failed to transfer; error={}", e);
}
}
Err(e) => {
println!("Failed to connect to server; error={e}");
eprintln!("Failed to connect to backend {}; error={}", backend, e);
}
}
});

View File

@@ -1,37 +1,14 @@
use std::error::Error;
use std::time::Duration;
use anyhow::Result;
use rcon::Connection;
use tokio::net::TcpStream;
pub async fn connect_rcon(
addr: &str,
password: &str,
) -> Result<Connection<TcpStream>, Box<dyn Error>> {
) -> Result<Connection<TcpStream>> {
let conn = <Connection<TcpStream>>::builder()
.enable_minecraft_quirks(true)
.connect(addr, password)
.await?;
Ok(conn)
}
pub async fn wait_for_rcon(
addr: &str,
password: &str,
timeout: Duration,
retry_interval: Duration,
) -> Result<Connection<TcpStream>, Box<dyn Error>> {
let start = tokio::time::Instant::now();
loop {
if tokio::time::Instant::now() - start > timeout {
return Err("RCON connection timeout".into());
}
match connect_rcon(addr, password).await {
Ok(conn) => return Ok(conn),
Err(_) => {
tokio::time::sleep(retry_interval).await;
}
}
}
}

109
src/state.rs Normal file
View File

@@ -0,0 +1,109 @@
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio::time::Instant;
/// Running totals from which the mean server startup time is derived.
#[derive(Debug, Clone, Default)]
pub struct StartupMetrics {
    /// Number of startups recorded so far.
    pub total_startups: u32,
    /// Sum of all recorded startup durations, in seconds.
    pub total_duration_secs: u64,
}

impl StartupMetrics {
    /// Adds one startup that took `duration_secs` seconds to the totals.
    pub fn record_startup(&mut self, duration_secs: u64) {
        self.total_startups += 1;
        self.total_duration_secs += duration_secs;
    }

    /// Mean startup time in whole seconds (integer division), or `None`
    /// when nothing has been recorded yet.
    pub fn average_startup_time(&self) -> Option<u64> {
        // `checked_div` yields `None` exactly when the startup count is zero.
        let startups = u64::from(self.total_startups);
        self.total_duration_secs.checked_div(startups)
    }
}
/// Lifecycle phase of the managed server, as stored in `SharedServerState`.
///
/// Equality is derived, so two values of the same variant that carry
/// different instants compare as unequal.
#[derive(Debug, Clone, PartialEq)]
pub enum ServerState {
/// Initial value installed by `SharedServerState::new`.
Unknown,
/// Installed by `SharedServerState::transition_to_stopped`.
Stopped,
/// Installed by `SharedServerState::transition_to_starting`.
Starting {
/// Instant at which the transition to `Starting` was made.
started_at: Instant,
},
/// Installed by `SharedServerState::transition_to_running`.
Running {
/// Instant at which the transition to `Running` was made.
rcon_connected_at: Instant,
},
/// Installed by `SharedServerState::transition_to_stopping`.
Stopping {
/// Instant at which the transition to `Stopping` was made.
stop_requested_at: Instant,
},
}
/// Cloneable handle to the current [`ServerState`] and the accumulated
/// [`StartupMetrics`].
///
/// Both fields are `Arc`s, so every clone observes and mutates the same data.
#[derive(Clone)]
pub struct SharedServerState {
    state: Arc<RwLock<ServerState>>,
    metrics: Arc<RwLock<StartupMetrics>>,
}

impl SharedServerState {
    /// Creates a handle whose state is `ServerState::Unknown` and whose
    /// metrics are empty.
    pub fn new() -> Self {
        let state = Arc::new(RwLock::new(ServerState::Unknown));
        let metrics = Arc::new(RwLock::new(StartupMetrics::default()));
        Self { state, metrics }
    }

    /// Returns a copy of the current state.
    pub async fn get(&self) -> ServerState {
        let guard = self.state.read().await;
        (*guard).clone()
    }

    /// Replaces the current state with `new_state`, logging the transition
    /// when the two values differ.
    pub async fn set(&self, new_state: ServerState) {
        let mut guard = self.state.write().await;
        if *guard != new_state {
            println!("State transition: {:?} -> {:?}", *guard, new_state);
        }
        *guard = new_state;
    }

    /// Switches to `Starting`, stamped with the current instant.
    pub async fn transition_to_starting(&self) {
        let started_at = Instant::now();
        self.set(ServerState::Starting { started_at }).await;
    }

    /// Switches to `Running`, stamped with the current instant.
    pub async fn transition_to_running(&self) {
        let rcon_connected_at = Instant::now();
        self.set(ServerState::Running { rcon_connected_at }).await;
    }

    /// Switches to `Stopping`, stamped with the current instant.
    pub async fn transition_to_stopping(&self) {
        let stop_requested_at = Instant::now();
        self.set(ServerState::Stopping { stop_requested_at }).await;
    }

    /// Switches to `Stopped`.
    pub async fn transition_to_stopped(&self) {
        self.set(ServerState::Stopped).await;
    }

    /// Adds `duration_secs` to the startup metrics and logs the new average.
    pub async fn record_startup_duration(&self, duration_secs: u64) {
        let mut metrics = self.metrics.write().await;
        metrics.record_startup(duration_secs);

        let average = metrics.average_startup_time().unwrap_or(0);
        let startups = metrics.total_startups;
        println!(
            "Server startup took {}s (avg: {}s over {} startups)",
            duration_secs, average, startups
        );
    }

    /// Mean recorded startup time in seconds, or `None` when no startup has
    /// been recorded.
    pub async fn get_average_startup_time(&self) -> Option<u64> {
        let metrics = self.metrics.read().await;
        metrics.average_startup_time()
    }
}

/// `Default` is the same as [`SharedServerState::new`].
impl Default for SharedServerState {
    fn default() -> Self {
        SharedServerState::new()
    }
}