1use std::{
8 collections::VecDeque,
9 io::{Read, Write},
10};
11
12use brotli::{CompressorReader, CompressorWriter, enc::BrotliEncoderParams};
13use flate2::{
14 Compression,
15 write::{GzEncoder, ZlibEncoder},
16};
17use serde::{Deserialize, Serialize};
18use tokio::time::Instant;
19use bincode::serde::{decode_from_slice, encode_to_vec};
20
21#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
23pub enum CompressionLevel {
24 Fast = 1,
25 Balanced = 6,
26 High = 11,
27}
28
29#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
31pub enum CompressionAlgorithm {
32 Brotli,
33 Gzip,
34 Zlib,
35}
36
37#[derive(Debug, Clone, Serialize, Deserialize)]
39pub struct BatchConfig {
40 pub MaxBatchSize:usize,
41 pub MaxBatchDelayMs:u64,
42 pub CompressionThresholdBytes:usize,
43 pub CompressionLevel:CompressionLevel,
44 pub Algorithm:CompressionAlgorithm,
45}
46
47impl Default for BatchConfig {
48 fn default() -> Self {
49 Self {
50 MaxBatchSize:100,
51 MaxBatchDelayMs:100,
52 CompressionThresholdBytes:1024,
54 CompressionLevel:CompressionLevel::Balanced,
55 Algorithm:CompressionAlgorithm::Brotli,
56 }
57 }
58}
59
60pub struct MessageCompressor {
62 Config:BatchConfig,
63 CurrentBatch:VecDeque<Vec<u8>>,
64 BatchStartTime:Option<Instant>,
65 BatchSizeBytes:usize,
66}
67
68impl MessageCompressor {
69 pub fn new(config:BatchConfig) -> Self {
71 Self {
72 Config:config,
73 CurrentBatch:VecDeque::new(),
74 BatchStartTime:None,
75 BatchSizeBytes:0,
76 }
77 }
78
79 pub fn add_message(&mut self, MessageData:&[u8]) -> bool {
81 let MessageSize = MessageData.len();
82 let _should_compress = MessageSize >= self.Config.CompressionThresholdBytes;
83
84 if self.BatchSizeBytes + MessageSize > self.Config.MaxBatchSize * 1024 {
86 return false;
89 }
90
91 self.CurrentBatch.push_back(MessageData.to_vec());
93 self.BatchSizeBytes += MessageSize;
94
95 if self.BatchStartTime.is_none() {
97 self.BatchStartTime = Some(Instant::now());
98 }
99
100 true
101 }
102
103 pub fn should_flush(&self) -> bool {
105 if self.CurrentBatch.is_empty() {
106 return false;
107 }
108
109 if self.CurrentBatch.len() >= self.Config.MaxBatchSize {
111 return true;
112 }
113
114 if let Some(start_time) = self.BatchStartTime {
116 let elapsed = start_time.elapsed();
117 if elapsed.as_millis() >= self.Config.MaxBatchDelayMs as u128 {
118 return true;
119 }
120 }
121
122 false
123 }
124
125 pub fn flush_batch(&mut self) -> Result<CompressedBatch, String> {
127 if self.CurrentBatch.is_empty() {
128 return Err("No messages in batch to flush".to_string());
129 }
130
131 let BatchMessages:Vec<Vec<u8>> = self.CurrentBatch.drain(..).collect();
132 let total_size = self.BatchSizeBytes;
133
134 self.BatchStartTime = None;
136 self.BatchSizeBytes = 0;
137
138 let config = bincode::config::standard();
140 let serialized_batch =
141 encode_to_vec(&BatchMessages, config).map_err(|e| format!("Failed to serialize batch: {}", e))?;
142
143 let (CompressedData, compression_info) = if total_size >= self.Config.CompressionThresholdBytes {
145 self.compress_data(&serialized_batch).map(|(data, info)| (Some(data), info))
146 } else {
147 Ok((None, CompressionInfo::none()))
148 }?;
149
150 Ok(CompressedBatch {
151 messages_count:BatchMessages.len(),
152 original_size:total_size,
153 compressed_size:CompressedData.as_ref().map(|d| d.len()).unwrap_or(total_size) as usize,
154 compressed_data:CompressedData,
155 compression_info,
156 timestamp:std::time::SystemTime::now()
157 .duration_since(std::time::UNIX_EPOCH)
158 .unwrap_or_default()
159 .as_millis() as u64,
160 })
161 }
162
163 fn compress_data(&self, data:&[u8]) -> Result<(Vec<u8>, CompressionInfo), String> {
165 match self.Config.Algorithm {
166 CompressionAlgorithm::Brotli => self.compress_brotli(data),
167 CompressionAlgorithm::Gzip => self.compress_gzip(data),
168 CompressionAlgorithm::Zlib => self.compress_zlib(data),
169 }
170 }
171
172 fn compress_brotli(&self, data:&[u8]) -> Result<(Vec<u8>, CompressionInfo), String> {
174 let mut params = BrotliEncoderParams::default();
175 params.quality = self.Config.CompressionLevel as i32;
176
177 let mut compressed = Vec::new();
178 {
179 let mut writer = CompressorWriter::with_params(&mut compressed, data.len().try_into().unwrap(), ¶ms);
180 std::io::Write::write_all(&mut writer, data).map_err(|e| format!("Brotli compression failed: {}", e))?;
181 writer.flush().map_err(|e| format!("Brotli flush failed: {}", e))?;
182 } let ratio = data.len() as f64 / compressed.len() as f64;
185
186 Ok((
187 compressed,
188 CompressionInfo { algorithm:"brotli".to_string(), level:self.Config.CompressionLevel as u32, ratio },
189 ))
190 }
191
192 fn compress_gzip(&self, data:&[u8]) -> Result<(Vec<u8>, CompressionInfo), String> {
194 let mut encoder = GzEncoder::new(Vec::new(), Compression::new(self.Config.CompressionLevel as u32));
195 encoder.write_all(data).map_err(|e| format!("Gzip compression failed: {}", e))?;
196
197 let compressed = encoder.finish().map_err(|e| format!("Gzip finish failed: {}", e))?;
198
199 let ratio = data.len() as f64 / compressed.len() as f64;
200
201 Ok((
202 compressed,
203 CompressionInfo { algorithm:"gzip".to_string(), level:self.Config.CompressionLevel as u32, ratio },
204 ))
205 }
206
207 fn compress_zlib(&self, data:&[u8]) -> Result<(Vec<u8>, CompressionInfo), String> {
209 let mut encoder = ZlibEncoder::new(Vec::new(), Compression::new(self.Config.CompressionLevel as u32));
210 encoder.write_all(data).map_err(|e| format!("Zlib compression failed: {}", e))?;
211
212 let compressed = encoder.finish().map_err(|e| format!("Zlib finish failed: {}", e))?;
213
214 let ratio = data.len() as f64 / compressed.len() as f64;
215
216 Ok((
217 compressed,
218 CompressionInfo { algorithm:"zlib".to_string(), level:self.Config.CompressionLevel as u32, ratio },
219 ))
220 }
221
222 pub fn decompress_batch(&self, batch:&CompressedBatch) -> Result<Vec<Vec<u8>>, String> {
224 let data = if let Some(ref compressed_data) = batch.compressed_data {
225 self.decompress_data(compressed_data, &batch.compression_info.algorithm)?
226 } else {
227 encode_to_vec(&batch, bincode::config::standard()).map_err(|e| format!("Serialization failed: {}", e))?
228 };
229
230 let (decoded, _) = decode_from_slice::<Vec<Vec<u8>>, _>(&data, bincode::config::standard())
231 .map_err(|e| format!("Failed to deserialize batch: {}", e))?;
232 Ok(decoded)
233 }
234
235 fn decompress_data(&self, data:&[u8], algorithm:&str) -> Result<Vec<u8>, String> {
237 match algorithm {
238 "brotli" => self.decompress_brotli(data),
239 "gzip" => self.decompress_gzip(data),
240 "zlib" => self.decompress_zlib(data),
241 _ => Err(format!("Unsupported compression algorithm: {}", algorithm)),
242 }
243 }
244
245 fn decompress_brotli(&self, data:&[u8]) -> Result<Vec<u8>, String> {
247 let mut decompressed = Vec::new();
248 let mut reader = CompressorReader::new(data, 0, data.len().try_into().unwrap(), data.len().try_into().unwrap());
249
250 std::io::Read::read_to_end(&mut reader, &mut decompressed)
251 .map_err(|e| format!("Brotli decompression failed: {}", e))?;
252
253 Ok(decompressed)
254 }
255
256 fn decompress_gzip(&self, data:&[u8]) -> Result<Vec<u8>, String> {
258 use flate2::read::GzDecoder;
259
260 let mut decoder = GzDecoder::new(data);
261 let mut decompressed = Vec::new();
262 decoder
263 .read_to_end(&mut decompressed)
264 .map_err(|e| format!("Gzip decompression failed: {}", e))?;
265
266 Ok(decompressed)
267 }
268
269 fn decompress_zlib(&self, data:&[u8]) -> Result<Vec<u8>, String> {
271 use flate2::read::ZlibDecoder;
272
273 let mut decoder = ZlibDecoder::new(data);
274 let mut decompressed = Vec::new();
275 decoder
276 .read_to_end(&mut decompressed)
277 .map_err(|e| format!("Zlib decompression failed: {}", e))?;
278
279 Ok(decompressed)
280 }
281
282 pub fn get_batch_stats(&self) -> BatchStats {
284 BatchStats {
285 messages_count:self.CurrentBatch.len(),
286 total_size_bytes:self.BatchSizeBytes,
287 batch_age_ms:self.BatchStartTime.map(|t| t.elapsed().as_millis() as u64).unwrap_or(0),
288 }
289 }
290
291 pub fn clear_batch(&mut self) {
293 self.CurrentBatch.clear();
294 self.BatchStartTime = None;
295 self.BatchSizeBytes = 0;
296 }
297}
298
299#[derive(Debug, Clone, Serialize, Deserialize)]
301pub struct CompressedBatch {
302 pub messages_count:usize,
303 pub original_size:usize,
304 pub compressed_size:usize,
305 pub compressed_data:Option<Vec<u8>>,
306 pub compression_info:CompressionInfo,
307 pub timestamp:u64,
308}
309
310#[derive(Debug, Clone, Serialize, Deserialize)]
312pub struct CompressionInfo {
313 pub algorithm:String,
314 pub level:u32,
315 pub ratio:f64,
316}
317
318impl CompressionInfo {
319 fn none() -> Self { Self { algorithm:"none".to_string(), level:0, ratio:1.0 } }
320}
321
322#[derive(Debug, Clone, Serialize, Deserialize)]
324pub struct BatchStats {
325 pub messages_count:usize,
326 pub total_size_bytes:usize,
327 pub batch_age_ms:u64,
328}
329
330impl MessageCompressor {
332 pub fn compress_single_message(
334 message_data:&[u8],
335 algorithm:CompressionAlgorithm,
336 level:CompressionLevel,
337 ) -> Result<(Vec<u8>, CompressionInfo), String> {
338 let config = BatchConfig { Algorithm:algorithm, CompressionLevel:level, ..Default::default() };
339
340 let compressor = MessageCompressor::new(config);
341 compressor.compress_data(message_data)
342 }
343
344 pub fn calculate_compression_ratio(original_size:usize, compressed_size:usize) -> f64 {
346 if compressed_size == 0 {
347 return 0.0;
348 }
349 original_size as f64 / compressed_size as f64
350 }
351
352 pub fn estimate_savings(original_size:usize, expected_ratio:f64) -> usize {
354 (original_size as f64 * (1.0 - 1.0 / expected_ratio)) as usize
355 }
356}
357
#[cfg(test)]
mod tests {
	use super::*;

