From 684673104adccd504e394e01a852a06d42047af0 Mon Sep 17 00:00:00 2001 From: Tomas Cere Date: Mon, 3 May 2021 13:26:08 +0200 Subject: [PATCH] Add cluster-admin api for datacenter activation Add rpcs into cluster-admin so we can activate/deactivate datacenters for active/backup scenariop. JIRA: CONTROLLER-1982 Change-Id: Ic68652199d79251fe9b166e47ca06520121213e4 Signed-off-by: Tomas Cere --- .../eos/akka/AkkaEntityOwnershipService.java | 63 +++++++++++++++++-- .../controller/eos/akka/NativeEosService.java | 36 +++++++++++ .../eos/akka/bootstrap/EOSMain.java | 2 +- .../akka/owner/supervisor/IdleSupervisor.java | 4 +- .../owner/supervisor/OwnerSupervisor.java | 2 + .../akka/owner/supervisor/OwnerSyncer.java | 15 ++++- .../command/ActivateDataCenter.java | 14 +++-- .../command/DataCenterActivated.java | 19 ++++++ .../command/DataCenterDeactivated.java | 19 ++++++ .../command/DeactivateDataCenter.java | 13 +++- .../command/OwnerSupervisorReply.java | 14 +++++ .../eos/akka/AbstractNativeEosTest.java | 20 ++++-- .../controller/eos/akka/DataCentersTest.java | 16 ++--- .../src/main/yang/cluster-admin.yang | 14 +++++ .../md-sal/sal-cluster-admin-impl/pom.xml | 4 ++ .../admin/ClusterAdminRpcService.java | 57 ++++++++++++++++- .../datastore/admin/OSGiClusterAdmin.java | 5 +- .../admin/ClusterAdminRpcServiceTest.java | 61 +++++++++--------- 18 files changed, 316 insertions(+), 62 deletions(-) create mode 100644 opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/NativeEosService.java create mode 100644 opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/DataCenterActivated.java create mode 100644 opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/DataCenterDeactivated.java create mode 100644 opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/OwnerSupervisorReply.java diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/AkkaEntityOwnershipService.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/AkkaEntityOwnershipService.java index 27fbad4474..28ada56230 100644 --- a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/AkkaEntityOwnershipService.java +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/AkkaEntityOwnershipService.java @@ -15,6 +15,8 @@ import akka.actor.typed.javadsl.AskPattern; import akka.actor.typed.javadsl.Behaviors; import akka.cluster.typed.Cluster; import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import java.time.Duration; import java.util.Optional; import java.util.Set; @@ -33,6 +35,10 @@ import org.opendaylight.controller.eos.akka.bootstrap.command.Terminate; import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipState; import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipStateReply; import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerCommand; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.ActivateDataCenter; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorReply; import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand; import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate; import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate; @@ -59,27 +65,33 @@ import org.slf4j.LoggerFactory; * the appropriate owners. */ @Singleton -@Component(immediate = true, service = DOMEntityOwnershipService.class) -public final class AkkaEntityOwnershipService implements DOMEntityOwnershipService, AutoCloseable { +@Component(immediate = true, service = { DOMEntityOwnershipService.class, NativeEosService.class }) +public final class AkkaEntityOwnershipService implements DOMEntityOwnershipService, NativeEosService, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(AkkaEntityOwnershipService.class); private static final String DATACENTER_PREFIX = "dc"; + private static final Duration DATACENTER_OP_TIMEOUT = Duration.ofSeconds(20); private final Set registeredEntities = ConcurrentHashMap.newKeySet(); private final String localCandidate; private final Scheduler scheduler; + private final String datacenter; private final ActorRef bootstrap; private final RunningContext runningContext; private final ActorRef candidateRegistry; private final ActorRef listenerRegistry; private final ActorRef ownerStateChecker; + private final ActorRef ownerSupervisor; @VisibleForTesting AkkaEntityOwnershipService(final ActorSystem actorSystem) throws ExecutionException, InterruptedException { final var typedActorSystem = Adapter.toTyped(actorSystem); - scheduler = typedActorSystem.scheduler(); - localCandidate = Cluster.get(typedActorSystem).selfMember().getRoles().stream() + + final Cluster cluster = Cluster.get(typedActorSystem); + datacenter = cluster.selfMember().dataCenter(); + + localCandidate = cluster.selfMember().getRoles().stream() .filter(role -> !role.contains(DATACENTER_PREFIX)) .findFirst() .orElseThrow(() -> new IllegalArgumentException("No valid role found.")); @@ -93,6 +105,7 @@ public final class AkkaEntityOwnershipService implements DOMEntityOwnershipServi candidateRegistry = runningContext.getCandidateRegistry(); listenerRegistry = runningContext.getListenerRegistry(); ownerStateChecker = runningContext.getOwnerStateChecker(); + ownerSupervisor = runningContext.getOwnerSupervisor(); } @Inject @@ -156,6 +169,48 @@ public final class AkkaEntityOwnershipService implements DOMEntityOwnershipServi return registeredEntities.contains(forEntity); } + @Override + public ListenableFuture activateDataCenter() { + LOG.debug("Activating datacenter: {}", datacenter); + final SettableFuture future = SettableFuture.create(); + final CompletionStage ask = + AskPattern.ask(ownerSupervisor, ActivateDataCenter::new, DATACENTER_OP_TIMEOUT, scheduler); + + ask.whenComplete((reply, failure) -> { + if (failure != null) { + LOG.warn("Activate DataCenter has failed.", failure); + future.setException(failure); + return; + } + + LOG.debug("Activate DataCenter successful."); + future.set(null); + }); + + return future; + } + + @Override + public ListenableFuture deactivateDataCenter() { + LOG.debug("Deactivating datacenter: {}", datacenter); + final SettableFuture future = SettableFuture.create(); + final CompletionStage ask = + AskPattern.ask(ownerSupervisor, DeactivateDataCenter::new, DATACENTER_OP_TIMEOUT, scheduler); + + ask.whenComplete((reply, failure) -> { + if (failure != null) { + LOG.warn("Deactivate DataCenter has failed.", failure); + future.setException(failure); + return; + } + + LOG.debug("Deactivate DataCenter successful."); + future.set(null); + }); + + return future; + } + void unregisterCandidate(final DOMEntity entity) { LOG.debug("Unregistering candidate for {}", entity); diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/NativeEosService.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/NativeEosService.java new file mode 100644 index 0000000000..6445eba8cd --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/NativeEosService.java @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka; + +import com.google.common.annotations.Beta; +import com.google.common.util.concurrent.ListenableFuture; + +/** + * Service used to bring up/down the NativeEos service in individual datacenters. + * Active datacenter in native eos terms means that the candidates from this datacenter can become owners of entities. + * Additionally the singleton component makings ownership decisions, runs only in an active datacenter. + * + *

+ * Caller must make sure that only one datacenter is active at a time, otherwise the singleton actors + * in each datacenter will interfere with each other. The methods provided byt this service can be called + * on any node from the datacenter to be activated. Datacenters only need to brought up when using non-default + * datacenter or multiple datacenters. + */ +@Beta +public interface NativeEosService { + + /** + * Activates the native eos service in the datacenter that this method is called. + */ + ListenableFuture activateDataCenter(); + + /** + * Deactivates the native eos service in the datacenter that this method is called. + */ + ListenableFuture deactivateDataCenter(); +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/bootstrap/EOSMain.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/bootstrap/EOSMain.java index 5199d22ed5..9e646fa5c7 100644 --- a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/bootstrap/EOSMain.java +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/bootstrap/EOSMain.java @@ -72,4 +72,4 @@ public final class EOSMain extends AbstractBehavior { request.getReplyTo().tell(Empty.getInstance()); return Behaviors.stopped(); } -} +} \ No newline at end of file diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/IdleSupervisor.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/IdleSupervisor.java index 9cbea5c836..1abf6d6f12 100644 --- a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/IdleSupervisor.java +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/IdleSupervisor.java @@ -37,7 +37,7 @@ public final class IdleSupervisor extends AbstractBehavior onActivateDataCenter(final ActivateDataCenter message) { LOG.debug("Received ActivateDataCenter command switching to syncer behavior,"); - return OwnerSyncer.create(); + return OwnerSyncer.create(message.getReplyTo()); } private String extractDatacenterRole(final Member selfMember) { diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSupervisor.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSupervisor.java index c012afe6a1..91b06342b9 100644 --- a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSupervisor.java +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSupervisor.java @@ -40,6 +40,7 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import org.opendaylight.controller.eos.akka.owner.supervisor.command.CandidatesChanged; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.DataCenterDeactivated; import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter; import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberDownEvent; import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReachableEvent; @@ -154,6 +155,7 @@ public final class OwnerSupervisor extends AbstractBehavior onDeactivateDatacenter(final DeactivateDataCenter command) { LOG.debug("Deactivating Owner Supervisor on {}", cluster.selfMember()); + command.getReplyTo().tell(DataCenterDeactivated.INSTANCE); return IdleSupervisor.create(); } diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSyncer.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSyncer.java index 1a8df09f1d..a73a5620b3 100644 --- a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSyncer.java +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSyncer.java @@ -25,9 +25,12 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import org.eclipse.jdt.annotation.Nullable; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.DataCenterActivated; import org.opendaylight.controller.eos.akka.owner.supervisor.command.InitialCandidateSync; import org.opendaylight.controller.eos.akka.owner.supervisor.command.InitialOwnerSync; import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorReply; import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry; import org.opendaylight.mdsal.eos.dom.api.DOMEntity; import org.slf4j.Logger; @@ -49,7 +52,8 @@ public final class OwnerSyncer extends AbstractBehavior private int toSync = -1; - private OwnerSyncer(final ActorContext context) { + private OwnerSyncer(final ActorContext context, + @Nullable final ActorRef notifyDatacenterStarted) { super(context); LOG.debug("Starting candidate and owner sync"); @@ -61,10 +65,15 @@ public final class OwnerSyncer extends AbstractBehavior Duration.ofSeconds(5)).askGet( askReplyTo -> new Replicator.Get<>(CandidateRegistry.KEY, Replicator.readLocal(), askReplyTo), InitialCandidateSync::new); + + if (notifyDatacenterStarted != null) { + notifyDatacenterStarted.tell(DataCenterActivated.INSTANCE); + } } - public static Behavior create() { - return Behaviors.setup(OwnerSyncer::new); + public static Behavior create( + final ActorRef notifyDatacenterStarted) { + return Behaviors.setup(ctx -> new OwnerSyncer(ctx, notifyDatacenterStarted)); } @Override diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/ActivateDataCenter.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/ActivateDataCenter.java index 96388d3a43..29e8502238 100644 --- a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/ActivateDataCenter.java +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/ActivateDataCenter.java @@ -7,14 +7,20 @@ */ package org.opendaylight.controller.eos.akka.owner.supervisor.command; +import akka.actor.typed.ActorRef; import java.io.Serializable; +import org.eclipse.jdt.annotation.Nullable; public final class ActivateDataCenter extends OwnerSupervisorCommand implements Serializable { - public static final ActivateDataCenter INSTANCE = new ActivateDataCenter(); - private static final long serialVersionUID = 1L; - private ActivateDataCenter() { - // NOOP + private final ActorRef replyTo; + + public ActivateDataCenter(final @Nullable ActorRef replyTo) { + this.replyTo = replyTo; + } + + public ActorRef getReplyTo() { + return replyTo; } } diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/DataCenterActivated.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/DataCenterActivated.java new file mode 100644 index 0000000000..a82bf22fe2 --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/DataCenterActivated.java @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.owner.supervisor.command; + +import java.io.Serializable; + +public final class DataCenterActivated extends OwnerSupervisorReply implements Serializable { + private static final long serialVersionUID = 1L; + public static final DataCenterActivated INSTANCE = new DataCenterActivated(); + + private DataCenterActivated() { + // NOOP + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/DataCenterDeactivated.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/DataCenterDeactivated.java new file mode 100644 index 0000000000..4879fc7a08 --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/DataCenterDeactivated.java @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.owner.supervisor.command; + +import java.io.Serializable; + +public final class DataCenterDeactivated extends OwnerSupervisorReply implements Serializable { + private static final long serialVersionUID = 1L; + public static final DataCenterDeactivated INSTANCE = new DataCenterDeactivated(); + + private DataCenterDeactivated() { + // NOOP + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/DeactivateDataCenter.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/DeactivateDataCenter.java index eb728ebbce..039bfc51fb 100644 --- a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/DeactivateDataCenter.java +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/DeactivateDataCenter.java @@ -7,13 +7,20 @@ */ package org.opendaylight.controller.eos.akka.owner.supervisor.command; +import akka.actor.typed.ActorRef; import java.io.Serializable; +import org.eclipse.jdt.annotation.Nullable; public final class DeactivateDataCenter extends OwnerSupervisorCommand implements Serializable { - public static final DeactivateDataCenter INSTANCE = new DeactivateDataCenter(); private static final long serialVersionUID = 1L; - private DeactivateDataCenter() { - // NOOP + private final ActorRef replyTo; + + public DeactivateDataCenter(final @Nullable ActorRef replyTo) { + this.replyTo = replyTo; + } + + public ActorRef getReplyTo() { + return replyTo; } } diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/OwnerSupervisorReply.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/OwnerSupervisorReply.java new file mode 100644 index 0000000000..c34d441b3a --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/OwnerSupervisorReply.java @@ -0,0 +1,14 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.owner.supervisor.command; + +public abstract class OwnerSupervisorReply { + OwnerSupervisorReply() { + // Hidden on purpose + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/AbstractNativeEosTest.java b/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/AbstractNativeEosTest.java index 201d6c659b..ee1782ad55 100644 --- a/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/AbstractNativeEosTest.java +++ b/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/AbstractNativeEosTest.java @@ -29,6 +29,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -42,6 +43,7 @@ import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateD import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReachableEvent; import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent; import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorReply; import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand; import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate; import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate; @@ -234,12 +236,22 @@ public abstract class AbstractNativeEosTest { }); } - protected static void activateDatacenter(final ClusterNode clusterNode) { - clusterNode.getOwnerSupervisor().tell(ActivateDataCenter.INSTANCE); + protected static CompletableFuture activateDatacenter(final ClusterNode clusterNode) { + final CompletionStage ask = + AskPattern.ask(clusterNode.getOwnerSupervisor(), + ActivateDataCenter::new, + Duration.ofSeconds(20), + clusterNode.actorSystem.scheduler()); + return ask.toCompletableFuture(); } - protected static void deactivateDatacenter(final ClusterNode clusterNode) { - clusterNode.getOwnerSupervisor().tell(DeactivateDataCenter.INSTANCE); + protected static CompletableFuture deactivateDatacenter(final ClusterNode clusterNode) { + final CompletionStage ask = + AskPattern.ask(clusterNode.getOwnerSupervisor(), + DeactivateDataCenter::new, + Duration.ofSeconds(20), + clusterNode.actorSystem.scheduler()); + return ask.toCompletableFuture(); } protected static void verifyListenerState(final MockEntityOwnershipListener listener, final DOMEntity entity, diff --git a/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/DataCentersTest.java b/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/DataCentersTest.java index 440229e9b1..28566c7f15 100644 --- a/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/DataCentersTest.java +++ b/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/DataCentersTest.java @@ -62,7 +62,7 @@ public class DataCentersTest extends AbstractNativeEosTest { registerCandidates(node3, ENTITY_1, "member-3"); registerCandidates(node4, ENTITY_1, "member-4"); - activateDatacenter(node1); + activateDatacenter(node1).get(); waitUntillOwnerPresent(node1, ENTITY_1); final MockEntityOwnershipListener listener1 = registerListener(node1, ENTITY_1); @@ -76,8 +76,8 @@ public class DataCentersTest extends AbstractNativeEosTest { verifyListenerState(listener1, ENTITY_1, false, false, true); verifyListenerState(listener2, ENTITY_1, false, false, false); - deactivateDatacenter(node1); - activateDatacenter(node4); + deactivateDatacenter(node1).get(); + activateDatacenter(node4).get(); verifyListenerState(listener1, ENTITY_1, true, false, false); verifyListenerState(listener2, ENTITY_1, true, true, false); @@ -88,8 +88,8 @@ public class DataCentersTest extends AbstractNativeEosTest { verifyListenerState(listener1, ENTITY_1, true, false, false); verifyListenerState(listener2, ENTITY_1, true, false, true); - deactivateDatacenter(node3); - activateDatacenter(node2); + deactivateDatacenter(node3).get(); + activateDatacenter(node2).get(); // no candidate in dc-primary so no owners after datacenter activation verifyListenerState(listener1, ENTITY_1, false, false, false); @@ -97,12 +97,12 @@ public class DataCentersTest extends AbstractNativeEosTest { } @Test - public void testDataCenterShutdown() { + public void testDataCenterShutdown() throws Exception { registerCandidates(node1, ENTITY_1, "member-1"); registerCandidates(node3, ENTITY_1, "member-3"); registerCandidates(node4, ENTITY_1, "member-4"); - activateDatacenter(node1); + activateDatacenter(node1).get(); waitUntillOwnerPresent(node1, ENTITY_1); final MockEntityOwnershipListener listener1 = registerListener(node1, ENTITY_1); @@ -119,7 +119,7 @@ public class DataCentersTest extends AbstractNativeEosTest { ActorTestKit.shutdown(node1.getActorSystem(), Duration.ofSeconds(20)); ActorTestKit.shutdown(node2.getActorSystem(), Duration.ofSeconds(20)); - activateDatacenter(node3); + activateDatacenter(node3).get(); verifyListenerState(listener2, ENTITY_1, true, true, false); unregisterCandidates(node3, ENTITY_1, "member-3"); diff --git a/opendaylight/md-sal/sal-cluster-admin-api/src/main/yang/cluster-admin.yang b/opendaylight/md-sal/sal-cluster-admin-api/src/main/yang/cluster-admin.yang index 607bff6456..25c88475f6 100644 --- a/opendaylight/md-sal/sal-cluster-admin-api/src/main/yang/cluster-admin.yang +++ b/opendaylight/md-sal/sal-cluster-admin-api/src/main/yang/cluster-admin.yang @@ -261,4 +261,18 @@ module cluster-admin { } } } + + rpc activate-eos-datacenter { + description "Activates the datacenter that the node this rpc is called on belongs to. The caller must maintain + only a single active datacenter at a time as the singleton components will interfere with each + other otherwise. This only needs to be used if configuring multiple datacenters or if not using + default datacenter."; + } + + rpc deactivate-eos-datacenter { + description "Deactivates the datacenter that the node this rpc is called on belongs to. The caller must maintain + only a single active datacenter at a time as the singleton components will interfere with each + other otherwise. This only needs to be used if configuring multiple datacenters or if not using + default datacenter."; + } } diff --git a/opendaylight/md-sal/sal-cluster-admin-impl/pom.xml b/opendaylight/md-sal/sal-cluster-admin-impl/pom.xml index b9c8bf893b..52a5ac77f9 100644 --- a/opendaylight/md-sal/sal-cluster-admin-impl/pom.xml +++ b/opendaylight/md-sal/sal-cluster-admin-impl/pom.xml @@ -64,6 +64,10 @@ + + org.opendaylight.controller + eos-dom-akka + org.opendaylight.mdsal mdsal-binding-api diff --git a/opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java b/opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java index cc1152bb84..f11c60f561 100644 --- a/opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java +++ b/opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java @@ -54,7 +54,10 @@ import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshotList; import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot; +import org.opendaylight.controller.eos.akka.NativeEosService; import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ActivateEosDatacenterInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ActivateEosDatacenterOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutputBuilder; @@ -72,6 +75,8 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controll import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardOutputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ClusterAdminService; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DeactivateEosDatacenterInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DeactivateEosDatacenterOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutputBuilder; @@ -130,10 +135,12 @@ public class ClusterAdminRpcService implements ClusterAdminService { private final DistributedDataStoreInterface operDataStore; private final BindingNormalizedNodeSerializer serializer; private final Timeout makeLeaderLocalTimeout; + private final NativeEosService nativeEosService; public ClusterAdminRpcService(final DistributedDataStoreInterface configDataStore, - final DistributedDataStoreInterface operDataStore, - final BindingNormalizedNodeSerializer serializer) { + final DistributedDataStoreInterface operDataStore, + final BindingNormalizedNodeSerializer serializer, + final NativeEosService nativeEosService) { this.configDataStore = configDataStore; this.operDataStore = operDataStore; this.serializer = serializer; @@ -141,6 +148,8 @@ public class ClusterAdminRpcService implements ClusterAdminService { this.makeLeaderLocalTimeout = new Timeout(configDataStore.getActorUtils().getDatastoreContext() .getShardLeaderElectionTimeout().duration().$times(2)); + + this.nativeEosService = nativeEosService; } @Override @@ -514,6 +523,50 @@ public class ClusterAdminRpcService implements ClusterAdminService { MoreExecutors.directExecutor()); } + @Override + public ListenableFuture> activateEosDatacenter( + final ActivateEosDatacenterInput input) { + LOG.debug("Activating EOS Datacenter"); + final SettableFuture> future = SettableFuture.create(); + Futures.addCallback(nativeEosService.activateDataCenter(), new FutureCallback<>() { + @Override + public void onSuccess(final Void result) { + LOG.debug("Successfully activated datacenter."); + future.set(RpcResultBuilder.success().build()); + } + + @Override + public void onFailure(final Throwable failure) { + future.set(ClusterAdminRpcService.newFailedRpcResultBuilder( + "Failed to activate datacenter.", failure).build()); + } + }, MoreExecutors.directExecutor()); + + return future; + } + + @Override + public ListenableFuture> deactivateEosDatacenter( + final DeactivateEosDatacenterInput input) { + LOG.debug("Deactivating EOS Datacenter"); + final SettableFuture> future = SettableFuture.create(); + Futures.addCallback(nativeEosService.deactivateDataCenter(), new FutureCallback<>() { + @Override + public void onSuccess(final Void result) { + LOG.debug("Successfully deactivated datacenter."); + future.set(RpcResultBuilder.success().build()); + } + + @Override + public void onFailure(final Throwable failure) { + future.set(ClusterAdminRpcService.newFailedRpcResultBuilder( + "Failed to deactivate datacenter.", failure).build()); + } + }, MoreExecutors.directExecutor()); + + return future; + } + private static RpcResult processReplies( final ImmutableMap> allShardReplies) { final Map result = Maps.newHashMapWithExpectedSize(allShardReplies.size()); diff --git a/opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/OSGiClusterAdmin.java b/opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/OSGiClusterAdmin.java index 5280ad5633..c0e944815e 100644 --- a/opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/OSGiClusterAdmin.java +++ b/opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/OSGiClusterAdmin.java @@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.datastore.admin; import com.google.common.annotations.Beta; import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface; +import org.opendaylight.controller.eos.akka.NativeEosService; import org.opendaylight.mdsal.binding.api.RpcProviderService; import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ClusterAdminService; @@ -33,13 +34,15 @@ public final class OSGiClusterAdmin { BindingNormalizedNodeSerializer serializer = null; @Reference RpcProviderService rpcProviderService = null; + @Reference + NativeEosService nativeEosService = null; private ObjectRegistration reg; @Activate void activate() { reg = rpcProviderService.registerRpcImplementation(ClusterAdminService.class, - new ClusterAdminRpcService(configDatastore, operDatastore, serializer)); + new ClusterAdminRpcService(configDatastore, operDatastore, serializer, nativeEosService)); LOG.info("Cluster Admin services started"); } diff --git a/opendaylight/md-sal/sal-cluster-admin-impl/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java b/opendaylight/md-sal/sal-cluster-admin-impl/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java index f90a428c03..ba00174f24 100644 --- a/opendaylight/md-sal/sal-cluster-admin-impl/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java +++ b/opendaylight/md-sal/sal-cluster-admin-impl/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java @@ -127,7 +127,8 @@ public class ClusterAdminRpcServiceTest { String fileName = "target/testBackupDatastore"; new File(fileName).delete(); - ClusterAdminRpcService service = new ClusterAdminRpcService(node.configDataStore(), node.operDataStore(), null); + final ClusterAdminRpcService service = new ClusterAdminRpcService(node.configDataStore(), node.operDataStore(), + null, null); RpcResult rpcResult = service .backupDatastore(new BackupDatastoreInputBuilder() .setFilePath(fileName).build()).get(5, TimeUnit.SECONDS); @@ -291,8 +292,8 @@ public class ClusterAdminRpcServiceTest { MemberNode memberNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name) .moduleShardsConfig("module-shards-cars-member-1.conf").build(); - ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(), - memberNode.operDataStore(), null); + final ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(), + memberNode.operDataStore(), null, null); RpcResult rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder() .setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS); @@ -335,8 +336,8 @@ public class ClusterAdminRpcServiceTest { final String... peerMemberNames) throws Exception { memberNode.waitForMembersUp(peerMemberNames); - ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(), - memberNode.operDataStore(), null); + final ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(), + memberNode.operDataStore(), null, null); RpcResult rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder() .setShardName(shardName).setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS); @@ -356,8 +357,8 @@ public class ClusterAdminRpcServiceTest { private static void doMakeShardLeaderLocal(final MemberNode memberNode, final String shardName, final String newLeader) throws Exception { - ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(), - memberNode.operDataStore(), null); + final ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(), + memberNode.operDataStore(), null, null); final RpcResult rpcResult = service.makeLeaderLocal(new MakeLeaderLocalInputBuilder() .setDataStoreType(DataStoreType.Config).setShardName(shardName).build()) @@ -412,8 +413,8 @@ public class ClusterAdminRpcServiceTest { // Invoke RPC service on member-3 to remove it's local shard - ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(), - replicaNode3.operDataStore(), null); + final ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(), + replicaNode3.operDataStore(), null, null); RpcResult rpcResult = service3.removeShardReplica(new RemoveShardReplicaInputBuilder() .setShardName("cars").setMemberName("member-3").setDataStoreType(DataStoreType.Config).build()) @@ -437,8 +438,8 @@ public class ClusterAdminRpcServiceTest { // Invoke RPC service on member-1 to remove member-2 - ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(), - leaderNode1.operDataStore(), null); + final ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(), + leaderNode1.operDataStore(), null, null); rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder().setShardName("cars") .setMemberName("member-2").setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS); @@ -473,8 +474,8 @@ public class ClusterAdminRpcServiceTest { // Invoke RPC service on leader member-1 to remove it's local shard - ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(), - leaderNode1.operDataStore(), null); + final ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(), + leaderNode1.operDataStore(), null, null); RpcResult rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder() .setShardName("cars").setMemberName("member-1").setDataStoreType(DataStoreType.Config).build()) @@ -521,8 +522,8 @@ public class ClusterAdminRpcServiceTest { newReplicaNode2.kit().getRef()); newReplicaNode2.kit().expectMsgClass(Success.class); - ClusterAdminRpcService service = new ClusterAdminRpcService(newReplicaNode2.configDataStore(), - newReplicaNode2.operDataStore(), null); + final ClusterAdminRpcService service = new ClusterAdminRpcService(newReplicaNode2.configDataStore(), + newReplicaNode2.operDataStore(), null, null); RpcResult rpcResult = service.addReplicasForAllShards( new AddReplicasForAllShardsInputBuilder().build()).get(10, TimeUnit.SECONDS); @@ -579,8 +580,8 @@ public class ClusterAdminRpcServiceTest { verifyRaftPeersPresent(replicaNode2.configDataStore(), "pets", "member-1", "member-3"); verifyRaftPeersPresent(replicaNode3.configDataStore(), "pets", "member-1", "member-2"); - ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(), - replicaNode3.operDataStore(), null); + final ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(), + replicaNode3.operDataStore(), null, null); RpcResult rpcResult = service3.removeAllShardReplicas( new RemoveAllShardReplicasInputBuilder().setMemberName("member-3").build()).get(10, TimeUnit.SECONDS); @@ -625,8 +626,8 @@ public class ClusterAdminRpcServiceTest { // Invoke RPC service on member-3 to change voting status - ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(), - replicaNode3.operDataStore(), null); + final ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(), + replicaNode3.operDataStore(), null, null); RpcResult rpcResult = service3 .changeMemberVotingStatesForShard(new ChangeMemberVotingStatesForShardInputBuilder() @@ -659,8 +660,8 @@ public class ClusterAdminRpcServiceTest { // Invoke RPC service on member-3 to change voting status - ClusterAdminRpcService service = new ClusterAdminRpcService(leaderNode.configDataStore(), - leaderNode.operDataStore(), null); + final ClusterAdminRpcService service = new ClusterAdminRpcService(leaderNode.configDataStore(), + leaderNode.operDataStore(), null, null); RpcResult rpcResult = service .changeMemberVotingStatesForShard(new ChangeMemberVotingStatesForShardInputBuilder() @@ -701,8 +702,8 @@ public class ClusterAdminRpcServiceTest { // Invoke RPC service on member-3 to change voting status - ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(), - replicaNode3.operDataStore(), null); + final ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(), + replicaNode3.operDataStore(), null, null); RpcResult rpcResult = service3.changeMemberVotingStatesForAllShards( new ChangeMemberVotingStatesForAllShardsInputBuilder().setMemberVotingState(List.of( @@ -753,8 +754,8 @@ public class ClusterAdminRpcServiceTest { verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE), new SimpleEntry<>("member-2", TRUE), new SimpleEntry<>("member-3", FALSE)); - ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(), - replicaNode3.operDataStore(), null); + final ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(), + replicaNode3.operDataStore(), null, null); RpcResult rpcResult = service3.flipMemberVotingStatesForAllShards( new FlipMemberVotingStatesForAllShardsInputBuilder().build()).get(10, TimeUnit.SECONDS); @@ -848,8 +849,8 @@ public class ClusterAdminRpcServiceTest { verifyRaftState(replicaNode1.configDataStore(), "cars", raftState -> assertEquals("Expected raft state", RaftState.Follower.toString(), raftState.getRaftState())); - ClusterAdminRpcService service1 = new ClusterAdminRpcService(replicaNode1.configDataStore(), - replicaNode1.operDataStore(), null); + final ClusterAdminRpcService service1 = new ClusterAdminRpcService(replicaNode1.configDataStore(), + replicaNode1.operDataStore(), null, null); RpcResult rpcResult = service1.flipMemberVotingStatesForAllShards( new FlipMemberVotingStatesForAllShardsInputBuilder().build()).get(10, TimeUnit.SECONDS); @@ -915,8 +916,8 @@ public class ClusterAdminRpcServiceTest { new SimpleEntry<>("member-4", FALSE), new SimpleEntry<>("member-5", FALSE), new SimpleEntry<>("member-6", FALSE)); - ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(), - leaderNode1.operDataStore(), null); + final ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(), + leaderNode1.operDataStore(), null, null); RpcResult rpcResult = service1.flipMemberVotingStatesForAllShards( new FlipMemberVotingStatesForAllShardsInputBuilder().build()).get(10, TimeUnit.SECONDS); @@ -954,7 +955,7 @@ public class ClusterAdminRpcServiceTest { type + datastoreTypeSuffix).toString(), info.isVoting())); } - String shardID = ShardIdentifier.create(shard, MemberName.forName(member), + final String shardID = ShardIdentifier.create(shard, MemberName.forName(member), type + datastoreTypeSuffix).toString(); InMemoryJournal.addEntry(shardID, 1, new UpdateElectionTerm(1, null)); InMemoryJournal.addEntry(shardID, 2, new SimpleReplicatedLogEntry(0, 1, -- 2.36.6