Field Notes.

One Billion CSV Rows in Rust

Roger Rodriguez

I did not set out to process a billion CSV rows in a Lambda. At work, a large CSV parsing job dragged every time an export landed, and it started to sting. I had been experimenting with Rust on the side, so curiosity asked a rude question: could a tiny serverless function stream, decompress, transcode, and validate that data without blowing the memory limit?

The goal: take arbitrarily large CSVs landing in S3, reliably stream them, normalize to UTF-8, validate critical fields, and do it fast enough that Lambda does not tap its watch.

Constraints (aka the boss fight rules)

  • Memory is finite. We must process as a stream, not as a giant Vec<u8>.
  • Encoding isn't guaranteed. We normalize to UTF-8 on the fly.
  • Compression varies. Prefer the Content-Encoding header, then Content-Type, then file extension.
  • We need backpressure-aware IO and predictable perf (hello, async Rust).
  • Reliability over bravado: partial failures should retry cleanly.

Architecture in a nutshell

  • S3 → event fanout via SQS → Lambda worker
  • Lambda pulls S3 object as a streaming ByteStream
  • Conditional decompression: gzip/zstd/none
  • On-the-fly transcoding to UTF-8 with a tokio_util::codec::Decoder
  • csv_async parses records without buffering the whole file
  • Minimal validation on required headers/fields (e.g., sku)
  • Observability baked in: timing for S3, parsing, totals

This is where curiosity meets a real problem. You will see how the pieces fit: S3 events become a stream of bytes, bytes become UTF-8 text, and rows get validated on the fly. No giant buffers, no guesswork, just steady throughput.

The tuning loop: memory, CPU, and small code wins

The first cut ran at 512 MB. It worked, but it felt like pedaling a bike uphill with a couch on the back. Parsing and decompression were CPU bound, so I started a simple loop: bump memory, measure, fix the next bottleneck, repeat. Lambda ties CPU to memory, which means a higher setting buys more parallelism for gzip and CSV parsing.

Early and late run logs

{"level":"INFO","fields":{"message":"Processed 99000000 rows from s3://bucket/key.csv.gz. Total time: 511.835476665s, S3 API latency: 210.547667ms, CSV streaming & parsing: 511.624928537s"}}
{"level":"INFO","fields":{"message":"Processed 99000000 rows from s3://bucket/key.csv.gz. Total time: 197.737875446s, S3 API latency: 114.730937ms, CSV streaming & parsing: 197.623144409s"}}
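
A quick back-of-the-envelope check on those two log lines (pure arithmetic, no assumptions beyond the logged numbers): throughput roughly tripled.

```rust
fn main() {
    let rows = 99_000_000_f64;
    let before_s = 511.835; // first run's parsing time
    let after_s = 197.738;  // second run's parsing time

    // Throughput in rows per second for each run
    let before_rps = rows / before_s; // ≈ 193k rows/s
    let after_rps = rows / after_s;   // ≈ 500k rows/s
    let speedup = before_s / after_s; // ≈ 2.59×

    println!("{:.0} -> {:.0} rows/s ({:.2}x)", before_rps, after_rps, speedup);
}
```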

What changed between those two

  • Raised memory to unlock more CPU; kept well below the memory cap
  • Wrapped reads with BufReader to reduce syscalls
  • Reused a single StringRecord and precomputed header indices
  • Turned off per-row logging; log one timing summary per file
  • Ensured decompressor choice is deterministic and non-blocking

Memory vs time (99M row test file)

  Lambda memory   Approx vCPUs   Total time
  512 MB          ~0.3           8:31
  1024 MB         ~0.6           6:02
  2048 MB         ~1.2           4:01
  3008 MB         ~1.8           3:31  ← chosen (best cost/perf)
  4096 MB         ~2.4           3:17
  5120 MB         ~3.0           3:05
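
The vCPU column follows from how Lambda allocates CPU in proportion to memory: AWS documents full access to one vCPU at 1,769 MB, scaling linearly up to a cap. A sketch of that relationship (the table's values are rounded a little differently, but the shape is the same):

```rust
// Approximate vCPUs available at a given Lambda memory setting.
// AWS documents one full vCPU at 1,769 MB, scaling linearly with
// memory and topping out at 6 vCPUs.
fn approx_vcpus(memory_mb: u32) -> f64 {
    (memory_mb as f64 / 1769.0).min(6.0)
}

fn main() {
    for mb in [512, 1024, 2048, 3008, 4096, 5120] {
        println!("{:>5} MB -> ~{:.1} vCPUs", mb, approx_vcpus(mb));
    }
}
```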

The final production run used memorySize: 3008 for a good cost/performance balance: about three and a half minutes for the 99M-row test file, and roughly 24 minutes for a 1,000,000,000-row file. No giant buffers, just a steady stream and enough CPU to keep the decompressor and parser fed.

The code

The full Lambda is below. It streams from S3, chooses the right decompressor, transcodes to UTF-8 using a custom Transcoder that implements Decoder, then hands a compatible reader to csv_async to iterate rows. Header presence is enforced early; row-level required fields are checked as we go.

csv_parser.rs
use async_compression::tokio::bufread::{GzipDecoder, ZstdDecoder};
use aws_config::BehaviorVersion;
use aws_lambda_events::{
    event::s3::S3Event,
    event::sqs::{SqsBatchResponse, SqsEvent},
    sqs::{BatchItemFailure, SqsMessage as SqsRecord},
};
use bytes::{Buf, BytesMut};
use csv_async::{AsyncReaderBuilder, StringRecord};
use encoding_rs;
use lambda_runtime::{run, service_fn, tracing, Error, LambdaEvent};
use std::io;
use std::time::Instant;
use tokio::io::{AsyncRead, BufReader};
use tokio_util::codec::{Decoder, FramedRead};
use tokio_util::compat::TokioAsyncReadCompatExt;
use tokio_util::io::StreamReader;
 
struct Transcoder {
    decoder: encoding_rs::Decoder,
}
 
impl Transcoder {
    fn new(encoding: &'static encoding_rs::Encoding) -> Self {
        Self {
            decoder: encoding.new_decoder(),
        }
    }
}
 
impl Decoder for Transcoder {
    type Item = BytesMut;
    type Error = io::Error;
 
    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        if src.is_empty() {
            return Ok(None);
        }
 
        let mut temp_out = vec![
            0;
            self.decoder
                .max_utf8_buffer_length(src.len())
                .unwrap_or_else(|| src.len() * 2)
        ];
 
        let (_result, bytes_read, bytes_written, _has_errors) =
            self.decoder.decode_to_utf8(src, &mut temp_out, false);
 
        // No progress yet (e.g. a partial multi-byte sequence at the end
        // of the buffer); ask FramedRead for more bytes before emitting.
        if bytes_read == 0 && bytes_written == 0 && !src.is_empty() {
            return Ok(None);
        }
 
        src.advance(bytes_read);
        Ok(Some(BytesMut::from(&temp_out[..bytes_written])))
    }
 
    fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        if buf.is_empty() {
            return Ok(None);
        }
        let mut temp_out = vec![
            0;
            self.decoder
                .max_utf8_buffer_length(buf.len())
                .unwrap_or_else(|| buf.len() * 2)
        ];
        let (_result, _bytes_read, bytes_written, _has_errors) =
            self.decoder.decode_to_utf8(buf, &mut temp_out, true);
 
        buf.clear();
 
        if bytes_written > 0 {
            Ok(Some(BytesMut::from(&temp_out[..bytes_written])))
        } else {
            Ok(None)
        }
    }
}
 
#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing::init_default_subscriber();
    run(service_fn(handler)).await
}
 
async fn handler(event: LambdaEvent<SqsEvent>) -> Result<SqsBatchResponse, Error> {
    let mut batch_item_failures = Vec::new();
 
    for sqs_record in event.payload.records {
        let message_id = sqs_record.message_id.clone().unwrap_or_default();
        if let Err(e) = process_record(&sqs_record).await {
            tracing::error!("Failed to process SQS message {}: {:?}", &message_id, e);
            batch_item_failures.push(BatchItemFailure {
                item_identifier: message_id,
            });
        }
    }
 
    Ok(SqsBatchResponse {
        batch_item_failures,
    })
}
 
async fn process_record(record: &SqsRecord) -> Result<(), Error> {
    let s3_event_str = record.body.as_deref().unwrap_or_default();
    let s3_event: S3Event = serde_json::from_str(s3_event_str)?;
 
    for s3_record in s3_event.records {
        let bucket = s3_record.s3.bucket.name.unwrap_or_default();
        let key = s3_record.s3.object.key.unwrap_or_default();
        tracing::info!("Received event for s3://{}/{}", bucket, key);
 
        if let Err(e) = process_s3_csv_file(&bucket, &key).await {
            tracing::error!(
                "Failed to process S3 file s3://{}/{}: {:?}",
                bucket,
                key,
                e
            );
            return Err(e);
        }
    }
 
    Ok(())
}
 
async fn process_s3_csv_file(bucket: &str, key: &str) -> Result<(), Error> {
    let start_time = Instant::now();
    let mut row_count = 0;
 
    let config = aws_config::load_defaults(BehaviorVersion::latest()).await;
    let s3_client = aws_sdk_s3::Client::new(&config);
 
    // Grab the object – the body is a streaming `ByteStream`
    let resp = s3_client
        .get_object()
        .bucket(bucket)
        .key(key)
        .send()
        .await?;
 
    let s3_latency = start_time.elapsed();
 
    // Grab metadata before we move the body.
    // Convert to owned strings to break the borrow on `resp`.
    let content_type = resp.content_type().unwrap_or_default().to_string();
    let content_encoding = resp.content_encoding().unwrap_or_default().to_string();
 
    // Turn the ByteStream into something csv_async can read
    let parsing_start_time = Instant::now();
    let raw_stream = resp.body.into_async_read();
 
    // Decompress if needed. Order of precedence:
    // 1. Content-Encoding header
    // 2. Content-Type header
    // 3. File extension (as a fallback)
    let maybe_decompressed: Box<dyn AsyncRead + Unpin + Send> = match content_encoding.as_str() {
        "gzip" => {
            tracing::info!(
                "Decompressing gzipped file (from content-encoding): s3://{}/{}",
                bucket,
                key
            );
            Box::new(GzipDecoder::new(BufReader::new(raw_stream)))
        }
        "zstd" => {
            tracing::info!(
                "Decompressing zstd-compressed file (from content-encoding): s3://{}/{}",
                bucket,
                key
            );
            Box::new(ZstdDecoder::new(BufReader::new(raw_stream)))
        }
        _ => match content_type.as_str() {
            "application/gzip" | "application/x-gzip" => {
                tracing::info!(
                    "Decompressing gzipped file (from content-type): s3://{}/{}",
                    bucket,
                    key
                );
                Box::new(GzipDecoder::new(BufReader::new(raw_stream)))
            }
            "application/zstd" => {
                tracing::info!(
                    "Decompressing zstd-compressed file (from content-type): s3://{}/{}",
                    bucket,
                    key
                );
                Box::new(ZstdDecoder::new(BufReader::new(raw_stream)))
            }
            _ => {
                let ext = std::path::Path::new(key)
                    .extension()
                    .and_then(std::ffi::OsStr::to_str)
                    .unwrap_or("");
                match ext {
                    "gz" => {
                        tracing::info!(
                            "Decompressing gzipped file (fallback to extension): s3://{}/{}",
                            bucket,
                            key
                        );
                        Box::new(GzipDecoder::new(BufReader::new(raw_stream)))
                    }
                    "zst" => {
                        tracing::info!(
                            "Decompressing zstd-compressed file (fallback to extension): s3://{}/{}",
                            bucket,
                            key
                        );
                        Box::new(ZstdDecoder::new(BufReader::new(raw_stream)))
                    }
                    _ => Box::new(raw_stream),
                }
            }
        },
    };
 
    // Transcode to UTF-8 using an async-compatible approach
    let transcoder = Transcoder::new(encoding_rs::UTF_8);
    let framed = FramedRead::new(maybe_decompressed, transcoder);
    let transcoded_stream_reader = StreamReader::new(framed);
 
    // Feed into csv_async
    let mut rdr = AsyncReaderBuilder::new()
        .has_headers(true)
        .flexible(true)
        .create_reader(transcoded_stream_reader.compat());
 
    let headers = rdr.headers().await?.clone();
 
    // TODO: Replace this with a database call to fetch required headers.
    let required_headers: Vec<String> = vec!["sku".into()];
 
    let required_indices = required_headers
        .iter()
        .map(|req_h| {
            headers
                .iter()
                .position(|h| h == req_h)
                .ok_or_else(|| Error::from(format!("Missing required header: '{}' in CSV file", req_h)))
        })
        .collect::<Result<Vec<_>, Error>>()?;
 
    let mut record = StringRecord::new();
    while rdr.read_record(&mut record).await? {
        row_count += 1;
 
        if row_count == 1 {
            tracing::info!("First row: {:?}", record);
        }
 
        for (i, &header_index) in required_indices.iter().enumerate() {
            if record.get(header_index).is_none() {
                return Err(format!(
                    "Row #{} is missing required field '{}': {:?}",
                    row_count, required_headers[i], record
                )
                .into());
            }
        }
 
        // Commenting out to avoid performance impact from excessive logging.
        // tracing::info!("Record: {:?}", record);
    }
 
    let parsing_and_streaming_time = parsing_start_time.elapsed();
    let total_time = start_time.elapsed();
    tracing::info!(
        "Processed {} rows from s3://{}/{}. Total time: {:?}, S3 API latency: {:?}, CSV streaming & parsing: {:?}",
        row_count,
        bucket,
        key,
        total_time,
        s3_latency,
        parsing_and_streaming_time
    );
 
    Ok(())
}
 

Walkthrough

SQS fan-in and retry semantics

We accept SqsEvent, iterate messages, and return SqsBatchResponse with BatchItemFailures for any records that fail. That gives at-least-once delivery with per-message retries instead of failing the whole batch.
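
Stripped of the AWS types, the pattern is just "collect the IDs that failed" (a std-only sketch; `process` is a hypothetical stand-in for process_record):

```rust
// Per-message error isolation: only failed message IDs are reported
// back, so SQS redelivers those and acknowledges the rest.
fn process(body: &str) -> Result<(), String> {
    // Hypothetical stand-in for the real per-message work
    if body.contains("corrupt") { Err("bad payload".into()) } else { Ok(()) }
}

fn batch_item_failures(messages: &[(&str, &str)]) -> Vec<String> {
    messages
        .iter()
        .filter(|(_id, body)| process(body).is_err())
        .map(|(id, _body)| id.to_string())
        .collect()
}

fn main() {
    let msgs = [("m1", "ok"), ("m2", "corrupt row"), ("m3", "ok")];
    // Only m2 is retried; m1 and m3 are deleted from the queue.
    assert_eq!(batch_item_failures(&msgs), vec!["m2"]);
}
```

One deployment caveat: SQS only honors a partial batch response when the event source mapping's function response type includes ReportBatchItemFailures; without it, the batchItemFailures list in the response is ignored.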

Smarter decompression

We choose a decompressor in this order of precedence:

  1. Content-Encoding header (most explicit)
  2. Content-Type (often accurate for gz)
  3. File extension as a last resort (.gz, .zst)

If none apply, we stream raw bytes. All of it is wrapped with BufReader and implemented via async_compression to keep things non-blocking.
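
The precedence logic can be isolated into a small pure function, which also makes it unit-testable (a std-only sketch; the listing above inlines the same decision):

```rust
#[derive(Debug, PartialEq)]
enum Compression {
    Gzip,
    Zstd,
    None,
}

// Decide the decompressor from the most explicit signal available:
// Content-Encoding, then Content-Type, then the key's extension.
fn detect_compression(content_encoding: &str, content_type: &str, key: &str) -> Compression {
    match content_encoding {
        "gzip" => return Compression::Gzip,
        "zstd" => return Compression::Zstd,
        _ => {}
    }
    match content_type {
        "application/gzip" | "application/x-gzip" => return Compression::Gzip,
        "application/zstd" => return Compression::Zstd,
        _ => {}
    }
    match std::path::Path::new(key).extension().and_then(|e| e.to_str()) {
        Some("gz") => Compression::Gzip,
        Some("zst") => Compression::Zstd,
        _ => Compression::None,
    }
}

fn main() {
    // The header wins even when the extension disagrees
    assert_eq!(detect_compression("gzip", "", "data.zst"), Compression::Gzip);
    assert_eq!(detect_compression("", "application/zstd", "data.csv"), Compression::Zstd);
    assert_eq!(detect_compression("", "", "data.csv.gz"), Compression::Gzip);
    assert_eq!(detect_compression("", "", "data.csv"), Compression::None);
}
```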

Async transcoding to UTF-8

CSV data isn't always UTF-8. The custom Transcoder implements tokio_util::codec::Decoder, converting a source encoding to UTF-8 and yielding BytesMut frames; the listing instantiates it with encoding_rs::UTF_8, which amounts to validating passthrough, and any other encoding_rs::Encoding can be swapped in. We then bridge it with StreamReader to expose an AsyncRead, and .compat() adapts that to the futures-flavored reader csv_async expects.
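
Why a stateful, frame-aware decoder matters at all: a chunked stream can split a multi-byte character across reads. A std-only illustration of the carry-over problem (encoding_rs handles this properly, plus non-UTF-8 source encodings and truly invalid bytes; this sketch only shows the chunk-boundary mechanics):

```rust
// Decode a chunk of a UTF-8 byte stream, carrying incomplete trailing
// bytes forward so multi-byte characters split across chunks survive.
fn decode_chunk(carry: &mut Vec<u8>, chunk: &[u8]) -> String {
    carry.extend_from_slice(chunk);
    let valid = match std::str::from_utf8(carry) {
        Ok(_) => carry.len(),
        Err(e) => e.valid_up_to(), // bytes before the incomplete sequence
    };
    let out = String::from_utf8_lossy(&carry[..valid]).into_owned();
    carry.drain(..valid); // keep the partial character for the next chunk
    out
}

fn main() {
    let mut carry = Vec::new();
    // "é" is two bytes (0xC3 0xA9) and arrives split across two chunks
    assert_eq!(decode_chunk(&mut carry, b"caf\xC3"), "caf");
    assert_eq!(decode_chunk(&mut carry, b"\xA9,42"), "\u{e9},42");
}
```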

CSV streaming and validation

csv_async is set to has_headers(true) and flexible(true) so we can survive row-length variance. We compute indices for required headers (sku in this draft) once, then validate presence per row without allocating new strings. Logging avoids per-row spam; we only peek at the first row for sanity.
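
The one-time index resolution looks like this in isolation (std-only sketch; the listing applies the same approach to csv_async's StringRecord):

```rust
// Resolve required headers to column indices once, up front.
fn required_indices(headers: &[&str], required: &[&str]) -> Result<Vec<usize>, String> {
    required
        .iter()
        .map(|req| {
            headers
                .iter()
                .position(|h| h == req)
                .ok_or_else(|| format!("Missing required header: '{}'", req))
        })
        .collect()
}

// The per-row check is then a cheap indexed bounds test,
// with no per-row string comparisons or allocations.
fn row_has_required(row: &[&str], indices: &[usize]) -> bool {
    indices.iter().all(|&i| i < row.len())
}

fn main() {
    let headers = ["sku", "name", "price"];
    let idx = required_indices(&headers, &["sku"]).unwrap();
    assert_eq!(idx, vec![0]);
    assert!(row_has_required(&["A-1", "widget", "9.99"], &idx));
    assert!(!row_has_required(&[], &idx));
    assert!(required_indices(&headers, &["upc"]).is_err());
}
```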

Instrumentation

We capture S3 latency, parsing time, and total time. These numbers are essential for capacity planning and spotting regressions when input characteristics change.

Processed 1_000_000_000 rows from s3://bucket/key.csv.gz. Total: 00:24:13, S3: 00:00:03, Parse: 00:24:10

Edge cases we plan for

  • Non-UTF-8 encodings: transcoder normalizes to UTF-8 in-stream
  • Empty/partial chunks: decoder defers until it has enough bytes
  • Missing headers/fields: hard error with row number for debuggability
  • Mixed compression signals: header wins over content-type, which wins over extension
  • Over-logging: avoided by design; timing is summarized at the end

Practical checklist you can reuse

  • Use streaming APIs end-to-end; avoid buffered reads unless measured
  • Detect compression by header → type → extension
  • Normalize encodings before parsing; don't assume UTF-8
  • Validate required headers once; use indices for fast row checks
  • Treat retries as a feature: per-message failures only
  • Time the critical path and log once per file

What I'd do next

  • Persist validated rows to a sink (S3 parquet, DynamoDB, or a queue)
  • Batch writes with backpressure; measure tail latencies under bursty loads
  • Add schema evolution: optional columns with versioned validators
  • Surface metrics to CloudWatch dashboards and alarms

Closing

Parsing a billion rows isn't impressive because it's big; it's impressive because it forces discipline. Streams, not buffers. Measurements, not vibes. With Rust, the whole path stays honest and fast, even when the CSV isn't.