Fix bug in ReplicatedLogImpl#removeFrom and use akka-persistence for removing entries 82/9182/4
authorMoiz Raja <moraja@cisco.com>
Sun, 20 Jul 2014 07:59:05 +0000 (00:59 -0700)
committerMoiz Raja <moraja@cisco.com>
Sat, 26 Jul 2014 22:47:28 +0000 (15:47 -0700)
The way things were coded in removeFrom assumed that when a set of entries was deleted
from the persistent log the sequence number would be reset. This is most likely to not be
the case.

Switched to usage which is more appropriate for akka-persistence namely to persist
the operation to delete entries in the akka-persistence journal and on recovery take the
appropriate action.

Also since now we are using the akka-persistence journal purely for persistence we can safely
add other stuff to the journal and so I added UpdateElectionTerm also to the journal. This
takes care of persisting the election information in the same journal.

Change-Id: I34202535711660f19b4cdd9e71a7b55fde7433ce
Signed-off-by: Moiz Raja <moraja@cisco.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ElectionTerm.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ElectionTermImpl.java [deleted file]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java

index 664ab5e..9c0a4fb 100644 (file)
@@ -38,4 +38,11 @@ public interface ElectionTerm {
      * @param votedFor
      */
     void update(long currentTerm, String votedFor);
+
+    /**
+     *
+     * @param currentTerm
+     * @param votedFor
+     */
+    void updateAndPersist(long currentTerm, String votedFor);
 }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ElectionTermImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ElectionTermImpl.java
deleted file mode 100644 (file)
index 6a598be..0000000
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.raft;
-
-public class ElectionTermImpl implements ElectionTerm{
-    /**
-     * Identifier of the actor whose election term information this is
-     */
-    private final String id;
-
-    private long currentTerm;
-
-    private String votedFor;
-
-    public ElectionTermImpl(String id) {
-        this.id = id;
-
-        // TODO: Read currentTerm from some persistent state
-        currentTerm = 0;
-
-        // TODO: Read votedFor from some file
-        votedFor = "";
-    }
-
-    public long getCurrentTerm() {
-        return currentTerm;
-    }
-
-    public String getVotedFor() {
-        return votedFor;
-    }
-
-    public void update(long currentTerm, String votedFor){
-        this.currentTerm = currentTerm;
-        this.votedFor = votedFor;
-
-        // TODO : Write to some persistent state
-    }
-}
index 9b79408..7814bad 100644 (file)
@@ -99,9 +99,10 @@ public abstract class RaftActor extends UntypedPersistentActor {
 
 
     public RaftActor(String id, Map<String, String> peerAddresses) {
+        final String id1 = getSelf().path().toString();
         context = new RaftActorContextImpl(this.getSelf(),
             this.getContext(),
-            id, new ElectionTermImpl(getSelf().path().toString()),
+            id, new ElectionTermImpl(),
             -1, -1, replicatedLog, peerAddresses, LOG);
     }
 
@@ -120,6 +121,10 @@ public abstract class RaftActor extends UntypedPersistentActor {
 
         } else if (message instanceof ReplicatedLogEntry) {
             replicatedLog.append((ReplicatedLogEntry) message);
+        } else if (message instanceof DeleteEntries) {
+            replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
+        } else if (message instanceof UpdateElectionTerm) {
+            context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), ((UpdateElectionTerm) message).getVotedFor());
         } else if (message instanceof RecoveryCompleted) {
             LOG.debug(
                 "Last index in log : " + replicatedLog.lastIndex());
@@ -316,6 +321,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
         deleteMessages(sequenceNumber);
     }
 