	/// Repetitive text that every supported codec is sure to shrink. A short
	/// one-line message can grow once gzip's ~18 bytes of framing are added.
	fn compressible_payload() -> Vec<u8> {
		"This is a test message for compression evaluation. ".repeat(20).into_bytes()
	}

	#[test]
	fn test_message_compression() {
		let test_data = compressible_payload();

		for (algorithm, name) in [
			(CompressionAlgorithm::Brotli, "brotli"),
			(CompressionAlgorithm::Gzip, "gzip"),
			(CompressionAlgorithm::Zlib, "zlib"),
		] {
			let (compressed, info) =
				MessageCompressor::compress_single_message(&test_data, algorithm, CompressionLevel::Balanced).unwrap();

			assert!(compressed.len() < test_data.len(), "{name} did not shrink the payload");
			assert!(info.ratio > 1.0);
			assert_eq!(info.algorithm, name);
		}
	}

	#[test]
	fn test_batch_compression() {
		// Cap the batch at the number of messages added, so `should_flush` fires on
		// count instead of depending on wall-clock time.
		let config = BatchConfig { MaxBatchSize:5, ..BatchConfig::default() };
		let mut compressor = MessageCompressor::new(config);

		let messages:Vec<Vec<u8>> = (0..5).map(|i| format!("Message {}", i).into_bytes()).collect();

		for message in &messages {
			assert!(compressor.add_message(message));
		}

		assert!(compressor.should_flush());

		let batch = compressor.flush_batch().unwrap();
		assert_eq!(batch.messages_count, 5);
		assert!(batch.compressed_size <= batch.original_size);

		// Flushing resets the pending batch.
		assert!(!compressor.should_flush());
		assert_eq!(compressor.get_batch_stats().messages_count, 0);
		assert!(compressor.flush_batch().is_err());
	}

	#[test]
	fn test_compression_ratio_calculation() {
		let ratio = MessageCompressor::calculate_compression_ratio(1000, 500);
		assert_eq!(ratio, 2.0);
		assert_eq!(MessageCompressor::calculate_compression_ratio(1000, 0), 0.0);

		let savings = MessageCompressor::estimate_savings(1000, 2.0);
		assert_eq!(savings, 500);
		assert_eq!(MessageCompressor::estimate_savings(1000, 1.0), 0);
		assert_eq!(MessageCompressor::estimate_savings(1000, 0.5), 0);
	}
}