From: Moiz Raja Date: Sun, 20 Jul 2014 07:59:05 +0000 (-0700) Subject: Fix bug in ReplicatedLogImpl#removeFrom and use akka-persistence for removing entries X-Git-Tag: release/helium~431 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=3019650e87a3fc05f80e8f6359e01ca5f1c5f197 Fix bug in ReplicatedLogImpl#removeFrom and use akka-persistence for removing entries The way things were coded in removeFrom assumed that when a set of entries was deleted from the persistent log the sequence number would be reset. This is most likely to not be the case. Switched to usage which is more appropriate for akka-persistence namely to persist the operation to delete entries in the akka-persistence journal and on recovery take the appropriate action. Also since now we are using the akka-persistence journal purely for persistence we can safely add other stuff to the journal and so I added UpdateElectionTerm also to the journal. This takes care of persisting the election information in the same journal. Change-Id: I34202535711660f19b4cdd9e71a7b55fde7433ce Signed-off-by: Moiz Raja --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ElectionTerm.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ElectionTerm.java index 664ab5e7b2..9c0a4fbffd 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ElectionTerm.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ElectionTerm.java @@ -38,4 +38,11 @@ public interface ElectionTerm { * @param votedFor */ void update(long currentTerm, String votedFor); + + /** + * + * @param currentTerm + * @param votedFor + */ + void updateAndPersist(long currentTerm, String votedFor); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ElectionTermImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ElectionTermImpl.java deleted file mode 100644 index 6a598be680..0000000000 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ElectionTermImpl.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.cluster.raft; - -public class ElectionTermImpl implements ElectionTerm{ - /** - * Identifier of the actor whose election term information this is - */ - private final String id; - - private long currentTerm; - - private String votedFor; - - public ElectionTermImpl(String id) { - this.id = id; - - // TODO: Read currentTerm from some persistent state - currentTerm = 0; - - // TODO: Read votedFor from some file - votedFor = ""; - } - - public long getCurrentTerm() { - return currentTerm; - } - - public String getVotedFor() { - return votedFor; - } - - public void update(long currentTerm, String votedFor){ - this.currentTerm = currentTerm; - this.votedFor = votedFor; - - // TODO : Write to some persistent state - } -} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 9b7940838d..7814bad00b 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -99,9 +99,10 @@ public abstract class RaftActor extends UntypedPersistentActor { public RaftActor(String id, Map peerAddresses) { + final String id1 = getSelf().path().toString(); context = new RaftActorContextImpl(this.getSelf(), this.getContext(), - id, new ElectionTermImpl(getSelf().path().toString()), + id, new ElectionTermImpl(), -1, -1, replicatedLog, peerAddresses, LOG); } @@ -120,6 +121,10 @@ public abstract class RaftActor extends UntypedPersistentActor { } else if (message instanceof ReplicatedLogEntry) { replicatedLog.append((ReplicatedLogEntry) message); + } else if (message instanceof DeleteEntries) { + replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex()); + } else if (message instanceof UpdateElectionTerm) { + context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), ((UpdateElectionTerm) message).getVotedFor()); } else if (message instanceof RecoveryCompleted) { LOG.debug( "Last index in log : " + replicatedLog.lastIndex()); @@ -316,6 +321,7 @@ public abstract class RaftActor extends UntypedPersistentActor { deleteMessages(sequenceNumber); } + private class ReplicatedLogImpl implements ReplicatedLog { private final List journal; private final Object snapshot; @@ -375,11 +381,30 @@ public abstract class RaftActor extends UntypedPersistentActor { if (adjustedIndex < 0 || adjustedIndex >= journal.size()) { return; } - for (int i = adjustedIndex; - i < journal.size(); i++) { - deleteMessage(i); - journal.remove(i); + + journal.subList(adjustedIndex , journal.size()).clear(); + } + + + @Override public void removeFromAndPersist(long index) { + int adjustedIndex = adjustedIndex(index); + + if (adjustedIndex < 0 || adjustedIndex >= journal.size()) { + return; } + + // FIXME: Maybe this should be done after the command is saved + journal.subList(adjustedIndex , journal.size()).clear(); + + persist(new DeleteEntries(adjustedIndex), new Procedure(){ + + @Override public void apply(DeleteEntries param) + throws Exception { + //FIXME : Doing nothing for now + } + }); + + } @Override public void append( @@ -394,6 +419,8 @@ public abstract class RaftActor extends UntypedPersistentActor { if (adjustedIndex < 0 || adjustedIndex >= journal.size()) { return entries; } + + for (int i = adjustedIndex; i < journal.size(); i++) { entries.add(journal.get(i)); @@ -522,6 +549,19 @@ public abstract class RaftActor extends UntypedPersistentActor { } } + private static class DeleteEntries implements Serializable { + private final int fromIndex; + + + public DeleteEntries(int fromIndex) { + this.fromIndex = fromIndex; + } + + public int getFromIndex() { + return fromIndex; + } + } + private static class Snapshot implements Serializable { private final Object state; @@ -571,5 +611,58 @@ public abstract class RaftActor extends UntypedPersistentActor { } } + private class ElectionTermImpl implements ElectionTerm { + /** + * Identifier of the actor whose election term information this is + */ + private long currentTerm = 0; + private String votedFor = null; + + public long getCurrentTerm() { + return currentTerm; + } + + public String getVotedFor() { + return votedFor; + } + + @Override public void update(long currentTerm, String votedFor) { + LOG.info("Set currentTerm={}, votedFor={}", currentTerm, votedFor); + + this.currentTerm = currentTerm; + this.votedFor = votedFor; + } + + @Override + public void updateAndPersist(long currentTerm, String votedFor){ + update(currentTerm, votedFor); + // FIXME : Maybe first persist then update the state + persist(new UpdateElectionTerm(this.currentTerm, this.votedFor), new Procedure(){ + + @Override public void apply(UpdateElectionTerm param) + throws Exception { + + } + }); + } + } + + private static class UpdateElectionTerm implements Serializable { + private final long currentTerm; + private final String votedFor; + + public UpdateElectionTerm(long currentTerm, String votedFor) { + this.currentTerm = currentTerm; + this.votedFor = votedFor; + } + + public long getCurrentTerm() { + return currentTerm; + } + + public String getVotedFor() { + return votedFor; + } + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java index 3de0de5131..29d6a4557d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java @@ -50,6 +50,15 @@ public interface ReplicatedLog { */ void removeFrom(long index); + + /** + * Remove all entries starting from the specified entry and persist the + * information to disk + * + * @param index + */ + void removeFromAndPersist(long index); + /** * Append an entry to the log * @param replicatedLogEntry diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index c64855ff7d..1d78bb0227 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -176,7 +176,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { if (candidateLatest) { grantVote = true; - context.getTermInformation().update(requestVote.getTerm(), + context.getTermInformation().updateAndPersist(requestVote.getTerm(), requestVote.getCandidateId()); } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java index 13f5f1d7a4..0c23db4c8a 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java @@ -123,7 +123,7 @@ public class Candidate extends AbstractRaftActorBehavior { // set currentTerm = T, convert to follower (§5.1) // This applies to all RPC messages and responses if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) { - context.getTermInformation().update(rpc.getTerm(), null); + context.getTermInformation().updateAndPersist(rpc.getTerm(), null); return RaftState.Follower; } } @@ -153,7 +153,7 @@ public class Candidate extends AbstractRaftActorBehavior { // Increment the election term and vote for self long currentTerm = context.getTermInformation().getCurrentTerm(); - context.getTermInformation().update(currentTerm + 1, context.getId()); + context.getTermInformation().updateAndPersist(currentTerm + 1, context.getId()); context.getLogger().debug("Starting new term " + (currentTerm+1)); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index 823d563a74..74069a18e1 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -122,7 +122,7 @@ public class Follower extends AbstractRaftActorBehavior { + matchEntry.getIndex() ); context.getReplicatedLog() - .removeFrom(matchEntry.getIndex()); + .removeFromAndPersist(matchEntry.getIndex()); break; } } @@ -195,7 +195,7 @@ public class Follower extends AbstractRaftActorBehavior { // set currentTerm = T, convert to follower (§5.1) // This applies to all RPC messages and responses if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) { - context.getTermInformation().update(rpc.getTerm(), null); + context.getTermInformation().updateAndPersist(rpc.getTerm(), null); } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java index a647e17a20..a9882a6647 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java @@ -202,7 +202,7 @@ public class Leader extends AbstractRaftActorBehavior { // set currentTerm = T, convert to follower (§5.1) // This applies to all RPC messages and responses if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) { - context.getTermInformation().update(rpc.getTerm(), null); + context.getTermInformation().updateAndPersist(rpc.getTerm(), null); return RaftState.Follower; } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java index e02994bb06..77d0071917 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java @@ -43,7 +43,35 @@ public class MockRaftActorContext implements RaftActorContext { this.system = system; this.actor = actor; - electionTerm = new ElectionTermImpl(id); + final String id1 = id; + electionTerm = new ElectionTerm() { + /** + * Identifier of the actor whose election term information this is + */ + private final String id = id1; + private long currentTerm = 0; + private String votedFor = ""; + + public long getCurrentTerm() { + return currentTerm; + } + + public String getVotedFor() { + return votedFor; + } + + public void update(long currentTerm, String votedFor){ + this.currentTerm = currentTerm; + this.votedFor = votedFor; + + // TODO : Write to some persistent state + } + + @Override public void updateAndPersist(long currentTerm, + String votedFor) { + update(currentTerm, votedFor); + } + }; initReplicatedLog(); } @@ -131,8 +159,9 @@ public class MockRaftActorContext implements RaftActorContext { } + public static class SimpleReplicatedLog implements ReplicatedLog { - private final List log = new ArrayList<>(10000); + private final List log = new ArrayList<>(); @Override public ReplicatedLogEntry get(long index) { if(index >= log.size() || index < 0){ @@ -168,9 +197,13 @@ public class MockRaftActorContext implements RaftActorContext { if(index >= log.size() || index < 0){ return; } - for(int i=(int) index ; i < log.size() ; i++) { - log.remove(i); - } + + log.subList((int) index, log.size()).clear(); + //log.remove((int) index); + } + + @Override public void removeFromAndPersist(long index) { + removeFrom(index); } @Override public void append(ReplicatedLogEntry replicatedLogEntry) { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java index f6a6217d08..1cf178bb00 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java @@ -379,7 +379,9 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { assertNotNull(log.get(2)); // Check that the entry at index 2 has the new data + assertEquals("one", log.get(1).getData()); assertEquals("two-1", log.get(2).getData()); + assertEquals("three", log.get(3).getData()); assertNotNull(log.get(3)); // Also expect an AppendEntriesReply to be sent where success is false