+
     private class ReplicatedLogImpl implements ReplicatedLog {
         private final List<ReplicatedLogEntry> journal;
         private final Object snapshot;
@@ -375,11 +381,30 @@ public abstract class RaftActor extends UntypedPersistentActor {
             if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
                 return;
             }
-            for (int i = adjustedIndex;
-                 i < journal.size(); i++) {
-                deleteMessage(i);
-                journal.remove(i);
+
+            journal.subList(adjustedIndex , journal.size()).clear();
+        }
+
+
+        @Override public void removeFromAndPersist(long index) {
+            int adjustedIndex = adjustedIndex(index);
+
+            if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
+                return;
             }
+
+            // FIXME: Maybe this should be done after the command is saved
+            journal.subList(adjustedIndex , journal.size()).clear();
+
+            persist(new DeleteEntries(adjustedIndex), new Procedure<DeleteEntries>(){
+
+                @Override public void apply(DeleteEntries param)
+                    throws Exception {
+                    //FIXME : Doing nothing for now
+                }
+            });
+
+
         }
 
         @Override public void append(
@@ -394,6 +419,8 @@ public abstract class RaftActor extends UntypedPersistentActor {
             if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
                 return entries;
             }
+
+
             for (int i = adjustedIndex;
                  i < journal.size(); i++) {
                 entries.add(journal.get(i));
@@ -522,6 +549,19 @@ public abstract class RaftActor extends UntypedPersistentActor {
         }
     }
 
+    private static class DeleteEntries implements Serializable {
+        private final int fromIndex;
+
+
+        public DeleteEntries(int fromIndex) {
+            this.fromIndex = fromIndex;
+        }
+
+        public int getFromIndex() {
+            return fromIndex;
+        }
+    }
+
 
     private static class Snapshot implements Serializable {
         private final Object state;
@@ -571,5 +611,58 @@ public abstract class RaftActor extends UntypedPersistentActor {
         }
     }
 
+    private class ElectionTermImpl implements ElectionTerm {
+        /**
+         * Identifier of the actor whose election term information this is
+         */
+        private long currentTerm = 0;
+        private String votedFor = null;
+
+        public long getCurrentTerm() {
+            return currentTerm;
+        }
+
+        public String getVotedFor() {
+            return votedFor;
+        }
+
+        @Override public void update(long currentTerm, String votedFor) {
+            LOG.info("Set currentTerm={}, votedFor={}", currentTerm, votedFor);
+
+            this.currentTerm = currentTerm;
+            this.votedFor = votedFor;
+        }
+
+        @Override
+        public void updateAndPersist(long currentTerm, String votedFor){
+            update(currentTerm, votedFor);
+            // FIXME : Maybe first persist then update the state
+            persist(new UpdateElectionTerm(this.currentTerm, this.votedFor), new Procedure<UpdateElectionTerm>(){
+
+                @Override public void apply(UpdateElectionTerm param)
+                    throws Exception {
+
+                }
+            });
+        }
+    }
+
+    private static class UpdateElectionTerm implements Serializable {
+        private final long currentTerm;
+        private final String votedFor;
+
+        public UpdateElectionTerm(long currentTerm, String votedFor) {
+            this.currentTerm = currentTerm;
+            this.votedFor = votedFor;
+        }
+
+        public long getCurrentTerm() {
+            return currentTerm;
+        }
+
+        public String getVotedFor() {
+            return votedFor;
+        }
+    }
 
 }
