modin: Memory leak when reading CSV files in a loop

System information

  • OS Platform and Distribution (e.g., Linux Ubuntu 16.04):

Ubuntu 20.04.3 LTS

  • Modin version (modin.__version__):

0.11.0

  • Python version:

Python 3.9.7

  • Code we can use to reproduce:

This reproducer is adapted from a Modin example:

import sys
import time
import modin.pandas as pd
from modin.experimental.engines.omnisci_on_native.frame.omnisci_worker import OmnisciServer


def read(filename):
    columns_names = [
        "trip_id",
        "vendor_id",
        "pickup_datetime",
        "dropoff_datetime",
        "store_and_fwd_flag",
        "rate_code_id",
        "pickup_longitude",
        "pickup_latitude",
        "dropoff_longitude",
        "dropoff_latitude",
        "passenger_count",
        "trip_distance",
        "fare_amount",
        "extra",
        "mta_tax",
        "tip_amount",
        "tolls_amount",
        "ehail_fee",
        "improvement_surcharge",
        "total_amount",
        "payment_type",
        "trip_type",
        "pickup",
        "dropoff",
        "cab_type",
        "precipitation",
        "snow_depth",
        "snowfall",
        "max_temperature",
        "min_temperature",
        "average_wind_speed",
        "pickup_nyct2010_gid",
        "pickup_ctlabel",
        "pickup_borocode",
        "pickup_boroname",
        "pickup_ct2010",
        "pickup_boroct2010",
        "pickup_cdeligibil",
        "pickup_ntacode",
        "pickup_ntaname",
        "pickup_puma",
        "dropoff_nyct2010_gid",
        "dropoff_ctlabel",
        "dropoff_borocode",
        "dropoff_boroname",
        "dropoff_ct2010",
        "dropoff_boroct2010",
        "dropoff_cdeligibil",
        "dropoff_ntacode",
        "dropoff_ntaname",
        "dropoff_puma",
    ]
    # use string instead of category
    columns_types = [
        "int64",
        "string",
        "timestamp",
        "timestamp",
        "string",
        "int64",
        "float64",
        "float64",
        "float64",
        "float64",
        "int64",
        "float64",
        "float64",
        "float64",
        "float64",
        "float64",
        "float64",
        "float64",
        "float64",
        "float64",
        "string",
        "float64",
        "string",
        "string",
        "string",
        "float64",
        "int64",
        "float64",
        "int64",
        "int64",
        "float64",
        "float64",
        "float64",
        "float64",
        "string",
        "float64",
        "float64",
        "string",
        "string",
        "string",
        "float64",
        "float64",
        "float64",
        "float64",
        "string",
        "float64",
        "float64",
        "string",
        "string",
        "string",
        "float64",
    ]

    dtypes = {columns_names[i]: columns_types[i] for i in range(len(columns_names))}
    all_but_dates = {
        col: valtype
        for (col, valtype) in dtypes.items()
        if valtype not in ["timestamp"]
    }
    dates_only = [col for (col, valtype) in dtypes.items() if valtype in ["timestamp"]]

    df = pd.read_csv(
        filename,
        names=columns_names,
        dtype=all_but_dates,
        parse_dates=dates_only,
    )

    df.shape  # to trigger real execution
    df._query_compiler._modin_frame._partitions[0][
        0
    ].frame_id = OmnisciServer().put_arrow_to_omnisci(
        df._query_compiler._modin_frame._partitions[0][0].get()
    )  # to trigger real execution
    return df


def q1_omnisci(df):
    q1_pandas_output = df.groupby("cab_type").size()
    q1_pandas_output.shape  # to trigger real execution
    return q1_pandas_output


def q2_omnisci(df):
    q2_pandas_output = df.groupby("passenger_count").agg({"total_amount": "mean"})
    q2_pandas_output.shape  # to trigger real execution
    return q2_pandas_output


def q3_omnisci(df):
    df["pickup_datetime"] = df["pickup_datetime"].dt.year
    q3_pandas_output = df.groupby(["passenger_count", "pickup_datetime"]).size()
    q3_pandas_output.shape  # to trigger real execution
    return q3_pandas_output


def q4_omnisci(df):
    df["pickup_datetime"] = df["pickup_datetime"].dt.year
    df["trip_distance"] = df["trip_distance"].astype("int64")
    q4_pandas_output = (
        df.groupby(["passenger_count", "pickup_datetime", "trip_distance"], sort=False)
        .size()
        .reset_index()
        .sort_values(
            by=["pickup_datetime", 0], ignore_index=True, ascending=[True, False]
        )
    )
    q4_pandas_output.shape  # to trigger real execution
    return q4_pandas_output


def measure(name, func, *args, **kw):
    t0 = time.time()
    res = func(*args, **kw)
    t1 = time.time()
    print(f"{name}: {t1 - t0} sec")
    return res


def main():
    if len(sys.argv) != 2:
        print("USAGE: python nyc-taxi-omnisci.py <data file name>")
        return
    for i in range(20):
        print("Iteration #", i + 1)
        df = measure("Reading", read, sys.argv[1])
        #measure("Q1", q1_omnisci, df)
        #measure("Q2", q2_omnisci, df)
        #measure("Q3", q3_omnisci, df.copy())
        #measure("Q4", q4_omnisci, df.copy())


if __name__ == "__main__":
    main()

Describe the problem

Download the data file from https://modin-datasets.s3.amazonaws.com/taxi/trips_xaa.csv

After a CSV file has been loaded and the resulting dataframe is no longer referenced (Python should garbage-collect it), the native resources holding the CSV data are not released. This is a memory leak: on a system with tight memory, even the second iteration of the benchmark may fail because resources from the first iteration are still held in memory.
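One engine-agnostic way to confirm the growth is to sample the process's peak RSS between iterations. This is only a sketch: it uses the stdlib `resource` module (Unix-only; `ru_maxrss` is reported in kilobytes on Linux, bytes on macOS), and `load` is a placeholder for the `read()` call in the reproducer above.

```python
import gc
import resource


def rss_mb():
    # Peak resident set size of this process; ru_maxrss is in
    # kilobytes on Linux, so divide by 1024 to get megabytes.
    return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024


def check_leak(load, iterations=5):
    """Call `load()` repeatedly, dropping the result each time,
    and report peak RSS after every iteration."""
    samples = []
    for i in range(iterations):
        df = load()
        del df        # last Python reference is gone here
        gc.collect()  # native memory backing df should go too
        samples.append(rss_mb())
        print(f"iteration {i + 1}: peak RSS {samples[-1]:.1f} MB")
    return samples
```

On a leaking build the reported numbers keep climbing iteration after iteration instead of plateauing after the first read.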

About this issue

  • State: closed
  • Created 3 years ago
  • Comments: 30

Most upvoted comments

According to the OmniSci docs, DROP TABLE "deletes the table structure, all data from the table, and any dictionary content unless it is a shared dictionary" (https://docs-new.omnisci.com/sql/data-definition-ddl/tables#drop-table). Are the in-memory tables backed by a shared dictionary, or is this a bug in OmniSciDB's table deletion?

Currently, it looks like an OmniSci issue. We need to verify that DROP TABLE correctly clears temporary-table data.
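If it turns out the temporary tables are simply never dropped, one possible shape for a fix (purely a sketch with hypothetical names, not Modin's or OmniSci's actual API) is to tie the server-side table's lifetime to the Python partition object with `weakref.finalize`, so that collecting the last reference triggers the `DROP TABLE`:

```python
import weakref


class ServerHandle:
    """Stand-in for a server-side table created by something like
    put_arrow_to_omnisci (hypothetical, for illustration only)."""
    dropped = []

    def __init__(self, table_name):
        self.table_name = table_name

    def drop(self):
        # A real engine would issue: DROP TABLE <table_name>
        ServerHandle.dropped.append(self.table_name)


class Partition:
    """Partition wrapper whose native table is dropped when the
    Python object is garbage-collected (or at interpreter exit)."""

    def __init__(self, handle):
        self.handle = handle
        # The callback must not capture `self`, or the finalizer
        # would keep the partition alive; a bound method on the
        # handle is safe.
        self._finalizer = weakref.finalize(self, handle.drop)


p = Partition(ServerHandle("temp_table_0"))
del p  # finalizer fires: the server-side table is dropped
```

Whether Modin's partition objects can adopt this pattern depends on how OmniSciDB scopes temporary tables per session, which is exactly what needs checking.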