diff --git a/CHANGELOG.md b/CHANGELOG.md index 95082dcfc..1def61605 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,8 @@ Removed: ### Added +- `Impact.impact_at_reg` method for aggregating impacts per country or custom region [#642](https://github.com/CLIMADA-project/climada_python/pull/642) + ### Changed ### Fixed diff --git a/climada/engine/impact.py b/climada/engine/impact.py index b90840883..dbccb48c6 100755 --- a/climada/engine/impact.py +++ b/climada/engine/impact.py @@ -193,8 +193,6 @@ def __init__(self, else: self.imp_mat = sparse.csr_matrix(np.empty((0, 0))) - - def calc(self, exposures, impact_funcs, hazard, save_mat=False, assign_centroids=True): """This function is deprecated, use ``ImpactCalc.impact`` instead. """ @@ -392,6 +390,55 @@ def impact_per_year(self, all_years=True, year_range=None): year_set[year] = sum(self.at_event[orig_year == year]) return year_set + def impact_at_reg(self, agg_regions=None): + """Aggregate impact on given aggregation regions. This method works + only if Impact.imp_mat was stored during the impact calculation. + + Parameters + ---------- + agg_regions : np.array, list (optional) + The length of the array must equal the number of centroids in exposures. + It reports what macro-regions these centroids belong to. For example, + asuming there are three centroids and agg_regions = ['A', 'A', 'B'] + then impact of the first and second centroids will be assigned to + region A, whereas impact from the third centroid will be assigned + to area B. If no aggregation regions are passed, the method aggregates + impact at the country (admin_0) level. + Default is None. + + Returns + ------- + pd.DataFrame + Contains the aggregated data per event. + Rows: Hazard events. Columns: Aggregation regions. + """ + if self.imp_mat.nnz == 0: + raise ValueError( + "The aggregated impact cannot be computed as no Impact.imp_mat was " + "stored during the impact calculation" + ) + + if agg_regions is None: + agg_regions = u_coord.country_to_iso( + u_coord.get_country_code(self.coord_exp[:, 0], self.coord_exp[:, 1]) + ) + + agg_regions = np.asanyarray(agg_regions) + agg_reg_unique = np.unique(agg_regions) + + at_reg_event = np.hstack( + [ + self.imp_mat[:, np.where(agg_regions == reg)[0]].sum(1) + for reg in np.unique(agg_reg_unique) + ] + ) + + at_reg_event = pd.DataFrame( + at_reg_event, columns=np.unique(agg_reg_unique), index=self.event_id + ) + + return at_reg_event + def calc_impact_year_set(self,all_years=True, year_range=None): """This function is deprecated, use Impact.impact_per_year instead.""" LOGGER.warning("The use of Impact.calc_impact_year_set is deprecated." diff --git a/climada/engine/test/test_impact.py b/climada/engine/test/test_impact.py index 4e4d9637d..0ac3a470f 100644 --- a/climada/engine/test/test_impact.py +++ b/climada/engine/test/test_impact.py @@ -496,6 +496,78 @@ def test_local_exceedance_imp_pass(self): self.assertAlmostEqual(np.max(impact_rp), 2916964966.388219, places=5) self.assertAlmostEqual(np.min(impact_rp), 444457580.131494, places=5) + +class TestImpactReg(unittest.TestCase): + """Test impact aggregation per aggregation region or admin 0""" + + def setUp(self): + """Build the impact object for testing""" + self.imp = dummy_impact() + + def test_agg_regions(self): + """Test calc local impacts per region""" + # Aggregate over a single region + region_ids = ["A", "A"] + at_reg_event = self.imp.impact_at_reg(region_ids) + + self.assertEqual(at_reg_event.sum().sum(), self.imp.at_event.sum()) + self.assertEqual(at_reg_event.shape[0], self.imp.at_event.shape[0]) + self.assertEqual(at_reg_event.shape[1], np.unique(region_ids).shape[0]) + + # Aggregate over two different regions + region_ids = ["A", "B"] + at_reg_event = self.imp.impact_at_reg(region_ids) + + self.assertEqual(at_reg_event["A"].sum(), self.imp.imp_mat[:, 0].sum()) + self.assertEqual(at_reg_event["B"].sum(), self.imp.imp_mat[:, 1].sum()) + + self.assertEqual(at_reg_event.sum().sum(), self.imp.at_event.sum()) + self.assertEqual(at_reg_event.shape[0], self.imp.at_event.shape[0]) + self.assertEqual(at_reg_event.shape[1], np.unique(region_ids).shape[0]) + + def test_admin0(self): + """Test with aggregation to countries""" + # Let's specify sample cities' coords + zurich_lat, zurich_lon = 47.37, 8.55 + bern_lat, bern_lon = 46.94, 7.44 + rome_lat, rome_lon = 41.89, 12.51 + + # Test admin 0 with one country + self.imp.coord_exp = np.array([[zurich_lat, zurich_lon], [bern_lat, bern_lon]]) + + at_reg_event = self.imp.impact_at_reg() + + self.assertEqual(len(at_reg_event.columns), 1) + self.assertEqual(at_reg_event.columns[0], "CHE") + + self.assertEqual(at_reg_event.shape[0], self.imp.at_event.shape[0]) + self.assertEqual( + at_reg_event["CHE"].sum(), at_reg_event.sum().sum(), self.imp.at_event.sum() + ) + + # Test admin 0 with two countries + self.imp.coord_exp = np.array([[rome_lat, rome_lon], [bern_lat, bern_lon]]) + at_reg_event = self.imp.impact_at_reg() + + self.assertEqual(len(at_reg_event.columns), 2) + self.assertEqual(at_reg_event.columns[0], "CHE") + self.assertEqual(at_reg_event.columns[1], "ITA") + + self.assertEqual(at_reg_event.shape[0], self.imp.at_event.shape[0]) + self.assertEqual(at_reg_event["CHE"].sum(), self.imp.imp_mat[:, 0].sum()) + self.assertEqual(at_reg_event["ITA"].sum(), self.imp.imp_mat[:, 1].sum()) + self.assertEqual(at_reg_event.sum().sum(), self.imp.at_event.sum()) + + def test_no_imp_mat(self): + """Check error if no impact matrix is stored""" + # Test error when no imp_mat is stored + self.imp.imp_mat = sparse.csr_matrix((0, 0)) + + with self.assertRaises(ValueError) as cm: + self.imp.impact_at_reg() + self.assertIn("no Impact.imp_mat was stored", str(cm.exception)) + + class TestRiskTrans(unittest.TestCase): """Test risk transfer methods""" def test_risk_trans_pass(self): @@ -1027,6 +1099,7 @@ def write_tag(group, tag_kwds): TESTS.addTests(unittest.TestLoader().loadTestsFromTestCase(TestImpactPerYear)) TESTS.addTests(unittest.TestLoader().loadTestsFromTestCase(TestIO)) TESTS.addTests(unittest.TestLoader().loadTestsFromTestCase(TestRPmatrix)) + TESTS.addTests(unittest.TestLoader().loadTestsFromTestCase(TestImpactReg)) TESTS.addTests(unittest.TestLoader().loadTestsFromTestCase(TestRiskTrans)) TESTS.addTests(unittest.TestLoader().loadTestsFromTestCase(TestSelect)) TESTS.addTests(unittest.TestLoader().loadTestsFromTestCase(TestConvertExp))