Fix bug in ReplicatedLogImpl#removeFrom and use akka-persistence for removing entries 82/9182/4
authorMoiz Raja <moraja@cisco.com>
Sun, 20 Jul 2014 07:59:05 +0000 (00:59 -0700)
committerMoiz Raja <moraja@cisco.com>
Sat, 26 Jul 2014 22:47:28 +0000 (15:47 -0700)
The way things were coded in removeFrom assumed that when a set of entries was deleted
from the persistent log the sequence number would be reset. This is most likely to not be
the case.

Switched to usage which is more appropriate for akka-persistence namely to persist
the operation to delete entries in the akka-persistence journal and on recovery take the
appropriate action.

Also since now we are using the akka-persistence journal purely for persistence we can safely
add other stuff to the journal and so I added UpdateElectionTerm also to the journal. This
takes care of persisting the election information in the same journal.

Change-Id: I34202535711660f19b4cdd9e71a7b55fde7433ce
Signed-off-by: Moiz Raja <moraja@cisco.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ElectionTerm.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ElectionTermImpl.java [deleted file]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java

index 664ab5e7b29868014a8a913dc519313f90865de6..9c0a4fbffd5a03bf8e645e4d5aab9f72e66b507c 100644 (file)
@@ -38,4 +38,11 @@ public interface ElectionTerm {
      * @param votedFor
      */
     void update(long currentTerm, String votedFor);
+
+    /**
+     *
+     * @param currentTerm
+     * @param votedFor
+     */
+    void updateAndPersist(long currentTerm, String votedFor);
 }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ElectionTermImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ElectionTermImpl.java
deleted file mode 100644 (file)
index 6a598be..0000000
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.raft;
-
-public class ElectionTermImpl implements ElectionTerm{
-    /**
-     * Identifier of the actor whose election term information this is
-     */
-    private final String id;
-
-    private long currentTerm;
-
-    private String votedFor;
-
-    public ElectionTermImpl(String id) {
-        this.id = id;
-
-        // TODO: Read currentTerm from some persistent state
-        currentTerm = 0;
-
-        // TODO: Read votedFor from some file
-        votedFor = "";
-    }
-
-    public long getCurrentTerm() {
-        return currentTerm;
-    }
-
-    public String getVotedFor() {
-        return votedFor;
-    }
-
-    public void update(long currentTerm, String votedFor){
-        this.currentTerm = currentTerm;
-        this.votedFor = votedFor;
-
-        // TODO : Write to some persistent state
-    }
-}
index 9b7940838dd0b22980a8002a47d501a2327a7e1a..7814bad00b4d1423cea9847b06882d36138e0238 100644 (file)
@@ -99,9 +99,10 @@ public abstract class RaftActor extends UntypedPersistentActor {
 
 
     public RaftActor(String id, Map<String, String> peerAddresses) {
+        final String id1 = getSelf().path().toString();
         context = new RaftActorContextImpl(this.getSelf(),
             this.getContext(),
-            id, new ElectionTermImpl(getSelf().path().toString()),
+            id, new ElectionTermImpl(),
             -1, -1, replicatedLog, peerAddresses, LOG);
     }
 
@@ -120,6 +121,10 @@ public abstract class RaftActor extends UntypedPersistentActor {
 
         } else if (message instanceof ReplicatedLogEntry) {
             replicatedLog.append((ReplicatedLogEntry) message);
+        } else if (message instanceof DeleteEntries) {
+            replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
+        } else if (message instanceof UpdateElectionTerm) {
+            context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), ((UpdateElectionTerm) message).getVotedFor());
         } else if (message instanceof RecoveryCompleted) {
             LOG.debug(
                 "Last index in log : " + replicatedLog.lastIndex());
@@ -316,6 +321,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
         deleteMessages(sequenceNumber);
     }
 
