Add the ability to activate/deactivate singleton supervisor.
Make the supervisor pick owners only from the active datacenter.
All in all this should give us the building blocks needed for
the active/backup cluster use-case.
JIRA: CONTROLLER-1982
Change-Id: Iae0a4a59cc461c8187c59bb8a5a66c73dee5be4c
Signed-off-by: Tomas Cere <tomas.cere@pantheon.tech>
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
import org.opendaylight.controller.eos.akka.bootstrap.command.Terminate;
import org.opendaylight.controller.eos.akka.owner.checker.OwnerStateChecker;
import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerCommand;
-import org.opendaylight.controller.eos.akka.owner.supervisor.OwnerSyncer;
+import org.opendaylight.controller.eos.akka.owner.supervisor.IdleSupervisor;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
final ClusterSingleton clusterSingleton = ClusterSingleton.get(context.getSystem());
// start the initial sync behavior that switches to the regular one after syncing
- ownerSupervisor = clusterSingleton.init(SingletonActor.of(OwnerSyncer.create(), "OwnerSupervisor"));
+ ownerSupervisor = clusterSingleton.init(SingletonActor.of(IdleSupervisor.create(), "OwnerSupervisor"));
}
public static Behavior<BootstrapCommand> create() {
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor;
+
+import akka.actor.typed.Behavior;
+import akka.actor.typed.javadsl.AbstractBehavior;
+import akka.actor.typed.javadsl.ActorContext;
+import akka.actor.typed.javadsl.Behaviors;
+import akka.actor.typed.javadsl.Receive;
+import akka.cluster.Member;
+import akka.cluster.typed.Cluster;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.ActivateDataCenter;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Initial Supervisor behavior that stays idle and only switches itself to the active behavior when its running
+ * in the primary datacenter, or is activated on demand. Once the supervisor instance is no longer needed in the
+ * secondary datacenter it needs to be deactivated manually.
+ */
+public final class IdleSupervisor extends AbstractBehavior<OwnerSupervisorCommand> {
+ private static final Logger LOG = LoggerFactory.getLogger(IdleSupervisor.class);
+
+ private static final String DATACENTER_PREFIX = "dc-";
+ private static final String DEFAULT_DATACENTER = "dc-default";
+
+ private IdleSupervisor(final ActorContext<OwnerSupervisorCommand> context) {
+ super(context);
+ final Cluster cluster = Cluster.get(context.getSystem());
+
+ final String datacenterRole = extractDatacenterRole(cluster.selfMember());
+ if (datacenterRole.equals(DEFAULT_DATACENTER)) {
+ LOG.debug("No datacenter configured, activating default data center");
+ context.getSelf().tell(ActivateDataCenter.INSTANCE);
+ }
+
+ LOG.debug("Idle supervisor started on {}.", cluster.selfMember());
+ }
+
+ public static Behavior<OwnerSupervisorCommand> create() {
+
+ return Behaviors.setup(IdleSupervisor::new);
+ }
+
+ @Override
+ public Receive<OwnerSupervisorCommand> createReceive() {
+ return newReceiveBuilder()
+ .onMessage(ActivateDataCenter.class, this::onActivateDataCenter)
+ .build();
+ }
+
+ private Behavior<OwnerSupervisorCommand> onActivateDataCenter(final ActivateDataCenter message) {
+ LOG.debug("Received ActivateDataCenter command switching to syncer behavior,");
+ return OwnerSyncer.create();
+ }
+
+ private String extractDatacenterRole(final Member selfMember) {
+ return selfMember.getRoles().stream()
+ .filter(role -> role.startsWith(DATACENTER_PREFIX))
+ .findFirst()
+ .orElseThrow(() -> new IllegalArgumentException(selfMember + " does not have a valid role"));
+ }
+}
import java.util.Set;
import java.util.stream.Collectors;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.CandidatesChanged;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberDownEvent;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReachableEvent;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent;
public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorCommand> {
private static final Logger LOG = LoggerFactory.getLogger(OwnerSupervisor.class);
- private static final String DATACENTER_PREFIX = "dc";
+ private static final String DATACENTER_PREFIX = "dc-";
private final ReplicatorMessageAdapter<OwnerSupervisorCommand, LWWRegister<String>> ownerReplicator;
private final Cluster cluster;
private final SelfUniqueAddress node;
+ private final String dataCenter;
private final Set<String> activeMembers;
final DistributedData distributedData = DistributedData.get(context.getSystem());
final ActorRef<Replicator.Command> replicator = distributedData.replicator();
- this.cluster = Cluster.get(context.getSystem());
- this.ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, Duration.ofSeconds(5));
+ cluster = Cluster.get(context.getSystem());
+ ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, Duration.ofSeconds(5));
+ dataCenter = extractDatacenterRole(cluster.selfMember());
- this.node = distributedData.selfUniqueAddress();
- this.activeMembers = getActiveMembers(cluster);
+ node = distributedData.selfUniqueAddress();
+ activeMembers = getActiveMembers();
this.currentCandidates = currentCandidates;
this.currentOwners = currentOwners;
public Receive<OwnerSupervisorCommand> createReceive() {
return newReceiveBuilder()
.onMessage(CandidatesChanged.class, this::onCandidatesChanged)
+ .onMessage(DeactivateDataCenter.class, this::onDeactivateDatacenter)
.onMessage(MemberUpEvent.class, this::onPeerUp)
.onMessage(MemberDownEvent.class, this::onPeerDown)
.onMessage(MemberReachableEvent.class, this::onPeerReachable)
.build();
}
+ private Behavior<OwnerSupervisorCommand> onDeactivateDatacenter(final DeactivateDataCenter command) {
+ LOG.debug("Deactivating Owner Supervisor on {}", cluster.selfMember());
+ return IdleSupervisor.create();
+ }
+
private void reassignUnreachableOwners() {
final Set<String> ownersToReassign = new HashSet<>();
for (final String owner : ownerToEntity.keys()) {
}
private void handleReachableEvent(final Set<String> roles) {
- activeMembers.add(extractRole(roles));
- assignMissingOwners();
+ if (roles.contains(dataCenter)) {
+ activeMembers.add(extractRole(roles));
+ assignMissingOwners();
+ } else {
+ LOG.debug("Received reachable event from a foreign datacenter, Ignoring... Roles: {}", roles);
+ }
}
private Behavior<OwnerSupervisorCommand> onPeerDown(final MemberDownEvent event) {
}
private void handleUnreachableEvent(final Set<String> roles) {
- activeMembers.remove(extractRole(roles));
- reassignUnreachableOwners();
+ if (roles.contains(dataCenter)) {
+ activeMembers.remove(extractRole(roles));
+ reassignUnreachableOwners();
+ } else {
+ LOG.debug("Received unreachable event from a foreign datacenter, Ignoring... Roles: {}", roles);
+ }
}
- private static Set<String> getActiveMembers(final Cluster cluster) {
- final Set<String> activeMembers = new HashSet<>();
- cluster.state().getMembers().forEach(member -> activeMembers.add(extractRole(member)));
- activeMembers.removeAll(cluster.state().getUnreachable().stream()
- .map(OwnerSupervisor::extractRole).collect(Collectors.toSet()));
+ private Set<String> getActiveMembers() {
+ final Set<String> members = new HashSet<>();
+ cluster.state().getMembers().forEach(member -> members.add(extractRole(member)));
+ // filter out unreachable
+ members.removeAll(cluster.state().getUnreachable().stream()
+ .map(OwnerSupervisor::extractRole)
+ .collect(Collectors.toSet()));
+
+ // filter out members not from our datacenter
+ cluster.state().getMembers().forEach(member -> {
+ if (!member.roles().contains(dataCenter)) {
+ members.remove(extractRole(member));
+ }
+ });
- return activeMembers;
+ return members;
}
private static String extractRole(final Member member) {
}
private static String extractRole(final Set<String> roles) {
- return roles.stream().filter(role -> !role.contains(DATACENTER_PREFIX))
+ return roles.stream().filter(role -> !role.startsWith(DATACENTER_PREFIX))
+ .findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found."));
+ }
+
+ private String extractDatacenterRole(final Member member) {
+ return member.getRoles().stream().filter(role -> role.startsWith(DATACENTER_PREFIX))
.findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found."));
}
}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+
+import java.io.Serializable;
+
+public final class ActivateDataCenter extends OwnerSupervisorCommand implements Serializable {
+ public static final ActivateDataCenter INSTANCE = new ActivateDataCenter();
+
+ private static final long serialVersionUID = 1L;
+
+ private ActivateDataCenter() {
+ // NOOP
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+
+import java.io.Serializable;
+
+public final class DeactivateDataCenter extends OwnerSupervisorCommand implements Serializable {
+ public static final DeactivateDataCenter INSTANCE = new DeactivateDataCenter();
+ private static final long serialVersionUID = 1L;
+
+ private DeactivateDataCenter() {
+ // NOOP
+ }
+}
import org.opendaylight.controller.eos.akka.bootstrap.command.BootstrapCommand;
import org.opendaylight.controller.eos.akka.bootstrap.command.GetRunningContext;
import org.opendaylight.controller.eos.akka.bootstrap.command.RunningContext;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.ActivateDataCenter;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReachableEvent;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
public static final DOMEntity ENTITY_1 = new DOMEntity("test-type", "entity-1");
public static final DOMEntity ENTITY_2 = new DOMEntity("test-type-2", "entity-2");
+ protected static final String DEFAULT_DATACENTER = "dc-default";
+
protected static final List<String> TWO_NODE_SEED_NODES =
List.of("akka://ClusterSystem@127.0.0.1:2550",
"akka://ClusterSystem@127.0.0.1:2551");
"akka://ClusterSystem@127.0.0.1:2551",
"akka://ClusterSystem@127.0.0.1:2552");
+ protected static final List<String> DATACENTER_SEED_NODES =
+ List.of("akka://ClusterSystem@127.0.0.1:2550",
+ "akka://ClusterSystem@127.0.0.1:2551",
+ "akka://ClusterSystem@127.0.0.1:2552",
+ "akka://ClusterSystem@127.0.0.1:2553");
+
private static final String REMOTE_PROTOCOL = "akka";
private static final String PORT_PARAM = "akka.remote.artery.canonical.port";
private static final String ROLE_PARAM = "akka.cluster.roles";
private static final String SEED_NODES_PARAM = "akka.cluster.seed-nodes";
+ private static final String DATA_CENTER_PARAM = "akka.cluster.multi-data-center.self-data-center";
protected static ClusterNode startupRemote(final int port, final List<String> roles)
runningContext.getCandidateRegistry(), runningContext.getOwnerSupervisor());
}
+ protected static ClusterNode startupWithDatacenter(final int port, final List<String> roles,
+ final List<String> seedNodes, final String dataCenter)
+ throws ExecutionException, InterruptedException {
+ final Map<String, Object> overrides = new HashMap<>();
+ overrides.put(PORT_PARAM, port);
+ overrides.put(ROLE_PARAM, roles);
+ if (!seedNodes.isEmpty()) {
+ overrides.put(SEED_NODES_PARAM, seedNodes);
+ }
+ overrides.put(DATA_CENTER_PARAM, dataCenter);
+
+ final Config config = ConfigFactory.parseMap(overrides)
+ .withFallback(ConfigFactory.load());
+
+ // Create a classic Akka system since thats what we will have in osgi
+ final akka.actor.ActorSystem system = akka.actor.ActorSystem.create("ClusterSystem", config);
+ final ActorRef<BootstrapCommand> eosBootstrap =
+ Adapter.spawn(system, EOSMain.create(), "EOSBootstrap");
+
+ final CompletionStage<RunningContext> ask = AskPattern.ask(eosBootstrap,
+ GetRunningContext::new,
+ Duration.ofSeconds(5),
+ Adapter.toTyped(system.scheduler()));
+ final RunningContext runningContext = ask.toCompletableFuture().get();
+
+ return new ClusterNode(port, roles, system, eosBootstrap, runningContext.getListenerRegistry(),
+ runningContext.getCandidateRegistry(), runningContext.getOwnerSupervisor());
+ }
+
private static Behavior<BootstrapCommand> rootBehavior() {
return Behaviors.setup(context -> EOSMain.create());
}
return listener;
}
- protected static void reachableMember(final ClusterNode node, final String role) {
+ protected static void reachableMember(final ClusterNode node, final String... role) {
reachableMember(node.getOwnerSupervisor(), role);
}
- protected static void reachableMember(final ActorRef<OwnerSupervisorCommand> ownerSupervisor, final String role) {
+ protected static void reachableMember(final ActorRef<OwnerSupervisorCommand> ownerSupervisor,
+ final String... role) {
ownerSupervisor.tell(new MemberReachableEvent(
new Address(REMOTE_PROTOCOL, "ClusterSystem@127.0.0.1:2550"), Set.of(role)));
}
- protected static void unreachableMember(final ClusterNode node, final String role) {
+ protected static void unreachableMember(final ClusterNode node, final String... role) {
unreachableMember(node.getOwnerSupervisor(), role);
}
- protected static void unreachableMember(final ActorRef<OwnerSupervisorCommand> ownerSupervisor, final String role) {
+ protected static void unreachableMember(final ActorRef<OwnerSupervisorCommand> ownerSupervisor,
+ final String... role) {
ownerSupervisor.tell(new MemberUnreachableEvent(
new Address(REMOTE_PROTOCOL, "ClusterSystem@127.0.0.1:2550"), Set.of(role)));
}
});
}
+ protected static void activateDatacenter(final ClusterNode clusterNode) {
+ clusterNode.getOwnerSupervisor().tell(ActivateDataCenter.INSTANCE);
+ }
+
+ protected static void deactivateDatacenter(final ClusterNode clusterNode) {
+ clusterNode.getOwnerSupervisor().tell(DeactivateDataCenter.INSTANCE);
+ }
+
protected static void verifyListenerState(final MockEntityOwnershipListener listener, final DOMEntity entity,
final boolean hasOwner, final boolean isOwner, final boolean wasOwner) {
await().until(() -> !listener.getChanges().isEmpty());
protected static final class MockEntityOwnershipListener implements DOMEntityOwnershipListener {
- private final Logger log = LoggerFactory.getLogger(MockEntityOwnershipListener.class);
+ private final Logger log;
private final List<DOMEntityOwnershipChange> changes = new ArrayList<>();
private final String member;
public MockEntityOwnershipListener(final String member) {
-
+ log = LoggerFactory.getLogger("EOS-listener-" + member);
this.member = member;
}
registerCandidates(runningContext.getCandidateRegistry(), entity, "member-2");
final ActorRef<OwnerSupervisorCommand> ownerSupervisor = runningContext.getOwnerSupervisor();
- reachableMember(ownerSupervisor, "member-2");
- unreachableMember(ownerSupervisor, "member-1");
+ reachableMember(ownerSupervisor, "member-2", DEFAULT_DATACENTER);
+ unreachableMember(ownerSupervisor, "member-1", DEFAULT_DATACENTER);
verifyGetOwnershipState(service, entity, EntityOwnershipState.OWNED_BY_OTHER);
final DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, "two");
final Optional<EntityOwnershipState> state = service.getOwnershipState(entity2);
assertFalse(state.isPresent());
- unreachableMember(ownerSupervisor, "member-2");
+ unreachableMember(ownerSupervisor, "member-2", DEFAULT_DATACENTER);
verifyGetOwnershipState(service, entity, EntityOwnershipState.NO_OWNER);
}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka;
+
+import akka.actor.testkit.typed.javadsl.ActorTestKit;
+import akka.cluster.Member;
+import akka.cluster.MemberStatus;
+import akka.cluster.typed.Cluster;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.awaitility.Awaitility;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+
+public class DataCentersTest extends AbstractNativeEosTest {
+
+ private ClusterNode node1 = null;
+ private ClusterNode node2 = null;
+ private ClusterNode node3 = null;
+ private ClusterNode node4 = null;
+ public static final DOMEntity ENTITY_1 = new DOMEntity("test-type", "entity-1");
+ public static final DOMEntity ENTITY_2 = new DOMEntity("test-type-2", "entity-2");
+
+ @Before
+ public void setUp() throws Exception {
+ node1 = startupWithDatacenter(2550, Collections.singletonList("member-1"), DATACENTER_SEED_NODES, "dc-primary");
+ node2 = startupWithDatacenter(2551, Collections.singletonList("member-2"), DATACENTER_SEED_NODES, "dc-primary");
+ node3 = startupWithDatacenter(2552, Collections.singletonList("member-3"), DATACENTER_SEED_NODES, "dc-backup");
+ node4 = startupWithDatacenter(2553, Collections.singletonList("member-4"), DATACENTER_SEED_NODES, "dc-backup");
+
+ // need to wait until all nodes are ready
+ final Cluster cluster = Cluster.get(node4.getActorSystem());
+ Awaitility.await().atMost(Duration.ofSeconds(20)).until(() -> {
+ final List<Member> members = new ArrayList<>();
+ cluster.state().getMembers().forEach(members::add);
+ if (members.size() != 4) {
+ return false;
+ }
+
+ for (final Member member : members) {
+ if (!member.status().equals(MemberStatus.up())) {
+ return false;
+ }
+ }
+
+ return true;
+ });
+ }
+
+ @Test
+ public void testDatacenterActivation() throws Exception {
+ registerCandidates(node1, ENTITY_1, "member-1");
+ registerCandidates(node3, ENTITY_1, "member-3");
+ registerCandidates(node4, ENTITY_1, "member-4");
+
+ activateDatacenter(node1);
+
+ waitUntillOwnerPresent(node1, ENTITY_1);
+ final MockEntityOwnershipListener listener1 = registerListener(node1, ENTITY_1);
+ verifyListenerState(listener1, ENTITY_1, true, true, false);
+
+ final MockEntityOwnershipListener listener2 = registerListener(node3, ENTITY_1);
+ verifyListenerState(listener2, ENTITY_1, true, false, false);
+
+ unregisterCandidates(node1, ENTITY_1, "member-1");
+
+ verifyListenerState(listener1, ENTITY_1, false, false, true);
+ verifyListenerState(listener2, ENTITY_1, false, false, false);
+
+ deactivateDatacenter(node1);
+ activateDatacenter(node4);
+
+ verifyListenerState(listener1, ENTITY_1, true, false, false);
+ verifyListenerState(listener2, ENTITY_1, true, true, false);
+
+ unregisterCandidates(node3, ENTITY_1, "member-3");
+
+ // checking index after notif so current + 1
+ verifyListenerState(listener1, ENTITY_1, true, false, false);
+ verifyListenerState(listener2, ENTITY_1, true, false, true);
+
+ deactivateDatacenter(node3);
+ activateDatacenter(node2);
+
+ // no candidate in dc-primary so no owners after datacenter activation
+ verifyListenerState(listener1, ENTITY_1, false, false, false);
+ verifyListenerState(listener2, ENTITY_1, false, false, false);
+ }
+
+ @Test
+ public void testDataCenterShutdown() {
+ registerCandidates(node1, ENTITY_1, "member-1");
+ registerCandidates(node3, ENTITY_1, "member-3");
+ registerCandidates(node4, ENTITY_1, "member-4");
+
+ activateDatacenter(node1);
+
+ waitUntillOwnerPresent(node1, ENTITY_1);
+ final MockEntityOwnershipListener listener1 = registerListener(node1, ENTITY_1);
+ verifyListenerState(listener1, ENTITY_1, true, true, false);
+
+ final MockEntityOwnershipListener listener2 = registerListener(node3, ENTITY_1);
+ verifyListenerState(listener2, ENTITY_1, true, false, false);
+
+ unregisterCandidates(node1, ENTITY_1, "member-1");
+
+ verifyListenerState(listener1, ENTITY_1, false, false, true);
+ verifyListenerState(listener2, ENTITY_1, false, false, false);
+
+ ActorTestKit.shutdown(node1.getActorSystem(), Duration.ofSeconds(20));
+ ActorTestKit.shutdown(node2.getActorSystem(), Duration.ofSeconds(20));
+
+ activateDatacenter(node3);
+ verifyListenerState(listener2, ENTITY_1, true, true, false);
+
+ unregisterCandidates(node3, ENTITY_1, "member-3");
+ verifyListenerState(listener2, ENTITY_1, true, false, true);
+ }
+
+ @After
+ public void tearDown() {
+ ActorTestKit.shutdown(node1.getActorSystem(), Duration.ofSeconds(20));
+ ActorTestKit.shutdown(node2.getActorSystem(), Duration.ofSeconds(20));
+ ActorTestKit.shutdown(node3.getActorSystem(), Duration.ofSeconds(20));
+ ActorTestKit.shutdown(node4.getActorSystem(), Duration.ofSeconds(20));
+ }
+
+}
@Before
public void setUp() throws Exception {
clusterNode = startup(2550, List.of("member-1"));
-
- reachableMember(clusterNode, "member-2");
- reachableMember(clusterNode, "member-3");
}
@After
public void testListenerPriorToAddingCandidates() {
final MockEntityOwnershipListener listener = registerListener(clusterNode, ENTITY_1);
- registerCandidates(clusterNode, ENTITY_1, "member-1", "member-2", "member-3");
+ registerCandidates(clusterNode, ENTITY_1, "member-1");
+ waitUntillOwnerPresent(clusterNode, ENTITY_1);
+
+ reachableMember(clusterNode, "member-2", DEFAULT_DATACENTER);
+ reachableMember(clusterNode, "member-3", DEFAULT_DATACENTER);
+
+ registerCandidates(clusterNode, ENTITY_1, "member-2", "member-3");
verifyListenerState(listener, ENTITY_1, true, true, false);
unregisterCandidates(clusterNode, ENTITY_1, "member-1");
registerCandidates(clusterNode, ENTITY_1, "member-1", "member-2", "member-3");
waitUntillOwnerPresent(clusterNode, ENTITY_1);
+ reachableMember(clusterNode, "member-2", DEFAULT_DATACENTER);
+ reachableMember(clusterNode, "member-3", DEFAULT_DATACENTER);
+
final MockEntityOwnershipListener listener = registerListener(clusterNode, ENTITY_1);
verifyListenerState(listener, ENTITY_1, true, true, false);
registerCandidates(clusterNode, ENTITY_1, "member-1", "member-2", "member-3");
waitUntillOwnerPresent(clusterNode, ENTITY_1);
+ reachableMember(clusterNode, "member-2", DEFAULT_DATACENTER);
+ reachableMember(clusterNode, "member-3", DEFAULT_DATACENTER);
+
final MockEntityOwnershipListener listener1 = registerListener(clusterNode, ENTITY_1);
final MockEntityOwnershipListener listener2 = registerListener(clusterNode, ENTITY_2);
ActorTestKit.shutdown(node1.getActorSystem(), Duration.ofSeconds(20));
ActorTestKit.shutdown(node2.getActorSystem(), Duration.ofSeconds(20));
-
if (node3 != null) {
ActorTestKit.shutdown(node3.getActorSystem(), Duration.ofSeconds(20));
}
verifyListenerState(secondEntityListener2, ENTITY_2, true, true, false);
verifyListenerState(secondEntityListener3, ENTITY_2, true, false, false);
- unreachableMember(node1, "member-2");
+ unreachableMember(node1, "member-2", DEFAULT_DATACENTER);
verifyListenerState(firstEntityListener1, ENTITY_1, true, true, false);
verifyListenerState(firstEntityListener2, ENTITY_1, true, false, true);
verifyListenerState(secondEntityListener2, ENTITY_2, true, false, true);
verifyListenerState(secondEntityListener3, ENTITY_2, true, false, false);
- unreachableMember(node1, "member-3");
+ unreachableMember(node1, "member-3", DEFAULT_DATACENTER);
verifyListenerState(firstEntityListener1, ENTITY_1, true, true, false);
verifyListenerState(firstEntityListener2, ENTITY_1, true, false, true);
verifyListenerState(firstEntityListener3, ENTITY_1, true, false, false);
- unregisterCandidates(node1, ENTITY_1, "member-1");
- unregisterCandidates(node1, ENTITY_2, "member-1");
+ unregisterCandidates(node1, ENTITY_1, "member-1", DEFAULT_DATACENTER);
+ unregisterCandidates(node1, ENTITY_2, "member-1", DEFAULT_DATACENTER);
verifyListenerState(firstEntityListener1, ENTITY_1, false, false, true);
verifyListenerState(firstEntityListener2, ENTITY_1, false, false, false);
verifyListenerState(secondEntityListener2, ENTITY_2, false, false, false);
verifyListenerState(secondEntityListener3, ENTITY_2, false, false, false);
- reachableMember(node1, "member-2");
+ reachableMember(node1, "member-2", DEFAULT_DATACENTER);
verifyListenerState(firstEntityListener1, ENTITY_1, true, false, false);
verifyListenerState(firstEntityListener2, ENTITY_1, true, true, false);
verifyListenerState(firstEntityListener3, ENTITY_1, true, false, false);
final ClusterNode node = startup(2550, Collections.singletonList("member-1"));
try {
- reachableMember(node, "member-2");
- reachableMember(node, "member-3");
+ reachableMember(node, "member-2", DEFAULT_DATACENTER);
+ reachableMember(node, "member-3", DEFAULT_DATACENTER);
registerCandidates(node, ENTITY_1, "member-1", "member-2", "member-3");
final MockEntityOwnershipListener listener = registerListener(node, ENTITY_1);
verifyListenerState(listener, ENTITY_1,true, true, false);
- unreachableMember(node, "member-1");
+ unreachableMember(node, "member-1", DEFAULT_DATACENTER);
verifyListenerState(listener, ENTITY_1, true, false, true);
- unreachableMember(node, "member-2");
+ unreachableMember(node, "member-2", DEFAULT_DATACENTER);
verifyListenerState(listener, ENTITY_1, true, false, false);
- unreachableMember(node, "member-3");
+ unreachableMember(node, "member-3", DEFAULT_DATACENTER);
verifyListenerState(listener, ENTITY_1, false, false, false);
- reachableMember(node, "member-2");
+ reachableMember(node, "member-2", DEFAULT_DATACENTER);
verifyListenerState(listener, ENTITY_1, true, false, false);
// no notification here as member-2 is already the owner
- reachableMember(node, "member-1");
+ reachableMember(node, "member-1", DEFAULT_DATACENTER);
- unreachableMember(node, "member-2");
+ unreachableMember(node, "member-2", DEFAULT_DATACENTER);
verifyListenerState(listener, ENTITY_1,true, true, false);
} finally {
ActorTestKit.shutdown(node.getActorSystem());
// this one could not be assigned during init as we dont have member-2 thats reachable
verifyListenerState(listener2, ENTITY_2, false, false, false);
- reachableMember(node, "member-2");
+ reachableMember(node, "member-2", DEFAULT_DATACENTER);
verifyListenerState(listener2, ENTITY_2, true, false, false);
} finally {
ActorTestKit.shutdown(node.getActorSystem());