import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
import org.opendaylight.controller.cluster.mgmt.api.FollowerInfo;
+import org.opendaylight.controller.cluster.notifications.DefaultLeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RoleChanged;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
self().tell(applyState, self());
}
- protected LeaderStateChanged newLeaderStateChanged(final String memberId, final String leaderId,
+ @NonNullByDefault
+ final LeaderStateChanged newLeaderStateChanged(final String memberId, final @Nullable String leaderId,
final short leaderPayloadVersion) {
- return new LeaderStateChanged(memberId, leaderId, leaderPayloadVersion);
+ return wrapLeaderStateChanged(new DefaultLeaderStateChanged(memberId, leaderId, leaderPayloadVersion));
+ }
+
+ @NonNullByDefault
+ protected LeaderStateChanged wrapLeaderStateChanged(final LeaderStateChanged change) {
+ return change;
}
@Override
// The leader should transition to IsolatedLeader.
expectFirstMatching(leaderNotifierActor, RoleChanged.class,
- rc -> rc.getNewRole().equals(RaftState.IsolatedLeader.name()));
+ rc -> rc.newRole().equals(RaftState.IsolatedLeader.name()));
forceElectionOnFollower1();
// with a higher term.
expectFirstMatching(leaderNotifierActor, RoleChanged.class,
- rc -> rc.getNewRole().equals(RaftState.Follower.name()));
+ rc -> rc.newRole().equals(RaftState.Follower.name()));
// The previous leader has a conflicting log entry at index 2 with a different term which should get
// replaced by the new leader's index 1 entry.
// The leader should transition to IsolatedLeader.
expectFirstMatching(leaderNotifierActor, RoleChanged.class,
- rc -> rc.getNewRole().equals(RaftState.IsolatedLeader.name()));
+ rc -> rc.newRole().equals(RaftState.IsolatedLeader.name()));
forceElectionOnFollower1();
// with a higher term.
expectFirstMatching(leaderNotifierActor, RoleChanged.class,
- rc -> rc.getNewRole().equals(RaftState.Follower.name()));
+ rc -> rc.newRole().equals(RaftState.Follower.name()));
// The previous leader has a conflicting log entry at index 2 with a different term which should get
// replaced by the new leader's entry.
// The leader should transition to IsolatedLeader.
expectFirstMatching(leaderNotifierActor, RoleChanged.class,
- rc -> rc.getNewRole().equals(RaftState.IsolatedLeader.name()));
+ rc -> rc.newRole().equals(RaftState.IsolatedLeader.name()));
forceElectionOnFollower1();
// with a higher term.
expectFirstMatching(leaderNotifierActor, RoleChanged.class,
- rc -> rc.getNewRole().equals(RaftState.Follower.name()));
+ rc -> rc.newRole().equals(RaftState.Follower.name()));
// The previous leader has conflicting log entries starting at index 2 with different terms which should get
// replaced by the new leader's entries.
follower1Actor.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
expectFirstMatching(follower1NotifierActor, RoleChanged.class,
- rc -> rc.getNewRole().equals(RaftState.Leader.name()));
+ rc -> rc.newRole().equals(RaftState.Leader.name()));
currentTerm = follower1Context.currentTerm();
}
Collections.reverse(leaderStateChanges);
final var actual = leaderStateChanges.iterator();
for (int i = expLeaderIds.length - 1; i >= 0; i--) {
- assertEquals("getLeaderId", expLeaderIds[i], actual.next().getLeaderId());
+ assertEquals("getLeaderId", expLeaderIds[i], actual.next().leaderId());
}
}
LeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(roleChangeNotifier,
LeaderStateChanged.class);
- assertEquals("getLeaderId", null, leaderStateChanged.getLeaderId());
+ assertEquals("leaderId", null, leaderStateChanged.leaderId());
MessageCollectorActor.clearMessages(roleChangeNotifier);
followerInstance.stopDropMessages(AppendEntries.class);
- leaderStateChanged = MessageCollectorActor.expectFirstMatching(roleChangeNotifier,
- LeaderStateChanged.class);
- assertEquals("getLeaderId", leaderId, leaderStateChanged.getLeaderId());
+ leaderStateChanged = MessageCollectorActor.expectFirstMatching(roleChangeNotifier, LeaderStateChanged.class);
+ assertEquals("leaderId", leaderId, leaderStateChanged.leaderId());
}
private void createNewLeaderActor() {
// Verify the expected raft state changes. It should go to PreLeader since it has an uncommitted entry.
List<RoleChanged> roleChange = expectMatching(follower1NotifierActor, RoleChanged.class, 3);
- assertEquals("Role change 1", RaftState.Candidate.name(), roleChange.get(0).getNewRole());
- assertEquals("Role change 2", RaftState.PreLeader.name(), roleChange.get(1).getNewRole());
- assertEquals("Role change 3", RaftState.Leader.name(), roleChange.get(2).getNewRole());
+ assertEquals("Role change 1", RaftState.Candidate.name(), roleChange.get(0).newRole());
+ assertEquals("Role change 2", RaftState.PreLeader.name(), roleChange.get(1).newRole());
+ assertEquals("Role change 3", RaftState.Leader.name(), roleChange.get(2).newRole());
final long previousTerm = currentTerm;
currentTerm = follower1Context.currentTerm();
// check if the notifier got a role change from null to Follower
RoleChanged raftRoleChanged = matches.get(0);
- assertEquals(persistenceId, raftRoleChanged.getMemberId());
- assertNull(raftRoleChanged.getOldRole());
- assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
+ assertEquals(persistenceId, raftRoleChanged.memberId());
+ assertNull(raftRoleChanged.oldRole());
+ assertEquals(RaftState.Follower.name(), raftRoleChanged.newRole());
// check if the notifier got a role change from Follower to Candidate
raftRoleChanged = matches.get(1);
- assertEquals(persistenceId, raftRoleChanged.getMemberId());
- assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
- assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
+ assertEquals(persistenceId, raftRoleChanged.memberId());
+ assertEquals(RaftState.Follower.name(), raftRoleChanged.oldRole());
+ assertEquals(RaftState.Candidate.name(), raftRoleChanged.newRole());
// check if the notifier got a role change from Candidate to Leader
raftRoleChanged = matches.get(2);
- assertEquals(persistenceId, raftRoleChanged.getMemberId());
- assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
- assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
+ assertEquals(persistenceId, raftRoleChanged.memberId());
+ assertEquals(RaftState.Candidate.name(), raftRoleChanged.oldRole());
+ assertEquals(RaftState.Leader.name(), raftRoleChanged.newRole());
LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
notifierActor, LeaderStateChanged.class);
- assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId());
- assertEquals(MockRaftActor.PAYLOAD_VERSION, leaderStateChange.getLeaderPayloadVersion());
+ assertEquals(raftRoleChanged.memberId(), leaderStateChange.leaderId());
+ assertEquals(MockRaftActor.PAYLOAD_VERSION, leaderStateChange.leaderPayloadVersion());
MessageCollectorActor.clearMessages(notifierActor);
raftActor.newBehavior(follower);
leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
- assertEquals(persistenceId, leaderStateChange.getMemberId());
- assertEquals(null, leaderStateChange.getLeaderId());
+ assertEquals(persistenceId, leaderStateChange.memberId());
+ assertNull(leaderStateChange.leaderId());
raftRoleChanged = MessageCollectorActor.expectFirstMatching(notifierActor, RoleChanged.class);
- assertEquals(RaftState.Leader.name(), raftRoleChanged.getOldRole());
- assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
+ assertEquals(RaftState.Leader.name(), raftRoleChanged.oldRole());
+ assertEquals(RaftState.Follower.name(), raftRoleChanged.newRole());
MessageCollectorActor.clearMessages(notifierActor);
raftActor.handleCommand("any");
leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
- assertEquals(persistenceId, leaderStateChange.getMemberId());
- assertEquals(newLeaderId, leaderStateChange.getLeaderId());
- assertEquals(newLeaderVersion, leaderStateChange.getLeaderPayloadVersion());
+ assertEquals(persistenceId, leaderStateChange.memberId());
+ assertEquals(newLeaderId, leaderStateChange.leaderId());
+ assertEquals(newLeaderVersion, leaderStateChange.leaderPayloadVersion());
MessageCollectorActor.clearMessages(notifierActor);
// check if the notifier got a role change from null to Follower
RoleChanged raftRoleChanged = matches.get(0);
- assertEquals(persistenceId, raftRoleChanged.getMemberId());
- assertNull(raftRoleChanged.getOldRole());
- assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
+ assertEquals(persistenceId, raftRoleChanged.memberId());
+ assertNull(raftRoleChanged.oldRole());
+ assertEquals(RaftState.Follower.name(), raftRoleChanged.newRole());
// check if the notifier got a role change from Follower to Candidate
raftRoleChanged = matches.get(1);
- assertEquals(persistenceId, raftRoleChanged.getMemberId());
- assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
- assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
+ assertEquals(persistenceId, raftRoleChanged.memberId());
+ assertEquals(RaftState.Follower.name(), raftRoleChanged.oldRole());
+ assertEquals(RaftState.Candidate.name(), raftRoleChanged.newRole());
}
@Test
0L, -1L, (short)1), ActorRef.noSender());
LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
notifierActor, LeaderStateChanged.class);
- assertEquals("getLeaderId", "leader", leaderStateChange.getLeaderId());
+ assertEquals("leaderId", "leader", leaderStateChange.leaderId());
MessageCollectorActor.clearMessages(notifierActor);
raftActorRef.tell(new LeaderTransitioning("leader"), ActorRef.noSender());
leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
- assertEquals("getMemberId", persistenceId, leaderStateChange.getMemberId());
- assertEquals("getLeaderId", null, leaderStateChange.getLeaderId());
+ assertEquals(persistenceId, leaderStateChange.memberId());
+ assertNull(leaderStateChange.leaderId());
TEST_LOG.info("testLeaderTransitioning ending");
}
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.Uninterruptibles;
+import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
* @return the first matching message
*/
public static <T> T getFirstMatching(final ActorRef actor, final Class<T> clazz) {
- List<Object> allMessages = getAllMessages(actor);
+ final var allMessages = getAllMessages(actor);
- for (Object message : allMessages) {
- if (message.getClass().equals(clazz)) {
+ for (var message : allMessages) {
+ if (matches(clazz, message)) {
return clazz.cast(message);
}
}
List<T> output = new ArrayList<>();
- for (Object message : allMessages) {
- if (message.getClass().equals(clazz)) {
+ for (var message : allMessages) {
+ if (matches(clazz, message)) {
output.add(clazz.cast(message));
}
}
return output;
}
+ private static boolean matches(final Class<?> clazz, final Object obj) {
+ return clazz.equals(obj.getClass()) || Modifier.isAbstract(clazz.getModifiers()) && clazz.isInstance(obj);
+ }
+
public static void waitUntilReady(final ActorRef actor) throws TimeoutException, InterruptedException {
long timeout = 500;
FiniteDuration duration = FiniteDuration.create(timeout, TimeUnit.MILLISECONDS);
--- /dev/null
+/*
+ * Copyright (c) 2025 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.notifications;
+
+import static java.util.Objects.requireNonNull;
+
+import org.eclipse.jdt.annotation.NonNullByDefault;
+import org.eclipse.jdt.annotation.Nullable;
+
+/**
+ * Default implementation of {@link LeaderStateChanged}.
+ */
+@NonNullByDefault
+public record DefaultLeaderStateChanged(
+ String memberId,
+ @Nullable String leaderId,
+ short leaderPayloadVersion) implements LeaderStateChanged {
+ public DefaultLeaderStateChanged {
+ requireNonNull(memberId);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2025 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.notifications;
+
+import org.eclipse.jdt.annotation.NonNullByDefault;
+import org.eclipse.jdt.annotation.Nullable;
+
+/**
+ * Bese lass for extended {@link LeaderStateChanged} base implementations.
+ */
+@NonNullByDefault
+public abstract non-sealed class ForwadingLeaderStateChanged implements LeaderStateChanged {
+ @Override
+ public final String memberId() {
+ return delegate().memberId();
+ }
+
+ @Override
+ public final @Nullable String leaderId() {
+ return delegate().leaderId();
+ }
+
+ @Override
+ public final short leaderPayloadVersion() {
+ return delegate().leaderPayloadVersion();
+ }
+
+ protected abstract LeaderStateChanged delegate();
+}
*/
package org.opendaylight.controller.cluster.notifications;
-import static java.util.Objects.requireNonNull;
-
-import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
/**
*
* @author Thomas Pantelis
*/
-public class LeaderStateChanged {
- private final @NonNull String memberId;
- private final @Nullable String leaderId;
- private final short leaderPayloadVersion;
-
- public LeaderStateChanged(final @NonNull String memberId, final @Nullable String leaderId,
- final short leaderPayloadVersion) {
- this.memberId = requireNonNull(memberId);
- this.leaderId = leaderId;
- this.leaderPayloadVersion = leaderPayloadVersion;
- }
-
- public @NonNull String getMemberId() {
- return memberId;
- }
-
- public @Nullable String getLeaderId() {
- return leaderId;
- }
+@NonNullByDefault
+public sealed interface LeaderStateChanged extends MemberNotication
+ permits DefaultLeaderStateChanged, ForwadingLeaderStateChanged {
- public short getLeaderPayloadVersion() {
- return leaderPayloadVersion;
- }
+ @Nullable String leaderId();
- @Override
- public String toString() {
- return "LeaderStateChanged [memberId=" + memberId
- + ", leaderId=" + leaderId
- + ", leaderPayloadVersion=" + leaderPayloadVersion + "]";
- }
+ short leaderPayloadVersion();
}
--- /dev/null
+/*
+ * Copyright (c) 2025 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.notifications;
+
+/**
+ * A notification about a member.
+ */
+public sealed interface MemberNotication permits LeaderStateChanged, RoleChanged {
+
+
+ String memberId();
+}
* Reply message sent from a RoleChangeNotifier to the Role Change Listener.
* Can be sent to a separate actor system and hence should be made serializable.
*/
+// FIXME: get a cookie or something?
+// FIXME: definitely final
public class RegisterRoleChangeListenerReply implements Serializable {
+ @java.io.Serial
private static final long serialVersionUID = -1972061601184451430L;
}
// this message is sent by RaftActor. Notify registered listeners when this message is received.
LOG.info("RoleChangeNotifier for {} , received role change from {} to {}", memberId,
- roleChanged.getOldRole(), roleChanged.getNewRole());
+ roleChanged.oldRole(), roleChanged.newRole());
- latestRoleChangeNotification =
- new RoleChangeNotification(roleChanged.getMemberId(),
- roleChanged.getOldRole(), roleChanged.getNewRole());
+ latestRoleChangeNotification = new RoleChangeNotification(roleChanged.memberId(), roleChanged.oldRole(),
+ roleChanged.newRole());
- for (ActorRef listener : registeredListeners.values()) {
+ for (var listener : registeredListeners.values()) {
listener.tell(latestRoleChangeNotification, self());
}
} else if (message instanceof LeaderStateChanged leaderStateChanged) {
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.cluster.notifications;
+import static java.util.Objects.requireNonNull;
+
+import org.eclipse.jdt.annotation.NonNullByDefault;
+import org.eclipse.jdt.annotation.Nullable;
+
/**
* Role Change message initiated internally from the Raft Actor when a the behavior/role changes.
- * Since its internal , need not be serialized
+ * Since its internal, need not be serialized.
*/
-public class RoleChanged {
- private final String memberId;
- private final String oldRole;
- private final String newRole;
-
- public RoleChanged(String memberId, String oldRole, String newRole) {
- this.memberId = memberId;
- this.oldRole = oldRole;
- this.newRole = newRole;
- }
-
- public String getMemberId() {
- return memberId;
- }
-
- public String getOldRole() {
- return oldRole;
- }
-
- public String getNewRole() {
- return newRole;
- }
-
- @Override
- public String toString() {
- return "RoleChanged [memberId=" + memberId + ", oldRole=" + oldRole + ", newRole=" + newRole + "]";
+@NonNullByDefault
+public record RoleChanged(String memberId, @Nullable String oldRole, String newRole) implements MemberNotication {
+ public RoleChanged {
+ requireNonNull(memberId);
+ requireNonNull(newRole);
}
}
*/
package org.opendaylight.controller.cluster.notifications;
-import java.util.ArrayList;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
import java.util.List;
+import java.util.stream.Stream;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.testkit.TestProbe;
import org.apache.pekko.testkit.javadsl.TestKit;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class RoleChangeNotifierTest {
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+class RoleChangeNotifierTest {
private static final String MEMBER_ID = "member-1";
private static final int LISTENER_COUNT = 3;
- private ActorSystem system;
+
private List<TestProbe> listeners;
+ private ActorSystem system;
private ActorRef notifier;
- @Before
- public void setUp() {
+ @BeforeEach
+ void beforeEach() {
system = ActorSystem.apply();
notifier = system.actorOf(RoleChangeNotifier.getProps(MEMBER_ID));
- listeners = new ArrayList<>(LISTENER_COUNT);
- for (int i = 0; i < LISTENER_COUNT; i++) {
- listeners.add(new TestProbe(system));
- }
+ listeners = Stream.generate(() -> new TestProbe(system)).limit(LISTENER_COUNT).toList();
}
- @After
- public void tearDown() {
+ @AfterEach
+ void afterEach() {
TestKit.shutdownActorSystem(system);
}
@Test
- public void testHandleReceiveRoleChange() {
+ void testHandleReceiveRoleChange() {
registerListeners();
- final RoleChanged msg = new RoleChanged(MEMBER_ID, "old", "new");
+ final var msg = new RoleChanged(MEMBER_ID, "old", "new");
notifier.tell(msg, ActorRef.noSender());
checkListenerRoleChangeNotification(msg);
}
@Test
- public void testHandleReceiveLeaderStateChanged() {
+ void testHandleReceiveLeaderStateChanged() {
registerListeners();
- final LeaderStateChanged msg = new LeaderStateChanged(MEMBER_ID, "leader", (short) 0);
+ final var msg = new DefaultLeaderStateChanged(MEMBER_ID, "leader", (short) 0);
notifier.tell(msg, ActorRef.noSender());
checkListenerLeaderStateChanged(msg);
}
@Test
- public void testHandleReceiveRegistrationAfterRoleChange() {
- final RoleChanged roleChanged1 = new RoleChanged(MEMBER_ID, "old1", "new1");
- final RoleChanged lastRoleChanged = new RoleChanged(MEMBER_ID, "old2", "new2");
+ void testHandleReceiveRegistrationAfterRoleChange() {
+ final var roleChanged1 = new RoleChanged(MEMBER_ID, "old1", "new1");
+ final var lastRoleChanged = new RoleChanged(MEMBER_ID, "old2", "new2");
notifier.tell(roleChanged1, ActorRef.noSender());
notifier.tell(lastRoleChanged, ActorRef.noSender());
registerListeners();
}
@Test
- public void testHandleReceiveRegistrationAfterLeaderStateChange() {
- final LeaderStateChanged leaderStateChanged1 = new LeaderStateChanged(MEMBER_ID, "leader1", (short) 0);
- final LeaderStateChanged lastLeaderStateChanged = new LeaderStateChanged(MEMBER_ID, "leader2", (short) 1);
+ void testHandleReceiveRegistrationAfterLeaderStateChange() {
+ final var leaderStateChanged1 = new DefaultLeaderStateChanged(MEMBER_ID, "leader1", (short) 0);
+ final var lastLeaderStateChanged = new DefaultLeaderStateChanged(MEMBER_ID, "leader2", (short) 1);
notifier.tell(leaderStateChanged1, ActorRef.noSender());
notifier.tell(lastLeaderStateChanged, ActorRef.noSender());
registerListeners();
}
private void registerListeners() {
- for (final TestProbe listener : listeners) {
+ for (var listener : listeners) {
notifier.tell(new RegisterRoleChangeListener(), listener.ref());
listener.expectMsgClass(RegisterRoleChangeListenerReply.class);
}
}
private void checkListenerRoleChangeNotification(final RoleChanged roleChanged) {
- for (final TestProbe listener : listeners) {
+ for (var listener : listeners) {
final RoleChangeNotification received = listener.expectMsgClass(RoleChangeNotification.class);
- Assert.assertEquals(roleChanged.getMemberId(), received.getMemberId());
- Assert.assertEquals(roleChanged.getOldRole(), received.getOldRole());
- Assert.assertEquals(roleChanged.getNewRole(), received.getNewRole());
+ assertEquals(roleChanged.memberId(), received.getMemberId());
+ assertEquals(roleChanged.newRole(), received.getNewRole());
+ assertEquals(roleChanged.oldRole(), received.getOldRole());
}
}
private void checkListenerLeaderStateChanged(final LeaderStateChanged leaderStateChanged) {
- for (final TestProbe listener : listeners) {
- final LeaderStateChanged received = listener.expectMsgClass(LeaderStateChanged.class);
- Assert.assertEquals(leaderStateChanged.getMemberId(), received.getMemberId());
- Assert.assertEquals(leaderStateChanged.getLeaderId(), received.getLeaderId());
- Assert.assertEquals(leaderStateChanged.getLeaderPayloadVersion(), received.getLeaderPayloadVersion());
+ for (var listener : listeners) {
+ assertEquals(leaderStateChanged, listener.expectMsgClass(LeaderStateChanged.class));
}
}
-
}
\ No newline at end of file
}
@Override
- protected final LeaderStateChanged newLeaderStateChanged(final String memberId, final String leaderId,
- final short leaderPayloadVersion) {
- return isLeader() ? new ShardLeaderStateChanged(memberId, leaderId, store.getDataTree(), leaderPayloadVersion)
- : new ShardLeaderStateChanged(memberId, leaderId, leaderPayloadVersion);
+ protected final LeaderStateChanged wrapLeaderStateChanged(final LeaderStateChanged change) {
+ return new ShardLeaderStateChanged(change, isLeader() ? store.getDataTree() : null);
}
private void onDatastoreContext(final DatastoreContext context) {
import static java.util.Objects.requireNonNull;
-import org.eclipse.jdt.annotation.NonNull;
+import com.google.common.annotations.VisibleForTesting;
+import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.controller.cluster.notifications.DefaultLeaderStateChanged;
+import org.opendaylight.controller.cluster.notifications.ForwadingLeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.yangtools.yang.data.tree.api.ReadOnlyDataTree;
*
* @author Thomas Pantelis
*/
-public final class ShardLeaderStateChanged extends LeaderStateChanged {
+@NonNullByDefault
+public final class ShardLeaderStateChanged extends ForwadingLeaderStateChanged {
private final @Nullable ReadOnlyDataTree localShardDataTree;
+ private final LeaderStateChanged delegate;
- public ShardLeaderStateChanged(final @NonNull String memberId, final @Nullable String leaderId,
- final @NonNull ReadOnlyDataTree localShardDataTree, final short leaderPayloadVersion) {
- super(memberId, leaderId, leaderPayloadVersion);
- this.localShardDataTree = requireNonNull(localShardDataTree);
+ public ShardLeaderStateChanged(final LeaderStateChanged delegate,
+ final @Nullable ReadOnlyDataTree localShardDataTree) {
+ this.delegate = requireNonNull(delegate);
+ this.localShardDataTree = localShardDataTree;
}
- public ShardLeaderStateChanged(final @NonNull String memberId, final @Nullable String leaderId,
+ @VisibleForTesting
+ public ShardLeaderStateChanged(final String memberId, final @Nullable String leaderId,
+ final ReadOnlyDataTree localShardDataTree, final short leaderPayloadVersion) {
+ this(new DefaultLeaderStateChanged(memberId, leaderId, leaderPayloadVersion),
+ requireNonNull(localShardDataTree));
+ }
+
+ @VisibleForTesting
+ public ShardLeaderStateChanged(final String memberId, final @Nullable String leaderId,
final short leaderPayloadVersion) {
- super(memberId, leaderId, leaderPayloadVersion);
- localShardDataTree = null;
+ this(new DefaultLeaderStateChanged(memberId, leaderId, leaderPayloadVersion), null);
}
public @Nullable ReadOnlyDataTree localShardDataTree() {
return localShardDataTree;
}
+
+ @Override
+ protected LeaderStateChanged delegate() {
+ return delegate;
+ }
}
private void onLeaderStateChanged(final ShardLeaderStateChanged leaderStateChanged) {
LOG.info("{}: Received LeaderStateChanged message: {}", logName(), leaderStateChanged);
- ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
+ ShardInformation shardInformation = findShardInformation(leaderStateChanged.memberId());
if (shardInformation != null) {
shardInformation.setLocalDataTree(leaderStateChanged.localShardDataTree());
- shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
- if (shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
+ shardInformation.setLeaderVersion(leaderStateChanged.leaderPayloadVersion());
+ if (shardInformation.setLeaderId(leaderStateChanged.leaderId())) {
primaryShardInfoCache.remove(shardInformation.getShardName());
notifyShardAvailabilityCallbacks(shardInformation);
checkReady();
} else {
- LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
+ LOG.debug("No shard found with member Id {}", leaderStateChanged.memberId());
}
}
import org.apache.pekko.testkit.javadsl.TestKit;
import org.junit.Before;
import org.junit.Test;
+import org.opendaylight.controller.cluster.notifications.DefaultLeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
TestActorRef<RoleChangeNotifier> notifierTestActorRef = TestActorRef.create(getSystem(),
RoleChangeNotifier.getProps(actorId), actorId);
- notifierTestActorRef.tell(new LeaderStateChanged("member1", "leader1", (short) 5), ActorRef.noSender());
+ notifierTestActorRef.tell(new DefaultLeaderStateChanged("member1", "leader1", (short) 5), ActorRef.noSender());
// listener registers after the sate has been changed, ensure we
// sent the latest state change after a reply
testKit.expectMsgClass(RegisterRoleChangeListenerReply.class);
LeaderStateChanged leaderStateChanged = testKit.expectMsgClass(LeaderStateChanged.class);
- assertEquals("getMemberId", "member1", leaderStateChanged.getMemberId());
- assertEquals("getLeaderId", "leader1", leaderStateChanged.getLeaderId());
- assertEquals("getLeaderPayloadVersion", 5, leaderStateChanged.getLeaderPayloadVersion());
+ assertEquals("getMemberId", "member1", leaderStateChanged.memberId());
+ assertEquals("getLeaderId", "leader1", leaderStateChanged.leaderId());
+ assertEquals("getLeaderPayloadVersion", 5, leaderStateChanged.leaderPayloadVersion());
- notifierTestActorRef.tell(new LeaderStateChanged("member1", "leader2", (short) 6), ActorRef.noSender());
+ notifierTestActorRef.tell(new DefaultLeaderStateChanged("member1", "leader2", (short) 6), ActorRef.noSender());
leaderStateChanged = testKit.expectMsgClass(LeaderStateChanged.class);
- assertEquals("getMemberId", "member1", leaderStateChanged.getMemberId());
- assertEquals("getLeaderId", "leader2", leaderStateChanged.getLeaderId());
- assertEquals("getLeaderPayloadVersion", 6, leaderStateChanged.getLeaderPayloadVersion());
+ assertEquals("getMemberId", "member1", leaderStateChanged.memberId());
+ assertEquals("getLeaderId", "leader2", leaderStateChanged.leaderId());
+ assertEquals("getLeaderPayloadVersion", 6, leaderStateChanged.leaderPayloadVersion());
}
}
import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
-import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
+import org.opendaylight.controller.cluster.notifications.DefaultLeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
import org.opendaylight.controller.cluster.raft.RaftState;
shardManager.tell(
new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()),
mockShardActor);
- shardManager.tell(new LeaderStateChanged(memberId1, memberId2, DataStoreVersions.CURRENT_VERSION),
+ shardManager.tell(new DefaultLeaderStateChanged(memberId1, memberId2, DataStoreVersions.CURRENT_VERSION),
mockShardActor);
shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), kit.getRef());