Skip to main content

Mountain/RunTime/Execute/
Fn.rs

1//! # Fn (RunTime::Execute)
2//!
3//! ## RESPONSIBILITIES
4//!
5//! Core effect execution functions that bridge the declarative ActionEffect
6//! system with the Echo scheduler for high-performance task execution.
7//!
8//! ## ARCHITECTURAL ROLE
9//!
10//! The execution engine in Mountain's architecture that handles the "how"
11//! of effect execution, while ActionEffect defines the "what".
12//!
13//! ## KEY COMPONENTS
14//!
15//! - **Run**: Basic effect execution through Echo scheduler
16//! - **RunWithTimeout**: Timeout-based execution with cancellation
17//! - **RunWithRetry**: Retry mechanisms with exponential backoff
18//!
19//! ## ERROR HANDLING
20//!
21//! All errors are propagated through Result<T, E> with detailed context.
22//! Effect errors are converted to CommonError when appropriate.
23//! Timeouts return timeout-specific errors.
24//! Retry failures include attempt count and last error information.
25//!
26//! ## LOGGING
27//!
28//! Uses log crate with appropriate severity levels:
29//! - `info`: Effect submission and completion
30//! - `debug`: Detailed execution steps
31//! - `warn`: Retry attempts and recoverable errors
32//! - `error`: Failed operations and timeout occurrences
33//!
34//! ## PERFORMANCE CONSIDERATIONS
35//!
36//! - Uses oneshot channels for result collection (minimal overhead)
37//! - Tasks are submitted to Echo's work-stealing scheduler
38//! - Timeout uses tokio::time::timeout for efficient cancellation
39//! - Retry with exponential backoff prevents system overload
40//!
41//! ## TODO
42//!
43//! None
44
45use std::sync::Arc;
46
47use CommonLibrary::{
48	Effect::{ActionEffect::ActionEffect, ApplicationRunTime::ApplicationRunTime as ApplicationRunTimeTrait},
49	Environment::Requires::Requires,
50	Error::CommonError::CommonError,
51};
52use Echo::Task::Priority::Priority;
53use async_trait::async_trait;
54
55use crate::{RunTime::ApplicationRunTime::ApplicationRunTime, dev_log};
56
57/// The core integration logic between `Common::ActionEffect` and
58/// `Echo::Scheduler`.
59#[async_trait]
60impl ApplicationRunTimeTrait for ApplicationRunTime {
61	async fn Run<TCapabilityProvider, TError, TOutput>(
62		&self,
63		Effect:ActionEffect<Arc<TCapabilityProvider>, TError, TOutput>,
64	) -> Result<TOutput, TError>
65	where
66		TCapabilityProvider: ?Sized + Send + Sync + 'static,
67		<Self as CommonLibrary::Environment::HasEnvironment::HasEnvironment>::EnvironmentType:
68			Requires<TCapabilityProvider>,
69		TError: From<CommonError> + Send + Sync + 'static,
70		TOutput: Send + Sync + 'static, {
71		let (ResultSender, ResultReceiver) = tokio::sync::oneshot::channel::<Result<TOutput, TError>>();
72
73		let CapabilityProvider:Arc<TCapabilityProvider> = self.Environment.Require();
74
75		let Task = async move {
76			let Result = Effect.Apply(CapabilityProvider).await;
77
78			if ResultSender.send(Result).is_err() {
79				dev_log!(
80					"lifecycle",
81					"error: [ApplicationRunTime] Failed to send effect result; receiver was dropped."
82				);
83			}
84		};
85
86		self.Scheduler.Submit(Task, Priority::Normal);
87
88		match ResultReceiver.await {
89			Ok(Result) => Result,
90
91			Err(_RecvError) => {
92				let Message = "Effect execution canceled; oneshot channel closed.".to_string();
93
94				dev_log!("lifecycle", "error: {}", Message);
95
96				Err(CommonError::IPCError { Description:Message }.into())
97			},
98		}
99	}
100}
101
102impl ApplicationRunTime {
103	/// Enhanced effect execution with timeout and recovery.
104	pub async fn RunWithTimeout<TCapabilityProvider, TError, TOutput>(
105		&self,
106		Effect:ActionEffect<Arc<TCapabilityProvider>, TError, TOutput>,
107		timeout:std::time::Duration,
108	) -> Result<TOutput, TError>
109	where
110		TCapabilityProvider: ?Sized + Send + Sync + 'static,
111		<Self as CommonLibrary::Environment::HasEnvironment::HasEnvironment>::EnvironmentType:
112			Requires<TCapabilityProvider>,
113		TError: From<CommonError> + Send + Sync + 'static,
114		TOutput: Send + Sync + 'static, {
115		tokio::time::timeout(timeout, ApplicationRunTimeTrait::Run(self, Effect))
116			.await
117			.map_err(|_| {
118				CommonError::Unknown { Description:format!("Effect execution timed out after {:?}", timeout) }.into()
119			})?
120	}
121
122	/// Execute effect with retry mechanism.
123	pub async fn RunWithRetry<TCapabilityProvider, TError, TOutput>(
124		&self,
125		Effect:ActionEffect<Arc<TCapabilityProvider>, TError, TOutput>,
126		max_retries:u32,
127		initial_delay:std::time::Duration,
128	) -> Result<TOutput, TError>
129	where
130		TCapabilityProvider: ?Sized + Send + Sync + 'static,
131		<Self as CommonLibrary::Environment::HasEnvironment::HasEnvironment>::EnvironmentType:
132			Requires<TCapabilityProvider>,
133		TError: From<CommonError> + Send + Sync + 'static + std::fmt::Display,
134		TOutput: Send + Sync + 'static, {
135		let mut retry_count = 0;
136		let mut current_delay = initial_delay;
137
138		while retry_count <= max_retries {
139			match ApplicationRunTimeTrait::Run(self, Effect.clone()).await {
140				Ok(result) => return Ok(result),
141				Err(error) => {
142					if retry_count == max_retries {
143						return Err(error);
144					}
145
146					retry_count += 1;
147					dev_log!(
148						"lifecycle",
149						"warn: [ApplicationRunTime] Effect execution failed (attempt {}): {}. Retrying in {:?}...",
150						retry_count,
151						error,
152						current_delay
153					);
154
155					tokio::time::sleep(current_delay).await;
156
157					// Apply exponential backoff by doubling the delay after each failure
158					// to prevent overwhelming the system during recovery attempts.
159					current_delay *= 2;
160				},
161			}
162		}
163
164		Err(
165			CommonError::Unknown { Description:format!("Effect execution failed after {} retries", max_retries) }
166				.into(),
167		)
168	}
169}