Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: upgrade Ray to 2.30 #2870

Merged
merged 9 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion awswrangler/athena/_statements.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ def create_prepared_statement(


@apply_configs
def list_prepared_statements(workgroup: str = "primary", boto3_session: boto3.Session | None = None) -> list[str]:
def list_prepared_statements(
workgroup: str = "primary", boto3_session: boto3.Session | None = None
) -> list[dict[str, Any]]:
"""
List the prepared statements in the specified workgroup.

Expand Down
5 changes: 2 additions & 3 deletions awswrangler/athena/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
Dict,
Generator,
NamedTuple,
Sequence,
TypedDict,
Union,
cast,
Expand All @@ -36,7 +35,7 @@
from ._cache import _cache_manager, _LocalMetadataCacheManager

if TYPE_CHECKING:
from mypy_boto3_glue.type_defs import ColumnTypeDef
from mypy_boto3_glue.type_defs import ColumnOutputTypeDef

_QUERY_FINAL_STATES: list[str] = ["FAILED", "SUCCEEDED", "CANCELLED"]
_QUERY_WAIT_POLLING_DELAY: float = 1.0 # SECONDS
Expand Down Expand Up @@ -993,7 +992,7 @@ def generate_create_query(

"""

def parse_columns(columns_description: Sequence["ColumnTypeDef"]) -> str:
def parse_columns(columns_description: list["ColumnOutputTypeDef"]) -> str:
columns_str: list[str] = []
for column in columns_description:
column_str = f" `{column['Name']}` {column['Type']}"
Expand Down
4 changes: 2 additions & 2 deletions awswrangler/catalog/_get.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import base64
import itertools
import logging
from typing import TYPE_CHECKING, Any, Dict, Iterator, Mapping, cast
from typing import TYPE_CHECKING, Any, Dict, Iterator, cast

import boto3
import botocore.exceptions
Expand Down Expand Up @@ -893,7 +893,7 @@ def get_columns_parameters(
table: str,
catalog_id: str | None = None,
boto3_session: boto3.Session | None = None,
) -> dict[str, Mapping[str, str] | None]:
) -> dict[str, dict[str, str] | None]:
"""Get all columns parameters.

Parameters
Expand Down
205 changes: 74 additions & 131 deletions awswrangler/distributed/ray/datasources/arrow_parquet_datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from __future__ import annotations

import logging
from dataclasses import dataclass
from typing import (
TYPE_CHECKING,
Any,
Expand All @@ -34,10 +35,11 @@
)
from ray.data.datasource.datasource import ReadTask
from ray.data.datasource.file_meta_provider import (
DefaultParquetMetadataProvider,
ParquetMetadataProvider,
_handle_read_os_error,
)
from ray.data.datasource.parquet_meta_provider import (
ParquetMetadataProvider,
)
from ray.data.datasource.partitioning import PathPartitionFilter
from ray.data.datasource.path_util import (
_has_file_extension,
Expand All @@ -55,8 +57,6 @@

_logger: logging.Logger = logging.getLogger(__name__)

FRAGMENTS_PER_META_FETCH = 6
PARALLELIZE_META_FETCH_THRESHOLD = 24

# The number of rows to read per batch. This is sized to generate 10MiB batches
# for rows about 1KiB in size.
Expand Down Expand Up @@ -93,6 +93,12 @@
PARQUET_ENCODING_RATIO_ESTIMATE_NUM_ROWS = 1024


@dataclass(frozen=True)
class _SampleInfo:
actual_bytes_per_row: int | None
estimated_bytes_per_row: int | None


# TODO(ekl) this is a workaround for a pyarrow serialization bug, where serializing a
# raw pyarrow file fragment causes S3 network calls.
class _SerializedFragment:
Expand All @@ -117,38 +123,6 @@ def _deserialize_fragments(
return [p.deserialize() for p in serialized_fragments]


class _ParquetFileFragmentMetaData:
"""Class to store metadata of a Parquet file fragment.

