From 6702aa8391498f647b7181fd971ca23674ac14d6 Mon Sep 17 00:00:00 2001 From: Kamal Rameshan Date: Thu, 31 Jul 2014 19:38:48 -0700 Subject: [PATCH] Configuration Interface for Raft A default implementation for the ConfigParams is provided. The client can override and provide an impl of ConfigParams. For now only 4 parameters have been introduced. More can be added. ExampleActor and TestDriver provide examples. Change-Id: I42a46a521ad9364d535de479c58873dc70740c9f Signed-off-by: Kamal Rameshan --- .../cluster/example/ExampleActor.java | 12 ++-- .../example/ExampleConfigParamsImpl.java | 20 ++++++ .../controller/cluster/example/Main.java | 11 ++-- .../cluster/example/TestDriver.java | 16 +++-- .../controller/cluster/raft/ConfigParams.java | 55 +++++++++++++++++ .../cluster/raft/DefaultConfigParamsImpl.java | 61 +++++++++++++++++++ .../controller/cluster/raft/RaftActor.java | 20 +++--- .../cluster/raft/RaftActorContext.java | 5 ++ .../cluster/raft/RaftActorContextImpl.java | 13 +++- .../behaviors/AbstractRaftActorBehavior.java | 27 +------- .../cluster/raft/behaviors/Leader.java | 6 +- .../cluster/raft/MockRaftActorContext.java | 5 +- 12 files changed, 201 insertions(+), 50 deletions(-) create mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleConfigParamsImpl.java create mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java create mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java index 90bf121fd4..aa100df9d0 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java @@ -11,10 +11,12 @@ package org.opendaylight.controller.cluster.example; import akka.actor.ActorRef; import akka.actor.Props; import akka.japi.Creator; +import com.google.common.base.Optional; import org.opendaylight.controller.cluster.example.messages.KeyValue; import org.opendaylight.controller.cluster.example.messages.KeyValueSaved; import org.opendaylight.controller.cluster.example.messages.PrintRole; import org.opendaylight.controller.cluster.example.messages.PrintState; +import org.opendaylight.controller.cluster.raft.ConfigParams; import org.opendaylight.controller.cluster.raft.RaftActor; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; @@ -31,15 +33,17 @@ public class ExampleActor extends RaftActor { private long persistIdentifier = 1; - public ExampleActor(String id, Map peerAddresses) { - super(id, peerAddresses); + public ExampleActor(String id, Map peerAddresses, + Optional configParams) { + super(id, peerAddresses, configParams); } - public static Props props(final String id, final Map peerAddresses){ + public static Props props(final String id, final Map peerAddresses, + final Optional configParams){ return Props.create(new Creator(){ @Override public ExampleActor create() throws Exception { - return new ExampleActor(id, peerAddresses); + return new ExampleActor(id, peerAddresses, configParams); } }); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleConfigParamsImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleConfigParamsImpl.java new file mode 100644 index 0000000000..d11377dbcb --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleConfigParamsImpl.java @@ -0,0 +1,20 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.example; + +import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; + +/** + * Implementation of ConfigParams for Example + */ +public class ExampleConfigParamsImpl extends DefaultConfigParamsImpl { + @Override + public long getSnapshotBatchCount() { + return 50; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/Main.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/Main.java index a148ed4009..0e5d643a64 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/Main.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/Main.java @@ -11,7 +11,9 @@ package org.opendaylight.controller.cluster.example; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.PoisonPill; +import com.google.common.base.Optional; import org.opendaylight.controller.cluster.example.messages.KeyValue; +import org.opendaylight.controller.cluster.raft.ConfigParams; import java.io.BufferedReader; import java.io.InputStreamReader; @@ -34,15 +36,15 @@ public class Main { public static void main(String[] args) throws Exception{ ActorRef example1Actor = actorSystem.actorOf(ExampleActor.props("example-1", - withoutPeer("example-1")), "example-1"); + withoutPeer("example-1"), Optional.absent()), "example-1"); ActorRef example2Actor = actorSystem.actorOf(ExampleActor.props("example-2", - withoutPeer("example-2")), "example-2"); + withoutPeer("example-2"), Optional.absent()), "example-2"); ActorRef example3Actor = actorSystem.actorOf(ExampleActor.props("example-3", - withoutPeer("example-3")), "example-3"); + withoutPeer("example-3"), Optional.absent()), "example-3"); List examples = Arrays.asList(example1Actor, example2Actor, example3Actor); @@ -74,7 +76,8 @@ public class Main { String actorName = "example-" + i; examples.add(i - 1, actorSystem.actorOf(ExampleActor.props(actorName, - withoutPeer(actorName)), actorName)); + withoutPeer(actorName), Optional.absent()), + actorName)); System.out.println("Created actor : " + actorName); continue; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/TestDriver.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/TestDriver.java index 18c14e7124..fd6e192bf0 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/TestDriver.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/TestDriver.java @@ -2,8 +2,10 @@ package org.opendaylight.controller.cluster.example; import akka.actor.ActorRef; import akka.actor.ActorSystem; +import com.google.common.base.Optional; import org.opendaylight.controller.cluster.example.messages.PrintRole; import org.opendaylight.controller.cluster.example.messages.PrintState; +import org.opendaylight.controller.cluster.raft.ConfigParams; import org.opendaylight.controller.cluster.raft.client.messages.AddRaftPeer; import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer; @@ -27,6 +29,7 @@ public class TestDriver { private static Map actorRefs = new HashMap(); private static LogGenerator logGenerator = new LogGenerator(); private int nameCounter = 0; + private static ConfigParams configParams = new ExampleConfigParamsImpl(); /** * Create nodes, add clients and start logging. @@ -111,6 +114,11 @@ public class TestDriver { } } + public static ActorRef createExampleActor(String name) { + return actorSystem.actorOf(ExampleActor.props(name, withoutPeer(name), + Optional.of(configParams)), name); + } + public void createNodes(int num) { for (int i=0; i < num; i++) { nameCounter = nameCounter + 1; @@ -118,8 +126,7 @@ public class TestDriver { } for (String s : allPeers.keySet()) { - ActorRef exampleActor = actorSystem.actorOf( - ExampleActor.props(s, withoutPeer(s)), s); + ActorRef exampleActor = createExampleActor(s); actorRefs.put(s, exampleActor); System.out.println("Created node:"+s); @@ -137,8 +144,7 @@ public class TestDriver { } Map newActorRefs = new HashMap(num); for (Map.Entry entry : newPeers.entrySet()) { - ActorRef exampleActor = actorSystem.actorOf( - ExampleActor.props(entry.getKey(), withoutPeer(entry.getKey())), entry.getKey()); + ActorRef exampleActor = createExampleActor(entry.getKey()); newActorRefs.put(entry.getKey(), exampleActor); //now also add these new nodes as peers from the previous nodes @@ -200,7 +206,7 @@ public class TestDriver { String address = "akka://default/user/"+actorName; allPeers.put(actorName, address); - ActorRef exampleActor = actorSystem.actorOf(ExampleActor.props(actorName, withoutPeer(actorName)), actorName); + ActorRef exampleActor = createExampleActor(actorName); for (ActorRef actor : actorRefs.values()) { actor.tell(new AddRaftPeer(actorName, address), null); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java new file mode 100644 index 0000000000..4c6434aec4 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft; + +import scala.concurrent.duration.FiniteDuration; + +/** + * Configuration Parameter interface for configuring the Raft consensus system + *

+ * Any component using this implementation might want to provide an implementation of + * this interface to configure + * + * A default implementation will be used if none is provided. + * + * @author Kamal Rameshan + */ +public interface ConfigParams { + /** + * The minimum number of entries to be present in the in-memory Raft log + * for a snapshot to be taken + * + * @return long + */ + public long getSnapshotBatchCount(); + + /** + * The interval at which a heart beat message will be sent to the remote + * RaftActor + * + * @return FiniteDuration + */ + public FiniteDuration getHeartBeatInterval(); + + /** + * The interval in which a new election would get triggered if no leader is found + * + * Normally its set to atleast twice the heart beat interval + * + * @return FiniteDuration + */ + public FiniteDuration getElectionTimeOutInterval(); + + /** + * The maximum election time variance. The election is scheduled using both + * the Election Timeout and Variance + * + * @return int + */ + public int getElectionTimeVariance(); +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java new file mode 100644 index 0000000000..c633337226 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft; + +import scala.concurrent.duration.FiniteDuration; + +import java.util.concurrent.TimeUnit; + +/** + * Default implementation of the ConfigParams + * + * If no implementation is provided for ConfigParams, then this will be used. + */ +public class DefaultConfigParamsImpl implements ConfigParams { + + private static final int SNAPSHOT_BATCH_COUNT = 100000; + + /** + * The maximum election time variance + */ + private static final int ELECTION_TIME_MAX_VARIANCE = 100; + + + /** + * The interval at which a heart beat message will be sent to the remote + * RaftActor + *

+ * Since this is set to 100 milliseconds the Election timeout should be + * at least 200 milliseconds + */ + protected static final FiniteDuration HEART_BEAT_INTERVAL = + new FiniteDuration(100, TimeUnit.MILLISECONDS); + + + @Override + public long getSnapshotBatchCount() { + return SNAPSHOT_BATCH_COUNT; + } + + @Override + public FiniteDuration getHeartBeatInterval() { + return HEART_BEAT_INTERVAL; + } + + + @Override + public FiniteDuration getElectionTimeOutInterval() { + // returns 2 times the heart beat interval + return HEART_BEAT_INTERVAL.$times(2); + } + + @Override + public int getElectionTimeVariance() { + return ELECTION_TIME_MAX_VARIANCE; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 36c86f542d..70b85b4627 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -22,6 +22,7 @@ import akka.persistence.UntypedPersistentActor; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; +import com.google.common.base.Optional; import org.opendaylight.controller.cluster.raft.behaviors.Candidate; import org.opendaylight.controller.cluster.raft.behaviors.Follower; import org.opendaylight.controller.cluster.raft.behaviors.Leader; @@ -80,8 +81,6 @@ public abstract class RaftActor extends UntypedPersistentActor { protected final LoggingAdapter LOG = Logging.getLogger(getContext().system(), this); - private static final int SNAPSHOT_ENTRY_COUNT = 100000; - /** * The current state determines the current behavior of a RaftActor * A Raft Actor always starts off in the Follower State @@ -101,10 +100,17 @@ public abstract class RaftActor extends UntypedPersistentActor { public RaftActor(String id, Map peerAddresses) { + this(id, peerAddresses, Optional.absent()); + } + + public RaftActor(String id, Map peerAddresses, + Optional configParams) { + context = new RaftActorContextImpl(this.getSelf(), - this.getContext(), - id, new ElectionTermImpl(), - -1, -1, replicatedLog, peerAddresses, LOG); + this.getContext(), id, new ElectionTermImpl(), + -1, -1, replicatedLog, peerAddresses, + (configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl()), + LOG); } @Override public void onReceiveRecover(Object message) { @@ -369,7 +375,7 @@ public abstract class RaftActor extends UntypedPersistentActor { // FIXME : Not sure how exactly the SnapshotSelectionCriteria is applied // For now guessing that it is ANDed. deleteSnapshots(new SnapshotSelectionCriteria( - sequenceNumber - SNAPSHOT_ENTRY_COUNT, 43200000)); + sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000)); // Trim akka journal deleteMessages(sequenceNumber); @@ -429,7 +435,7 @@ public abstract class RaftActor extends UntypedPersistentActor { new Procedure() { public void apply(ReplicatedLogEntry evt) throws Exception { // FIXME : Tentatively create a snapshot every hundred thousand entries. To be tuned. - if (journal.size() > SNAPSHOT_ENTRY_COUNT) { + if (journal.size() > context.getConfigParams().getSnapshotBatchCount()) { LOG.info("Initiating Snapshot Capture.."); long lastAppliedIndex = -1; long lastAppliedTerm = -1; diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java index ec80055459..0eb4b73779 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java @@ -161,4 +161,9 @@ public interface RaftActorContext { * @param peerAddress */ void setPeerAddress(String peerId, String peerAddress); + + /** + * @return ConfigParams + */ + public ConfigParams getConfigParams(); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java index 0fbc55d1a4..25da37105c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java @@ -19,7 +19,7 @@ import java.util.Map; import static com.google.common.base.Preconditions.checkState; -public class RaftActorContextImpl implements RaftActorContext{ +public class RaftActorContextImpl implements RaftActorContext { private final ActorRef actor; @@ -39,10 +39,14 @@ public class RaftActorContextImpl implements RaftActorContext{ private final LoggingAdapter LOG; + private final ConfigParams configParams; + public RaftActorContextImpl(ActorRef actor, UntypedActorContext context, String id, ElectionTerm termInformation, long commitIndex, - long lastApplied, ReplicatedLog replicatedLog, Map peerAddresses, LoggingAdapter logger) { + long lastApplied, ReplicatedLog replicatedLog, + Map peerAddresses, ConfigParams configParams, + LoggingAdapter logger) { this.actor = actor; this.context = context; this.id = id; @@ -51,6 +55,7 @@ public class RaftActorContextImpl implements RaftActorContext{ this.lastApplied = lastApplied; this.replicatedLog = replicatedLog; this.peerAddresses = peerAddresses; + this.configParams = configParams; this.LOG = logger; } @@ -114,6 +119,10 @@ public class RaftActorContextImpl implements RaftActorContext{ return peerAddresses.get(peerId); } + @Override public ConfigParams getConfigParams() { + return configParams; + } + @Override public void addToPeers(String name, String address) { peerAddresses.put(name, address); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index f7281bb8e3..0a553b40fd 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -43,27 +43,6 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { */ protected final RaftActorContext context; - /** - * The maximum election time variance - */ - private static final int ELECTION_TIME_MAX_VARIANCE = 100; - - /** - * The interval at which a heart beat message will be sent to the remote - * RaftActor - *

- * Since this is set to 100 milliseconds the Election timeout should be - * at least 200 milliseconds - */ - protected static final FiniteDuration HEART_BEAT_INTERVAL = - new FiniteDuration(100, TimeUnit.MILLISECONDS); - - /** - * The interval in which a new election would get triggered if no leader is found - */ - private static final long ELECTION_TIME_INTERVAL = - HEART_BEAT_INTERVAL.toMillis() * 2; - /** * */ @@ -208,9 +187,9 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * @return */ protected FiniteDuration electionDuration() { - long variance = new Random().nextInt(ELECTION_TIME_MAX_VARIANCE); - return new FiniteDuration(ELECTION_TIME_INTERVAL + variance, - TimeUnit.MILLISECONDS); + long variance = new Random().nextInt(context.getConfigParams().getElectionTimeVariance()); + return context.getConfigParams().getElectionTimeOutInterval().$plus( + new FiniteDuration(variance, TimeUnit.MILLISECONDS)); } /** diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java index 53e47c2f84..2a44e8b7a5 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java @@ -112,8 +112,8 @@ public class Leader extends AbstractRaftActorBehavior { scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS)); scheduleInstallSnapshotCheck( - new FiniteDuration(HEART_BEAT_INTERVAL.length() * 1000, - HEART_BEAT_INTERVAL.unit()) + new FiniteDuration(context.getConfigParams().getHeartBeatInterval().length() * 1000, + context.getConfigParams().getHeartBeatInterval().unit()) ); } @@ -241,7 +241,7 @@ public class Leader extends AbstractRaftActorBehavior { (InstallSnapshotReply) message); } } finally { - scheduleHeartBeat(HEART_BEAT_INTERVAL); + scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval()); } return super.handleMessage(sender, message); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java index 2e200cba27..aa50fa7442 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java @@ -177,7 +177,10 @@ public class MockRaftActorContext implements RaftActorContext { this.peerAddresses = peerAddresses; } - + @Override + public ConfigParams getConfigParams() { + return new DefaultConfigParamsImpl(); + } public static class SimpleReplicatedLog implements ReplicatedLog { private final List log = new ArrayList<>(); -- 2.36.6