I did not set out to process a billion CSV rows in a Lambda. At work, a large CSV parsing job dragged every time an export landed, and it started to sting. I had been experimenting with Rust on the side, so curiosity asked a rude question: could a tiny serverless function stream, decompress, transcode, and validate that data without blowing the memory limit?
The goal: take arbitrarily large CSVs landing in S3, reliably stream them, normalize to UTF-8, validate critical fields, and do it fast enough that Lambda does not tap its watch.
The twist: we can't load the file into memory, can't assume UTF-8, and can't trust headers. Oh, and some files are gz or zstd. Fun.
The plan:
- On-the-fly transcoding to UTF-8 with a tokio_util::codec::Decoder
- csv_async parses records without buffering the whole file
- Minimal validation on required headers/fields (e.g., sku)
- Observability baked in: timing for S3, parsing, and totals
Why Rust? Because predictability matters. Async + zero-cost abstractions + control over allocations and backpressure. Also: fast.
This is where curiosity meets a real problem. You will see how the pieces fit: S3 events become a stream of bytes, bytes become UTF-8 text, and rows get validated on the fly. No giant buffers, no guesswork, just steady throughput.
The first cut ran at 512 MB. It worked, but it felt like pedaling a bike uphill with a couch on the back. Parsing and decompression were CPU bound, so I settled into a simple loop: bump memory, measure, fix the next bottleneck, repeat. Lambda allocates CPU in proportion to memory (a full vCPU at 1,769 MB, and additional cores above that), so a higher setting buys more compute to keep gzip decompression and CSV parsing fed.
Rule of thumb: if the job is streaming and CPU bound, try raising memory before rewriting everything. That slider buys CPU and network bandwidth along with the RAM.
The final production configuration used memorySize: 3008 as a good cost/performance balance and chewed through a 1,000,000,000-row file in about three minutes. No giant buffers, just a steady stream and enough CPU to keep the decompressor and parser fed.
The full Lambda is below. It streams from S3, chooses the right decompressor, transcodes to UTF-8 using a custom Transcoder that implements Decoder, then hands a compatible reader to csv_async to iterate rows. Header presence is enforced early; row-level required fields are checked as we go.
csv_parser.rs
use async_compression::tokio::bufread::{GzipDecoder, ZstdDecoder};
use aws_config::BehaviorVersion;
use aws_lambda_events::{
    event::s3::S3Event,
    event::sqs::{SqsBatchResponse, SqsEvent},
    sqs::{BatchItemFailure, SqsMessage as SqsRecord},
};
use bytes::{Buf, BytesMut};
use csv_async::{AsyncReaderBuilder, StringRecord};
use encoding_rs;
use lambda_runtime::{run, service_fn, tracing, Error, LambdaEvent};
use std::io;
use std::time::Instant;
use tokio::io::{AsyncRead, BufReader};
use tokio_util::codec::{Decoder, FramedRead};
use tokio_util::compat::TokioAsyncReadCompatExt;
use tokio_util::io::StreamReader;

struct Transcoder {
    decoder: encoding_rs::Decoder,
}

impl Transcoder {
    fn new(encoding: &'static encoding_rs::Encoding) -> Self {
        Self {
            decoder: encoding.new_decoder(),
        }
    }
}

impl Decoder for Transcoder {
    type Item = BytesMut;
    type Error = io::Error;

    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        if src.is_empty() {
            return Ok(None);
        }
        let mut temp_out = vec![
            0;
            self.decoder
                .max_utf8_buffer_length_without_replacement(src.len())
                .unwrap_or_else(|| src.len() * 2)
        ];
        let (_result, bytes_read, bytes_written, _has_errors) =
            self.decoder.decode_to_utf8(src, &mut temp_out, false);
        if bytes_read == 0 && bytes_written == 0 && !src.is_empty() {
            return Ok(None);
        }
        src.advance(bytes_read);
        Ok(Some(BytesMut::from(&temp_out[..bytes_written])))
    }

    fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        if buf.is_empty() {
            return Ok(None);
        }
        let mut temp_out = vec![
            0;
            self.decoder
                .max_utf8_buffer_length(buf.len())
                .unwrap_or_else(|| buf.len() * 2)
        ];
        let (_result, _bytes_read, bytes_written, _has_errors) =
            self.decoder.decode_to_utf8(buf, &mut temp_out, true);
        buf.clear();
        if bytes_written > 0 {
            Ok(Some(BytesMut::from(&temp_out[..bytes_written])))
        } else {
            Ok(None)
        }
    }
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing::init_default_subscriber();
    run(service_fn(handler)).await
}

async fn handler(event: LambdaEvent<SqsEvent>) -> Result<SqsBatchResponse, Error> {
    let mut batch_item_failures = Vec::new();
    for sqs_record in event.payload.records {
        let message_id = sqs_record.message_id.clone().unwrap_or_default();
        if let Err(e) = process_record(&sqs_record).await {
            tracing::error!("Failed to process SQS message {}: {:?}", &message_id, e);
            batch_item_failures.push(BatchItemFailure {
                item_identifier: message_id,
            });
        }
    }
    Ok(SqsBatchResponse {
        batch_item_failures,
    })
}

async fn process_record(record: &SqsRecord) -> Result<(), Error> {
    let s3_event_str = record.body.as_deref().unwrap_or_default();
    let s3_event: S3Event = serde_json::from_str(s3_event_str)?;
    for s3_record in s3_event.records {
        let bucket = s3_record.s3.bucket.name.unwrap_or_default();
        let key = s3_record.s3.object.key.unwrap_or_default();
        tracing::info!("Received event for s3://{}/{}", bucket, key);
        if let Err(e) = process_s3_csv_file(&bucket, &key).await {
            tracing::error!(
                "Failed to process S3 file s3://{}/{}: {:?}",
                bucket,
                key,
                e
            );
            return Err(e);
        }
    }
    Ok(())
}

async fn process_s3_csv_file(bucket: &str, key: &str) -> Result<(), Error> {
    let start_time = Instant::now();
    let mut row_count = 0;

    let config = aws_config::load_defaults(BehaviorVersion::latest()).await;
    let s3_client = aws_sdk_s3::Client::new(&config);

    // 1. Grab the object – *body* is a streaming `ByteStream`
    let resp = s3_client
        .get_object()
        .bucket(bucket)
        .key(key)
        .send()
        .await?;
    let s3_latency = start_time.elapsed();

    // Grab metadata before we move the body.
    // Convert to owned strings to break the borrow on `resp`.
    let content_type = resp.content_type().unwrap_or_default().to_string();
    let content_encoding = resp.content_encoding().unwrap_or_default().to_string();

    // 2. Turn the ByteStream into something csv_async can read
    let parsing_start_time = Instant::now();
    let raw_stream = resp.body.into_async_read();

    // 1. Decompress (if needed)
    // Decompression logic order of precedence:
    //   1. Content-Encoding header
    //   2. Content-Type header
    //   3. File extension (as a fallback)
    let maybe_decompressed: Box<dyn AsyncRead + Unpin + Send> = match content_encoding.as_str() {
        "gzip" => {
            tracing::info!(
                "Decompressing gzipped file (from content-encoding): s3://{}/{}",
                bucket,
                key
            );
            Box::new(GzipDecoder::new(BufReader::new(raw_stream)))
        }
        "zstd" => {
            tracing::info!(
                "Decompressing zstd-compressed file (from content-encoding): s3://{}/{}",
                bucket,
                key
            );
            Box::new(ZstdDecoder::new(BufReader::new(raw_stream)))
        }
        _ => match content_type.as_str() {
            "application/gzip" | "application/x-gzip" => {
                tracing::info!(
                    "Decompressing gzipped file (from content-type): s3://{}/{}",
                    bucket,
                    key
                );
                Box::new(GzipDecoder::new(BufReader::new(raw_stream)))
            }
            "application/zstd" => {
                tracing::info!(
                    "Decompressing zstd-compressed file (from content-type): s3://{}/{}",
                    bucket,
                    key
                );
                Box::new(ZstdDecoder::new(BufReader::new(raw_stream)))
            }
            _ => {
                let ext = std::path::Path::new(key)
                    .extension()
                    .and_then(std::ffi::OsStr::to_str)
                    .unwrap_or("");
                match ext {
                    "gz" => {
                        tracing::info!(
                            "Decompressing gzipped file (fallback to extension): s3://{}/{}",
                            bucket,
                            key
                        );
                        Box::new(GzipDecoder::new(BufReader::new(raw_stream)))
                    }
                    "zst" => {
                        tracing::info!(
                            "Decompressing zstd-compressed file (fallback to extension): s3://{}/{}",
                            bucket,
                            key
                        );
                        Box::new(ZstdDecoder::new(BufReader::new(raw_stream)))
                    }
                    _ => Box::new(raw_stream),
                }
            }
        },
    };

    // 2. Transcode to UTF-8 using an async-compatible approach
    let transcoder = Transcoder::new(encoding_rs::UTF_8);
    let framed = FramedRead::new(maybe_decompressed, transcoder);
    let transcoded_stream_reader = StreamReader::new(framed);

    // 3. Feed into csv_async
    let mut rdr = AsyncReaderBuilder::new()
        .has_headers(true)
        .flexible(true)
        .create_reader(transcoded_stream_reader.compat());

    let headers = rdr.headers().await?.clone();
    // TODO: Replace this with a database call to fetch required headers.
    let required_headers: Vec<String> = vec!["sku".into()];
    let required_indices = required_headers
        .iter()
        .map(|req_h| {
            headers
                .iter()
                .position(|h| h == req_h)
                .ok_or_else(|| {
                    Error::from(format!("Missing required header: '{}' in CSV file", req_h))
                })
        })
        .collect::<Result<Vec<_>, Error>>()?;

    let mut record = StringRecord::new();
    while rdr.read_record(&mut record).await? {
        row_count += 1;
        if row_count == 1 {
            tracing::info!("First row: {:?}", record);
        }
        for (i, &header_index) in required_indices.iter().enumerate() {
            if record.get(header_index).is_none() {
                return Err(format!(
                    "Row #{} is missing required field '{}': {:?}",
                    row_count, required_headers[i], record
                )
                .into());
            }
        }
        // Commenting out to avoid performance impact from excessive logging.
        // tracing::info!("Record: {:?}", record);
    }

    let parsing_and_streaming_time = parsing_start_time.elapsed();
    let total_time = start_time.elapsed();
    tracing::info!(
        "Processed {} rows from s3://{}/{}. Total time: {:?}, S3 API latency: {:?}, CSV streaming & parsing: {:?}",
        row_count,
        bucket,
        key,
        total_time,
        s3_latency,
        parsing_and_streaming_time
    );

    Ok(())
}
We accept SqsEvent, iterate messages, and return SqsBatchResponse with BatchItemFailures for any records that fail. That gives at-least-once delivery with per-message retries instead of failing the whole batch.
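To make the contract concrete, here is a minimal sketch using the same aws_lambda_events types as the handler (serde_json assumed as a dependency, and the message ID is made up). Keep in mind the SQS event source mapping must have ReportBatchItemFailures enabled for Lambda to honor the response.

use aws_lambda_events::event::sqs::SqsBatchResponse;
use aws_lambda_events::sqs::BatchItemFailure;

fn main() {
    // Pretend one message in the batch failed; report only its ID back to SQS.
    let response = SqsBatchResponse {
        batch_item_failures: vec![BatchItemFailure {
            item_identifier: "msg-123".to_string(),
        }],
    };
    // Lambda serializes this into the JSON shape SQS expects,
    // e.g. {"batchItemFailures":[{"itemIdentifier":"msg-123"}]}.
    println!("{}", serde_json::to_string(&response).unwrap());
}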
CSV data isn't always UTF-8. The custom Transcoder implements tokio_util::codec::Decoder, decoding the source encoding into UTF-8 and yielding BytesMut frames; this draft instantiates it with encoding_rs::UTF_8, but any encoding_rs::Encoding can be supplied. We then wrap the FramedRead in a StreamReader to expose an AsyncRead that csv_async can ingest.
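As a quick sanity check, here is a minimal test sketch of that pipeline in isolation, assuming the Transcoder from the listing is in scope (tokio, tokio-util, bytes, and encoding_rs as dependencies): it frames a small Windows-1252 buffer, which is not valid UTF-8 on its own, through FramedRead and StreamReader and reads clean UTF-8 out the other side.

use tokio::io::AsyncReadExt;
use tokio_util::{codec::FramedRead, io::StreamReader};

#[tokio::test]
async fn transcodes_windows_1252_to_utf8() -> std::io::Result<()> {
    // "café" encoded as Windows-1252; the trailing 0xE9 byte is not valid UTF-8.
    let latin1: &[u8] = &[b'c', b'a', b'f', 0xE9];

    // Same wiring as the handler, minus S3: raw bytes -> UTF-8 frames -> AsyncRead.
    let framed = FramedRead::new(latin1, Transcoder::new(encoding_rs::WINDOWS_1252));
    let mut reader = StreamReader::new(framed);

    let mut utf8 = String::new();
    reader.read_to_string(&mut utf8).await?;
    assert_eq!(utf8, "café");
    Ok(())
}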
csv_async is set to has_headers(true) and flexible(true) so we can survive row-length variance. We compute indices for required headers (sku in this draft) once, then validate presence per row without allocating new strings. Logging avoids per-row spam; we only peek at the first row for sanity.
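The same builder settings and index trick work against any async reader, so an easy way to see the behavior is an in-memory run like the sketch below (the data is made up; a futures-style AsyncRead is implemented for &[u8], so no .compat() or S3 plumbing is needed).

use csv_async::{AsyncReaderBuilder, StringRecord};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Second row is short on purpose; flexible(true) lets it through.
    let data: &[u8] = b"sku,name,price\nABC-1,Widget,9.99\nABC-2,Gadget\n";
    let mut rdr = AsyncReaderBuilder::new()
        .has_headers(true)
        .flexible(true)
        .create_reader(data);

    // Resolve the required column once, up front.
    let headers = rdr.headers().await?.clone();
    let sku_idx = headers
        .iter()
        .position(|h| h == "sku")
        .ok_or("missing required header: sku")?;

    // Reuse one StringRecord to avoid per-row allocations.
    let mut record = StringRecord::new();
    let mut rows = 0usize;
    while rdr.read_record(&mut record).await? {
        rows += 1;
        if record.get(sku_idx).is_none() {
            return Err(format!("row {} is missing 'sku'", rows).into());
        }
    }
    println!("validated {} rows", rows);
    Ok(())
}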
We capture S3 latency, parsing time, and total time. These numbers are essential for capacity planning and spotting regressions when input characteristics change.
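If those numbers need to be queried rather than just read, one option is to emit them as structured tracing fields instead of an interpolated string. A hypothetical helper might look like the sketch below; the field names (total_ms and friends) are my own invention, and it assumes the tracing crate that lambda_runtime re-exports.

use std::time::Duration;

// Hypothetical helper: the same metrics as structured fields so log tooling
// (e.g. CloudWatch Logs Insights) can filter and aggregate on them.
fn log_metrics(bucket: &str, key: &str, rows: u64, total: Duration, s3: Duration, parse: Duration) {
    tracing::info!(
        rows,
        total_ms = total.as_millis() as u64,
        s3_latency_ms = s3.as_millis() as u64,
        parse_ms = parse.as_millis() as u64,
        "finished processing s3://{}/{}",
        bucket,
        key
    );
}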
Parsing a billion rows isn't impressive because it's big; it's impressive because it forces discipline. Streams, not buffers. Measurements, not vibes. With Rust, the whole path stays honest and fast, even when the CSV isn't.