From: Tony Tkacik
Date: Mon, 30 Mar 2015 08:25:17 +0000 (+0000)
Subject: Merge "Increase default negotiation timeout for netconf server to 30s"
X-Git-Tag: release/lithium~334
X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=95115ca49f3b16b936e0f6c88aedfc17cd0ee92c;hp=b03c5a20673792f0fb8df847fbaf9c359c7cce1b
Merge "Increase default negotiation timeout for netconf server to 30s"
---
diff --git a/opendaylight/adsal/features/nsf/src/main/resources/features.xml b/opendaylight/adsal/features/nsf/src/main/resources/features.xml
index 3985a69302..56271eb699 100644
--- a/opendaylight/adsal/features/nsf/src/main/resources/features.xml
+++ b/opendaylight/adsal/features/nsf/src/main/resources/features.xml
@@ -14,8 +14,8 @@
odl-adsal-all
- odl-nsf-controller-managers
- odl-adsal-controller-northbound
+ odl-nsf-managers
+ odl-adsal-northbound
@@ -44,43 +44,6 @@
mvn:org.opendaylight.controller/topologymanager/${topologymanager.version}
mvn:org.opendaylight.controller/topologymanager.shell/${topologymanager.shell.version}
-
- mvn:org.opendaylight.controller/hosttracker/${hosttracker.api.version}
- mvn:org.opendaylight.controller/hosttracker.implementation/${hosttracker.implementation.version}
- mvn:org.opendaylight.controller/hosttracker.shell/${hosttracker.shell.version}
-
- mvn:org.opendaylight.controller/forwarding.staticrouting/${forwarding.staticrouting}
-
- mvn:org.opendaylight.controller.thirdparty/net.sf.jung2/2.0.1
- mvn:org.opendaylight.controller/routing.dijkstra_implementation/${routing.dijkstra_implementation.version}
-
-
-
- odl-base-all
- odl-adsal-all
- mvn:org.opendaylight.controller/usermanager/${usermanager.version}
- mvn:org.opendaylight.controller/usermanager.implementation/${usermanager.version}
-
- mvn:org.opendaylight.controller/appauth/${appauth.version}
-
- mvn:org.opendaylight.controller/connectionmanager/${connectionmanager.version}
- mvn:org.opendaylight.controller/connectionmanager.implementation/${connectionmanager.version}
-
- mvn:org.opendaylight.controller/containermanager/${containermanager.version}
- mvn:org.opendaylight.controller/containermanager.implementation/${containermanager.version}
-
- mvn:org.opendaylight.controller/statisticsmanager/${statisticsmanager.version}
- mvn:org.opendaylight.controller/statisticsmanager.implementation/${statisticsmanager.implementation.version}
-
- mvn:org.opendaylight.controller/switchmanager/${switchmanager.api.version}
- mvn:org.opendaylight.controller/switchmanager.implementation/${switchmanager.implementation.version}
-
- mvn:org.opendaylight.controller/forwardingrulesmanager/${forwardingrulesmanager.version}
- mvn:org.opendaylight.controller/forwardingrulesmanager.implementation/${forwardingrulesmanager.implementation.version}
-
- mvn:org.opendaylight.controller/topologymanager/${topologymanager.version}
- mvn:org.opendaylight.controller/topologymanager.shell/${topologymanager.shell.version}
-
mvn:org.opendaylight.controller/hosttracker/${hosttracker.api.version}
mvn:org.opendaylight.controller/hosttracker.implementation/${hosttracker.implementation.version}
mvn:org.opendaylight.controller/hosttracker.shell/${hosttracker.shell.version}
@@ -117,26 +80,4 @@
mvn:org.opendaylight.controller/topology.northbound/${topology.northbound.version}
mvn:org.opendaylight.controller/usermanager.northbound/${usermanager.northbound.version}
-
-
- odl-base-all
- odl-nsf-managers
- mvn:org.ow2.asm/asm-all/${asm.version}
- mvn:org.opendaylight.controller/bundlescanner/${bundlescanner.api.version}
- mvn:org.opendaylight.controller/bundlescanner.implementation/${bundlescanner.implementation.version}
- mvn:org.opendaylight.controller/commons.northbound/${northbound.commons.version}
- mvn:org.opendaylight.controller/connectionmanager.northbound/${connectionmanager.version}
- mvn:org.opendaylight.controller/flowprogrammer.northbound/${flowprogrammer.northbound.version}
- mvn:org.opendaylight.controller/hosttracker.northbound/${hosttracker.northbound.version}
- mvn:org.opendaylight.controller/networkconfig.bridgedomain.northbound/${networkconfig.bridgedomain.northbound.version}
- mvn:org.eclipse.persistence/org.eclipse.persistence.antlr/${eclipse.persistence.version}
- mvn:org.eclipse.persistence/org.eclipse.persistence.core/${eclipse.persistence.version}
- mvn:org.eclipse.persistence/org.eclipse.persistence.moxy/${eclipse.persistence.version}
- mvn:org.opendaylight.controller/forwarding.staticrouting.northbound/${forwarding.staticrouting.northbound.version}
- mvn:org.opendaylight.controller/statistics.northbound/${statistics.northbound.version}
- mvn:org.opendaylight.controller/subnets.northbound/${subnets.northbound.version}
- mvn:org.opendaylight.controller/switchmanager.northbound/${switchmanager.northbound.version}
- mvn:org.opendaylight.controller/topology.northbound/${topology.northbound.version}
- mvn:org.opendaylight.controller/usermanager.northbound/${usermanager.northbound.version}
-
diff --git a/opendaylight/archetypes/opendaylight-startup/src/main/resources/archetype-resources/features/pom.xml b/opendaylight/archetypes/opendaylight-startup/src/main/resources/archetype-resources/features/pom.xml
index c5adb28db7..4afbedb76e 100644
--- a/opendaylight/archetypes/opendaylight-startup/src/main/resources/archetype-resources/features/pom.xml
+++ b/opendaylight/archetypes/opendaylight-startup/src/main/resources/archetype-resources/features/pom.xml
@@ -70,6 +70,13 @@ and is available at http://www.eclipse.org/legal/epl-v10.html INTERNAL
${artifactId}-impl
${symbol_dollar}{project.version}
+
+ ${symbol_dollar}{project.groupId}
+ ${artifactId}-impl
+ ${symbol_dollar}{project.version}
+ xml
+ config
+
${symbol_dollar}{project.groupId}
${artifactId}-api
diff --git a/opendaylight/commons/opendaylight/pom.xml b/opendaylight/commons/opendaylight/pom.xml
index 130cb11c5a..2915a8dfd9 100644
--- a/opendaylight/commons/opendaylight/pom.xml
+++ b/opendaylight/commons/opendaylight/pom.xml
@@ -187,7 +187,7 @@
2013.09.07.7-SNAPSHOT
1.1.0-SNAPSHOT
0.7.0-SNAPSHOT
- 0.12.0
+ 0.14.0
0.9.7
3.3.0
diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java
index 603f34bac9..076d1b2fc7 100644
--- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java
+++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java
@@ -8,16 +8,11 @@
package org.opendaylight.controller.messagebus.app.impl;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.Futures;
-
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.regex.Pattern;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
@@ -57,7 +52,10 @@ import org.opendaylight.yangtools.yang.common.RpcResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.regex.Pattern;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
public class EventSourceTopology implements EventAggregatorService, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(EventSourceTopology.class);
@@ -82,11 +80,10 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl
private final RpcRegistration aggregatorRpcReg;
private final EventSourceService eventSourceService;
private final RpcProviderRegistry rpcRegistry;
- private final ExecutorService executorService;
public EventSourceTopology(final DataBroker dataBroker, final RpcProviderRegistry rpcRegistry) {
+
this.dataBroker = dataBroker;
- this.executorService = Executors.newCachedThreadPool();
this.rpcRegistry = rpcRegistry;
aggregatorRpcReg = rpcRegistry.addRpcImplementation(EventAggregatorService.class, this);
eventSourceService = rpcRegistry.getRpcService(EventSourceService.class);
@@ -94,14 +91,17 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl
final TopologyEventSource topologySource = new TopologyEventSourceBuilder().build();
final TopologyTypes1 topologyTypeAugment = new TopologyTypes1Builder().setTopologyEventSource(topologySource).build();
putData(OPERATIONAL, TOPOLOGY_TYPE_PATH, topologyTypeAugment);
+
}
private void putData(final LogicalDatastoreType store,
- final InstanceIdentifier path, final T data) {
+ final InstanceIdentifier path,
+ final T data){
final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
tx.put(store, path, data, true);
tx.submit();
+
}
private void insert(final KeyedInstanceIdentifier sourcePath, final Node node) {
@@ -112,7 +112,34 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl
}
private void notifyExistingNodes(final Pattern nodeIdPatternRegex, final EventSourceTopic eventSourceTopic){
- executorService.execute(new NotifyAllNodeExecutor(dataBroker, nodeIdPatternRegex, eventSourceTopic));
+
+ final ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction();
+
+ final CheckedFuture, ReadFailedException> future = tx.read(OPERATIONAL, EVENT_SOURCE_TOPOLOGY_PATH);
+
+ Futures.addCallback(future, new FutureCallback>(){
+
+ @Override
+ public void onSuccess(Optional data) {
+ if(data.isPresent()) {
+ final List nodes = data.get().getNode();
+ for (final Node node : nodes) {
+ if (nodeIdPatternRegex.matcher(node.getNodeId().getValue()).matches()) {
+ eventSourceTopic.notifyNode(EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey()));
+ }
+ }
+ }
+ tx.close();
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ LOG.error("Can not notify existing nodes {}", t);
+ tx.close();
+ }
+
+ });
+
}
@Override
@@ -164,45 +191,4 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl
// FIXME: Return registration object.
}
- private class NotifyAllNodeExecutor implements Runnable {
-
- private final EventSourceTopic topic;
- private final DataBroker dataBroker;
- private final Pattern nodeIdPatternRegex;
-
- public NotifyAllNodeExecutor(final DataBroker dataBroker, final Pattern nodeIdPatternRegex, final EventSourceTopic topic) {
- this.topic = topic;
- this.dataBroker = dataBroker;
- this.nodeIdPatternRegex = nodeIdPatternRegex;
- }
-
- @Override
- public void run() {
- //# Code reader note: Context of Node type is NetworkTopology
- final List nodes = snapshot();
- for (final Node node : nodes) {
- if (nodeIdPatternRegex.matcher(node.getNodeId().getValue()).matches()) {
- topic.notifyNode(EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey()));
- }
- }
- }
-
- private List snapshot() {
- try (ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction();) {
-
- final Optional data = tx.read(OPERATIONAL, EVENT_SOURCE_TOPOLOGY_PATH).checkedGet();
-
- if(data.isPresent()) {
- final List nodeList = data.get().getNode();
- if(nodeList != null) {
- return nodeList;
- }
- }
- return Collections.emptyList();
- } catch (final ReadFailedException e) {
- LOG.error("Unable to retrieve node list.", e);
- return Collections.emptyList();
- }
- }
- }
}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java
index 15063cff5b..bcfd472bf6 100644
--- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java
+++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java
@@ -10,21 +10,17 @@ package org.opendaylight.controller.cluster.raft;
import com.google.common.base.Stopwatch;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLongFieldUpdater;
public class FollowerLogInformationImpl implements FollowerLogInformation {
- private static final AtomicLongFieldUpdater NEXT_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(FollowerLogInformationImpl.class, "nextIndex");
- private static final AtomicLongFieldUpdater MATCH_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(FollowerLogInformationImpl.class, "matchIndex");
-
private final String id;
private final Stopwatch stopwatch = Stopwatch.createUnstarted();
private final RaftActorContext context;
- private volatile long nextIndex;
+ private long nextIndex;
- private volatile long matchIndex;
+ private long matchIndex;
private long lastReplicatedIndex = -1L;
@@ -39,13 +35,13 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
}
@Override
- public long incrNextIndex(){
- return NEXT_INDEX_UPDATER.incrementAndGet(this);
+ public long incrNextIndex() {
+ return nextIndex++;
}
@Override
public long decrNextIndex() {
- return NEXT_INDEX_UPDATER.decrementAndGet(this);
+ return nextIndex--;
}
@Override
@@ -60,7 +56,7 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
@Override
public long incrMatchIndex(){
- return MATCH_INDEX_UPDATER.incrementAndGet(this);
+ return matchIndex++;
}
@Override
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
index 9faffb9395..b74259d485 100644
--- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
+++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
@@ -17,11 +17,11 @@ import akka.persistence.SaveSnapshotSuccess;
import akka.persistence.SnapshotOffer;
import akka.persistence.SnapshotSelectionCriteria;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
import com.google.common.base.Optional;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
import java.io.Serializable;
import java.util.Collection;
import java.util.List;
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
+import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RoleChanged;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
@@ -38,7 +39,6 @@ import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
-import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior;
import org.opendaylight.controller.cluster.raft.behaviors.Follower;
@@ -102,6 +102,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
public void apply(ApplyJournalEntries param) throws Exception {
}
};
+ private static final String COMMIT_SNAPSHOT = "commit_snapshot";
protected final Logger LOG = LoggerFactory.getLogger(getClass());
@@ -117,17 +118,19 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
*/
private final RaftActorContextImpl context;
+ private final Procedure createSnapshotProcedure = new CreateSnapshotProcedure();
+
/**
* The in-memory journal
*/
private ReplicatedLogImpl replicatedLog = new ReplicatedLogImpl();
- private CaptureSnapshot captureSnapshot = null;
-
private Stopwatch recoveryTimer;
private int currentRecoveryBatchCount;
+ private final BehaviorStateHolder reusableBehaviorStateHolder = new BehaviorStateHolder();
+
public RaftActor(String id, Map peerAddresses) {
this(id, peerAddresses, Optional.absent());
}
@@ -306,9 +309,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
}
protected void changeCurrentBehavior(RaftActorBehavior newBehavior){
- RaftActorBehavior oldBehavior = currentBehavior;
+ reusableBehaviorStateHolder.init(currentBehavior);
currentBehavior = newBehavior;
- handleBehaviorChange(oldBehavior, currentBehavior);
+ handleBehaviorChange(reusableBehaviorStateHolder, currentBehavior);
}
@Override public void handleCommand(Object message) {
@@ -375,31 +378,25 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
LOG.error("{}: SaveSnapshotFailure received for snapshot Cause:",
persistenceId(), saveSnapshotFailure.cause());
- context.getReplicatedLog().snapshotRollback();
-
- LOG.info("{}: Replicated Log rollbacked. Snapshot will be attempted in the next cycle." +
- "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(),
- context.getReplicatedLog().getSnapshotIndex(),
- context.getReplicatedLog().getSnapshotTerm(),
- context.getReplicatedLog().size());
+ context.getSnapshotManager().rollback();
} else if (message instanceof CaptureSnapshot) {
LOG.debug("{}: CaptureSnapshot received by actor: {}", persistenceId(), message);
- if(captureSnapshot == null) {
- captureSnapshot = (CaptureSnapshot)message;
- createSnapshot();
- }
+ context.getSnapshotManager().create(createSnapshotProcedure);
- } else if (message instanceof CaptureSnapshotReply){
+ } else if (message instanceof CaptureSnapshotReply) {
handleCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
} else if(message instanceof GetOnDemandRaftState) {
onGetOnDemandRaftStats();
+ } else if (message.equals(COMMIT_SNAPSHOT)) {
+ commitSnapshot(-1);
} else {
- RaftActorBehavior oldBehavior = currentBehavior;
+ reusableBehaviorStateHolder.init(currentBehavior);
+
currentBehavior = currentBehavior.handleMessage(getSender(), message);
- handleBehaviorChange(oldBehavior, currentBehavior);
+ handleBehaviorChange(reusableBehaviorStateHolder, currentBehavior);
}
}
@@ -411,7 +408,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
.currentTerm(context.getTermInformation().getCurrentTerm())
.inMemoryJournalDataSize(replicatedLog.dataSize())
.inMemoryJournalLogSize(replicatedLog.size())
- .isSnapshotCaptureInitiated(context.isSnapshotCaptureInitiated())
+ .isSnapshotCaptureInitiated(context.getSnapshotManager().isCapturing())
.lastApplied(context.getLastApplied())
.lastIndex(replicatedLog.lastIndex())
.lastTerm(replicatedLog.lastTerm())
@@ -446,22 +443,30 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
}
- private void handleBehaviorChange(RaftActorBehavior oldBehavior, RaftActorBehavior currentBehavior) {
+ private void handleBehaviorChange(BehaviorStateHolder oldBehaviorState, RaftActorBehavior currentBehavior) {
+ RaftActorBehavior oldBehavior = oldBehaviorState.getBehavior();
+
if (oldBehavior != currentBehavior){
onStateChanged();
}
- String oldBehaviorLeaderId = oldBehavior == null? null : oldBehavior.getLeaderId();
- String oldBehaviorState = oldBehavior == null? null : oldBehavior.state().name();
+ String oldBehaviorLeaderId = oldBehavior == null ? null : oldBehaviorState.getLeaderId();
+ String oldBehaviorStateName = oldBehavior == null ? null : oldBehavior.state().name();
// it can happen that the state has not changed but the leader has changed.
- onLeaderChanged(oldBehaviorLeaderId, currentBehavior.getLeaderId());
+ Optional roleChangeNotifier = getRoleChangeNotifier();
+ if(!Objects.equal(oldBehaviorLeaderId, currentBehavior.getLeaderId())) {
+ if(roleChangeNotifier.isPresent()) {
+ roleChangeNotifier.get().tell(new LeaderStateChanged(getId(), currentBehavior.getLeaderId()), getSelf());
+ }
+
+ onLeaderChanged(oldBehaviorLeaderId, currentBehavior.getLeaderId());
+ }
- if (getRoleChangeNotifier().isPresent() &&
+ if (roleChangeNotifier.isPresent() &&
(oldBehavior == null || (oldBehavior.state() != currentBehavior.state()))) {
- getRoleChangeNotifier().get().tell(
- new RoleChanged(getId(), oldBehaviorState , currentBehavior.state().name()),
- getSelf());
+ roleChangeNotifier.get().tell(new RoleChanged(getId(), oldBehaviorStateName ,
+ currentBehavior.state().name()), getSelf());
}
}
@@ -502,15 +507,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
// the state to durable storage
self().tell(new ApplyJournalEntries(replicatedLogEntry.getIndex()), self());
- // Check if the "real" snapshot capture has been initiated. If no then do the fake snapshot
- if(!context.isSnapshotCaptureInitiated()){
- raftContext.getReplicatedLog().snapshotPreCommit(raftContext.getLastApplied(),
- raftContext.getTermInformation().getCurrentTerm());
- raftContext.getReplicatedLog().snapshotCommit();
- } else {
- LOG.debug("{}: Skipping fake snapshotting for {} because real snapshotting is in progress",
- persistenceId(), getId());
- }
+ context.getSnapshotManager().trimLog(context.getLastApplied(), currentBehavior);
+
} else if (clientActor != null) {
// Send message for replication
currentBehavior.handleMessage(getSelf(),
@@ -608,10 +606,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
}
protected void commitSnapshot(long sequenceNumber) {
- context.getReplicatedLog().snapshotCommit();
-
- // TODO: Not sure if we want to be this aggressive with trimming stuff
- trimPersistentData(sequenceNumber);
+ context.getSnapshotManager().commit(persistence(), sequenceNumber);
}
/**
@@ -703,17 +698,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
protected void onLeaderChanged(String oldLeader, String newLeader){};
- private void trimPersistentData(long sequenceNumber) {
- // Trim akka snapshots
- // FIXME : Not sure how exactly the SnapshotSelectionCriteria is applied
- // For now guessing that it is ANDed.
- persistence().deleteSnapshots(new SnapshotSelectionCriteria(
- sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000));
-
- // Trim akka journal
- persistence().deleteMessages(sequenceNumber);
- }
-
private String getLeaderAddress(){
if(isLeader()){
return getSelf().path().toString();
@@ -734,67 +718,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
private void handleCaptureSnapshotReply(byte[] snapshotBytes) {
LOG.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", persistenceId(), snapshotBytes.length);
- // create a snapshot object from the state provided and save it
- // when snapshot is saved async, SaveSnapshotSuccess is raised.
-
- Snapshot sn = Snapshot.create(snapshotBytes,
- context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1),
- captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
- captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
-
- persistence().saveSnapshot(sn);
-
- LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage());
-
- long dataThreshold = getTotalMemory() *
- getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100;
- if (context.getReplicatedLog().dataSize() > dataThreshold) {
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: dataSize {} exceeds dataThreshold {} - doing snapshotPreCommit with index {}",
- persistenceId(), context.getReplicatedLog().dataSize(), dataThreshold,
- captureSnapshot.getLastAppliedIndex());
- }
-
- // if memory is less, clear the log based on lastApplied.
- // this could/should only happen if one of the followers is down
- // as normally we keep removing from the log when its replicated to all.
- context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(),
- captureSnapshot.getLastAppliedTerm());
-
- // Don't reset replicatedToAllIndex to -1 as this may prevent us from trimming the log after an
- // install snapshot to a follower.
- if(captureSnapshot.getReplicatedToAllIndex() >= 0) {
- getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
- }
- } else if(captureSnapshot.getReplicatedToAllIndex() != -1){
- // clear the log based on replicatedToAllIndex
- context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(),
- captureSnapshot.getReplicatedToAllTerm());
-
- getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
- } else {
- // The replicatedToAllIndex was not found in the log
- // This means that replicatedToAllIndex never moved beyond -1 or that it is already in the snapshot.
- // In this scenario we may need to save the snapshot to the akka persistence
- // snapshot for recovery but we do not need to do the replicated log trimming.
- context.getReplicatedLog().snapshotPreCommit(replicatedLog.getSnapshotIndex(),
- replicatedLog.getSnapshotTerm());
- }
-
-
- LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex: {} " +
- "and term: {}", persistenceId(), replicatedLog.getSnapshotIndex(),
- replicatedLog.getSnapshotTerm());
-
- if (isLeader() && captureSnapshot.isInstallSnapshotInitiated()) {
- // this would be call straight to the leader and won't initiate in serialization
- currentBehavior.handleMessage(getSelf(), new SendInstallSnapshot(
- ByteString.copyFrom(snapshotBytes)));
- }
-
- captureSnapshot = null;
- context.setSnapshotCaptureInitiated(false);
+ context.getSnapshotManager().persist(persistence(), snapshotBytes, currentBehavior, getTotalMemory());
}
protected long getTotalMemory() {
@@ -806,9 +730,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
}
private class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
-
private static final int DATA_SIZE_DIVIDER = 5;
- private long dataSizeSinceLastSnapshot = 0;
+ private long dataSizeSinceLastSnapshot = 0L;
+
public ReplicatedLogImpl(Snapshot snapshot) {
super(snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
@@ -874,9 +798,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
long dataSizeForCheck = dataSize;
dataSizeSinceLastSnapshot += logEntrySize;
- long journalSize = lastIndex() + 1;
- if(!hasFollowers()) {
+ if (!hasFollowers()) {
// When we do not have followers we do not maintain an in-memory log
// due to this the journalSize will never become anything close to the
// snapshot batch count. In fact will mostly be 1.
@@ -890,51 +813,22 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
// as if we were maintaining a real snapshot
dataSizeForCheck = dataSizeSinceLastSnapshot / DATA_SIZE_DIVIDER;
}
-
+ long journalSize = replicatedLogEntry.getIndex() + 1;
long dataThreshold = getTotalMemory() *
- getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100;
-
- // when a snaphsot is being taken, captureSnapshot != null
- if (!context.isSnapshotCaptureInitiated() &&
- ( journalSize % context.getConfigParams().getSnapshotBatchCount() == 0 ||
- dataSizeForCheck > dataThreshold)) {
-
- dataSizeSinceLastSnapshot = 0;
+ context.getConfigParams().getSnapshotDataThresholdPercentage() / 100;
- LOG.info("{}: Initiating Snapshot Capture, journalSize = {}, dataSizeForCheck = {}," +
- " dataThreshold = {}", persistenceId(), journalSize, dataSizeForCheck, dataThreshold);
+ if ((journalSize % context.getConfigParams().getSnapshotBatchCount() == 0
+ || dataSizeForCheck > dataThreshold)) {
- long lastAppliedIndex = -1;
- long lastAppliedTerm = -1;
+ boolean started = context.getSnapshotManager().capture(replicatedLogEntry,
+ currentBehavior.getReplicatedToAllIndex());
- ReplicatedLogEntry lastAppliedEntry = get(context.getLastApplied());
- if (!hasFollowers()) {
- lastAppliedIndex = replicatedLogEntry.getIndex();
- lastAppliedTerm = replicatedLogEntry.getTerm();
- } else if (lastAppliedEntry != null) {
- lastAppliedIndex = lastAppliedEntry.getIndex();
- lastAppliedTerm = lastAppliedEntry.getTerm();
+ if(started){
+ dataSizeSinceLastSnapshot = 0;
}
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Snapshot Capture logSize: {}", persistenceId(), journal.size());
- LOG.debug("{}: Snapshot Capture lastApplied:{} ",
- persistenceId(), context.getLastApplied());
- LOG.debug("{}: Snapshot Capture lastAppliedIndex:{}", persistenceId(),
- lastAppliedIndex);
- LOG.debug("{}: Snapshot Capture lastAppliedTerm:{}", persistenceId(),
- lastAppliedTerm);
- }
-
- // send a CaptureSnapshot to self to make the expensive operation async.
- long replicatedToAllIndex = getCurrentBehavior().getReplicatedToAllIndex();
- ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex);
- getSelf().tell(new CaptureSnapshot(lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm,
- (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1),
- (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1)),
- null);
- context.setSnapshotCaptureInitiated(true);
}
+
if (callback != null){
callback.apply(replicatedLogEntry);
}
@@ -1038,7 +932,18 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
@Override
public void saveSnapshot(Object o) {
// Make saving Snapshot successful
- commitSnapshot(-1L);
+ // Committing the snapshot here would end up calling commit in the creating state which would
+ // be a state violation. That's why now we send a message to commit the snapshot.
+ self().tell(COMMIT_SNAPSHOT, self());
+ }
+ }
+
+
+ private class CreateSnapshotProcedure implements Procedure {
+
+ @Override
+ public void apply(Void aVoid) throws Exception {
+ createSnapshot();
}
}
@@ -1051,4 +956,21 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
return currentBehavior;
}
+ private static class BehaviorStateHolder {
+ private RaftActorBehavior behavior;
+ private String leaderId;
+
+ void init(RaftActorBehavior behavior) {
+ this.behavior = behavior;
+ this.leaderId = behavior != null ? behavior.getLeaderId() : null;
+ }
+
+ RaftActorBehavior getBehavior() {
+ return behavior;
+ }
+
+ String getLeaderId() {
+ return leaderId;
+ }
+ }
}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java
index 9d391a1588..2e7eb5eb3a 100644
--- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java
+++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java
@@ -166,8 +166,6 @@ public interface RaftActorContext {
*/
ConfigParams getConfigParams();
- void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated);
-
- boolean isSnapshotCaptureInitiated();
+ SnapshotManager getSnapshotManager();
}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java
index 6fc5e4369b..eb059d60fb 100644
--- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java
+++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java
@@ -41,6 +41,10 @@ public class RaftActorContextImpl implements RaftActorContext {
private boolean snapshotCaptureInitiated;
+ // Snapshot manager will need to be created on demand as it needs raft actor context which cannot
+ // be passed to it in the constructor
+ private SnapshotManager snapshotManager;
+
public RaftActorContextImpl(ActorRef actor, UntypedActorContext context,
String id,
ElectionTerm termInformation, long commitIndex,
@@ -134,16 +138,6 @@ public class RaftActorContextImpl implements RaftActorContext {
return configParams;
}
- @Override
- public void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated) {
- this.snapshotCaptureInitiated = snapshotCaptureInitiated;
- }
-
- @Override
- public boolean isSnapshotCaptureInitiated() {
- return snapshotCaptureInitiated;
- }
-
@Override public void addToPeers(String name, String address) {
peerAddresses.put(name, address);
}
@@ -166,4 +160,11 @@ public class RaftActorContextImpl implements RaftActorContext {
peerAddresses.put(peerId, peerAddress);
}
+
+ public SnapshotManager getSnapshotManager() {
+ if(snapshotManager == null){
+ snapshotManager = new SnapshotManager(this, LOG);
+ }
+ return snapshotManager;
+ }
}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java
new file mode 100644
index 0000000000..432d678491
--- /dev/null
+++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java
@@ -0,0 +1,431 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft;
+
+import akka.japi.Procedure;
+import akka.persistence.SnapshotSelectionCriteria;
+import com.google.protobuf.ByteString;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.slf4j.Logger;
+
+public class SnapshotManager implements SnapshotState {
+
+
+ private final SnapshotState IDLE = new Idle();
+ private final SnapshotState CAPTURING = new Capturing();
+ private final SnapshotState PERSISTING = new Persisting();
+ private final SnapshotState CREATING = new Creating();
+
+ private final Logger LOG;
+ private final RaftActorContext context;
+ private final LastAppliedTermInformationReader lastAppliedTermInformationReader =
+ new LastAppliedTermInformationReader();
+ private final ReplicatedToAllTermInformationReader replicatedToAllTermInformationReader =
+ new ReplicatedToAllTermInformationReader();
+
+
+ private SnapshotState currentState = IDLE;
+ private CaptureSnapshot captureSnapshot;
+
+ public SnapshotManager(RaftActorContext context, Logger logger) {
+ this.context = context;
+ this.LOG = logger;
+ }
+
+ @Override
+ public boolean isCapturing() {
+ return currentState.isCapturing();
+ }
+
+ @Override
+ public boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) {
+ return currentState.captureToInstall(lastLogEntry, replicatedToAllIndex, targetFollower);
+ }
+
+ @Override
+ public boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
+ return currentState.capture(lastLogEntry, replicatedToAllIndex);
+ }
+
+ @Override
+ public void create(Procedure callback) {
+ currentState.create(callback);
+ }
+
+ @Override
+ public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes,
+ RaftActorBehavior currentBehavior, long totalMemory) {
+ currentState.persist(persistenceProvider, snapshotBytes, currentBehavior, totalMemory);
+ }
+
+ @Override
+ public void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber) {
+ currentState.commit(persistenceProvider, sequenceNumber);
+ }
+
+ @Override
+ public void rollback() {
+ currentState.rollback();
+ }
+
+ @Override
+ public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) {
+ return currentState.trimLog(desiredTrimIndex, currentBehavior);
+ }
+
+ private boolean hasFollowers(){
+ return context.getPeerAddresses().keySet().size() > 0;
+ }
+
+ private String persistenceId(){
+ return context.getId();
+ }
+
+ private class AbstractSnapshotState implements SnapshotState {
+
+ @Override
+ public boolean isCapturing() {
+ return false;
+ }
+
+ @Override
+ public boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
+ LOG.debug("capture should not be called in state {}", this);
+ return false;
+ }
+
+ @Override
+ public boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) {
+ LOG.debug("captureToInstall should not be called in state {}", this);
+ return false;
+ }
+
+ @Override
+ public void create(Procedure callback) {
+ LOG.debug("create should not be called in state {}", this);
+ }
+
+ @Override
+ public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes,
+ RaftActorBehavior currentBehavior, long totalMemory) {
+ LOG.debug("persist should not be called in state {}", this);
+ }
+
+ @Override
+ public void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber) {
+ LOG.debug("commit should not be called in state {}", this);
+ }
+
+ @Override
+ public void rollback() {
+ LOG.debug("rollback should not be called in state {}", this);
+ }
+
+ @Override
+ public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) {
+ LOG.debug("trimLog should not be called in state {}", this);
+ return -1;
+ }
+
+ protected long doTrimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior){
+ // we would want to keep the lastApplied as its used while capturing snapshots
+ long lastApplied = context.getLastApplied();
+ long tempMin = Math.min(desiredTrimIndex, (lastApplied > -1 ? lastApplied - 1 : -1));
+
+ if(LOG.isTraceEnabled()) {
+ LOG.trace("{}: performSnapshotWithoutCapture: desiredTrimIndex: {}, lastApplied: {}, tempMin: {}",
+ persistenceId(), desiredTrimIndex, lastApplied, tempMin);
+ }
+
+ if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin)) {
+ LOG.debug("{}: fakeSnapshot purging log to {} for term {}", persistenceId(), tempMin,
+ context.getTermInformation().getCurrentTerm());
+
+ //use the term of the temp-min, since we check for isPresent, entry will not be null
+ ReplicatedLogEntry entry = context.getReplicatedLog().get(tempMin);
+ context.getReplicatedLog().snapshotPreCommit(tempMin, entry.getTerm());
+ context.getReplicatedLog().snapshotCommit();
+ return tempMin;
+ } else if(tempMin > currentBehavior.getReplicatedToAllIndex()) {
+ // It's possible a follower was lagging and an install snapshot advanced its match index past
+ // the current replicatedToAllIndex. Since the follower is now caught up we should advance the
+ // replicatedToAllIndex (to tempMin). The fact that tempMin wasn't found in the log is likely
+ // due to a previous snapshot triggered by the memory threshold exceeded, in that case we
+ // trim the log to the last applied index even if previous entries weren't replicated to all followers.
+ currentBehavior.setReplicatedToAllIndex(tempMin);
+ }
+ return -1;
+ }
+ }
+
+ private class Idle extends AbstractSnapshotState {
+
+ private boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) {
+ TermInformationReader lastAppliedTermInfoReader =
+ lastAppliedTermInformationReader.init(context.getReplicatedLog(), context.getLastApplied(),
+ lastLogEntry, hasFollowers());
+
+ long lastAppliedIndex = lastAppliedTermInfoReader.getIndex();
+ long lastAppliedTerm = lastAppliedTermInfoReader.getTerm();
+
+ TermInformationReader replicatedToAllTermInfoReader =
+ replicatedToAllTermInformationReader.init(context.getReplicatedLog(), replicatedToAllIndex);
+
+ long newReplicatedToAllIndex = replicatedToAllTermInfoReader.getIndex();
+ long newReplicatedToAllTerm = replicatedToAllTermInfoReader.getTerm();
+
+ // send a CaptureSnapshot to self to make the expensive operation async.
+ captureSnapshot = new CaptureSnapshot(lastLogEntry.getIndex(),
+ lastLogEntry.getTerm(), lastAppliedIndex, lastAppliedTerm,
+ newReplicatedToAllIndex, newReplicatedToAllTerm, targetFollower!=null);
+
+ SnapshotManager.this.currentState = CAPTURING;
+
+ if(targetFollower != null){
+ LOG.info("{}: Initiating snapshot capture {}", persistenceId(), captureSnapshot);
+ } else {
+ LOG.info("{}: Initiating snapshot capture {} to install on {}",
+ persistenceId(), captureSnapshot, targetFollower);
+ }
+
+ context.getActor().tell(captureSnapshot, context.getActor());
+
+ return true;
+ }
+
+ @Override
+ public boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
+ return capture(lastLogEntry, replicatedToAllIndex, null);
+ }
+
+ @Override
+ public boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) {
+ return capture(lastLogEntry, replicatedToAllIndex, targetFollower);
+ }
+
+ @Override
+ public String toString() {
+ return "Idle";
+ }
+
+ @Override
+ public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) {
+ return doTrimLog(desiredTrimIndex, currentBehavior);
+ }
+ }
+
+ private class Capturing extends AbstractSnapshotState {
+
+ @Override
+ public boolean isCapturing() {
+ return true;
+ }
+
+ @Override
+ public void create(Procedure callback) {
+ try {
+ callback.apply(null);
+ SnapshotManager.this.currentState = CREATING;
+ } catch (Exception e) {
+ LOG.error("Unexpected error occurred", e);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "Capturing";
+ }
+
+ }
+
+ private class Creating extends AbstractSnapshotState {
+
+ @Override
+ public boolean isCapturing() {
+ return true;
+ }
+
+ @Override
+ public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes,
+ RaftActorBehavior currentBehavior, long totalMemory) {
+ // create a snapshot object from the state provided and save it
+ // when snapshot is saved async, SaveSnapshotSuccess is raised.
+
+ Snapshot sn = Snapshot.create(snapshotBytes,
+ context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1),
+ captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
+ captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
+
+ persistenceProvider.saveSnapshot(sn);
+
+ LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage());
+
+ long dataThreshold = totalMemory *
+ context.getConfigParams().getSnapshotDataThresholdPercentage() / 100;
+ if (context.getReplicatedLog().dataSize() > dataThreshold) {
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: dataSize {} exceeds dataThreshold {} - doing snapshotPreCommit with index {}",
+ persistenceId(), context.getReplicatedLog().dataSize(), dataThreshold,
+ captureSnapshot.getLastAppliedIndex());
+ }
+
+ // if memory is less, clear the log based on lastApplied.
+ // this could/should only happen if one of the followers is down
+ // as normally we keep removing from the log when its replicated to all.
+ context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(),
+ captureSnapshot.getLastAppliedTerm());
+
+ // Don't reset replicatedToAllIndex to -1 as this may prevent us from trimming the log after an
+ // install snapshot to a follower.
+ if(captureSnapshot.getReplicatedToAllIndex() >= 0) {
+ currentBehavior.setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
+ }
+
+ } else if(captureSnapshot.getReplicatedToAllIndex() != -1){
+ // clear the log based on replicatedToAllIndex
+ context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(),
+ captureSnapshot.getReplicatedToAllTerm());
+
+ currentBehavior.setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
+ } else {
+ // The replicatedToAllIndex was not found in the log
+ // This means that replicatedToAllIndex never moved beyond -1 or that it is already in the snapshot.
+ // In this scenario we may need to save the snapshot to the akka persistence
+ // snapshot for recovery but we do not need to do the replicated log trimming.
+ context.getReplicatedLog().snapshotPreCommit(context.getReplicatedLog().getSnapshotIndex(),
+ context.getReplicatedLog().getSnapshotTerm());
+ }
+
+ LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " +
+ "and term:{}", persistenceId(), captureSnapshot.getLastAppliedIndex(),
+ captureSnapshot.getLastAppliedTerm());
+
+ if (context.getId().equals(currentBehavior.getLeaderId())
+ && captureSnapshot.isInstallSnapshotInitiated()) {
+ // this would be call straight to the leader and won't initiate in serialization
+ currentBehavior.handleMessage(context.getActor(), new SendInstallSnapshot(
+ ByteString.copyFrom(snapshotBytes)));
+ }
+
+ captureSnapshot = null;
+ SnapshotManager.this.currentState = PERSISTING;
+ }
+
+ @Override
+ public String toString() {
+ return "Creating";
+ }
+
+ }
+
+ private class Persisting extends AbstractSnapshotState {
+
+ @Override
+ public void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber) {
+ context.getReplicatedLog().snapshotCommit();
+ persistenceProvider.deleteSnapshots(new SnapshotSelectionCriteria(
+ sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000));
+
+ persistenceProvider.deleteMessages(sequenceNumber);
+
+ SnapshotManager.this.currentState = IDLE;
+ }
+
+ @Override
+ public void rollback() {
+ context.getReplicatedLog().snapshotRollback();
+
+ LOG.info("{}: Replicated Log rolled back. Snapshot will be attempted in the next cycle." +
+ "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(),
+ context.getReplicatedLog().getSnapshotIndex(),
+ context.getReplicatedLog().getSnapshotTerm(),
+ context.getReplicatedLog().size());
+
+ SnapshotManager.this.currentState = IDLE;
+ }
+
+ @Override
+ public String toString() {
+ return "Persisting";
+ }
+
+ }
+
+ private static interface TermInformationReader {
+ long getIndex();
+ long getTerm();
+ }
+
+ private static class LastAppliedTermInformationReader implements TermInformationReader{
+ private long index;
+ private long term;
+
+ public LastAppliedTermInformationReader init(ReplicatedLog log, long originalIndex,
+ ReplicatedLogEntry lastLogEntry, boolean hasFollowers){
+ ReplicatedLogEntry entry = log.get(originalIndex);
+ this.index = -1L;
+ this.term = -1L;
+ if (!hasFollowers) {
+ if(lastLogEntry != null) {
+ index = lastLogEntry.getIndex();
+ term = lastLogEntry.getTerm();
+ }
+ } else if (entry != null) {
+ index = entry.getIndex();
+ term = entry.getTerm();
+ } else if(log.getSnapshotIndex() > -1){
+ index = log.getSnapshotIndex();
+ term = log.getSnapshotTerm();
+ }
+ return this;
+ }
+
+ @Override
+ public long getIndex(){
+ return this.index;
+ }
+
+ @Override
+ public long getTerm(){
+ return this.term;
+ }
+ }
+
+ private static class ReplicatedToAllTermInformationReader implements TermInformationReader{
+ private long index;
+ private long term;
+
+ ReplicatedToAllTermInformationReader init(ReplicatedLog log, long originalIndex){
+ ReplicatedLogEntry entry = log.get(originalIndex);
+ this.index = -1L;
+ this.term = -1L;
+
+ if (entry != null) {
+ index = entry.getIndex();
+ term = entry.getTerm();
+ }
+
+ return this;
+ }
+
+ @Override
+ public long getIndex(){
+ return this.index;
+ }
+
+ @Override
+ public long getTerm(){
+ return this.term;
+ }
+ }
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java
new file mode 100644
index 0000000000..9a9bf1c774
--- /dev/null
+++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft;
+
+import akka.japi.Procedure;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+
+public interface SnapshotState {
+ /**
+ * Should return true when a snapshot is being captured
+ * @return
+ */
+ boolean isCapturing();
+
+ /**
+ * Initiate capture snapshot
+ *
+ * @param lastLogEntry the last entry in the replicated log
+ * @param replicatedToAllIndex the current replicatedToAllIndex
+ *
+ * @return true if capture was started
+ */
+ boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex);
+
+ /**
+ * Initiate capture snapshot for the purposing of installing that snapshot
+ *
+ * @param lastLogEntry
+ * @param replicatedToAllIndex
+ * @param targetFollower
+ *
+ * @return true if capture was started
+ */
+ boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower);
+
+ /**
+ * Create the snapshot
+ *
+ * @param callback a procedure to be called which should create the snapshot
+ */
+ void create(Procedure callback);
+
+ /**
+ * Persist the snapshot
+ *
+ * @param persistenceProvider
+ * @param snapshotBytes
+ * @param currentBehavior
+ * @param totalMemory
+ */
+ void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes, RaftActorBehavior currentBehavior
+ ,long totalMemory);
+
+ /**
+ * Commit the snapshot by trimming the log
+ *
+ * @param persistenceProvider
+ * @param sequenceNumber
+ */
+ void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber);
+
+ /**
+ * Rollback the snapshot
+ */
+ void rollback();
+
+ /**
+ * Trim the log
+ *
+ * @param desiredTrimIndex
+ * @return the actual trim index
+ */
+ long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior);
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
index a63c62fa30..2c433f9007 100644
--- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
+++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
@@ -33,7 +33,6 @@ import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
@@ -235,7 +234,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
applyLogToStateMachine(context.getCommitIndex());
}
- if (!context.isSnapshotCaptureInitiated()) {
+ if (!context.getSnapshotManager().isCapturing()) {
purgeInMemoryLog();
}
@@ -388,7 +387,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
followerToSnapshot.markSendStatus(false);
}
- if (wasLastChunk && !context.isSnapshotCaptureInitiated()) {
+ if (wasLastChunk && !context.getSnapshotManager().isCapturing()) {
// Since the follower is now caught up try to purge the log.
purgeInMemoryLog();
} else if (!wasLastChunk && followerToSnapshot.canSendNextChunk()) {
@@ -491,7 +490,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
sendAppendEntries = true;
}
} else if (isFollowerActive && followerNextIndex >= 0 &&
- leaderLastIndex > followerNextIndex && !context.isSnapshotCaptureInitiated()) {
+ leaderLastIndex > followerNextIndex && !context.getSnapshotManager().isCapturing()) {
// if the followers next index is not present in the leaders log, and
// if the follower is just not starting and if leader's index is more than followers index
// then snapshot should be sent
@@ -562,37 +561,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
final ActorSelection followerActor = context.getPeerActorSelection(followerId);
sendSnapshotChunk(followerActor, followerId);
- } else if (!context.isSnapshotCaptureInitiated()) {
- ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied());
- long lastAppliedIndex = -1;
- long lastAppliedTerm = -1;
-
- if (lastAppliedEntry != null) {
- lastAppliedIndex = lastAppliedEntry.getIndex();
- lastAppliedTerm = lastAppliedEntry.getTerm();
- } else if (context.getReplicatedLog().getSnapshotIndex() > -1) {
- lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex();
- lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm();
- }
-
- boolean isInstallSnapshotInitiated = true;
- long replicatedToAllIndex = super.getReplicatedToAllIndex();
- ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex);
-
- CaptureSnapshot captureSnapshot = new CaptureSnapshot(
- lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm,
- (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1),
- (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1),
- isInstallSnapshotInitiated);
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Initiating install snapshot to follower {}: {}", logName(), followerId,
- captureSnapshot);
- }
-
- actor().tell(captureSnapshot, actor());
- context.setSnapshotCaptureInitiated(true);
+ } else {
+ context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
+ this.getReplicatedToAllIndex(), followerId);
}
}
}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
index a1bcf8541c..c276d32cce 100644
--- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
+++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
@@ -39,6 +39,8 @@ import scala.concurrent.duration.FiniteDuration;
*/
public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
+ protected static final ElectionTimeout ELECTION_TIMEOUT = new ElectionTimeout();
+
/**
* Information about the RaftActor whose behavior this class represents
*/
@@ -254,7 +256,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
// message is sent to itself
electionCancel =
context.getActorSystem().scheduler().scheduleOnce(interval,
- context.getActor(), new ElectionTimeout(),
+ context.getActor(), ELECTION_TIMEOUT,
context.getActorSystem().dispatcher(), context.getActor());
}
@@ -460,31 +462,10 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
* @param snapshotCapturedIndex
*/
protected void performSnapshotWithoutCapture(final long snapshotCapturedIndex) {
- // we would want to keep the lastApplied as its used while capturing snapshots
- long lastApplied = context.getLastApplied();
- long tempMin = Math.min(snapshotCapturedIndex, (lastApplied > -1 ? lastApplied - 1 : -1));
-
- if(LOG.isTraceEnabled()) {
- LOG.trace("{}: performSnapshotWithoutCapture: snapshotCapturedIndex: {}, lastApplied: {}, tempMin: {}",
- logName, snapshotCapturedIndex, lastApplied, tempMin);
- }
+ long actualIndex = context.getSnapshotManager().trimLog(snapshotCapturedIndex, this);
- if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin)) {
- LOG.debug("{}: fakeSnapshot purging log to {} for term {}", logName(), tempMin,
- context.getTermInformation().getCurrentTerm());
-
- //use the term of the temp-min, since we check for isPresent, entry will not be null
- ReplicatedLogEntry entry = context.getReplicatedLog().get(tempMin);
- context.getReplicatedLog().snapshotPreCommit(tempMin, entry.getTerm());
- context.getReplicatedLog().snapshotCommit();
- setReplicatedToAllIndex(tempMin);
- } else if(tempMin > getReplicatedToAllIndex()) {
- // It's possible a follower was lagging and an install snapshot advanced its match index past
- // the current replicatedToAllIndex. Since the follower is now caught up we should advance the
- // replicatedToAllIndex (to tempMin). The fact that tempMin wasn't found in the log is likely
- // due to a previous snapshot triggered by the memory threshold exceeded, in that case we
- // trim the log to the last applied index even if previous entries weren't replicated to all followers.
- setReplicatedToAllIndex(tempMin);
+ if(actualIndex != -1){
+ setReplicatedToAllIndex(actualIndex);
}
}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java
index b36c41abf2..74bede171f 100644
--- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java
+++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java
@@ -58,7 +58,14 @@ public class Candidate extends AbstractRaftActorBehavior {
votesRequired = getMajorityVoteCount(peers.size());
startNewTerm();
- scheduleElection(electionDuration());
+
+ if(context.getPeerAddresses().isEmpty()){
+ actor().tell(ELECTION_TIMEOUT, actor());
+ } else {
+ scheduleElection(electionDuration());
+ }
+
+
}
@Override protected RaftActorBehavior handleAppendEntries(ActorRef sender,
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
index 0f251a3012..a6722e6ff9 100644
--- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
+++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
@@ -46,9 +46,14 @@ public class Follower extends AbstractRaftActorBehavior {
public Follower(RaftActorContext context) {
super(context, RaftState.Follower);
- scheduleElection(electionDuration());
-
initialSyncStatusTracker = new InitialSyncStatusTracker(context.getActor());
+
+ if(context.getPeerAddresses().isEmpty()){
+ actor().tell(ELECTION_TIMEOUT, actor());
+ } else {
+ scheduleElection(electionDuration());
+ }
+
}
private boolean isLogEntryPresent(long index){
@@ -96,6 +101,19 @@ public class Follower extends AbstractRaftActorBehavior {
// to make it easier to read. Before refactoring ensure tests
// cover the code properly
+ if (snapshotTracker != null) {
+ // if snapshot install is in progress, follower should just acknowledge append entries with a reply.
+ AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true,
+ lastIndex(), lastTerm());
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: snapshot install is in progress, replying immediately with {}", logName(), reply);
+ }
+ sender.tell(reply, actor());
+
+ return this;
+ }
+
// 1. Reply false if term < currentTerm (§5.1)
// This is handled in the appendEntries method of the base class
@@ -242,7 +260,7 @@ public class Follower extends AbstractRaftActorBehavior {
sender.tell(reply, actor());
- if (!context.isSnapshotCaptureInitiated()) {
+ if (!context.getSnapshotManager().isCapturing()) {
super.performSnapshotWithoutCapture(appendEntries.getReplicatedToAllIndex());
}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java
index 120a3a16a9..13445b0b26 100644
--- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java
+++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java
@@ -261,4 +261,8 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
assertEquals("ReplicatedLogEntry getIndex", expIndex, replicatedLogEntry.getIndex());
assertEquals("ReplicatedLogEntry getData", payload, replicatedLogEntry.getData());
}
+
+ protected String testActorPath(String id){
+ return "akka://test/user" + id;
+ }
}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
index 1cc7b5f576..53cca23741 100644
--- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
+++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
@@ -35,6 +35,7 @@ public class MockRaftActorContext implements RaftActorContext {
private Map peerAddresses = new HashMap<>();
private ConfigParams configParams;
private boolean snapshotCaptureInitiated;
+ private SnapshotManager snapshotManager;
public MockRaftActorContext(){
electionTerm = new ElectionTerm() {
@@ -191,13 +192,11 @@ public class MockRaftActorContext implements RaftActorContext {
}
@Override
- public void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated) {
- this.snapshotCaptureInitiated = snapshotCaptureInitiated;
- }
-
- @Override
- public boolean isSnapshotCaptureInitiated() {
- return snapshotCaptureInitiated;
+ public SnapshotManager getSnapshotManager() {
+ if(this.snapshotManager == null){
+ this.snapshotManager = new SnapshotManager(this, getLogger());
+ }
+ return this.snapshotManager;
}
public void setConfigParams(ConfigParams configParams) {
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
index b192b7cd24..0a4a2c7717 100644
--- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
+++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
@@ -31,6 +31,7 @@ import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
import akka.util.Timeout;
import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.ByteString;
@@ -54,16 +55,17 @@ import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
+import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RoleChanged;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.behaviors.Follower;
import org.opendaylight.controller.cluster.raft.behaviors.Leader;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
@@ -157,6 +159,16 @@ public class RaftActorTest extends AbstractActorTest {
}
}
+
+ public void waitUntilLeader(){
+ for(int i = 0;i < 10; i++){
+ if(isLeader()){
+ break;
+ }
+ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+ }
+ }
+
public List
*/
public class MessageCollectorActor extends UntypedActor {
- private List messages = new ArrayList<>();
+ private final List messages = new ArrayList<>();
@Override public void onReceive(Object message) throws Exception {
if(message instanceof String){
@@ -43,6 +44,10 @@ public class MessageCollectorActor extends UntypedActor {
}
}
+ public void clear() {
+ messages.clear();
+ }
+
public static List getAllMessages(ActorRef actor) throws Exception {
FiniteDuration operationDuration = Duration.create(5, TimeUnit.SECONDS);
Timeout operationTimeout = new Timeout(operationDuration);
@@ -87,4 +92,20 @@ public class MessageCollectorActor extends UntypedActor {
return output;
}
+ public static T expectFirstMatching(ActorRef actor, Class clazz) {
+ int count = 5000 / 50;
+ for(int i = 0; i < count; i++) {
+ try {
+ T message = (T) getFirstMatching(actor, clazz);
+ if(message != null) {
+ return message;
+ }
+ } catch (Exception e) {}
+
+ Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ }
+
+ Assert.fail("Did not receive message of type " + clazz);
+ return null;
+ }
}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java
index 81b6bccaf0..63878df23c 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java
@@ -12,7 +12,6 @@ import static org.junit.Assert.assertNotNull;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
-import com.google.common.base.Optional;
public class MockActorContext extends ActorContext {
@@ -36,10 +35,6 @@ public class MockActorContext extends ActorContext {
return executeRemoteOperationResponse;
}
- @Override public Optional findPrimaryShard(String shardName) {
- return Optional.absent();
- }
-
public void setExecuteShardOperationResponse(Object response){
executeShardOperationResponse = response;
}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockClusterWrapper.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockClusterWrapper.java
index fe40aa0fd4..810b270cfc 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockClusterWrapper.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockClusterWrapper.java
@@ -14,14 +14,22 @@ import akka.actor.AddressFromURIString;
import akka.cluster.ClusterEvent;
import akka.cluster.MemberStatus;
import akka.cluster.UniqueAddress;
-import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
-import scala.collection.JavaConversions;
import java.util.HashSet;
import java.util.Set;
+import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
+import scala.collection.JavaConversions;
public class MockClusterWrapper implements ClusterWrapper{
private Address selfAddress = new Address("akka.tcp", "test", "127.0.0.1", 2550);
+ private String currentMemberName = "member-1";
+
+ public MockClusterWrapper() {
+ }
+
+ public MockClusterWrapper(String currentMemberName) {
+ this.currentMemberName = currentMemberName;
+ }
@Override
public void subscribeToMemberEvents(ActorRef actorRef) {
@@ -29,7 +37,7 @@ public class MockClusterWrapper implements ClusterWrapper{
@Override
public String getCurrentMemberName() {
- return "member-1";
+ return currentMemberName;
}
@Override
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockConfiguration.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockConfiguration.java
index 4ef7d65857..0bc561f1bd 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockConfiguration.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockConfiguration.java
@@ -9,6 +9,8 @@
package org.opendaylight.controller.cluster.datastore.utils;
import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -18,11 +20,23 @@ import org.opendaylight.controller.cluster.datastore.Configuration;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
public class MockConfiguration implements Configuration{
- @Override public List getMemberShardNames(final String memberName) {
- return Arrays.asList("default");
+ private Map> shardMembers = ImmutableMap.>builder().
+ put("default", Arrays.asList("member-1", "member-2")).
+ /*put("astronauts", Arrays.asList("member-2", "member-3")).*/build();
+
+ public MockConfiguration() {
+ }
+
+ public MockConfiguration(Map> shardMembers) {
+ this.shardMembers = shardMembers;
}
- @Override public Optional getModuleNameFromNameSpace(
+ @Override
+ public List getMemberShardNames(final String memberName) {
+ return new ArrayList<>(shardMembers.keySet());
+ }
+ @Override
+ public Optional getModuleNameFromNameSpace(
final String nameSpace) {
return Optional.absent();
}
@@ -44,7 +58,8 @@ public class MockConfiguration implements Configuration{
return Arrays.asList("member-2", "member-3");
}
- return Collections.emptyList();
+ List members = shardMembers.get(shardName);
+ return members != null ? members : Collections.emptyList();
}
@Override public Set getAllShardNames() {
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java
index 9761ed8615..4240608036 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java
@@ -21,6 +21,10 @@ public class TestModel {
public static final QName TEST_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test", "2014-03-13",
"test");
+ public static final QName JUNK_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:junk", "2014-03-13",
+ "junk");
+
+
public static final QName OUTER_LIST_QNAME = QName.create(TEST_QNAME, "outer-list");
public static final QName INNER_LIST_QNAME = QName.create(TEST_QNAME, "inner-list");
public static final QName OUTER_CHOICE_QNAME = QName.create(TEST_QNAME, "outer-choice");
@@ -31,6 +35,7 @@ public class TestModel {
private static final String DATASTORE_TEST_YANG = "/odl-datastore-test.yang";
public static final YangInstanceIdentifier TEST_PATH = YangInstanceIdentifier.of(TEST_QNAME);
+ public static final YangInstanceIdentifier JUNK_PATH = YangInstanceIdentifier.of(JUNK_QNAME);
public static final YangInstanceIdentifier OUTER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH).
node(OUTER_LIST_QNAME).build();
public static final YangInstanceIdentifier INNER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH).
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf
index badec6f831..03634627d6 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf
@@ -34,3 +34,105 @@ bounded-mailbox {
mailbox-capacity = 1000
mailbox-push-timeout-time = 100ms
}
+
+Member1 {
+ bounded-mailbox {
+ mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
+ mailbox-capacity = 1000
+ mailbox-push-timeout-time = 100ms
+ }
+
+ in-memory-journal {
+ class = "org.opendaylight.controller.cluster.raft.utils.InMemoryJournal"
+ }
+
+ in-memory-snapshot-store {
+ class = "org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore"
+ plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
+ }
+
+ akka {
+ persistence.snapshot-store.plugin = "in-memory-snapshot-store"
+ persistence.journal.plugin = "in-memory-journal"
+
+ loglevel = "DEBUG"
+
+ actor {
+ provider = "akka.cluster.ClusterActorRefProvider"
+
+ serializers {
+ java = "akka.serialization.JavaSerializer"
+ proto = "akka.remote.serialization.ProtobufSerializer"
+ }
+
+ serialization-bindings {
+ "com.google.protobuf.Message" = proto
+ }
+ }
+ remote {
+ log-remote-lifecycle-events = off
+ netty.tcp {
+ hostname = "127.0.0.1"
+ port = 2558
+ }
+ }
+
+ cluster {
+ auto-down-unreachable-after = 100s
+
+ roles = [
+ "member-1"
+ ]
+ }
+ }
+}
+
+Member2 {
+ bounded-mailbox {
+ mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
+ mailbox-capacity = 1000
+ mailbox-push-timeout-time = 100ms
+ }
+
+ in-memory-journal {
+ class = "org.opendaylight.controller.cluster.raft.utils.InMemoryJournal"
+ }
+
+ in-memory-snapshot-store {
+ class = "org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore"
+ plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
+ }
+
+ akka {
+ persistence.snapshot-store.plugin = "in-memory-snapshot-store"
+ persistence.journal.plugin = "in-memory-journal"
+
+ actor {
+ provider = "akka.cluster.ClusterActorRefProvider"
+
+ serializers {
+ java = "akka.serialization.JavaSerializer"
+ proto = "akka.remote.serialization.ProtobufSerializer"
+ }
+
+ serialization-bindings {
+ "com.google.protobuf.Message" = proto
+ }
+ }
+ remote {
+ log-remote-lifecycle-events = off
+ netty.tcp {
+ hostname = "127.0.0.1"
+ port = 2559
+ }
+ }
+
+ cluster {
+ auto-down-unreachable-after = 100s
+
+ roles = [
+ "member-2"
+ ]
+ }
+ }
+}
diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java
index cdce946aba..88dd0e55c5 100644
--- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java
+++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java
@@ -414,7 +414,9 @@ public final class NetconfDevice implements RemoteDevice errors = ((RpcErrorsException)failure).getRpcErrors();
diff --git a/opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/operations/editconfig/EditConfig.java b/opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/operations/editconfig/EditConfig.java
index bc84734190..50676c57c1 100644
--- a/opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/operations/editconfig/EditConfig.java
+++ b/opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/operations/editconfig/EditConfig.java
@@ -243,8 +243,6 @@ public class EditConfig extends AbstractConfigNetconfOperation {
}
Date revision = module.getRevision();
- Preconditions.checkState(!revisionsByNamespace.containsKey(revision),
- "Duplicate revision %s for namespace %s", revision, namespace);
IdentityMapping identityMapping = revisionsByNamespace.get(revision);
if(identityMapping == null) {
diff --git a/opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/osgi/YangStoreSnapshot.java b/opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/osgi/YangStoreSnapshot.java
index 0d3370548a..283ec424ba 100644
--- a/opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/osgi/YangStoreSnapshot.java
+++ b/opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/osgi/YangStoreSnapshot.java
@@ -8,11 +8,16 @@
package org.opendaylight.controller.netconf.confignetconfconnector.osgi;
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.NoSuchElementException;
import java.util.Set;
import org.opendaylight.controller.config.yangjmxgenerator.ModuleMXBeanEntry;
import org.opendaylight.controller.config.yangjmxgenerator.PackageTranslator;
@@ -23,6 +28,7 @@ import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.model.api.IdentitySchemaNode;
import org.opendaylight.yangtools.yang.model.api.Module;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.parser.builder.impl.ModuleIdentifierImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -99,12 +105,31 @@ final class YangStoreSnapshot implements YangStoreContext {
@Override
public Set getModules() {
- return schemaContext.getModules();
+ final Set modules = Sets.newHashSet(schemaContext.getModules());
+ for (final Module module : schemaContext.getModules()) {
+ modules.addAll(module.getSubmodules());
+ }
+ return modules;
}
@Override
public String getModuleSource(final org.opendaylight.yangtools.yang.model.api.ModuleIdentifier moduleIdentifier) {
- return schemaContext.getModuleSource(moduleIdentifier).get();
+ final Optional moduleSource = schemaContext.getModuleSource(moduleIdentifier);
+ if(moduleSource.isPresent()) {
+ return moduleSource.get();
+ } else {
+ try {
+ return Iterables.find(getModules(), new Predicate() {
+ @Override
+ public boolean apply(final Module input) {
+ final ModuleIdentifierImpl id = new ModuleIdentifierImpl(input.getName(), Optional.fromNullable(input.getNamespace()), Optional.fromNullable(input.getRevision()));
+ return id.equals(moduleIdentifier);
+ }
+ }).getSource();
+ } catch (final NoSuchElementException e) {
+ throw new IllegalArgumentException("Source for yang module " + moduleIdentifier + " not found", e);
+ }
+ }
}
@Override
diff --git a/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/MdsalNetconfOperationServiceFactory.java b/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/MdsalNetconfOperationServiceFactory.java
index 89ce149e12..499ae01ed6 100644
--- a/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/MdsalNetconfOperationServiceFactory.java
+++ b/opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/MdsalNetconfOperationServiceFactory.java
@@ -8,6 +8,7 @@
package org.opendaylight.controller.netconf.mdsal.connector;
+import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import java.util.HashSet;
import java.util.Set;
@@ -50,25 +51,39 @@ public class MdsalNetconfOperationServiceFactory implements NetconfOperationServ
return transformCapabilities(currentSchemaContext.getCurrentContext());
}
- static Set transformCapabilities(final SchemaContext currentContext1) {
+ static Set transformCapabilities(final SchemaContext currentContext) {
final Set capabilities = new HashSet<>();
// [RFC6241] 8.3. Candidate Configuration Capability
capabilities.add(new BasicCapability("urn:ietf:params:netconf:capability:candidate:1.0"));
- final SchemaContext currentContext = currentContext1;
final Set modules = currentContext.getModules();
for (final Module module : modules) {
- if(currentContext.getModuleSource(module).isPresent()) {
- capabilities.add(new YangModuleCapability(module, currentContext.getModuleSource(module).get()));
- } else {
- LOG.warn("Missing source for module {}. This module will not be available from netconf server",
- module);
+ Optional cap = moduleToCapability(module);
+ if(cap.isPresent()) {
+ capabilities.add(cap.get());
+ }
+ for (final Module submodule : module.getSubmodules()) {
+ cap = moduleToCapability(submodule);
+ if(cap.isPresent()) {
+ capabilities.add(cap.get());
+ }
}
}
return capabilities;
}
+ private static Optional moduleToCapability(final Module module) {
+ final String source = module.getSource();
+ if(source !=null) {
+ return Optional.of(new YangModuleCapability(module, source));
+ } else {
+ LOG.warn("Missing source for module {}. This module will not be available from netconf server",
+ module);
+ }
+ return Optional.absent();
+ }
+
@Override
public AutoCloseable registerCapabilityListener(final CapabilityListener listener) {
return currentSchemaContext.registerCapabilityListener(listener);
diff --git a/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClientSessionNegotiator.java b/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClientSessionNegotiator.java
index 06c695c25a..f4017fbe58 100644
--- a/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClientSessionNegotiator.java
+++ b/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClientSessionNegotiator.java
@@ -8,7 +8,7 @@
package org.opendaylight.controller.netconf.client;
-import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
@@ -45,6 +45,9 @@ public class NetconfClientSessionNegotiator extends
private static final XPathExpression sessionIdXPath = XMLNetconfUtil
.compileXPath("/netconf:hello/netconf:session-id");
+ private static final XPathExpression sessionIdXPathNoNamespace = XMLNetconfUtil
+ .compileXPath("/hello/session-id");
+
private static final String EXI_1_0_CAPABILITY_MARKER = "exi:1.0";
protected NetconfClientSessionNegotiator(final NetconfClientSessionPreferences sessionPreferences,
@@ -113,16 +116,22 @@ public class NetconfClientSessionNegotiator extends
}
private long extractSessionId(final Document doc) {
- final Node sessionIdNode = (Node) XmlUtil.evaluateXPath(sessionIdXPath, doc, XPathConstants.NODE);
- Preconditions.checkState(sessionIdNode != null, "");
- String textContent = sessionIdNode.getTextContent();
- if (textContent == null || textContent.equals("")) {
- throw new IllegalStateException("Session id not received from server");
+ String textContent = getSessionIdWithXPath(doc, sessionIdXPath);
+ if (Strings.isNullOrEmpty(textContent)) {
+ textContent = getSessionIdWithXPath(doc, sessionIdXPathNoNamespace);
+ if (Strings.isNullOrEmpty(textContent)) {
+ throw new IllegalStateException("Session id not received from server, hello message: " + XmlUtil.toString(doc));
+ }
}
return Long.valueOf(textContent);
}
+ private String getSessionIdWithXPath(final Document doc, final XPathExpression sessionIdXPath) {
+ final Node sessionIdNode = (Node) XmlUtil.evaluateXPath(sessionIdXPath, doc, XPathConstants.NODE);
+ return sessionIdNode != null ? sessionIdNode.getTextContent() : null;
+ }
+
@Override
protected NetconfClientSession getSession(final NetconfClientSessionListener sessionListener, final Channel channel,
final NetconfHelloMessage message) throws NetconfDocumentedException {
diff --git a/opendaylight/netconf/netconf-ssh/src/test/java/org/opendaylight/controller/netconf/netty/SSHTest.java b/opendaylight/netconf/netconf-ssh/src/test/java/org/opendaylight/controller/netconf/netty/SSHTest.java
index ab73126021..d7d8660ae4 100644
--- a/opendaylight/netconf/netconf-ssh/src/test/java/org/opendaylight/controller/netconf/netty/SSHTest.java
+++ b/opendaylight/netconf/netconf-ssh/src/test/java/org/opendaylight/controller/netconf/netty/SSHTest.java
@@ -61,7 +61,7 @@ public class SSHTest {
@AfterClass
public static void tearDown() throws Exception {
hashedWheelTimer.stop();
- nettyGroup.shutdownGracefully().await();
+ nettyGroup.shutdownGracefully().await(5, TimeUnit.SECONDS);
minaTimerEx.shutdownNow();
nioExec.shutdownNow();
}
diff --git a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/messages/NetconfHelloMessage.java b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/messages/NetconfHelloMessage.java
index 5cd17a2331..404885db7e 100644
--- a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/messages/NetconfHelloMessage.java
+++ b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/messages/NetconfHelloMessage.java
@@ -94,9 +94,9 @@ public final class NetconfHelloMessage extends NetconfMessage {
private static boolean isHelloMessage(final Document document) {
XmlElement element = XmlElement.fromDomElement(document.getDocumentElement());
try {
+ // accept even if hello has no namespace
return element.getName().equals(HELLO_TAG) &&
- element.hasNamespace() &&
- element.getNamespace().equals(XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_BASE_1_0);
+ (!element.hasNamespace() || element.getNamespace().equals(XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_BASE_1_0));
} catch (MissingNameSpaceException e) {
// Cannot happen, since we check for hasNamespace
throw new IllegalStateException(e);
diff --git a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/messages/NetconfMessageUtil.java b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/messages/NetconfMessageUtil.java
index 61b23202c3..3c6b6ccab9 100644
--- a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/messages/NetconfMessageUtil.java
+++ b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/messages/NetconfMessageUtil.java
@@ -9,6 +9,7 @@
package org.opendaylight.controller.netconf.util.messages;
import com.google.common.base.Function;
+import com.google.common.base.Optional;
import com.google.common.collect.Collections2;
import java.util.Collection;
import java.util.List;
@@ -59,9 +60,13 @@ public final class NetconfMessageUtil {
public static Collection extractCapabilitiesFromHello(Document doc) throws NetconfDocumentedException {
XmlElement responseElement = XmlElement.fromDomDocument(doc);
- XmlElement capabilitiesElement = responseElement
- .getOnlyChildElementWithSameNamespace(XmlNetconfConstants.CAPABILITIES);
- List caps = capabilitiesElement.getChildElements(XmlNetconfConstants.CAPABILITY);
+ // Extract child element from with or without(fallback) the same namespace
+ Optional capabilitiesElement = responseElement
+ .getOnlyChildElementWithSameNamespaceOptionally(XmlNetconfConstants.CAPABILITIES)
+ .or(responseElement
+ .getOnlyChildElementOptionally(XmlNetconfConstants.CAPABILITIES));
+
+ List caps = capabilitiesElement.get().getChildElements(XmlNetconfConstants.CAPABILITY);
return Collections2.transform(caps, new Function() {
@Override