Fix bug in ReplicatedLogImpl#removeFrom and use akka-persistence for removing entries
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
index 9b7940838dd0b22980a8002a47d501a2327a7e1a..7814bad00b4d1423cea9847b06882d36138e0238 100644 (file)
@@ -99,9 +99,10 @@ public abstract class RaftActor extends UntypedPersistentActor {
 
 
     public RaftActor(String id, Map<String, String> peerAddresses) {
+        final String id1 = getSelf().path().toString();
         context = new RaftActorContextImpl(this.getSelf(),
             this.getContext(),
-            id, new ElectionTermImpl(getSelf().path().toString()),
+            id, new ElectionTermImpl(),
             -1, -1, replicatedLog, peerAddresses, LOG);
     }
 
@@ -120,6 +121,10 @@ public abstract class RaftActor extends UntypedPersistentActor {
 
         } else if (message instanceof ReplicatedLogEntry) {
             replicatedLog.append((ReplicatedLogEntry) message);
+        } else if (message instanceof DeleteEntries) {
+            replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
+        } else if (message instanceof UpdateElectionTerm) {
+            context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), ((UpdateElectionTerm) message).getVotedFor());
         } else if (message instanceof RecoveryCompleted) {
             LOG.debug(
                 "Last index in log : " + replicatedLog.lastIndex());
@@ -316,6 +321,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
         deleteMessages(sequenceNumber);
     }
 
+
     private class ReplicatedLogImpl implements ReplicatedLog {
         private final List<ReplicatedLogEntry> journal;
         private final Object snapshot;
@@ -375,11 +381,30 @@ public abstract class RaftActor extends UntypedPersistentActor {
             if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
                 return;
             }
-            for (int i = adjustedIndex;
-                 i < journal.size(); i++) {
-                deleteMessage(i);
-                journal.remove(i);
+
+            journal.subList(adjustedIndex , journal.size()).clear();
+        }
+
+
+        @Override public void removeFromAndPersist(long index) {
+            int adjustedIndex = adjustedIndex(index);
+
+            if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
+                return;
             }
+
+            // FIXME: Maybe this should be done after the command is saved
+            journal.subList(adjustedIndex , journal.size()).clear();
+
+            persist(new DeleteEntries(adjustedIndex), new Procedure<DeleteEntries>(){
+
+                @Override public void apply(DeleteEntries param)
+                    throws Exception {
+                    //FIXME : Doing nothing for now
+                }
+            });
+
+
         }
 
         @Override public void append(
@@ -394,6 +419,8 @@ public abstract class RaftActor extends UntypedPersistentActor {
             if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
                 return entries;
             }
+
+
             for (int i = adjustedIndex;
                  i < journal.size(); i++) {
                 entries.add(journal.get(i));
@@ -522,6 +549,19 @@ public abstract class RaftActor extends UntypedPersistentActor {
         }
     }
 
+    private static class DeleteEntries implements Serializable {
+        private final int fromIndex;
+
+
+        public DeleteEntries(int fromIndex) {
+            this.fromIndex = fromIndex;
+        }
+
+        public int getFromIndex() {
+            return fromIndex;
+        }
+    }
+
 
     private static class Snapshot implements Serializable {
         private final Object state;
@@ -571,5 +611,58 @@ public abstract class RaftActor extends UntypedPersistentActor {
         }
     }
 
+    private class ElectionTermImpl implements ElectionTerm {
+        /**
+         * Identifier of the actor whose election term information this is
+         */
+        private long currentTerm = 0;
+        private String votedFor = null;
+
+        public long getCurrentTerm() {
+            return currentTerm;
+        }
+
+        public String getVotedFor() {
+            return votedFor;
+        }
+
+        @Override public void update(long currentTerm, String votedFor) {
+            LOG.info("Set currentTerm={}, votedFor={}", currentTerm, votedFor);
+
+            this.currentTerm = currentTerm;
+            this.votedFor = votedFor;
+        }
+
+        @Override
+        public void updateAndPersist(long currentTerm, String votedFor){
+            update(currentTerm, votedFor);
+            // FIXME : Maybe first persist then update the state
+            persist(new UpdateElectionTerm(this.currentTerm, this.votedFor), new Procedure<UpdateElectionTerm>(){
+
+                @Override public void apply(UpdateElectionTerm param)
+                    throws Exception {
+
+                }
+            });
+        }
+    }
+
+    private static class UpdateElectionTerm implements Serializable {
+        private final long currentTerm;
+        private final String votedFor;
+
+        public UpdateElectionTerm(long currentTerm, String votedFor) {
+            this.currentTerm = currentTerm;
+            this.votedFor = votedFor;
+        }
+
+        public long getCurrentTerm() {
+            return currentTerm;
+        }
+
+        public String getVotedFor() {
+            return votedFor;
+        }
+    }
 
 }