Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow optionally forcing power requests #365

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ This release drops support for Python versions older than 3.11.
soc_rx = battery_pool.soc.new_receiver() # new
```

* A power request can now be forced by setting the `include_broken` attribute. This is especially helpful as a safety measure when components appear to be failing, such as when battery metrics are unavailable. Note that applications previously relying on automatic fallback to all batteries when none of them was working will now require the `include_broken` attribute to be explicitly set in the request.

## New Features

<!-- Here goes the main new features and examples or instructions on how to use them -->
Expand Down
163 changes: 121 additions & 42 deletions src/frequenz/sdk/actor/power_distributing/power_distributing.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,12 @@

import asyncio
import logging
import time
from asyncio.tasks import ALL_COMPLETED
from dataclasses import dataclass, replace
from datetime import timedelta
from math import isnan
from typing import ( # pylint: disable=unused-import
Any,
Dict,
Iterable,
List,
Optional,
Set,
Tuple,
)
from typing import Any, Dict, Iterable, List, Optional, Self, Set, Tuple

import grpc
from frequenz.channels import Bidirectional, Peekable, Receiver, Sender
Expand Down Expand Up @@ -62,6 +56,40 @@ class _User:
"""The bidirectional channel to communicate with the user."""


@dataclass
class _CacheEntry:
"""Represents an entry in the cache with expiry time."""

inv_bat_pair: InvBatPair
"""The inverter and adjacent battery data pair."""

leandro-lucarella-frequenz marked this conversation as resolved.
Show resolved Hide resolved
expiry_time: int
"""The expiration time (taken from the monotonic clock) of the cache entry."""

@classmethod
def from_ttl(
cls, inv_bat_pair: InvBatPair, ttl: timedelta = timedelta(hours=2.5)
) -> Self:
"""Initialize a CacheEntry instance from a TTL (Time-To-Live).

Args:
inv_bat_pair: the inverter and adjacent battery data pair to cache.
ttl: the time a cache entry is kept alive.

Returns:
this class instance.
"""
return cls(inv_bat_pair, time.monotonic_ns() + int(ttl.total_seconds() * 1e9))

def has_expired(self) -> bool:
"""Check whether the cache entry has expired.

Returns:
whether the cache entry has expired.
"""
return time.monotonic_ns() >= self.expiry_time


@actor
class PowerDistributingActor:
# pylint: disable=too-many-instance-attributes
Expand Down Expand Up @@ -211,6 +239,10 @@ def __init__(
max_data_age_sec=10.0,
)

self._cached_metrics: dict[int, _CacheEntry | None] = {
daniel-zullo-frequenz marked this conversation as resolved.
Show resolved Hide resolved
bat_id: None for bat_id, _ in self._bat_inv_map.items()
}

def _create_users_tasks(self) -> List[asyncio.Task[None]]:
"""For each user create a task to wait for request.

Expand All @@ -224,37 +256,45 @@ def _create_users_tasks(self) -> List[asyncio.Task[None]]:
)
return tasks

def _get_upper_bound(self, batteries: Set[int]) -> float:
def _get_upper_bound(self, batteries: Set[int], include_broken: bool) -> float:
"""Get total upper bound of power to be set for given batteries.

Note, output of that function doesn't guarantee that this bound will be
the same when the request is processed.

Args:
batteries: List of batteries
include_broken: whether all batteries in the batteries set in the
request must be used regardless the status.

Returns:
Upper bound for `set_power` operation.
"""
pairs_data: List[InvBatPair] = self._get_components_data(batteries)
pairs_data: List[InvBatPair] = self._get_components_data(
batteries, include_broken
)
return sum(
min(battery.power_upper_bound, inverter.active_power_upper_bound)
for battery, inverter in pairs_data
)

def _get_lower_bound(self, batteries: Set[int]) -> float:
def _get_lower_bound(self, batteries: Set[int], include_broken: bool) -> float:
"""Get total lower bound of power to be set for given batteries.

Note, output of that function doesn't guarantee that this bound will be
the same when the request is processed.

Args:
batteries: List of batteries
include_broken: whether all batteries in the batteries set in the
request must be used regardless the status.

Returns:
Lower bound for `set_power` operation.
"""
pairs_data: List[InvBatPair] = self._get_components_data(batteries)
pairs_data: List[InvBatPair] = self._get_components_data(
batteries, include_broken
)
return sum(
max(battery.power_lower_bound, inverter.active_power_lower_bound)
for battery, inverter in pairs_data
Expand Down Expand Up @@ -282,21 +322,19 @@ async def run(self) -> None:

try:
pairs_data: List[InvBatPair] = self._get_components_data(
request.batteries
request.batteries, request.include_broken
)
except KeyError as err:
await user.channel.send(Error(request=request, msg=str(err)))
continue

if len(pairs_data) == 0:
if not pairs_data and not request.include_broken:
error_msg = f"No data for the given batteries {str(request.batteries)}"
await user.channel.send(Error(request=request, msg=str(error_msg)))
continue

try:
distribution = self.distribution_algorithm.distribute_power(
request.power, pairs_data
)
distribution = self._get_power_distribution(request, pairs_data)
except ValueError as err:
error_msg = f"Couldn't distribute power, error: {str(err)}"
await user.channel.send(Error(request=request, msg=str(error_msg)))
Expand Down Expand Up @@ -379,6 +417,44 @@ async def _set_distributed_power(

return self._parse_result(tasks, distribution.distribution, timeout_sec)

def _get_power_distribution(
self, request: Request, inv_bat_pairs: List[InvBatPair]
) -> DistributionResult:
"""Get power distribution result for the batteries in the request.

Args:
request: the power request to process.
inv_bat_pairs: the battery and adjacent inverter data pairs.

Returns:
the power distribution result.
"""
available_bat_ids = {battery.component_id for battery, _ in inv_bat_pairs}
unavailable_bat_ids = request.batteries - available_bat_ids
unavailable_inv_ids = {
self._bat_inv_map[battery_id] for battery_id in unavailable_bat_ids
}

if request.include_broken and not available_bat_ids:
return self.distribution_algorithm.distribute_power_equally(
request.power, unavailable_inv_ids
)

result = self.distribution_algorithm.distribute_power(
request.power, inv_bat_pairs
)

if request.include_broken and unavailable_inv_ids:
additional_result = self.distribution_algorithm.distribute_power_equally(
result.remaining_power, unavailable_inv_ids
)

for inv_id, power in additional_result.distribution.items():
result.distribution[inv_id] = power
result.remaining_power = 0.0

return result
leandro-lucarella-frequenz marked this conversation as resolved.
Show resolved Hide resolved

def _check_request(self, request: Request) -> Optional[Result]:
"""Check whether the given request if correct.

Expand All @@ -388,6 +464,9 @@ def _check_request(self, request: Request) -> Optional[Result]:
Returns:
Result for the user if the request is wrong, None otherwise.
"""
if not request.batteries:
return Error(request=request, msg="Empty battery IDs in the request")

for battery in request.batteries:
if battery not in self._battery_receivers:
msg = (
Expand All @@ -398,11 +477,11 @@ def _check_request(self, request: Request) -> Optional[Result]:

if not request.adjust_power:
if request.power < 0:
bound = self._get_lower_bound(request.batteries)
bound = self._get_lower_bound(request.batteries, request.include_broken)
if request.power < bound:
return OutOfBound(request=request, bound=bound)
else:
bound = self._get_upper_bound(request.batteries)
bound = self._get_upper_bound(request.batteries, request.include_broken)
if request.power > bound:
return OutOfBound(request=request, bound=bound)

Expand Down Expand Up @@ -551,29 +630,15 @@ def _get_components_pairs(

return bat_inv_map, inv_bat_map

def _get_working_batteries(self, batteries: Set[int]) -> Set[int]:
"""Get subset with working batteries.

If none of the given batteries are working, then treat all of them
as working.

Args:
batteries: requested batteries

Returns:
Subset with working batteries or input set if none of the given batteries
are working.
"""
working_batteries = self._all_battery_status.get_working_batteries(batteries)
if len(working_batteries) == 0:
return batteries
return working_batteries

def _get_components_data(self, batteries: Set[int]) -> List[InvBatPair]:
def _get_components_data(
self, batteries: Set[int], include_broken: bool
) -> List[InvBatPair]:
"""Get data for the given batteries and adjacent inverters.

Args:
batteries: Batteries that needs data.
include_broken: whether all batteries in the batteries set in the
request must be used regardless the status.

Raises:
KeyError: If any battery in the given list doesn't exists in microgrid.
Expand All @@ -582,7 +647,11 @@ def _get_components_data(self, batteries: Set[int]) -> List[InvBatPair]:
Pairs of battery and adjacent inverter data.
"""
pairs_data: List[InvBatPair] = []
working_batteries = self._get_working_batteries(batteries)
working_batteries = (
batteries
if include_broken
else self._all_battery_status.get_working_batteries(batteries)
)

for battery_id in working_batteries:
if battery_id not in self._battery_receivers:
Expand All @@ -594,6 +663,12 @@ def _get_components_data(self, batteries: Set[int]) -> List[InvBatPair]:
inverter_id: int = self._bat_inv_map[battery_id]

data = self._get_battery_inverter_data(battery_id, inverter_id)
if not data and include_broken:
cached_entry = self._cached_metrics[battery_id]
if cached_entry and not cached_entry.has_expired():
data = cached_entry.inv_bat_pair
else:
data = None
if data is None:
_logger.warning(
"Skipping battery %d because its message isn't correct.",
Expand Down Expand Up @@ -661,7 +736,9 @@ def _get_battery_inverter_data(

# If all values are ok then return them.
if not any(map(isnan, replaceable_metrics)):
return InvBatPair(battery_data, inverter_data)
inv_bat_pair = InvBatPair(battery_data, inverter_data)
self._cached_metrics[battery_id] = _CacheEntry.from_ttl(inv_bat_pair)
return inv_bat_pair

# Replace NaN with the corresponding value in the adjacent component.
# If both metrics are None, return None to ignore this battery.
Expand All @@ -683,10 +760,12 @@ def _get_battery_inverter_data(
elif isnan(inv_bound):
inverter_new_metrics[inv_attr] = bat_bound

return InvBatPair(
inv_bat_pair = InvBatPair(
replace(battery_data, **battery_new_metrics),
replace(inverter_data, **inverter_new_metrics),
)
self._cached_metrics[battery_id] = _CacheEntry.from_ttl(inv_bat_pair)
return inv_bat_pair

async def _create_channels(self) -> None:
"""Create channels to get data of components in microgrid."""
Expand Down
11 changes: 11 additions & 0 deletions src/frequenz/sdk/actor/power_distributing/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,14 @@ class Request:
If `False` and the power is outside the batteries' bounds, the request will
fail and be replied to with an `OutOfBound` result.
"""

include_broken: bool = False
"""Whether to use all batteries included in the batteries set regardless the status.

if `True`, the remaining power after distributing between working batteries
will be distributed equally between broken batteries. Also if all batteries
in the batteries set are broken then the power is distributed equally between
broken batteries.

if `False`, the power will be only distributed between the working batteries.
"""
21 changes: 21 additions & 0 deletions src/frequenz/sdk/power/_distribution_algorithm.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,27 @@ def _greedy_distribute_remaining_power(

return DistributionResult(new_distribution, remaining_power)

def distribute_power_equally(
self, power: float, inverters: set[int]
) -> DistributionResult:
"""Distribute the power equally between the inverters in the set.

This function is mainly useful to set the power for components that are
broken or have no metrics available.

Args:
power: the power to distribute.
inverters: the inverters to set the power to.

Returns:
the power distribution result.
"""
power_per_inverter = power / len(inverters)
return DistributionResult(
distribution={id: power_per_inverter for id in inverters},
remaining_power=0.0,
)

def distribute_power(
self, power: float, components: List[InvBatPair]
) -> DistributionResult:
Expand Down
Loading