From a89f83014714207a4ccb704eb75050d758266d71 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Fri, 24 Apr 2015 18:26:22 -0400 Subject: [PATCH] Bug 3020: Add leader version to LeaderStateChanged Added the leader's payload version to the LeaderStateChanged message and modified the raft code to set it. Change-Id: I9a34f90641a2962418d234bb56e55f2df5207e5b Signed-off-by: Tom Pantelis (cherry picked from commit 13ba9adfa24716a7b27bc4cfef198b3fa5c577b0) --- .../controller/cluster/raft/RaftActor.java | 16 ++++++-- .../raft/behaviors/AbstractLeader.java | 2 + .../behaviors/AbstractRaftActorBehavior.java | 11 ++++++ .../DelegatingRaftActorBehavior.java | 5 +++ .../cluster/raft/behaviors/Follower.java | 2 + .../raft/behaviors/RaftActorBehavior.java | 5 +++ .../cluster/raft/MockRaftActor.java | 4 +- .../cluster/raft/RaftActorTest.java | 12 ++++++ .../cluster/raft/behaviors/FollowerTest.java | 7 +++- .../cluster/raft/behaviors/LeaderTest.java | 4 ++ .../notifications/LeaderStateChanged.java | 10 ++++- .../controller/cluster/datastore/Shard.java | 5 ++- .../messages/ShardLeaderStateChanged.java | 4 +- .../datastore/RoleChangeNotifierTest.java | 6 ++- .../cluster/datastore/ShardManagerTest.java | 38 +++++++++++-------- 15 files changed, 102 insertions(+), 29 deletions(-) diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 0b3fca090c..ebc157bc17 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -303,9 +303,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { // it can happen that the state has not changed but the leader has changed. Optional roleChangeNotifier = getRoleChangeNotifier(); - if(!Objects.equal(oldBehaviorLeaderId, currentBehavior.getLeaderId())) { + if(!Objects.equal(oldBehaviorLeaderId, currentBehavior.getLeaderId()) || + oldBehaviorState.getLeaderPayloadVersion() != currentBehavior.getLeaderPayloadVersion()) { if(roleChangeNotifier.isPresent()) { - roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), currentBehavior.getLeaderId()), getSelf()); + roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), currentBehavior.getLeaderId(), + currentBehavior.getLeaderPayloadVersion()), getSelf()); } onLeaderChanged(oldBehaviorLeaderId, currentBehavior.getLeaderId()); @@ -318,8 +320,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } } - protected LeaderStateChanged newLeaderStateChanged(String memberId, String leaderId) { - return new LeaderStateChanged(memberId, leaderId); + protected LeaderStateChanged newLeaderStateChanged(String memberId, String leaderId, short leaderPayloadVersion) { + return new LeaderStateChanged(memberId, leaderId, leaderPayloadVersion); } @Override @@ -633,10 +635,12 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private static class BehaviorStateHolder { private RaftActorBehavior behavior; private String leaderId; + private short leaderPayloadVersion; void init(RaftActorBehavior behavior) { this.behavior = behavior; this.leaderId = behavior != null ? behavior.getLeaderId() : null; + this.leaderPayloadVersion = behavior != null ? behavior.getLeaderPayloadVersion() : -1; } RaftActorBehavior getBehavior() { @@ -646,5 +650,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { String getLeaderId() { return leaderId; } + + short getLeaderPayloadVersion() { + return leaderPayloadVersion; + } } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index 2eb3b32c6f..de1e1d11ff 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -93,6 +93,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { public AbstractLeader(RaftActorContext context) { super(context, RaftState.Leader); + setLeaderPayloadVersion(context.getPayloadVersion()); + final Builder ftlBuilder = ImmutableMap.builder(); for (String followerId : context.getPeerAddresses().keySet()) { FollowerLogInformation followerLogInformation = diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index 9e4ae6f6a5..fc2f137e88 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -61,6 +61,8 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { */ protected String leaderId = null; + private short leaderPayloadVersion = -1; + private long replicatedToAllIndex = -1; private final String logName; @@ -420,6 +422,15 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { return leaderId; } + @Override + public short getLeaderPayloadVersion() { + return leaderPayloadVersion; + } + + public void setLeaderPayloadVersion(short leaderPayloadVersion) { + this.leaderPayloadVersion = leaderPayloadVersion; + } + protected RaftActorBehavior switchBehavior(RaftActorBehavior behavior) { LOG.info("{} :- Switching from behavior {} to {}", logName(), this.state(), behavior.state()); try { diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/DelegatingRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/DelegatingRaftActorBehavior.java index 776dae73fa..64bdc4a504 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/DelegatingRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/DelegatingRaftActorBehavior.java @@ -55,4 +55,9 @@ public class DelegatingRaftActorBehavior implements RaftActorBehavior { public long getReplicatedToAllIndex() { return delegate.getReplicatedToAllIndex(); } + + @Override + public short getLeaderPayloadVersion() { + return delegate.getLeaderPayloadVersion(); + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index 150f300a46..d16ddabdde 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -120,6 +120,8 @@ public class Follower extends AbstractRaftActorBehavior { // If we got here then we do appear to be talking to the leader leaderId = appendEntries.getLeaderId(); + setLeaderPayloadVersion(appendEntries.getPayloadVersion()); + // 2. Reply false if log doesn’t contain an entry at prevLogIndex // whose term matches prevLogTerm (§5.3) diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/RaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/RaftActorBehavior.java index b766e0ce39..a4f7a42640 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/RaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/RaftActorBehavior.java @@ -62,4 +62,9 @@ public interface RaftActorBehavior extends AutoCloseable{ * @return */ long getReplicatedToAllIndex(); + + /** + * Returns the leader's payload data version. + */ + short getLeaderPayloadVersion(); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java index a6853caf9e..d72d40416f 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java @@ -29,6 +29,8 @@ import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payloa public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort { + public static final short PAYLOAD_VERSION = 5; + final RaftActor actorDelegate; final RaftActorRecoveryCohort recoveryCohortDelegate; final RaftActorSnapshotCohort snapshotCohortDelegate; @@ -70,7 +72,7 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, public MockRaftActor(String id, Map peerAddresses, Optional config, DataPersistenceProvider dataPersistenceProvider) { - super(id, peerAddresses, config, (short) 0); + super(id, peerAddresses, config, PAYLOAD_VERSION); state = new ArrayList<>(); this.actorDelegate = mock(RaftActor.class); this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index 45c2e0d0ad..fb9541c8d9 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -418,15 +418,18 @@ public class RaftActorTest extends AbstractActorTest { notifierActor, LeaderStateChanged.class); assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId()); + assertEquals(MockRaftActor.PAYLOAD_VERSION, leaderStateChange.getLeaderPayloadVersion()); notifierActor.underlyingActor().clear(); MockRaftActor raftActor = raftActorRef.underlyingActor(); final String newLeaderId = "new-leader"; + final short newLeaderVersion = 6; Follower follower = new Follower(raftActor.getRaftActorContext()) { @Override public RaftActorBehavior handleMessage(ActorRef sender, Object message) { leaderId = newLeaderId; + setLeaderPayloadVersion(newLeaderVersion); return this; } }; @@ -448,6 +451,15 @@ public class RaftActorTest extends AbstractActorTest { leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class); assertEquals(persistenceId, leaderStateChange.getMemberId()); assertEquals(newLeaderId, leaderStateChange.getLeaderId()); + assertEquals(newLeaderVersion, leaderStateChange.getLeaderPayloadVersion()); + + notifierActor.underlyingActor().clear(); + + raftActor.handleCommand("any"); + + Uninterruptibles.sleepUninterruptibly(505, TimeUnit.MILLISECONDS); + leaderStateChange = MessageCollectorActor.getFirstMatching(notifierActor, LeaderStateChanged.class); + assertNull(leaderStateChange); }}; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java index 443ebc31a4..ce1b46e9fd 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java @@ -401,7 +401,9 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { // before the new behavior was created (1 in this case) // This will not work for a Candidate because as soon as a Candidate // is created it increments the term - AppendEntries appendEntries = new AppendEntries(1, "leader-1", 2, 1, entries, 4, -1, (short)0); + short leaderPayloadVersion = 10; + String leaderId = "leader-1"; + AppendEntries appendEntries = new AppendEntries(1, leaderId, 2, 1, entries, 4, -1, leaderPayloadVersion); follower = createBehavior(context); @@ -413,6 +415,9 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { assertEquals("Entry 3", entries.get(0), log.get(3)); assertEquals("Entry 4", entries.get(1), log.get(4)); + assertEquals("getLeaderPayloadVersion", leaderPayloadVersion, newBehavior.getLeaderPayloadVersion()); + assertEquals("getLeaderId", leaderId, newBehavior.getLeaderId()); + expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index ccde8bfb22..07548d6a92 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -58,6 +58,7 @@ public class LeaderTest extends AbstractLeaderTest { Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower")); private Leader leader; + private final short payloadVersion = 5; @Override @After @@ -967,6 +968,7 @@ public class LeaderTest extends AbstractLeaderTest { configParams.setElectionTimeoutFactor(100000); MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef); context.setConfigParams(configParams); + context.setPayloadVersion(payloadVersion); return context; } @@ -1360,6 +1362,8 @@ public class LeaderTest extends AbstractLeaderTest { leader = new Leader(leaderActorContext); + assertEquals(payloadVersion, leader.getLeaderPayloadVersion()); + short payloadVersion = 5; AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion); diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/LeaderStateChanged.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/LeaderStateChanged.java index 23c95ecc99..359e2b221b 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/LeaderStateChanged.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/LeaderStateChanged.java @@ -20,10 +20,12 @@ import javax.annotation.Nullable; public class LeaderStateChanged { private final String memberId; private final String leaderId; + private final short leaderPayloadVersion; - public LeaderStateChanged(@Nonnull String memberId, @Nullable String leaderId) { + public LeaderStateChanged(@Nonnull String memberId, @Nullable String leaderId, short leaderPayloadVersion) { this.memberId = Preconditions.checkNotNull(memberId); this.leaderId = leaderId; + this.leaderPayloadVersion = leaderPayloadVersion; } public @Nonnull String getMemberId() { @@ -34,11 +36,15 @@ public class LeaderStateChanged { return leaderId; } + public short getLeaderPayloadVersion() { + return leaderPayloadVersion; + } + @Override public String toString() { StringBuilder builder = new StringBuilder(); builder.append("LeaderStateChanged [memberId=").append(memberId).append(", leaderId=").append(leaderId) - .append("]"); + .append(", leaderPayloadVersion=").append(leaderPayloadVersion).append("]"); return builder.toString(); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 71afb5c9b7..c8f2b1b8d9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -278,9 +278,10 @@ public class Shard extends RaftActor { } @Override - protected LeaderStateChanged newLeaderStateChanged(String memberId, String leaderId) { + protected LeaderStateChanged newLeaderStateChanged(String memberId, String leaderId, short leaderPayloadVersion) { return new ShardLeaderStateChanged(memberId, leaderId, - isLeader() ? Optional.of(store.getDataTree()) : Optional.absent()); + isLeader() ? Optional.of(store.getDataTree()) : Optional.absent(), + leaderPayloadVersion); } private void onDatastoreContext(DatastoreContext context) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ShardLeaderStateChanged.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ShardLeaderStateChanged.java index d9a55ab1e9..20b7d818e3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ShardLeaderStateChanged.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ShardLeaderStateChanged.java @@ -25,8 +25,8 @@ public class ShardLeaderStateChanged extends LeaderStateChanged { private final Optional localShardDataTree; public ShardLeaderStateChanged(@Nonnull String memberId, @Nonnull String leaderId, - @Nonnull Optional localShardDataTree) { - super(memberId, leaderId); + @Nonnull Optional localShardDataTree, short leaderPayloadVersion) { + super(memberId, leaderId, leaderPayloadVersion); this.localShardDataTree = Preconditions.checkNotNull(localShardDataTree); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/RoleChangeNotifierTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/RoleChangeNotifierTest.java index 1ab03b216c..de852c0887 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/RoleChangeNotifierTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/RoleChangeNotifierTest.java @@ -81,7 +81,7 @@ public class RoleChangeNotifierTest extends AbstractActorTest { TestActorRef notifierTestActorRef = TestActorRef.create( getSystem(), RoleChangeNotifier.getProps(actorId), actorId); - notifierTestActorRef.tell(new LeaderStateChanged("member1", "leader1"), ActorRef.noSender()); + notifierTestActorRef.tell(new LeaderStateChanged("member1", "leader1", (short)5), ActorRef.noSender()); // listener registers after the sate has been changed, ensure we sent the latest state change after a reply notifierTestActorRef.tell(new RegisterRoleChangeListener(), getRef()); @@ -91,12 +91,14 @@ public class RoleChangeNotifierTest extends AbstractActorTest { LeaderStateChanged leaderStateChanged = expectMsgClass(LeaderStateChanged.class); assertEquals("getMemberId", "member1", leaderStateChanged.getMemberId()); assertEquals("getLeaderId", "leader1", leaderStateChanged.getLeaderId()); + assertEquals("getLeaderPayloadVersion", 5, leaderStateChanged.getLeaderPayloadVersion()); - notifierTestActorRef.tell(new LeaderStateChanged("member1", "leader2"), ActorRef.noSender()); + notifierTestActorRef.tell(new LeaderStateChanged("member1", "leader2", (short)6), ActorRef.noSender()); leaderStateChanged = expectMsgClass(LeaderStateChanged.class); assertEquals("getMemberId", "member1", leaderStateChanged.getMemberId()); assertEquals("getLeaderId", "leader2", leaderStateChanged.getLeaderId()); + assertEquals("getLeaderPayloadVersion", 6, leaderStateChanged.getLeaderPayloadVersion()); }}; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java index ae8db2b5f5..e95993de24 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java @@ -156,7 +156,8 @@ public class ShardManagerTest extends AbstractActorTest { shardManager.tell(new ActorInitialized(), mockShardActor); DataTree mockDataTree = mock(DataTree.class); - shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree)), getRef()); + shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree), + DataStoreVersions.CURRENT_VERSION), getRef()); MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class); shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(), @@ -183,7 +184,7 @@ public class ShardManagerTest extends AbstractActorTest { String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix; shardManager.tell(new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor); - shardManager.tell(new LeaderStateChanged(memberId1, memberId2), mockShardActor); + shardManager.tell(new LeaderStateChanged(memberId1, memberId2, DataStoreVersions.CURRENT_VERSION), mockShardActor); shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); @@ -205,7 +206,8 @@ public class ShardManagerTest extends AbstractActorTest { String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix; shardManager.tell(new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor); - shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, Optional.absent()), mockShardActor); + shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, Optional.absent(), + DataStoreVersions.CURRENT_VERSION), mockShardActor); shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); @@ -257,7 +259,8 @@ public class ShardManagerTest extends AbstractActorTest { expectMsgClass(duration("5 seconds"), NoShardLeaderException.class); DataTree mockDataTree = mock(DataTree.class); - shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree)), mockShardActor); + shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree), + DataStoreVersions.CURRENT_VERSION), mockShardActor); shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); @@ -292,7 +295,8 @@ public class ShardManagerTest extends AbstractActorTest { expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS)); DataTree mockDataTree = mock(DataTree.class); - shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree)), mockShardActor); + shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree), + DataStoreVersions.CURRENT_VERSION), mockShardActor); LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class); assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), @@ -390,7 +394,7 @@ public class ShardManagerTest extends AbstractActorTest { String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix; shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, - Optional.of(mock(DataTree.class))), mockShardActor2); + Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION), mockShardActor2); shardManager2.tell(new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2); @@ -457,10 +461,11 @@ public class ShardManagerTest extends AbstractActorTest { String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix; String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix; shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2, - Optional.of(mock(DataTree.class))), mockShardActor1); + Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION), mockShardActor1); shardManager1.tell(new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1); - shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class))), + shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class)), + DataStoreVersions.CURRENT_VERSION), mockShardActor2); shardManager2.tell(new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2); @@ -538,10 +543,11 @@ public class ShardManagerTest extends AbstractActorTest { String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix; String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix; shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2, - Optional.of(mock(DataTree.class))), mockShardActor1); + Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION), mockShardActor1); shardManager1.tell(new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1); - shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class))), + shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class)), + DataStoreVersions.CURRENT_VERSION), mockShardActor2); shardManager2.tell(new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2); @@ -567,8 +573,8 @@ public class ShardManagerTest extends AbstractActorTest { assertNull("Expected primaryShardInfoCache entry removed", primaryShardInfoCache.getIfPresent("default")); - shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, Optional.of(mock(DataTree.class))), - mockShardActor1); + shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, Optional.of(mock(DataTree.class)), + DataStoreVersions.CURRENT_VERSION), mockShardActor1); shardManager1.tell(new RoleChangeNotification(memberId1, RaftState.Follower.name(), RaftState.Leader.name()), mockShardActor1); @@ -683,7 +689,7 @@ public class ShardManagerTest extends AbstractActorTest { verify(ready, never()).countDown(); shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId, - Optional.of(mock(DataTree.class)))); + Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION)); verify(ready, times(1)).countDown(); @@ -705,7 +711,8 @@ public class ShardManagerTest extends AbstractActorTest { shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString())); shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId, - "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)))); + "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)), + DataStoreVersions.CURRENT_VERSION)); verify(ready, times(1)).countDown(); @@ -725,7 +732,8 @@ public class ShardManagerTest extends AbstractActorTest { verify(ready, never()).countDown(); shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId, - "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)))); + "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)), + DataStoreVersions.CURRENT_VERSION)); shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString())); -- 2.36.6