BUG-7033: Add batchHint flag to RaftActor#persistData 39/49439/5
authorTom Pantelis <tpanteli@brocade.com>
Thu, 15 Dec 2016 15:28:07 +0000 (10:28 -0500)
committerTom Pantelis <tpanteli@brocade.com>
Tue, 20 Dec 2016 15:21:07 +0000 (15:21 +0000)
Added a batchHint flag where, if true, it elides sending AppendEntries
immediately. AppendEntries will be sent on the next persistData call
with batchHint false or at the next heartbeat interval.

Change-Id: Id887ab46d5147e4fb5e50e52acaa8887d4048e1d
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/Replicate.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java

index 2523a64..6e8051f 100644 (file)
@@ -70,7 +70,7 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort,
     protected void handleNonRaftCommand(Object message) {
         if(message instanceof KeyValue){
             if(isLeader()) {
-                persistData(getSender(), new PayloadIdentifier(persistIdentifier++), (Payload) message);
+                persistData(getSender(), new PayloadIdentifier(persistIdentifier++), (Payload) message, false);
             } else {
                 if(getLeader() != null) {
                     getLeader().forward(message, getContext());
index 8ed92d4..1c08b20 100644 (file)
@@ -284,7 +284,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         } else if (message instanceof Runnable) {
             ((Runnable)message).run();
         } else if (message instanceof NoopPayload) {
-            persistData(null, null, (NoopPayload)message);
+            persistData(null, null, (NoopPayload)message, false);
         } else if (!possiblyHandleBehaviorMessage(message)) {
             handleNonRaftCommand(message);
         }
@@ -514,11 +514,17 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     /**
-     * When a derived RaftActor needs to persist something it must call
-     * persistData.
+     * Persists the given Payload in the journal and replicates to any followers. After successful completion,
+     * {@link #applyState(ActorRef, Identifier, Object)} is notified.
+     *
+     * @param clientActor optional ActorRef that is provided via the applyState callback
+     * @param identifier the payload identifier
+     * @param data the payload data to persist
+     * @param batchHint if true, an attempt is made to delay immediate replication and batch the payload with
+     *        subsequent payloads for efficiency. Otherwise the payload is immediately replicated.
      */
-    protected final void persistData(final ActorRef clientActor, final Identifier identifier, final Payload data) {
-
+    protected final void persistData(final ActorRef clientActor, final Identifier identifier, final Payload data,
+            final boolean batchHint) {
         ReplicatedLogEntry replicatedLogEntry = new SimpleReplicatedLogEntry(
             context.getReplicatedLog().lastIndex() + 1,
             context.getTermInformation().getCurrentTerm(), data);
@@ -556,7 +562,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
         if (wasAppended && hasFollowers()) {
             // Send log entry for replication.
-            getCurrentBehavior().handleMessage(getSelf(), new Replicate(clientActor, identifier, replicatedLogEntry));
+            getCurrentBehavior().handleMessage(getSelf(), new Replicate(clientActor, identifier, replicatedLogEntry,
+                    !batchHint));
         }
     }
 
index 782ecc0..5418f6b 100644 (file)
@@ -237,7 +237,8 @@ class RaftActorServerConfigurationSupport {
                     operationContext.includeSelfInNewConfiguration(raftActor));
             LOG.debug("{}: New server configuration : {}", raftContext.getId(), payload.getServerConfig());
 
-            raftActor.persistData(operationContext.getClientRequestor(), operationContext.getContextId(), payload);
+            raftActor.persistData(operationContext.getClientRequestor(), operationContext.getContextId(),
+                    payload, false);
 
             currentOperationState = new Persisting(operationContext, newTimer(new ServerOperationTimeout(
                     operationContext.getLoggingContext())));
index a5381d5..1845f73 100644 (file)
@@ -18,12 +18,14 @@ public class Replicate implements Serializable {
     private final ActorRef clientActor;
     private final Identifier identifier;
     private final ReplicatedLogEntry replicatedLogEntry;
+    private final boolean sendImmediate;
 
-    public Replicate(ActorRef clientActor, Identifier identifier, ReplicatedLogEntry replicatedLogEntry) {
-
+    public Replicate(ActorRef clientActor, Identifier identifier, ReplicatedLogEntry replicatedLogEntry,
+            boolean sendImmediate) {
         this.clientActor = clientActor;
         this.identifier = identifier;
         this.replicatedLogEntry = replicatedLogEntry;
+        this.sendImmediate = sendImmediate;
     }
 
     public ActorRef getClientActor() {
@@ -37,4 +39,8 @@ public class Replicate implements Serializable {
     public ReplicatedLogEntry getReplicatedLogEntry() {
         return replicatedLogEntry;
     }
+
+    public boolean isSendImmediate() {
+        return sendImmediate;
+    }
 }
index d97905c..4fae48e 100644 (file)
@@ -571,8 +571,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     private void replicate(Replicate replicate) {
         long logIndex = replicate.getReplicatedLogEntry().getIndex();
 
-        log.debug("{}: Replicate message: identifier: {}, logIndex: {}, payload: {}", logName(),
-                replicate.getIdentifier(), logIndex, replicate.getReplicatedLogEntry().getData().getClass());
+        log.debug("{}: Replicate message: identifier: {}, logIndex: {}, payload: {}, isSendImmediate: {}", logName(),
+                replicate.getIdentifier(), logIndex, replicate.getReplicatedLogEntry().getData().getClass(),
+                replicate.isSendImmediate());
 
         // Create a tracker entry we will use this later to notify the
         // client actor
@@ -589,7 +590,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             applyLogToStateMachine(logIndex);
         }
 
-        if (!followerToLog.isEmpty()) {
+        if (replicate.isSendImmediate() && !followerToLog.isEmpty()) {
             sendAppendEntries(0, false);
         }
     }
index 9bc8a53..4a05313 100644 (file)
@@ -116,12 +116,12 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
         public void handleCommand(Object message) {
             if (message instanceof MockPayload) {
                 MockPayload payload = (MockPayload) message;
-                super.persistData(collectorActor, new MockIdentifier(payload.toString()), payload);
+                super.persistData(collectorActor, new MockIdentifier(payload.toString()), payload, false);
                 return;
             }
 
             if (message instanceof ServerConfigurationPayload) {
-                super.persistData(collectorActor, new MockIdentifier("serverConfig"), (Payload) message);
+                super.persistData(collectorActor, new MockIdentifier("serverConfig"), (Payload) message, false);
                 return;
             }
 
index cdb3e05..8ec5075 100644 (file)
@@ -885,7 +885,7 @@ public class RaftActorTest extends AbstractActorTest {
 
         // Persist another entry (this will cause a CaptureSnapshot to be triggered
         leaderActor.persistData(mockActorRef, new MockIdentifier("x"),
-                new MockRaftActorContext.MockPayload("duh"));
+                new MockRaftActorContext.MockPayload("duh"), false);
 
         // Now send a CaptureSnapshotReply
         mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
@@ -1294,7 +1294,8 @@ public class RaftActorTest extends AbstractActorTest {
         Leader leader = new Leader(leaderActor.getRaftActorContext());
         leaderActor.setCurrentBehavior(leader);
 
-        leaderActor.persistData(leaderActorRef, new MockIdentifier("1"), new MockRaftActorContext.MockPayload("1"));
+        leaderActor.persistData(leaderActorRef, new MockIdentifier("1"), new MockRaftActorContext.MockPayload("1"),
+                false);
 
         ReplicatedLogEntry logEntry = leaderActor.getReplicatedLog().get(0);
         assertNotNull("ReplicatedLogEntry not found", logEntry);
@@ -1312,4 +1313,42 @@ public class RaftActorTest extends AbstractActorTest {
         assertEquals("getCommitIndex", 0, leaderActor.getRaftActorContext().getCommitIndex());
         assertEquals("getLastApplied", 0, leaderActor.getRaftActorContext().getLastApplied());
     }
+
+    @Test
+    public void testReplicateWithBatchHint() throws Exception {
+        final String leaderId = factory.generateActorId("leader-");
+        final String followerId = factory.generateActorId("follower-");
+
+        final ActorRef followerActor = factory.createActor(Props.create(MessageCollectorActor.class));
+
+        DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+        config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+        config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+        TestActorRef<MockRaftActor> leaderActorRef = factory.createTestActor(
+                MockRaftActor.props(leaderId, ImmutableMap.of(followerId, followerActor.path().toString()), config),
+                    leaderId);
+        MockRaftActor leaderActor = leaderActorRef.underlyingActor();
+        leaderActor.waitForInitializeBehaviorComplete();
+
+        leaderActor.getRaftActorContext().getTermInformation().update(1, leaderId);
+
+        Leader leader = new Leader(leaderActor.getRaftActorContext());
+        leaderActor.setCurrentBehavior(leader);
+
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        MessageCollectorActor.clearMessages(followerActor);
+
+        leaderActor.onReceiveCommand(new AppendEntriesReply(followerId, 1, true, -1, -1, (short)0));
+
+        leaderActor.persistData(leaderActorRef, new MockIdentifier("1"), new MockPayload("1"), true);
+        MessageCollectorActor.assertNoneMatching(followerActor, AppendEntries.class, 500);
+
+        leaderActor.persistData(leaderActorRef, new MockIdentifier("2"), new MockPayload("2"), true);
+        MessageCollectorActor.assertNoneMatching(followerActor, AppendEntries.class, 500);
+
+        leaderActor.persistData(leaderActorRef, new MockIdentifier("3"), new MockPayload("3"), false);
+        AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        assertEquals("AppendEntries size", 3, appendEntries.getEntries().size());
+    }
 }
index a842069..fbfdea0 100644 (file)
@@ -155,7 +155,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
         SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload);
         actorContext.getReplicatedLog().append(newEntry);
-        return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
+        return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry, true));
     }
 
     @Test
@@ -518,7 +518,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         actorContext.getReplicatedLog().append(newEntry);
 
         final Identifier id = new MockIdentifier("state-id");
-        RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new Replicate(leaderActor, id, newEntry));
+        RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
+                new Replicate(leaderActor, id, newEntry, true));
 
         // State should not change
         assertTrue(raftBehavior instanceof Leader);
@@ -648,7 +649,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
         RaftActorBehavior raftBehavior = leader.handleMessage(
-                leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
+                leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
 
         assertTrue(raftBehavior instanceof Leader);
 
@@ -693,7 +694,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         //update follower timestamp
         leader.markFollowerActive(FOLLOWER_ID);
 
-        leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
+        leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
 
         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
 
@@ -706,7 +707,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         assertEquals(2, cs.getLastTerm());
 
         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
-        leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
+        leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
 
         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
     }
@@ -769,7 +770,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         assertEquals(2, cs.getLastTerm());
 
         // if an initiate is started again when first is in progress, it should not initiate Capture
-        leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
+        leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
 
         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
     }
index 1f786e6..23a5a0e 100644 (file)
@@ -459,7 +459,7 @@ public class Shard extends RaftActor {
     // applyState() will be invoked once consensus is reached on the payload
     void persistPayload(final TransactionIdentifier transactionId, final Payload payload) {
         // We are faking the sender
-        persistData(self(), transactionId, payload);
+        persistData(self(), transactionId, payload, false);
     }
 
     private void handleCommitTransaction(final CommitTransaction commit) {

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.