Skip to main content

Mountain/IPC/Enhanced/
MessageCompressor.rs

1//! # Message Compressor and Batching
2//!
//! Message compression and batching for IPC performance optimization.
//! Supports Brotli, Gzip, and Zlib compression for large payloads, and groups
//! messages into batches bounded by message count, payload size, and age.
6
7use std::{
8	collections::VecDeque,
9	io::{Read, Write},
10};
11
12use brotli::{CompressorReader, CompressorWriter, enc::BrotliEncoderParams};
13use flate2::{
14	Compression,
15	write::{GzEncoder, ZlibEncoder},
16};
17use serde::{Deserialize, Serialize};
18use tokio::time::Instant;
19use bincode::serde::{decode_from_slice, encode_to_vec};
20
21/// Message compression levels
22#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
23pub enum CompressionLevel {
24	Fast = 1,
25	Balanced = 6,
26	High = 11,
27}
28
29/// Compression algorithm selection
30#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
31pub enum CompressionAlgorithm {
32	Brotli,
33	Gzip,
34	Zlib,
35}
36
37/// Message batch configuration
38#[derive(Debug, Clone, Serialize, Deserialize)]
39pub struct BatchConfig {
40	pub MaxBatchSize:usize,
41	pub MaxBatchDelayMs:u64,
42	pub CompressionThresholdBytes:usize,
43	pub CompressionLevel:CompressionLevel,
44	pub Algorithm:CompressionAlgorithm,
45}
46
47impl Default for BatchConfig {
48	fn default() -> Self {
49		Self {
50			MaxBatchSize:100,
51			MaxBatchDelayMs:100,
52			// Compression threshold: 1KB (messages smaller than this won't be compressed)
53			CompressionThresholdBytes:1024,
54			CompressionLevel:CompressionLevel::Balanced,
55			Algorithm:CompressionAlgorithm::Brotli,
56		}
57	}
58}
59
60/// Message compressor with batching capabilities
61pub struct MessageCompressor {
62	Config:BatchConfig,
63	CurrentBatch:VecDeque<Vec<u8>>,
64	BatchStartTime:Option<Instant>,
65	BatchSizeBytes:usize,
66}
67
68impl MessageCompressor {
69	/// Create a new message compressor with configuration
70	pub fn new(config:BatchConfig) -> Self {
71		Self {
72			Config:config,
73			CurrentBatch:VecDeque::new(),
74			BatchStartTime:None,
75			BatchSizeBytes:0,
76		}
77	}
78
79	/// Add a message to the current batch
80	pub fn add_message(&mut self, MessageData:&[u8]) -> bool {
81		let MessageSize = MessageData.len();
82		let _should_compress = MessageSize >= self.Config.CompressionThresholdBytes;
83
84		// Check if we should flush based on size
85		if self.BatchSizeBytes + MessageSize > self.Config.MaxBatchSize * 1024 {
86			// The batch has reached its configured size limit and cannot accept more
87			// messages. Caller should flush the batch before adding additional messages.
88			return false;
89		}
90
91		// Add message to batch
92		self.CurrentBatch.push_back(MessageData.to_vec());
93		self.BatchSizeBytes += MessageSize;
94
95		// Initialize batch timer if this is the first message
96		if self.BatchStartTime.is_none() {
97			self.BatchStartTime = Some(Instant::now());
98		}
99
100		true
101	}
102
103	/// Check if batch should be flushed
104	pub fn should_flush(&self) -> bool {
105		if self.CurrentBatch.is_empty() {
106			return false;
107		}
108
109		// Check batch size limit
110		if self.CurrentBatch.len() >= self.Config.MaxBatchSize {
111			return true;
112		}
113
114		// Check time limit
115		if let Some(start_time) = self.BatchStartTime {
116			let elapsed = start_time.elapsed();
117			if elapsed.as_millis() >= self.Config.MaxBatchDelayMs as u128 {
118				return true;
119			}
120		}
121
122		false
123	}
124
125	/// Compress and flush the current batch
126	pub fn flush_batch(&mut self) -> Result<CompressedBatch, String> {
127		if self.CurrentBatch.is_empty() {
128			return Err("No messages in batch to flush".to_string());
129		}
130
131		let BatchMessages:Vec<Vec<u8>> = self.CurrentBatch.drain(..).collect();
132		let total_size = self.BatchSizeBytes;
133
134		// Reset batch state
135		self.BatchStartTime = None;
136		self.BatchSizeBytes = 0;
137
138		// Serialize batch using bincode 2.0 API
139		let config = bincode::config::standard();
140		let serialized_batch =
141			encode_to_vec(&BatchMessages, config).map_err(|e| format!("Failed to serialize batch: {}", e))?;
142
143		// Compress if needed
144		let (CompressedData, compression_info) = if total_size >= self.Config.CompressionThresholdBytes {
145			self.compress_data(&serialized_batch).map(|(data, info)| (Some(data), info))
146		} else {
147			Ok((None, CompressionInfo::none()))
148		}?;
149
150		Ok(CompressedBatch {
151			messages_count:BatchMessages.len(),
152			original_size:total_size,
153			compressed_size:CompressedData.as_ref().map(|d| d.len()).unwrap_or(total_size) as usize,
154			compressed_data:CompressedData,
155			compression_info,
156			timestamp:std::time::SystemTime::now()
157				.duration_since(std::time::UNIX_EPOCH)
158				.unwrap_or_default()
159				.as_millis() as u64,
160		})
161	}
162
163	/// Compress data using configured algorithm
164	fn compress_data(&self, data:&[u8]) -> Result<(Vec<u8>, CompressionInfo), String> {
165		match self.Config.Algorithm {
166			CompressionAlgorithm::Brotli => self.compress_brotli(data),
167			CompressionAlgorithm::Gzip => self.compress_gzip(data),
168			CompressionAlgorithm::Zlib => self.compress_zlib(data),
169		}
170	}
171
172	/// Compress using Brotli algorithm
173	fn compress_brotli(&self, data:&[u8]) -> Result<(Vec<u8>, CompressionInfo), String> {
174		let mut params = BrotliEncoderParams::default();
175		params.quality = self.Config.CompressionLevel as i32;
176
177		let mut compressed = Vec::new();
178		{
179			let mut writer = CompressorWriter::with_params(&mut compressed, data.len().try_into().unwrap(), &params);
180			std::io::Write::write_all(&mut writer, data).map_err(|e| format!("Brotli compression failed: {}", e))?;
181			writer.flush().map_err(|e| format!("Brotli flush failed: {}", e))?;
182		} // writer dropped here, release borrow on compressed
183
184		let ratio = data.len() as f64 / compressed.len() as f64;
185
186		Ok((
187			compressed,
188			CompressionInfo { algorithm:"brotli".to_string(), level:self.Config.CompressionLevel as u32, ratio },
189		))
190	}
191
192	/// Compress using Gzip algorithm
193	fn compress_gzip(&self, data:&[u8]) -> Result<(Vec<u8>, CompressionInfo), String> {
194		let mut encoder = GzEncoder::new(Vec::new(), Compression::new(self.Config.CompressionLevel as u32));
195		encoder.write_all(data).map_err(|e| format!("Gzip compression failed: {}", e))?;
196
197		let compressed = encoder.finish().map_err(|e| format!("Gzip finish failed: {}", e))?;
198
199		let ratio = data.len() as f64 / compressed.len() as f64;
200
201		Ok((
202			compressed,
203			CompressionInfo { algorithm:"gzip".to_string(), level:self.Config.CompressionLevel as u32, ratio },
204		))
205	}
206
207	/// Compress using Zlib algorithm
208	fn compress_zlib(&self, data:&[u8]) -> Result<(Vec<u8>, CompressionInfo), String> {
209		let mut encoder = ZlibEncoder::new(Vec::new(), Compression::new(self.Config.CompressionLevel as u32));
210		encoder.write_all(data).map_err(|e| format!("Zlib compression failed: {}", e))?;
211
212		let compressed = encoder.finish().map_err(|e| format!("Zlib finish failed: {}", e))?;
213
214		let ratio = data.len() as f64 / compressed.len() as f64;
215
216		Ok((
217			compressed,
218			CompressionInfo { algorithm:"zlib".to_string(), level:self.Config.CompressionLevel as u32, ratio },
219		))
220	}
221
222	/// Decompress a batch
223	pub fn decompress_batch(&self, batch:&CompressedBatch) -> Result<Vec<Vec<u8>>, String> {
224		let data = if let Some(ref compressed_data) = batch.compressed_data {
225			self.decompress_data(compressed_data, &batch.compression_info.algorithm)?
226		} else {
227			encode_to_vec(&batch, bincode::config::standard()).map_err(|e| format!("Serialization failed: {}", e))?
228		};
229
230		let (decoded, _) = decode_from_slice::<Vec<Vec<u8>>, _>(&data, bincode::config::standard())
231			.map_err(|e| format!("Failed to deserialize batch: {}", e))?;
232		Ok(decoded)
233	}
234
235	/// Decompress data using specified algorithm
236	fn decompress_data(&self, data:&[u8], algorithm:&str) -> Result<Vec<u8>, String> {
237		match algorithm {
238			"brotli" => self.decompress_brotli(data),
239			"gzip" => self.decompress_gzip(data),
240			"zlib" => self.decompress_zlib(data),
241			_ => Err(format!("Unsupported compression algorithm: {}", algorithm)),
242		}
243	}
244
245	/// Decompress Brotli data
246	fn decompress_brotli(&self, data:&[u8]) -> Result<Vec<u8>, String> {
247		let mut decompressed = Vec::new();
248		let mut reader = CompressorReader::new(data, 0, data.len().try_into().unwrap(), data.len().try_into().unwrap());
249
250		std::io::Read::read_to_end(&mut reader, &mut decompressed)
251			.map_err(|e| format!("Brotli decompression failed: {}", e))?;
252
253		Ok(decompressed)
254	}
255
256	/// Decompress Gzip data
257	fn decompress_gzip(&self, data:&[u8]) -> Result<Vec<u8>, String> {
258		use flate2::read::GzDecoder;
259
260		let mut decoder = GzDecoder::new(data);
261		let mut decompressed = Vec::new();
262		decoder
263			.read_to_end(&mut decompressed)
264			.map_err(|e| format!("Gzip decompression failed: {}", e))?;
265
266		Ok(decompressed)
267	}
268
269	/// Decompress Zlib data
270	fn decompress_zlib(&self, data:&[u8]) -> Result<Vec<u8>, String> {
271		use flate2::read::ZlibDecoder;
272
273		let mut decoder = ZlibDecoder::new(data);
274		let mut decompressed = Vec::new();
275		decoder
276			.read_to_end(&mut decompressed)
277			.map_err(|e| format!("Zlib decompression failed: {}", e))?;
278
279		Ok(decompressed)
280	}
281
282	/// Get current batch statistics
283	pub fn get_batch_stats(&self) -> BatchStats {
284		BatchStats {
285			messages_count:self.CurrentBatch.len(),
286			total_size_bytes:self.BatchSizeBytes,
287			batch_age_ms:self.BatchStartTime.map(|t| t.elapsed().as_millis() as u64).unwrap_or(0),
288		}
289	}
290
291	/// Clear current batch without flushing
292	pub fn clear_batch(&mut self) {
293		self.CurrentBatch.clear();
294		self.BatchStartTime = None;
295		self.BatchSizeBytes = 0;
296	}
297}
298
299/// Compressed batch structure
300#[derive(Debug, Clone, Serialize, Deserialize)]
301pub struct CompressedBatch {
302	pub messages_count:usize,
303	pub original_size:usize,
304	pub compressed_size:usize,
305	pub compressed_data:Option<Vec<u8>>,
306	pub compression_info:CompressionInfo,
307	pub timestamp:u64,
308}
309
310/// Compression information
311#[derive(Debug, Clone, Serialize, Deserialize)]
312pub struct CompressionInfo {
313	pub algorithm:String,
314	pub level:u32,
315	pub ratio:f64,
316}
317
318impl CompressionInfo {
319	fn none() -> Self { Self { algorithm:"none".to_string(), level:0, ratio:1.0 } }
320}
321
322/// Batch statistics
323#[derive(Debug, Clone, Serialize, Deserialize)]
324pub struct BatchStats {
325	pub messages_count:usize,
326	pub total_size_bytes:usize,
327	pub batch_age_ms:u64,
328}
329
330/// Utility functions for message compression
331impl MessageCompressor {
332	/// Compress a single message
333	pub fn compress_single_message(
334		message_data:&[u8],
335		algorithm:CompressionAlgorithm,
336		level:CompressionLevel,
337	) -> Result<(Vec<u8>, CompressionInfo), String> {
338		let config = BatchConfig { Algorithm:algorithm, CompressionLevel:level, ..Default::default() };
339
340		let compressor = MessageCompressor::new(config);
341		compressor.compress_data(message_data)
342	}
343
344	/// Calculate compression ratio
345	pub fn calculate_compression_ratio(original_size:usize, compressed_size:usize) -> f64 {
346		if compressed_size == 0 {
347			return 0.0;
348		}
349		original_size as f64 / compressed_size as f64
350	}
351
352	/// Estimate compression savings
353	pub fn estimate_savings(original_size:usize, expected_ratio:f64) -> usize {
354		(original_size as f64 * (1.0 - 1.0 / expected_ratio)) as usize
355	}
356}
357
#[cfg(test)]
mod tests {
	use super::*;

