Skip to content

Commit

Permalink
consideration for visible cells, make pull based reactivity configura…
Browse files Browse the repository at this point in the history
…ble, add placeholder settings for push based
  • Loading branch information
smacke committed Jan 21, 2024
1 parent deae523 commit 7b065ab
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 24 deletions.
3 changes: 3 additions & 0 deletions core/ipyflow/config.py
Expand Up @@ -94,6 +94,9 @@ class MutableDataflowSettings(JsonSerializableMixin):
exec_schedule: ExecutionSchedule
flow_order: FlowDirection
reactivity_mode: ReactivityMode
push_reactive_updates: bool
push_reactive_updates_to_cousins: bool
pull_reactive_updates: bool
color_scheme: ColorScheme
warn_out_of_order_usages: bool
lint_out_of_order_usages: bool
Expand Down
4 changes: 4 additions & 0 deletions core/ipyflow/data_model/cell.py
Expand Up @@ -164,6 +164,10 @@ def text(self) -> str:
def is_placeholder_id(self) -> bool:
return self._placeholder_id

@property
def is_visible(self) -> bool:
return self._position_by_cell_id.get(self.cell_id, float("inf")) < float("inf")

@property
def position(self) -> int:
pos = self._position_by_cell_id.get(self.cell_id, -1)
Expand Down
12 changes: 11 additions & 1 deletion core/ipyflow/data_model/symbol.py
Expand Up @@ -250,6 +250,13 @@ def shallow_timestamp(self) -> Timestamp:
else:
return max(self._timestamp, self._override_timestamp)

@property
def visible_timestamp(self) -> Optional[Timestamp]:
for ts in sorted(self.updated_timestamps, reverse=True):
if cells().at_timestamp(ts).is_visible:
return ts
return None

@property
def memoize_timestamp(self) -> Optional[Timestamp]:
return self.last_updated_timestamp_by_obj_id.get(self.obj_id)
Expand Down Expand Up @@ -1106,6 +1113,8 @@ def update_usage_info_one_timestamp(
updated_time,
self,
)
if is_static:
is_usage = cells().at_timestamp(updated_time).is_visible
return is_usage

