Skip to content

Commit

Permalink
Merge branch 'main' into fix-athena_query_wait_polling_delay-default-…
Browse files Browse the repository at this point in the history
…value-in-docstring
  • Loading branch information
jaidisido committed Jun 25, 2024
2 parents 93ff90f + 12bac7c commit d4accdd
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 1 deletion.
4 changes: 3 additions & 1 deletion awswrangler/athena/_write_iceberg.py
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,9 @@ def to_iceberg(
sql_statement = f"""
MERGE INTO "{database}"."{table}" target
USING "{database}"."{temp_table}" source
ON {' AND '.join([f'target."{x}" = source."{x}"' for x in merge_cols])}
ON {' AND '.join([
f'(target."{x}" = source."{x}" OR (target."{x}" IS NULL AND source."{x}" IS NULL))'
for x in merge_cols])}
{match_condition}
WHEN NOT MATCHED THEN
INSERT ({', '.join([f'"{x}"' for x in df.columns])})
Expand Down
68 changes: 68 additions & 0 deletions tests/unit/test_athena_iceberg.py
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,74 @@ def test_athena_to_iceberg_merge_into(path: str, path2: str, glue_database: str,
assert_pandas_equals(df_expected, df_out)


def test_athena_to_iceberg_merge_into_nulls(path: str, path2: str, glue_database: str, glue_table: str) -> None:
df = pd.DataFrame(
{
"col1": ["a", "a", "a", np.nan],
"col2": [0.0, 1.1, np.nan, 2.2],
"action": ["insert", "insert", "insert", "insert"],
}
)
df["col1"] = df["col1"].astype("string")
df["col2"] = df["col2"].astype("float64")
df["action"] = df["action"].astype("string")

wr.athena.to_iceberg(
df=df,
database=glue_database,
table=glue_table,
table_location=path,
temp_path=path2,
keep_files=False,
)

# Perform MERGE INTO
df2 = pd.DataFrame(
{
"col1": ["a", "a", np.nan, "b"],
"col2": [1.1, np.nan, 2.2, 3.3],
"action": ["update", "update", "update", "insert"],
}
)
df2["col1"] = df2["col1"].astype("string")
df2["col2"] = df2["col2"].astype("float64")
df2["action"] = df2["action"].astype("string")

wr.athena.to_iceberg(
df=df2,
database=glue_database,
table=glue_table,
table_location=path,
temp_path=path2,
keep_files=False,
merge_cols=["col1", "col2"],
)

# Expected output
df_expected = pd.DataFrame(
{
"col1": ["a", "a", "a", np.nan, "b"],
"col2": [0.0, 1.1, np.nan, 2.2, 3.3],
"action": ["insert", "update", "update", "update", "insert"],
}
)
df_expected["col1"] = df_expected["col1"].astype("string")
df_expected["col2"] = df_expected["col2"].astype("float64")
df_expected["action"] = df_expected["action"].astype("string")

df_out = wr.athena.read_sql_query(
sql=f'SELECT * FROM "{glue_table}"',
database=glue_database,
ctas_approach=False,
unload_approach=False,
)

assert_pandas_equals(
df_out.sort_values(df_out.columns.to_list()).reset_index(drop=True),
df_expected.sort_values(df_expected.columns.to_list()).reset_index(drop=True),
)


def test_athena_to_iceberg_merge_into_ignore(path: str, path2: str, glue_database: str, glue_table: str) -> None:
df = pd.DataFrame({"title": ["Dune", "Fargo"], "year": ["1984", "1996"], "gross": [35_000_000, 60_000_000]})
df["title"] = df["title"].astype("string")
Expand Down

0 comments on commit d4accdd

Please sign in to comment.