Filesystem source
The Filesystem source loads files from remote locations (AWS S3, Google Cloud Storage, Google Drive, Azure) or from the local filesystem. It natively supports csv, parquet, and jsonl files and can be customized to load any type of structured file.
To load unstructured data (.pdf, .txt, e-mail), please refer to the unstructured data source.
How the Filesystem source works
The Filesystem source gives you an easy way to load data from both remote and local files, and it comes with a set of tools that let you customize the loading process to fit your specific needs.
Filesystem source loads data in two steps:
- It accesses the files in your remote or local file storage without actually reading the content yet. At this point, you can filter files by metadata or name. You can also set up incremental loading to load only new files.
- The transformer reads the files' content and yields the records. At this step, you can filter the records, enrich them with metadata from the files, or perform incremental loading based on the file content.
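The two steps above can be pictured with a standard-library sketch. It is only an illustration of the list-then-read split, not dlt's implementation: the helper names `list_files` and `read_records` and the `source_file` field are hypothetical, and the metadata keys merely mirror the ones used later on this page.

```python
import csv
import tempfile
from pathlib import Path
from typing import Any, Dict, Iterable, Iterator


def list_files(root: Path, pattern: str) -> Iterator[Dict[str, Any]]:
    """Step 1: yield file metadata only; no file content is opened here."""
    for path in sorted(root.glob(pattern)):
        stat = path.stat()
        yield {
            "file_name": path.name,
            "file_path": path,
            "size_in_bytes": stat.st_size,
            "modification_date": stat.st_mtime,
        }


def read_records(files: Iterable[Dict[str, Any]]) -> Iterator[Dict[str, str]]:
    """Step 2: open each listed file and yield its rows, enriched with the file name."""
    for item in files:
        with open(item["file_path"], newline="") as handle:
            for row in csv.DictReader(handle):
                yield {**row, "source_file": item["file_name"]}


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "berlin.csv").write_text("city,temp\nBerlin,21\n")
    (root / "london.csv").write_text("city,temp\nLondon,18\n")

    # Filter on metadata between the two steps, before any content is read.
    selected = (f for f in list_files(root, "*.csv") if f["file_name"].startswith("b"))
    records = list(read_records(selected))

print(records)
```

Because the filter runs on metadata, the file that is filtered out is never opened.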
Quick example
import dlt
from dlt.sources.filesystem import filesystem, read_parquet
filesystem_resource = filesystem(
    bucket_url="file://Users/admin/Documents/parquet_files",
    file_glob="**/*.parquet"
)
# Apply the incremental hint to the file listing, whose items carry modification_date
filesystem_resource.apply_hints(incremental=dlt.sources.incremental("modification_date"))
filesystem_pipe = filesystem_resource | read_parquet()
# We load the data into the table_name table
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
load_info = pipeline.run(filesystem_pipe.with_name("table_name"))
print(load_info)
print(pipeline.last_trace.last_normalize_info)
Setup
Prerequisites
Please make sure the dlt library is installed. Refer to the installation guide.
Initialize the filesystem source
To get started with your data pipeline, follow these steps:
Enter the following command:
dlt init filesystem duckdb
The dlt init command initializes the pipeline example with filesystem as the source and duckdb as the destination.
If you would like to use a different destination, simply replace duckdb with the name of your preferred destination.
After running this command, a new directory will be created with the necessary files and configuration settings to get started.
Configuration
Get credentials
The steps below cover AWS S3, GCS/GDrive, and Azure. The local filesystem does not need credentials.
To get AWS keys for S3 access:
- Access IAM in the AWS Console.
- Select "Users", choose a user, and open "Security credentials".
- Click "Create access key" for AWS ID and Secret Key.
For more info, see AWS official documentation.
To get GCS/GDrive access:
- Log in to console.cloud.google.com.
- Create a service account.
- Enable "Cloud Storage API" / "Google Drive API"; see Google's guide.
- In IAM & Admin > Service Accounts, find your account, click the three-dot menu > "Manage Keys" > "ADD KEY" > "CREATE" to get a JSON credential file.
- Grant the service account appropriate permissions for cloud storage access.
For more info, see how to create a service account.
To obtain Azure blob storage access:
- Go to the Azure Portal (portal.azure.com).
- Select "Storage accounts" > your storage.
- Click "Settings" > "Access keys".
- View the account name and two keys (primary/secondary). Keep keys confidential.
For more info, see Azure official documentation.
Add credentials to dlt pipeline
To provide credentials to the filesystem source, you can use any method available in dlt.
One of the easiest ways is to use configuration files. The .dlt folder in your working directory contains two files: config.toml and secrets.toml. Sensitive information, like passwords and access tokens, should only be put into secrets.toml, while any other configuration, like the path to a bucket, can be specified in config.toml.
The examples below cover, in order, AWS S3, Azure, GCS/GDrive, and the local filesystem.
# secrets.toml
[sources.filesystem.credentials]
aws_access_key_id="Please set me up!"
aws_secret_access_key="Please set me up!"
# config.toml
[sources.filesystem]
bucket_url="s3://<bucket_name>/<path_to_files>/"
# secrets.toml
[sources.filesystem.credentials]
azure_storage_account_name="Please set me up!"
azure_storage_account_key="Please set me up!"
# config.toml
[sources.filesystem] # use [sources.readers.credentials] for the "readers" source
bucket_url="az://<container_name>/<path_to_files>/"
# secrets.toml
[sources.filesystem.credentials]
client_email="Please set me up!"
private_key="Please set me up!"
project_id="Please set me up!"
# config.toml
# gdrive
[gdrive_pipeline_name.sources.filesystem]
bucket_url="gdrive://<folder_name>/<subfolder_or_file_path>/"
# config.toml
# Google storage
[gstorage_pipeline_name.sources.filesystem]
bucket_url="gs://<bucket_name>/<path_to_files>/"
You can use both native local filesystem paths and the file:// URI. Absolute, relative, and UNC Windows paths are supported.
You could provide an absolute filepath:
# config.toml
[sources.filesystem]
bucket_url='file://Users/admin/Documents/csv_files'
Or skip the scheme and provide the local path in a format native to your operating system. For example, for Windows:
[sources.filesystem]
bucket_url='~\Documents\csv_files\'
You can also specify the credentials using environment variables. The name of the environment variable differs slightly from the corresponding name in the toml file: replace dots . with double underscores __ and write the name in upper case. Do not put spaces around the = sign:
export SOURCES__FILESYSTEM__CREDENTIALS__AWS_ACCESS_KEY_ID="Please set me up!"
export SOURCES__FILESYSTEM__CREDENTIALS__AWS_SECRET_ACCESS_KEY="Please set me up!"
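The naming rule can be expressed as a small helper. This is a sketch of the rule as described above, not a dlt API: `to_env_name` is a hypothetical function, and the bucket URL is the placeholder from the config.toml example.

```python
import os


def to_env_name(toml_path: str) -> str:
    """Turn a dotted config path into an environment variable name."""
    # Dots become double underscores and the result is upper-cased.
    return toml_path.replace(".", "__").upper()


env_name = to_env_name("sources.filesystem.bucket_url")

# Setting the variable from Python has the same effect as `export` in the shell,
# as long as it happens before the pipeline reads its configuration.
os.environ[env_name] = "s3://<bucket_name>/<path_to_files>/"

print(env_name)
```

Here the section `[sources.filesystem]` and the key `bucket_url` are joined into a single variable name.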
dlt supports more ways of authorizing with the cloud storage, including identity-based and default credentials. To learn more about adding credentials to your pipeline, please refer to the Configuration and secrets section.
Usage
The filesystem source differs from most sources in that it provides you with building blocks for loading data from files. First, it iterates over files in the storage, and then it processes each file to yield the records. Usually, you need two resources:
- The filesystem resource enumerates files in a selected bucket using a glob pattern, returning details as FileItem in customizable page sizes.
- One of the available transformer resources processes each file in a specific transforming function and yields the records.
1. Initialize a filesystem resource
If you use just the filesystem resource, it will only list files in the storage based on glob parameters and yield the files' metadata. The filesystem resource itself does not read or copy files.
All parameters of the resource can be specified directly in code:
from dlt.sources.filesystem import filesystem
filesystem_source = filesystem(
    bucket_url="file://Users/admin/Documents/csv_files",
    file_glob="*.csv"
)
or taken from the config:
Python code:
from dlt.sources.filesystem import filesystem
filesystem_source = filesystem()
Configuration file:
[sources.filesystem]
bucket_url="file://Users/admin/Documents/csv_files"
file_glob="*.csv"
Full list of filesystem resource parameters:
- bucket_url - full URL of the bucket (could be a relative path in the case of the local filesystem).
- credentials - cloud storage credentials or an AbstractFileSystem instance (should be empty for the local filesystem). We recommend not specifying this parameter in code, but putting it in the secrets file instead.
- file_glob - file filter in glob format. Defaults to listing all non-recursive files in the bucket URL.
- files_per_page - number of files processed at once. The default value is 100.
- extract_content - if true, the content of the file will be read and returned in the resource. The default value is False.
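To make the effect of files_per_page concrete, here is a minimal sketch of splitting a file listing into pages. It only illustrates the idea of paging and is not taken from dlt's code: the `paginate` helper and the generated file names are hypothetical.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def paginate(items: Iterable[T], page_size: int) -> Iterator[List[T]]:
    """Yield consecutive pages of at most page_size items."""
    iterator = iter(items)
    while True:
        page = list(islice(iterator, page_size))
        if not page:
            return
        yield page


# 250 listed files with a page size of 100 give three pages.
file_names = [f"file_{i}.csv" for i in range(250)]
page_lengths = [len(page) for page in paginate(file_names, page_size=100)]
print(page_lengths)
```

A smaller page size means fewer file entries are held together at any one time, at the cost of more pages.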
2. Choose the right transformer resource
The current implementation of the filesystem source natively supports three file types: csv, parquet, and jsonl.
You can apply any of the above or create your own transformer. To apply the selected transformer resource, use pipe notation |:
from dlt.sources.filesystem import filesystem, read_csv
filesystem_pipe = filesystem(
    bucket_url="file://Users/admin/Documents/csv_files",
    file_glob="*.csv"
) | read_csv()
Available transformers
- read_csv() - processes csv files using pandas
- read_jsonl() - processes jsonl files chunk by chunk
- read_parquet() - processes parquet files using pyarrow
- read_csv_duckdb() - processes csv files using DuckDB, which usually performs better than pandas.
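As an illustration of what reading "chunk by chunk" means for jsonl, the sketch below parses a line-delimited JSON stream and yields records in fixed-size batches. It is a standard-library approximation under assumed names (`read_jsonl_chunks`, `chunk_size`), not the code behind read_jsonl().

```python
import io
import json
from typing import Any, Dict, Iterator, List, TextIO


def read_jsonl_chunks(handle: TextIO, chunk_size: int) -> Iterator[List[Dict[str, Any]]]:
    """Parse one JSON document per line and yield lists of at most chunk_size records."""
    chunk: List[Dict[str, Any]] = []
    for line in handle:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        chunk.append(json.loads(line))
        if len(chunk) >= chunk_size:
            yield chunk
            chunk = []
    if chunk:
        yield chunk  # the last, possibly shorter, chunk


sample = io.StringIO('{"id": 1}\n{"id": 2}\n{"id": 3}\n')
chunks = list(read_jsonl_chunks(sample, chunk_size=2))
print(chunks)
```

Yielding batches instead of the whole file keeps memory use bounded by the chunk size rather than by the file size.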
We advise that you give each resource a specific name before loading with pipeline.run. This will make sure that data goes to a table with the name you want and that each pipeline uses a separate state for incremental loading.
3. Create and run a pipeline
import dlt
from dlt.sources.filesystem import filesystem, read_csv
filesystem_pipe = filesystem(bucket_url="file://Users/admin/Documents/csv_files", file_glob="*.csv") | read_csv()
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
info = pipeline.run(filesystem_pipe)
print(info)
For more information on how to create and run the pipeline, read the Walkthrough: Run a pipeline.
4. Apply hints
import dlt
from dlt.sources.filesystem import filesystem, read_csv
filesystem_pipe = filesystem(bucket_url="file://Users/admin/Documents/csv_files", file_glob="*.csv") | read_csv()
# tell dlt to merge on date
filesystem_pipe.apply_hints(write_disposition="merge", merge_key="date")
# We load the data into the table_name table
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
load_info = pipeline.run(filesystem_pipe.with_name("table_name"))
print(load_info)
5. Incremental loading
Here are a few simple ways to load your data incrementally:
- Load files based on modification date. Only load files that have been updated since the last time dlt processed them. dlt checks the files' metadata (like the modification date) and skips those that haven't changed.
- Load new records based on a specific column. You can load only the new or updated records by looking at a specific column, like updated_at. Unlike the first method, this approach reads all files every time and then keeps only the records that were updated.
- Combine loading only updated files and records. Finally, you can combine both methods. This is useful when new records can be added to existing files, so you want to filter not only the modified files but also the modified records.
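The idea behind these approaches is a cursor: remember the highest value seen so far and keep only items above it. The sketch below shows that idea on plain dictionaries. It is a simplification under stated assumptions: `take_newer` is a hypothetical helper, the sample data is made up, and the comparison is a strict greater-than, whereas dlt's own handling of boundary values may differ.

```python
from typing import Any, Dict, Iterable, List, Optional, Tuple

Item = Dict[str, Any]


def take_newer(
    items: Iterable[Item], cursor_field: str, last_value: Optional[Any]
) -> Tuple[List[Item], Optional[Any]]:
    """Return items whose cursor field exceeds last_value, plus the updated cursor."""
    selected = [i for i in items if last_value is None or i[cursor_field] > last_value]
    values = [i[cursor_field] for i in selected]
    if last_value is not None:
        values.append(last_value)
    return selected, (max(values) if values else None)


# File level: only metadata is compared, so skipped files are never read.
# ISO 8601 strings in one format sort chronologically.
files = [
    {"file_name": "a.csv", "modification_date": "2024-01-01T00:00:00"},
    {"file_name": "b.csv", "modification_date": "2024-01-03T00:00:00"},
]
new_files, file_cursor = take_newer(files, "modification_date", "2024-01-02T00:00:00")

# Record level: every record is read, then compared on its own column.
records = [
    {"id": 1, "updated_at": "2024-01-01"},
    {"id": 2, "updated_at": "2024-01-05"},
]
new_records, record_cursor = take_newer(records, "updated_at", "2024-01-04")

print([f["file_name"] for f in new_files], file_cursor)
print([r["id"] for r in new_records], record_cursor)
```

Combining both methods amounts to applying the same filter twice: once to the file listing and once to the records read from the files that remain.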
Load files based on modification date
For example, to load only new CSV files with incremental loading, you can use the apply_hints method.
import dlt
from dlt.sources.filesystem import filesystem, read_csv
# This configuration will only consider new csv files
new_files = filesystem(bucket_url="s3://bucket_name", file_glob="directory/*.csv")
# add incremental on modification time
new_files.apply_hints(incremental=dlt.sources.incremental("modification_date"))
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
load_info = pipeline.run((new_files | read_csv()).with_name("csv_files"))
print(load_info)
Load new records based on a specific column
In this example, we load only new records based on the field called updated_at. This method may be useful if you are not able to filter files by modification date because, for example, all files are modified each time a new record appears.
import dlt
from dlt.sources.filesystem import filesystem, read_csv
# We consider all csv files
all_files = filesystem(bucket_url="s3://bucket_name", file_glob="directory/*.csv")
# But filter out only updated records
filesystem_pipe = (all_files | read_csv())
filesystem_pipe.apply_hints(incremental=dlt.sources.incremental("updated_at"))
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
load_info = pipeline.run(filesystem_pipe)
print(load_info)
Combine loading only updated files and records
import dlt
from dlt.sources.filesystem import filesystem, read_csv
# This configuration will only consider modified csv files
new_files = filesystem(bucket_url="s3://bucket_name", file_glob="directory/*.csv")
new_files.apply_hints(incremental=dlt.sources.incremental("modification_date"))
# And in each modified file we filter out only updated records
filesystem_pipe = (new_files | read_csv())
filesystem_pipe.apply_hints(incremental=dlt.sources.incremental("updated_at"))
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
load_info = pipeline.run(filesystem_pipe)
print(load_info)
6. Filter files
If you need to filter out files based on their metadata, you can easily do this using the add_filter method.
Within your filtering function, you'll have access to any field of the FileItem representation.
Filter by name
To keep only files that have London or Berlin in their names, you can do the following:
import dlt
from dlt.sources.filesystem import filesystem, read_csv
# Filter files accessing file_name field
filtered_files = filesystem(bucket_url="s3://bucket_name", file_glob="directory/*.csv")
filtered_files.add_filter(lambda item: ("London" in item["file_name"]) or ("Berlin" in item["file_name"]))
filesystem_pipe = (filtered_files | read_csv())
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
load_info = pipeline.run(filesystem_pipe)
print(load_info)
You could also use file_glob to filter files by name. It works very well in simple cases, for example, filtering by extension:
from dlt.sources.filesystem import filesystem
filtered_files = filesystem(bucket_url="s3://bucket_name", file_glob="**/*.json")
Filter by size
If for some reason you only want to load small files, you can also do that:
import dlt
from dlt.sources.filesystem import filesystem, read_csv
MAX_SIZE_IN_BYTES = 10
# Filter files accessing size_in_bytes field
filtered_files = filesystem(bucket_url="s3://bucket_name", file_glob="directory/*.csv")
filtered_files.add_filter(lambda item: item["size_in_bytes"] < MAX_SIZE_IN_BYTES)
filesystem_pipe = (filtered_files | read_csv())
pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
load_info = pipeline.run(filesystem_pipe)
print(load_info)
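Since a filter is a plain callable that receives a dictionary-like item, it can be checked against hand-made dictionaries before it is passed to add_filter. The sketch below does that for the two filters shown above. The sample items are hypothetical and contain only the two fields used here; a real FileItem carries more.

```python
from typing import Any, Dict, List

FileMeta = Dict[str, Any]

MAX_SIZE_IN_BYTES = 10


def name_matches(item: FileMeta) -> bool:
    """Keep files whose name mentions London or Berlin."""
    return "London" in item["file_name"] or "Berlin" in item["file_name"]


def is_small(item: FileMeta) -> bool:
    """Keep files smaller than the size limit."""
    return item["size_in_bytes"] < MAX_SIZE_IN_BYTES


sample_items: List[FileMeta] = [
    {"file_name": "London_2024.csv", "size_in_bytes": 8},
    {"file_name": "Berlin_2024.csv", "size_in_bytes": 2048},
    {"file_name": "Paris_2024.csv", "size_in_bytes": 4},
]

# An item is kept only if every predicate accepts it.
kept = [i["file_name"] for i in sample_items if name_matches(i) and is_small(i)]
print(kept)
```

Named functions like these could be passed to add_filter in place of the lambdas, which keeps the filtering logic testable without touching any storage.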
Troubleshooting
Access extremely long file paths
Windows supports paths up to 255 characters. When you access a path longer than 255 characters, you'll see a FileNotFound exception.
To go over this limit, you can use extended paths, as in the example below. Note that Python glob does not work with extended UNC paths, so you will not be able to use them.
[sources.filesystem]
bucket_url = '\\?\C:\a\b\c'
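For illustration, an extended path can be derived from an ordinary absolute Windows path by adding the `\\?\` prefix, with `\\?\UNC\` used for network shares. The helper below is a hypothetical string-only sketch of that convention; it does not check that the input is absolute and is not part of dlt.

```python
def to_extended_path(path: str) -> str:
    """Add the Windows extended-length prefix to an absolute, backslash-separated path."""
    prefix = "\\\\?\\"  # the four characters \\?\
    if path.startswith(prefix):
        return path  # already extended
    if path.startswith("\\\\"):
        # UNC path \\server\share\... becomes \\?\UNC\server\share\...
        return prefix + "UNC\\" + path[2:]
    return prefix + path


local_path = to_extended_path("C:\\a\\b\\c")
share_path = to_extended_path("\\\\server\\share\\data")
print(local_path)
print(share_path)
```

The resulting string is what would go into bucket_url. In a toml file, single quotes keep the backslashes literal.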
If you get an empty list of files
If you are running a dlt pipeline with the filesystem source and get zero records, we recommend you check the configuration of the bucket_url and file_glob parameters.
For example, with Azure Blob storage, people sometimes mistake the account name for the container name. Make sure you've set up a URL as "az://<container name>/".
Also, please reference the glob function to configure the resource correctly. Use ** to include recursive files. Note that the local filesystem supports full Python glob functionality, while cloud storage supports a restricted fsspec version.
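The difference between a non-recursive and a recursive pattern can be checked locally with Python's pathlib. This sketch shows Python's own glob behavior on a temporary directory only; as noted above, matching on cloud storage goes through fsspec and may behave differently. The `match` helper is hypothetical.

```python
import tempfile
from pathlib import Path
from typing import List

with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "sub").mkdir()
    (root / "a.csv").write_text("x\n1\n")
    (root / "sub" / "b.csv").write_text("x\n2\n")

    def match(pattern: str) -> List[str]:
        """Return matching paths relative to root, with forward slashes."""
        return sorted(p.relative_to(root).as_posix() for p in root.glob(pattern))

    top_level = match("*.csv")     # files directly in the directory
    recursive = match("**/*.csv")  # files in the directory and all subdirectories

print(top_level)
print(recursive)
```

If a pattern such as `*.csv` returns nothing while the files sit in subfolders, switching to `**/*.csv` is usually the first thing to try.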