index 3de0de5..29d6a45 100644 (file)
@@ -50,6 +50,15 @@ public interface ReplicatedLog {
      */
     void removeFrom(long index);
 
+
+    /**
+     * Remove all entries starting from the specified entry and persist the
+     * information to disk
+     *
+     * @param index
+     */
+    void removeFromAndPersist(long index);
+
     /**
      * Append an entry to the log
      * @param replicatedLogEntry
index c64855f..1d78bb0 100644 (file)
@@ -176,7 +176,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
 
             if (candidateLatest) {
                 grantVote = true;
-                context.getTermInformation().update(requestVote.getTerm(),
+                context.getTermInformation().updateAndPersist(requestVote.getTerm(),
                     requestVote.getCandidateId());
             }
         }
index 13f5f1d..0c23db4 100644 (file)
@@ -123,7 +123,7 @@ public class Candidate extends AbstractRaftActorBehavior {
             // set currentTerm = T, convert to follower (§5.1)
             // This applies to all RPC messages and responses
             if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
-                context.getTermInformation().update(rpc.getTerm(), null);
+                context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
                 return RaftState.Follower;
             }
         }
@@ -153,7 +153,7 @@ public class Candidate extends AbstractRaftActorBehavior {
 
         // Increment the election term and vote for self
         long currentTerm = context.getTermInformation().getCurrentTerm();
-        context.getTermInformation().update(currentTerm + 1, context.getId());
+        context.getTermInformation().updateAndPersist(currentTerm + 1, context.getId());
 
         context.getLogger().debug("Starting new term " + (currentTerm+1));
 
index 823d563..74069a1 100644 (file)
@@ -122,7 +122,7 @@ public class Follower extends AbstractRaftActorBehavior {
                                 + matchEntry.getIndex()
                         );
                         context.getReplicatedLog()
-                            .removeFrom(matchEntry.getIndex());
+                            .removeFromAndPersist(matchEntry.getIndex());
                         break;
                     }
                 }
@@ -195,7 +195,7 @@ public class Follower extends AbstractRaftActorBehavior {
             // set currentTerm = T, convert to follower (§5.1)
             // This applies to all RPC messages and responses
             if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
-                context.getTermInformation().update(rpc.getTerm(), null);
+                context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
             }
         }
 
index a647e17..a9882a6 100644 (file)
@@ -202,7 +202,7 @@ public class Leader extends AbstractRaftActorBehavior {
             // set currentTerm = T, convert to follower (§5.1)
             // This applies to all RPC messages and responses
             if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
-                context.getTermInformation().update(rpc.getTerm(), null);
+                context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
                 return RaftState.Follower;
             }
         }
index e02994b..77d0071 100644 (file)
@@ -43,7 +43,35 @@ public class MockRaftActorContext implements RaftActorContext {
         this.system = system;
         this.actor = actor;
 
-        electionTerm = new ElectionTermImpl(id);
+        final String id1 = id;
+        electionTerm = new ElectionTerm() {
+            /**
+             * Identifier of the actor whose election term information this is
+             */
+            private final String id = id1;
+            private long currentTerm = 0;
+            private String votedFor = "";
+
+            public long getCurrentTerm() {
+                return currentTerm;
+            }
+
+            public String getVotedFor() {
+                return votedFor;
+            }
+
+            public void update(long currentTerm, String votedFor){
+                this.currentTerm = currentTerm;
+                this.votedFor = votedFor;
+
+                // TODO : Write to some persistent state
+            }
+
+            @Override public void updateAndPersist(long currentTerm,
+                String votedFor) {
+                update(currentTerm, votedFor);
+            }
+        };
 
         initReplicatedLog();
     }
@@ -131,8 +159,9 @@ public class MockRaftActorContext implements RaftActorContext {
     }
 
 
+
     public static class SimpleReplicatedLog implements ReplicatedLog {
-        private final List<ReplicatedLogEntry> log = new ArrayList<>(10000);
+        private final List<ReplicatedLogEntry> log = new ArrayList<>();
 
         @Override public ReplicatedLogEntry get(long index) {
             if(index >= log.size() || index < 0){
@@ -168,9 +197,13 @@ public class MockRaftActorContext implements RaftActorContext {
             if(index >= log.size() || index < 0){
                 return;
             }
-            for(int i=(int) index ; i < log.size() ; i++) {
-                log.remove(i);
-            }
+
+            log.subList((int) index, log.size()).clear();
+            //log.remove((int) index);
+        }
+
+        @Override public void removeFromAndPersist(long index) {
+            removeFrom(index);
         }
 
         @Override public void append(ReplicatedLogEntry replicatedLogEntry) {
index f6a6217..1cf178b 100644 (file)
@@ -379,7 +379,9 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
             assertNotNull(log.get(2));
 
             // Check that the entry at index 2 has the new data
+            assertEquals("one", log.get(1).getData());
             assertEquals("two-1", log.get(2).getData());
+            assertEquals("three", log.get(3).getData());
             assertNotNull(log.get(3));
 
             // Also expect an AppendEntriesReply to be sent where success is false

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.