Skip to content

Commit

Permalink
feat(discovery): refactor to use common discovery abstraction from ko…
Browse files Browse the repository at this point in the history
…rk-core (#734)
  • Loading branch information
cfieber committed Jul 10, 2020
1 parent 4a617eb commit b60d8dc
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

package com.netflix.spinnaker.fiat.roles;

import com.netflix.appinfo.InstanceInfo;
import com.netflix.discovery.DiscoveryClient;
import com.netflix.spectator.api.Gauge;
import com.netflix.spectator.api.Id;
import com.netflix.spectator.api.Registry;
Expand All @@ -32,26 +30,23 @@
import com.netflix.spinnaker.fiat.permissions.PermissionsResolver;
import com.netflix.spinnaker.fiat.providers.ProviderException;
import com.netflix.spinnaker.fiat.providers.ResourceProvider;
import com.netflix.spinnaker.kork.eureka.RemoteStatusChangedEvent;
import com.netflix.spinnaker.kork.discovery.DiscoveryStatusListener;
import com.netflix.spinnaker.kork.lock.LockManager;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.actuate.health.Status;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.ApplicationListener;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.backoff.BackOffExecution;
Expand All @@ -60,8 +55,8 @@
@Slf4j
@Component
@ConditionalOnExpression("${fiat.write-mode.enabled:true}")
public class UserRolesSyncer implements ApplicationListener<RemoteStatusChangedEvent> {
private final Optional<DiscoveryClient> discoveryClient;
public class UserRolesSyncer {
private final DiscoveryStatusListener discoveryStatusListener;

private final LockManager lockManager;
private final PermissionsRepository permissionsRepository;
Expand All @@ -74,14 +69,12 @@ public class UserRolesSyncer implements ApplicationListener<RemoteStatusChangedE
private final long syncFailureDelayMs;
private final long syncDelayTimeoutMs;

private final AtomicBoolean isEnabled;

private final Registry registry;
private final Gauge userRolesSyncCount;

@Autowired
public UserRolesSyncer(
Optional<DiscoveryClient> discoveryClient,
DiscoveryStatusListener discoveryStatusListener,
Registry registry,
LockManager lockManager,
PermissionsRepository permissionsRepository,
Expand All @@ -92,7 +85,7 @@ public UserRolesSyncer(
@Value("${fiat.write-mode.sync-delay-ms:600000}") long syncDelayMs,
@Value("${fiat.write-mode.sync-failure-delay-ms:600000}") long syncFailureDelayMs,
@Value("${fiat.write-mode.sync-delay-timeout-ms:30000}") long syncDelayTimeoutMs) {
this.discoveryClient = discoveryClient;
this.discoveryStatusListener = discoveryStatusListener;

this.lockManager = lockManager;
this.permissionsRepository = permissionsRepository;
Expand All @@ -105,27 +98,17 @@ public UserRolesSyncer(
this.syncFailureDelayMs = syncFailureDelayMs;
this.syncDelayTimeoutMs = syncDelayTimeoutMs;

this.isEnabled =
new AtomicBoolean(
// default to enabled iff discovery is not available
!discoveryClient.isPresent());

this.registry = registry;
this.userRolesSyncCount = registry.gauge(metricName("syncCount"));
}

@Override
public void onApplicationEvent(RemoteStatusChangedEvent event) {
isEnabled.set(isInService());
}

@Scheduled(fixedDelay = 30000L)
public void schedule() {
if (syncDelayMs < 0 || !isEnabled.get()) {
if (syncDelayMs < 0 || !discoveryStatusListener.isEnabled()) {
log.warn(
"User roles syncing is disabled (syncDelayMs: {}, isEnabled: {})",
syncDelayMs,
isEnabled.get());
discoveryStatusListener.isEnabled());
return;
}

Expand Down Expand Up @@ -315,20 +298,4 @@ private <T> T timeIt(String timerName, Callable<T> theThing) {
.record(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
}
}

private boolean isInService() {
InstanceInfo.InstanceStatus remoteStatus = null;
if (discoveryClient.isPresent()) {
remoteStatus = discoveryClient.get().getInstanceRemoteStatus();
}

boolean isInService = (remoteStatus == null || remoteStatus == InstanceInfo.InstanceStatus.UP);

log.info(
"User roles syncing is {} (discoveryStatus: {})",
isInService ? "active" : "disabled",
remoteStatus);

return isInService;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ package com.netflix.spinnaker.fiat.roles

import com.fasterxml.jackson.annotation.JsonInclude
import com.fasterxml.jackson.databind.ObjectMapper
import com.netflix.appinfo.InstanceInfo.InstanceStatus
import com.netflix.discovery.DiscoveryClient
import com.netflix.spectator.api.NoopRegistry
import com.netflix.spectator.api.Registry
import com.netflix.spinnaker.fiat.config.ResourceProvidersHealthIndicator
Expand All @@ -33,6 +31,7 @@ import com.netflix.spinnaker.fiat.model.resources.ServiceAccount
import com.netflix.spinnaker.fiat.permissions.PermissionsResolver
import com.netflix.spinnaker.fiat.permissions.RedisPermissionsRepository
import com.netflix.spinnaker.fiat.providers.ResourceProvider
import com.netflix.spinnaker.kork.discovery.DiscoveryStatusListener
import com.netflix.spinnaker.kork.jedis.EmbeddedRedis
import com.netflix.spinnaker.kork.jedis.JedisClientDelegate
import com.netflix.spinnaker.kork.lock.LockManager
Expand Down Expand Up @@ -140,7 +139,7 @@ class UserRolesSyncerSpec extends Specification {

@Subject
def syncer = new UserRolesSyncer(
Optional.ofNullable(null),
new DiscoveryStatusListener(true),
registry,
lockManager,
repo,
Expand Down Expand Up @@ -213,7 +212,7 @@ class UserRolesSyncerSpec extends Specification {
given:
def lockManager = Mock(LockManager)
def userRolesSyncer = new UserRolesSyncer(
Optional.ofNullable(discoveryClient),
new DiscoveryStatusListener(discoveryStatusEnabled),
registry,
lockManager,
null,
Expand All @@ -227,25 +226,15 @@ class UserRolesSyncerSpec extends Specification {
)

when:
userRolesSyncer.onApplicationEvent(null)
userRolesSyncer.schedule()

then:
(shouldAcquireLock ? 1 : 0) * lockManager.acquireLock(_, _)

where:
discoveryClient || shouldAcquireLock
null || true
discoveryClient(InstanceStatus.UP) || true
discoveryClient(InstanceStatus.OUT_OF_SERVICE) || false
discoveryClient(InstanceStatus.DOWN) || false
discoveryClient(InstanceStatus.STARTING) || false
}

DiscoveryClient discoveryClient(InstanceStatus instanceStatus) {
return Mock(DiscoveryClient) {
1 * getInstanceRemoteStatus() >> { return instanceStatus }
}
discoveryStatusEnabled || shouldAcquireLock
true || true
false || false
}

class AlwaysUpHealthIndicator extends ResourceProvidersHealthIndicator {
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
includeProviders=file,github,google-groups,ldap
korkVersion=7.50.0
korkVersion=7.51.2
org.gradle.parallel=true
spinnakerGradleVersion=8.3.0
# To enable a composite reference to a project, set the
Expand Down

0 comments on commit b60d8dc

Please sign in to comment.