+
     private class ReplicatedLogImpl implements ReplicatedLog {
         private final List<ReplicatedLogEntry> journal;
         private final Object snapshot;
@@ -375,11 +381,30 @@ public abstract class RaftActor extends UntypedPersistentActor {
             if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
                 return;
             }
-            for (int i = adjustedIndex;
-                 i < journal.size(); i++) {
-                deleteMessage(i);
-                journal.remove(i);
+
+            journal.subList(adjustedIndex , journal.size()).clear();
+        }
+
+
+        @Override public void removeFromAndPersist(long index) {
+            int adjustedIndex = adjustedIndex(index);
+
+            if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
+                return;
             }
+
+            // FIXME: Maybe this should be done after the command is saved
+            journal.subList(adjustedIndex , journal.size()).clear();
+
+            persist(new DeleteEntries(adjustedIndex), new Procedure<DeleteEntries>(){
+
+                @Override public void apply(DeleteEntries param)
+                    throws Exception {
+                    //FIXME : Doing nothing for now
+                }
+            });
+
+
         }
 
         @Override public void append(
@@ -394,6 +419,8 @@ public abstract class RaftActor extends UntypedPersistentActor {
             if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
                 return entries;
             }
+
+
             for (int i = adjustedIndex;
                  i < journal.size(); i++) {
                 entries.add(journal.get(i));
@@ -522,6 +549,19 @@ public abstract class RaftActor extends UntypedPersistentActor {
         }
     }
 
+    private static class DeleteEntries implements Serializable {
+        private final int fromIndex;
+
+
+        public DeleteEntries(int fromIndex) {
+            this.fromIndex = fromIndex;
+        }
+
+        public int getFromIndex() {
+            return fromIndex;
+        }
+    }
+
 
     private static class Snapshot implements Serializable {
         private final Object state;
@@ -571,5 +611,58 @@ public abstract class RaftActor extends UntypedPersistentActor {
         }
     }
 
+    private class ElectionTermImpl implements ElectionTerm {
+        /**
+         * Identifier of the actor whose election term information this is
+         */
+        private long currentTerm = 0;
+        private String votedFor = null;
+
+        public long getCurrentTerm() {
+            return currentTerm;
+        }
+
+        public String getVotedFor() {
+            return votedFor;
+        }
+
+        @Override public void update(long currentTerm, String votedFor) {
+            LOG.info("Set currentTerm={}, votedFor={}", currentTerm, votedFor);
+
+            this.currentTerm = currentTerm;
+            this.votedFor = votedFor;
+        }
+
+        @Override
+        public void updateAndPersist(long currentTerm, String votedFor){
+            update(currentTerm, votedFor);
+            // FIXME : Maybe first persist then update the state
+            persist(new UpdateElectionTerm(this.currentTerm, this.votedFor), new Procedure<UpdateElectionTerm>(){
+
+                @Override public void apply(UpdateElectionTerm param)
+                    throws Exception {
+
+                }
+            });
+        }
+    }
+
+    private static class UpdateElectionTerm implements Serializable {
+        private final long currentTerm;
+        private final String votedFor;
+
+        public UpdateElectionTerm(long currentTerm, String votedFor) {
+            this.currentTerm = currentTerm;
+            this.votedFor = votedFor;
+        }
+
+        public long getCurrentTerm() {
+            return currentTerm;
+        }
+
+        public String getVotedFor() {
+            return votedFor;
+        }
+    }
 
 }
