From fdab53ef9033fc83c812f7d3d6d3327d3d176f0f Mon Sep 17 00:00:00 2001 From: Moiz Raja Date: Sun, 13 Jul 2014 18:52:50 -0700 Subject: [PATCH] Test RaftActor using a simple program Change-Id: Id0a8091c8f6ee89178bf774be398647c60a25d03 Signed-off-by: Moiz Raja --- .../cluster/example/ClientActor.java | 46 +++ .../cluster/example/ExampleActor.java | 55 +++- .../controller/cluster/example/Main.java | 67 ++++ .../cluster/example/messages/KeyValue.java | 29 ++ .../example/messages/KeyValueSaved.java | 12 + .../cluster/raft/ClientRequestTracker.java | 34 ++ .../raft/ClientRequestTrackerImpl.java | 40 +++ .../cluster/raft/FollowerLogInformation.java | 14 + .../raft/FollowerLogInformationImpl.java | 12 + .../controller/cluster/raft/RaftActor.java | 222 +++++++++++++- .../cluster/raft/RaftActorContext.java | 22 ++ .../cluster/raft/RaftActorContextImpl.java | 24 +- .../cluster/raft/ReplicatedLog.java | 33 ++ .../behaviors/AbstractRaftActorBehavior.java | 136 ++++---- .../cluster/raft/behaviors/Candidate.java | 47 +-- .../cluster/raft/behaviors/Follower.java | 115 ++++++- .../cluster/raft/behaviors/Leader.java | 215 +++++++++++-- .../raft/behaviors/RaftActorBehavior.java | 15 +- .../raft/client/messages/FindLeader.java | 13 + .../raft/client/messages/FindLeaderReply.java | 21 ++ .../raft/internal/messages/ApplyState.java | 37 +++ .../raft/internal/messages/Replicate.java | 38 +++ .../cluster/raft/messages/AppendEntries.java | 10 + .../raft/messages/AppendEntriesReply.java | 29 +- .../src/main/resources/application.conf | 12 + .../cluster/raft/MockRaftActorContext.java | 84 +++++ .../AbstractRaftActorBehaviorTest.java | 290 ++---------------- .../cluster/raft/behaviors/CandidateTest.java | 75 +++-- .../cluster/raft/behaviors/FollowerTest.java | 267 ++++++++++++++++ .../cluster/raft/behaviors/LeaderTest.java | 19 +- 30 files changed, 1602 insertions(+), 431 deletions(-) create mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ClientActor.java create mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/Main.java create mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/KeyValue.java create mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/KeyValueSaved.java create mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ClientRequestTracker.java create mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ClientRequestTrackerImpl.java create mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/FindLeader.java create mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/FindLeaderReply.java create mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/ApplyState.java create mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/Replicate.java create mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/resources/application.conf diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ClientActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ClientActor.java new file mode 100644 index 0000000000..2560f16588 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ClientActor.java @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.example; + +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.actor.UntypedActor; +import akka.event.Logging; +import akka.event.LoggingAdapter; +import akka.japi.Creator; +import org.opendaylight.controller.cluster.example.messages.KeyValue; +import org.opendaylight.controller.cluster.example.messages.KeyValueSaved; + +public class ClientActor extends UntypedActor { + protected final LoggingAdapter LOG = + Logging.getLogger(getContext().system(), this); + + private final ActorRef target; + + public ClientActor(ActorRef target){ + this.target = target; + } + + public static Props props(final ActorRef target){ + return Props.create(new Creator(){ + + @Override public ClientActor create() throws Exception { + return new ClientActor(target); + } + }); + } + + @Override public void onReceive(Object message) throws Exception { + if(message instanceof KeyValue) { + target.tell(message, getSelf()); + } else if(message instanceof KeyValueSaved){ + LOG.info("KeyValue saved"); + } + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java index 3c8e12b8b1..35a2c98bd4 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java @@ -8,27 +8,68 @@ package org.opendaylight.controller.cluster.example; +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.japi.Creator; +import org.opendaylight.controller.cluster.example.messages.KeyValue; +import org.opendaylight.controller.cluster.example.messages.KeyValueSaved; import org.opendaylight.controller.cluster.raft.RaftActor; +import java.util.HashMap; +import java.util.Map; + /** * A sample actor showing how the RaftActor is to be extended */ public class ExampleActor extends RaftActor { - public ExampleActor(String id) { - super(id); + + private final Map state = new HashMap(); + + private long persistIdentifier = 1; + + + public ExampleActor(String id, Map peerAddresses) { + super(id, peerAddresses); + } + + public static Props props(final String id, final Map peerAddresses){ + return Props.create(new Creator(){ + + @Override public ExampleActor create() throws Exception { + return new ExampleActor(id, peerAddresses); + } + }); } @Override public void onReceiveCommand(Object message){ - /* - Here the extended class does whatever it needs to do. - If it cannot handle a message then it passes it on to the super - class for handling - */ + if(message instanceof KeyValue){ + + if(isLeader()) { + String persistId = Long.toString(persistIdentifier++); + persistData(getSender(), persistId, message); + } else { + getLeader().forward(message, getContext()); + } + } super.onReceiveCommand(message); } + @Override protected void applyState(ActorRef clientActor, String identifier, + Object data) { + if(data instanceof KeyValue){ + KeyValue kv = (KeyValue) data; + state.put(kv.getKey(), kv.getValue()); + if(clientActor != null) { + clientActor.tell(new KeyValueSaved(), getSelf()); + } + } + } + @Override public void onReceiveRecover(Object message) { super.onReceiveRecover(message); } + @Override public String persistenceId() { + return getId(); + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/Main.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/Main.java new file mode 100644 index 0000000000..27d083f2b3 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/Main.java @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.example; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import org.opendaylight.controller.cluster.example.messages.KeyValue; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.util.HashMap; +import java.util.Map; + +public class Main { + private static final ActorSystem actorSystem = ActorSystem.create(); + // Create three example actors + private static Map allPeers = new HashMap<>(); + + static { + allPeers.put("example-1", "akka://default/user/example-1"); + allPeers.put("example-2", "akka://default/user/example-2"); + allPeers.put("example-3", "akka://default/user/example-3"); + } + + public static void main(String[] args) throws Exception{ + ActorRef example1Actor = + actorSystem.actorOf(ExampleActor.props("example-1", + withoutPeer("example-1")), "example-1"); + + ActorRef example2Actor = + actorSystem.actorOf(ExampleActor.props("example-2", + withoutPeer("example-2")), "example-2"); + + ActorRef example3Actor = + actorSystem.actorOf(ExampleActor.props("example-3", + withoutPeer("example-3")), "example-3"); + + ActorRef clientActor = actorSystem.actorOf(ClientActor.props(example1Actor)); + BufferedReader br = + new BufferedReader(new InputStreamReader(System.in)); + + while(true) { + System.out.print("Enter Integer (0 to exit):"); + try { + int i = Integer.parseInt(br.readLine()); + if(i == 0){ + System.exit(0); + } + clientActor.tell(new KeyValue("key " + i, "value " + i), null); + } catch (NumberFormatException nfe) { + System.err.println("Invalid Format!"); + } + } + } + + private static Map withoutPeer(String peerId) { + Map without = new HashMap<>(allPeers); + without.remove(peerId); + return without; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/KeyValue.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/KeyValue.java new file mode 100644 index 0000000000..05e9ba7eab --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/KeyValue.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.example.messages; + +import java.io.Serializable; + +public class KeyValue implements Serializable{ + private final String key; + private final String value; + + public KeyValue(String key, String value){ + this.key = key; + this.value = value; + } + + public String getKey() { + return key; + } + + public String getValue() { + return value; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/KeyValueSaved.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/KeyValueSaved.java new file mode 100644 index 0000000000..e10e5a7b43 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/KeyValueSaved.java @@ -0,0 +1,12 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.example.messages; + +public class KeyValueSaved { +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ClientRequestTracker.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ClientRequestTracker.java new file mode 100644 index 0000000000..4972b348ff --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ClientRequestTracker.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft; + +import akka.actor.ActorRef; + +public interface ClientRequestTracker { + /** + * The client actor who is waiting for a response + * + * @return + */ + ActorRef getClientActor(); + + /** + * + * @return + */ + String getIdentifier(); + + /** + * The index of the log entry which needs to be replicated + * + * @return + */ + long getIndex(); + +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ClientRequestTrackerImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ClientRequestTrackerImpl.java new file mode 100644 index 0000000000..15de6d01a7 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ClientRequestTrackerImpl.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft; + +import akka.actor.ActorRef; + +public class ClientRequestTrackerImpl implements ClientRequestTracker { + + private final ActorRef clientActor; + private final String identifier; + private final long logIndex; + + public ClientRequestTrackerImpl(ActorRef clientActor, String identifier, + long logIndex) { + + this.clientActor = clientActor; + + this.identifier = identifier; + + this.logIndex = logIndex; + } + + @Override public ActorRef getClientActor() { + return clientActor; + } + + @Override public long getIndex() { + return logIndex; + } + + public String getIdentifier() { + return identifier; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java index 5f185cbb9c..f3de983538 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java @@ -21,12 +21,26 @@ public interface FollowerLogInformation { */ public long incrNextIndex(); + /** + * Decrement the value of the nextIndex + * @return + */ + public long decrNextIndex(); + + /** + * + * @param nextIndex + */ + void setNextIndex(long nextIndex); + /** * Increment the value of the matchIndex * @return */ public long incrMatchIndex(); + public void setMatchIndex(long matchIndex); + /** * The identifier of the follower * This could simply be the url of the remote actor diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java index 6aa76508fc..94f9a53a85 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java @@ -29,10 +29,22 @@ public class FollowerLogInformationImpl implements FollowerLogInformation{ return nextIndex.incrementAndGet(); } + @Override public long decrNextIndex() { + return nextIndex.decrementAndGet(); + } + + @Override public void setNextIndex(long nextIndex) { + this.nextIndex.set(nextIndex); + } + public long incrMatchIndex(){ return matchIndex.incrementAndGet(); } + @Override public void setMatchIndex(long matchIndex) { + this.matchIndex.set(matchIndex); + } + public String getId() { return id; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 186de02fed..826faf7414 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -8,13 +8,26 @@ package org.opendaylight.controller.cluster.raft; -import akka.persistence.UntypedEventsourcedProcessor; +import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import akka.event.Logging; +import akka.event.LoggingAdapter; +import akka.japi.Procedure; +import akka.persistence.RecoveryCompleted; +import akka.persistence.UntypedPersistentActor; import org.opendaylight.controller.cluster.raft.behaviors.Candidate; import org.opendaylight.controller.cluster.raft.behaviors.Follower; import org.opendaylight.controller.cluster.raft.behaviors.Leader; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; +import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; +import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; +import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState; +import org.opendaylight.controller.cluster.raft.internal.messages.Replicate; -import java.util.Collections; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; /** * RaftActor encapsulates a state machine that needs to be kept synchronized @@ -58,7 +71,9 @@ import java.util.Collections; * * UntypeEventSourceProcessor */ -public abstract class RaftActor extends UntypedEventsourcedProcessor { +public abstract class RaftActor extends UntypedPersistentActor { + protected final LoggingAdapter LOG = + Logging.getLogger(getContext().system(), this); /** * The current state determines the current behavior of a RaftActor @@ -72,51 +87,226 @@ public abstract class RaftActor extends UntypedEventsourcedProcessor { */ private RaftActorContext context; - public RaftActor(String id){ + /** + * The in-memory journal + */ + private ReplicatedLogImpl replicatedLog = new ReplicatedLogImpl(); + + + + public RaftActor(String id, Map peerAddresses){ context = new RaftActorContextImpl(this.getSelf(), this.getContext(), - id, new ElectionTermImpl(id), - 0, 0, new ReplicatedLogImpl()); + id, new ElectionTermImpl(getSelf().path().toString()), + -1, -1, replicatedLog, peerAddresses, LOG); currentBehavior = switchBehavior(RaftState.Follower); } @Override public void onReceiveRecover(Object message) { - throw new UnsupportedOperationException("onReceiveRecover"); + if(message instanceof ReplicatedLogEntry) { + replicatedLog.append((ReplicatedLogEntry) message); + } else if(message instanceof RecoveryCompleted){ + LOG.debug("Log now has messages to index : " + replicatedLog.lastIndex()); + } } @Override public void onReceiveCommand(Object message) { - RaftState state = currentBehavior.handleMessage(getSender(), message); - currentBehavior = switchBehavior(state); + if(message instanceof ApplyState){ + + ApplyState applyState = (ApplyState) message; + + LOG.debug("Applying state for log index {}", applyState.getReplicatedLogEntry().getIndex()); + + applyState(applyState.getClientActor(), applyState.getIdentifier(), + applyState.getReplicatedLogEntry().getData()); + } else if(message instanceof FindLeader){ + getSender().tell(new FindLeaderReply( + context.getPeerAddress(currentBehavior.getLeaderId())), + getSelf()); + } else { + RaftState state = + currentBehavior.handleMessage(getSender(), message); + currentBehavior = switchBehavior(state); + } } private RaftActorBehavior switchBehavior(RaftState state){ + if(currentBehavior != null) { + if (currentBehavior.state() == state) { + return currentBehavior; + } + LOG.info("Switching from state " + currentBehavior.state() + " to " + + state); + + try { + currentBehavior.close(); + } catch (Exception e) { + LOG.error(e, "Failed to close behavior : " + currentBehavior.state()); + } + + } else { + LOG.info("Switching behavior to " + state); + } RaftActorBehavior behavior = null; if(state == RaftState.Candidate){ - behavior = new Candidate(context, Collections.EMPTY_LIST); + behavior = new Candidate(context); } else if(state == RaftState.Follower){ behavior = new Follower(context); } else { - behavior = new Leader(context, Collections.EMPTY_LIST); + behavior = new Leader(context); } return behavior; } + /** + * When a derived RaftActor needs to persist something it must call + * persistData. + * + * @param clientActor + * @param identifier + * @param data + */ + protected void persistData(ActorRef clientActor, String identifier, Object data){ + LOG.debug("Persist data " + identifier); + ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry( + context.getReplicatedLog().lastIndex() + 1, + context.getTermInformation().getCurrentTerm(), data); + + replicatedLog.appendAndPersist(clientActor, identifier, replicatedLogEntry); + } + + protected abstract void applyState(ActorRef clientActor, String identifier, Object data); + + protected String getId(){ + return context.getId(); + } + + protected boolean isLeader(){ + return context.getId().equals(currentBehavior.getLeaderId()); + } + + protected ActorSelection getLeader(){ + String leaderId = currentBehavior.getLeaderId(); + String peerAddress = context.getPeerAddress(leaderId); + LOG.debug("getLeader leaderId = " + leaderId + " peerAddress = " + peerAddress); + return context.actorSelection(peerAddress); + } + private class ReplicatedLogImpl implements ReplicatedLog { + private final List journal = new ArrayList(); + private long snapshotIndex = 0; + private Object snapShot = null; + @Override public ReplicatedLogEntry get(long index) { - throw new UnsupportedOperationException("get"); + if(index < 0 || index >= journal.size()){ + return null; + } + + return journal.get((int) (index - snapshotIndex)); } @Override public ReplicatedLogEntry last() { - throw new UnsupportedOperationException("last"); + if(journal.size() == 0){ + return null; + } + return get(journal.size() - 1); } + @Override public long lastIndex() { + if(journal.size() == 0){ + return -1; + } + + return last().getIndex(); + } + + @Override public long lastTerm() { + if(journal.size() == 0){ + return -1; + } + + return last().getTerm(); + } + + @Override public void removeFrom(long index) { - throw new UnsupportedOperationException("removeFrom"); + if(index < 0 || index >= journal.size()){ + return; + } + for(int i= (int) (index - snapshotIndex) ; i < journal.size() ; i++){ + deleteMessage(i); + journal.remove(i); + } + } + + @Override public void append(final ReplicatedLogEntry replicatedLogEntry) { + journal.add(replicatedLogEntry); + } + + @Override public List getFrom(long index) { + List entries = new ArrayList<>(100); + if(index < 0 || index >= journal.size()){ + return entries; + } + for(int i= (int) (index - snapshotIndex); i < journal.size() ; i++){ + entries.add(journal.get(i)); + } + return entries; + } + + @Override public void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry){ + appendAndPersist(null, null, replicatedLogEntry); + } + + public void appendAndPersist(final ActorRef clientActor, final String identifier, final ReplicatedLogEntry replicatedLogEntry){ + context.getLogger().debug("Append log entry and persist" + replicatedLogEntry.getIndex()); + // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs + journal.add(replicatedLogEntry); + persist(replicatedLogEntry, + new Procedure() { + public void apply(ReplicatedLogEntry evt) throws Exception { + // Send message for replication + if(clientActor != null) { + currentBehavior.handleMessage(getSelf(), + new Replicate(clientActor, identifier, + replicatedLogEntry)); + } + } + }); } - @Override public void append(ReplicatedLogEntry replicatedLogEntry) { - throw new UnsupportedOperationException("append"); + @Override public long size() { + return journal.size() + snapshotIndex; } } + + private static class ReplicatedLogImplEntry implements ReplicatedLogEntry, + Serializable { + + private final long index; + private final long term; + private final Object payload; + + public ReplicatedLogImplEntry(long index, long term, Object payload){ + + this.index = index; + this.term = term; + this.payload = payload; + } + + @Override public Object getData() { + return payload; + } + + @Override public long getTerm() { + return term; + } + + @Override public long getIndex() { + return index; + } + } + + } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java index 4bc9162fb5..cd5865b02e 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java @@ -12,6 +12,9 @@ import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.actor.Props; +import akka.event.LoggingAdapter; + +import java.util.Map; /** * The RaftActorContext contains that portion of the RaftActors state that @@ -91,4 +94,23 @@ public interface RaftActorContext { * @return The ActorSystem associated with this context */ ActorSystem getActorSystem(); + + /** + * + * @return + */ + LoggingAdapter getLogger(); + + /** + * Get a mapping of peer id's their addresses + * @return + */ + Map getPeerAddresses(); + + /** + * + * @param peerId + * @return + */ + String getPeerAddress(String peerId); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java index 845011a7e3..03534d61a0 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java @@ -13,6 +13,9 @@ import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.actor.Props; import akka.actor.UntypedActorContext; +import akka.event.LoggingAdapter; + +import java.util.Map; public class RaftActorContextImpl implements RaftActorContext{ @@ -30,10 +33,15 @@ public class RaftActorContextImpl implements RaftActorContext{ private final ReplicatedLog replicatedLog; + private final Map peerAddresses; + + private final LoggingAdapter LOG; + + public RaftActorContextImpl(ActorRef actor, UntypedActorContext context, String id, ElectionTerm termInformation, long commitIndex, - long lastApplied, ReplicatedLog replicatedLog) { + long lastApplied, ReplicatedLog replicatedLog, Map peerAddresses, LoggingAdapter logger) { this.actor = actor; this.context = context; this.id = id; @@ -41,6 +49,8 @@ public class RaftActorContextImpl implements RaftActorContext{ this.commitIndex = commitIndex; this.lastApplied = lastApplied; this.replicatedLog = replicatedLog; + this.peerAddresses = peerAddresses; + this.LOG = logger; } public ActorRef actorOf(Props props){ @@ -86,4 +96,16 @@ public class RaftActorContextImpl implements RaftActorContext{ @Override public ActorSystem getActorSystem() { return context.system(); } + + @Override public LoggingAdapter getLogger() { + return this.LOG; + } + + @Override public Map getPeerAddresses() { + return peerAddresses; + } + + @Override public String getPeerAddress(String peerId) { + return peerAddresses.get(peerId); + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java index f12bc9af73..34e7ac9768 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java @@ -8,6 +8,8 @@ package org.opendaylight.controller.cluster.raft; +import java.util.List; + /** * Represents the ReplicatedLog that needs to be kept in sync by the RaftActor */ @@ -28,6 +30,18 @@ public interface ReplicatedLog { */ ReplicatedLogEntry last(); + /** + * + * @return + */ + long lastIndex(); + + /** + * + * @return + */ + long lastTerm(); + /** * Remove all the entries from the logs >= index * @@ -40,4 +54,23 @@ public interface ReplicatedLog { * @param replicatedLogEntry */ void append(ReplicatedLogEntry replicatedLogEntry); + + /** + * + * @param replicatedLogEntry + */ + void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry); + + /** + * + * @param index + */ + List getFrom(long index); + + + /** + * + * @return + */ + long size(); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index 167082711d..304b2fdbab 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -10,9 +10,11 @@ package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; import akka.actor.Cancellable; +import org.opendaylight.controller.cluster.raft.ClientRequestTracker; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState; import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; @@ -65,9 +67,13 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { /** * */ - private Cancellable electionCancel = null; + /** + * + */ + protected String leaderId = null; + protected AbstractRaftActorBehavior(RaftActorContext context) { this.context = context; @@ -93,65 +99,25 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { protected RaftState appendEntries(ActorRef sender, - AppendEntries appendEntries, RaftState raftState){ + AppendEntries appendEntries, RaftState raftState) { - // 1. Reply false if term < currentTerm (§5.1) - if(appendEntries.getTerm() < currentTerm()){ - sender.tell(new AppendEntriesReply(currentTerm(), false), actor()); - return state(); + if (raftState != state()) { + context.getLogger().debug("Suggested state is " + raftState + + " current behavior state is " + state()); } - // 2. Reply false if log doesn’t contain an entry at prevLogIndex - // whose term matches prevLogTerm (§5.3) - ReplicatedLogEntry previousEntry = context.getReplicatedLog() - .get(appendEntries.getPrevLogIndex()); - - if(previousEntry == null || previousEntry.getTerm() != appendEntries.getPrevLogTerm()){ - sender.tell(new AppendEntriesReply(currentTerm(), false), actor()); + // 1. Reply false if term < currentTerm (§5.1) + if (appendEntries.getTerm() < currentTerm()) { + context.getLogger().debug( + "Cannot append entries because sender term " + appendEntries + .getTerm() + " is less than " + currentTerm()); + sender.tell( + new AppendEntriesReply(context.getId(), currentTerm(), false, + lastIndex(), lastTerm()), actor() + ); return state(); } - if(appendEntries.getEntries() != null) { - // 3. If an existing entry conflicts with a new one (same index - // but different terms), delete the existing entry and all that - // follow it (§5.3) - int addEntriesFrom = 0; - for (int i = 0; - i < appendEntries.getEntries().size(); i++, addEntriesFrom++) { - ReplicatedLogEntry newEntry = context.getReplicatedLog() - .get(i + 1); - - if (newEntry != null && newEntry.getTerm() == appendEntries.getEntries().get(i).getTerm()){ - break; - } - if (newEntry != null && newEntry.getTerm() != appendEntries - .getEntries().get(i).getTerm()) { - context.getReplicatedLog().removeFrom(i + 1); - break; - } - } - - // 4. Append any new entries not already in the log - for (int i = addEntriesFrom; - i < appendEntries.getEntries().size(); i++) { - context.getReplicatedLog() - .append(appendEntries.getEntries().get(i)); - } - } - - - // 5. If leaderCommit > commitIndex, set commitIndex = - // min(leaderCommit, index of last new entry) - context.setCommitIndex(Math.min(appendEntries.getLeaderCommit(), - context.getReplicatedLog().last().getIndex())); - - // If commitIndex > lastApplied: increment lastApplied, apply - // log[lastApplied] to state machine (§5.3) - if (appendEntries.getLeaderCommit() > context.getLastApplied()) { - applyLogToStateMachine(appendEntries.getLeaderCommit()); - } - - sender.tell(new AppendEntriesReply(currentTerm(), true), actor()); return handleAppendEntries(sender, appendEntries, raftState); } @@ -201,7 +167,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { if (requestVote.getLastLogTerm() > lastTerm()) { candidateLatest = true; } else if ((requestVote.getLastLogTerm() == lastTerm()) - && requestVote.getLastLogIndex() >= lastTerm()) { + && requestVote.getLastLogIndex() >= lastIndex()) { candidateLatest = true; } @@ -236,23 +202,21 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { protected abstract RaftState handleRequestVoteReply(ActorRef sender, RequestVoteReply requestVoteReply, RaftState suggestedState); - /** - * @return The derived class should return the state that corresponds to - * it's behavior - */ - protected abstract RaftState state(); - protected FiniteDuration electionDuration() { long variance = new Random().nextInt(ELECTION_TIME_MAX_VARIANCE); return new FiniteDuration(ELECTION_TIME_INTERVAL + variance, TimeUnit.MILLISECONDS); } - protected void scheduleElection(FiniteDuration interval) { - + protected void stopElection() { if (electionCancel != null && !electionCancel.isCancelled()) { electionCancel.cancel(); } + } + + protected void scheduleElection(FiniteDuration interval) { + + stopElection(); // Schedule an election. When the scheduler triggers an ElectionTimeout // message is sent to itself @@ -275,13 +239,44 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { } protected long lastTerm() { - return context.getReplicatedLog().last().getTerm(); + return context.getReplicatedLog().lastTerm(); } protected long lastIndex() { - return context.getReplicatedLog().last().getIndex(); + return context.getReplicatedLog().lastIndex(); } + protected ClientRequestTracker findClientRequestTracker(long logIndex) { + return null; + } + + protected void applyLogToStateMachine(long index) { + // Now maybe we apply to the state machine + for (long i = context.getLastApplied() + 1; + i < index + 1; i++) { + ActorRef clientActor = null; + String identifier = null; + ClientRequestTracker tracker = findClientRequestTracker(i); + + if (tracker != null) { + clientActor = tracker.getClientActor(); + identifier = tracker.getIdentifier(); + } + ReplicatedLogEntry replicatedLogEntry = + context.getReplicatedLog().get(i); + + if (replicatedLogEntry != null) { + actor().tell(new ApplyState(clientActor, identifier, + replicatedLogEntry), actor()); + } else { + context.getLogger().error( + "Missing index " + i + " from log. Cannot apply state."); + } + } + // Send a local message to the local RaftActor (it's derived class to be + // specific to apply the log to it's index) + context.setLastApplied(index); + } @Override public RaftState handleMessage(ActorRef sender, Object message) { @@ -307,6 +302,10 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { return raftState; } + @Override public String getLeaderId() { + return leaderId; + } + private RaftState applyTerm(RaftRPC rpc) { // If RPC request or response contains term T > currentTerm: // set currentTerm = T, convert to follower (§5.1) @@ -318,11 +317,4 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { return state(); } - private void applyLogToStateMachine(long index) { - // Send a local message to the local RaftActor (it's derived class to be - // specific to apply the log to it's index) - context.setLastApplied(index); - } - - } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java index 3e6b502631..0d035dbce7 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java @@ -18,13 +18,13 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; +import java.util.Collection; import java.util.HashMap; -import java.util.List; import java.util.Map; /** * The behavior of a RaftActor when it is in the CandidateState - *

+ *

* Candidates (§5.2): *

    *
  • On conversion to candidate, start election: @@ -48,9 +48,11 @@ public class Candidate extends AbstractRaftActorBehavior { private final int votesRequired; - public Candidate(RaftActorContext context, List peerPaths) { + public Candidate(RaftActorContext context) { super(context); + Collection peerPaths = context.getPeerAddresses().values(); + for (String peerPath : peerPaths) { peerToActor.put(peerPath, context.actorSelection(peerPath)); @@ -83,11 +85,7 @@ public class Candidate extends AbstractRaftActorBehavior { @Override protected RaftState handleAppendEntries(ActorRef sender, AppendEntries appendEntries, RaftState suggestedState) { - // There is some peer who thinks it's a leader but is not - // I will not accept this append entries - sender.tell(new AppendEntriesReply( - context.getTermInformation().getCurrentTerm(), false), - context.getActor()); + context.getLogger().error("An unexpected AppendEntries received in state " + state()); return suggestedState; } @@ -102,30 +100,30 @@ public class Candidate extends AbstractRaftActorBehavior { @Override protected RaftState handleRequestVoteReply(ActorRef sender, RequestVoteReply requestVoteReply, RaftState suggestedState) { - if(suggestedState == RaftState.Follower) { + if (suggestedState == RaftState.Follower) { // If base class thinks I should be follower then I am return suggestedState; } - if(requestVoteReply.isVoteGranted()){ + if (requestVoteReply.isVoteGranted()) { voteCount++; } - if(voteCount >= votesRequired){ + if (voteCount >= votesRequired) { return RaftState.Leader; } return state(); } - @Override protected RaftState state() { + @Override public RaftState state() { return RaftState.Candidate; } @Override public RaftState handleMessage(ActorRef sender, Object message) { - if(message instanceof ElectionTimeout){ - if(votesRequired == 0){ + if (message instanceof ElectionTimeout) { + if (votesRequired == 0) { // If there are no peers then we should be a Leader // We wait for the election timeout to occur before declare // ourselves the leader. This gives enough time for a leader @@ -141,24 +139,33 @@ public class Candidate extends AbstractRaftActorBehavior { } - private void startNewTerm(){ + private void startNewTerm() { + + // set voteCount back to 1 (that is voting for self) voteCount = 1; // Increment the election term and vote for self long currentTerm = context.getTermInformation().getCurrentTerm(); - context.getTermInformation().update(currentTerm+1, context.getId()); + context.getTermInformation().update(currentTerm + 1, context.getId()); + + context.getLogger().debug("Starting new term " + (currentTerm+1)); // Request for a vote - for(ActorSelection peerActor : peerToActor.values()){ + for (ActorSelection peerActor : peerToActor.values()) { peerActor.tell(new RequestVote( context.getTermInformation().getCurrentTerm(), - context.getId(), context.getReplicatedLog().last().getIndex(), - context.getReplicatedLog().last().getTerm()), - context.getActor()); + context.getId(), + context.getReplicatedLog().lastIndex(), + context.getReplicatedLog().lastTerm()), + context.getActor() + ); } } + @Override public void close() throws Exception { + stopElection(); + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index d93271072c..88558cac16 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -11,6 +11,7 @@ package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; +import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; @@ -36,6 +37,114 @@ public class Follower extends AbstractRaftActorBehavior { @Override protected RaftState handleAppendEntries(ActorRef sender, AppendEntries appendEntries, RaftState suggestedState) { + + // If we got here then we do appear to be talking to the leader + leaderId = appendEntries.getLeaderId(); + + // 2. Reply false if log doesn’t contain an entry at prevLogIndex + // whose term matches prevLogTerm (§5.3) + ReplicatedLogEntry previousEntry = context.getReplicatedLog() + .get(appendEntries.getPrevLogIndex()); + + + if (lastIndex() > -1 && previousEntry != null + && previousEntry.getTerm() != appendEntries + .getPrevLogTerm()) { + + context.getLogger().debug( + "Cannot append entries because previous entry term " + + previousEntry.getTerm() + + " is not equal to append entries prevLogTerm " + + appendEntries.getPrevLogTerm()); + + sender.tell( + new AppendEntriesReply(context.getId(), currentTerm(), false, + lastIndex(), lastTerm()), actor() + ); + return state(); + } + + if (appendEntries.getEntries() != null + && appendEntries.getEntries().size() > 0) { + context.getLogger().debug( + "Number of entries to be appended = " + appendEntries + .getEntries().size()); + + // 3. If an existing entry conflicts with a new one (same index + // but different terms), delete the existing entry and all that + // follow it (§5.3) + int addEntriesFrom = 0; + if (context.getReplicatedLog().size() > 0) { + for (int i = 0; + i < appendEntries.getEntries() + .size(); i++, addEntriesFrom++) { + ReplicatedLogEntry matchEntry = + appendEntries.getEntries().get(i); + ReplicatedLogEntry newEntry = context.getReplicatedLog() + .get(matchEntry.getIndex()); + + if (newEntry == null) { + //newEntry not found in the log + break; + } + + if (newEntry != null && newEntry.getTerm() == matchEntry + .getTerm()) { + continue; + } + if (newEntry != null && newEntry.getTerm() != matchEntry + .getTerm()) { + context.getLogger().debug( + "Removing entries from log starting at " + + matchEntry.getIndex()); + context.getReplicatedLog() + .removeFrom(matchEntry.getIndex()); + break; + } + } + } + + context.getLogger().debug( + "After cleanup entries to be added from = " + (addEntriesFrom + + lastIndex())); + + // 4. Append any new entries not already in the log + for (int i = addEntriesFrom; + i < appendEntries.getEntries().size(); i++) { + context.getLogger().debug( + "Append entry to log " + appendEntries.getEntries().get(i) + .toString()); + context.getReplicatedLog() + .appendAndPersist(appendEntries.getEntries().get(i)); + } + + context.getLogger().debug( + "Log size is now " + context.getReplicatedLog().size()); + } + + + // 5. If leaderCommit > commitIndex, set commitIndex = + // min(leaderCommit, index of last new entry) + + long prevCommitIndex = context.getCommitIndex(); + + context.setCommitIndex(Math.min(appendEntries.getLeaderCommit(), + context.getReplicatedLog().lastIndex())); + + if (prevCommitIndex != context.getCommitIndex()) { + context.getLogger() + .debug("Commit index set to " + context.getCommitIndex()); + } + + // If commitIndex > lastApplied: increment lastApplied, apply + // log[lastApplied] to state machine (§5.3) + if (appendEntries.getLeaderCommit() > context.getLastApplied()) { + applyLogToStateMachine(appendEntries.getLeaderCommit()); + } + + sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), true, + lastIndex(), lastTerm()), actor()); + return suggestedState; } @@ -49,7 +158,7 @@ public class Follower extends AbstractRaftActorBehavior { return suggestedState; } - @Override protected RaftState state() { + @Override public RaftState state() { return RaftState.Follower; } @@ -62,4 +171,8 @@ public class Follower extends AbstractRaftActorBehavior { return super.handleMessage(sender, message); } + + @Override public void close() throws Exception { + stopElection(); + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java index cfefd21c56..c06ee9bd2b 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java @@ -12,17 +12,22 @@ import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.Cancellable; import com.google.common.base.Preconditions; +import org.opendaylight.controller.cluster.raft.ClientRequestTracker; +import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl; import org.opendaylight.controller.cluster.raft.FollowerLogInformation; import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; +import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState; +import org.opendaylight.controller.cluster.raft.internal.messages.Replicate; import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import scala.concurrent.duration.FiniteDuration; -import java.util.Collections; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -61,21 +66,37 @@ public class Leader extends AbstractRaftActorBehavior { private Cancellable heartbeatCancel = null; - public Leader(RaftActorContext context, List followerPaths) { + private List trackerList = new ArrayList<>(); + + private final int minReplicationCount; + + public Leader(RaftActorContext context) { super(context); - for (String followerPath : followerPaths) { + if(lastIndex() >= 0) { + context.setCommitIndex(lastIndex()); + } + + for (String followerId : context.getPeerAddresses().keySet()) { FollowerLogInformation followerLogInformation = - new FollowerLogInformationImpl(followerPath, - new AtomicLong(0), - new AtomicLong(0)); + new FollowerLogInformationImpl(followerId, + new AtomicLong(lastIndex()), + new AtomicLong(-1)); - followerToActor.put(followerPath, - context.actorSelection(followerLogInformation.getId())); - followerToLog.put(followerPath, followerLogInformation); + followerToActor.put(followerId, + context.actorSelection(context.getPeerAddress(followerId))); + + followerToLog.put(followerId, followerLogInformation); + + } + if (followerToActor.size() > 0) { + minReplicationCount = (followerToActor.size() + 1) / 2 + 1; + } else { + minReplicationCount = 0; } + // Immediately schedule a heartbeat // Upon election: send initial empty AppendEntries RPCs // (heartbeat) to each server; repeat during idle periods to @@ -87,47 +108,184 @@ public class Leader extends AbstractRaftActorBehavior { @Override protected RaftState handleAppendEntries(ActorRef sender, AppendEntries appendEntries, RaftState suggestedState) { + + context.getLogger() + .error("An unexpected AppendEntries received in state " + state()); + return suggestedState; } @Override protected RaftState handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply, RaftState suggestedState) { + + // Do not take any other action since a behavior change is coming + if (suggestedState != state()) + return suggestedState; + + // Update the FollowerLogInformation + String followerId = appendEntriesReply.getFollowerId(); + FollowerLogInformation followerLogInformation = + followerToLog.get(followerId); + if (appendEntriesReply.isSuccess()) { + followerLogInformation + .setMatchIndex(appendEntriesReply.getLogLastIndex()); + followerLogInformation + .setNextIndex(appendEntriesReply.getLogLastIndex() + 1); + } else { + followerLogInformation.decrNextIndex(); + } + + // Now figure out if this reply warrants a change in the commitIndex + // If there exists an N such that N > commitIndex, a majority + // of matchIndex[i] ≥ N, and log[N].term == currentTerm: + // set commitIndex = N (§5.3, §5.4). + for (long N = context.getCommitIndex() + 1; ; N++) { + int replicatedCount = 1; + + for (FollowerLogInformation info : followerToLog.values()) { + if (info.getMatchIndex().get() >= N) { + replicatedCount++; + } + } + + if (replicatedCount >= minReplicationCount){ + ReplicatedLogEntry replicatedLogEntry = + context.getReplicatedLog().get(N); + if (replicatedLogEntry != null + && replicatedLogEntry.getTerm() + == currentTerm()) { + context.setCommitIndex(N); + } + } else { + break; + } + } + + if(context.getCommitIndex() > context.getLastApplied()){ + applyLogToStateMachine(context.getCommitIndex()); + } + return suggestedState; } + protected ClientRequestTracker findClientRequestTracker(long logIndex) { + for (ClientRequestTracker tracker : trackerList) { + if (tracker.getIndex() == logIndex) { + return tracker; + } + } + + return null; + } + @Override protected RaftState handleRequestVoteReply(ActorRef sender, RequestVoteReply requestVoteReply, RaftState suggestedState) { return suggestedState; } - @Override protected RaftState state() { + @Override public RaftState state() { return RaftState.Leader; } @Override public RaftState handleMessage(ActorRef sender, Object message) { Preconditions.checkNotNull(sender, "sender should not be null"); - scheduleHeartBeat(HEART_BEAT_INTERVAL); - - if (message instanceof SendHeartBeat) { - for (ActorSelection follower : followerToActor.values()) { - follower.tell(new AppendEntries( - context.getTermInformation().getCurrentTerm(), - context.getId(), - context.getReplicatedLog().last().getIndex(), - context.getReplicatedLog().last().getTerm(), - Collections.EMPTY_LIST, context.getCommitIndex()), - context.getActor()); + try { + if (message instanceof SendHeartBeat) { + return sendHeartBeat(); + } else if (message instanceof Replicate) { + + Replicate replicate = (Replicate) message; + long logIndex = replicate.getReplicatedLogEntry().getIndex(); + + context.getLogger().debug("Replicate message " + logIndex); + + if (followerToActor.size() == 0) { + context.setCommitIndex( + replicate.getReplicatedLogEntry().getIndex()); + + context.getActor() + .tell(new ApplyState(replicate.getClientActor(), + replicate.getIdentifier(), + replicate.getReplicatedLogEntry()), + context.getActor() + ); + } else { + + trackerList.add( + new ClientRequestTrackerImpl(replicate.getClientActor(), + replicate.getIdentifier(), + logIndex) + ); + + ReplicatedLogEntry prevEntry = + context.getReplicatedLog().get(lastIndex() - 1); + long prevLogIndex = -1; + long prevLogTerm = -1; + if (prevEntry != null) { + prevLogIndex = prevEntry.getIndex(); + prevLogTerm = prevEntry.getTerm(); + } + // Send an AppendEntries to all followers + for (String followerId : followerToActor.keySet()) { + ActorSelection followerActor = + followerToActor.get(followerId); + FollowerLogInformation followerLogInformation = + followerToLog.get(followerId); + followerActor.tell( + new AppendEntries(currentTerm(), context.getId(), + prevLogIndex, prevLogTerm, + context.getReplicatedLog().getFrom( + followerLogInformation.getNextIndex() + .get() + ), context.getCommitIndex() + ), + actor() + ); + } + } } - return state(); + } finally { + scheduleHeartBeat(HEART_BEAT_INTERVAL); } + return super.handleMessage(sender, message); } - private void scheduleHeartBeat(FiniteDuration interval) { + private RaftState sendHeartBeat() { + if (followerToActor.size() > 0) { + for (String follower : followerToActor.keySet()) { + + FollowerLogInformation followerLogInformation = + followerToLog.get(follower); + + AtomicLong nextIndex = + followerLogInformation.getNextIndex(); + + List entries = + context.getReplicatedLog().getFrom(nextIndex.get()); + + followerToActor.get(follower).tell(new AppendEntries( + context.getTermInformation().getCurrentTerm(), + context.getId(), + context.getReplicatedLog().lastIndex(), + context.getReplicatedLog().lastTerm(), + entries, context.getCommitIndex()), + context.getActor() + ); + } + } + return state(); + } + + private void stopHeartBeat() { if (heartbeatCancel != null && !heartbeatCancel.isCancelled()) { heartbeatCancel.cancel(); } + } + + private void scheduleHeartBeat(FiniteDuration interval) { + stopHeartBeat(); // Schedule a heartbeat. When the scheduler triggers a SendHeartbeat // message is sent to itself. @@ -135,9 +293,18 @@ public class Leader extends AbstractRaftActorBehavior { // need to be sent if there are other messages being sent to the remote // actor. heartbeatCancel = - context.getActorSystem().scheduler().scheduleOnce(interval, + context.getActorSystem().scheduler().scheduleOnce( + interval, context.getActor(), new SendHeartBeat(), context.getActorSystem().dispatcher(), context.getActor()); } + @Override public void close() throws Exception { + stopHeartBeat(); + } + + @Override public String getLeaderId() { + return context.getId(); + } + } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/RaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/RaftActorBehavior.java index 6811678aaa..ca2d916ecf 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/RaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/RaftActorBehavior.java @@ -24,7 +24,7 @@ import org.opendaylight.controller.cluster.raft.RaftState; * In each of these behaviors the Raft Actor handles the same Raft messages * differently. */ -public interface RaftActorBehavior { +public interface RaftActorBehavior extends AutoCloseable{ /** * Handle a message. If the processing of the message warrants a state * change then a new state should be returned otherwise this method should @@ -36,4 +36,17 @@ public interface RaftActorBehavior { * @return The new state or self (this) */ RaftState handleMessage(ActorRef sender, Object message); + + /** + * The state associated with a given behavior + * + * @return + */ + RaftState state(); + + /** + * + * @return + */ + String getLeaderId(); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/FindLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/FindLeader.java new file mode 100644 index 0000000000..a60aea46e8 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/FindLeader.java @@ -0,0 +1,13 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.client.messages; + +public class FindLeader { + +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/FindLeaderReply.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/FindLeaderReply.java new file mode 100644 index 0000000000..b36ef112b3 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/FindLeaderReply.java @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.client.messages; + +public class FindLeaderReply { + private final String leaderActor; + + public FindLeaderReply(String leaderActor) { + this.leaderActor = leaderActor; + } + + public String getLeaderActor() { + return leaderActor; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/ApplyState.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/ApplyState.java new file mode 100644 index 0000000000..c9ba26eaad --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/ApplyState.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.internal.messages; + +import akka.actor.ActorRef; +import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; + +public class ApplyState { + private final ActorRef clientActor; + private final String identifier; + private final ReplicatedLogEntry replicatedLogEntry; + + public ApplyState(ActorRef clientActor, String identifier, + ReplicatedLogEntry replicatedLogEntry) { + this.clientActor = clientActor; + this.identifier = identifier; + this.replicatedLogEntry = replicatedLogEntry; + } + + public ActorRef getClientActor() { + return clientActor; + } + + public String getIdentifier() { + return identifier; + } + + public ReplicatedLogEntry getReplicatedLogEntry() { + return replicatedLogEntry; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/Replicate.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/Replicate.java new file mode 100644 index 0000000000..6ff7cfce5c --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/Replicate.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.internal.messages; + +import akka.actor.ActorRef; +import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; + +public class Replicate { + private final ActorRef clientActor; + private final String identifier; + private final ReplicatedLogEntry replicatedLogEntry; + + public Replicate(ActorRef clientActor, String identifier, + ReplicatedLogEntry replicatedLogEntry) { + + this.clientActor = clientActor; + this.identifier = identifier; + this.replicatedLogEntry = replicatedLogEntry; + } + + public ActorRef getClientActor() { + return clientActor; + } + + public String getIdentifier() { + return identifier; + } + + public ReplicatedLogEntry getReplicatedLogEntry() { + return replicatedLogEntry; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java index c0e89a8ef4..9bb5029548 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java @@ -62,4 +62,14 @@ public class AppendEntries extends AbstractRaftRPC { public long getLeaderCommit() { return leaderCommit; } + + @Override public String toString() { + return "AppendEntries{" + + "leaderId='" + leaderId + '\'' + + ", prevLogIndex=" + prevLogIndex + + ", prevLogTerm=" + prevLogTerm + + ", entries=" + entries + + ", leaderCommit=" + leaderCommit + + '}'; + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReply.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReply.java index 28f0f6b52b..7524d8f232 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReply.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReply.java @@ -17,9 +17,24 @@ public class AppendEntriesReply extends AbstractRaftRPC{ // prevLogIndex and prevLogTerm private final boolean success; - public AppendEntriesReply(long term, boolean success) { + // The index of the last entry in the followers log + // This will be used to set the matchIndex for the follower on the + // Leader + private final long logLastIndex; + + private final long logLastTerm; + + // The followerId - this will be used to figure out which follower is + // responding + private final String followerId; + + public AppendEntriesReply(String followerId, long term, boolean success, long logLastIndex, long logLastTerm) { super(term); + + this.followerId = followerId; this.success = success; + this.logLastIndex = logLastIndex; + this.logLastTerm = logLastTerm; } public long getTerm() { @@ -29,4 +44,16 @@ public class AppendEntriesReply extends AbstractRaftRPC{ public boolean isSuccess() { return success; } + + public long getLogLastIndex() { + return logLastIndex; + } + + public long getLogLastTerm() { + return logLastTerm; + } + + public String getFollowerId() { + return followerId; + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/resources/application.conf b/opendaylight/md-sal/sal-akka-raft/src/main/resources/application.conf new file mode 100644 index 0000000000..494a99e5d6 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/resources/application.conf @@ -0,0 +1,12 @@ +akka { + loglevel = "DEBUG" + actor { + serializers { + java = "akka.serialization.JavaSerializer" + } + + serialization-bindings { + "org.opendaylight.controller.cluster.raft.RaftActor$ReplicatedLogImplEntry" = java + } + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java index 3b332e4ec7..addf51a63c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java @@ -12,9 +12,14 @@ import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.actor.Props; +import akka.event.Logging; +import akka.event.LoggingAdapter; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; public class MockRaftActorContext implements RaftActorContext { @@ -25,6 +30,7 @@ public class MockRaftActorContext implements RaftActorContext { private long lastApplied = 0; private final ElectionTerm electionTerm; private ReplicatedLog replicatedLog; + private Map peerAddresses = new HashMap(); public MockRaftActorContext(){ electionTerm = null; @@ -102,6 +108,22 @@ public class MockRaftActorContext implements RaftActorContext { return this.system; } + @Override public LoggingAdapter getLogger() { + return Logging.getLogger(system, this); + } + + @Override public Map getPeerAddresses() { + return peerAddresses; + } + + @Override public String getPeerAddress(String peerId) { + return peerAddresses.get(peerId); + } + + public void setPeerAddresses(Map peerAddresses) { + this.peerAddresses = peerAddresses; + } + public static class MockReplicatedLog implements ReplicatedLog { private ReplicatedLogEntry replicatedLogEntry = new MockReplicatedLogEntry(0,0, ""); @@ -115,12 +137,35 @@ public class MockRaftActorContext implements RaftActorContext { return last; } + @Override public long lastIndex() { + return last.getIndex(); + } + + @Override public long lastTerm() { + return last.getTerm(); + } + @Override public void removeFrom(long index) { } @Override public void append(ReplicatedLogEntry replicatedLogEntry) { } + @Override public void appendAndPersist( + ReplicatedLogEntry replicatedLogEntry) { + } + + @Override public List getFrom(long index) { + return Collections.EMPTY_LIST; + } + + @Override public long size() { + if(replicatedLogEntry != null){ + return 1; + } + return 0; + } + public void setReplicatedLogEntry( ReplicatedLogEntry replicatedLogEntry) { this.replicatedLogEntry = replicatedLogEntry; @@ -135,13 +180,35 @@ public class MockRaftActorContext implements RaftActorContext { private final List log = new ArrayList<>(10000); @Override public ReplicatedLogEntry get(long index) { + if(index >= log.size() || index < 0){ + return null; + } return log.get((int) index); } @Override public ReplicatedLogEntry last() { + if(log.size() == 0){ + return null; + } return log.get(log.size()-1); } + @Override public long lastIndex() { + if(log.size() == 0){ + return -1; + } + + return last().getIndex(); + } + + @Override public long lastTerm() { + if(log.size() == 0){ + return -1; + } + + return last().getTerm(); + } + @Override public void removeFrom(long index) { for(int i=(int) index ; i < log.size() ; i++) { log.remove(i); @@ -151,6 +218,23 @@ public class MockRaftActorContext implements RaftActorContext { @Override public void append(ReplicatedLogEntry replicatedLogEntry) { log.add(replicatedLogEntry); } + + @Override public void appendAndPersist( + ReplicatedLogEntry replicatedLogEntry) { + append(replicatedLogEntry); + } + + @Override public List getFrom(long index) { + List entries = new ArrayList<>(); + for(int i=(int) index ; i < log.size() ; i++) { + entries.add(get(i)); + } + return entries; + } + + @Override public long size() { + return log.size(); + } } public static class MockReplicatedLogEntry implements ReplicatedLogEntry { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java index 3cd373adf4..273342eb47 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java @@ -20,7 +20,6 @@ import java.util.ArrayList; import java.util.List; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest { @@ -53,35 +52,6 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest { }}; } - /** - * This test verifies that when an AppendEntries RPC is received by a RaftActor - * with a commitIndex that is greater than what has been applied to the - * state machine of the RaftActor, the RaftActor applies the state and - * sets it current applied state to the commitIndex of the sender. - * - * @throws Exception - */ - @Test - public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception { - new JavaTestKit(getSystem()) {{ - - RaftActorContext context = - createActorContext(); - - context.setLastApplied(100); - setLastLogEntry((MockRaftActorContext) context, 0, 0, ""); - - // The new commitIndex is 101 - AppendEntries appendEntries = - new AppendEntries(100, "leader-1", 0, 0, null, 101); - - RaftState raftState = - createBehavior(context).handleMessage(getRef(), appendEntries); - - assertEquals(101L, context.getLastApplied()); - - }}; - } /** * This test verifies that when an AppendEntries is received with a term that @@ -134,248 +104,56 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest { }}; } - /** - * This test verifies that when an AppendEntries is received a specific prevLogTerm - * which does not match the term that is in RaftActors log entry at prevLogIndex - * then the RaftActor does not change it's state and it returns a failure. - * - * @throws Exception - */ - @Test - public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() - throws Exception { - new JavaTestKit(getSystem()) {{ - - MockRaftActorContext context = (MockRaftActorContext) - createActorContext(); - - // First set the receivers term to lower number - context.getTermInformation().update(95, "test"); - - // Set the last log entry term for the receiver to be greater than - // what we will be sending as the prevLogTerm in AppendEntries - MockRaftActorContext.MockReplicatedLog mockReplicatedLog = - setLastLogEntry(context, 20, 0, ""); - - // Also set the entry at index 0 with term 20 which will be greater - // than the prevLogTerm sent by the sender - mockReplicatedLog.setReplicatedLogEntry( - new MockRaftActorContext.MockReplicatedLogEntry(20, 0, "")); - - // AppendEntries is now sent with a bigger term - // this will set the receivers term to be the same as the sender's term - AppendEntries appendEntries = - new AppendEntries(100, "leader-1", 0, 0, null, 101); - - RaftActorBehavior behavior = createBehavior(context); - - // Send an unknown message so that the state of the RaftActor remains unchanged - RaftState expected = behavior.handleMessage(getRef(), "unknown"); - - RaftState raftState = - behavior.handleMessage(getRef(), appendEntries); - - assertEquals(expected, raftState); - - // Also expect an AppendEntriesReply to be sent where success is false - final Boolean out = new ExpectMsg(duration("1 seconds"), - "AppendEntriesReply") { - // do not put code outside this method, will run afterwards - protected Boolean match(Object in) { - if (in instanceof AppendEntriesReply) { - AppendEntriesReply reply = (AppendEntriesReply) in; - return reply.isSuccess(); - } else { - throw noMatch(); - } - } - }.get(); - - assertEquals(false, out); - - }}; - } - - /** - * This test verifies that when a new AppendEntries message is received with - * new entries and the logs of the sender and receiver match that the new - * entries get added to the log and the log is incremented by the number of - * entries received in appendEntries - * - * @throws Exception - */ @Test - public void testHandleAppendEntriesAddNewEntries() throws Exception { - new JavaTestKit(getSystem()) {{ + public void testHandleAppendEntriesAddSameEntryToLog(){ + new JavaTestKit(getSystem()) { + { - MockRaftActorContext context = (MockRaftActorContext) - createActorContext(); + MockRaftActorContext context = (MockRaftActorContext) + createActorContext(); - // First set the receivers term to lower number - context.getTermInformation().update(1, "test"); - - // Prepare the receivers log - MockRaftActorContext.SimpleReplicatedLog log = - new MockRaftActorContext.SimpleReplicatedLog(); - log.append( - new MockRaftActorContext.MockReplicatedLogEntry(1, 0, "zero")); - log.append( - new MockRaftActorContext.MockReplicatedLogEntry(1, 1, "one")); - log.append( - new MockRaftActorContext.MockReplicatedLogEntry(1, 2, "two")); - - context.setReplicatedLog(log); - - // Prepare the entries to be sent with AppendEntries - List entries = new ArrayList<>(); - entries.add( - new MockRaftActorContext.MockReplicatedLogEntry(1, 3, "three")); - entries.add( - new MockRaftActorContext.MockReplicatedLogEntry(1, 4, "four")); - - // Send appendEntries with the same term as was set on the receiver - // before the new behavior was created (1 in this case) - // This will not work for a Candidate because as soon as a Candidate - // is created it increments the term - AppendEntries appendEntries = - new AppendEntries(1, "leader-1", 2, 1, entries, 101); + // First set the receivers term to lower number + context.getTermInformation().update(2, "test"); - RaftActorBehavior behavior = createBehavior(context); + // Prepare the receivers log + MockRaftActorContext.SimpleReplicatedLog log = + new MockRaftActorContext.SimpleReplicatedLog(); + log.append( + new MockRaftActorContext.MockReplicatedLogEntry(1, 0, "zero")); - if (AbstractRaftActorBehaviorTest.this instanceof CandidateTest) { - // Resetting the Candidates term to make sure it will match - // the term sent by AppendEntries. If this was not done then - // the test will fail because the Candidate will assume that - // the message was sent to it from a lower term peer and will - // thus respond with a failure - context.getTermInformation().update(1, "test"); - } + context.setReplicatedLog(log); - // Send an unknown message so that the state of the RaftActor remains unchanged - RaftState expected = behavior.handleMessage(getRef(), "unknown"); + List entries = new ArrayList<>(); + entries.add( + new MockRaftActorContext.MockReplicatedLogEntry(1, 0, "zero")); - RaftState raftState = - behavior.handleMessage(getRef(), appendEntries); + AppendEntries appendEntries = + new AppendEntries(2, "leader-1", -1, 1, entries, 0); - assertEquals(expected, raftState); - assertEquals(5, log.last().getIndex() + 1); - assertNotNull(log.get(3)); - assertNotNull(log.get(4)); + RaftActorBehavior behavior = createBehavior(context); - // Also expect an AppendEntriesReply to be sent where success is false - final Boolean out = new ExpectMsg(duration("1 seconds"), - "AppendEntriesReply") { - // do not put code outside this method, will run afterwards - protected Boolean match(Object in) { - if (in instanceof AppendEntriesReply) { - AppendEntriesReply reply = (AppendEntriesReply) in; - return reply.isSuccess(); - } else { - throw noMatch(); - } + if (AbstractRaftActorBehaviorTest.this instanceof CandidateTest) { + // Resetting the Candidates term to make sure it will match + // the term sent by AppendEntries. If this was not done then + // the test will fail because the Candidate will assume that + // the message was sent to it from a lower term peer and will + // thus respond with a failure + context.getTermInformation().update(2, "test"); } - }.get(); - assertEquals(true, out); + // Send an unknown message so that the state of the RaftActor remains unchanged + RaftState expected = behavior.handleMessage(getRef(), "unknown"); + RaftState raftState = + behavior.handleMessage(getRef(), appendEntries); - }}; - } + assertEquals(expected, raftState); - /** - * This test verifies that when a new AppendEntries message is received with - * new entries and the logs of the sender and receiver are out-of-sync that - * the log is first corrected by removing the out of sync entries from the - * log and then adding in the new entries sent with the AppendEntries message - * - * @throws Exception - */ - @Test - public void testHandleAppendEntriesCorrectReceiverLogEntries() - throws Exception { - new JavaTestKit(getSystem()) {{ + assertEquals(1, log.size()); - MockRaftActorContext context = (MockRaftActorContext) - createActorContext(); - // First set the receivers term to lower number - context.getTermInformation().update(2, "test"); - - // Prepare the receivers log - MockRaftActorContext.SimpleReplicatedLog log = - new MockRaftActorContext.SimpleReplicatedLog(); - log.append( - new MockRaftActorContext.MockReplicatedLogEntry(1, 0, "zero")); - log.append( - new MockRaftActorContext.MockReplicatedLogEntry(1, 1, "one")); - log.append( - new MockRaftActorContext.MockReplicatedLogEntry(1, 2, "two")); - - context.setReplicatedLog(log); - - // Prepare the entries to be sent with AppendEntries - List entries = new ArrayList<>(); - entries.add( - new MockRaftActorContext.MockReplicatedLogEntry(2, 2, "two-1")); - entries.add( - new MockRaftActorContext.MockReplicatedLogEntry(2, 3, "three")); - - // Send appendEntries with the same term as was set on the receiver - // before the new behavior was created (1 in this case) - // This will not work for a Candidate because as soon as a Candidate - // is created it increments the term - AppendEntries appendEntries = - new AppendEntries(2, "leader-1", 1, 1, entries, 101); - - RaftActorBehavior behavior = createBehavior(context); - - if (AbstractRaftActorBehaviorTest.this instanceof CandidateTest) { - // Resetting the Candidates term to make sure it will match - // the term sent by AppendEntries. If this was not done then - // the test will fail because the Candidate will assume that - // the message was sent to it from a lower term peer and will - // thus respond with a failure - context.getTermInformation().update(2, "test"); - } - - // Send an unknown message so that the state of the RaftActor remains unchanged - RaftState expected = behavior.handleMessage(getRef(), "unknown"); - - RaftState raftState = - behavior.handleMessage(getRef(), appendEntries); - - assertEquals(expected, raftState); - - // The entry at index 2 will be found out-of-sync with the leader - // and will be removed - // Then the two new entries will be added to the log - // Thus making the log to have 4 entries - assertEquals(4, log.last().getIndex() + 1); - assertNotNull(log.get(2)); - - // Check that the entry at index 2 has the new data - assertEquals("two-1", log.get(2).getData()); - assertNotNull(log.get(3)); - - // Also expect an AppendEntriesReply to be sent where success is false - final Boolean out = new ExpectMsg(duration("1 seconds"), - "AppendEntriesReply") { - // do not put code outside this method, will run afterwards - protected Boolean match(Object in) { - if (in instanceof AppendEntriesReply) { - AppendEntriesReply reply = (AppendEntriesReply) in; - return reply.isSuccess(); - } else { - throw noMatch(); - } - } - }.get(); - - assertEquals(true, out); - - - }}; + }}; } /** @@ -566,7 +344,7 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest { } protected AppendEntriesReply createAppendEntriesReplyWithNewerTerm() { - return new AppendEntriesReply(100, false); + return new AppendEntriesReply("follower-1", 100, false, 100, 100); } protected RequestVote createRequestVoteWithNewerTerm() { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java index 183c668fca..8bcee58afe 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java @@ -4,6 +4,7 @@ import akka.actor.ActorRef; import akka.actor.Props; import akka.testkit.JavaTestKit; import junit.framework.Assert; +import org.junit.Before; import org.junit.Test; import org.opendaylight.controller.cluster.raft.MockRaftActorContext; import org.opendaylight.controller.cluster.raft.RaftActorContext; @@ -15,8 +16,9 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import org.opendaylight.controller.cluster.raft.utils.DoNothingActor; -import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import static org.junit.Assert.assertEquals; @@ -37,12 +39,38 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest { private final ActorRef peerActor4 = getSystem().actorOf(Props.create( DoNothingActor.class)); + private final Map onePeer = new HashMap<>(); + private final Map twoPeers = new HashMap<>(); + private final Map fourPeers = new HashMap<>(); + + @Before + public void setUp(){ + onePeer.put(peerActor1.path().toString(), + peerActor1.path().toString()); + + twoPeers.put(peerActor1.path().toString(), + peerActor1.path().toString()); + twoPeers.put(peerActor2.path().toString(), + peerActor2.path().toString()); + + fourPeers.put(peerActor1.path().toString(), + peerActor1.path().toString()); + fourPeers.put(peerActor2.path().toString(), + peerActor2.path().toString()); + fourPeers.put(peerActor3.path().toString(), + peerActor3.path().toString()); + fourPeers.put(peerActor4.path().toString(), + peerActor3.path().toString()); + + + } + @Test public void testWhenACandidateIsCreatedItIncrementsTheCurrentTermAndVotesForItself(){ RaftActorContext raftActorContext = createActorContext(); long expectedTerm = raftActorContext.getTermInformation().getCurrentTerm(); - new Candidate(raftActorContext, Collections.EMPTY_LIST); + new Candidate(raftActorContext); assertEquals(expectedTerm+1, raftActorContext.getTermInformation().getCurrentTerm()); assertEquals(raftActorContext.getId(), raftActorContext.getTermInformation().getVotedFor()); @@ -55,7 +83,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest { new Within(duration("1 seconds")) { protected void run() { - Candidate candidate = new Candidate(createActorContext(getTestActor()), Collections.EMPTY_LIST); + Candidate candidate = new Candidate(createActorContext(getTestActor())); final Boolean out = new ExpectMsg(duration("1 seconds"), "ElectionTimeout") { // do not put code outside this method, will run afterwards @@ -78,7 +106,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest { public void testHandleElectionTimeoutWhenThereAreZeroPeers(){ RaftActorContext raftActorContext = createActorContext(); Candidate candidate = - new Candidate(raftActorContext, Collections.EMPTY_LIST); + new Candidate(raftActorContext); RaftState raftState = candidate.handleMessage(candidateActor, new ElectionTimeout()); @@ -87,12 +115,12 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest { } @Test - public void testHandleElectionTimeoutWhenThereAreTwoPeers(){ - RaftActorContext raftActorContext = createActorContext(); + public void testHandleElectionTimeoutWhenThereAreTwoNodesInCluster(){ + MockRaftActorContext raftActorContext = + (MockRaftActorContext) createActorContext(); + raftActorContext.setPeerAddresses(onePeer); Candidate candidate = - new Candidate(raftActorContext, Arrays - .asList(peerActor1.path().toString(), - peerActor2.path().toString())); + new Candidate(raftActorContext); RaftState raftState = candidate.handleMessage(candidateActor, new ElectionTimeout()); @@ -101,12 +129,12 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest { } @Test - public void testBecomeLeaderOnReceivingMajorityVotesInThreePeerCluster(){ - RaftActorContext raftActorContext = createActorContext(); + public void testBecomeLeaderOnReceivingMajorityVotesInThreeNodesInCluster(){ + MockRaftActorContext raftActorContext = + (MockRaftActorContext) createActorContext(); + raftActorContext.setPeerAddresses(twoPeers); Candidate candidate = - new Candidate(raftActorContext, Arrays - .asList(peerActor1.path().toString(), - peerActor2.path().toString())); + new Candidate(raftActorContext); RaftState stateOnFirstVote = candidate.handleMessage(peerActor1, new RequestVoteReply(0, true)); @@ -115,17 +143,16 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest { } @Test - public void testBecomeLeaderOnReceivingMajorityVotesInFivePeerCluster(){ - RaftActorContext raftActorContext = createActorContext(); + public void testBecomeLeaderOnReceivingMajorityVotesInFiveNodesInCluster(){ + MockRaftActorContext raftActorContext = + (MockRaftActorContext) createActorContext(); + raftActorContext.setPeerAddresses(fourPeers); Candidate candidate = - new Candidate(raftActorContext, Arrays - .asList(peerActor1.path().toString(), - peerActor2.path().toString(), - peerActor3.path().toString())); + new Candidate(raftActorContext); RaftState stateOnFirstVote = candidate.handleMessage(peerActor1, new RequestVoteReply(0, true)); - RaftState stateOnSecondVote = candidate.handleMessage(peerActor1, new RequestVoteReply(0, true)); + RaftState stateOnSecondVote = candidate.handleMessage(peerActor2, new RequestVoteReply(0, true)); Assert.assertEquals(RaftState.Candidate, stateOnFirstVote); Assert.assertEquals(RaftState.Leader, stateOnSecondVote); @@ -139,7 +166,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest { new Within(duration("1 seconds")) { protected void run() { - Candidate candidate = new Candidate(createActorContext(getTestActor()), Collections.EMPTY_LIST); + Candidate candidate = new Candidate(createActorContext(getTestActor())); candidate.handleMessage(getTestActor(), new AppendEntries(0, "test", 0,0,Collections.EMPTY_LIST, 0)); @@ -168,7 +195,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest { new Within(duration("1 seconds")) { protected void run() { - Candidate candidate = new Candidate(createActorContext(getTestActor()), Collections.EMPTY_LIST); + Candidate candidate = new Candidate(createActorContext(getTestActor())); candidate.handleMessage(getTestActor(), new RequestVote(0, "test", 0, 0)); @@ -261,7 +288,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest { @Override protected RaftActorBehavior createBehavior(RaftActorContext actorContext) { - return new Candidate(actorContext, Collections.EMPTY_LIST); + return new Candidate(actorContext); } @Override protected RaftActorContext createActorContext() { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java index 90acbb1eae..ca0e13db03 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java @@ -8,12 +8,19 @@ import org.junit.Test; import org.opendaylight.controller.cluster.raft.MockRaftActorContext; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; +import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout; +import org.opendaylight.controller.cluster.raft.messages.AppendEntries; +import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import org.opendaylight.controller.cluster.raft.utils.DoNothingActor; +import java.util.ArrayList; +import java.util.List; + import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; public class FollowerTest extends AbstractRaftActorBehaviorTest { @@ -133,4 +140,264 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { }}; } + /** + * This test verifies that when an AppendEntries RPC is received by a RaftActor + * with a commitIndex that is greater than what has been applied to the + * state machine of the RaftActor, the RaftActor applies the state and + * sets it current applied state to the commitIndex of the sender. + * + * @throws Exception + */ + @Test + public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception { + new JavaTestKit(getSystem()) {{ + + RaftActorContext context = + createActorContext(); + + context.setLastApplied(100); + setLastLogEntry((MockRaftActorContext) context, 0, 0, ""); + + // The new commitIndex is 101 + AppendEntries appendEntries = + new AppendEntries(100, "leader-1", 0, 0, null, 101); + + RaftState raftState = + createBehavior(context).handleMessage(getRef(), appendEntries); + + assertEquals(101L, context.getLastApplied()); + + }}; + } + + /** + * This test verifies that when an AppendEntries is received a specific prevLogTerm + * which does not match the term that is in RaftActors log entry at prevLogIndex + * then the RaftActor does not change it's state and it returns a failure. + * + * @throws Exception + */ + @Test + public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() + throws Exception { + new JavaTestKit(getSystem()) {{ + + MockRaftActorContext context = (MockRaftActorContext) + createActorContext(); + + // First set the receivers term to lower number + context.getTermInformation().update(95, "test"); + + // Set the last log entry term for the receiver to be greater than + // what we will be sending as the prevLogTerm in AppendEntries + MockRaftActorContext.MockReplicatedLog mockReplicatedLog = + setLastLogEntry(context, 20, 0, ""); + + // Also set the entry at index 0 with term 20 which will be greater + // than the prevLogTerm sent by the sender + mockReplicatedLog.setReplicatedLogEntry( + new MockRaftActorContext.MockReplicatedLogEntry(20, 0, "")); + + // AppendEntries is now sent with a bigger term + // this will set the receivers term to be the same as the sender's term + AppendEntries appendEntries = + new AppendEntries(100, "leader-1", 0, 0, null, 101); + + RaftActorBehavior behavior = createBehavior(context); + + // Send an unknown message so that the state of the RaftActor remains unchanged + RaftState expected = behavior.handleMessage(getRef(), "unknown"); + + RaftState raftState = + behavior.handleMessage(getRef(), appendEntries); + + assertEquals(expected, raftState); + + // Also expect an AppendEntriesReply to be sent where success is false + final Boolean out = new ExpectMsg(duration("1 seconds"), + "AppendEntriesReply") { + // do not put code outside this method, will run afterwards + protected Boolean match(Object in) { + if (in instanceof AppendEntriesReply) { + AppendEntriesReply reply = (AppendEntriesReply) in; + return reply.isSuccess(); + } else { + throw noMatch(); + } + } + }.get(); + + assertEquals(false, out); + + + }}; + } + + + + /** + * This test verifies that when a new AppendEntries message is received with + * new entries and the logs of the sender and receiver match that the new + * entries get added to the log and the log is incremented by the number of + * entries received in appendEntries + * + * @throws Exception + */ + @Test + public void testHandleAppendEntriesAddNewEntries() throws Exception { + new JavaTestKit(getSystem()) {{ + + MockRaftActorContext context = (MockRaftActorContext) + createActorContext(); + + // First set the receivers term to lower number + context.getTermInformation().update(1, "test"); + + // Prepare the receivers log + MockRaftActorContext.SimpleReplicatedLog log = + new MockRaftActorContext.SimpleReplicatedLog(); + log.append( + new MockRaftActorContext.MockReplicatedLogEntry(1, 0, "zero")); + log.append( + new MockRaftActorContext.MockReplicatedLogEntry(1, 1, "one")); + log.append( + new MockRaftActorContext.MockReplicatedLogEntry(1, 2, "two")); + + context.setReplicatedLog(log); + + // Prepare the entries to be sent with AppendEntries + List entries = new ArrayList<>(); + entries.add( + new MockRaftActorContext.MockReplicatedLogEntry(1, 3, "three")); + entries.add( + new MockRaftActorContext.MockReplicatedLogEntry(1, 4, "four")); + + // Send appendEntries with the same term as was set on the receiver + // before the new behavior was created (1 in this case) + // This will not work for a Candidate because as soon as a Candidate + // is created it increments the term + AppendEntries appendEntries = + new AppendEntries(1, "leader-1", 2, 1, entries, 4); + + RaftActorBehavior behavior = createBehavior(context); + + // Send an unknown message so that the state of the RaftActor remains unchanged + RaftState expected = behavior.handleMessage(getRef(), "unknown"); + + RaftState raftState = + behavior.handleMessage(getRef(), appendEntries); + + assertEquals(expected, raftState); + assertEquals(5, log.last().getIndex() + 1); + assertNotNull(log.get(3)); + assertNotNull(log.get(4)); + + // Also expect an AppendEntriesReply to be sent where success is false + final Boolean out = new ExpectMsg(duration("1 seconds"), + "AppendEntriesReply") { + // do not put code outside this method, will run afterwards + protected Boolean match(Object in) { + if (in instanceof AppendEntriesReply) { + AppendEntriesReply reply = (AppendEntriesReply) in; + return reply.isSuccess(); + } else { + throw noMatch(); + } + } + }.get(); + + assertEquals(true, out); + + + }}; + } + + + + /** + * This test verifies that when a new AppendEntries message is received with + * new entries and the logs of the sender and receiver are out-of-sync that + * the log is first corrected by removing the out of sync entries from the + * log and then adding in the new entries sent with the AppendEntries message + * + * @throws Exception + */ + @Test + public void testHandleAppendEntriesCorrectReceiverLogEntries() + throws Exception { + new JavaTestKit(getSystem()) {{ + + MockRaftActorContext context = (MockRaftActorContext) + createActorContext(); + + // First set the receivers term to lower number + context.getTermInformation().update(2, "test"); + + // Prepare the receivers log + MockRaftActorContext.SimpleReplicatedLog log = + new MockRaftActorContext.SimpleReplicatedLog(); + log.append( + new MockRaftActorContext.MockReplicatedLogEntry(1, 0, "zero")); + log.append( + new MockRaftActorContext.MockReplicatedLogEntry(1, 1, "one")); + log.append( + new MockRaftActorContext.MockReplicatedLogEntry(1, 2, "two")); + + context.setReplicatedLog(log); + + // Prepare the entries to be sent with AppendEntries + List entries = new ArrayList<>(); + entries.add( + new MockRaftActorContext.MockReplicatedLogEntry(2, 2, "two-1")); + entries.add( + new MockRaftActorContext.MockReplicatedLogEntry(2, 3, "three")); + + // Send appendEntries with the same term as was set on the receiver + // before the new behavior was created (1 in this case) + // This will not work for a Candidate because as soon as a Candidate + // is created it increments the term + AppendEntries appendEntries = + new AppendEntries(2, "leader-1", 1, 1, entries, 3); + + RaftActorBehavior behavior = createBehavior(context); + + // Send an unknown message so that the state of the RaftActor remains unchanged + RaftState expected = behavior.handleMessage(getRef(), "unknown"); + + RaftState raftState = + behavior.handleMessage(getRef(), appendEntries); + + assertEquals(expected, raftState); + + // The entry at index 2 will be found out-of-sync with the leader + // and will be removed + // Then the two new entries will be added to the log + // Thus making the log to have 4 entries + assertEquals(4, log.last().getIndex() + 1); + assertNotNull(log.get(2)); + + // Check that the entry at index 2 has the new data + assertEquals("two-1", log.get(2).getData()); + assertNotNull(log.get(3)); + + // Also expect an AppendEntriesReply to be sent where success is false + final Boolean out = new ExpectMsg(duration("1 seconds"), + "AppendEntriesReply") { + // do not put code outside this method, will run afterwards + protected Boolean match(Object in) { + if (in instanceof AppendEntriesReply) { + AppendEntriesReply reply = (AppendEntriesReply) in; + return reply.isSuccess(); + } else { + throw noMatch(); + } + } + }.get(); + + assertEquals(true, out); + + + }}; + } + } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index 5684d6651e..e5e54d5944 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -12,9 +12,8 @@ import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.utils.DoNothingActor; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; +import java.util.HashMap; +import java.util.Map; import static org.junit.Assert.assertEquals; @@ -27,7 +26,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { public void testHandleMessageForUnknownMessage() throws Exception { new JavaTestKit(getSystem()) {{ Leader leader = - new Leader(createActorContext(), Collections.EMPTY_LIST); + new Leader(createActorContext()); // handle message should return the Leader state when it receives an // unknown message @@ -46,11 +45,15 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { ActorRef followerActor = getTestActor(); - List followers = new ArrayList(); + MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext(); - followers.add(followerActor.path().toString()); + Map peerAddresses = new HashMap(); - Leader leader = new Leader(createActorContext(), followers); + peerAddresses.put(followerActor.path().toString(), followerActor.path().toString()); + + actorContext.setPeerAddresses(peerAddresses); + + Leader leader = new Leader(actorContext); leader.handleMessage(senderActor, new SendHeartBeat()); final String out = new ExpectMsg(duration("1 seconds"), "match hint") { @@ -78,7 +81,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { } @Override protected RaftActorBehavior createBehavior(RaftActorContext actorContext) { - return new Leader(actorContext, Collections.EMPTY_LIST); + return new Leader(actorContext); } @Override protected RaftActorContext createActorContext() { -- 2.36.6