import akka.actor.typed.javadsl.Behaviors;
import akka.cluster.typed.Cluster;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import java.time.Duration;
import java.util.Optional;
import java.util.Set;
import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipState;
import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipStateReply;
import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerCommand;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.ActivateDataCenter;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorReply;
import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate;
* the appropriate owners.
*/
@Singleton
-@Component(immediate = true, service = DOMEntityOwnershipService.class)
-public final class AkkaEntityOwnershipService implements DOMEntityOwnershipService, AutoCloseable {
+@Component(immediate = true, service = { DOMEntityOwnershipService.class, NativeEosService.class })
+public final class AkkaEntityOwnershipService implements DOMEntityOwnershipService, NativeEosService, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(AkkaEntityOwnershipService.class);
private static final String DATACENTER_PREFIX = "dc";
+ private static final Duration DATACENTER_OP_TIMEOUT = Duration.ofSeconds(20);
private final Set<DOMEntity> registeredEntities = ConcurrentHashMap.newKeySet();
private final String localCandidate;
private final Scheduler scheduler;
+ private final String datacenter;
private final ActorRef<BootstrapCommand> bootstrap;
private final RunningContext runningContext;
private final ActorRef<CandidateRegistryCommand> candidateRegistry;
private final ActorRef<TypeListenerRegistryCommand> listenerRegistry;
private final ActorRef<StateCheckerCommand> ownerStateChecker;
+ private final ActorRef<OwnerSupervisorCommand> ownerSupervisor;
@VisibleForTesting
AkkaEntityOwnershipService(final ActorSystem actorSystem) throws ExecutionException, InterruptedException {
final var typedActorSystem = Adapter.toTyped(actorSystem);
-
scheduler = typedActorSystem.scheduler();
- localCandidate = Cluster.get(typedActorSystem).selfMember().getRoles().stream()
+
+ final Cluster cluster = Cluster.get(typedActorSystem);
+ datacenter = cluster.selfMember().dataCenter();
+
+ localCandidate = cluster.selfMember().getRoles().stream()
.filter(role -> !role.contains(DATACENTER_PREFIX))
.findFirst()
.orElseThrow(() -> new IllegalArgumentException("No valid role found."));
candidateRegistry = runningContext.getCandidateRegistry();
listenerRegistry = runningContext.getListenerRegistry();
ownerStateChecker = runningContext.getOwnerStateChecker();
+ ownerSupervisor = runningContext.getOwnerSupervisor();
}
@Inject
return registeredEntities.contains(forEntity);
}
+ @Override
+ public ListenableFuture<Void> activateDataCenter() {
+ LOG.debug("Activating datacenter: {}", datacenter);
+ final SettableFuture<Void> future = SettableFuture.create();
+ final CompletionStage<OwnerSupervisorReply> ask =
+ AskPattern.ask(ownerSupervisor, ActivateDataCenter::new, DATACENTER_OP_TIMEOUT, scheduler);
+
+ ask.whenComplete((reply, failure) -> {
+ if (failure != null) {
+ LOG.warn("Activate DataCenter has failed.", failure);
+ future.setException(failure);
+ return;
+ }
+
+ LOG.debug("Activate DataCenter successful.");
+ future.set(null);
+ });
+
+ return future;
+ }
+
+ @Override
+ public ListenableFuture<Void> deactivateDataCenter() {
+ LOG.debug("Deactivating datacenter: {}", datacenter);
+ final SettableFuture<Void> future = SettableFuture.create();
+ final CompletionStage<OwnerSupervisorReply> ask =
+ AskPattern.ask(ownerSupervisor, DeactivateDataCenter::new, DATACENTER_OP_TIMEOUT, scheduler);
+
+ ask.whenComplete((reply, failure) -> {
+ if (failure != null) {
+ LOG.warn("Deactivate DataCenter has failed.", failure);
+ future.setException(failure);
+ return;
+ }
+
+ LOG.debug("Deactivate DataCenter successful.");
+ future.set(null);
+ });
+
+ return future;
+ }
+
void unregisterCandidate(final DOMEntity entity) {
LOG.debug("Unregistering candidate for {}", entity);
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka;
+
+import com.google.common.annotations.Beta;
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * Service used to bring up/down the NativeEos service in individual datacenters.
+ * Active datacenter in native eos terms means that the candidates from this datacenter can become owners of entities.
+ * Additionally, the singleton component making ownership decisions runs only in an active datacenter.
+ *
+ * <p>
+ * Caller must make sure that only one datacenter is active at a time, otherwise the singleton actors
+ * in each datacenter will interfere with each other. The methods provided by this service can be called
+ * on any node from the datacenter to be activated. Datacenters only need to be brought up when using a non-default
+ * datacenter or multiple datacenters.
+ */
+@Beta
+public interface NativeEosService {
+
+ /**
+ * Activates the native eos service in the datacenter that this method is called.
+ */
+ ListenableFuture<Void> activateDataCenter();
+
+ /**
+ * Deactivates the native eos service in the datacenter that this method is called.
+ */
+ ListenableFuture<Void> deactivateDataCenter();
+}
request.getReplyTo().tell(Empty.getInstance());
return Behaviors.stopped();
}
-}
+}
\ No newline at end of file
final String datacenterRole = extractDatacenterRole(cluster.selfMember());
if (datacenterRole.equals(DEFAULT_DATACENTER)) {
LOG.debug("No datacenter configured, activating default data center");
- context.getSelf().tell(ActivateDataCenter.INSTANCE);
+ context.getSelf().tell(new ActivateDataCenter(null));
}
LOG.debug("Idle supervisor started on {}.", cluster.selfMember());
private Behavior<OwnerSupervisorCommand> onActivateDataCenter(final ActivateDataCenter message) {
LOG.debug("Received ActivateDataCenter command switching to syncer behavior,");
- return OwnerSyncer.create();
+ return OwnerSyncer.create(message.getReplyTo());
}
private String extractDatacenterRole(final Member selfMember) {
import java.util.Set;
import java.util.stream.Collectors;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.CandidatesChanged;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.DataCenterDeactivated;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberDownEvent;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReachableEvent;
private Behavior<OwnerSupervisorCommand> onDeactivateDatacenter(final DeactivateDataCenter command) {
LOG.debug("Deactivating Owner Supervisor on {}", cluster.selfMember());
+ command.getReplyTo().tell(DataCenterDeactivated.INSTANCE);
return IdleSupervisor.create();
}
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.DataCenterActivated;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.InitialCandidateSync;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.InitialOwnerSync;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorReply;
import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
import org.slf4j.Logger;
private int toSync = -1;
- private OwnerSyncer(final ActorContext<OwnerSupervisorCommand> context) {
+ private OwnerSyncer(final ActorContext<OwnerSupervisorCommand> context,
+ @Nullable final ActorRef<OwnerSupervisorReply> notifyDatacenterStarted) {
super(context);
LOG.debug("Starting candidate and owner sync");
Duration.ofSeconds(5)).askGet(
askReplyTo -> new Replicator.Get<>(CandidateRegistry.KEY, Replicator.readLocal(), askReplyTo),
InitialCandidateSync::new);
+
+ if (notifyDatacenterStarted != null) {
+ notifyDatacenterStarted.tell(DataCenterActivated.INSTANCE);
+ }
}
- public static Behavior<OwnerSupervisorCommand> create() {
- return Behaviors.setup(OwnerSyncer::new);
+ public static Behavior<OwnerSupervisorCommand> create(
+ final ActorRef<OwnerSupervisorReply> notifyDatacenterStarted) {
+ return Behaviors.setup(ctx -> new OwnerSyncer(ctx, notifyDatacenterStarted));
}
@Override
*/
package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+import akka.actor.typed.ActorRef;
import java.io.Serializable;
+import org.eclipse.jdt.annotation.Nullable;
public final class ActivateDataCenter extends OwnerSupervisorCommand implements Serializable {
- public static final ActivateDataCenter INSTANCE = new ActivateDataCenter();
-
private static final long serialVersionUID = 1L;
- private ActivateDataCenter() {
- // NOOP
+ private final ActorRef<OwnerSupervisorReply> replyTo;
+
+ public ActivateDataCenter(final @Nullable ActorRef<OwnerSupervisorReply> replyTo) {
+ this.replyTo = replyTo;
+ }
+
+ public ActorRef<OwnerSupervisorReply> getReplyTo() {
+ return replyTo;
}
}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+
+import java.io.Serializable;
+
+/**
+ * Reply message sent by the owner supervisor once datacenter activation has completed.
+ * Singleton message — use {@link #INSTANCE}.
+ */
+public final class DataCenterActivated extends OwnerSupervisorReply implements Serializable {
+    private static final long serialVersionUID = 1L;
+    public static final DataCenterActivated INSTANCE = new DataCenterActivated();
+
+    private DataCenterActivated() {
+        // NOOP
+    }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+
+import java.io.Serializable;
+
+/**
+ * Reply message sent by the owner supervisor once datacenter deactivation has completed.
+ * Singleton message — use {@link #INSTANCE}.
+ */
+public final class DataCenterDeactivated extends OwnerSupervisorReply implements Serializable {
+    private static final long serialVersionUID = 1L;
+    public static final DataCenterDeactivated INSTANCE = new DataCenterDeactivated();
+
+    private DataCenterDeactivated() {
+        // NOOP
+    }
+}
*/
package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+import akka.actor.typed.ActorRef;
import java.io.Serializable;
+import org.eclipse.jdt.annotation.Nullable;
public final class DeactivateDataCenter extends OwnerSupervisorCommand implements Serializable {
- public static final DeactivateDataCenter INSTANCE = new DeactivateDataCenter();
private static final long serialVersionUID = 1L;
- private DeactivateDataCenter() {
- // NOOP
+ private final ActorRef<OwnerSupervisorReply> replyTo;
+
+ public DeactivateDataCenter(final @Nullable ActorRef<OwnerSupervisorReply> replyTo) {
+ this.replyTo = replyTo;
+ }
+
+ public ActorRef<OwnerSupervisorReply> getReplyTo() {
+ return replyTo;
}
}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+
+/**
+ * Base class for replies emitted by the owner supervisor actor in response to
+ * {@code OwnerSupervisorCommand}s. The package-private constructor restricts
+ * subclassing to this package.
+ */
+public abstract class OwnerSupervisorReply {
+    OwnerSupervisorReply() {
+        // Hidden on purpose
+    }
+}
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReachableEvent;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorReply;
import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate;
});
}
- protected static void activateDatacenter(final ClusterNode clusterNode) {
- clusterNode.getOwnerSupervisor().tell(ActivateDataCenter.INSTANCE);
+ protected static CompletableFuture<OwnerSupervisorReply> activateDatacenter(final ClusterNode clusterNode) {
+ final CompletionStage<OwnerSupervisorReply> ask =
+ AskPattern.ask(clusterNode.getOwnerSupervisor(),
+ ActivateDataCenter::new,
+ Duration.ofSeconds(20),
+ clusterNode.actorSystem.scheduler());
+ return ask.toCompletableFuture();
}
- protected static void deactivateDatacenter(final ClusterNode clusterNode) {
- clusterNode.getOwnerSupervisor().tell(DeactivateDataCenter.INSTANCE);
+ protected static CompletableFuture<OwnerSupervisorReply> deactivateDatacenter(final ClusterNode clusterNode) {
+ final CompletionStage<OwnerSupervisorReply> ask =
+ AskPattern.ask(clusterNode.getOwnerSupervisor(),
+ DeactivateDataCenter::new,
+ Duration.ofSeconds(20),
+ clusterNode.actorSystem.scheduler());
+ return ask.toCompletableFuture();
}
protected static void verifyListenerState(final MockEntityOwnershipListener listener, final DOMEntity entity,
registerCandidates(node3, ENTITY_1, "member-3");
registerCandidates(node4, ENTITY_1, "member-4");
- activateDatacenter(node1);
+ activateDatacenter(node1).get();
waitUntillOwnerPresent(node1, ENTITY_1);
final MockEntityOwnershipListener listener1 = registerListener(node1, ENTITY_1);
verifyListenerState(listener1, ENTITY_1, false, false, true);
verifyListenerState(listener2, ENTITY_1, false, false, false);
- deactivateDatacenter(node1);
- activateDatacenter(node4);
+ deactivateDatacenter(node1).get();
+ activateDatacenter(node4).get();
verifyListenerState(listener1, ENTITY_1, true, false, false);
verifyListenerState(listener2, ENTITY_1, true, true, false);
verifyListenerState(listener1, ENTITY_1, true, false, false);
verifyListenerState(listener2, ENTITY_1, true, false, true);
- deactivateDatacenter(node3);
- activateDatacenter(node2);
+ deactivateDatacenter(node3).get();
+ activateDatacenter(node2).get();
// no candidate in dc-primary so no owners after datacenter activation
verifyListenerState(listener1, ENTITY_1, false, false, false);
}
@Test
- public void testDataCenterShutdown() {
+ public void testDataCenterShutdown() throws Exception {
registerCandidates(node1, ENTITY_1, "member-1");
registerCandidates(node3, ENTITY_1, "member-3");
registerCandidates(node4, ENTITY_1, "member-4");
- activateDatacenter(node1);
+ activateDatacenter(node1).get();
waitUntillOwnerPresent(node1, ENTITY_1);
final MockEntityOwnershipListener listener1 = registerListener(node1, ENTITY_1);
ActorTestKit.shutdown(node1.getActorSystem(), Duration.ofSeconds(20));
ActorTestKit.shutdown(node2.getActorSystem(), Duration.ofSeconds(20));
- activateDatacenter(node3);
+ activateDatacenter(node3).get();
verifyListenerState(listener2, ENTITY_1, true, true, false);
unregisterCandidates(node3, ENTITY_1, "member-3");
}
}
}
+
+ rpc activate-eos-datacenter {
+ description "Activates the datacenter that the node this rpc is called on belongs to. The caller must maintain
+ only a single active datacenter at a time as the singleton components will interfere with each
+ other otherwise. This only needs to be used if configuring multiple datacenters or if not using
+ default datacenter.";
+ }
+
+ rpc deactivate-eos-datacenter {
+ description "Deactivates the datacenter that the node this rpc is called on belongs to. The caller must maintain
+ only a single active datacenter at a time as the singleton components will interfere with each
+ other otherwise. This only needs to be used if configuring multiple datacenters or if not using
+ default datacenter.";
+ }
}
</dependency>
<!-- OpenDaylight -->
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>eos-dom-akka</artifactId>
+ </dependency>
<dependency>
<groupId>org.opendaylight.mdsal</groupId>
<artifactId>mdsal-binding-api</artifactId>
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshotList;
import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
+import org.opendaylight.controller.eos.akka.NativeEosService;
import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ActivateEosDatacenterInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ActivateEosDatacenterOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ClusterAdminService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DeactivateEosDatacenterInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DeactivateEosDatacenterOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutputBuilder;
private final DistributedDataStoreInterface operDataStore;
private final BindingNormalizedNodeSerializer serializer;
private final Timeout makeLeaderLocalTimeout;
+ private final NativeEosService nativeEosService;
public ClusterAdminRpcService(final DistributedDataStoreInterface configDataStore,
- final DistributedDataStoreInterface operDataStore,
- final BindingNormalizedNodeSerializer serializer) {
+ final DistributedDataStoreInterface operDataStore,
+ final BindingNormalizedNodeSerializer serializer,
+ final NativeEosService nativeEosService) {
this.configDataStore = configDataStore;
this.operDataStore = operDataStore;
this.serializer = serializer;
this.makeLeaderLocalTimeout =
new Timeout(configDataStore.getActorUtils().getDatastoreContext()
.getShardLeaderElectionTimeout().duration().$times(2));
+
+ this.nativeEosService = nativeEosService;
}
@Override
MoreExecutors.directExecutor());
}
+ @Override
+ public ListenableFuture<RpcResult<ActivateEosDatacenterOutput>> activateEosDatacenter(
+ final ActivateEosDatacenterInput input) {
+ LOG.debug("Activating EOS Datacenter");
+ final SettableFuture<RpcResult<ActivateEosDatacenterOutput>> future = SettableFuture.create();
+ Futures.addCallback(nativeEosService.activateDataCenter(), new FutureCallback<>() {
+ @Override
+ public void onSuccess(final Void result) {
+ LOG.debug("Successfully activated datacenter.");
+ future.set(RpcResultBuilder.<ActivateEosDatacenterOutput>success().build());
+ }
+
+ @Override
+ public void onFailure(final Throwable failure) {
+ future.set(ClusterAdminRpcService.<ActivateEosDatacenterOutput>newFailedRpcResultBuilder(
+ "Failed to activate datacenter.", failure).build());
+ }
+ }, MoreExecutors.directExecutor());
+
+ return future;
+ }
+
+ @Override
+ public ListenableFuture<RpcResult<DeactivateEosDatacenterOutput>> deactivateEosDatacenter(
+ final DeactivateEosDatacenterInput input) {
+ LOG.debug("Deactivating EOS Datacenter");
+ final SettableFuture<RpcResult<DeactivateEosDatacenterOutput>> future = SettableFuture.create();
+ Futures.addCallback(nativeEosService.deactivateDataCenter(), new FutureCallback<>() {
+ @Override
+ public void onSuccess(final Void result) {
+ LOG.debug("Successfully deactivated datacenter.");
+ future.set(RpcResultBuilder.<DeactivateEosDatacenterOutput>success().build());
+ }
+
+ @Override
+ public void onFailure(final Throwable failure) {
+ future.set(ClusterAdminRpcService.<DeactivateEosDatacenterOutput>newFailedRpcResultBuilder(
+ "Failed to deactivate datacenter.", failure).build());
+ }
+ }, MoreExecutors.directExecutor());
+
+ return future;
+ }
+
private static RpcResult<GetKnownClientsForAllShardsOutput> processReplies(
final ImmutableMap<ShardIdentifier, ListenableFuture<GetKnownClientsReply>> allShardReplies) {
final Map<ShardResultKey, ShardResult> result = Maps.newHashMapWithExpectedSize(allShardReplies.size());
import com.google.common.annotations.Beta;
import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
+import org.opendaylight.controller.eos.akka.NativeEosService;
import org.opendaylight.mdsal.binding.api.RpcProviderService;
import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ClusterAdminService;
BindingNormalizedNodeSerializer serializer = null;
@Reference
RpcProviderService rpcProviderService = null;
+ @Reference
+ NativeEosService nativeEosService = null;
private ObjectRegistration<?> reg;
@Activate
void activate() {
reg = rpcProviderService.registerRpcImplementation(ClusterAdminService.class,
- new ClusterAdminRpcService(configDatastore, operDatastore, serializer));
+ new ClusterAdminRpcService(configDatastore, operDatastore, serializer, nativeEosService));
LOG.info("Cluster Admin services started");
}
String fileName = "target/testBackupDatastore";
new File(fileName).delete();
- ClusterAdminRpcService service = new ClusterAdminRpcService(node.configDataStore(), node.operDataStore(), null);
+ final ClusterAdminRpcService service = new ClusterAdminRpcService(node.configDataStore(), node.operDataStore(),
+ null, null);
RpcResult<BackupDatastoreOutput> rpcResult = service .backupDatastore(new BackupDatastoreInputBuilder()
.setFilePath(fileName).build()).get(5, TimeUnit.SECONDS);
MemberNode memberNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
.moduleShardsConfig("module-shards-cars-member-1.conf").build();
- ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
- memberNode.operDataStore(), null);
+ final ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
+ memberNode.operDataStore(), null, null);
RpcResult<AddShardReplicaOutput> rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder()
.setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
final String... peerMemberNames) throws Exception {
memberNode.waitForMembersUp(peerMemberNames);
- ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
- memberNode.operDataStore(), null);
+ final ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
+ memberNode.operDataStore(), null, null);
RpcResult<AddShardReplicaOutput> rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder()
.setShardName(shardName).setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
private static void doMakeShardLeaderLocal(final MemberNode memberNode, final String shardName,
final String newLeader) throws Exception {
- ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
- memberNode.operDataStore(), null);
+ final ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
+ memberNode.operDataStore(), null, null);
final RpcResult<MakeLeaderLocalOutput> rpcResult = service.makeLeaderLocal(new MakeLeaderLocalInputBuilder()
.setDataStoreType(DataStoreType.Config).setShardName(shardName).build())
// Invoke RPC service on member-3 to remove it's local shard
- ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
- replicaNode3.operDataStore(), null);
+ final ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
+ replicaNode3.operDataStore(), null, null);
RpcResult<RemoveShardReplicaOutput> rpcResult = service3.removeShardReplica(new RemoveShardReplicaInputBuilder()
.setShardName("cars").setMemberName("member-3").setDataStoreType(DataStoreType.Config).build())
// Invoke RPC service on member-1 to remove member-2
- ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
- leaderNode1.operDataStore(), null);
+ final ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
+ leaderNode1.operDataStore(), null, null);
rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder().setShardName("cars")
.setMemberName("member-2").setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
// Invoke RPC service on leader member-1 to remove it's local shard
- ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
- leaderNode1.operDataStore(), null);
+ final ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
+ leaderNode1.operDataStore(), null, null);
RpcResult<RemoveShardReplicaOutput> rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder()
.setShardName("cars").setMemberName("member-1").setDataStoreType(DataStoreType.Config).build())
newReplicaNode2.kit().getRef());
newReplicaNode2.kit().expectMsgClass(Success.class);
- ClusterAdminRpcService service = new ClusterAdminRpcService(newReplicaNode2.configDataStore(),
- newReplicaNode2.operDataStore(), null);
+ final ClusterAdminRpcService service = new ClusterAdminRpcService(newReplicaNode2.configDataStore(),
+ newReplicaNode2.operDataStore(), null, null);
RpcResult<AddReplicasForAllShardsOutput> rpcResult = service.addReplicasForAllShards(
new AddReplicasForAllShardsInputBuilder().build()).get(10, TimeUnit.SECONDS);
verifyRaftPeersPresent(replicaNode2.configDataStore(), "pets", "member-1", "member-3");
verifyRaftPeersPresent(replicaNode3.configDataStore(), "pets", "member-1", "member-2");
- ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
- replicaNode3.operDataStore(), null);
+ final ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
+ replicaNode3.operDataStore(), null, null);
RpcResult<RemoveAllShardReplicasOutput> rpcResult = service3.removeAllShardReplicas(
new RemoveAllShardReplicasInputBuilder().setMemberName("member-3").build()).get(10, TimeUnit.SECONDS);
// Invoke RPC service on member-3 to change voting status
- ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
- replicaNode3.operDataStore(), null);
+ final ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
+ replicaNode3.operDataStore(), null, null);
RpcResult<ChangeMemberVotingStatesForShardOutput> rpcResult = service3
.changeMemberVotingStatesForShard(new ChangeMemberVotingStatesForShardInputBuilder()
// Invoke RPC service on member-3 to change voting status
- ClusterAdminRpcService service = new ClusterAdminRpcService(leaderNode.configDataStore(),
- leaderNode.operDataStore(), null);
+ final ClusterAdminRpcService service = new ClusterAdminRpcService(leaderNode.configDataStore(),
+ leaderNode.operDataStore(), null, null);
RpcResult<ChangeMemberVotingStatesForShardOutput> rpcResult = service
.changeMemberVotingStatesForShard(new ChangeMemberVotingStatesForShardInputBuilder()
// Invoke RPC service on member-3 to change voting status
- ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
- replicaNode3.operDataStore(), null);
+ final ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
+ replicaNode3.operDataStore(), null, null);
RpcResult<ChangeMemberVotingStatesForAllShardsOutput> rpcResult = service3.changeMemberVotingStatesForAllShards(
new ChangeMemberVotingStatesForAllShardsInputBuilder().setMemberVotingState(List.of(
verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
new SimpleEntry<>("member-2", TRUE), new SimpleEntry<>("member-3", FALSE));
- ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
- replicaNode3.operDataStore(), null);
+ final ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
+ replicaNode3.operDataStore(), null, null);
RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service3.flipMemberVotingStatesForAllShards(
new FlipMemberVotingStatesForAllShardsInputBuilder().build()).get(10, TimeUnit.SECONDS);
verifyRaftState(replicaNode1.configDataStore(), "cars", raftState ->
assertEquals("Expected raft state", RaftState.Follower.toString(), raftState.getRaftState()));
- ClusterAdminRpcService service1 = new ClusterAdminRpcService(replicaNode1.configDataStore(),
- replicaNode1.operDataStore(), null);
+ final ClusterAdminRpcService service1 = new ClusterAdminRpcService(replicaNode1.configDataStore(),
+ replicaNode1.operDataStore(), null, null);
RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service1.flipMemberVotingStatesForAllShards(
new FlipMemberVotingStatesForAllShardsInputBuilder().build()).get(10, TimeUnit.SECONDS);
new SimpleEntry<>("member-4", FALSE), new SimpleEntry<>("member-5", FALSE),
new SimpleEntry<>("member-6", FALSE));
- ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
- leaderNode1.operDataStore(), null);
+ final ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
+ leaderNode1.operDataStore(), null, null);
RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service1.flipMemberVotingStatesForAllShards(
new FlipMemberVotingStatesForAllShardsInputBuilder().build()).get(10, TimeUnit.SECONDS);
type + datastoreTypeSuffix).toString(), info.isVoting()));
}
- String shardID = ShardIdentifier.create(shard, MemberName.forName(member),
+ final String shardID = ShardIdentifier.create(shard, MemberName.forName(member),
type + datastoreTypeSuffix).toString();
InMemoryJournal.addEntry(shardID, 1, new UpdateElectionTerm(1, null));
InMemoryJournal.addEntry(shardID, 2, new SimpleReplicatedLogEntry(0, 1,