fix: Ray deprecation warnings (#2929)
* remove use_legacy_dataset

* remove pytest.mark.modin_index

* replace get_internal_block_refs with iter_internal_ref_bundles

* fix formatting

* fix usage of iter_internal_ref_bundles
LeonLuttenberger committed Aug 12, 2024
1 parent d09a556 commit 26df821
Showing 9 changed files with 14 additions and 23 deletions.
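For context: the central migration here is off a deprecated Ray Dataset API. `Dataset.get_internal_block_refs()` returned a flat list of block object references; its replacement, `Dataset.iter_internal_ref_bundles()`, yields `RefBundle` objects that each carry a `block_refs` list. A minimal before/after sketch (toy dataset; variable names are illustrative, not from this diff):

import ray

ds = ray.data.range(8)  # toy dataset standing in for one built from a DataFrame

# Deprecated: returned a flat list of block ObjectRefs directly.
# block_refs = ds.get_internal_block_refs()

# Replacement: iterate RefBundles and flatten their block_refs.
block_refs = [
    block_ref
    for ref_bundle in ds.iter_internal_ref_bundles()
    for block_ref in ref_bundle.block_refs
]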
@@ -243,7 +243,6 @@ def __init__( # noqa: PLR0912,PLR0915
paths,
**dataset_kwargs,
filesystem=filesystem,
- use_legacy_dataset=False,
)
except OSError as e:
_handle_read_os_error(e, paths)
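The dropped `use_legacy_dataset=False` keyword tracks a pyarrow change rather than a Ray one: the legacy `ParquetDataset` implementation was deprecated and later removed, so recent pyarrow releases reject the argument outright. A hedged sketch of the updated call shape (the path is a placeholder; the removal timeline is an assumption, not taken from this diff):

import pyarrow.parquet as pq

# The dataset-based implementation is now the only code path, so the
# use_legacy_dataset keyword is simply omitted.
dataset = pq.ParquetDataset(
    ["data/part-0.parquet"],  # placeholder input paths
    filesystem=None,  # inferred from the paths when not supplied
)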
2 changes: 1 addition & 1 deletion awswrangler/distributed/ray/modin/_data_types.py
@@ -15,7 +15,7 @@ def pyarrow_types_from_pandas_distributed(
) -> dict[str, pa.DataType]:
"""Extract the related Pyarrow data types from a pandas DataFrame."""
func = ray_remote()(pyarrow_types_from_pandas)
- first_block_object_ref = _ray_dataset_from_df(df).get_internal_block_refs()[0]
+ first_block_object_ref = next(_ray_dataset_from_df(df).iter_internal_ref_bundles()).block_refs[0]
return ray_get( # type: ignore[no-any-return]
func(
df=first_block_object_ref,
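Here only the first block is needed to sample column types, so the new code pulls a single `RefBundle` with `next()` rather than flattening the whole dataset. A small sketch of that access pattern (toy dataset; `first_block_ref` is an illustrative name):

import ray

ds = ray.data.range(8)  # stand-in for the dataset built from the DataFrame

# Only the first bundle is pulled; remaining bundles are never materialized.
first_block_ref = next(ds.iter_internal_ref_bundles()).block_refs[0]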
10 changes: 8 additions & 2 deletions awswrangler/distributed/ray/modin/_utils.py
@@ -51,15 +51,21 @@ def _to_modin(

return from_partitions(
partitions=[
- _block_to_df(block=block, to_pandas_kwargs=_to_pandas_kwargs) for block in dataset.get_internal_block_refs()
+ _block_to_df(block=block_ref, to_pandas_kwargs=_to_pandas_kwargs)
+ for ref_bundle in dataset.iter_internal_ref_bundles()
+ for block_ref in ref_bundle.block_refs
],
axis=0,
index=index,
)


def _split_modin_frame(df: modin_pd.DataFrame, splits: int) -> list[ObjectRef[Any]]:
- object_refs: list[ObjectRef[Any]] = _ray_dataset_from_df(df).get_internal_block_refs()
+ object_refs: list[ObjectRef[Any]] = [
+     block_ref
+     for ref_bundle in _ray_dataset_from_df(df).iter_internal_ref_bundles()
+     for block_ref in ref_bundle.block_refs
+ ]
return object_refs


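Both `_to_modin` and `_split_modin_frame` now repeat the same bundle-flattening comprehension. If the pattern spread further, it could be factored into one helper; the sketch below is hypothetical (`_flatten_block_refs` is not part of this change):

from typing import Any

from ray import ObjectRef
from ray.data import Dataset


def _flatten_block_refs(ds: Dataset) -> list[ObjectRef[Any]]:
    """Collect every block ObjectRef from a dataset's RefBundles."""
    return [
        block_ref
        for ref_bundle in ds.iter_internal_ref_bundles()
        for block_ref in ref_bundle.block_refs
    ]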
6 changes: 5 additions & 1 deletion awswrangler/distributed/ray/modin/s3/_write_dataset.py
@@ -145,7 +145,11 @@ def write_partitions(df: pd.DataFrame, block_index: int) -> tuple[list[str], dic
)
return paths, partitions_values

block_object_refs = _ray_dataset_from_df(df).get_internal_block_refs()
block_object_refs = (
block_ref
for ref_bundle in _ray_dataset_from_df(df).iter_internal_ref_bundles()
for block_ref in ref_bundle.block_refs
)
result = ray_get(
[write_partitions(object_ref, block_index) for block_index, object_ref in enumerate(block_object_refs)]
)
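Unlike the list comprehensions above, this site builds a generator expression: the refs are consumed exactly once by `enumerate` when the `write_partitions` tasks are created, so no intermediate list is needed. A sketch of that single-pass consumption (toy dataset; names are illustrative):

import ray

ds = ray.data.range(8)  # stand-in for the dataset built from the DataFrame

block_object_refs = (
    block_ref
    for ref_bundle in ds.iter_internal_ref_bundles()
    for block_ref in ref_bundle.block_refs
)

# enumerate drains the generator in one pass, pairing each block ref
# with its positional index, mirroring the write_partitions call site.
indexed_refs = list(enumerate(block_object_refs))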
5 changes: 0 additions & 5 deletions tests/unit/test_athena.py
@@ -135,7 +135,6 @@ def test_athena_ctas(path, path2, path3, glue_table, glue_table2, glue_database,
assert len(wr.s3.list_objects(path=path3)) == 0


- @pytest.mark.modin_index
def test_athena_read_sql_ctas_bucketing(path, path2, glue_table, glue_table2, glue_database, glue_ctas_database):
df = pd.DataFrame({"c0": [0, 1], "c1": ["foo", "bar"]})
wr.s3.to_parquet(
@@ -1013,7 +1012,6 @@ def test_bucketing_catalog_parquet_table(path, glue_database, glue_table):
assert table["StorageDescriptor"]["BucketColumns"] == bucket_cols


- @pytest.mark.modin_index
@pytest.mark.parametrize("bucketing_data", [[0, 1, 2], [False, True, False], ["b", "c", "d"]])
@pytest.mark.parametrize(
"dtype",
@@ -1102,7 +1100,6 @@ def test_bucketing_catalog_csv_table(path, glue_database, glue_table):
assert table["StorageDescriptor"]["BucketColumns"] == bucket_cols


- @pytest.mark.modin_index
@pytest.mark.parametrize("bucketing_data", [[0, 1, 2], [False, True, False], ["b", "c", "d"]])
@pytest.mark.parametrize(
"dtype",
@@ -1168,7 +1165,6 @@ def test_bucketing_csv_dataset(path, glue_database, glue_table, bucketing_data,
assert all(x in bucketing_data for x in loaded_df["c0"].to_list())


- @pytest.mark.modin_index
@pytest.mark.parametrize("bucketing_data", [[0, 1, 2, 3], [False, True, False, True], ["b", "c", "d", "e"]])
def test_combined_bucketing_partitioning_parquet_dataset(path, glue_database, glue_table, bucketing_data):
nb_of_buckets = 2
@@ -1296,7 +1292,6 @@ def test_combined_bucketing_partitioning_csv_dataset(path, glue_database, glue_t
assert all(x in bucketing_data for x in loaded_df["c0"].to_list())


- @pytest.mark.modin_index
def test_multiple_bucketing_columns_parquet_dataset(path, glue_database, glue_table):
nb_of_buckets = 2
df = pd.DataFrame({"c0": [0, 1, 2, 3], "c1": [4, 6, 5, 7], "c2": ["foo", "bar", "baz", "boo"]})
1 change: 0 additions & 1 deletion tests/unit/test_athena_csv.py
@@ -372,7 +372,6 @@ def test_athena_csv_types(path, glue_database, glue_table):
ensure_data_types_csv(df2)


- @pytest.mark.modin_index
@pytest.mark.parametrize("use_threads", [True, False])
@pytest.mark.parametrize("ctas_approach", [True, False])
@pytest.mark.parametrize("line_count", [1, 2])
1 change: 0 additions & 1 deletion tests/unit/test_athena_parquet.py
@@ -613,7 +613,6 @@ def test_schema_evolution_disabled(path, glue_table, glue_database):
assert df2.c0.sum() == 3


- @pytest.mark.modin_index
def test_date_cast(path, glue_table, glue_database):
df = pd.DataFrame(
{
8 changes: 0 additions & 8 deletions tests/unit/test_s3_parquet.py
@@ -410,7 +410,6 @@ def test_index_recovery_simple_str(path, use_threads):
assert_pandas_equals(df, df2)


- @pytest.mark.modin_index
@pytest.mark.xfail(
raises=AssertionError,
reason="https://github.com/ray-project/ray/issues/37771",
@@ -447,7 +446,6 @@ def test_range_index_recovery_simple(path, use_threads):
assert_pandas_equals(df.reset_index(level=0), df2.reset_index(level=0))


- @pytest.mark.modin_index
@pytest.mark.xfail(
raises=AssertionError,
reason="https://github.com/ray-project/ray/issues/37771",
@@ -498,7 +496,6 @@ def test_multi_index_recovery_nameless(path, use_threads):
assert_pandas_equals(df.reset_index(), df2.reset_index())


- @pytest.mark.modin_index
@pytest.mark.xfail(
raises=(wr.exceptions.InvalidArgumentCombination, AssertionError),
reason="Named index not working when partitioning to a single file",
@@ -535,7 +532,6 @@ def test_index_schema_validation(path, glue_database, glue_table, index):
assert_pandas_equals(pd.concat([df, df]), df2)


- @pytest.mark.modin_index
@pytest.mark.xfail(
raises=AssertionError,
reason="https://github.com/ray-project/ray/issues/37771",
@@ -625,7 +621,6 @@ def test_to_parquet_dataset_sanitize(path):
assert df2.par.to_list() == ["a", "b"]


- @pytest.mark.modin_index
@pytest.mark.parametrize("use_threads", [False, True, 2])
def test_timezone_file(path, use_threads):
file_path = f"{path}0.parquet"
@@ -636,7 +631,6 @@ def test_timezone_file(path, use_threads):
assert_pandas_equals(df, df2)


- @pytest.mark.modin_index
@pytest.mark.parametrize("use_threads", [True, False, 2])
def test_timezone_file_columns(path, use_threads):
file_path = f"{path}0.parquet"
@@ -690,7 +684,6 @@ def test_validate_columns(path, partition_cols) -> None:
wr.s3.read_parquet(path, columns=["a", "b", "c"], dataset=True, validate_schema=True)


- @pytest.mark.modin_index
@pytest.mark.xfail(
raises=AssertionError,
reason="https://github.com/ray-project/ray/issues/37771",
@@ -715,7 +708,6 @@ def test_mixed_types_column(path) -> None:
wr.s3.to_parquet(df, path, dataset=True, partition_cols=["par"])


- @pytest.mark.modin_index
@pytest.mark.parametrize("compression", [None, "snappy", "gzip", "zstd"])
def test_parquet_compression(path, compression) -> None:
df = pd.DataFrame({"id": [1, 2, 3]}, dtype="Int64")
3 changes: 0 additions & 3 deletions tests/unit/test_s3_text.py
@@ -188,7 +188,6 @@ def test_csv_dataset_header_modes(path, mode, glue_database, glue_table):
assert df_res.equals(dfs[-1])


- @pytest.mark.modin_index
@pytest.mark.xfail(
raises=AssertionError,
reason="https://github.com/ray-project/ray/issues/37771",
@@ -205,7 +204,6 @@ def test_json(path):
assert df1.equals(wr.s3.read_json(path=[path0, path1], use_threads=True))


- @pytest.mark.modin_index
@pytest.mark.xfail(
raises=AssertionError,
reason="https://github.com/ray-project/ray/issues/37771",
@@ -366,7 +364,6 @@ def test_csv_line_terminator(path, line_terminator):
assert df.equals(df2)


- @pytest.mark.modin_index
def test_read_json_versioned(path) -> None:
path_file = f"{path}0.json"
dfs = [
