Skip to content

Commit

Permalink
Extract common parts of polling/sampling thread management into sourc…
Browse files Browse the repository at this point in the history
…e api.

The source api defines two kinds of data sources - one is based on sampling, other on events.
For sampling it involves management of threads which can work at different rate.

The PLC4X part got also its own source concept which works with Plc4xSampler and generic purpose subscriber.

Signed-off-by: Łukasz Dywicki <luke@code-house.org>
  • Loading branch information
splatch committed Mar 7, 2024
1 parent 7ac27a1 commit 44d590b
Show file tree
Hide file tree
Showing 80 changed files with 1,868 additions and 1,037 deletions.
Expand Up @@ -36,13 +36,16 @@
import org.connectorio.addons.binding.amsads.internal.config.AmsConfiguration;
import org.connectorio.addons.binding.amsads.internal.handler.channel.AdsChannelHandler;
import org.connectorio.addons.binding.amsads.internal.handler.channel.ChannelHandlerFactory;
import org.connectorio.addons.binding.amsads.internal.handler.polling.FetchContainer;
import org.connectorio.addons.binding.amsads.internal.handler.polling.PollFetchContainer;
import org.connectorio.addons.binding.amsads.internal.handler.polling.SubscribeFetchContainer;
import org.connectorio.addons.binding.amsads.internal.symbol.SymbolEntry;
import org.connectorio.addons.binding.amsads.internal.symbol.SymbolReader;
import org.connectorio.addons.binding.amsads.internal.symbol.SymbolReaderFactory;
import org.connectorio.addons.binding.handler.GenericThingHandlerBase;
import org.connectorio.addons.binding.plc4x.sampler.DefaultPlc4xSampler;
import org.connectorio.addons.binding.plc4x.sampler.DefaultPlc4xSamplerComposer;
import org.connectorio.addons.binding.plc4x.source.Plc4xSampler;
import org.connectorio.addons.binding.plc4x.source.SourceFactory;
import org.connectorio.addons.binding.plc4x.source.SubscriberSource;
import org.connectorio.addons.binding.source.sampling.SamplingSource;
import org.openhab.core.thing.Channel;
import org.openhab.core.thing.ChannelUID;
import org.openhab.core.thing.Thing;
Expand All @@ -62,17 +65,20 @@ public abstract class AbstractAmsAdsThingHandler<B extends AmsBridgeHandler, C e
private final Logger logger = LoggerFactory.getLogger(AbstractAmsAdsThingHandler.class);
private final SymbolReaderFactory symbolReaderFactory;
private final ChannelHandlerFactory channelHandlerFactory;
private final SourceFactory sourceFactory;

private final Map<String, Entry<AdsTag, AdsChannelHandler>> handlerMap = new ConcurrentHashMap<>();
private final CompletableFuture<PlcConnection> initializer = new CompletableFuture<>();;

private FetchContainer subscriber;
private FetchContainer poller;
private SubscriberSource<AdsTag> subscriber;
private SamplingSource<Plc4xSampler<AdsTag>> poller;

