diff --git a/awswrangler/distributed/ray/datasources/arrow_parquet_datasource.py b/awswrangler/distributed/ray/datasources/arrow_parquet_datasource.py index 002303806..919fcdcf4 100644 --- a/awswrangler/distributed/ray/datasources/arrow_parquet_datasource.py +++ b/awswrangler/distributed/ray/datasources/arrow_parquet_datasource.py @@ -243,7 +243,6 @@ def __init__( # noqa: PLR0912,PLR0915 paths, **dataset_kwargs, filesystem=filesystem, - use_legacy_dataset=False, ) except OSError as e: _handle_read_os_error(e, paths) diff --git a/awswrangler/distributed/ray/modin/_data_types.py b/awswrangler/distributed/ray/modin/_data_types.py index 4b6267d47..6316a0b72 100644 --- a/awswrangler/distributed/ray/modin/_data_types.py +++ b/awswrangler/distributed/ray/modin/_data_types.py @@ -15,7 +15,7 @@ def pyarrow_types_from_pandas_distributed( ) -> dict[str, pa.DataType]: """Extract the related Pyarrow data types from a pandas DataFrame.""" func = ray_remote()(pyarrow_types_from_pandas) - first_block_object_ref = _ray_dataset_from_df(df).get_internal_block_refs()[0] + first_block_object_ref = next(_ray_dataset_from_df(df).iter_internal_ref_bundles()).block_refs[0] return ray_get( # type: ignore[no-any-return] func( df=first_block_object_ref, diff --git a/awswrangler/distributed/ray/modin/_utils.py b/awswrangler/distributed/ray/modin/_utils.py index e5f468dd5..04cdefe99 100644 --- a/awswrangler/distributed/ray/modin/_utils.py +++ b/awswrangler/distributed/ray/modin/_utils.py @@ -51,7 +51,9 @@ def _to_modin( return from_partitions( partitions=[ - _block_to_df(block=block, to_pandas_kwargs=_to_pandas_kwargs) for block in dataset.get_internal_block_refs() + _block_to_df(block=block_ref, to_pandas_kwargs=_to_pandas_kwargs) + for ref_bundle in dataset.iter_internal_ref_bundles() + for block_ref in ref_bundle.block_refs ], axis=0, index=index, @@ -59,7 +61,11 @@ def _to_modin( def _split_modin_frame(df: modin_pd.DataFrame, splits: int) -> list[ObjectRef[Any]]: - object_refs: list[ObjectRef[Any]] = _ray_dataset_from_df(df).get_internal_block_refs() + object_refs: list[ObjectRef[Any]] = [ + block_ref + for ref_bundle in _ray_dataset_from_df(df).iter_internal_ref_bundles() + for block_ref in ref_bundle.block_refs + ] return object_refs diff --git a/awswrangler/distributed/ray/modin/s3/_write_dataset.py b/awswrangler/distributed/ray/modin/s3/_write_dataset.py index c7944a977..5faee0263 100644 --- a/awswrangler/distributed/ray/modin/s3/_write_dataset.py +++ b/awswrangler/distributed/ray/modin/s3/_write_dataset.py @@ -145,7 +145,11 @@ def write_partitions(df: pd.DataFrame, block_index: int) -> tuple[list[str], dic ) return paths, partitions_values - block_object_refs = _ray_dataset_from_df(df).get_internal_block_refs() + block_object_refs = ( + block_ref + for ref_bundle in _ray_dataset_from_df(df).iter_internal_ref_bundles() + for block_ref in ref_bundle.block_refs + ) result = ray_get( [write_partitions(object_ref, block_index) for block_index, object_ref in enumerate(block_object_refs)] ) diff --git a/tests/unit/test_athena.py b/tests/unit/test_athena.py index 5075f6969..cc0a5029f 100644 --- a/tests/unit/test_athena.py +++ b/tests/unit/test_athena.py @@ -135,7 +135,6 @@ def test_athena_ctas(path, path2, path3, glue_table, glue_table2, glue_database, assert len(wr.s3.list_objects(path=path3)) == 0 -@pytest.mark.modin_index def test_athena_read_sql_ctas_bucketing(path, path2, glue_table, glue_table2, glue_database, glue_ctas_database): df = pd.DataFrame({"c0": [0, 1], "c1": ["foo", "bar"]}) wr.s3.to_parquet( @@ -1013,7 +1012,6 @@ def test_bucketing_catalog_parquet_table(path, glue_database, glue_table): assert table["StorageDescriptor"]["BucketColumns"] == bucket_cols -@pytest.mark.modin_index @pytest.mark.parametrize("bucketing_data", [[0, 1, 2], [False, True, False], ["b", "c", "d"]]) @pytest.mark.parametrize( "dtype", @@ -1102,7 +1100,6 @@ def test_bucketing_catalog_csv_table(path, glue_database, glue_table): assert table["StorageDescriptor"]["BucketColumns"] == bucket_cols -@pytest.mark.modin_index @pytest.mark.parametrize("bucketing_data", [[0, 1, 2], [False, True, False], ["b", "c", "d"]]) @pytest.mark.parametrize( "dtype", @@ -1168,7 +1165,6 @@ def test_bucketing_csv_dataset(path, glue_database, glue_table, bucketing_data, assert all(x in bucketing_data for x in loaded_df["c0"].to_list()) -@pytest.mark.modin_index @pytest.mark.parametrize("bucketing_data", [[0, 1, 2, 3], [False, True, False, True], ["b", "c", "d", "e"]]) def test_combined_bucketing_partitioning_parquet_dataset(path, glue_database, glue_table, bucketing_data): nb_of_buckets = 2 @@ -1296,7 +1292,6 @@ def test_combined_bucketing_partitioning_csv_dataset(path, glue_database, glue_t assert all(x in bucketing_data for x in loaded_df["c0"].to_list()) -@pytest.mark.modin_index def test_multiple_bucketing_columns_parquet_dataset(path, glue_database, glue_table): nb_of_buckets = 2 df = pd.DataFrame({"c0": [0, 1, 2, 3], "c1": [4, 6, 5, 7], "c2": ["foo", "bar", "baz", "boo"]}) diff --git a/tests/unit/test_athena_csv.py b/tests/unit/test_athena_csv.py index 4bd731331..31dd82fd3 100644 --- a/tests/unit/test_athena_csv.py +++ b/tests/unit/test_athena_csv.py @@ -372,7 +372,6 @@ def test_athena_csv_types(path, glue_database, glue_table): ensure_data_types_csv(df2) -@pytest.mark.modin_index @pytest.mark.parametrize("use_threads", [True, False]) @pytest.mark.parametrize("ctas_approach", [True, False]) @pytest.mark.parametrize("line_count", [1, 2]) diff --git a/tests/unit/test_athena_parquet.py b/tests/unit/test_athena_parquet.py index 2d50099e3..7e5a01f69 100644 --- a/tests/unit/test_athena_parquet.py +++ b/tests/unit/test_athena_parquet.py @@ -613,7 +613,6 @@ def test_schema_evolution_disabled(path, glue_table, glue_database): assert df2.c0.sum() == 3 -@pytest.mark.modin_index def test_date_cast(path, glue_table, glue_database): df = pd.DataFrame( { diff --git a/tests/unit/test_s3_parquet.py b/tests/unit/test_s3_parquet.py index 8ed228648..97d568fb4 100644 --- a/tests/unit/test_s3_parquet.py +++ b/tests/unit/test_s3_parquet.py @@ -410,7 +410,6 @@ def test_index_recovery_simple_str(path, use_threads): assert_pandas_equals(df, df2) -@pytest.mark.modin_index @pytest.mark.xfail( raises=AssertionError, reason="https://github.com/ray-project/ray/issues/37771", @@ -447,7 +446,6 @@ def test_range_index_recovery_simple(path, use_threads): assert_pandas_equals(df.reset_index(level=0), df2.reset_index(level=0)) -@pytest.mark.modin_index @pytest.mark.xfail( raises=AssertionError, reason="https://github.com/ray-project/ray/issues/37771", @@ -498,7 +496,6 @@ def test_multi_index_recovery_nameless(path, use_threads): assert_pandas_equals(df.reset_index(), df2.reset_index()) -@pytest.mark.modin_index @pytest.mark.xfail( raises=(wr.exceptions.InvalidArgumentCombination, AssertionError), reason="Named index not working when partitioning to a single file", @@ -535,7 +532,6 @@ def test_index_schema_validation(path, glue_database, glue_table, index): assert_pandas_equals(pd.concat([df, df]), df2) -@pytest.mark.modin_index @pytest.mark.xfail( raises=AssertionError, reason="https://github.com/ray-project/ray/issues/37771", @@ -625,7 +621,6 @@ def test_to_parquet_dataset_sanitize(path): assert df2.par.to_list() == ["a", "b"] -@pytest.mark.modin_index @pytest.mark.parametrize("use_threads", [False, True, 2]) def test_timezone_file(path, use_threads): file_path = f"{path}0.parquet" @@ -636,7 +631,6 @@ def test_timezone_file(path, use_threads): assert_pandas_equals(df, df2) -@pytest.mark.modin_index @pytest.mark.parametrize("use_threads", [True, False, 2]) def test_timezone_file_columns(path, use_threads): file_path = f"{path}0.parquet" @@ -690,7 +684,6 @@ def test_validate_columns(path, partition_cols) -> None: wr.s3.read_parquet(path, columns=["a", "b", "c"], dataset=True, validate_schema=True) -@pytest.mark.modin_index @pytest.mark.xfail( raises=AssertionError, reason="https://github.com/ray-project/ray/issues/37771", @@ -715,7 +708,6 @@ def test_mixed_types_column(path) -> None: wr.s3.to_parquet(df, path, dataset=True, partition_cols=["par"]) -@pytest.mark.modin_index @pytest.mark.parametrize("compression", [None, "snappy", "gzip", "zstd"]) def test_parquet_compression(path, compression) -> None: df = pd.DataFrame({"id": [1, 2, 3]}, dtype="Int64") diff --git a/tests/unit/test_s3_text.py b/tests/unit/test_s3_text.py index 7b2d90113..18741a909 100644 --- a/tests/unit/test_s3_text.py +++ b/tests/unit/test_s3_text.py @@ -188,7 +188,6 @@ def test_csv_dataset_header_modes(path, mode, glue_database, glue_table): assert df_res.equals(dfs[-1]) -@pytest.mark.modin_index @pytest.mark.xfail( raises=AssertionError, reason="https://github.com/ray-project/ray/issues/37771", @@ -205,7 +204,6 @@ def test_json(path): assert df1.equals(wr.s3.read_json(path=[path0, path1], use_threads=True)) -@pytest.mark.modin_index @pytest.mark.xfail( raises=AssertionError, reason="https://github.com/ray-project/ray/issues/37771", @@ -366,7 +364,6 @@ def test_csv_line_terminator(path, line_terminator): assert df.equals(df2) -@pytest.mark.modin_index def test_read_json_versioned(path) -> None: path_file = f"{path}0.json" dfs = [