def update_usage_info(
Expand Down Expand Up @@ -1144,7 +1153,8 @@ def update_usage_info(
updated_ts,
is_static=is_static,
)
break
if is_usage or not is_static:
break
if is_usage:
timestamp_by_used_time[used_time] = ts_to_use
if used_node is not None:
Expand Down
47 changes: 43 additions & 4 deletions core/ipyflow/flow.py
Expand Up @@ -121,6 +121,18 @@ def __init__(self, **kwargs) -> None:
getattr(config, "reactivity_mode", ReactivityMode.BATCH),
)
),
push_reactive_updates=kwargs.pop(
"push_reactive_updates",
getattr(config, "push_reactive_updates", True),
),
push_reactive_updates_to_cousins=kwargs.pop(
"push_reactive_updates_to_cousins",
getattr(config, "push_reactive_updates_to_cousins", False),
),
pull_reactive_updates=kwargs.pop(
"pull_reactive_updates",
getattr(config, "pull_reactive_updates", False),
),
color_scheme=ColorScheme(
kwargs.pop(
"color_scheme",
Expand Down Expand Up @@ -316,6 +328,27 @@ def initialize(
kwargs.get("reactivity_mode"),
)
)
push_reactive_updates = getattr(
config,
"push_reactive_updates",
kwargs.get("push_reactive_updates"),
)
push_reactive_updates_to_cousins = getattr(
config,
"push_reactive_updates_to_cousins",
kwargs.get("push_reactive_updates_to_cousins"),
)
pull_reactive_updates = getattr(
config,
"pull_reactive_updates",
kwargs.get("pull_reactive_updates"),
)
if push_reactive_updates is not None:
self.mut_settings.push_reactive_updates = push_reactive_updates
if push_reactive_updates_to_cousins is not None:
self.mut_settings.push_reactive_updates = push_reactive_updates_to_cousins
if pull_reactive_updates is not None:
self.mut_settings.pull_reactive_updates = iface == Interface.JUPYTERLAB
self.mut_settings.color_scheme = ColorScheme(
getattr(
config,
Expand Down Expand Up @@ -500,17 +533,20 @@ def _create_untracked_cells_for_content(content_by_cell_id: Dict[IdType, str]):
continue
cells().create_and_track(cell_id, content, (), bump_cell_counter=False)

def _recompute_ast_for_cells(self, content_by_cell_id: Dict[IdType, str]) -> bool:
should_recompute_exec_schedule = False
def _recompute_ast_for_cells(
self, content_by_cell_id: Dict[IdType, str], force: bool = False
) -> bool:
should_recompute_exec_schedule = force
for cell_id, content in content_by_cell_id.items():
if should_recompute_exec_schedule:
break
cell = cells().from_id_nullable(cell_id)
if cell is None:
continue
is_same_content = cell.current_content == content
is_same_counter = cell.cell_ctr == (cell.last_check_cell_ctr or 0)
if not is_same_content or not is_same_counter:
should_recompute_exec_schedule = True
break
if not should_recompute_exec_schedule:
return False
should_recompute_exec_schedule = False
Expand Down Expand Up @@ -621,6 +657,7 @@ def handle_notify_content_changed(
is_cell_structure_change = (
is_cell_structure_change or self._prev_order_idx_by_id != order_index_by_id
)
prev_order_idx_by_id = self._prev_order_idx_by_id
self._prev_order_idx_by_id = order_index_by_id
content_by_cell_id = {
cell_id: metadata["content"]
Expand All @@ -642,7 +679,9 @@ def handle_notify_content_changed(
)
should_recompute_exec_schedule = (
not is_reactively_executing
and self._recompute_ast_for_cells(content_by_cell_id)
and self._recompute_ast_for_cells(
content_by_cell_id, force=order_index_by_id != prev_order_idx_by_id
)
) or is_cell_structure_change
placeholder_cells = cells().with_placeholder_ids()
if len(placeholder_cells) > 0:
Expand Down
17 changes: 13 additions & 4 deletions core/ipyflow/frontend.py
Expand Up @@ -315,10 +315,19 @@ def _compute_readiness(
}
):
continue
if max(
cell.cell_ctr, flow_.min_timestamp
) < par.cell_ctr and par.cell_ctr in {
sym.shallow_timestamp.cell_num for sym in syms
if (
max(cell.cell_ctr, flow_.min_timestamp) < par.cell_ctr
and (
flow_.mut_settings.pull_reactive_updates
or par.cell_ctr
in {sym.shallow_timestamp.cell_num for sym in syms}
)
) or par.cell_ctr in {
sym.visible_timestamp.cell_num
for sym in syms
if sym.visible_timestamp is not None
and sym.visible_timestamp.cell_num
!= sym.shallow_timestamp.cell_num
}:
should_skip = False
if (
Expand Down
3 changes: 3 additions & 0 deletions core/ipyflow/tracing/ipyflow_tracer.py
Expand Up @@ -1270,6 +1270,9 @@ def before_literal(self, *_, **__):
def after_literal(
self, literal: Union[dict, list, tuple], node_id: NodeId, *_, **__
):
if self.active_literal_scope is None:
# can happen if tracing was disabled since the literal was created
return
try:
self.active_literal_scope.update_obj_ref(literal)
logger.warning("create literal scope %s", self.active_literal_scope)
Expand Down
32 changes: 17 additions & 15 deletions frontend/labextension/src/index.ts
Expand Up @@ -176,13 +176,13 @@ class IpyflowSessionState {
closure: Set<string>,
cellId: string,
edges: { [id: string]: string[] } | undefined | null,
addCellsNeedingRefresh = false,
pullReactiveUpdates = false,
skipFirstCheck = false
): void {
if (!skipFirstCheck && closure.has(cellId)) {
return;
}
if (!addCellsNeedingRefresh) {
if (!pullReactiveUpdates) {
closure.add(cellId);
}
const relatives = edges?.[cellId];
Expand All @@ -195,11 +195,11 @@ class IpyflowSessionState {
closure,
related,
edges,
addCellsNeedingRefresh
pullReactiveUpdates
);
});
if (
addCellsNeedingRefresh &&
pullReactiveUpdates &&
(closure.size > prevClosureSize ||
!this.executedCells.has(cellId) ||
this.readyCells.has(cellId) ||
Expand All @@ -208,7 +208,7 @@ class IpyflowSessionState {
) {
closure.add(cellId);
}
if (addCellsNeedingRefresh && closure.has(cellId)) {
if (pullReactiveUpdates && closure.has(cellId)) {
relatives.forEach((related) => {
if (closure.has(related)) {
return;
Expand All @@ -234,7 +234,7 @@ class IpyflowSessionState {
closure,
related,
edges,
addCellsNeedingRefresh,
pullReactiveUpdates,
true
);
}
Expand All @@ -254,7 +254,7 @@ class IpyflowSessionState {
closure,
parent,
edges,
addCellsNeedingRefresh,
pullReactiveUpdates,
true
);
}
Expand Down Expand Up @@ -1130,14 +1130,16 @@ const connectToComm = (
state.computeTransitiveClosureHelper(slice, cellId, state.cellChildren);
}
const executeSlice = new Set(slice);
for (const cellId of slice) {
state.computeTransitiveClosureHelper(
executeSlice,
cellId,
state.cellParents,
true,
true
);
if (state.settings.pull_reactive_updates ?? false) {
for (const cellId of slice) {
state.computeTransitiveClosureHelper(
executeSlice,
cellId,
state.cellParents,
true,
true
);
}
}
for (const cellId of closureCellIds) {
slice.delete(cellId);
Expand Down

0 comments on commit 7b065ab

Please sign in to comment.