Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dynamic feed availability #160

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.entur.lamassu.model.discovery.System;
import org.entur.lamassu.model.discovery.SystemDiscovery;
import org.entur.lamassu.model.provider.FeedProvider;
import org.entur.lamassu.service.FeedAvailabilityService;
import org.entur.lamassu.service.FeedProviderService;
import org.entur.lamassu.service.SystemDiscoveryService;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -28,17 +29,20 @@ public class GBFSFeedController {
private final GBFSFeedCache feedCache;
private final FeedProviderService feedProviderService;

private final FeedAvailabilityService feedAvailabilityService;

@Value("${org.entur.lamassu.baseUrl}")
private String baseUrl;

@Value("${org.entur.lamassu.internalLoadBalancer}")
private String internalLoadBalancer;

@Autowired
public GBFSFeedController(SystemDiscoveryService systemDiscoveryService, GBFSFeedCache feedCache, FeedProviderService feedProviderService) {
public GBFSFeedController(SystemDiscoveryService systemDiscoveryService, GBFSFeedCache feedCache, FeedProviderService feedProviderService, FeedAvailabilityService feedAvailabilityService) {
this.systemDiscoveryService = systemDiscoveryService;
this.feedCache = feedCache;
this.feedProviderService = feedProviderService;
this.feedAvailabilityService = feedAvailabilityService;
}

@GetMapping("/gbfs")
Expand Down Expand Up @@ -72,8 +76,9 @@ public Object getGbfsFeedForProvider(@PathVariable String systemId, @PathVariabl
try {
var feedName = GBFSFeedName.fromValue(feed);
var feedProvider = feedProviderService.getFeedProviderBySystemId(systemId);
var availableFiles = feedAvailabilityService.getAvailableFiles(systemId);

if (feedProvider == null) {
if (feedProvider == null || availableFiles == null || !availableFiles.contains(feedName) && !feedName.equals(GBFSFeedName.GBFS)) {
throw new NoSuchElementException();
}

Expand Down
23 changes: 21 additions & 2 deletions src/main/java/org/entur/lamassu/leader/FeedUpdater.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@
import org.entur.gbfs.GbfsDelivery;
import org.entur.gbfs.GbfsSubscriptionManager;
import org.entur.gbfs.GbfsSubscriptionOptions;
import org.entur.gbfs.v2_3.gbfs.GBFSFeed;
import org.entur.gbfs.validation.model.ValidationResult;
import org.entur.lamassu.config.feedprovider.FeedProviderConfig;
import org.entur.lamassu.leader.entityupdater.EntityCachesUpdater;
import org.entur.lamassu.leader.feedcachesupdater.FeedCachesUpdater;
import org.entur.lamassu.mapper.feedmapper.GbfsDeliveryMapper;
import org.entur.lamassu.metrics.MetricsService;
import org.entur.lamassu.model.provider.FeedProvider;
import org.entur.lamassu.service.FeedAvailabilityService;
import org.redisson.api.RBucket;
import org.redisson.api.RMapCache;
import org.slf4j.Logger;
Expand All @@ -41,6 +43,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

@Component
@Profile("leader")
Expand All @@ -65,6 +68,8 @@ public class FeedUpdater {

private MetricsService metricsService;

private FeedAvailabilityService feedAvailabilityService;

@Autowired
public FeedUpdater(
FeedProviderConfig feedProviderConfig,
Expand All @@ -73,7 +78,8 @@ public FeedUpdater(
EntityCachesUpdater entityCachesUpdater,
RMapCache<String, ValidationResult> validationResultCache,
RBucket<Boolean> cacheReady,
MetricsService metricsService
MetricsService metricsService,
FeedAvailabilityService feedAvailabilityService
) {
this.feedProviderConfig = feedProviderConfig;
this.gbfsDeliveryMapper = gbfsDeliveryMapper;
Expand All @@ -82,6 +88,7 @@ public FeedUpdater(
this.validationResultCache = validationResultCache;
this.cacheReady = cacheReady;
this.metricsService = metricsService;
this.feedAvailabilityService = feedAvailabilityService;
}

public void start() {
Expand Down Expand Up @@ -138,7 +145,19 @@ private void receiveUpdate(FeedProvider feedProvider, GbfsDelivery delivery) {

var mappedDelivery = gbfsDeliveryMapper.mapGbfsDelivery(delivery, feedProvider);
var oldDelivery = feedCachesUpdater.updateFeedCaches(feedProvider, mappedDelivery);
entityCachesUpdater.updateEntityCaches(feedProvider, mappedDelivery, oldDelivery);

try {
entityCachesUpdater.updateEntityCaches(feedProvider, mappedDelivery, oldDelivery);
} catch (ClassCastException e) {
logger.warn("This should not occur in production", e);
}


feedAvailabilityService.setAvailableFiles(
feedProvider.getSystemId(),
mappedDelivery.getDiscovery().getFeedsData().get(feedProvider.getLanguage()).getFeeds().stream().map(GBFSFeed::getName).collect(Collectors.toList())
);

cacheReady.set(true);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
*
*
* * Licensed under the EUPL, Version 1.2 or – as soon they will be approved by
* * the European Commission - subsequent versions of the EUPL (the "Licence");
* * You may not use this work except in compliance with the Licence.
* * You may obtain a copy of the Licence at:
* *
* * https://joinup.ec.europa.eu/software/page/eupl
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the Licence is distributed on an "AS IS" basis,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the Licence for the specific language governing permissions and
* * limitations under the Licence.
*
*/

package org.entur.lamassu.mapper.feedmapper;

import org.entur.gbfs.GbfsDelivery;
import org.entur.gbfs.v2_3.gbfs.GBFS;

import java.util.stream.Collectors;

public class DiscoveryFeedPostProcessor {
public static void removeUnavailableFiles(GBFS discovery, GbfsDelivery mapped) {
discovery.getFeedsData().keySet().forEach(language -> {
var feeds = discovery.getFeedsData().get(language).getFeeds().stream().filter(feed -> {
switch (feed.getName()) {
case SystemInformation:
return mapped.getSystemInformation() != null;
case VehicleTypes:
return mapped.getVehicleTypes() != null;
case FreeBikeStatus:
return mapped.getFreeBikeStatus() != null;
case StationInformation:
return mapped.getStationInformation() != null;
case StationStatus:
return mapped.getStationStatus() != null;
case SystemPricingPlans:
return mapped.getSystemPricingPlans() != null;
case SystemAlerts:
return mapped.getSystemAlerts() != null;
case SystemHours:
return mapped.getSystemHours() != null;
case SystemCalendar:
return mapped.getSystemCalendar() != null;
case SystemRegions:
return mapped.getSystemRegions() != null;
case GeofencingZones:
return mapped.getGeofencingZones() != null;
default:
return false;
}
}).collect(Collectors.toList());
discovery.getFeedsData().get(language).setFeeds(feeds);
});


}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ public GbfsDeliveryMapper(
// mapping of the versions file, if it exists, is intentionally skipped.
public GbfsDelivery mapGbfsDelivery(GbfsDelivery delivery, FeedProvider feedProvider) {
var mapped = new GbfsDelivery();
mapped.setDiscovery(discoveryFeedMapper.map(delivery.getDiscovery(), feedProvider));
mapped.setSystemInformation(systemInformationFeedMapper.map(delivery.getSystemInformation(), feedProvider));
mapped.setSystemAlerts(systemAlertsFeedMapper.map(delivery.getSystemAlerts(), feedProvider));
mapped.setSystemCalendar(systemCalendarFeedMapper.map(delivery.getSystemCalendar(), feedProvider));
Expand All @@ -99,6 +98,11 @@ public GbfsDelivery mapGbfsDelivery(GbfsDelivery delivery, FeedProvider feedProv
stationStatus -> VehicleTypeCapacityProducer.addToStations(stationStatus, mapped.getVehicleTypes()))
);
mapped.setFreeBikeStatus(freeBikeStatusFeedMapper.map(delivery.getFreeBikeStatus(), feedProvider));
mapped.setDiscovery(
discoveryFeedMapper.map(
delivery.getDiscovery(),
feedProvider,
discovery -> DiscoveryFeedPostProcessor.removeUnavailableFiles(discovery, mapped)));
return mapped;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
*
*
* * Licensed under the EUPL, Version 1.2 or – as soon they will be approved by
* * the European Commission - subsequent versions of the EUPL (the "Licence");
* * You may not use this work except in compliance with the Licence.
* * You may obtain a copy of the Licence at:
* *
* * https://joinup.ec.europa.eu/software/page/eupl
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the Licence is distributed on an "AS IS" basis,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the Licence for the specific language governing permissions and
* * limitations under the Licence.
*
*/

package org.entur.lamassu.service;

import org.entur.gbfs.v2_3.gbfs.GBFSFeedName;

import java.util.List;
import java.util.Map;

public interface FeedAvailabilityService {
Map<String, List<GBFSFeedName>> getAvailableFeeds();
List<GBFSFeedName> getAvailableFiles(String systemId);
void setAvailableFiles(String systemId, List<GBFSFeedName> files);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
*
*
* * Licensed under the EUPL, Version 1.2 or – as soon they will be approved by
* * the European Commission - subsequent versions of the EUPL (the "Licence");
* * You may not use this work except in compliance with the Licence.
* * You may obtain a copy of the Licence at:
* *
* * https://joinup.ec.europa.eu/software/page/eupl
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the Licence is distributed on an "AS IS" basis,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the Licence for the specific language governing permissions and
* * limitations under the Licence.
*
*/

package org.entur.lamassu.service.impl;

import org.entur.gbfs.v2_3.gbfs.GBFSFeedName;
import org.entur.lamassu.service.FeedAvailabilityService;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* This mock service is only to demonstrate functionality. A service based on redis should be implemented
* before merging.
*/
@Component
public class MockFeedAvailabilityService implements FeedAvailabilityService {
private final ConcurrentHashMap<String, List<GBFSFeedName>> availableFilesPerSystem = new ConcurrentHashMap<>();

@Override
public Map<String, List<GBFSFeedName>> getAvailableFeeds() {
return availableFilesPerSystem;
}

@Override
public List<GBFSFeedName> getAvailableFiles(String systemId) {
return availableFilesPerSystem.get(systemId);
}

@Override
public void setAvailableFiles(String systemId, List<GBFSFeedName> files) {
var minimumRequired = files.containsAll(List.of(GBFSFeedName.SystemInformation, GBFSFeedName.VehicleTypes, GBFSFeedName.SystemPricingPlans));
var freeFloating = files.contains(GBFSFeedName.FreeBikeStatus);
var stationBased = files.containsAll(List.of(GBFSFeedName.StationInformation, GBFSFeedName.StationStatus));

if (minimumRequired && (freeFloating || stationBased)) {
availableFilesPerSystem.put(systemId, files);
} else {
availableFilesPerSystem.remove(systemId);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import org.entur.lamassu.mapper.entitymapper.SystemDiscoveryMapper;
import org.entur.lamassu.model.discovery.SystemDiscovery;
import org.entur.lamassu.service.FeedAvailabilityService;
import org.entur.lamassu.service.FeedProviderService;
import org.entur.lamassu.service.SystemDiscoveryService;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -11,19 +12,27 @@

@Component
public class SystemDiscoveryServiceImpl implements SystemDiscoveryService {
private final SystemDiscovery systemDiscovery;

private final FeedProviderService feedProviderService;
private final FeedAvailabilityService feedAvailabilityService;
private final SystemDiscoveryMapper systemDiscoveryMapper;

@Autowired
public SystemDiscoveryServiceImpl(FeedProviderService feedProviderService, SystemDiscoveryMapper systemDiscoveryMapper) {
systemDiscovery = new SystemDiscovery();
systemDiscovery.setSystems(
feedProviderService.getFeedProviders().stream()
.map(systemDiscoveryMapper::mapSystemDiscovery).collect(Collectors.toList())
);
public SystemDiscoveryServiceImpl(FeedProviderService feedProviderService, FeedAvailabilityService feedAvailabilityService, SystemDiscoveryMapper systemDiscoveryMapper) {
this.feedProviderService = feedProviderService;
this.feedAvailabilityService = feedAvailabilityService;
this.systemDiscoveryMapper = systemDiscoveryMapper;
}

@Override
public SystemDiscovery getSystemDiscovery() {
var systemDiscovery = new SystemDiscovery();
systemDiscovery.setSystems(
feedAvailabilityService.getAvailableFeeds().keySet().stream()
.map(feedProviderService::getFeedProviderBySystemId)
.map(systemDiscoveryMapper::mapSystemDiscovery)
.collect(Collectors.toList())
);
return systemDiscovery;
}
}