This includes all attributes from `pyarrow.parquet.FileMetaData` except for `schema`,
which is stored in `self.schema_pickled` as a pickled object from
`cloudpickle.loads()`, used in deduplicating schemas across multiple fragments.
"""

def __init__(self, fragment_metadata: "pyarrow.parquet.FileMetaData"):
self.created_by = fragment_metadata.created_by
self.format_version = fragment_metadata.format_version
self.num_columns = fragment_metadata.num_columns
self.num_row_groups = fragment_metadata.num_row_groups
self.num_rows = fragment_metadata.num_rows
self.serialized_size = fragment_metadata.serialized_size
# This is a pickled schema object, to be set later with
# `self.set_schema_pickled()`. To get the underlying schema, use
# `cloudpickle.loads(self.schema_pickled)`.
self.schema_pickled: bytes | None = None

# Calculate the total byte size of the file fragment using the original
# object, as it is not possible to access row groups from this class.
self.total_byte_size = 0
for row_group_idx in range(fragment_metadata.num_row_groups):
row_group_metadata = fragment_metadata.row_group(row_group_idx)
self.total_byte_size += row_group_metadata.total_byte_size

def set_schema_pickled(self, schema_pickled: bytes) -> None:
"""Note: to get the underlying schema, use `cloudpickle.loads(self.schema_pickled)`."""
self.schema_pickled = schema_pickled


# This retry helps when the upstream datasource is not able to handle
# overloaded read request or failed with some retriable failures.
# For example when reading data from HA hdfs service, hdfs might
Expand Down Expand Up @@ -213,7 +187,7 @@ def __init__( # noqa: PLR0912,PLR0915
arrow_parquet_args: dict[str, Any] | None = None,
_block_udf: Callable[[Block], Block] | None = None,
filesystem: "pyarrow.fs.FileSystem" | None = None,
meta_provider: ParquetMetadataProvider = DefaultParquetMetadataProvider(),
meta_provider: ParquetMetadataProvider = ParquetMetadataProvider(),
partition_filter: PathPartitionFilter | None = None,
shuffle: Literal["files"] | None = None,
include_paths: bool = False,
Expand Down Expand Up @@ -299,8 +273,7 @@ def __init__( # noqa: PLR0912,PLR0915
prefetch_remote_args = {}
if self._local_scheduling:
prefetch_remote_args["scheduling_strategy"] = self._local_scheduling
raw_metadata = meta_provider.prefetch_file_metadata(pq_ds.fragments, **prefetch_remote_args) or []
self._metadata = self._dedupe_metadata(raw_metadata)
self._metadata = meta_provider.prefetch_file_metadata(pq_ds.fragments, **prefetch_remote_args) or []
except OSError as e:
_handle_read_os_error(e, paths)
except pa.ArrowInvalid as ex:
Expand All @@ -319,43 +292,15 @@ def __init__( # noqa: PLR0912,PLR0915
self._columns = columns
self._schema = schema
self._arrow_parquet_args = arrow_parquet_args
self._encoding_ratio = self._estimate_files_encoding_ratio()
self._file_metadata_shuffler = None
self._include_paths = include_paths
self._path_root = path_root
if shuffle == "files":
self._file_metadata_shuffler = np.random.default_rng()

def _dedupe_metadata(
self,
raw_metadatas: list["pyarrow.parquet.FileMetaData"],
) -> list[_ParquetFileFragmentMetaData]:
"""Deduplicate schemas to reduce memory usage.

