Mountain/RunTime/Execute/
Fn.rs1use std::sync::Arc;
46
47use CommonLibrary::{
48 Effect::{ActionEffect::ActionEffect, ApplicationRunTime::ApplicationRunTime as ApplicationRunTimeTrait},
49 Environment::Requires::Requires,
50 Error::CommonError::CommonError,
51};
52use Echo::Task::Priority::Priority;
53use async_trait::async_trait;
54
55use crate::{RunTime::ApplicationRunTime::ApplicationRunTime, dev_log};
56
57#[async_trait]
60impl ApplicationRunTimeTrait for ApplicationRunTime {
61 async fn Run<TCapabilityProvider, TError, TOutput>(
62 &self,
63 Effect:ActionEffect<Arc<TCapabilityProvider>, TError, TOutput>,
64 ) -> Result<TOutput, TError>
65 where
66 TCapabilityProvider: ?Sized + Send + Sync + 'static,
67 <Self as CommonLibrary::Environment::HasEnvironment::HasEnvironment>::EnvironmentType:
68 Requires<TCapabilityProvider>,
69 TError: From<CommonError> + Send + Sync + 'static,
70 TOutput: Send + Sync + 'static, {
71 let (ResultSender, ResultReceiver) = tokio::sync::oneshot::channel::<Result<TOutput, TError>>();
72
73 let CapabilityProvider:Arc<TCapabilityProvider> = self.Environment.Require();
74
75 let Task = async move {
76 let Result = Effect.Apply(CapabilityProvider).await;
77
78 if ResultSender.send(Result).is_err() {
79 dev_log!(
80 "lifecycle",
81 "error: [ApplicationRunTime] Failed to send effect result; receiver was dropped."
82 );
83 }
84 };
85
86 self.Scheduler.Submit(Task, Priority::Normal);
87
88 match ResultReceiver.await {
89 Ok(Result) => Result,
90
91 Err(_RecvError) => {
92 let Message = "Effect execution canceled; oneshot channel closed.".to_string();
93
94 dev_log!("lifecycle", "error: {}", Message);
95
96 Err(CommonError::IPCError { Description:Message }.into())
97 },
98 }
99 }
100}
101
102impl ApplicationRunTime {
103 pub async fn RunWithTimeout<TCapabilityProvider, TError, TOutput>(
105 &self,
106 Effect:ActionEffect<Arc<TCapabilityProvider>, TError, TOutput>,
107 timeout:std::time::Duration,
108 ) -> Result<TOutput, TError>
109 where
110 TCapabilityProvider: ?Sized + Send + Sync + 'static,
111 <Self as CommonLibrary::Environment::HasEnvironment::HasEnvironment>::EnvironmentType:
112 Requires<TCapabilityProvider>,
113 TError: From<CommonError> + Send + Sync + 'static,
114 TOutput: Send + Sync + 'static, {
115 tokio::time::timeout(timeout, ApplicationRunTimeTrait::Run(self, Effect))
116 .await
117 .map_err(|_| {
118 CommonError::Unknown { Description:format!("Effect execution timed out after {:?}", timeout) }.into()
119 })?
120 }
121
122 pub async fn RunWithRetry<TCapabilityProvider, TError, TOutput>(
124 &self,
125 Effect:ActionEffect<Arc<TCapabilityProvider>, TError, TOutput>,
126 max_retries:u32,
127 initial_delay:std::time::Duration,
128 ) -> Result<TOutput, TError>
129 where
130 TCapabilityProvider: ?Sized + Send + Sync + 'static,
131 <Self as CommonLibrary::Environment::HasEnvironment::HasEnvironment>::EnvironmentType:
132 Requires<TCapabilityProvider>,
133 TError: From<CommonError> + Send + Sync + 'static + std::fmt::Display,
134 TOutput: Send + Sync + 'static, {
135 let mut retry_count = 0;
136 let mut current_delay = initial_delay;
137
138 while retry_count <= max_retries {
139 match ApplicationRunTimeTrait::Run(self, Effect.clone()).await {
140 Ok(result) => return Ok(result),
141 Err(error) => {
142 if retry_count == max_retries {
143 return Err(error);
144 }
145
146 retry_count += 1;
147 dev_log!(
148 "lifecycle",
149 "warn: [ApplicationRunTime] Effect execution failed (attempt {}): {}. Retrying in {:?}...",
150 retry_count,
151 error,
152 current_delay
153 );
154
155 tokio::time::sleep(current_delay).await;
156
157 current_delay *= 2;
160 },
161 }
162 }
163
164 Err(
165 CommonError::Unknown { Description:format!("Effect execution failed after {} retries", max_retries) }
166 .into(),
167 )
168 }
169}