A default implementation for the ConfigParams is provided.
The client can override and provide an impl of ConfigParams.
For now only 4 parameters have been introduced. More can be added.
ExampleActor and TestDriver provide examples.
Change-Id: I42a46a521ad9364d535de479c58873dc70740c9f
Signed-off-by: Kamal Rameshan <kramesha@cisco.com>
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.japi.Creator;
+import com.google.common.base.Optional;
import org.opendaylight.controller.cluster.example.messages.KeyValue;
import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
import org.opendaylight.controller.cluster.example.messages.PrintRole;
import org.opendaylight.controller.cluster.example.messages.PrintState;
+import org.opendaylight.controller.cluster.raft.ConfigParams;
import org.opendaylight.controller.cluster.raft.RaftActor;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
private long persistIdentifier = 1;
- public ExampleActor(String id, Map<String, String> peerAddresses) {
- super(id, peerAddresses);
+ public ExampleActor(String id, Map<String, String> peerAddresses,
+ Optional<ConfigParams> configParams) {
+ super(id, peerAddresses, configParams);
}
- public static Props props(final String id, final Map<String, String> peerAddresses){
+ public static Props props(final String id, final Map<String, String> peerAddresses,
+ final Optional<ConfigParams> configParams){
return Props.create(new Creator<ExampleActor>(){
@Override public ExampleActor create() throws Exception {
- return new ExampleActor(id, peerAddresses);
+ return new ExampleActor(id, peerAddresses, configParams);
}
});
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.example;
+
+import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
+
+/**
+ * Implementation of ConfigParams for Example
+ */
+public class ExampleConfigParamsImpl extends DefaultConfigParamsImpl {
+ @Override
+ public long getSnapshotBatchCount() {
+ return 50;
+ }
+}
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
+import com.google.common.base.Optional;
import org.opendaylight.controller.cluster.example.messages.KeyValue;
+import org.opendaylight.controller.cluster.raft.ConfigParams;
import java.io.BufferedReader;
import java.io.InputStreamReader;
public static void main(String[] args) throws Exception{
ActorRef example1Actor =
actorSystem.actorOf(ExampleActor.props("example-1",
- withoutPeer("example-1")), "example-1");
+ withoutPeer("example-1"), Optional.<ConfigParams>absent()), "example-1");
ActorRef example2Actor =
actorSystem.actorOf(ExampleActor.props("example-2",
- withoutPeer("example-2")), "example-2");
+ withoutPeer("example-2"), Optional.<ConfigParams>absent()), "example-2");
ActorRef example3Actor =
actorSystem.actorOf(ExampleActor.props("example-3",
- withoutPeer("example-3")), "example-3");
+ withoutPeer("example-3"), Optional.<ConfigParams>absent()), "example-3");
List<ActorRef> examples = Arrays.asList(example1Actor, example2Actor, example3Actor);
String actorName = "example-" + i;
examples.add(i - 1,
actorSystem.actorOf(ExampleActor.props(actorName,
- withoutPeer(actorName)), actorName));
+ withoutPeer(actorName), Optional.<ConfigParams>absent()),
+ actorName));
System.out.println("Created actor : " + actorName);
continue;
}
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
+import com.google.common.base.Optional;
import org.opendaylight.controller.cluster.example.messages.PrintRole;
import org.opendaylight.controller.cluster.example.messages.PrintState;
+import org.opendaylight.controller.cluster.raft.ConfigParams;
import org.opendaylight.controller.cluster.raft.client.messages.AddRaftPeer;
import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer;
private static Map<String, ActorRef> actorRefs = new HashMap<String, ActorRef>();
private static LogGenerator logGenerator = new LogGenerator();
private int nameCounter = 0;
+ private static ConfigParams configParams = new ExampleConfigParamsImpl();
/**
* Create nodes, add clients and start logging.
}
}
+ public static ActorRef createExampleActor(String name) {
+ return actorSystem.actorOf(ExampleActor.props(name, withoutPeer(name),
+ Optional.of(configParams)), name);
+ }
+
public void createNodes(int num) {
for (int i=0; i < num; i++) {
nameCounter = nameCounter + 1;
}
for (String s : allPeers.keySet()) {
- ActorRef exampleActor = actorSystem.actorOf(
- ExampleActor.props(s, withoutPeer(s)), s);
+ ActorRef exampleActor = createExampleActor(s);
actorRefs.put(s, exampleActor);
System.out.println("Created node:"+s);
}
Map<String, ActorRef> newActorRefs = new HashMap<String, ActorRef>(num);
for (Map.Entry<String, String> entry : newPeers.entrySet()) {
- ActorRef exampleActor = actorSystem.actorOf(
- ExampleActor.props(entry.getKey(), withoutPeer(entry.getKey())), entry.getKey());
+ ActorRef exampleActor = createExampleActor(entry.getKey());
newActorRefs.put(entry.getKey(), exampleActor);
//now also add these new nodes as peers from the previous nodes
String address = "akka://default/user/"+actorName;
allPeers.put(actorName, address);
- ActorRef exampleActor = actorSystem.actorOf(ExampleActor.props(actorName, withoutPeer(actorName)), actorName);
+ ActorRef exampleActor = createExampleActor(actorName);
for (ActorRef actor : actorRefs.values()) {
actor.tell(new AddRaftPeer(actorName, address), null);
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Configuration Parameter interface for configuring the Raft consensus system
+ * <p/>
+ * Any component using this implementation might want to provide an implementation of
+ * this interface to configure
+ *
+ * A default implementation will be used if none is provided.
+ *
+ * @author Kamal Rameshan
+ */
+public interface ConfigParams {
+ /**
+ * The minimum number of entries to be present in the in-memory Raft log
+ * for a snapshot to be taken
+ *
+ * @return long
+ */
+ public long getSnapshotBatchCount();
+
+ /**
+ * The interval at which a heart beat message will be sent to the remote
+ * RaftActor
+ *
+ * @return FiniteDuration
+ */
+ public FiniteDuration getHeartBeatInterval();
+
+ /**
+ * The interval in which a new election would get triggered if no leader is found
+ *
+ * Normally its set to atleast twice the heart beat interval
+ *
+ * @return FiniteDuration
+ */
+ public FiniteDuration getElectionTimeOutInterval();
+
+ /**
+ * The maximum election time variance. The election is scheduled using both
+ * the Election Timeout and Variance
+ *
+ * @return int
+ */
+ public int getElectionTimeVariance();
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Default implementation of the ConfigParams
+ *
+ * If no implementation is provided for ConfigParams, then this will be used.
+ */
+public class DefaultConfigParamsImpl implements ConfigParams {
+
+ private static final int SNAPSHOT_BATCH_COUNT = 100000;
+
+ /**
+ * The maximum election time variance
+ */
+ private static final int ELECTION_TIME_MAX_VARIANCE = 100;
+
+
+ /**
+ * The interval at which a heart beat message will be sent to the remote
+ * RaftActor
+ * <p/>
+ * Since this is set to 100 milliseconds the Election timeout should be
+ * at least 200 milliseconds
+ */
+ protected static final FiniteDuration HEART_BEAT_INTERVAL =
+ new FiniteDuration(100, TimeUnit.MILLISECONDS);
+
+
+ @Override
+ public long getSnapshotBatchCount() {
+ return SNAPSHOT_BATCH_COUNT;
+ }
+
+ @Override
+ public FiniteDuration getHeartBeatInterval() {
+ return HEART_BEAT_INTERVAL;
+ }
+
+
+ @Override
+ public FiniteDuration getElectionTimeOutInterval() {
+ // returns 2 times the heart beat interval
+ return HEART_BEAT_INTERVAL.$times(2);
+ }
+
+ @Override
+ public int getElectionTimeVariance() {
+ return ELECTION_TIME_MAX_VARIANCE;
+ }
+}
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
+import com.google.common.base.Optional;
import org.opendaylight.controller.cluster.raft.behaviors.Candidate;
import org.opendaylight.controller.cluster.raft.behaviors.Follower;
import org.opendaylight.controller.cluster.raft.behaviors.Leader;
protected final LoggingAdapter LOG =
Logging.getLogger(getContext().system(), this);
- private static final int SNAPSHOT_ENTRY_COUNT = 100000;
-
/**
* The current state determines the current behavior of a RaftActor
* A Raft Actor always starts off in the Follower State
public RaftActor(String id, Map<String, String> peerAddresses) {
+ this(id, peerAddresses, Optional.<ConfigParams>absent());
+ }
+
+ public RaftActor(String id, Map<String, String> peerAddresses,
+ Optional<ConfigParams> configParams) {
+
context = new RaftActorContextImpl(this.getSelf(),
- this.getContext(),
- id, new ElectionTermImpl(),
- -1, -1, replicatedLog, peerAddresses, LOG);
+ this.getContext(), id, new ElectionTermImpl(),
+ -1, -1, replicatedLog, peerAddresses,
+ (configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl()),
+ LOG);
}
@Override public void onReceiveRecover(Object message) {
// FIXME : Not sure how exactly the SnapshotSelectionCriteria is applied
// For now guessing that it is ANDed.
deleteSnapshots(new SnapshotSelectionCriteria(
- sequenceNumber - SNAPSHOT_ENTRY_COUNT, 43200000));
+ sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000));
// Trim akka journal
deleteMessages(sequenceNumber);
new Procedure<ReplicatedLogEntry>() {
public void apply(ReplicatedLogEntry evt) throws Exception {
// FIXME : Tentatively create a snapshot every hundred thousand entries. To be tuned.
- if (journal.size() > SNAPSHOT_ENTRY_COUNT) {
+ if (journal.size() > context.getConfigParams().getSnapshotBatchCount()) {
LOG.info("Initiating Snapshot Capture..");
long lastAppliedIndex = -1;
long lastAppliedTerm = -1;
* @param peerAddress
*/
void setPeerAddress(String peerId, String peerAddress);
+
+ /**
+ * @return ConfigParams
+ */
+ public ConfigParams getConfigParams();
}
import static com.google.common.base.Preconditions.checkState;
-public class RaftActorContextImpl implements RaftActorContext{
+public class RaftActorContextImpl implements RaftActorContext {
private final ActorRef actor;
private final LoggingAdapter LOG;
+ private final ConfigParams configParams;
+
public RaftActorContextImpl(ActorRef actor, UntypedActorContext context,
String id,
ElectionTerm termInformation, long commitIndex,
- long lastApplied, ReplicatedLog replicatedLog, Map<String, String> peerAddresses, LoggingAdapter logger) {
+ long lastApplied, ReplicatedLog replicatedLog,
+ Map<String, String> peerAddresses, ConfigParams configParams,
+ LoggingAdapter logger) {
this.actor = actor;
this.context = context;
this.id = id;
this.lastApplied = lastApplied;
this.replicatedLog = replicatedLog;
this.peerAddresses = peerAddresses;
+ this.configParams = configParams;
this.LOG = logger;
}
return peerAddresses.get(peerId);
}
+ @Override public ConfigParams getConfigParams() {
+ return configParams;
+ }
+
@Override public void addToPeers(String name, String address) {
peerAddresses.put(name, address);
}
*/
protected final RaftActorContext context;
- /**
- * The maximum election time variance
- */
- private static final int ELECTION_TIME_MAX_VARIANCE = 100;
-
- /**
- * The interval at which a heart beat message will be sent to the remote
- * RaftActor
- * <p/>
- * Since this is set to 100 milliseconds the Election timeout should be
- * at least 200 milliseconds
- */
- protected static final FiniteDuration HEART_BEAT_INTERVAL =
- new FiniteDuration(100, TimeUnit.MILLISECONDS);
-
- /**
- * The interval in which a new election would get triggered if no leader is found
- */
- private static final long ELECTION_TIME_INTERVAL =
- HEART_BEAT_INTERVAL.toMillis() * 2;
-
/**
*
*/
* @return
*/
protected FiniteDuration electionDuration() {
- long variance = new Random().nextInt(ELECTION_TIME_MAX_VARIANCE);
- return new FiniteDuration(ELECTION_TIME_INTERVAL + variance,
- TimeUnit.MILLISECONDS);
+ long variance = new Random().nextInt(context.getConfigParams().getElectionTimeVariance());
+ return context.getConfigParams().getElectionTimeOutInterval().$plus(
+ new FiniteDuration(variance, TimeUnit.MILLISECONDS));
}
/**
scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
scheduleInstallSnapshotCheck(
- new FiniteDuration(HEART_BEAT_INTERVAL.length() * 1000,
- HEART_BEAT_INTERVAL.unit())
+ new FiniteDuration(context.getConfigParams().getHeartBeatInterval().length() * 1000,
+ context.getConfigParams().getHeartBeatInterval().unit())
);
}
(InstallSnapshotReply) message);
}
} finally {
- scheduleHeartBeat(HEART_BEAT_INTERVAL);
+ scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
}
return super.handleMessage(sender, message);
this.peerAddresses = peerAddresses;
}
-
+ @Override
+ public ConfigParams getConfigParams() {
+ return new DefaultConfigParamsImpl();
+ }
public static class SimpleReplicatedLog implements ReplicatedLog {
private final List<ReplicatedLogEntry> log = new ArrayList<>();