Auto Loader: The End of File Listing Hell
If you’ve ever tracked processed files by hand, this one’s for you.
A few years ago, identifying which .parquet files your pipeline had already processed was something you handled yourself. You could keep a database of per-file metadata such as file_name, storage_location, and timestamps, or you could read every available file, process each one, and move it to another path like processed.
Whichever approach you chose, the code was complex to maintain and could hurt pipeline performance.
I don’t know if you are still using these methods in your Databricks Data Lakehouse, but if you are, you should take a look at Auto Loader.
Auto Loader is basically Spark Structured Streaming with a little trick: you use cloudFiles as the source format, and it automatically detects new files in your cloud storage, such as AWS S3 or Azure Data Lake Storage.
In my case, this was a game changer. We were ingesting hundreds of tables with hundreds of files per day, and my team cut a pipeline that took 1h30min down to 15min.
With that, you don’t need to worry about which files still need to be read or which were already read. Auto Loader takes care of all of it for you. And you take care of what matters most: bringing value to your company.
Example of Reading
The command itself is very simple:
data_path = "s3://your-bucket/raw/"
schema_path = "s3://your-bucket/schema/table-name"
df = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "parquet")
.option("cloudFiles.schemaLocation", schema_path)
.load(data_path)
)
Components
.format("cloudFiles"): enable Auto Loader.option("cloudFiles.schemaLocation", schema_path): metadata such as data types inferred, schema evolution process, which files were read
Schema Evolution
To handle schema evolution, Auto Loader offers a few modes:
- addNewColumns: adds any new columns to the Delta table
- rescue: does not add new columns, but captures their data in a column called _rescued_data
- failOnNewColumns: the pipeline fails whenever a new column appears
- none: ignores new columns; you need to provide the data schema yourself
The statement is:
.option("cloudFiles.schemaEvolutionMode", "addNewColumns") # "rescue" or "failOnNewColumns" or "none"
Example of Writing
After you set up the dataframe to read your data from cloud storage with Auto Loader, you need to write it somewhere. Here, we basically have two approaches:
- create an as-is delta table
- create a delta table with transformations
As-is Delta Table
The command is very simple and similar to Spark Structured Streaming:
checkpoint_location = "s3://your-bucket/delta_table/table_name/checkpoint"
(
df.writeStream
.format("delta")
.option("checkpointLocation", checkpoint_location)
.table("table_name")
)
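If you want this stream to behave like a scheduled batch job, process everything pending and then stop, you can add an availableNow trigger. This is a sketch under the assumption that your Spark/Databricks Runtime version supports it:
(
    df.writeStream
    .format("delta")
    .option("checkpointLocation", checkpoint_location)
    .trigger(availableNow=True)  # process all pending files, then stop
    .toTable("table_name")
)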
Delta Table with Transformation
Here we need to change a bit how the write is triggered, using foreachBatch:
checkpoint_location = "s3://your-bucket/delta_table/table_name/checkpoint"
from pyspark.sql.functions import col, current_timestamp

def process_batch(batch_df, batch_id):
    transformed_df = (
        batch_df
        .withColumn("processed_at", current_timestamp())
        .filter(col("status") == "active")
        .select("id", "name", "processed_at")
    )
    transformed_df.write.format("delta").mode("append").saveAsTable("table_name")
(
df.writeStream
.foreachBatch(process_batch)
.option("checkpointLocation", checkpoint_location)
.start()
)
The foreachBatch allows you to apply any Spark transformation before writing the data. This is useful when you need to clean, enrich, or reshape your data before persisting it.
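As an illustration of what foreachBatch makes possible (this is not part of the pipeline above), here is a sketch of an upsert into an existing Delta table using the Delta Lake Python API; the target table name and the join key id are assumptions:
from delta.tables import DeltaTable

def upsert_batch(batch_df, batch_id):
    # Merge each micro-batch into the target table, keyed on "id" (assumed key)
    target = DeltaTable.forName(spark, "table_name")
    (
        target.alias("t")
        .merge(batch_df.alias("s"), "t.id = s.id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
You would pass upsert_batch to foreachBatch exactly like process_batch above.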
How Auto Loader Works Under the Hood
Auto Loader uses two different mechanisms depending on your cloud provider and configuration:
File Notification Mode
In most production scenarios, this ends up being the best option. Auto Loader subscribes to cloud storage events (like AWS S3 Event Notifications or Azure Event Grid) to detect new files in real-time.
Benefits:
- Near real-time file detection
- Lower cloud storage API costs
- Better performance at scale
To enable it, you need to set up cloud notifications and add:
.option("cloudFiles.useNotifications", "true")
Directory Listing Mode
This is the default fallback mode. Auto Loader periodically lists the directory to find new files.
It’s simpler to set up (no cloud configuration needed) but has some tradeoffs:
- Higher latency
- More API calls to your cloud storage
- Can be slower with millions of files
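Because the listing typically happens once per micro-batch, the trigger interval on the write side effectively controls how often Auto Loader lists the directory. A sketch with an assumed 10-minute interval:
(
    df.writeStream
    .format("delta")
    .option("checkpointLocation", checkpoint_location)
    .trigger(processingTime="10 minutes")  # look for new files roughly every 10 minutes
    .toTable("table_name")
)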
File Metadata: Your Best Friend for Debugging
Auto Loader can automatically include file metadata in your dataframe. This became a lifesaver for us when we needed to understand which specific file caused an issue or when a pipeline behaved unexpectedly.
df = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "parquet")
.option("cloudFiles.schemaLocation", schema_path)
.option("cloudFiles.includeExistingFiles", "true") # process existing files on first run
.load(data_path)
)
After loading, you have access to metadata columns:
df_with_metadata = df.selectExpr(
"*",
"_metadata.file_path as source_file",
"_metadata.file_name as file_name",
"_metadata.file_modification_time as modified_at"
)
This is incredibly useful for:
- Debugging: “Which file had this corrupted record?”
- Auditing: “When was this data ingested?”
- Reprocessing: “I need to reprocess files from last Tuesday only”
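For instance, to look at (or reprocess) only the records that came from files modified after a given point in time, you can filter on the metadata column selected above; the cutoff date here is purely hypothetical:
from pyspark.sql.functions import col, lit, to_timestamp

cutoff = "2024-01-16 00:00:00"  # hypothetical cutoff

recent_files_df = df_with_metadata.filter(
    col("modified_at") > to_timestamp(lit(cutoff))
)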
You can also use these metadata columns in your transformations:
from pyspark.sql.functions import col, current_timestamp

def process_batch(batch_df, batch_id):
    processed_df = (
        batch_df
        .withColumn("ingestion_timestamp", current_timestamp())
        # depending on your runtime, _metadata may need to be selected on the
        # streaming DataFrame before foreachBatch for these fields to be available
        .withColumn("source_file", col("_metadata.file_name"))
        .withColumn("file_size", col("_metadata.file_size"))
    )
    processed_df.write.format("delta").mode("append").saveAsTable("table_name")
Now you have full traceability of where each record came from.
Cost Optimization: Don’t Burn Money
Auto Loader is already efficient by design, but there are a few things we learned along the way to make it even more cost-effective:
1. Use File Notification Mode
This is the big one. Instead of constantly listing directories (which generates API calls), use cloud events:
.option("cloudFiles.useNotifications", "true")
Impact: Reduces S3/ADLS API costs significantly, especially with large directories.
2. Control Throughput
Don’t process everything at once. Control how many files are processed per trigger:
.option("cloudFiles.maxFilesPerTrigger", "1000")
This helps with:
- Controlling cluster usage
- Avoiding overwhelming downstream systems
- Predictable cost per run
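A sketch of a rate-limited read; cloudFiles.maxBytesPerTrigger is an additional option not mentioned above, so treat it as an assumption to verify against your runtime:
df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", schema_path)
    .option("cloudFiles.maxFilesPerTrigger", "1000")  # cap on files per micro-batch
    .option("cloudFiles.maxBytesPerTrigger", "10g")   # cap on data volume per micro-batch (assumed option)
    .load(data_path)
)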
3. Keep Checkpoint and Schema Close to Data
Store your checkpoint and schema location in the same region as your data:
# ❌ Bad: cross-region costs
data_path = "s3://us-east-1-bucket/data/"
checkpoint = "s3://eu-west-1-bucket/checkpoint/"
# ✅ Good: same region
data_path = "s3://us-east-1-bucket/data/"
checkpoint = "s3://us-east-1-bucket/checkpoint/"
Impact: Avoid data transfer costs between regions.
4. Use Incremental Listing for Large Directories
If you have millions of files, enable incremental listing:
.option("cloudFiles.useIncrementalListing", "true")
This makes the initial directory scan much faster and cheaper.
5. Right-Size Your Schema Evolution
If you know your schema is stable, disable schema inference to save processing time:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
custom_schema = StructType([
StructField("id", IntegerType(), True),
StructField("name", StringType(), True)
])
df = (
spark.readStream
.format("cloudFiles")
.schema(custom_schema) # no inference needed
.option("cloudFiles.format", "json")
.load(data_path)
)
Impact: Faster startup time and lower compute costs.
Key Points to Remember
Checkpoint Location: This is where Auto Loader keeps track of processed files and schema evolution. Don’t delete it or you’ll reprocess everything.
Schema Location: Stores the inferred schema and its evolution over time. Essential for maintaining data consistency.
Idempotency: Auto Loader ensures exactly-once processing semantics. Even if your pipeline fails and restarts, files won’t be processed twice.
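To tie these points together, here is a compact end-to-end sketch where the schema location and the checkpoint location are explicit, stable paths that survive across runs (all paths and the table name are placeholders):
data_path = "s3://your-bucket/raw/"
schema_path = "s3://your-bucket/schema/table-name"
checkpoint_location = "s3://your-bucket/delta_table/table_name/checkpoint"

df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", schema_path)   # inferred schema + its evolution
    .load(data_path)
)

(
    df.writeStream
    .format("delta")
    .option("checkpointLocation", checkpoint_location)  # processed-file state; don't delete
    .toTable("table_name")
)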
When to Use Auto Loader
Use Auto Loader when:
- You’re ingesting files continuously from cloud storage
- You want to avoid maintaining complex state management
- You need schema evolution handling out of the box
- You’re building lakehouse architectures on Databricks
Don’t use Auto Loader when:
- You’re doing one-time batch loads (just use spark.read instead; see the sketch after this list)
- You’re not using Delta Lake or you need custom checkpointing logic
- Your files don’t live in cloud storage
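For that one-off case, a plain batch read is enough; a minimal sketch with placeholder paths:
df = spark.read.format("parquet").load("s3://your-bucket/raw/")
df.write.format("delta").mode("append").saveAsTable("table_name")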
Final Thoughts
After using Auto Loader in real pipelines, it’s hard to imagine going back. Not because it’s fancy, but because it removes a whole class of problems we used to accept as “part of the job”. And that’s usually a sign of a good abstraction.
When a tool quietly disappears from your list of daily concerns, it gives teams space to focus on what actually matters: building reliable data products instead of managing file state.