	/// Repetitive payload large enough that every algorithm shrinks it; a
	/// short string can grow under Gzip/Zlib because of their fixed framing.
	fn compressible_payload() -> Vec<u8> { b"This is a test message for compression evaluation. ".repeat(64) }

	#[test]
	fn test_message_compression() {
		let test_data = compressible_payload();

		// Test Brotli compression
		let (compressed, info) = MessageCompressor::compress_single_message(
			&test_data,
			CompressionAlgorithm::Brotli,
			CompressionLevel::Balanced,
		)
		.unwrap();

		assert!(compressed.len() < test_data.len());
		assert!(info.ratio > 1.0);
		assert_eq!(info.algorithm, "brotli");

		// Test Gzip compression
		let (compressed_gzip, info_gzip) = MessageCompressor::compress_single_message(
			&test_data,
			CompressionAlgorithm::Gzip,
			CompressionLevel::Balanced,
		)
		.unwrap();

		assert!(compressed_gzip.len() < test_data.len());
		assert!(info_gzip.ratio > 1.0);
		assert_eq!(info_gzip.algorithm, "gzip");

		// Test Zlib compression
		let (compressed_zlib, info_zlib) = MessageCompressor::compress_single_message(
			&test_data,
			CompressionAlgorithm::Zlib,
			CompressionLevel::Balanced,
		)
		.unwrap();

		assert!(compressed_zlib.len() < test_data.len());
		assert!(info_zlib.ratio > 1.0);
		assert_eq!(info_zlib.algorithm, "zlib");
	}

