Store tables as PARQUET files #419

Open · wants to merge 62 commits into main from pyarrow
Conversation

hagenw (Member) commented on Mar 20, 2024

Closes #382
Closes #376

Several improvements and speed-ups to the table handling by using pyarrow:

  • Support for storing tables as PARQUET files
  • Reading tables from CSV files with pyarrow
  • Require pandas >=2.2.0
  • Require pyarrow
  • Remove support for Python 3.8 (as we need pandas >=2.2.0)
  • Add a new CI job "ubuntu-latest, 3.9, minimum" that tests with all Python packages installed at their minimum versions

I decided to keep "csv" as the default for the storage_format argument of audformat.Database.save() and audformat.Table.save(). This way we can make a new release of audformat and test storing tables as PARQUET files with some databases, without exposing the new format to other users yet. In addition, audb needs to be updated to support publishing PARQUET tables.
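Opting in is then a single argument. A minimal sketch (paths and the "files" table id are placeholders):

import audformat

db = audformat.Database.load("./db")

# Save the whole database with tables stored as PARQUET files ...
db.save("./db", storage_format="parquet")

# ... or save a single table ("files" is a placeholder table id)
db["files"].save("./db/db.files", storage_format="parquet")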

Loading benchmark

We can benchmark the behavior by loading a dataset from a folder that contains all tables, using
audformat.Database.load(db_root, load_data=True). Results are given as the average over 10 runs.

Dataset                       | # table rows | CSV (previous) | CSV (new)       | PARQUET       | PKL
mozillacommonvoice            | 1,923,269    | 8.62 ± 0.3 s   | 4.76 ± 0.6 s    | 3.58 ± 0.7 s  | 2.30 ± 0.8 s
voxceleb2                     | 2,894,987    | 17.44 ± 0.3 s  | 6.05 ± 0.2 s    | 1.33 ± 0.1 s  | 1.01 ± 0.1 s
librispeech                   | 36,422,346   | 1036.1 ± 30 s  | 284.95 ± 17.3 s | 26.88 ± 0.7 s | 3.01 ± 1.0 s
imda-nsc-read-speech-balanced | 4,538,128    | 8.38 ± 0.2 s   | 2.86 ± 0.4 s    | 3.07 ± 0.3 s  | 2.04 ± 0.3 s
emodb                         | 1,605        | 0.02 ± 0.00 s  | 0.05 ± 0.01 s   | 0.03 ± 0.01 s | 0.03 ± 0.07 s
Benchmark code
import os
import time

import numpy as np

import audb
import audeer
import audformat


repetitions = 10

db_root = audeer.path("./db")
audeer.rmdir(db_root)
cache_root = audeer.path("./cache")
audeer.rmdir(cache_root)
audeer.mkdir(cache_root)

datasets = [
    ("mozillacommonvoice", "10.3.0"),
    ("voxceleb2", "2.6.0"),
    ("librispeech", "3.1.0"),
    ("imda-nsc-read-speech-balanced", "2.6.0"),
    ("emodb", "1.4.1"),
]
for name, version in datasets:
    audeer.rmdir(db_root)
    audb.load_to(
        db_root,
        name,
        version=version,
        only_metadata=True,
        cache_root=cache_root,
        num_workers=1,
    )

    # PKL files
    times = []
    for _ in range(repetitions):
        t0 = time.time()
        db = audformat.Database.load(db_root, load_data=True)
        t = time.time() - t0
        times.append(t)
    print(
        f"Execution time with PKL tables {name}: "
        f"{np.mean(times):.3f}±{np.std(times):.3f}s"
    )

    # CSV files
    pkl_files = audeer.list_file_names(db_root, filetype="pkl")
    for pkl_file in pkl_files:
        os.remove(pkl_file)
    times = []
    for _ in range(repetitions):
        t0 = time.time()
        db = audformat.Database.load(db_root, load_data=True)
        t = time.time() - t0
        times.append(t)
    print(
        f"Execution time with CSV tables {name}: "
        f"{np.mean(times):.3f}±{np.std(times):.3f}s"
    )

    # PARQUET files
    if audformat.__version__ > "1.1.4":
        # Replace CSV with PARQUET files
        # and repeat benchmark
        for table_id in list(db):
            path = audeer.path(db_root, f"db.{table_id}")
            db[table_id].save(path, storage_format="parquet")
            os.remove(f"{path}.csv")
        times = []
        for _ in range(repetitions):
            t0 = time.time()
            db = audformat.Database.load(db_root, load_data=True)
            t = time.time() - t0
            times.append(t)
        print(
            f"Execution time with PARQUET tables {name}: "
            f"{np.mean(times):.3f}±{np.std(times):.3f}s"
        )

The benchmark highlights two important results:

  • The new implementation speeds up loading of CSV files by a factor of ~4.
  • Reading from PARQUET is at least as fast as reading from CSV, and for some datasets much faster.

Memory benchmark

I investigated memory consumption using heaptrack when loading the phonetic-transcription.train-clean-360 table of the librispeech dataset from our internal server. Stored as a CSV file the table has a size of 1.3 GB, stored as a PARQUET file 49 MB.

branch                 | peak heap memory | execution time
CSV (main)             | 1.36 GB          | 197.9 s
CSV (pull request)     | 1.33 GB          | 43.9 s
PARQUET (pull request) | 1.11 GB          | 3.7 s
Benchmark code

csv-table-loading.py

import time

import audb
import audformat


# Measure execution time when loading a CSV table.
# Assumes the file `db.phonetic-transcription.train-clean-360.csv`
# or `db.phonetic-transcription.train-clean-360.parquet`
# exists in the same folder
# as this script.
# Note that the CSV file is only loaded
# when the PARQUET file is not present.

db = audb.info.header("librispeech")
table_id = "phonetic-transcription.train-clean-360"
t0 = time.time()
db[table_id].load(f"db.{table_id}")
t = time.time() - t0
print(f"Execution time: {t:.1f} s")

The memory consumption is then profiled with:

$ heaptrack python csv-table-loading.py

The execution time was measured without heaptrack:

$ python csv-table-loading.py

Why and when is reading from a CSV file slow?

By far the slowest part when reading a CSV file with pyarrow is the conversion to pandas.Timedelta values for columns that specify a duration.
E.g. when reading the CSV file from the memory benchmark, reading with pyarrow and converting to a pandas.DataFrame takes 3 s, whereas converting the start and end columns to pandas.Timedelta takes roughly 40 s.
There is a dedicated dtype with pyarrow.duration, but it does not yet have reading support for CSV files. When trying it, you get:

pyarrow.lib.ArrowNotImplementedError: CSV conversion to duration[ns] is not supported
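A minimal sketch that reproduces the limitation (file names and values are made up):

import io

import pyarrow as pa
import pyarrow.csv as pacsv

# A CSV "file" with a duration column, values given in nanoseconds
data = io.BytesIO(b"file,end\na.wav,1500000000\nb.wav,2000000000\n")
options = pacsv.ConvertOptions(column_types={"end": pa.duration("ns")})
try:
    pacsv.read_csv(data, convert_options=options)
except pa.ArrowNotImplementedError as error:
    print(error)  # CSV conversion to duration[ns] is not supported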

When storing a table as a PARQUET file we use duration[ns] for time values, and converting those back to pandas.Timedelta seems to be much faster: reading the PARQUET file takes 0.3 s, and converting to the dataframe takes the remaining 3.4 s.
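A small sketch of the PARQUET round trip with a duration column (column name and values are made up):

import pyarrow as pa
import pyarrow.parquet as parquet

durations = pa.array([1_500_000_000, 2_000_000_000], type=pa.duration("ns"))
pa_table = pa.table({"end": durations})
parquet.write_table(pa_table, "example.parquet")
restored = parquet.read_table("example.parquet")
print(restored.schema.field("end").type)  # duration[ns]
print(restored.to_pandas()["end"].dtype)  # timedelta64[ns]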

The conversion can very likely be sped up by switching to pyarrow-based dtypes in the dataframe, as we do for audb.Dependencies._df, but at the moment this is not fully supported in pandas, e.g. timedelta values are not implemented yet (pandas-dev/pandas#52284).
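For illustration only, such a pyarrow-backed conversion would look roughly like this (not what this pull request does; the column is made up):

import pandas as pd
import pyarrow as pa

pa_table = pa.table({"speaker": ["spk1", "spk2"]})
# Map every pyarrow type to a pandas ArrowDtype instead of a numpy dtype
df = pa_table.to_pandas(types_mapper=pd.ArrowDtype)
print(df.dtypes)  # speaker has dtype string[pyarrow]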

Hashing of PARQUET files

As we have already experienced in audeering/audb#372, PARQUET files are not written in a reproducible fashion and might return different MD5 sums, even though they store the same dataframe.
To overcome this problem, I now calculate a hash based on the content of the dataframe using audformat.utils.hash() and store the resulting value inside the schema metadata of the PARQUET file. This means that in audb we can access it by just loading the schema of a PARQUET file. The corresponding code to access the hash is:

import pyarrow.parquet as parquet

hash = parquet.read_schema(path).metadata[b"hash"].decode()

This approach is faster than calculating the MD5 sum with audeer.md5().
Execution time, benchmarked as the average over 100 repetitions:

method        | execution time
read hash     | 0.0001 ± 0.0000 s
calculate md5 | 0.0716 ± 0.0007 s
Benchmark code
import time

import numpy as np
import pyarrow.parquet as parquet

import audeer

# Compare execution times
# for calculating the MD5 sum
# vs reading the hash from the PARQUET file.
# Assumes the file `db.phonetic-transcription.train-clean-360.parquet`
# exists in the same folder
# as this script.
table_id = "phonetic-transcription.train-clean-360"
repetitions = 100 

path = f"db.{table_id}.parquet"
times = []
for _ in range(repetitions):
    t0 = time.time()
    audeer.md5(path)
    t = time.time() - t0
    times.append(t)
print(f"Execution time MD5: {np.mean(times):.4f} ± {np.std(times):.4f} s")

times = []
for _ in range(repetitions):
    t0 = time.time()
    parquet.read_schema(path).metadata[b"hash"].decode()
    t = time.time() - t0
    times.append(t)
print(f"Execution time hash: {np.mean(times):.4f} ± {np.std(times):.4f} s")

The downside is that audformat.utils.hash() requires pandas >=2.2.0, as pandas has changed its hash calculation, which means older versions of pandas return a different hash, see pandas-dev/pandas#58999. I tried to implement a custom hashing that does not rely on pandas (as they might change the behavior again in the future), but I did not succeed in implementing something that is both fast and memory efficient.

Writing benchmark

Comparison of saving the phonetic-transcription.train-clean-360 table from librispeech in different formats.
Note that saving as a PARQUET file includes calculating the hash of the underlying dataframe.

format  | execution time
CSV     | 22.2 s
PARQUET | 4.2 s
PKL     | 1.0 s
Benchmark code
import os
import time

import audb
import audformat


# Measure execution time when saving table.
# Assumes the file `db.phonetic-transcription.train-clean-360.csv`
# or `db.phonetic-transcription.train-clean-360.parquet`
# exists in the same folder
# as this script.
db = audb.info.header("librispeech")
table_id = "phonetic-transcription.train-clean-360"
db[table_id].load(f"db.{table_id}")

t0 = time.time()
db[table_id].save("table", storage_format="csv")
t = time.time() - t0
os.remove("table.csv")
print(f"Execution time CSV: {t:.1f} s")

t0 = time.time()
db[table_id].save("table", storage_format="parquet")
t = time.time() - t0
os.remove("table.parquet")
print(f"Execution time PARQUET: {t:.1f} s")

t0 = time.time()
db[table_id].save("table", storage_format="pkl")
t = time.time() - t0
os.remove("table.pkl")
print(f"Execution time PKL: {t:.1f} s")

hagenw marked this pull request as draft on March 20, 2024
codecov bot commented Mar 21, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 100.0%. Comparing base (eddf224) to head (2749ef9).

Additional details and impacted files
Files                      | Coverage Δ
audformat/core/database.py | 100.0% <ø> (ø)
audformat/core/define.py   | 100.0% <100.0%> (ø)
audformat/core/table.py    | 100.0% <100.0%> (ø)
audformat/core/utils.py    | 100.0% <100.0%> (ø)

hagenw force-pushed the pyarrow branch 2 times, most recently from 2c0f12f to 625d4c3 on May 30, 2024
hagenw changed the title from "Use pyarrow to read CSV files" to "Store tables as PARQUET files" on Jun 11, 2024
hagenw marked this pull request as ready for review on June 18, 2024
Successfully merging this pull request may close these issues:

  • Switch to pyarrow engine when reading CSV files
  • Support an additional file format for tables