From: Tom Pantelis Date: Thu, 15 Dec 2016 15:28:07 +0000 (-0500) Subject: BUG-7033: Add batchHint flag to RaftActor#persistData X-Git-Tag: release/carbon~346 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=b8da9f6fa8bf4284805349f4638ebdadf169ff5f BUG-7033: Add batchHint flag to RaftActor#persistData Added a batchHint flag where, if true, it elides sending AppendEntries immediately. AppendEntries will be sent on the next persistData call with batchHint false or at the next heartbeat interval. Change-Id: Id887ab46d5147e4fb5e50e52acaa8887d4048e1d Signed-off-by: Tom Pantelis --- diff --git a/opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java b/opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java index 2523a640b2..6e8051fe55 100644 --- a/opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java +++ b/opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java @@ -70,7 +70,7 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, protected void handleNonRaftCommand(Object message) { if(message instanceof KeyValue){ if(isLeader()) { - persistData(getSender(), new PayloadIdentifier(persistIdentifier++), (Payload) message); + persistData(getSender(), new PayloadIdentifier(persistIdentifier++), (Payload) message, false); } else { if(getLeader() != null) { getLeader().forward(message, getContext()); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 8ed92d4224..1c08b20f2d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -284,7 +284,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } else if (message instanceof Runnable) { ((Runnable)message).run(); } else if (message instanceof NoopPayload) { - persistData(null, null, (NoopPayload)message); + persistData(null, null, (NoopPayload)message, false); } else if (!possiblyHandleBehaviorMessage(message)) { handleNonRaftCommand(message); } @@ -514,11 +514,17 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } /** - * When a derived RaftActor needs to persist something it must call - * persistData. + * Persists the given Payload in the journal and replicates to any followers. After successful completion, + * {@link #applyState(ActorRef, Identifier, Object)} is notified. + * + * @param clientActor optional ActorRef that is provided via the applyState callback + * @param identifier the payload identifier + * @param data the payload data to persist + * @param batchHint if true, an attempt is made to delay immediate replication and batch the payload with + * subsequent payloads for efficiency. Otherwise the payload is immediately replicated. */ - protected final void persistData(final ActorRef clientActor, final Identifier identifier, final Payload data) { - + protected final void persistData(final ActorRef clientActor, final Identifier identifier, final Payload data, + final boolean batchHint) { ReplicatedLogEntry replicatedLogEntry = new SimpleReplicatedLogEntry( context.getReplicatedLog().lastIndex() + 1, context.getTermInformation().getCurrentTerm(), data); @@ -556,7 +562,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { if (wasAppended && hasFollowers()) { // Send log entry for replication. - getCurrentBehavior().handleMessage(getSelf(), new Replicate(clientActor, identifier, replicatedLogEntry)); + getCurrentBehavior().handleMessage(getSelf(), new Replicate(clientActor, identifier, replicatedLogEntry, + !batchHint)); } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java index 782ecc06a8..5418f6b084 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java @@ -237,7 +237,8 @@ class RaftActorServerConfigurationSupport { operationContext.includeSelfInNewConfiguration(raftActor)); LOG.debug("{}: New server configuration : {}", raftContext.getId(), payload.getServerConfig()); - raftActor.persistData(operationContext.getClientRequestor(), operationContext.getContextId(), payload); + raftActor.persistData(operationContext.getClientRequestor(), operationContext.getContextId(), + payload, false); currentOperationState = new Persisting(operationContext, newTimer(new ServerOperationTimeout( operationContext.getLoggingContext()))); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/Replicate.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/Replicate.java index a5381d5f22..1845f735b4 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/Replicate.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/Replicate.java @@ -18,12 +18,14 @@ public class Replicate implements Serializable { private final ActorRef clientActor; private final Identifier identifier; private final ReplicatedLogEntry replicatedLogEntry; + private final boolean sendImmediate; - public Replicate(ActorRef clientActor, Identifier identifier, ReplicatedLogEntry replicatedLogEntry) { - + public Replicate(ActorRef clientActor, Identifier identifier, ReplicatedLogEntry replicatedLogEntry, + boolean sendImmediate) { this.clientActor = clientActor; this.identifier = identifier; this.replicatedLogEntry = replicatedLogEntry; + this.sendImmediate = sendImmediate; } public ActorRef getClientActor() { @@ -37,4 +39,8 @@ public class Replicate implements Serializable { public ReplicatedLogEntry getReplicatedLogEntry() { return replicatedLogEntry; } + + public boolean isSendImmediate() { + return sendImmediate; + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index d97905cf11..4fae48e1aa 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -571,8 +571,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private void replicate(Replicate replicate) { long logIndex = replicate.getReplicatedLogEntry().getIndex(); - log.debug("{}: Replicate message: identifier: {}, logIndex: {}, payload: {}", logName(), - replicate.getIdentifier(), logIndex, replicate.getReplicatedLogEntry().getData().getClass()); + log.debug("{}: Replicate message: identifier: {}, logIndex: {}, payload: {}, isSendImmediate: {}", logName(), + replicate.getIdentifier(), logIndex, replicate.getReplicatedLogEntry().getData().getClass(), + replicate.isSendImmediate()); // Create a tracker entry we will use this later to notify the // client actor @@ -589,7 +590,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { applyLogToStateMachine(logIndex); } - if (!followerToLog.isEmpty()) { + if (replicate.isSendImmediate() && !followerToLog.isEmpty()) { sendAppendEntries(0, false); } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java index 9bc8a53615..4a053137a5 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java @@ -116,12 +116,12 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest public void handleCommand(Object message) { if (message instanceof MockPayload) { MockPayload payload = (MockPayload) message; - super.persistData(collectorActor, new MockIdentifier(payload.toString()), payload); + super.persistData(collectorActor, new MockIdentifier(payload.toString()), payload, false); return; } if (message instanceof ServerConfigurationPayload) { - super.persistData(collectorActor, new MockIdentifier("serverConfig"), (Payload) message); + super.persistData(collectorActor, new MockIdentifier("serverConfig"), (Payload) message, false); return; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index cdb3e05329..8ec50754e8 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -885,7 +885,7 @@ public class RaftActorTest extends AbstractActorTest { // Persist another entry (this will cause a CaptureSnapshot to be triggered leaderActor.persistData(mockActorRef, new MockIdentifier("x"), - new MockRaftActorContext.MockPayload("duh")); + new MockRaftActorContext.MockPayload("duh"), false); // Now send a CaptureSnapshotReply mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef); @@ -1294,7 +1294,8 @@ public class RaftActorTest extends AbstractActorTest { Leader leader = new Leader(leaderActor.getRaftActorContext()); leaderActor.setCurrentBehavior(leader); - leaderActor.persistData(leaderActorRef, new MockIdentifier("1"), new MockRaftActorContext.MockPayload("1")); + leaderActor.persistData(leaderActorRef, new MockIdentifier("1"), new MockRaftActorContext.MockPayload("1"), + false); ReplicatedLogEntry logEntry = leaderActor.getReplicatedLog().get(0); assertNotNull("ReplicatedLogEntry not found", logEntry); @@ -1312,4 +1313,42 @@ public class RaftActorTest extends AbstractActorTest { assertEquals("getCommitIndex", 0, leaderActor.getRaftActorContext().getCommitIndex()); assertEquals("getLastApplied", 0, leaderActor.getRaftActorContext().getLastApplied()); } + + @Test + public void testReplicateWithBatchHint() throws Exception { + final String leaderId = factory.generateActorId("leader-"); + final String followerId = factory.generateActorId("follower-"); + + final ActorRef followerActor = factory.createActor(Props.create(MessageCollectorActor.class)); + + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS)); + + TestActorRef leaderActorRef = factory.createTestActor( + MockRaftActor.props(leaderId, ImmutableMap.of(followerId, followerActor.path().toString()), config), + leaderId); + MockRaftActor leaderActor = leaderActorRef.underlyingActor(); + leaderActor.waitForInitializeBehaviorComplete(); + + leaderActor.getRaftActorContext().getTermInformation().update(1, leaderId); + + Leader leader = new Leader(leaderActor.getRaftActorContext()); + leaderActor.setCurrentBehavior(leader); + + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + MessageCollectorActor.clearMessages(followerActor); + + leaderActor.onReceiveCommand(new AppendEntriesReply(followerId, 1, true, -1, -1, (short)0)); + + leaderActor.persistData(leaderActorRef, new MockIdentifier("1"), new MockPayload("1"), true); + MessageCollectorActor.assertNoneMatching(followerActor, AppendEntries.class, 500); + + leaderActor.persistData(leaderActorRef, new MockIdentifier("2"), new MockPayload("2"), true); + MessageCollectorActor.assertNoneMatching(followerActor, AppendEntries.class, 500); + + leaderActor.persistData(leaderActorRef, new MockIdentifier("3"), new MockPayload("3"), false); + AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + assertEquals("AppendEntries size", 3, appendEntries.getEntries().size()); + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index a842069c54..fbfdea0249 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -155,7 +155,7 @@ public class LeaderTest extends AbstractLeaderTest { MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo"); SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload); actorContext.getReplicatedLog().append(newEntry); - return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry)); + return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry, true)); } @Test @@ -518,7 +518,8 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getReplicatedLog().append(newEntry); final Identifier id = new MockIdentifier("state-id"); - RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new Replicate(leaderActor, id, newEntry)); + RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, + new Replicate(leaderActor, id, newEntry, true)); // State should not change assertTrue(raftBehavior instanceof Leader); @@ -648,7 +649,7 @@ public class LeaderTest extends AbstractLeaderTest { // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex RaftActorBehavior raftBehavior = leader.handleMessage( - leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry)); + leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true)); assertTrue(raftBehavior instanceof Leader); @@ -693,7 +694,7 @@ public class LeaderTest extends AbstractLeaderTest { //update follower timestamp leader.markFollowerActive(FOLLOWER_ID); - leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry)); + leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true)); assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing()); @@ -706,7 +707,7 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals(2, cs.getLastTerm()); // if an initiate is started again when first is in progress, it shouldnt initiate Capture - leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry)); + leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true)); assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot()); } @@ -769,7 +770,7 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals(2, cs.getLastTerm()); // if an initiate is started again when first is in progress, it should not initiate Capture - leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry)); + leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true)); assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot()); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 1f786e6a20..23a5a0e20b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -459,7 +459,7 @@ public class Shard extends RaftActor { // applyState() will be invoked once consensus is reached on the payload void persistPayload(final TransactionIdentifier transactionId, final Payload payload) { // We are faking the sender - persistData(self(), transactionId, payload); + persistData(self(), transactionId, payload, false); } private void handleCommitTransaction(final CommitTransaction commit) {