For datasets with a large number of columns, the FileMetaData
(in particular the schema) can be very large. We can reduce the
memory usage by only keeping unique schema objects across all
file fragments. This method deduplicates the schemas and returns
a list of `_ParquetFileFragmentMetaData` objects.
"""
schema_to_id: dict[int, Any] = {} # schema_id -> serialized_schema
id_to_schema: dict[Any, bytes] = {} # serialized_schema -> schema_id
stripped_metadatas = []
for fragment_metadata in raw_metadatas:
stripped_md = _ParquetFileFragmentMetaData(fragment_metadata)

schema_ser = cloudpickle.dumps(fragment_metadata.schema.to_arrow_schema()) # type: ignore[no-untyped-call]
if schema_ser not in schema_to_id:
schema_id: int | None = len(schema_to_id)
schema_to_id[schema_ser] = schema_id
id_to_schema[schema_id] = schema_ser
stripped_md.set_schema_pickled(schema_ser)
else:
schema_id = schema_to_id.get(schema_ser)
existing_schema_ser = id_to_schema[schema_id]
stripped_md.set_schema_pickled(existing_schema_ser)
stripped_metadatas.append(stripped_md)
return stripped_metadatas
sample_infos = self._sample_fragments()
self._encoding_ratio = _estimate_files_encoding_ratio(sample_infos)
self._default_read_batch_size_rows = _estimate_default_read_batch_size_rows(sample_infos)

def estimate_inmemory_data_size(self) -> int | None:
"""Return an estimate of the Parquet files encoding ratio.
Expand Down Expand Up @@ -414,25 +359,18 @@ def get_read_tasks(self, parallelism: int) -> list[ReadTask]:
if meta.size_bytes is not None:
meta.size_bytes = int(meta.size_bytes * self._encoding_ratio)

if meta.num_rows and meta.size_bytes:
# Make sure the batches read are small enough to enable yielding of
# output blocks incrementally during the read.
row_size = meta.size_bytes / meta.num_rows
# Make sure the row batch size is small enough that block splitting
# is still effective.
max_parquet_reader_row_batch_size_bytes = DataContext.get_current().target_max_block_size // 10
default_read_batch_size_rows = max(
1,
min(
PARQUET_READER_ROW_BATCH_SIZE,
max_parquet_reader_row_batch_size_bytes // row_size,
),
)
else:
default_read_batch_size_rows = PARQUET_READER_ROW_BATCH_SIZE
block_udf, arrow_parquet_args, columns, schema, path_root, include_paths = (
(
block_udf,
arrow_parquet_args,
default_read_batch_size_rows,
columns,
schema,
path_root,
include_paths,
) = (
self._block_udf,
self._arrow_parquet_args,
self._default_read_batch_size_rows,
self._columns,
self._schema,
self._path_root,
Expand All @@ -456,14 +394,7 @@ def get_read_tasks(self, parallelism: int) -> list[ReadTask]:

return read_tasks

def _estimate_files_encoding_ratio(self) -> float:
"""Return an estimate of the Parquet files encoding ratio.

To avoid OOMs, it is safer to return an over-estimate than an underestimate.
"""
if not DataContext.get_current().decoding_size_estimation:
return PARQUET_ENCODING_RATIO_ESTIMATE_DEFAULT

def _sample_fragments(self) -> list[_SampleInfo]:
# Sample a few rows from Parquet files to estimate the encoding ratio.
# Launch tasks to sample multiple files remotely in parallel.
# Evenly distributed to sample N rows in i-th row group in i-th file.
Expand Down Expand Up @@ -495,11 +426,10 @@ def _estimate_files_encoding_ratio(self) -> float:
)
)
sample_bar = ProgressBar("Parquet Files Sample", len(futures))
sample_ratios = sample_bar.fetch_until_complete(futures)
sample_infos = sample_bar.fetch_until_complete(futures)
sample_bar.close() # type: ignore[no-untyped-call]
ratio = np.mean(sample_ratios)
_logger.debug(f"Estimated Parquet encoding ratio from sampling is {ratio}.")
return max(ratio, PARQUET_ENCODING_RATIO_ESTIMATE_LOWER_BOUND) # type: ignore[no-any-return]

return sample_infos