	#[test]
	fn test_deflate_round_trip() {
		let test_data = compressible_payload();
		let compressor = MessageCompressor::new(BatchConfig::default());

		for (algorithm, name) in [(CompressionAlgorithm::Gzip, "gzip"), (CompressionAlgorithm::Zlib, "zlib")] {
			let (compressed, _) =
				MessageCompressor::compress_single_message(&test_data, algorithm, CompressionLevel::Balanced)
					.unwrap();

			assert_eq!(compressor.decompress_data(&compressed, name).unwrap(), test_data);
		}
	}

	#[test]
	fn test_batch_compression() {
		// Cap the batch at five messages so the count limit, not the 100 ms
		// timer, is what makes the batch due.
		let config = BatchConfig { MaxBatchSize:5, ..Default::default() };
		let mut compressor = MessageCompressor::new(config);

		let messages:Vec<Vec<u8>> = (0..5).map(|i| format!("Message {}", i).into_bytes()).collect();

		for message in &messages {
			assert!(compressor.add_message(message));
		}

		assert!(compressor.should_flush());

		let batch = compressor.flush_batch().unwrap();
		assert_eq!(batch.messages_count, 5);
		assert!(batch.compressed_size <= batch.original_size);

		// Flushing empties the batch again.
		assert!(!compressor.should_flush());
		assert_eq!(compressor.get_batch_stats().messages_count, 0);
		assert!(compressor.flush_batch().is_err());
	}

	#[test]
	fn test_compression_ratio_calculation() {
		let ratio = MessageCompressor::calculate_compression_ratio(1000, 500);
		assert_eq!(ratio, 2.0);
		assert_eq!(MessageCompressor::calculate_compression_ratio(1000, 0), 0.0);

		let savings = MessageCompressor::estimate_savings(1000, 2.0);
		assert_eq!(savings, 500);
	}
}