public AbstractAmsAdsThingHandler(Thing thing, SymbolReaderFactory symbolReaderFactory, ChannelHandlerFactory channelHandlerFactory) {
public AbstractAmsAdsThingHandler(Thing thing, SymbolReaderFactory symbolReaderFactory,
ChannelHandlerFactory channelHandlerFactory, SourceFactory sourceFactory) {
super(thing);
this.symbolReaderFactory = symbolReaderFactory;
this.channelHandlerFactory = channelHandlerFactory;
this.sourceFactory = sourceFactory;
}

@Override
Expand Down Expand Up @@ -160,8 +166,8 @@ public void initialize() {
}

List<Channel> channels = getThing().getChannels();
poller = new PollFetchContainer(scheduler, connection);
subscriber = new SubscribeFetchContainer(connection);
poller = sourceFactory.sampling(scheduler, new DefaultPlc4xSamplerComposer<>(connection));
subscriber = sourceFactory.subscriber(connection);
for (Channel channel : channels) {
AdsChannelHandler handler = channelHandlerFactory.map(thing, getCallback(), channel);
if (handler != null) {
Expand All @@ -172,9 +178,9 @@ public void initialize() {
continue;
}
if (handler.getRefreshInterval() != null) {
poller.add(handler.getRefreshInterval(), channelId, tag, handler::onChange);
poller.add(handler.getRefreshInterval(), channelId, new DefaultPlc4xSampler<>(connection, channelId, tag, handler::onChange));
} else {
subscriber.add(null, channelId, tag, handler::onChange);
subscriber.add(channelId, tag, handler::onChange);
}
// register handler so we can dispatch commands
handlerMap.put(channelId, new SimpleEntry<>(tag, handler));
Expand Down
Expand Up @@ -27,6 +27,7 @@
import org.connectorio.addons.binding.amsads.internal.symbol.DefaultSymbolReaderFactory;
import org.connectorio.addons.binding.amsads.internal.symbol.SymbolReaderFactory;
import org.connectorio.addons.binding.plc4x.Plc4xHandlerFactory;
import org.connectorio.addons.binding.plc4x.source.SourceFactory;
import org.connectorio.plc4x.extras.osgi.PlcDriverManager;
import org.openhab.core.thing.Bridge;
import org.openhab.core.thing.Thing;
Expand All @@ -48,6 +49,7 @@
public class AmsAdsHandlerFactory extends Plc4xHandlerFactory {

private final PlcDriverManager driverManager;
private final SourceFactory sourceFactory;

private final SymbolReaderFactory symbolReaderFactory = new DefaultSymbolReaderFactory();

Expand All @@ -56,9 +58,10 @@ public class AmsAdsHandlerFactory extends Plc4xHandlerFactory {
private AmsAdsDiscoveryDriver discoveryDriver;

@Activate
public AmsAdsHandlerFactory(@Reference PlcDriverManager driverManager) {
public AmsAdsHandlerFactory(@Reference PlcDriverManager driverManager, @Reference(target = "(plc4x=true)") SourceFactory sourceFactory) {
super(THING_TYPE_AMS, THING_TYPE_NETWORK, THING_TYPE_SERIAL);
this.driverManager = driverManager;
this.sourceFactory = sourceFactory;
}

@Override
Expand All @@ -68,9 +71,10 @@ protected ThingHandler createHandler(Thing thing) {
if (THING_TYPE_AMS.equals(thingTypeUID)) {
return new AmsBridgeHandler((Bridge) thing, discoveryDriver);
} else if (THING_TYPE_NETWORK.equals(thingTypeUID)) {
return new AmsAdsNetworkBridgeHandler(thing, symbolReaderFactory, channelHandlerFactory, driverManager, discoveryDriver);
return new AmsAdsNetworkBridgeHandler(thing, symbolReaderFactory, channelHandlerFactory, driverManager, sourceFactory, discoveryDriver);
} else if (THING_TYPE_SERIAL.equals(thingTypeUID)) {
return new AmsAdsSerialBridgeHandler(thing, symbolReaderFactory, channelHandlerFactory, driverManager);
return new AmsAdsSerialBridgeHandler(thing, symbolReaderFactory, channelHandlerFactory,
driverManager, sourceFactory);
}

return null;
Expand Down
Expand Up @@ -36,7 +36,6 @@
import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
import org.apache.plc4x.java.transport.serial.SerialTransport;
import org.connectorio.addons.binding.amsads.internal.AmsConverter;
import org.connectorio.addons.binding.amsads.internal.config.AmsConfiguration;
import org.connectorio.addons.binding.amsads.internal.config.NetworkConfiguration;
Expand All @@ -45,12 +44,11 @@
import org.connectorio.addons.binding.amsads.internal.discovery.DiscoverySender.Envelope;
import org.connectorio.addons.binding.amsads.internal.handler.channel.ChannelHandlerFactory;
import org.connectorio.addons.binding.amsads.internal.symbol.SymbolReaderFactory;
import org.connectorio.addons.binding.plc4x.source.SourceFactory;
import org.connectorio.plc4x.extras.osgi.PlcDriverManager;
import org.openhab.core.thing.ChannelUID;
import org.openhab.core.thing.Thing;
import org.openhab.core.thing.ThingStatus;
import org.openhab.core.thing.ThingStatusDetail;
import org.openhab.core.types.Command;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -69,8 +67,8 @@ public class AmsAdsNetworkBridgeHandler extends AbstractAmsAdsThingHandler<AmsBr
private final AmsAdsDiscoveryDriver discoveryDriver;

public AmsAdsNetworkBridgeHandler(Thing thing, SymbolReaderFactory symbolReaderFactory, ChannelHandlerFactory channelHandlerFactory,
PlcDriverManager driverManager, AmsAdsDiscoveryDriver discoveryDriver) {
super(thing, symbolReaderFactory, channelHandlerFactory);
PlcDriverManager driverManager, SourceFactory sourceFactory, AmsAdsDiscoveryDriver discoveryDriver) {
super(thing, symbolReaderFactory, channelHandlerFactory, sourceFactory);
this.driverManager = driverManager;
this.discoveryDriver = discoveryDriver;
}
Expand Down
Expand Up @@ -25,10 +25,9 @@
import org.connectorio.addons.binding.amsads.internal.config.SerialConfiguration;
import org.connectorio.addons.binding.amsads.internal.handler.channel.ChannelHandlerFactory;
import org.connectorio.addons.binding.amsads.internal.symbol.SymbolReaderFactory;
import org.connectorio.addons.binding.plc4x.source.SourceFactory;
import org.connectorio.plc4x.extras.osgi.PlcDriverManager;
import org.openhab.core.thing.ChannelUID;
import org.openhab.core.thing.Thing;
import org.openhab.core.types.Command;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -44,8 +43,8 @@ public class AmsAdsSerialBridgeHandler extends AbstractAmsAdsThingHandler<AmsBri
private final PlcDriverManager driverManager;

public AmsAdsSerialBridgeHandler(Thing thing, SymbolReaderFactory symbolReaderFactory, ChannelHandlerFactory channelHandlerFactory,
PlcDriverManager driverManager) {
super(thing, symbolReaderFactory, channelHandlerFactory);
PlcDriverManager driverManager, SourceFactory sourceFactory) {
super(thing, symbolReaderFactory, channelHandlerFactory, sourceFactory);
this.driverManager = driverManager;
}

Expand Down

This file was deleted.

Expand Up @@ -39,6 +39,7 @@
import org.connectorio.addons.binding.amsads.internal.config.AmsConfiguration;
import org.connectorio.addons.binding.amsads.internal.config.NetworkConfiguration;
import org.connectorio.addons.binding.amsads.internal.discovery.AmsAdsDiscoveryDriver;
import org.connectorio.addons.binding.plc4x.source.SourceFactory;
import org.connectorio.addons.binding.test.ThingMock;
import org.connectorio.plc4x.extras.osgi.core.internal.OsgiDriverManager;
import org.connectorio.addons.binding.test.BridgeMock;
Expand All @@ -61,6 +62,9 @@ class AmsAdsNetworkBridgeHandlerTest {
@Mock
ChannelHandlerFactory channelHandlerFactory;

@Mock
SourceFactory sourceFactory;

@Mock
AmsAdsDiscoveryDriver discoveryDriver;

Expand All @@ -70,7 +74,7 @@ void testHandlerInitializationWithNoConfig() {
.create();

AmsAdsNetworkBridgeHandler handler = new AmsAdsNetworkBridgeHandler(bridge, symbolReaderFactory,
channelHandlerFactory, new OsgiDriverManager(), discoveryDriver);
channelHandlerFactory, new OsgiDriverManager(), sourceFactory, discoveryDriver);
handler.initialize();

CompletableFuture<PlcConnection> initializer = handler.getPlcConnection();
Expand Down Expand Up @@ -101,7 +105,7 @@ void testHandlerInitializationWithConfig() {
Thing thing = thingMock.create();

AmsAdsNetworkBridgeHandler handler = new AmsAdsNetworkBridgeHandler(thing, symbolReaderFactory,
channelHandlerFactory, new OsgiDriverManager(), discoveryDriver);
channelHandlerFactory, new OsgiDriverManager(), sourceFactory, discoveryDriver);
handler.setCallback(thingMock.getCallback());
handler.initialize();

Expand Down

0 comments on commit 44d590b

Please sign in to comment.