From: Moiz Raja Date: Sat, 2 Aug 2014 04:17:41 +0000 (+0000) Subject: Merge "Ignore generated journal directory" X-Git-Tag: release/helium~365 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=fcb707ea1dadb4775c80327adef7edac67ff9db0;hp=d2305d79b180408faa265ff56e1070820971ba59 Merge "Ignore generated journal directory" --- diff --git a/features/config-netty/src/main/resources/features.xml b/features/config-netty/src/main/resources/features.xml index 3121ca01a2..f1b2d1f753 100644 --- a/features/config-netty/src/main/resources/features.xml +++ b/features/config-netty/src/main/resources/features.xml @@ -12,5 +12,6 @@ mvn:org.opendaylight.controller/threadpool-config-api/${project.version} mvn:org.opendaylight.controller/threadpool-config-impl/${project.version} odl-config-startup + mvn:org.opendaylight.controller/config-netty-config/${config.version}/xml/config \ No newline at end of file diff --git a/opendaylight/commons/opendaylight/pom.xml b/opendaylight/commons/opendaylight/pom.xml index be88e4a505..cddff45191 100644 --- a/opendaylight/commons/opendaylight/pom.xml +++ b/opendaylight/commons/opendaylight/pom.xml @@ -1240,6 +1240,26 @@ sal-rest-connector-config ${mdsal.version} + + org.opendaylight.controller + config-netty-config + ${config.version} + + + org.opendaylight.controller + md-sal-config + ${mdsal.version} + + + org.opendaylight.controller + netconf-config + ${netconf.version} + + + org.opendaylight.controller + netconf-connector-config + ${netconf.version} + org.opendaylight.controller sal-rest-docgen @@ -1496,6 +1516,11 @@ sample-toaster-provider ${mdsal.version} + + org.opendaylight.controller.samples + toaster-config + ${mdsal.version} + org.opendaylight.controller.thirdparty com.sun.jersey.jersey-servlet diff --git a/opendaylight/config/config-netty-config/pom.xml b/opendaylight/config/config-netty-config/pom.xml new file mode 100644 index 0000000000..8dc31dcc4e --- /dev/null +++ b/opendaylight/config/config-netty-config/pom.xml @@ -0,0 +1,46 @@ + + + + + 4.0.0 + + org.opendaylight.controller + config-subsystem + 0.2.5-SNAPSHOT + + config-netty-config + Configuration files for sal-rest-connector + jar + + + + org.codehaus.mojo + build-helper-maven-plugin + + + attach-artifacts + + attach-artifact + + package + + + + ${project.build.directory}/classes/initial/00-netty.xml + xml + config + + + + + + + + + diff --git a/opendaylight/distribution/opendaylight/src/main/resources/configuration/initial/00-netty.xml b/opendaylight/config/config-netty-config/src/main/resources/initial/00-netty.xml similarity index 100% rename from opendaylight/distribution/opendaylight/src/main/resources/configuration/initial/00-netty.xml rename to opendaylight/config/config-netty-config/src/main/resources/initial/00-netty.xml diff --git a/opendaylight/config/pom.xml b/opendaylight/config/pom.xml index 66bb01f051..26fac47e71 100644 --- a/opendaylight/config/pom.xml +++ b/opendaylight/config/pom.xml @@ -39,6 +39,7 @@ shutdown-impl netconf-config-dispatcher config-module-archetype + config-netty-config diff --git a/opendaylight/distribution/opendaylight/pom.xml b/opendaylight/distribution/opendaylight/pom.xml index 541c1300f3..4d0770f8cb 100644 --- a/opendaylight/distribution/opendaylight/pom.xml +++ b/opendaylight/distribution/opendaylight/pom.xml @@ -745,7 +745,7 @@ generate-resources ${project.build.directory}/configuration - sal-rest-connector-config + sal-rest-connector-config,config-netty-config,md-sal-config,netconf-config,toaster-config,netconf-connector-config **\/*.xml true false @@ -1034,6 +1034,26 @@ org.opendaylight.controller sal-rest-connector-config + + org.opendaylight.controller + config-netty-config + + + org.opendaylight.controller + md-sal-config + + + org.opendaylight.controller + netconf-config + + + org.opendaylight.controller + netconf-connector-config + + + org.opendaylight.controller.samples + toaster-config + org.opendaylight.controller sal-rest-docgen @@ -1318,7 +1338,7 @@ generate-resources ${project.build.directory}/configuration - sal-rest-connector-config + sal-rest-connector-config,config-netty-config,md-sal-config,netconf-config,toaster-config,netconf-connector-config **\/*.xml true false diff --git a/opendaylight/md-sal/md-sal-config/pom.xml b/opendaylight/md-sal/md-sal-config/pom.xml new file mode 100644 index 0000000000..2e19b5a60c --- /dev/null +++ b/opendaylight/md-sal/md-sal-config/pom.xml @@ -0,0 +1,46 @@ + + + + + 4.0.0 + + org.opendaylight.controller + sal-parent + 1.1-SNAPSHOT + + md-sal-config + Configuration files for md-sal + jar + + + + org.codehaus.mojo + build-helper-maven-plugin + + + attach-artifacts + + attach-artifact + + package + + + + ${project.build.directory}/classes/initial/01-md-sal.xml + xml + config + + + + + + + + + diff --git a/opendaylight/distribution/opendaylight/src/main/resources/configuration/initial/01-md-sal.xml b/opendaylight/md-sal/md-sal-config/src/main/resources/initial/01-md-sal.xml similarity index 100% rename from opendaylight/distribution/opendaylight/src/main/resources/configuration/initial/01-md-sal.xml rename to opendaylight/md-sal/md-sal-config/src/main/resources/initial/01-md-sal.xml diff --git a/opendaylight/md-sal/pom.xml b/opendaylight/md-sal/pom.xml index 37da4d2b88..02fbde8f18 100644 --- a/opendaylight/md-sal/pom.xml +++ b/opendaylight/md-sal/pom.xml @@ -32,6 +32,9 @@ sal-binding-util + + md-sal-config + samples diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java index 641ec0582c..aa100df9d0 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java @@ -11,10 +11,12 @@ package org.opendaylight.controller.cluster.example; import akka.actor.ActorRef; import akka.actor.Props; import akka.japi.Creator; +import com.google.common.base.Optional; import org.opendaylight.controller.cluster.example.messages.KeyValue; import org.opendaylight.controller.cluster.example.messages.KeyValueSaved; import org.opendaylight.controller.cluster.example.messages.PrintRole; import org.opendaylight.controller.cluster.example.messages.PrintState; +import org.opendaylight.controller.cluster.raft.ConfigParams; import org.opendaylight.controller.cluster.raft.RaftActor; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; @@ -31,15 +33,17 @@ public class ExampleActor extends RaftActor { private long persistIdentifier = 1; - public ExampleActor(String id, Map peerAddresses) { - super(id, peerAddresses); + public ExampleActor(String id, Map peerAddresses, + Optional configParams) { + super(id, peerAddresses, configParams); } - public static Props props(final String id, final Map peerAddresses){ + public static Props props(final String id, final Map peerAddresses, + final Optional configParams){ return Props.create(new Creator(){ @Override public ExampleActor create() throws Exception { - return new ExampleActor(id, peerAddresses); + return new ExampleActor(id, peerAddresses, configParams); } }); } @@ -56,10 +60,12 @@ public class ExampleActor extends RaftActor { } } else if (message instanceof PrintState) { - LOG.debug("State of the node:"+getId() + " has = "+state.size() + " entries"); + LOG.debug("State of the node:{} has entries={}, {}", + getId(), state.size(), getReplicatedLogState()); } else if (message instanceof PrintRole) { - LOG.debug(getId() + " = " + getRaftState()); + LOG.debug("{} = {}, Peers={}", getId(), getRaftState(),getPeers()); + } else { super.onReceiveCommand(message); } @@ -83,6 +89,7 @@ public class ExampleActor extends RaftActor { @Override protected void applySnapshot(Object snapshot) { state.clear(); state.putAll((HashMap) snapshot); + LOG.debug("Snapshot applied to state :" + ((HashMap) snapshot).size()); } @Override public void onReceiveRecover(Object message) { diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleConfigParamsImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleConfigParamsImpl.java new file mode 100644 index 0000000000..d11377dbcb --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleConfigParamsImpl.java @@ -0,0 +1,20 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.example; + +import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; + +/** + * Implementation of ConfigParams for Example + */ +public class ExampleConfigParamsImpl extends DefaultConfigParamsImpl { + @Override + public long getSnapshotBatchCount() { + return 50; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/Main.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/Main.java index a148ed4009..0e5d643a64 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/Main.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/Main.java @@ -11,7 +11,9 @@ package org.opendaylight.controller.cluster.example; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.PoisonPill; +import com.google.common.base.Optional; import org.opendaylight.controller.cluster.example.messages.KeyValue; +import org.opendaylight.controller.cluster.raft.ConfigParams; import java.io.BufferedReader; import java.io.InputStreamReader; @@ -34,15 +36,15 @@ public class Main { public static void main(String[] args) throws Exception{ ActorRef example1Actor = actorSystem.actorOf(ExampleActor.props("example-1", - withoutPeer("example-1")), "example-1"); + withoutPeer("example-1"), Optional.absent()), "example-1"); ActorRef example2Actor = actorSystem.actorOf(ExampleActor.props("example-2", - withoutPeer("example-2")), "example-2"); + withoutPeer("example-2"), Optional.absent()), "example-2"); ActorRef example3Actor = actorSystem.actorOf(ExampleActor.props("example-3", - withoutPeer("example-3")), "example-3"); + withoutPeer("example-3"), Optional.absent()), "example-3"); List examples = Arrays.asList(example1Actor, example2Actor, example3Actor); @@ -74,7 +76,8 @@ public class Main { String actorName = "example-" + i; examples.add(i - 1, actorSystem.actorOf(ExampleActor.props(actorName, - withoutPeer(actorName)), actorName)); + withoutPeer(actorName), Optional.absent()), + actorName)); System.out.println("Created actor : " + actorName); continue; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/TestDriver.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/TestDriver.java index c2d0b3a6b7..fd6e192bf0 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/TestDriver.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/TestDriver.java @@ -2,8 +2,10 @@ package org.opendaylight.controller.cluster.example; import akka.actor.ActorRef; import akka.actor.ActorSystem; +import com.google.common.base.Optional; import org.opendaylight.controller.cluster.example.messages.PrintRole; import org.opendaylight.controller.cluster.example.messages.PrintState; +import org.opendaylight.controller.cluster.raft.ConfigParams; import org.opendaylight.controller.cluster.raft.client.messages.AddRaftPeer; import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer; @@ -11,14 +13,13 @@ import java.io.BufferedReader; import java.io.InputStreamReader; import java.util.HashMap; import java.util.Map; -import java.util.Random; import java.util.concurrent.ConcurrentHashMap; /** * This is a test driver for testing akka-raft implementation * Its uses ExampleActors and threads to push content(key-vals) to these actors * Each ExampleActor can have one or more ClientActors. Each ClientActor spawns - * a thread and starts push logs to the actor its assignged to. + * a thread and starts push logs to the actor its assigned to. */ public class TestDriver { @@ -26,7 +27,9 @@ public class TestDriver { private static Map allPeers = new HashMap<>(); private static Map clientActorRefs = new HashMap(); private static Map actorRefs = new HashMap(); - private static LogGenerator logGenerator = new LogGenerator();; + private static LogGenerator logGenerator = new LogGenerator(); + private int nameCounter = 0; + private static ConfigParams configParams = new ExampleConfigParamsImpl(); /** * Create nodes, add clients and start logging. @@ -35,6 +38,7 @@ public class TestDriver { * createNodes:{num} * addNodes:{num} * stopNode:{nodeName} + * reinstateNode:{nodeName} * addClients:{num} * addClientsToNode:{nodeName, num} * startLogging @@ -83,6 +87,10 @@ public class TestDriver { String[] arr = command.split(":"); td.stopNode(arr[1]); + } else if (command.startsWith("reinstateNode")) { + String[] arr = command.split(":"); + td.reinstateNode(arr[1]); + } else if (command.startsWith("startLogging")) { td.startAllLogging(); @@ -106,15 +114,19 @@ public class TestDriver { } } + public static ActorRef createExampleActor(String name) { + return actorSystem.actorOf(ExampleActor.props(name, withoutPeer(name), + Optional.of(configParams)), name); + } + public void createNodes(int num) { for (int i=0; i < num; i++) { - int rand = getUnusedRandom(num); - allPeers.put("example-"+rand, "akka://default/user/example-"+rand); + nameCounter = nameCounter + 1; + allPeers.put("example-"+nameCounter, "akka://default/user/example-"+nameCounter); } for (String s : allPeers.keySet()) { - ActorRef exampleActor = actorSystem.actorOf( - ExampleActor.props(s, withoutPeer(s)), s); + ActorRef exampleActor = createExampleActor(s); actorRefs.put(s, exampleActor); System.out.println("Created node:"+s); @@ -125,15 +137,14 @@ public class TestDriver { public void addNodes(int num) { Map newPeers = new HashMap<>(); for (int i=0; i < num; i++) { - int rand = getUnusedRandom(num); - newPeers.put("example-"+rand, "akka://default/user/example-"+rand); - allPeers.put("example-"+rand, "akka://default/user/example-"+rand); + nameCounter = nameCounter + 1; + newPeers.put("example-"+nameCounter, "akka://default/user/example-"+nameCounter); + allPeers.put("example-"+nameCounter, "akka://default/user/example-"+nameCounter); } Map newActorRefs = new HashMap(num); for (Map.Entry entry : newPeers.entrySet()) { - ActorRef exampleActor = actorSystem.actorOf( - ExampleActor.props(entry.getKey(), withoutPeer(entry.getKey())), entry.getKey()); + ActorRef exampleActor = createExampleActor(entry.getKey()); newActorRefs.put(entry.getKey(), exampleActor); //now also add these new nodes as peers from the previous nodes @@ -165,7 +176,7 @@ public class TestDriver { public void addClientsToNode(String actorName, int num) { ActorRef actorRef = actorRefs.get(actorName); for (int i=0; i < num; i++) { - String clientName = "client-" + i + "-" + actorRef; + String clientName = "client-" + i + "-" + actorName; clientActorRefs.put(clientName, actorSystem.actorOf(ClientActor.props(actorRef), clientName)); System.out.println("Added client-node:" + clientName); @@ -174,11 +185,13 @@ public class TestDriver { public void stopNode(String actorName) { ActorRef actorRef = actorRefs.get(actorName); - String clientName = "client-"+actorName; - if(clientActorRefs.containsKey(clientName)) { - actorSystem.stop(clientActorRefs.get(clientName)); - clientActorRefs.remove(clientName); + + for (Map.Entry entry : clientActorRefs.entrySet()) { + if (entry.getKey().endsWith(actorName)) { + actorSystem.stop(entry.getValue()); + } } + actorSystem.stop(actorRef); actorRefs.remove(actorName); @@ -187,7 +200,21 @@ public class TestDriver { } allPeers.remove(actorName); + } + public void reinstateNode(String actorName) { + String address = "akka://default/user/"+actorName; + allPeers.put(actorName, address); + + ActorRef exampleActor = createExampleActor(actorName); + + for (ActorRef actor : actorRefs.values()) { + actor.tell(new AddRaftPeer(actorName, address), null); + } + + actorRefs.put(actorName, exampleActor); + + addClientsToNode(actorName, 1); } public void startAllLogging() { @@ -232,14 +259,6 @@ public class TestDriver { return null; } - private int getUnusedRandom(int num) { - int rand = -1; - do { - rand = (new Random()).nextInt(num * num); - } while (allPeers.keySet().contains("example-"+rand)); - - return rand; - } private static Map withoutPeer(String peerId) { Map without = new ConcurrentHashMap<>(allPeers); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java new file mode 100644 index 0000000000..24bfa3de21 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java @@ -0,0 +1,153 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft; + +import java.util.ArrayList; +import java.util.List; + +/** + * Abstract class handling the mapping of + * logical LogEntry Index and the physical list index. + */ +public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { + + protected final List journal; + protected final Object snapshot; + protected long snapshotIndex = -1; + protected long snapshotTerm = -1; + + public AbstractReplicatedLogImpl(Object state, long snapshotIndex, + long snapshotTerm, List unAppliedEntries) { + this.snapshot = state; + this.snapshotIndex = snapshotIndex; + this.snapshotTerm = snapshotTerm; + this.journal = new ArrayList<>(unAppliedEntries); + } + + + public AbstractReplicatedLogImpl() { + this.snapshot = null; + this.journal = new ArrayList<>(); + } + + protected int adjustedIndex(long logEntryIndex) { + if(snapshotIndex < 0){ + return (int) logEntryIndex; + } + return (int) (logEntryIndex - (snapshotIndex + 1)); + } + + @Override + public ReplicatedLogEntry get(long logEntryIndex) { + int adjustedIndex = adjustedIndex(logEntryIndex); + + if (adjustedIndex < 0 || adjustedIndex >= journal.size()) { + // physical index should be less than list size and >= 0 + return null; + } + + return journal.get(adjustedIndex); + } + + @Override + public ReplicatedLogEntry last() { + if (journal.isEmpty()) { + return null; + } + // get the last entry directly from the physical index + return journal.get(journal.size() - 1); + } + + @Override + public long lastIndex() { + if (journal.isEmpty()) { + // it can happen that after snapshot, all the entries of the + // journal are trimmed till lastApplied, so lastIndex = snapshotIndex + return snapshotIndex; + } + return last().getIndex(); + } + + @Override + public long lastTerm() { + if (journal.isEmpty()) { + // it can happen that after snapshot, all the entries of the + // journal are trimmed till lastApplied, so lastTerm = snapshotTerm + return snapshotTerm; + } + return last().getTerm(); + } + + @Override + public void removeFrom(long logEntryIndex) { + int adjustedIndex = adjustedIndex(logEntryIndex); + if (adjustedIndex < 0 || adjustedIndex >= journal.size()) { + // physical index should be less than list size and >= 0 + return; + } + journal.subList(adjustedIndex , journal.size()).clear(); + } + + @Override + public void append(ReplicatedLogEntry replicatedLogEntry) { + journal.add(replicatedLogEntry); + } + + @Override + public List getFrom(long logEntryIndex) { + int adjustedIndex = adjustedIndex(logEntryIndex); + int size = journal.size(); + List entries = new ArrayList<>(100); + if (adjustedIndex >= 0 && adjustedIndex < size) { + // physical index should be less than list size and >= 0 + entries.addAll(journal.subList(adjustedIndex, size)); + } + return entries; + } + + @Override + public long size() { + return journal.size(); + } + + @Override + public boolean isPresent(long logEntryIndex) { + if (logEntryIndex > lastIndex()) { + // if the request logical index is less than the last present in the list + return false; + } + int adjustedIndex = adjustedIndex(logEntryIndex); + return (adjustedIndex >= 0); + } + + @Override + public boolean isInSnapshot(long logEntryIndex) { + return logEntryIndex <= snapshotIndex; + } + + @Override + public Object getSnapshot() { + return snapshot; + } + + @Override + public long getSnapshotIndex() { + return snapshotIndex; + } + + @Override + public long getSnapshotTerm() { + return snapshotTerm; + } + + @Override + public abstract void appendAndPersist(ReplicatedLogEntry replicatedLogEntry); + + @Override + public abstract void removeFromAndPersist(long index); +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java new file mode 100644 index 0000000000..4c6434aec4 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft; + +import scala.concurrent.duration.FiniteDuration; + +/** + * Configuration Parameter interface for configuring the Raft consensus system + *

+ * Any component using this implementation might want to provide an implementation of + * this interface to configure + * + * A default implementation will be used if none is provided. + * + * @author Kamal Rameshan + */ +public interface ConfigParams { + /** + * The minimum number of entries to be present in the in-memory Raft log + * for a snapshot to be taken + * + * @return long + */ + public long getSnapshotBatchCount(); + + /** + * The interval at which a heart beat message will be sent to the remote + * RaftActor + * + * @return FiniteDuration + */ + public FiniteDuration getHeartBeatInterval(); + + /** + * The interval in which a new election would get triggered if no leader is found + * + * Normally its set to atleast twice the heart beat interval + * + * @return FiniteDuration + */ + public FiniteDuration getElectionTimeOutInterval(); + + /** + * The maximum election time variance. The election is scheduled using both + * the Election Timeout and Variance + * + * @return int + */ + public int getElectionTimeVariance(); +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java new file mode 100644 index 0000000000..c633337226 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft; + +import scala.concurrent.duration.FiniteDuration; + +import java.util.concurrent.TimeUnit; + +/** + * Default implementation of the ConfigParams + * + * If no implementation is provided for ConfigParams, then this will be used. + */ +public class DefaultConfigParamsImpl implements ConfigParams { + + private static final int SNAPSHOT_BATCH_COUNT = 100000; + + /** + * The maximum election time variance + */ + private static final int ELECTION_TIME_MAX_VARIANCE = 100; + + + /** + * The interval at which a heart beat message will be sent to the remote + * RaftActor + *

+ * Since this is set to 100 milliseconds the Election timeout should be + * at least 200 milliseconds + */ + protected static final FiniteDuration HEART_BEAT_INTERVAL = + new FiniteDuration(100, TimeUnit.MILLISECONDS); + + + @Override + public long getSnapshotBatchCount() { + return SNAPSHOT_BATCH_COUNT; + } + + @Override + public FiniteDuration getHeartBeatInterval() { + return HEART_BEAT_INTERVAL; + } + + + @Override + public FiniteDuration getElectionTimeOutInterval() { + // returns 2 times the heart beat interval + return HEART_BEAT_INTERVAL.$times(2); + } + + @Override + public int getElectionTimeVariance() { + return ELECTION_TIME_MAX_VARIANCE; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index b8e9653bc5..70b85b4627 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -22,6 +22,7 @@ import akka.persistence.UntypedPersistentActor; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; +import com.google.common.base.Optional; import org.opendaylight.controller.cluster.raft.behaviors.Candidate; import org.opendaylight.controller.cluster.raft.behaviors.Follower; import org.opendaylight.controller.cluster.raft.behaviors.Leader; @@ -33,7 +34,6 @@ import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import java.io.Serializable; -import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -100,14 +100,22 @@ public abstract class RaftActor extends UntypedPersistentActor { public RaftActor(String id, Map peerAddresses) { + this(id, peerAddresses, Optional.absent()); + } + + public RaftActor(String id, Map peerAddresses, + Optional configParams) { + context = new RaftActorContextImpl(this.getSelf(), - this.getContext(), - id, new ElectionTermImpl(), - -1, -1, replicatedLog, peerAddresses, LOG); + this.getContext(), id, new ElectionTermImpl(), + -1, -1, replicatedLog, peerAddresses, + (configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl()), + LOG); } @Override public void onReceiveRecover(Object message) { if (message instanceof SnapshotOffer) { + LOG.debug("SnapshotOffer called.."); SnapshotOffer offer = (SnapshotOffer) message; Snapshot snapshot = (Snapshot) offer.snapshot(); @@ -116,6 +124,13 @@ public abstract class RaftActor extends UntypedPersistentActor { // when we need to install it on a peer replicatedLog = new ReplicatedLogImpl(snapshot); + context.setReplicatedLog(replicatedLog); + + LOG.debug("Applied snapshot to replicatedLog. " + + "snapshotIndex={}, snapshotTerm={}, journal-size={}", + replicatedLog.snapshotIndex, replicatedLog.snapshotTerm, + replicatedLog.size()); + // Apply the snapshot to the actors state applySnapshot(snapshot.getState()); @@ -127,7 +142,11 @@ public abstract class RaftActor extends UntypedPersistentActor { context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), ((UpdateElectionTerm) message).getVotedFor()); } else if (message instanceof RecoveryCompleted) { LOG.debug( - "Last index in log : " + replicatedLog.lastIndex()); + "RecoveryCompleted - Switching actor to Follower - " + + "Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " + + "journal-size={}", + replicatedLog.lastIndex(), replicatedLog.snapshotIndex, + replicatedLog.snapshotTerm, replicatedLog.size()); currentBehavior = switchBehavior(RaftState.Follower); } } @@ -191,6 +210,15 @@ public abstract class RaftActor extends UntypedPersistentActor { } } + public java.util.Set getPeers() { + return context.getPeerAddresses().keySet(); + } + + protected String getReplicatedLogState() { + return "snapshotIndex=" + context.getReplicatedLog().getSnapshotIndex() + + ", snapshotTerm=" + context.getReplicatedLog().getSnapshotTerm() + + ", im-mem journal size=" + context.getReplicatedLog().size(); + } /** @@ -343,85 +371,33 @@ public abstract class RaftActor extends UntypedPersistentActor { } private void trimPersistentData(long sequenceNumber) { - // Trim snapshots + // Trim akka snapshots // FIXME : Not sure how exactly the SnapshotSelectionCriteria is applied // For now guessing that it is ANDed. deleteSnapshots(new SnapshotSelectionCriteria( - sequenceNumber - 100000, 43200000)); + sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000)); - // Trim journal + // Trim akka journal deleteMessages(sequenceNumber); } - private class ReplicatedLogImpl implements ReplicatedLog { - private final List journal; - private final Object snapshot; - private long snapshotIndex = -1; - private long snapshotTerm = -1; + private class ReplicatedLogImpl extends AbstractReplicatedLogImpl { public ReplicatedLogImpl(Snapshot snapshot) { - this.snapshot = snapshot.getState(); - this.snapshotIndex = snapshot.getLastAppliedIndex(); - this.snapshotTerm = snapshot.getLastAppliedTerm(); - - this.journal = new ArrayList<>(snapshot.getUnAppliedEntries()); + super(snapshot.getState(), + snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(), + snapshot.getUnAppliedEntries()); } public ReplicatedLogImpl() { - this.snapshot = null; - this.journal = new ArrayList<>(); + super(); } - @Override public ReplicatedLogEntry get(long index) { - int adjustedIndex = adjustedIndex(index); + @Override public void removeFromAndPersist(long logEntryIndex) { + int adjustedIndex = adjustedIndex(logEntryIndex); - if (adjustedIndex < 0 || adjustedIndex >= journal.size()) { - return null; - } - - return journal.get(adjustedIndex); - } - - @Override public ReplicatedLogEntry last() { - if (journal.size() == 0) { - return null; - } - return get(journal.size() - 1); - } - - @Override public long lastIndex() { - if (journal.size() == 0) { - return -1; - } - - return last().getIndex(); - } - - @Override public long lastTerm() { - if (journal.size() == 0) { - return -1; - } - - return last().getTerm(); - } - - - @Override public void removeFrom(long index) { - int adjustedIndex = adjustedIndex(index); - - if (adjustedIndex < 0 || adjustedIndex >= journal.size()) { - return; - } - - journal.subList(adjustedIndex , journal.size()).clear(); - } - - - @Override public void removeFromAndPersist(long index) { - int adjustedIndex = adjustedIndex(index); - - if (adjustedIndex < 0 || adjustedIndex >= journal.size()) { + if (adjustedIndex < 0) { return; } @@ -435,29 +411,6 @@ public abstract class RaftActor extends UntypedPersistentActor { //FIXME : Doing nothing for now } }); - - - } - - @Override public void append( - final ReplicatedLogEntry replicatedLogEntry) { - journal.add(replicatedLogEntry); - } - - @Override public List getFrom(long index) { - int adjustedIndex = adjustedIndex(index); - - List entries = new ArrayList<>(100); - if (adjustedIndex < 0 || adjustedIndex >= journal.size()) { - return entries; - } - - - for (int i = adjustedIndex; - i < journal.size(); i++) { - entries.add(journal.get(i)); - } - return entries; } @Override public void appendAndPersist( @@ -482,20 +435,42 @@ public abstract class RaftActor extends UntypedPersistentActor { new Procedure() { public void apply(ReplicatedLogEntry evt) throws Exception { // FIXME : Tentatively create a snapshot every hundred thousand entries. To be tuned. - if (size() > 100000) { - ReplicatedLogEntry lastAppliedEntry = - get(context.getLastApplied()); + if (journal.size() > context.getConfigParams().getSnapshotBatchCount()) { + LOG.info("Initiating Snapshot Capture.."); long lastAppliedIndex = -1; long lastAppliedTerm = -1; + + ReplicatedLogEntry lastAppliedEntry = get(context.getLastApplied()); if (lastAppliedEntry != null) { lastAppliedIndex = lastAppliedEntry.getIndex(); lastAppliedTerm = lastAppliedEntry.getTerm(); } - saveSnapshot(Snapshot.create(createSnapshot(), + LOG.debug("Snapshot Capture logSize: {}", journal.size()); + LOG.debug("Snapshot Capture lastApplied:{} ", context.getLastApplied()); + LOG.debug("Snapshot Capture lastAppliedIndex:{}", lastAppliedIndex); + LOG.debug("Snapshot Capture lastAppliedTerm:{}", lastAppliedTerm); + + // create a snapshot object from the state provided and save it + // when snapshot is saved async, SaveSnapshotSuccess is raised. + Snapshot sn = Snapshot.create(createSnapshot(), getFrom(context.getLastApplied() + 1), lastIndex(), lastTerm(), lastAppliedIndex, - lastAppliedTerm)); + lastAppliedTerm); + saveSnapshot(sn); + + LOG.info("Persisting of snapshot done:{}", sn.getLogMessage()); + + //be greedy and remove entries from in-mem journal which are in the snapshot + // and update snapshotIndex and snapshotTerm without waiting for the success, + // TODO: damage-recovery to be done on failure + journal.subList(0, (int) (lastAppliedIndex - snapshotIndex)).clear(); + snapshotIndex = lastAppliedIndex; + snapshotTerm = lastAppliedTerm; + + LOG.info("Removed in-memory snapshotted entries, " + + "adjusted snaphsotIndex:{}" + + "and term:{}", snapshotIndex, lastAppliedTerm); } // Send message for replication if (clientActor != null) { @@ -509,46 +484,8 @@ public abstract class RaftActor extends UntypedPersistentActor { ); } - @Override public long size() { - return journal.size() + snapshotIndex + 1; - } - - @Override public boolean isPresent(long index) { - int adjustedIndex = adjustedIndex(index); - - if (adjustedIndex < 0 || adjustedIndex >= journal.size()) { - return false; - } - return true; - } - - @Override public boolean isInSnapshot(long index) { - return index <= snapshotIndex; - } - - @Override public Object getSnapshot() { - return snapshot; - } - - @Override public long getSnapshotIndex() { - return snapshotIndex; - } - - @Override public long getSnapshotTerm() { - return snapshotTerm; - } - - private int adjustedIndex(long index) { - if(snapshotIndex < 0){ - return (int) index; - } - return (int) (index - snapshotIndex); - } } - - - private static class DeleteEntries implements Serializable { private final int fromIndex; @@ -609,6 +546,17 @@ public abstract class RaftActor extends UntypedPersistentActor { public long getLastAppliedTerm() { return lastAppliedTerm; } + + public String getLogMessage() { + StringBuilder sb = new StringBuilder(); + return sb.append("Snapshot={") + .append("lastTerm:" + this.getLastTerm() + ", ") + .append("LastAppliedIndex:" + this.getLastAppliedIndex() + ", ") + .append("LastAppliedTerm:" + this.getLastAppliedTerm() + ", ") + .append("UnAppliedEntries size:" + this.getUnAppliedEntries().size() + "}") + .toString(); + + } } private class ElectionTermImpl implements ElectionTerm { diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java index ae9431a43a..0eb4b73779 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java @@ -85,6 +85,12 @@ public interface RaftActorContext { */ void setLastApplied(long lastApplied); + /** + * + * @param replicatedLog + */ + public void setReplicatedLog(ReplicatedLog replicatedLog); + /** * @return A representation of the log */ @@ -155,4 +161,9 @@ public interface RaftActorContext { * @param peerAddress */ void setPeerAddress(String peerId, String peerAddress); + + /** + * @return ConfigParams + */ + public ConfigParams getConfigParams(); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java index 833c8a9e8a..25da37105c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java @@ -19,7 +19,7 @@ import java.util.Map; import static com.google.common.base.Preconditions.checkState; -public class RaftActorContextImpl implements RaftActorContext{ +public class RaftActorContextImpl implements RaftActorContext { private final ActorRef actor; @@ -33,16 +33,20 @@ public class RaftActorContextImpl implements RaftActorContext{ private long lastApplied; - private final ReplicatedLog replicatedLog; + private ReplicatedLog replicatedLog; private final Map peerAddresses; private final LoggingAdapter LOG; + private final ConfigParams configParams; + public RaftActorContextImpl(ActorRef actor, UntypedActorContext context, String id, ElectionTerm termInformation, long commitIndex, - long lastApplied, ReplicatedLog replicatedLog, Map peerAddresses, LoggingAdapter logger) { + long lastApplied, ReplicatedLog replicatedLog, + Map peerAddresses, ConfigParams configParams, + LoggingAdapter logger) { this.actor = actor; this.context = context; this.id = id; @@ -51,6 +55,7 @@ public class RaftActorContextImpl implements RaftActorContext{ this.lastApplied = lastApplied; this.replicatedLog = replicatedLog; this.peerAddresses = peerAddresses; + this.configParams = configParams; this.LOG = logger; } @@ -90,6 +95,10 @@ public class RaftActorContextImpl implements RaftActorContext{ this.lastApplied = lastApplied; } + @Override public void setReplicatedLog(ReplicatedLog replicatedLog) { + this.replicatedLog = replicatedLog; + } + @Override public ReplicatedLog getReplicatedLog() { return replicatedLog; } @@ -110,6 +119,10 @@ public class RaftActorContextImpl implements RaftActorContext{ return peerAddresses.get(peerId); } + @Override public ConfigParams getConfigParams() { + return configParams; + } + @Override public void addToPeers(String name, String address) { peerAddresses.put(name, address); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index f7281bb8e3..0a553b40fd 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -43,27 +43,6 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { */ protected final RaftActorContext context; - /** - * The maximum election time variance - */ - private static final int ELECTION_TIME_MAX_VARIANCE = 100; - - /** - * The interval at which a heart beat message will be sent to the remote - * RaftActor - *

- * Since this is set to 100 milliseconds the Election timeout should be - * at least 200 milliseconds - */ - protected static final FiniteDuration HEART_BEAT_INTERVAL = - new FiniteDuration(100, TimeUnit.MILLISECONDS); - - /** - * The interval in which a new election would get triggered if no leader is found - */ - private static final long ELECTION_TIME_INTERVAL = - HEART_BEAT_INTERVAL.toMillis() * 2; - /** * */ @@ -208,9 +187,9 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { * @return */ protected FiniteDuration electionDuration() { - long variance = new Random().nextInt(ELECTION_TIME_MAX_VARIANCE); - return new FiniteDuration(ELECTION_TIME_INTERVAL + variance, - TimeUnit.MILLISECONDS); + long variance = new Random().nextInt(context.getConfigParams().getElectionTimeVariance()); + return context.getConfigParams().getElectionTimeOutInterval().$plus( + new FiniteDuration(variance, TimeUnit.MILLISECONDS)); } /** diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index db62dfc2ac..c8cd41dfa1 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -126,15 +126,10 @@ public class Follower extends AbstractRaftActorBehavior { int addEntriesFrom = 0; if (context.getReplicatedLog().size() > 0) { - // Find the entry up until which the one that is not in the - // follower's log - for (int i = 0; - i < appendEntries.getEntries() - .size(); i++, addEntriesFrom++) { - ReplicatedLogEntry matchEntry = - appendEntries.getEntries().get(i); - ReplicatedLogEntry newEntry = context.getReplicatedLog() - .get(matchEntry.getIndex()); + // Find the entry up until which the one that is not in the follower's log + for (int i = 0;i < appendEntries.getEntries().size(); i++, addEntriesFrom++) { + ReplicatedLogEntry matchEntry = appendEntries.getEntries().get(i); + ReplicatedLogEntry newEntry = context.getReplicatedLog().get(matchEntry.getIndex()); if (newEntry == null) { //newEntry not found in the log diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java index 53e47c2f84..2a44e8b7a5 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java @@ -112,8 +112,8 @@ public class Leader extends AbstractRaftActorBehavior { scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS)); scheduleInstallSnapshotCheck( - new FiniteDuration(HEART_BEAT_INTERVAL.length() * 1000, - HEART_BEAT_INTERVAL.unit()) + new FiniteDuration(context.getConfigParams().getHeartBeatInterval().length() * 1000, + context.getConfigParams().getHeartBeatInterval().unit()) ); } @@ -241,7 +241,7 @@ public class Leader extends AbstractRaftActorBehavior { (InstallSnapshotReply) message); } } finally { - scheduleHeartBeat(HEART_BEAT_INTERVAL); + scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval()); } return super.handleMessage(sender, message); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java new file mode 100644 index 0000000000..ae8e525233 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java @@ -0,0 +1,153 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload; +import static org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockReplicatedLogEntry; +/** +* +*/ +public class AbstractReplicatedLogImplTest { + + private MockAbstractReplicatedLogImpl replicatedLogImpl; + + @Before + public void setUp() { + replicatedLogImpl = new MockAbstractReplicatedLogImpl(); + } + + @After + public void tearDown() { + replicatedLogImpl.journal.clear(); + replicatedLogImpl.setSnapshotIndex(-1); + replicatedLogImpl.setSnapshotTerm(-1); + replicatedLogImpl = null; + } + + @Test + public void testIndexOperations() { + // create a set of initial entries in the in-memory log + replicatedLogImpl.append(new MockReplicatedLogEntry(1, 0, new MockPayload("A"))); + replicatedLogImpl.append(new MockReplicatedLogEntry(1, 1, new MockPayload("B"))); + replicatedLogImpl.append(new MockReplicatedLogEntry(1, 2, new MockPayload("C"))); + replicatedLogImpl.append(new MockReplicatedLogEntry(2, 3, new MockPayload("D"))); + + // check if the values returned are correct, with snapshotIndex = -1 + assertEquals("B", replicatedLogImpl.get(1).getData().toString()); + assertEquals("D", replicatedLogImpl.last().getData().toString()); + assertEquals(3, replicatedLogImpl.lastIndex()); + assertEquals(2, replicatedLogImpl.lastTerm()); + assertEquals(2, replicatedLogImpl.getFrom(2).size()); + assertEquals(4, replicatedLogImpl.size()); + assertTrue(replicatedLogImpl.isPresent(2)); + assertFalse(replicatedLogImpl.isPresent(4)); + assertFalse(replicatedLogImpl.isInSnapshot(2)); + + // now create a snapshot of 3 entries, with 1 unapplied entry left in the log + // It removes the entries which have made it to snapshot + // and updates the snapshot index and term + Map state = takeSnapshot(3); + + // check the values after the snapshot. + // each index value passed in the test is the logical index (log entry index) + // which gets mapped to the list's physical index + assertEquals("D", replicatedLogImpl.get(3).getData().toString()); + assertEquals("D", replicatedLogImpl.last().getData().toString()); + assertNull(replicatedLogImpl.get(1)); + assertEquals(3, replicatedLogImpl.lastIndex()); + assertEquals(2, replicatedLogImpl.lastTerm()); + assertEquals(0, replicatedLogImpl.getFrom(2).size()); + assertEquals(1, replicatedLogImpl.size()); + assertFalse(replicatedLogImpl.isPresent(2)); + assertTrue(replicatedLogImpl.isPresent(3)); + assertFalse(replicatedLogImpl.isPresent(4)); + assertTrue(replicatedLogImpl.isInSnapshot(2)); + + // append few more entries + replicatedLogImpl.append(new MockReplicatedLogEntry(2, 4, new MockPayload("E"))); + replicatedLogImpl.append(new MockReplicatedLogEntry(2, 5, new MockPayload("F"))); + replicatedLogImpl.append(new MockReplicatedLogEntry(3, 6, new MockPayload("G"))); + replicatedLogImpl.append(new MockReplicatedLogEntry(3, 7, new MockPayload("H"))); + + // check their values as well + assertEquals(5, replicatedLogImpl.size()); + assertEquals("D", replicatedLogImpl.get(3).getData().toString()); + assertEquals("E", replicatedLogImpl.get(4).getData().toString()); + assertEquals("H", replicatedLogImpl.last().getData().toString()); + assertEquals(3, replicatedLogImpl.lastTerm()); + assertEquals(7, replicatedLogImpl.lastIndex()); + assertTrue(replicatedLogImpl.isPresent(7)); + assertFalse(replicatedLogImpl.isInSnapshot(7)); + assertEquals(1, replicatedLogImpl.getFrom(7).size()); + assertEquals(2, replicatedLogImpl.getFrom(6).size()); + + // take a second snapshot with 5 entries with 0 unapplied entries left in the log + state = takeSnapshot(5); + + assertEquals(0, replicatedLogImpl.size()); + assertNull(replicatedLogImpl.last()); + assertNull(replicatedLogImpl.get(7)); + assertNull(replicatedLogImpl.get(1)); + assertFalse(replicatedLogImpl.isPresent(7)); + assertTrue(replicatedLogImpl.isInSnapshot(7)); + assertEquals(0, replicatedLogImpl.getFrom(7).size()); + assertEquals(0, replicatedLogImpl.getFrom(6).size()); + + } + + // create a snapshot for test + public Map takeSnapshot(int numEntries) { + Map map = new HashMap(numEntries); + List entries = replicatedLogImpl.getEntriesTill(numEntries); + for (ReplicatedLogEntry entry : entries) { + map.put(entry.getIndex(), entry.getData().toString()); + } + + int term = (int) replicatedLogImpl.lastTerm(); + int lastIndex = (int) entries.get(entries.size() - 1).getIndex(); + entries.clear(); + replicatedLogImpl.setSnapshotTerm(term); + replicatedLogImpl.setSnapshotIndex(lastIndex); + + return map; + + } + class MockAbstractReplicatedLogImpl extends AbstractReplicatedLogImpl { + @Override + public void appendAndPersist(ReplicatedLogEntry replicatedLogEntry) { + } + + @Override + public void removeFromAndPersist(long index) { + } + + public void setSnapshotIndex(long snapshotIndex) { + this.snapshotIndex = snapshotIndex; + } + + public void setSnapshotTerm(long snapshotTerm) { + this.snapshotTerm = snapshotTerm; + } + + public List getEntriesTill(int index) { + return journal.subList(0, index); + } + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java index 2e200cba27..aa50fa7442 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java @@ -177,7 +177,10 @@ public class MockRaftActorContext implements RaftActorContext { this.peerAddresses = peerAddresses; } - + @Override + public ConfigParams getConfigParams() { + return new DefaultConfigParamsImpl(); + } public static class SimpleReplicatedLog implements ReplicatedLog { private final List log = new ArrayList<>(); diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/ActorConstants.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/ActorConstants.java new file mode 100644 index 0000000000..1f1a0f5cc6 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/ActorConstants.java @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.remote.rpc; + + +public class ActorConstants { + public static final String RPC_BROKER = "rpc-broker"; + public static final String RPC_REGISTRY = "rpc-registry"; + public static final String RPC_MANAGER = "rpc"; + + public static final String RPC_BROKER_PATH= "/user/rpc/rpc-broker"; + public static final String RPC_REGISTRY_PATH = "/user/rpc/rpc-registry"; +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProvider.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProvider.java index 3df572d7c2..ac50b8fe5b 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProvider.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProvider.java @@ -68,7 +68,7 @@ public class RemoteRpcProvider implements AutoCloseable, Provider, SchemaContext SchemaService schemaService = brokerSession.getService(SchemaService.class); schemaContext = schemaService.getGlobalContext(); - rpcManager = actorSystem.actorOf(RpcManager.props(clusterWrapper, schemaContext, brokerSession, rpcProvisionRegistry), "rpc"); + rpcManager = actorSystem.actorOf(RpcManager.props(clusterWrapper, schemaContext, brokerSession, rpcProvisionRegistry), ActorConstants.RPC_MANAGER); LOG.debug("Rpc actors are created."); } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java index 4925a17c13..5c56455bd0 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java @@ -72,14 +72,14 @@ public class RpcManager extends AbstractUntypedActor { private void createRpcActors() { LOG.debug("Create rpc registry and broker actors"); - rpcRegistry = getContext().actorOf(RpcRegistry.props(clusterWrapper), "rpc-registry"); - rpcBroker = getContext().actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext), "rpc-broker"); + rpcRegistry = getContext().actorOf(RpcRegistry.props(clusterWrapper), ActorConstants.RPC_REGISTRY); + rpcBroker = getContext().actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext), ActorConstants.RPC_BROKER); } private void startListeners() { LOG.debug("Registers rpc listeners"); - String rpcBrokerPath = clusterWrapper.getAddress().toString() + "/user/rpc/rpc-broker"; + String rpcBrokerPath = clusterWrapper.getAddress().toString() + ActorConstants.RPC_BROKER_PATH; rpcListener = new RpcListener(rpcRegistry, rpcBrokerPath); routeChangeListener = new RoutedRpcListener(rpcRegistry, rpcBrokerPath); rpcImplementation = new RemoteRpcImplementation(rpcBroker, schemaContext); diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java index 7cb505aa98..e36060cc13 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java @@ -14,6 +14,7 @@ import akka.cluster.ClusterEvent; import akka.cluster.Member; import akka.japi.Creator; import org.opendaylight.controller.remote.rpc.AbstractUntypedActor; +import org.opendaylight.controller.remote.rpc.ActorConstants; import org.opendaylight.controller.remote.rpc.messages.AddRoutedRpc; import org.opendaylight.controller.remote.rpc.messages.AddRpc; import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpc; @@ -171,7 +172,7 @@ public class RpcRegistry extends AbstractUntypedActor { } if(i == index) { if(!currentNodeAddress.equals(member.address())) { - actor = this.context().actorSelection(member.address() + "/user/rpc-registry"); + actor = this.context().actorSelection(member.address() + ActorConstants.RPC_REGISTRY_PATH); break; } else if(index < memberSize-1){ // pick the next element in the set index++; @@ -180,7 +181,7 @@ public class RpcRegistry extends AbstractUntypedActor { i++; } if(actor == null && previousMember != null) { - actor = this.context().actorSelection(previousMember.address() + "/user/rpc-registry"); + actor = this.context().actorSelection(previousMember.address() + ActorConstants.RPC_REGISTRY_PATH); } } return actor; diff --git a/opendaylight/md-sal/samples/pom.xml b/opendaylight/md-sal/samples/pom.xml index c601647a2e..ae7d323480 100644 --- a/opendaylight/md-sal/samples/pom.xml +++ b/opendaylight/md-sal/samples/pom.xml @@ -15,6 +15,7 @@ toaster toaster-consumer toaster-provider + toaster-config l2switch diff --git a/opendaylight/md-sal/samples/toaster-config/pom.xml b/opendaylight/md-sal/samples/toaster-config/pom.xml new file mode 100644 index 0000000000..b30c4ba12f --- /dev/null +++ b/opendaylight/md-sal/samples/toaster-config/pom.xml @@ -0,0 +1,46 @@ + + + + + 4.0.0 + + org.opendaylight.controller.samples + sal-samples + 1.1-SNAPSHOT + + toaster-config + Configuration files for toaster + jar + + + + org.codehaus.mojo + build-helper-maven-plugin + + + attach-artifacts + + attach-artifact + + package + + + + ${project.build.directory}/classes/initial/03-toaster-sample.xml + xml + config + + + + + + + + + diff --git a/opendaylight/distribution/opendaylight/src/main/resources/configuration/initial/03-toaster-sample.xml b/opendaylight/md-sal/samples/toaster-config/src/main/resources/initial/03-toaster-sample.xml similarity index 99% rename from opendaylight/distribution/opendaylight/src/main/resources/configuration/initial/03-toaster-sample.xml rename to opendaylight/md-sal/samples/toaster-config/src/main/resources/initial/03-toaster-sample.xml index 3958e18560..2e8c7d5ce6 100644 --- a/opendaylight/distribution/opendaylight/src/main/resources/configuration/initial/03-toaster-sample.xml +++ b/opendaylight/md-sal/samples/toaster-config/src/main/resources/initial/03-toaster-sample.xml @@ -26,7 +26,7 @@ binding:binding-async-data-broker binding-data-broker - + binding:binding-notification-service @@ -54,7 +54,7 @@ - + diff --git a/opendaylight/netconf/netconf-config/pom.xml b/opendaylight/netconf/netconf-config/pom.xml new file mode 100644 index 0000000000..db5d14d75a --- /dev/null +++ b/opendaylight/netconf/netconf-config/pom.xml @@ -0,0 +1,46 @@ + + + + + 4.0.0 + + org.opendaylight.controller + netconf-subsystem + 0.2.5-SNAPSHOT + + netconf-config + Configuration files for netconf + jar + + + + org.codehaus.mojo + build-helper-maven-plugin + + + attach-artifacts + + attach-artifact + + package + + + + ${project.build.directory}/classes/initial/01-netconf.xml + xml + config + + + + + + + + + diff --git a/opendaylight/distribution/opendaylight/src/main/resources/configuration/initial/01-netconf.xml b/opendaylight/netconf/netconf-config/src/main/resources/initial/01-netconf.xml similarity index 100% rename from opendaylight/distribution/opendaylight/src/main/resources/configuration/initial/01-netconf.xml rename to opendaylight/netconf/netconf-config/src/main/resources/initial/01-netconf.xml diff --git a/opendaylight/netconf/netconf-connector-config/pom.xml b/opendaylight/netconf/netconf-connector-config/pom.xml new file mode 100644 index 0000000000..d9cc5eab43 --- /dev/null +++ b/opendaylight/netconf/netconf-connector-config/pom.xml @@ -0,0 +1,46 @@ + + + + + 4.0.0 + + org.opendaylight.controller + netconf-subsystem + 0.2.5-SNAPSHOT + + netconf-connector-config + Configuration files for netconf-connector + jar + + + + org.codehaus.mojo + build-helper-maven-plugin + + + attach-artifacts + + attach-artifact + + package + + + + ${project.build.directory}/classes/initial/99-netconf-connector.xml + xml + config + + + + + + + + + diff --git a/opendaylight/distribution/opendaylight/src/main/resources/configuration/initial/99-netconf-connector.xml b/opendaylight/netconf/netconf-connector-config/src/main/resources/initial/99-netconf-connector.xml similarity index 100% rename from opendaylight/distribution/opendaylight/src/main/resources/configuration/initial/99-netconf-connector.xml rename to opendaylight/netconf/netconf-connector-config/src/main/resources/initial/99-netconf-connector.xml diff --git a/opendaylight/netconf/pom.xml b/opendaylight/netconf/pom.xml index 937949a17e..c72705d50e 100644 --- a/opendaylight/netconf/pom.xml +++ b/opendaylight/netconf/pom.xml @@ -20,6 +20,7 @@ netconf-api netconf-cli + netconf-config netconf-impl config-netconf-connector netconf-util @@ -32,6 +33,7 @@ netconf-monitoring ietf-netconf-monitoring ietf-netconf-monitoring-extension + netconf-connector-config