def get_name(self) -> str:
"""Return a human-readable name for this datasource.
Expand Down Expand Up @@ -577,33 +507,12 @@ def _read_fragments(
yield table


def _fetch_metadata_serialization_wrapper(
fragments: list[_SerializedFragment],
) -> list["pyarrow.parquet.FileMetaData"]:
fragments: list["pyarrow._dataset.ParquetFileFragment"] = _deserialize_fragments_with_retry(fragments) # type: ignore[no-redef]

return _fetch_metadata(fragments)


def _fetch_metadata(
fragments: list["pyarrow.dataset.ParquetFileFragment"],
) -> list["pyarrow.parquet.FileMetaData"]:
fragment_metadata = []
for f in fragments:
try:
fragment_metadata.append(f.metadata)
except AttributeError:
break
return fragment_metadata


def _sample_fragment(
columns: list[str] | None,
schema: type | "pyarrow.lib.Schema" | None,
file_fragment: _SerializedFragment,
) -> float:
) -> _SampleInfo:
# Sample the first rows batch from file fragment `serialized_fragment`.
# Return the encoding ratio calculated from the sampled rows.
fragment = _deserialize_fragments_with_retry([file_fragment])[0]

# Only sample the first row group.
Expand All @@ -616,23 +525,57 @@ def _sample_fragment(
schema=schema,
batch_size=batch_size,
)
# Use first batch in-memory size as ratio estimation.
# Use first batch in-memory size for estimation.
try:
batch = next(batches)
except StopIteration:
ratio = PARQUET_ENCODING_RATIO_ESTIMATE_LOWER_BOUND
sample_data = _SampleInfo(actual_bytes_per_row=None, estimated_bytes_per_row=None)
else:
if batch.num_rows > 0:
in_memory_size = batch.nbytes / batch.num_rows
metadata = fragment.metadata
total_size = 0
for idx in range(metadata.num_row_groups):
total_size += metadata.row_group(idx).total_byte_size
file_size = total_size / metadata.num_rows
ratio = in_memory_size / file_size
sample_data = _SampleInfo(
actual_bytes_per_row=batch.nbytes / batch.num_rows,
estimated_bytes_per_row=total_size / metadata.num_rows,
)
else:
ratio = PARQUET_ENCODING_RATIO_ESTIMATE_LOWER_BOUND
_logger.debug(
f"Estimated Parquet encoding ratio is {ratio} for fragment {fragment} " f"with batch size {batch_size}."
)
return ratio
sample_data = _SampleInfo(actual_bytes_per_row=None, estimated_bytes_per_row=None)
return sample_data


def _estimate_files_encoding_ratio(sample_infos: list[_SampleInfo]) -> float:
"""Return an estimate of the Parquet files encoding ratio.

To avoid OOMs, it is safer to return an over-estimate than an underestimate.
"""
if not DataContext.get_current().decoding_size_estimation:
return PARQUET_ENCODING_RATIO_ESTIMATE_DEFAULT

def compute_encoding_ratio(sample_info: _SampleInfo) -> float:
if sample_info.actual_bytes_per_row is None or sample_info.estimated_bytes_per_row is None:
return PARQUET_ENCODING_RATIO_ESTIMATE_LOWER_BOUND
else:
return sample_info.actual_bytes_per_row / sample_info.estimated_bytes_per_row

ratio = np.mean(list(map(compute_encoding_ratio, sample_infos)))
_logger.debug(f"Estimated Parquet encoding ratio from sampling is {ratio}.")
return max(ratio, PARQUET_ENCODING_RATIO_ESTIMATE_LOWER_BOUND) # type: ignore[return-value]


def _estimate_default_read_batch_size_rows(sample_infos: list[_SampleInfo]) -> int:
def compute_batch_size_rows(sample_info: _SampleInfo) -> int:
if sample_info.actual_bytes_per_row is None:
return PARQUET_READER_ROW_BATCH_SIZE
else:
max_parquet_reader_row_batch_size_bytes = DataContext.get_current().target_max_block_size // 10
return max(
1,
min(
PARQUET_READER_ROW_BATCH_SIZE,
max_parquet_reader_row_batch_size_bytes // sample_info.actual_bytes_per_row,
),
)

return np.mean(list(map(compute_batch_size_rows, sample_infos))) # type: ignore[return-value]
Loading
Loading