index 3de0de5131ae6c9195128b73a873cf8ca25bb9bf..29d6a4557db8dde765accbfe767b9b4f8debd290 100644 (file)
@@ -50,6 +50,15 @@ public interface ReplicatedLog {
      */
     void removeFrom(long index);
 
+
+    /**
+     * Remove all entries starting from the specified entry and persist the
+     * information to disk
+     *
+     * @param index
+     */
+    void removeFromAndPersist(long index);
+
     /**
      * Append an entry to the log
      * @param replicatedLogEntry
index c64855ff7d90f6d51fd622b3e90b3d8e56665205..1d78bb02276f94abc9847d95698910e33429d4d7 100644 (file)
@@ -176,7 +176,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
 
             if (candidateLatest) {
                 grantVote = true;
-                context.getTermInformation().update(requestVote.getTerm(),
+                context.getTermInformation().updateAndPersist(requestVote.getTerm(),
                     requestVote.getCandidateId());
             }
         }
index 13f5f1d7a49ac8388da4a2926590d0ba0edcdd57..0c23db4c8a0184af45461ddfd3ec214ce117c460 100644 (file)
@@ -123,7 +123,7 @@ public class Candidate extends AbstractRaftActorBehavior {
             // set currentTerm = T, convert to follower (§5.1)
             // This applies to all RPC messages and responses
             if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
-                context.getTermInformation().update(rpc.getTerm(), null);
+                context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
                 return RaftState.Follower;
             }
         }
@@ -153,7 +153,7 @@ public class Candidate extends AbstractRaftActorBehavior {
 
         // Increment the election term and vote for self
         long currentTerm = context.getTermInformation().getCurrentTerm();
-        context.getTermInformation().update(currentTerm + 1, context.getId());
+        context.getTermInformation().updateAndPersist(currentTerm + 1, context.getId());
 
         context.getLogger().debug("Starting new term " + (currentTerm+1));
 
index 823d563a74e2d6d6ac7132f3adfed10b0a866170..74069a18e1f879e71040a171eef8497889fd0f8c 100644 (file)
@@ -122,7 +122,7 @@ public class Follower extends AbstractRaftActorBehavior {
                                 + matchEntry.getIndex()
                         );
                         context.getReplicatedLog()
-                            .removeFrom(matchEntry.getIndex());
+                            .removeFromAndPersist(matchEntry.getIndex());
                         break;
                     }
                 }
@@ -195,7 +195,7 @@ public class Follower extends AbstractRaftActorBehavior {
             // set currentTerm = T, convert to follower (§5.1)
             // This applies to all RPC messages and responses
             if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
-                context.getTermInformation().update(rpc.getTerm(), null);
+                context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
             }
         }
 
index a647e17a201e2db6467895d68dbc0826b7772cb1..a9882a664767ad726c651611c6cff8ac078f6d1a 100644 (file)
@@ -202,7 +202,7 @@ public class Leader extends AbstractRaftActorBehavior {
             // set currentTerm = T, convert to follower (§5.1)
             // This applies to all RPC messages and responses
             if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
-                context.getTermInformation().update(rpc.getTerm(), null);
+                context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
                 return RaftState.Follower;
             }
         }
index e02994bb06bed6a400f18b532f06dce93a92d934..77d0071917261ad0c2bfe03573c96a9b5769fb08 100644 (file)
@@ -43,7 +43,35 @@ public class MockRaftActorContext implements RaftActorContext {
         this.system = system;
         this.actor = actor;
 
-        electionTerm = new ElectionTermImpl(id);
+        final String id1 = id;
+        electionTerm = new ElectionTerm() {
+            /**
+             * Identifier of the actor whose election term information this is
+             */
+            private final String id = id1;
+            private long currentTerm = 0;
+            private String votedFor = "";
+
+            public long getCurrentTerm() {
+                return currentTerm;
+            }
+
+            public String getVotedFor() {
+                return votedFor;
+            }
+
+            public void update(long currentTerm, String votedFor){
+                this.currentTerm = currentTerm;
+                this.votedFor = votedFor;
+
+                // TODO : Write to some persistent state
+            }
+
+            @Override public void updateAndPersist(long currentTerm,
+                String votedFor) {
+                update(currentTerm, votedFor);
+            }
+        };
 
         initReplicatedLog();
     }
@@ -131,8 +159,9 @@ public class MockRaftActorContext implements RaftActorContext {
     }
 
 
+
     public static class SimpleReplicatedLog implements ReplicatedLog {
-        private final List<ReplicatedLogEntry> log = new ArrayList<>(10000);
+        private final List<ReplicatedLogEntry> log = new ArrayList<>();
 
         @Override public ReplicatedLogEntry get(long index) {
             if(index >= log.size() || index < 0){
@@ -168,9 +197,13 @@ public class MockRaftActorContext implements RaftActorContext {
             if(index >= log.size() || index < 0){
                 return;
             }
-            for(int i=(int) index ; i < log.size() ; i++) {
-                log.remove(i);
-            }
+
+            log.subList((int) index, log.size()).clear();
+            //log.remove((int) index);
+        }
+
+        @Override public void removeFromAndPersist(long index) {
+            removeFrom(index);
         }
 
         @Override public void append(ReplicatedLogEntry replicatedLogEntry) {
index f6a6217d08bfe712bb688cb3701afb530451cb1c..1cf178bb006ad37d4c4bd84fa950dd81d7dece36 100644 (file)
@@ -379,7 +379,9 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
             assertNotNull(log.get(2));
 
             // Check that the entry at index 2 has the new data
+            assertEquals("one", log.get(1).getData());
             assertEquals("two-1", log.get(2).getData());
+            assertEquals("three", log.get(3).getData());
             assertNotNull(log.get(3));
 
             // Also expect an AppendEntriesReply to be sent where success is false