Skip to content

Commit

Permalink
Code quality improvements in the DataPipeline and *Pool classes (#…
Browse files Browse the repository at this point in the history
…948)

Some more cleanup in preparation for adding support for incremental
power actors in the `PowerManager`.
  • Loading branch information
shsms committed May 15, 2024
2 parents a31c845 + 4ecdc79 commit e4c9fe9
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 93 deletions.
45 changes: 25 additions & 20 deletions src/frequenz/sdk/microgrid/_data_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,17 @@ def producer(self) -> Producer:
)
return self._producer

def grid(self) -> Grid:
"""Return the grid measuring point."""
if self._grid is None:
initialize_grid(
channel_registry=self._channel_registry,
resampler_subscription_sender=self._resampling_request_sender(),
)
self._grid = get_grid()

return self._grid

def ev_charger_pool(
self,
*,
Expand Down Expand Up @@ -250,7 +261,9 @@ def ev_charger_pool(
)
)
return EVChargerPool(
self._ev_charger_pool_reference_stores[ref_store_key], name, priority
pool_ref_store=self._ev_charger_pool_reference_stores[ref_store_key],
name=name,
priority=priority,
)

def pv_pool(
Expand Down Expand Up @@ -317,18 +330,11 @@ def pv_pool(
component_ids=component_ids,
)

return PVPool(self._pv_pool_reference_stores[ref_store_key], name, priority)

def grid(self) -> Grid:
"""Return the grid measuring point."""
if self._grid is None:
initialize_grid(
channel_registry=self._channel_registry,
resampler_subscription_sender=self._resampling_request_sender(),
)
self._grid = get_grid()

return self._grid
return PVPool(
pool_ref_store=self._pv_pool_reference_stores[ref_store_key],
name=name,
priority=priority,
)

def battery_pool(
self,
Expand Down Expand Up @@ -400,7 +406,9 @@ def battery_pool(
)

return BatteryPool(
self._battery_pool_reference_stores[ref_store_key], name, priority
pool_ref_store=self._battery_pool_reference_stores[ref_store_key],
name=name,
priority=priority,
)

def _data_sourcing_request_sender(self) -> Sender[ComponentMetricRequest]:
Expand Down Expand Up @@ -518,8 +526,7 @@ def ev_charger_pool(
propose different power values for the same set of EV chargers.
!!! note
When specifying priority, bigger values indicate higher priority. The default
priority is the lowest possible value.
When specifying priority, bigger values indicate higher priority.
It is recommended to reuse the same instance of the `EVChargerPool` within the
same actor, unless they are managing different sets of EV chargers.
Expand Down Expand Up @@ -558,8 +565,7 @@ def battery_pool(
propose different power values for the same set of batteries.
!!! note
When specifying priority, bigger values indicate higher priority. The default
priority is the lowest possible value.
When specifying priority, bigger values indicate higher priority.
It is recommended to reuse the same instance of the `BatteryPool` within the
same actor, unless they are managing different sets of batteries.
Expand Down Expand Up @@ -598,8 +604,7 @@ def pv_pool(
propose different power values for the same set of PV inverters.
!!! note
When specifying priority, bigger values indicate higher priority. The default
priority is the lowest possible value.
When specifying priority, bigger values indicate higher priority.
It is recommended to reuse the same instance of the `PVPool` within the same
actor, unless they are managing different sets of PV inverters.
Expand Down
83 changes: 42 additions & 41 deletions src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ class BatteryPool:

def __init__(
self,
battery_pool_ref: BatteryPoolReferenceStore,
*,
pool_ref_store: BatteryPoolReferenceStore,
name: str | None,
priority: int,
):
Expand All @@ -67,12 +68,12 @@ def __init__(
for creating `BatteryPool` instances.
Args:
battery_pool_ref: The battery pool reference store instance.
pool_ref_store: The battery pool reference store instance.
name: An optional name used to identify this instance of the pool or a
corresponding actor in the logs.
priority: The priority of the actor using this wrapper.
"""
self._battery_pool = battery_pool_ref
self._pool_ref_store = pool_ref_store
unique_id = str(uuid.uuid4())
self._source_id = unique_id if name is None else f"{name}-{unique_id}"
self._priority = priority
Expand Down Expand Up @@ -120,12 +121,12 @@ async def propose_power(
to the maximum power of the batteries in the pool. This is currently
and experimental feature.
"""
await self._battery_pool._power_manager_requests_sender.send(
await self._pool_ref_store._power_manager_requests_sender.send(
_power_managing.Proposal(
source_id=self._source_id,
preferred_power=power,
bounds=bounds,
component_ids=self._battery_pool._batteries,
component_ids=self._pool_ref_store._batteries,
priority=self._priority,
creation_time=asyncio.get_running_loop().time(),
request_timeout=request_timeout,
Expand Down Expand Up @@ -166,12 +167,12 @@ async def propose_charge(
"""
if power and power < Power.zero():
raise ValueError("Charge power must be positive.")
await self._battery_pool._power_manager_requests_sender.send(
await self._pool_ref_store._power_manager_requests_sender.send(
_power_managing.Proposal(
source_id=self._source_id,
preferred_power=power,
bounds=timeseries.Bounds(None, None),
component_ids=self._battery_pool._batteries,
component_ids=self._pool_ref_store._batteries,
priority=self._priority,
creation_time=asyncio.get_running_loop().time(),
request_timeout=request_timeout,
Expand Down Expand Up @@ -214,12 +215,12 @@ async def propose_discharge(
if power < Power.zero():
raise ValueError("Discharge power must be positive.")
power = -power
await self._battery_pool._power_manager_requests_sender.send(
await self._pool_ref_store._power_manager_requests_sender.send(
_power_managing.Proposal(
source_id=self._source_id,
preferred_power=power,
bounds=timeseries.Bounds(None, None),
component_ids=self._battery_pool._batteries,
component_ids=self._pool_ref_store._batteries,
priority=self._priority,
creation_time=asyncio.get_running_loop().time(),
request_timeout=request_timeout,
Expand All @@ -233,7 +234,7 @@ def component_ids(self) -> abc.Set[int]:
Returns:
Ids of the batteries in the pool
"""
return self._battery_pool._batteries
return self._pool_ref_store._batteries

@property
def power(self) -> FormulaEngine[Power]:
Expand All @@ -251,11 +252,11 @@ def power(self) -> FormulaEngine[Power]:
A FormulaEngine that will calculate and stream the total power of all
batteries in the pool.
"""
engine = self._battery_pool._formula_pool.from_power_formula_generator(
engine = self._pool_ref_store._formula_pool.from_power_formula_generator(
"battery_pool_power",
BatteryPowerFormula,
FormulaGeneratorConfig(
component_ids=self._battery_pool._batteries,
component_ids=self._pool_ref_store._batteries,
),
)
assert isinstance(engine, FormulaEngine)
Expand Down Expand Up @@ -298,15 +299,15 @@ def soc(self) -> ReceiverFetcher[Sample[Percentage]]:
"""
method_name = SendOnUpdate.name() + "_" + SoCCalculator.name()

if method_name not in self._battery_pool._active_methods:
calculator = SoCCalculator(self._battery_pool._batteries)
self._battery_pool._active_methods[method_name] = SendOnUpdate(
if method_name not in self._pool_ref_store._active_methods:
calculator = SoCCalculator(self._pool_ref_store._batteries)
self._pool_ref_store._active_methods[method_name] = SendOnUpdate(
metric_calculator=calculator,
working_batteries=self._battery_pool._working_batteries,
min_update_interval=self._battery_pool._min_update_interval,
working_batteries=self._pool_ref_store._working_batteries,
min_update_interval=self._pool_ref_store._min_update_interval,
)

return self._battery_pool._active_methods[method_name]
return self._pool_ref_store._active_methods[method_name]

@property
def temperature(self) -> ReceiverFetcher[Sample[Temperature]]:
Expand All @@ -317,14 +318,14 @@ def temperature(self) -> ReceiverFetcher[Sample[Temperature]]:
of all batteries in the pool.
"""
method_name = SendOnUpdate.name() + "_" + TemperatureCalculator.name()
if method_name not in self._battery_pool._active_methods:
calculator = TemperatureCalculator(self._battery_pool._batteries)
self._battery_pool._active_methods[method_name] = SendOnUpdate(
if method_name not in self._pool_ref_store._active_methods:
calculator = TemperatureCalculator(self._pool_ref_store._batteries)
self._pool_ref_store._active_methods[method_name] = SendOnUpdate(
metric_calculator=calculator,
working_batteries=self._battery_pool._working_batteries,
min_update_interval=self._battery_pool._min_update_interval,
working_batteries=self._pool_ref_store._working_batteries,
min_update_interval=self._pool_ref_store._min_update_interval,
)
return self._battery_pool._active_methods[method_name]
return self._pool_ref_store._active_methods[method_name]

@property
def capacity(self) -> ReceiverFetcher[Sample[Energy]]:
Expand Down Expand Up @@ -355,15 +356,15 @@ def capacity(self) -> ReceiverFetcher[Sample[Energy]]:
"""
method_name = SendOnUpdate.name() + "_" + CapacityCalculator.name()

if method_name not in self._battery_pool._active_methods:
calculator = CapacityCalculator(self._battery_pool._batteries)
self._battery_pool._active_methods[method_name] = SendOnUpdate(
if method_name not in self._pool_ref_store._active_methods:
calculator = CapacityCalculator(self._pool_ref_store._batteries)
self._pool_ref_store._active_methods[method_name] = SendOnUpdate(
metric_calculator=calculator,
working_batteries=self._battery_pool._working_batteries,
min_update_interval=self._battery_pool._min_update_interval,
working_batteries=self._pool_ref_store._working_batteries,
min_update_interval=self._pool_ref_store._min_update_interval,
)

return self._battery_pool._active_methods[method_name]
return self._pool_ref_store._active_methods[method_name]

@property
def power_status(self) -> ReceiverFetcher[BatteryPoolReport]:
Expand All @@ -380,14 +381,14 @@ def power_status(self) -> ReceiverFetcher[BatteryPoolReport]:
sub = _power_managing.ReportRequest(
source_id=self._source_id,
priority=self._priority,
component_ids=self._battery_pool._batteries,
component_ids=self._pool_ref_store._batteries,
)
self._battery_pool._power_bounds_subs[sub.get_channel_name()] = (
self._pool_ref_store._power_bounds_subs[sub.get_channel_name()] = (
asyncio.create_task(
self._battery_pool._power_manager_bounds_subscription_sender.send(sub)
self._pool_ref_store._power_manager_bounds_subscription_sender.send(sub)
)
)
channel = self._battery_pool._channel_registry.get_or_create(
channel = self._pool_ref_store._channel_registry.get_or_create(
_power_managing._Report, sub.get_channel_name()
)
channel.resend_latest = True
Expand Down Expand Up @@ -415,16 +416,16 @@ def _system_power_bounds(self) -> ReceiverFetcher[SystemBounds]:
"""
method_name = SendOnUpdate.name() + "_" + PowerBoundsCalculator.name()

if method_name not in self._battery_pool._active_methods:
calculator = PowerBoundsCalculator(self._battery_pool._batteries)
self._battery_pool._active_methods[method_name] = SendOnUpdate(
if method_name not in self._pool_ref_store._active_methods:
calculator = PowerBoundsCalculator(self._pool_ref_store._batteries)
self._pool_ref_store._active_methods[method_name] = SendOnUpdate(
metric_calculator=calculator,
working_batteries=self._battery_pool._working_batteries,
min_update_interval=self._battery_pool._min_update_interval,
working_batteries=self._pool_ref_store._working_batteries,
min_update_interval=self._pool_ref_store._min_update_interval,
)

return self._battery_pool._active_methods[method_name]
return self._pool_ref_store._active_methods[method_name]

async def stop(self) -> None:
"""Stop all tasks and channels owned by the BatteryPool."""
await self._battery_pool.stop()
await self._pool_ref_store.stop()

0 comments on commit e4c9fe9

Please sign in to comment.