Skip to content

Commit

Permalink
Simplify chained profile implementation.
Browse files Browse the repository at this point in the history
Closes #85. Earlier implementation from 2021/2022 did rely on thread local.
While collision at thread local were unlikely to spot, overall architecture of this component become fairly complex.
In order to simplify and keep it in clean shape internal call flow have been redesigned to rely on local stack.
Since all profiles are known before hand, we can use order of their creation as well as intention (callback method call),
to determine direction in which we should navigate further. Once all callbacks are passed, a framework callback is called.

Signed-off-by: Łukasz Dywicki <luke@code-house.org>
  • Loading branch information
splatch committed May 10, 2024
1 parent ccf1ff7 commit 659084b
Show file tree
Hide file tree
Showing 9 changed files with 266 additions and 151 deletions.
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2019-2021 ConnectorIO Sp. z o.o.
* Copyright (C) 2024-2024 ConnectorIO Sp. z o.o.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2022-2022 ConnectorIO Sp. z o.o.
* Copyright (C) 2024-2024 ConnectorIO Sp. z o.o.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2022-2022 ConnectorIO Sp. z o.o.
* Copyright (C) 2024-2024 ConnectorIO Sp. z o.o.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down

This file was deleted.

Expand Up @@ -25,8 +25,7 @@
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import org.connectorio.addons.profile.ProfileFactoryRegistry;
import org.connectorio.addons.profile.internal.util.NestedMapCreator;
Expand All @@ -48,15 +47,19 @@
class ConnectorioProfile implements StateProfile {

private final Logger logger = LoggerFactory.getLogger(ConnectorioProfile.class);
private final ProfileCallback callback;
private final StackedProfileCallback callback;
private final Executor executor;
private final ProfileContext context;
private final LinkedList<StateProfile> profileChain = new LinkedList<>();
private final LinkedList<StateProfile> callbackChain = new LinkedList<>();

private final ProfileFactoryRegistry registry;
private final ExecutorService executor = Executors.newSingleThreadExecutor();

ConnectorioProfile(ProfileCallback callback, ProfileContext context, ProfileFactoryRegistry registry) {
this.callback = callback;
this(context.getExecutorService(), callback, context, registry);
}

ConnectorioProfile(Executor executor, ProfileCallback callback, ProfileContext context, ProfileFactoryRegistry registry) {
this.executor = executor;
this.context = context;
this.registry = registry;

Expand All @@ -67,7 +70,8 @@ class ConnectorioProfile implements StateProfile {
}

ItemChannelLink link = determnineLink(callback);
StackedProfileCallback chainedCallback = new StackedProfileCallback(link);

this.callback = new StackedProfileCallback(callback, callbackChain);
for (Entry<String, Object> entry : config.entrySet()) {
if ("profile".equals(entry.getKey())) {
continue;
Expand All @@ -79,7 +83,7 @@ class ConnectorioProfile implements StateProfile {
Map<String, Object> profileCfg = (Map<String, Object>) entry.getValue();
String profileType = (String) profileCfg.get("profile");
logger.debug("Creating profile {} for config key {}", profileType, entry.getKey());
Profile createdProfile = getProfileFromFactories(getConfiguredProfileTypeUID(profileType), profileCfg, chainedCallback);
Profile createdProfile = getProfileFromFactories(getConfiguredProfileTypeUID(profileType), profileCfg, new NavigableCallback(link, callbackChain.size(), this.callback));
if (createdProfile == null) {
Optional<String> supported = registry.getAll().stream()
.map(ProfileFactory::getSupportedProfileTypeUIDs)
Expand All @@ -91,7 +95,7 @@ class ConnectorioProfile implements StateProfile {
if (!(createdProfile instanceof StateProfile)) {
throw new IllegalArgumentException("Could not create profile " + profileType + " or it is not state profile");
}
profileChain.add((StateProfile) createdProfile);
callbackChain.add((StateProfile) createdProfile);
}
}

Expand Down Expand Up @@ -155,36 +159,15 @@ public void onStateUpdateFromHandler(State state) {
}

private void handleReading(boolean incoming, Type type, Consumer<StateProfile> head) {
context.getExecutorService().execute(new Runnable() {
executor.execute(new Runnable() {
@Override
public void run() {
try {
Iterator<StateProfile> delegate = incoming ? profileChain.iterator() : profileChain.descendingIterator();
Iterator<StateProfile> iterator = new Iterator<StateProfile>() {
int pos = 0;
@Override
public boolean hasNext() {
return delegate.hasNext();
}

@Override
public StateProfile next() {
pos++;
return delegate.next();
}

public String toString() {
return "Iterator [" + pos + ", " + profileChain + "]";
}
};
ChainedProfileCallback callback = new ChainedProfileCallback(iterator, ConnectorioProfile.this.callback);
logger.trace("Setting chained callback for {} to {}", type, callback);
StackedProfileCallback.set(callback);
Iterator<StateProfile> iterator = incoming ? callbackChain.iterator() : callbackChain.descendingIterator();
logger.trace("Firing chained profiles for {} to {}", type, callback);
head.accept(iterator.next());
} catch (Throwable e) {
logger.warn("Uncaught error found while calling profile chain for {}", type, e);
} finally {
StackedProfileCallback.set(null);
}
}
});
Expand Down
@@ -0,0 +1,65 @@
/*
* Copyright (C) 2024-2024 ConnectorIO Sp. z o.o.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.connectorio.addons.profile.internal;

import org.openhab.core.thing.link.ItemChannelLink;
import org.openhab.core.thing.profiles.ProfileCallback;
import org.openhab.core.types.Command;
import org.openhab.core.types.State;

/**
* Callback implementation which is aware of its position in chain.
*
* When this callback is asked to dispatch command or sate it passes it to chain with upper (state)
* or lower (command) element index. This construction allows to move state/command information
* across entire chain without too complex logic. Finalization of the call happens n stacked profile
* callback which know chain boundaries.
*/
public class NavigableCallback implements ProfileCallback {

private final ItemChannelLink link;
private final int index;
private final StackedProfileCallback stack;

public NavigableCallback(ItemChannelLink link, int index, StackedProfileCallback stack) {
this.link = link;
this.index = index;
this.stack = stack;
}

@Override
public void handleCommand(Command command) {
stack.handleCommand(index - 1, command);
}

@Override
public void sendCommand(Command command) {
stack.sendCommand(index - 1, command);
}

@Override
public void sendUpdate(State state) {
stack.sendUpdate(index + 1, state);
}

@Override
public String toString() {
return "Chained Callback [" + link + " at index " + index + "]";
}

}
Expand Up @@ -17,54 +17,63 @@
*/
package org.connectorio.addons.profile.internal;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Executor;
import org.openhab.core.thing.link.ItemChannelLink;
import org.openhab.core.thing.profiles.ProfileCallback;
import org.openhab.core.thing.profiles.StateProfile;
import org.openhab.core.types.Command;
import org.openhab.core.types.State;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StackedProfileCallback implements ProfileCallback {

private final static ThreadLocal<ProfileCallback> DELEGATE = new ThreadLocal<>();
/**
* Stacked callback is a reference point for created profiles to communicate with framework.
*
* Since profiles can call callback at any time, this instance must be present when profile is being created.
* This leads to situation that we have to bridge it into future.
* More over, because this callback can be called from handler to item and from item to handler
* it has to work in both directions, independently of the creation time.
*/
public class StackedProfileCallback {

private final Logger logger = LoggerFactory.getLogger(StackedProfileCallback.class);
private final ItemChannelLink link;

public StackedProfileCallback(ItemChannelLink link) {
this.link = link;
private final ProfileCallback callback;
private final LinkedList<StateProfile> chain;

public StackedProfileCallback(ProfileCallback callback, LinkedList<StateProfile> chain) {
this.callback = callback;
this.chain = chain;
}

@Override
public void handleCommand(Command command) {
public void handleCommand(int index, Command command) {
if (index == -1) {
callback.handleCommand(command);
return;
}
logger.trace("Passing command {} to profile chain", command);
getDelegate().handleCommand(command);
chain.get(index).onCommandFromItem(command);
}

@Override
public void sendCommand(Command command) {
public void sendCommand(int index, Command command) {
if (index == -1) {
callback.handleCommand(command);
return;
}
logger.trace("Sending command {} toi profile chain", command);
getDelegate().sendCommand(command);
chain.get(index).onCommandFromHandler(command);
}

@Override
public void sendUpdate(State state) {
logger.trace("Sending state {} to profile chain", state);
getDelegate().sendUpdate(state);
}

private ProfileCallback getDelegate() {
ProfileCallback callback = DELEGATE.get();
logger.trace("Callback looked up on thread stack {}", callback);
if (callback != null) {
return callback;
public void sendUpdate(int index, State state) {
if (index >= chain.size()) {
callback.sendUpdate(state);
return;
}

throw new IllegalStateException("No callback found on thread stack");
}

static void set(ProfileCallback callback) {
DELEGATE.set(callback);
logger.trace("Sending state {} to profile chain", state);
chain.get(index).onStateUpdateFromHandler(state);
}

}

0 comments on commit 659084b

Please sign in to comment.