Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Ray deprecation warnings #2929

Merged
merged 6 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,6 @@ def __init__( # noqa: PLR0912,PLR0915
paths,
**dataset_kwargs,
filesystem=filesystem,
use_legacy_dataset=False,
)
except OSError as e:
_handle_read_os_error(e, paths)
Expand Down
2 changes: 1 addition & 1 deletion awswrangler/distributed/ray/modin/_data_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def pyarrow_types_from_pandas_distributed(
) -> dict[str, pa.DataType]:
"""Extract the related Pyarrow data types from a pandas DataFrame."""
func = ray_remote()(pyarrow_types_from_pandas)
first_block_object_ref = _ray_dataset_from_df(df).get_internal_block_refs()[0]
first_block_object_ref = next(_ray_dataset_from_df(df).iter_internal_ref_bundles()).block_refs[0]
return ray_get( # type: ignore[no-any-return]
func(
df=first_block_object_ref,
Expand Down
10 changes: 8 additions & 2 deletions awswrangler/distributed/ray/modin/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,21 @@ def _to_modin(

return from_partitions(
partitions=[
_block_to_df(block=block, to_pandas_kwargs=_to_pandas_kwargs) for block in dataset.get_internal_block_refs()
_block_to_df(block=block_ref, to_pandas_kwargs=_to_pandas_kwargs)
for ref_bundle in dataset.iter_internal_ref_bundles()
for block_ref in ref_bundle.block_refs
],
axis=0,
index=index,
)


def _split_modin_frame(df: modin_pd.DataFrame, splits: int) -> list[ObjectRef[Any]]:
object_refs: list[ObjectRef[Any]] = _ray_dataset_from_df(df).get_internal_block_refs()
object_refs: list[ObjectRef[Any]] = [
block_ref
for ref_bundle in _ray_dataset_from_df(df).iter_internal_ref_bundles()
for block_ref in ref_bundle.block_refs
]
return object_refs


Expand Down
6 changes: 5 additions & 1 deletion awswrangler/distributed/ray/modin/s3/_write_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,11 @@ def write_partitions(df: pd.DataFrame, block_index: int) -> tuple[list[str], dic
)
return paths, partitions_values

block_object_refs = _ray_dataset_from_df(df).get_internal_block_refs()
block_object_refs = (
block_ref
for ref_bundle in _ray_dataset_from_df(df).iter_internal_ref_bundles()
for block_ref in ref_bundle.block_refs
)
result = ray_get(
[write_partitions(object_ref, block_index) for block_index, object_ref in enumerate(block_object_refs)]
)
Expand Down
5 changes: 0 additions & 5 deletions tests/unit/test_athena.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ def test_athena_ctas(path, path2, path3, glue_table, glue_table2, glue_database,
assert len(wr.s3.list_objects(path=path3)) == 0


@pytest.mark.modin_index
def test_athena_read_sql_ctas_bucketing(path, path2, glue_table, glue_table2, glue_database, glue_ctas_database):
df = pd.DataFrame({"c0": [0, 1], "c1": ["foo", "bar"]})
wr.s3.to_parquet(
Expand Down Expand Up @@ -1013,7 +1012,6 @@ def test_bucketing_catalog_parquet_table(path, glue_database, glue_table):
assert table["StorageDescriptor"]["BucketColumns"] == bucket_cols


@pytest.mark.modin_index
@pytest.mark.parametrize("bucketing_data", [[0, 1, 2], [False, True, False], ["b", "c", "d"]])
@pytest.mark.parametrize(
"dtype",
Expand Down Expand Up @@ -1102,7 +1100,6 @@ def test_bucketing_catalog_csv_table(path, glue_database, glue_table):
assert table["StorageDescriptor"]["BucketColumns"] == bucket_cols


@pytest.mark.modin_index
@pytest.mark.parametrize("bucketing_data", [[0, 1, 2], [False, True, False], ["b", "c", "d"]])
@pytest.mark.parametrize(
"dtype",
Expand Down Expand Up @@ -1168,7 +1165,6 @@ def test_bucketing_csv_dataset(path, glue_database, glue_table, bucketing_data,
assert all(x in bucketing_data for x in loaded_df["c0"].to_list())


@pytest.mark.modin_index
@pytest.mark.parametrize("bucketing_data", [[0, 1, 2, 3], [False, True, False, True], ["b", "c", "d", "e"]])
def test_combined_bucketing_partitioning_parquet_dataset(path, glue_database, glue_table, bucketing_data):
nb_of_buckets = 2
Expand Down Expand Up @@ -1296,7 +1292,6 @@ def test_combined_bucketing_partitioning_csv_dataset(path, glue_database, glue_t
assert all(x in bucketing_data for x in loaded_df["c0"].to_list())


@pytest.mark.modin_index
def test_multiple_bucketing_columns_parquet_dataset(path, glue_database, glue_table):
nb_of_buckets = 2
df = pd.DataFrame({"c0": [0, 1, 2, 3], "c1": [4, 6, 5, 7], "c2": ["foo", "bar", "baz", "boo"]})
Expand Down
1 change: 0 additions & 1 deletion tests/unit/test_athena_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,6 @@ def test_athena_csv_types(path, glue_database, glue_table):
ensure_data_types_csv(df2)


@pytest.mark.modin_index
@pytest.mark.parametrize("use_threads", [True, False])
@pytest.mark.parametrize("ctas_approach", [True, False])
@pytest.mark.parametrize("line_count", [1, 2])
Expand Down
1 change: 0 additions & 1 deletion tests/unit/test_athena_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,6 @@ def test_schema_evolution_disabled(path, glue_table, glue_database):
assert df2.c0.sum() == 3


@pytest.mark.modin_index
def test_date_cast(path, glue_table, glue_database):
df = pd.DataFrame(
{
Expand Down
8 changes: 0 additions & 8 deletions tests/unit/test_s3_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,6 @@ def test_index_recovery_simple_str(path, use_threads):
assert_pandas_equals(df, df2)


@pytest.mark.modin_index
@pytest.mark.xfail(
raises=AssertionError,
reason="https://github.com/ray-project/ray/issues/37771",
Expand Down Expand Up @@ -447,7 +446,6 @@ def test_range_index_recovery_simple(path, use_threads):
assert_pandas_equals(df.reset_index(level=0), df2.reset_index(level=0))


@pytest.mark.modin_index
@pytest.mark.xfail(
raises=AssertionError,
reason="https://github.com/ray-project/ray/issues/37771",
Expand Down Expand Up @@ -498,7 +496,6 @@ def test_multi_index_recovery_nameless(path, use_threads):
assert_pandas_equals(df.reset_index(), df2.reset_index())


@pytest.mark.modin_index
@pytest.mark.xfail(
raises=(wr.exceptions.InvalidArgumentCombination, AssertionError),
reason="Named index not working when partitioning to a single file",
Expand Down Expand Up @@ -535,7 +532,6 @@ def test_index_schema_validation(path, glue_database, glue_table, index):
assert_pandas_equals(pd.concat([df, df]), df2)


@pytest.mark.modin_index
@pytest.mark.xfail(
raises=AssertionError,
reason="https://github.com/ray-project/ray/issues/37771",
Expand Down Expand Up @@ -625,7 +621,6 @@ def test_to_parquet_dataset_sanitize(path):
assert df2.par.to_list() == ["a", "b"]


@pytest.mark.modin_index
@pytest.mark.parametrize("use_threads", [False, True, 2])
def test_timezone_file(path, use_threads):
file_path = f"{path}0.parquet"
Expand All @@ -636,7 +631,6 @@ def test_timezone_file(path, use_threads):
assert_pandas_equals(df, df2)


@pytest.mark.modin_index
@pytest.mark.parametrize("use_threads", [True, False, 2])
def test_timezone_file_columns(path, use_threads):
file_path = f"{path}0.parquet"
Expand Down Expand Up @@ -690,7 +684,6 @@ def test_validate_columns(path, partition_cols) -> None:
wr.s3.read_parquet(path, columns=["a", "b", "c"], dataset=True, validate_schema=True)


@pytest.mark.modin_index
@pytest.mark.xfail(
raises=AssertionError,
reason="https://github.com/ray-project/ray/issues/37771",
Expand All @@ -715,7 +708,6 @@ def test_mixed_types_column(path) -> None:
wr.s3.to_parquet(df, path, dataset=True, partition_cols=["par"])


@pytest.mark.modin_index
@pytest.mark.parametrize("compression", [None, "snappy", "gzip", "zstd"])
def test_parquet_compression(path, compression) -> None:
df = pd.DataFrame({"id": [1, 2, 3]}, dtype="Int64")
Expand Down
3 changes: 0 additions & 3 deletions tests/unit/test_s3_text.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,6 @@ def test_csv_dataset_header_modes(path, mode, glue_database, glue_table):
assert df_res.equals(dfs[-1])


@pytest.mark.modin_index
@pytest.mark.xfail(
raises=AssertionError,
reason="https://github.com/ray-project/ray/issues/37771",
Expand All @@ -205,7 +204,6 @@ def test_json(path):
assert df1.equals(wr.s3.read_json(path=[path0, path1], use_threads=True))


@pytest.mark.modin_index
@pytest.mark.xfail(
raises=AssertionError,
reason="https://github.com/ray-project/ray/issues/37771",
Expand Down Expand Up @@ -366,7 +364,6 @@ def test_csv_line_terminator(path, line_terminator):
assert df.equals(df2)


@pytest.mark.modin_index
def test_read_json_versioned(path) -> None:
path_file = f"{path}0.json"
dfs = [
Expand Down
Loading