From: Tom Pantelis
Date: Wed, 25 Mar 2015 18:43:11 +0000 (+0000)
Subject: Merge "Do not use ActorSystem.actorFor as it is deprecated"
X-Git-Tag: release/lithium~350
X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=4349b034606957d3e876b82b14a292e6739a986a;hp=fc89f3b66b3e5db73e09792bbecebf3156d70d20
Merge "Do not use ActorSystem.actorFor as it is deprecated"
---
diff --git a/opendaylight/adsal/features/nsf/src/main/resources/features.xml b/opendaylight/adsal/features/nsf/src/main/resources/features.xml
index 3985a69302..56271eb699 100644
--- a/opendaylight/adsal/features/nsf/src/main/resources/features.xml
+++ b/opendaylight/adsal/features/nsf/src/main/resources/features.xml
@@ -14,8 +14,8 @@
odl-adsal-all
- odl-nsf-controller-managers
- odl-adsal-controller-northbound
+ odl-nsf-managers
+ odl-adsal-northbound
@@ -44,43 +44,6 @@
mvn:org.opendaylight.controller/topologymanager/${topologymanager.version}
mvn:org.opendaylight.controller/topologymanager.shell/${topologymanager.shell.version}
-
- mvn:org.opendaylight.controller/hosttracker/${hosttracker.api.version}
- mvn:org.opendaylight.controller/hosttracker.implementation/${hosttracker.implementation.version}
- mvn:org.opendaylight.controller/hosttracker.shell/${hosttracker.shell.version}
-
- mvn:org.opendaylight.controller/forwarding.staticrouting/${forwarding.staticrouting}
-
- mvn:org.opendaylight.controller.thirdparty/net.sf.jung2/2.0.1
- mvn:org.opendaylight.controller/routing.dijkstra_implementation/${routing.dijkstra_implementation.version}
-
-
-
- odl-base-all
- odl-adsal-all
- mvn:org.opendaylight.controller/usermanager/${usermanager.version}
- mvn:org.opendaylight.controller/usermanager.implementation/${usermanager.version}
-
- mvn:org.opendaylight.controller/appauth/${appauth.version}
-
- mvn:org.opendaylight.controller/connectionmanager/${connectionmanager.version}
- mvn:org.opendaylight.controller/connectionmanager.implementation/${connectionmanager.version}
-
- mvn:org.opendaylight.controller/containermanager/${containermanager.version}
- mvn:org.opendaylight.controller/containermanager.implementation/${containermanager.version}
-
- mvn:org.opendaylight.controller/statisticsmanager/${statisticsmanager.version}
- mvn:org.opendaylight.controller/statisticsmanager.implementation/${statisticsmanager.implementation.version}
-
- mvn:org.opendaylight.controller/switchmanager/${switchmanager.api.version}
- mvn:org.opendaylight.controller/switchmanager.implementation/${switchmanager.implementation.version}
-
- mvn:org.opendaylight.controller/forwardingrulesmanager/${forwardingrulesmanager.version}
- mvn:org.opendaylight.controller/forwardingrulesmanager.implementation/${forwardingrulesmanager.implementation.version}
-
- mvn:org.opendaylight.controller/topologymanager/${topologymanager.version}
- mvn:org.opendaylight.controller/topologymanager.shell/${topologymanager.shell.version}
-
mvn:org.opendaylight.controller/hosttracker/${hosttracker.api.version}
mvn:org.opendaylight.controller/hosttracker.implementation/${hosttracker.implementation.version}
mvn:org.opendaylight.controller/hosttracker.shell/${hosttracker.shell.version}
@@ -117,26 +80,4 @@
mvn:org.opendaylight.controller/topology.northbound/${topology.northbound.version}
mvn:org.opendaylight.controller/usermanager.northbound/${usermanager.northbound.version}
-
-
- odl-base-all
- odl-nsf-managers
- mvn:org.ow2.asm/asm-all/${asm.version}
- mvn:org.opendaylight.controller/bundlescanner/${bundlescanner.api.version}
- mvn:org.opendaylight.controller/bundlescanner.implementation/${bundlescanner.implementation.version}
- mvn:org.opendaylight.controller/commons.northbound/${northbound.commons.version}
- mvn:org.opendaylight.controller/connectionmanager.northbound/${connectionmanager.version}
- mvn:org.opendaylight.controller/flowprogrammer.northbound/${flowprogrammer.northbound.version}
- mvn:org.opendaylight.controller/hosttracker.northbound/${hosttracker.northbound.version}
- mvn:org.opendaylight.controller/networkconfig.bridgedomain.northbound/${networkconfig.bridgedomain.northbound.version}
- mvn:org.eclipse.persistence/org.eclipse.persistence.antlr/${eclipse.persistence.version}
- mvn:org.eclipse.persistence/org.eclipse.persistence.core/${eclipse.persistence.version}
- mvn:org.eclipse.persistence/org.eclipse.persistence.moxy/${eclipse.persistence.version}
- mvn:org.opendaylight.controller/forwarding.staticrouting.northbound/${forwarding.staticrouting.northbound.version}
- mvn:org.opendaylight.controller/statistics.northbound/${statistics.northbound.version}
- mvn:org.opendaylight.controller/subnets.northbound/${subnets.northbound.version}
- mvn:org.opendaylight.controller/switchmanager.northbound/${switchmanager.northbound.version}
- mvn:org.opendaylight.controller/topology.northbound/${topology.northbound.version}
- mvn:org.opendaylight.controller/usermanager.northbound/${usermanager.northbound.version}
-
diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java
index 603f34bac9..076d1b2fc7 100644
--- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java
+++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java
@@ -8,16 +8,11 @@
package org.opendaylight.controller.messagebus.app.impl;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.Futures;
-
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.regex.Pattern;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
@@ -57,7 +52,10 @@ import org.opendaylight.yangtools.yang.common.RpcResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.regex.Pattern;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
public class EventSourceTopology implements EventAggregatorService, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(EventSourceTopology.class);
@@ -82,11 +80,10 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl
private final RpcRegistration aggregatorRpcReg;
private final EventSourceService eventSourceService;
private final RpcProviderRegistry rpcRegistry;
- private final ExecutorService executorService;
public EventSourceTopology(final DataBroker dataBroker, final RpcProviderRegistry rpcRegistry) {
+
this.dataBroker = dataBroker;
- this.executorService = Executors.newCachedThreadPool();
this.rpcRegistry = rpcRegistry;
aggregatorRpcReg = rpcRegistry.addRpcImplementation(EventAggregatorService.class, this);
eventSourceService = rpcRegistry.getRpcService(EventSourceService.class);
@@ -94,14 +91,17 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl
final TopologyEventSource topologySource = new TopologyEventSourceBuilder().build();
final TopologyTypes1 topologyTypeAugment = new TopologyTypes1Builder().setTopologyEventSource(topologySource).build();
putData(OPERATIONAL, TOPOLOGY_TYPE_PATH, topologyTypeAugment);
+
}
private void putData(final LogicalDatastoreType store,
- final InstanceIdentifier path, final T data) {
+ final InstanceIdentifier path,
+ final T data){
final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
tx.put(store, path, data, true);
tx.submit();
+
}
private void insert(final KeyedInstanceIdentifier sourcePath, final Node node) {
@@ -112,7 +112,34 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl
}
private void notifyExistingNodes(final Pattern nodeIdPatternRegex, final EventSourceTopic eventSourceTopic){
- executorService.execute(new NotifyAllNodeExecutor(dataBroker, nodeIdPatternRegex, eventSourceTopic));
+
+ final ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction();
+
+ final CheckedFuture, ReadFailedException> future = tx.read(OPERATIONAL, EVENT_SOURCE_TOPOLOGY_PATH);
+
+ Futures.addCallback(future, new FutureCallback>(){
+
+ @Override
+ public void onSuccess(Optional data) {
+ if(data.isPresent()) {
+ final List nodes = data.get().getNode();
+ for (final Node node : nodes) {
+ if (nodeIdPatternRegex.matcher(node.getNodeId().getValue()).matches()) {
+ eventSourceTopic.notifyNode(EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey()));
+ }
+ }
+ }
+ tx.close();
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ LOG.error("Can not notify existing nodes {}", t);
+ tx.close();
+ }
+
+ });
+
}
@Override
@@ -164,45 +191,4 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl
// FIXME: Return registration object.
}
- private class NotifyAllNodeExecutor implements Runnable {
-
- private final EventSourceTopic topic;
- private final DataBroker dataBroker;
- private final Pattern nodeIdPatternRegex;
-
- public NotifyAllNodeExecutor(final DataBroker dataBroker, final Pattern nodeIdPatternRegex, final EventSourceTopic topic) {
- this.topic = topic;
- this.dataBroker = dataBroker;
- this.nodeIdPatternRegex = nodeIdPatternRegex;
- }
-
- @Override
- public void run() {
- //# Code reader note: Context of Node type is NetworkTopology
- final List nodes = snapshot();
- for (final Node node : nodes) {
- if (nodeIdPatternRegex.matcher(node.getNodeId().getValue()).matches()) {
- topic.notifyNode(EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey()));
- }
- }
- }
-
- private List snapshot() {
- try (ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction();) {
-
- final Optional data = tx.read(OPERATIONAL, EVENT_SOURCE_TOPOLOGY_PATH).checkedGet();
-
- if(data.isPresent()) {
- final List nodeList = data.get().getNode();
- if(nodeList != null) {
- return nodeList;
- }
- }
- return Collections.emptyList();
- } catch (final ReadFailedException e) {
- LOG.error("Unable to retrieve node list.", e);
- return Collections.emptyList();
- }
- }
- }
}
diff --git a/opendaylight/md-sal/sal-akka-raft/pom.xml b/opendaylight/md-sal/sal-akka-raft/pom.xml
index 1f99a52ed5..0ec83c86b3 100644
--- a/opendaylight/md-sal/sal-akka-raft/pom.xml
+++ b/opendaylight/md-sal/sal-akka-raft/pom.xml
@@ -59,6 +59,11 @@
commons-io
+
+ org.apache.commons
+ commons-lang3
+
+
com.typesafe.akka
akka-slf4j_${scala.version}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java
index 0beccd1b2b..07b6b617aa 100644
--- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java
+++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java
@@ -91,4 +91,10 @@ public interface FollowerLogInformation {
*/
long timeSinceLastActivity();
+ /**
+ * This method checks if it is ok to replicate
+ *
+ * @return true if it is ok to replicate
+ */
+ boolean okToReplicate();
}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java
index 288a540344..15063cff5b 100644
--- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java
+++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java
@@ -26,6 +26,11 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
private volatile long matchIndex;
+ private long lastReplicatedIndex = -1L;
+
+ private final Stopwatch lastReplicatedStopwatch = Stopwatch.createUnstarted();
+
+
public FollowerLogInformationImpl(String id, long matchIndex, RaftActorContext context) {
this.id = id;
this.nextIndex = context.getCommitIndex();
@@ -110,6 +115,28 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
return stopwatch.elapsed(TimeUnit.MILLISECONDS);
}
+ @Override
+ public boolean okToReplicate() {
+ // Return false if we are trying to send duplicate data before the heartbeat interval
+ if(getNextIndex() == lastReplicatedIndex){
+ if(lastReplicatedStopwatch.elapsed(TimeUnit.MILLISECONDS) < context.getConfigParams()
+ .getHeartBeatInterval().toMillis()){
+ return false;
+ }
+ }
+
+ resetLastReplicated();
+ return true;
+ }
+
+ private void resetLastReplicated(){
+ lastReplicatedIndex = getNextIndex();
+ if(lastReplicatedStopwatch.isRunning()){
+ lastReplicatedStopwatch.reset();
+ }
+ lastReplicatedStopwatch.start();
+ }
+
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
@@ -120,6 +147,4 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
.append(context.getConfigParams().getElectionTimeOutInterval().toMillis()).append("]");
return builder.toString();
}
-
-
}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
index 77ff47d0ad..aa72485187 100644
--- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
+++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
@@ -17,14 +17,21 @@ import akka.persistence.SaveSnapshotSuccess;
import akka.persistence.SnapshotOffer;
import akka.persistence.SnapshotSelectionCriteria;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
import com.google.common.base.Optional;
import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.time.DurationFormatUtils;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
+import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RoleChanged;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
@@ -34,11 +41,15 @@ import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
+import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior;
import org.opendaylight.controller.cluster.raft.behaviors.Follower;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
+import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo;
+import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
+import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -119,6 +130,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
private int currentRecoveryBatchCount;
+ private final BehaviorStateHolder reusableBehaviorStateHolder = new BehaviorStateHolder();
+
public RaftActor(String id, Map peerAddresses) {
this(id, peerAddresses, Optional.absent());
}
@@ -297,9 +310,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
}
protected void changeCurrentBehavior(RaftActorBehavior newBehavior){
- RaftActorBehavior oldBehavior = currentBehavior;
+ reusableBehaviorStateHolder.init(currentBehavior);
currentBehavior = newBehavior;
- handleBehaviorChange(oldBehavior, currentBehavior);
+ handleBehaviorChange(reusableBehaviorStateHolder, currentBehavior);
}
@Override public void handleCommand(Object message) {
@@ -384,31 +397,84 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
} else if (message instanceof CaptureSnapshotReply){
handleCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
-
+ } else if(message instanceof GetOnDemandRaftState) {
+ onGetOnDemandRaftStats();
} else {
- RaftActorBehavior oldBehavior = currentBehavior;
+ reusableBehaviorStateHolder.init(currentBehavior);
+
currentBehavior = currentBehavior.handleMessage(getSender(), message);
- handleBehaviorChange(oldBehavior, currentBehavior);
+ handleBehaviorChange(reusableBehaviorStateHolder, currentBehavior);
+ }
+ }
+
+ private void onGetOnDemandRaftStats() {
+ // Debugging message to retrieve raft stats.
+
+ OnDemandRaftState.Builder builder = OnDemandRaftState.builder()
+ .commitIndex(context.getCommitIndex())
+ .currentTerm(context.getTermInformation().getCurrentTerm())
+ .inMemoryJournalDataSize(replicatedLog.dataSize())
+ .inMemoryJournalLogSize(replicatedLog.size())
+ .isSnapshotCaptureInitiated(context.isSnapshotCaptureInitiated())
+ .lastApplied(context.getLastApplied())
+ .lastIndex(replicatedLog.lastIndex())
+ .lastTerm(replicatedLog.lastTerm())
+ .leader(getLeaderId())
+ .raftState(currentBehavior.state().toString())
+ .replicatedToAllIndex(currentBehavior.getReplicatedToAllIndex())
+ .snapshotIndex(replicatedLog.getSnapshotIndex())
+ .snapshotTerm(replicatedLog.getSnapshotTerm())
+ .votedFor(context.getTermInformation().getVotedFor())
+ .peerAddresses(ImmutableMap.copyOf(context.getPeerAddresses()));
+
+ ReplicatedLogEntry lastLogEntry = getLastLogEntry();
+ if (lastLogEntry != null) {
+ builder.lastLogIndex(lastLogEntry.getIndex());
+ builder.lastLogTerm(lastLogEntry.getTerm());
}
+
+ if(currentBehavior instanceof AbstractLeader) {
+ AbstractLeader leader = (AbstractLeader)currentBehavior;
+ Collection followerIds = leader.getFollowerIds();
+ List followerInfoList = Lists.newArrayListWithCapacity(followerIds.size());
+ for(String id: followerIds) {
+ final FollowerLogInformation info = leader.getFollower(id);
+ followerInfoList.add(new FollowerInfo(id, info.getNextIndex(), info.getMatchIndex(),
+ info.isFollowerActive(), DurationFormatUtils.formatDurationHMS(info.timeSinceLastActivity())));
+ }
+
+ builder.followerInfoList(followerInfoList);
+ }
+
+ sender().tell(builder.build(), self());
+
}
- private void handleBehaviorChange(RaftActorBehavior oldBehavior, RaftActorBehavior currentBehavior) {
+ private void handleBehaviorChange(BehaviorStateHolder oldBehaviorState, RaftActorBehavior currentBehavior) {
+ RaftActorBehavior oldBehavior = oldBehaviorState.getBehavior();
+
if (oldBehavior != currentBehavior){
onStateChanged();
}
- String oldBehaviorLeaderId = oldBehavior == null? null : oldBehavior.getLeaderId();
- String oldBehaviorState = oldBehavior == null? null : oldBehavior.state().name();
+ String oldBehaviorLeaderId = oldBehavior == null ? null : oldBehaviorState.getLeaderId();
+ String oldBehaviorStateName = oldBehavior == null ? null : oldBehavior.state().name();
// it can happen that the state has not changed but the leader has changed.
- onLeaderChanged(oldBehaviorLeaderId, currentBehavior.getLeaderId());
+ Optional roleChangeNotifier = getRoleChangeNotifier();
+ if(!Objects.equal(oldBehaviorLeaderId, currentBehavior.getLeaderId())) {
+ if(roleChangeNotifier.isPresent()) {
+ roleChangeNotifier.get().tell(new LeaderStateChanged(getId(), currentBehavior.getLeaderId()), getSelf());
+ }
+
+ onLeaderChanged(oldBehaviorLeaderId, currentBehavior.getLeaderId());
+ }
- if (getRoleChangeNotifier().isPresent() &&
+ if (roleChangeNotifier.isPresent() &&
(oldBehavior == null || (oldBehavior.state() != currentBehavior.state()))) {
- getRoleChangeNotifier().get().tell(
- new RoleChanged(getId(), oldBehaviorState , currentBehavior.state().name()),
- getSelf());
+ roleChangeNotifier.get().tell(new RoleChanged(getId(), oldBehaviorStateName ,
+ currentBehavior.state().name()), getSelf());
}
}
@@ -998,4 +1064,21 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
return currentBehavior;
}
+ private static class BehaviorStateHolder {
+ private RaftActorBehavior behavior;
+ private String leaderId;
+
+ void init(RaftActorBehavior behavior) {
+ this.behavior = behavior;
+ this.leaderId = behavior != null ? behavior.getLeaderId() : null;
+ }
+
+ RaftActorBehavior getBehavior() {
+ return behavior;
+ }
+
+ String getLeaderId() {
+ return leaderId;
+ }
+ }
}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
index a4753a4fe4..a63c62fa30 100644
--- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
+++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
@@ -134,7 +134,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
*
* @return Collection of follower IDs
*/
- protected final Collection getFollowerIds() {
+ public final Collection getFollowerIds() {
return followerToLog.keySet();
}
@@ -460,6 +460,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
if (followerActor != null) {
long followerNextIndex = followerLogInformation.getNextIndex();
boolean isFollowerActive = followerLogInformation.isFollowerActive();
+ boolean sendAppendEntries = false;
+ List entries = Collections.EMPTY_LIST;
if (mapFollowerToSnapshot.get(followerId) != null) {
// if install snapshot is in process , then sent next chunk if possible
@@ -467,8 +469,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
sendSnapshotChunk(followerActor, followerId);
} else if(sendHeartbeat) {
// we send a heartbeat even if we have not received a reply for the last chunk
- sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(),
- Collections.emptyList(), followerId);
+ sendAppendEntries = true;
}
} else {
long leaderLastIndex = context.getReplicatedLog().lastIndex();
@@ -485,10 +486,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
followerNextIndex, followerId);
// FIXME : Sending one entry at a time
- final List entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
-
- sendAppendEntriesToFollower(followerActor, followerNextIndex, entries, followerId);
-
+ if(followerLogInformation.okToReplicate()) {
+ entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
+ sendAppendEntries = true;
+ }
} else if (isFollowerActive && followerNextIndex >= 0 &&
leaderLastIndex > followerNextIndex && !context.isSnapshotCaptureInitiated()) {
// if the followers next index is not present in the leaders log, and
@@ -503,19 +504,21 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
}
// Send heartbeat to follower whenever install snapshot is initiated.
- sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(),
- Collections.emptyList(), followerId);
-
+ sendAppendEntries = true;
initiateCaptureSnapshot(followerId, followerNextIndex);
} else if(sendHeartbeat) {
- //we send an AppendEntries, even if the follower is inactive
+ // we send an AppendEntries, even if the follower is inactive
// in-order to update the followers timestamp, in case it becomes active again
- sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(),
- Collections.emptyList(), followerId);
+ sendAppendEntries = true;
}
}
+
+ if(sendAppendEntries) {
+ sendAppendEntriesToFollower(followerActor, followerNextIndex,
+ entries, followerId);
+ }
}
}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
index 0f251a3012..bdd459ecff 100644
--- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
+++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
@@ -96,6 +96,19 @@ public class Follower extends AbstractRaftActorBehavior {
// to make it easier to read. Before refactoring ensure tests
// cover the code properly
+ if (snapshotTracker != null) {
+ // if snapshot install is in progress, follower should just acknowledge append entries with a reply.
+ AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true,
+ lastIndex(), lastTerm());
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: snapshot install is in progress, replying immediately with {}", logName(), reply);
+ }
+ sender.tell(reply, actor());
+
+ return this;
+ }
+
// 1. Reply false if term < currentTerm (§5.1)
// This is handled in the appendEntries method of the base class
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/FollowerInfo.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/FollowerInfo.java
new file mode 100644
index 0000000000..5d2c56a117
--- /dev/null
+++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/FollowerInfo.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.client.messages;
+
+import java.beans.ConstructorProperties;
+
+/**
+ * A bean class containing a snapshot of information for a follower returned from GetOnDemandRaftStats.
+ *
+ * @author Thomas Pantelis
+ */
+public class FollowerInfo {
+ private final String id;
+ private final long nextIndex;
+ private final long matchIndex;
+ private final boolean isActive;
+ private final String timeSinceLastActivity;
+
+ @ConstructorProperties({"id","nextIndex", "matchIndex", "isActive", "timeSinceLastActivity"})
+ public FollowerInfo(String id, long nextIndex, long matchIndex, boolean isActive, String timeSinceLastActivity) {
+ this.id = id;
+ this.nextIndex = nextIndex;
+ this.matchIndex = matchIndex;
+ this.isActive = isActive;
+ this.timeSinceLastActivity = timeSinceLastActivity;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public long getNextIndex() {
+ return nextIndex;
+ }
+
+ public long getMatchIndex() {
+ return matchIndex;
+ }
+
+ public boolean isActive() {
+ return isActive;
+ }
+
+ public String getTimeSinceLastActivity() {
+ return timeSinceLastActivity;
+ }
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/GetOnDemandRaftState.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/GetOnDemandRaftState.java
new file mode 100644
index 0000000000..be043861fb
--- /dev/null
+++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/GetOnDemandRaftState.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.client.messages;
+
+/**
+ * Local message sent to a RaftActor to obtain a snapshot of statistical information. Returns an
+ * OnDemandRaftState instance.
+ *
+ * @author Thomas Pantelis
+ */
+public class GetOnDemandRaftState {
+ public static final GetOnDemandRaftState INSTANCE = new GetOnDemandRaftState();
+
+ private GetOnDemandRaftState() {
+ }
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/OnDemandRaftState.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/OnDemandRaftState.java
new file mode 100644
index 0000000000..8c2986f6d1
--- /dev/null
+++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/OnDemandRaftState.java
@@ -0,0 +1,216 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.client.messages;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The response to a GetOnDemandRaftState message,
+ *
+ * @author Thomas Pantelis
+ */
+public class OnDemandRaftState {
+ private long lastLogIndex = -1L;
+ private long lastLogTerm = -1L;
+ private long currentTerm = -1L;
+ private long commitIndex = -1L;
+ private long lastApplied = -1L;
+ private long lastIndex = -1L;
+ private long lastTerm = -1L;
+ private long snapshotIndex = -1L;
+ private long snapshotTerm = -1L;
+ private long replicatedToAllIndex = -1L;
+ private long inMemoryJournalDataSize;
+ private long inMemoryJournalLogSize;
+ private String leader;
+ private String raftState;
+ private String votedFor;
+ private boolean isSnapshotCaptureInitiated;
+
+ private List followerInfoList = Collections.emptyList();
+ private Map peerAddresses = Collections.emptyMap();
+
+ private OnDemandRaftState() {
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public long getLastLogIndex() {
+ return lastLogIndex;
+ }
+
+ public long getLastLogTerm() {
+ return lastLogTerm;
+ }
+
+ public long getCurrentTerm() {
+ return currentTerm;
+ }
+
+ public long getCommitIndex() {
+ return commitIndex;
+ }
+
+ public long getLastApplied() {
+ return lastApplied;
+ }
+
+ public long getLastIndex() {
+ return lastIndex;
+ }
+
+ public long getLastTerm() {
+ return lastTerm;
+ }
+
+ public long getSnapshotIndex() {
+ return snapshotIndex;
+ }
+
+ public long getSnapshotTerm() {
+ return snapshotTerm;
+ }
+
+ public long getReplicatedToAllIndex() {
+ return replicatedToAllIndex;
+ }
+
+ public long getInMemoryJournalDataSize() {
+ return inMemoryJournalDataSize;
+ }
+
+ public long getInMemoryJournalLogSize() {
+ return inMemoryJournalLogSize;
+ }
+
+ public String getLeader() {
+ return leader;
+ }
+
+ public String getRaftState() {
+ return raftState;
+ }
+
+ public String getVotedFor() {
+ return votedFor;
+ }
+
+ public boolean isSnapshotCaptureInitiated() {
+ return isSnapshotCaptureInitiated;
+ }
+
+ public List getFollowerInfoList() {
+ return followerInfoList;
+ }
+
+ public Map getPeerAddresses() {
+ return peerAddresses;
+ }
+
+ public static class Builder {
+ private final OnDemandRaftState stats = new OnDemandRaftState();
+
+ public Builder lastLogIndex(long value) {
+ stats.lastLogIndex = value;
+ return this;
+ }
+
+ public Builder lastLogTerm(long value) {
+ stats.lastLogTerm = value;
+ return this;
+ }
+
+ public Builder currentTerm(long value) {
+ stats.currentTerm = value;
+ return this;
+ }
+
+ public Builder commitIndex(long value) {
+ stats.commitIndex = value;
+ return this;
+ }
+
+ public Builder lastApplied(long value) {
+ stats.lastApplied = value;
+ return this;
+ }
+
+ public Builder lastIndex(long value) {
+ stats.lastIndex = value;
+ return this;
+ }
+
+ public Builder lastTerm(long value) {
+ stats.lastTerm = value;
+ return this;
+ }
+
+ public Builder snapshotIndex(long value) {
+ stats.snapshotIndex = value;
+ return this;
+ }
+
+ public Builder snapshotTerm(long value) {
+ stats.snapshotTerm = value;
+ return this;
+ }
+
+ public Builder replicatedToAllIndex(long value) {
+ stats.replicatedToAllIndex = value;
+ return this;
+ }
+
+ public Builder inMemoryJournalDataSize(long value) {
+ stats.inMemoryJournalDataSize = value;
+ return this;
+ }
+
+ public Builder inMemoryJournalLogSize(long value) {
+ stats.inMemoryJournalLogSize = value;
+ return this;
+ }
+
+ public Builder leader(String value) {
+ stats.leader = value;
+ return this;
+ }
+
+ public Builder raftState(String value) {
+ stats.raftState = value;
+ return this;
+ }
+
+ public Builder votedFor(String value) {
+ stats.votedFor = value;
+ return this;
+ }
+
+ public Builder followerInfoList(List followerInfoList) {
+ stats.followerInfoList = followerInfoList;
+ return this;
+ }
+
+ public Builder peerAddresses(Map peerAddresses) {
+ stats.peerAddresses = peerAddresses;
+ return this;
+ }
+
+ public Builder isSnapshotCaptureInitiated(boolean value) {
+ stats.isSnapshotCaptureInitiated = value;
+ return this;
+ }
+
+ public OnDemandRaftState build() {
+ return stats;
+ }
+ }
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImplTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImplTest.java
index 5be9030f59..bdfd69ec11 100644
--- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImplTest.java
+++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImplTest.java
@@ -60,4 +60,24 @@ public class FollowerLogInformationImplTest {
stopwatch.stop();
return stopwatch.elapsed(TimeUnit.MILLISECONDS);
}
+
+ @Test
+ public void testOkToReplicate(){
+ MockRaftActorContext context = new MockRaftActorContext();
+ context.setCommitIndex(9);
+ FollowerLogInformation followerLogInformation =
+ new FollowerLogInformationImpl(
+ "follower1", 10, context);
+
+ assertTrue(followerLogInformation.okToReplicate());
+ assertFalse(followerLogInformation.okToReplicate());
+
+ // wait for 150 milliseconds and it should work again
+ Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
+ assertTrue(followerLogInformation.okToReplicate());
+
+ //increment next index and try immediately and it should work again
+ followerLogInformation.incrNextIndex();
+ assertTrue(followerLogInformation.okToReplicate());
+ }
}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
index b192b7cd24..34932c7249 100644
--- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
+++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
@@ -54,6 +54,7 @@ import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
+import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RoleChanged;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
@@ -64,6 +65,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotRep
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.behaviors.Follower;
import org.opendaylight.controller.cluster.raft.behaviors.Leader;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
@@ -944,7 +946,8 @@ public class RaftActorTest extends AbstractActorTest {
@Test
public void testRaftRoleChangeNotifier() throws Exception {
new JavaTestKit(getSystem()) {{
- ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class));
+ TestActorRef notifierActor = factory.createTestActor(
+ Props.create(MessageCollectorActor.class));
MessageCollectorActor.waitUntilReady(notifierActor);
DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
@@ -954,20 +957,10 @@ public class RaftActorTest extends AbstractActorTest {
String persistenceId = factory.generateActorId("notifier-");
- factory.createTestActor(MockRaftActor.props(persistenceId,
+ TestActorRef raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
Collections.emptyMap(), Optional.of(config), notifierActor), persistenceId);
- List matches = null;
- for(int i = 0; i < 5000 / heartBeatInterval; i++) {
- matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
- assertNotNull(matches);
- if(matches.size() == 3) {
- break;
- }
- Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS);
- }
-
- assertEquals(3, matches.size());
+ List matches = MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
// check if the notifier got a role change from null to Follower
RoleChanged raftRoleChanged = matches.get(0);
@@ -986,6 +979,41 @@ public class RaftActorTest extends AbstractActorTest {
assertEquals(persistenceId, raftRoleChanged.getMemberId());
assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
+
+ LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
+ notifierActor, LeaderStateChanged.class);
+
+ assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId());
+
+ notifierActor.underlyingActor().clear();
+
+ MockRaftActor raftActor = raftActorRef.underlyingActor();
+ final String newLeaderId = "new-leader";
+ Follower follower = new Follower(raftActor.getRaftActorContext()) {
+ @Override
+ public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
+ leaderId = newLeaderId;
+ return this;
+ }
+ };
+
+ raftActor.changeCurrentBehavior(follower);
+
+ leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
+ assertEquals(persistenceId, leaderStateChange.getMemberId());
+ assertEquals(null, leaderStateChange.getLeaderId());
+
+ raftRoleChanged = MessageCollectorActor.expectFirstMatching(notifierActor, RoleChanged.class);
+ assertEquals(RaftState.Leader.name(), raftRoleChanged.getOldRole());
+ assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
+
+ notifierActor.underlyingActor().clear();
+
+ raftActor.handleCommand("any");
+
+ leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
+ assertEquals(persistenceId, leaderStateChange.getMemberId());
+ assertEquals(newLeaderId, leaderStateChange.getLeaderId());
}};
}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java
index 29fb613327..75509bae51 100644
--- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java
+++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java
@@ -2,8 +2,13 @@ package org.opendaylight.controller.cluster.raft.behaviors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.testkit.TestActorRef;
@@ -577,12 +582,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
follower = createBehavior(context);
- HashMap followerSnapshot = new HashMap<>();
- followerSnapshot.put("1", "A");
- followerSnapshot.put("2", "B");
- followerSnapshot.put("3", "C");
-
- ByteString bsSnapshot = toByteString(followerSnapshot);
+ ByteString bsSnapshot = createSnapshot();
int offset = 0;
int snapshotLength = bsSnapshot.size();
int chunkSize = 50;
@@ -627,6 +627,57 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
}
+
+ /**
+ * Verify that when an AppendEntries is sent to a follower during a snapshot install
+ * the Follower short-circuits the processing of the AppendEntries message.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testReceivingAppendEntriesDuringInstallSnapshot() throws Exception {
+ logStart("testReceivingAppendEntriesDuringInstallSnapshot");
+
+ MockRaftActorContext context = createActorContext();
+
+ follower = createBehavior(context);
+
+ ByteString bsSnapshot = createSnapshot();
+ int snapshotLength = bsSnapshot.size();
+ int chunkSize = 50;
+ int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
+ int lastIncludedIndex = 1;
+
+ // Check that snapshot installation is not in progress
+ assertNull(((Follower) follower).getSnapshotTracker());
+
+ // Make sure that we have more than 1 chunk to send
+ assertTrue(totalChunks > 1);
+
+ // Send an install snapshot with the first chunk to start the process of installing a snapshot
+ ByteString chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
+ follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
+ chunkData, 1, totalChunks));
+
+ // Check if snapshot installation is in progress now
+ assertNotNull(((Follower) follower).getSnapshotTracker());
+
+ // Send an append entry
+ AppendEntries appendEntries = mock(AppendEntries.class);
+ doReturn(context.getTermInformation().getCurrentTerm()).when(appendEntries).getTerm();
+
+ follower.handleMessage(leaderActor, appendEntries);
+
+ AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+ assertEquals(context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
+ assertEquals(context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
+ assertEquals(context.getTermInformation().getCurrentTerm(), reply.getTerm());
+
+ // We should not hit the code that needs to look at prevLogIndex because we are short circuiting
+ verify(appendEntries, never()).getPrevLogIndex();
+
+ }
+
@Test
public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() throws Exception {
logStart("testInitialSyncUpWithHandleInstallSnapshot");
@@ -635,12 +686,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
follower = createBehavior(context);
- HashMap followerSnapshot = new HashMap<>();
- followerSnapshot.put("1", "A");
- followerSnapshot.put("2", "B");
- followerSnapshot.put("3", "C");
-
- ByteString bsSnapshot = toByteString(followerSnapshot);
+ ByteString bsSnapshot = createSnapshot();
int offset = 0;
int snapshotLength = bsSnapshot.size();
int chunkSize = 50;
@@ -692,12 +738,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
follower = createBehavior(context);
- HashMap followerSnapshot = new HashMap<>();
- followerSnapshot.put("1", "A");
- followerSnapshot.put("2", "B");
- followerSnapshot.put("3", "C");
-
- ByteString bsSnapshot = toByteString(followerSnapshot);
+ ByteString bsSnapshot = createSnapshot();
InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
getNextChunk(bsSnapshot, 10, 50), 3, 3);
@@ -746,6 +787,15 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
new MockRaftActorContext.MockPayload(data));
}
+ private ByteString createSnapshot(){
+ HashMap followerSnapshot = new HashMap<>();
+ followerSnapshot.put("1", "A");
+ followerSnapshot.put("2", "B");
+ followerSnapshot.put("3", "C");
+
+ return toByteString(followerSnapshot);
+ }
+
@Override
protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
ActorRef actorRef, RaftRPC rpc) throws Exception {
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
index 3f085df8dc..383ebefd36 100644
--- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
+++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
@@ -119,6 +119,15 @@ public class LeaderTest extends AbstractLeaderTest {
assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
}
+
+ private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
+ MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
+ MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
+ 1, index, payload);
+ actorContext.getReplicatedLog().append(newEntry);
+ return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
+ }
+
@Test
public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
@@ -145,8 +154,7 @@ public class LeaderTest extends AbstractLeaderTest {
MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
1, lastIndex + 1, payload);
actorContext.getReplicatedLog().append(newEntry);
- RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
- new Replicate(null, null, newEntry));
+ RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex+1);
// State should not change
assertTrue(raftBehavior instanceof Leader);
@@ -160,6 +168,218 @@ public class LeaderTest extends AbstractLeaderTest {
assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData());
}
+ @Test
+ public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
+ logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
+
+ MockRaftActorContext actorContext = createActorContextWithFollower();
+ actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+ @Override
+ public FiniteDuration getHeartBeatInterval() {
+ return FiniteDuration.apply(5, TimeUnit.SECONDS);
+ }
+ });
+
+ long term = 1;
+ actorContext.getTermInformation().update(term, "");
+
+ leader = new Leader(actorContext);
+
+ // Leader will send an immediate heartbeat - ignore it.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ // The follower would normally reply - simulate that explicitly here.
+ long lastIndex = actorContext.getReplicatedLog().lastIndex();
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ FOLLOWER_ID, term, true, lastIndex, term));
+ assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+ followerActor.underlyingActor().clear();
+
+ for(int i=0;i<5;i++) {
+ sendReplicate(actorContext, lastIndex+i+1);
+ }
+
+ List allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+ // We expect only 1 message to be sent because of two reasons,
+ // - an append entries reply was not received
+ // - the heartbeat interval has not expired
+ // In this scenario if multiple messages are sent they would likely be duplicates
+ assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
+ }
+
+ @Test
+ public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
+ logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
+
+ MockRaftActorContext actorContext = createActorContextWithFollower();
+ actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+ @Override
+ public FiniteDuration getHeartBeatInterval() {
+ return FiniteDuration.apply(5, TimeUnit.SECONDS);
+ }
+ });
+
+ long term = 1;
+ actorContext.getTermInformation().update(term, "");
+
+ leader = new Leader(actorContext);
+
+ // Leader will send an immediate heartbeat - ignore it.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ // The follower would normally reply - simulate that explicitly here.
+ long lastIndex = actorContext.getReplicatedLog().lastIndex();
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ FOLLOWER_ID, term, true, lastIndex, term));
+ assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+ followerActor.underlyingActor().clear();
+
+ for(int i=0;i<3;i++) {
+ sendReplicate(actorContext, lastIndex+i+1);
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ FOLLOWER_ID, term, true, lastIndex + i + 1, term));
+
+ }
+
+ for(int i=3;i<5;i++) {
+ sendReplicate(actorContext, lastIndex + i + 1);
+ }
+
+ List allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+ // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
+ // get sent to the follower - but not the 5th
+ assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
+
+ for(int i=0;i<4;i++) {
+ long expected = allMessages.get(i).getEntries().get(0).getIndex();
+ assertEquals(expected, i+2);
+ }
+ }
+
+ @Test
+ public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
+ logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
+
+ MockRaftActorContext actorContext = createActorContextWithFollower();
+ actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+ @Override
+ public FiniteDuration getHeartBeatInterval() {
+ return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
+ }
+ });
+
+ long term = 1;
+ actorContext.getTermInformation().update(term, "");
+
+ leader = new Leader(actorContext);
+
+ // Leader will send an immediate heartbeat - ignore it.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ // The follower would normally reply - simulate that explicitly here.
+ long lastIndex = actorContext.getReplicatedLog().lastIndex();
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ FOLLOWER_ID, term, true, lastIndex, term));
+ assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+ followerActor.underlyingActor().clear();
+
+ sendReplicate(actorContext, lastIndex+1);
+
+ // Wait slightly longer than heartbeat duration
+ Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
+
+ leader.handleMessage(leaderActor, new SendHeartBeat());
+
+ List allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+ assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
+
+ assertEquals(1, allMessages.get(0).getEntries().size());
+ assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
+ assertEquals(1, allMessages.get(1).getEntries().size());
+ assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
+
+ }
+
+ @Test
+ public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
+ logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
+
+ MockRaftActorContext actorContext = createActorContextWithFollower();
+ actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+ @Override
+ public FiniteDuration getHeartBeatInterval() {
+ return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
+ }
+ });
+
+ long term = 1;
+ actorContext.getTermInformation().update(term, "");
+
+ leader = new Leader(actorContext);
+
+ // Leader will send an immediate heartbeat - ignore it.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ // The follower would normally reply - simulate that explicitly here.
+ long lastIndex = actorContext.getReplicatedLog().lastIndex();
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ FOLLOWER_ID, term, true, lastIndex, term));
+ assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+ followerActor.underlyingActor().clear();
+
+ for(int i=0;i<3;i++) {
+ Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
+ leader.handleMessage(leaderActor, new SendHeartBeat());
+ }
+
+ List allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+ assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
+ }
+
+ @Test
+ public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
+ logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
+
+ MockRaftActorContext actorContext = createActorContextWithFollower();
+ actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+ @Override
+ public FiniteDuration getHeartBeatInterval() {
+ return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
+ }
+ });
+
+ long term = 1;
+ actorContext.getTermInformation().update(term, "");
+
+ leader = new Leader(actorContext);
+
+ // Leader will send an immediate heartbeat - ignore it.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ // The follower would normally reply - simulate that explicitly here.
+ long lastIndex = actorContext.getReplicatedLog().lastIndex();
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ FOLLOWER_ID, term, true, lastIndex, term));
+ assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+ followerActor.underlyingActor().clear();
+
+ Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
+ leader.handleMessage(leaderActor, new SendHeartBeat());
+ sendReplicate(actorContext, lastIndex+1);
+
+ List allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+ assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
+
+ assertEquals(0, allMessages.get(0).getEntries().size());
+ assertEquals(1, allMessages.get(1).getEntries().size());
+ }
+
+
@Test
public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/LeaderStateChanged.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/LeaderStateChanged.java
new file mode 100644
index 0000000000..ec35b03b0a
--- /dev/null
+++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/LeaderStateChanged.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.notifications;
+
+import java.io.Serializable;
+
+/**
+ * A message initiated internally from the RaftActor when some state of a leader has changed
+ *
+ * @author Thomas Pantelis
+ */
+public class LeaderStateChanged implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final String memberId;
+ private final String leaderId;
+
+ public LeaderStateChanged(String memberId, String leaderId) {
+ this.memberId = memberId;
+ this.leaderId = leaderId;
+ }
+
+ public String getMemberId() {
+ return memberId;
+ }
+
+ public String getLeaderId() {
+ return leaderId;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("LeaderStateChanged [memberId=").append(memberId).append(", leaderId=").append(leaderId)
+ .append("]");
+ return builder.toString();
+ }
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChangeNotifier.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChangeNotifier.java
index d065f6d211..598dfb1fe8 100644
--- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChangeNotifier.java
+++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChangeNotifier.java
@@ -17,16 +17,17 @@ import java.util.Map;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
/**
- * The RoleChangeNotifier is responsible for receiving Raft role change messages and notifying
+ * The RoleChangeNotifier is responsible for receiving Raft role and leader state change messages and notifying
* the listeners (within the same node), which are registered with it.
*
* The RoleChangeNotifier is instantiated by the Shard and injected into the RaftActor.
*/
public class RoleChangeNotifier extends AbstractUntypedActor implements AutoCloseable {
- private String memberId;
- private Map registeredListeners = Maps.newHashMap();
+ private final String memberId;
+ private final Map registeredListeners = Maps.newHashMap();
private RoleChangeNotification latestRoleChangeNotification = null;
+ private LeaderStateChanged latestLeaderStateChanged;
public RoleChangeNotifier(String memberId) {
this.memberId = memberId;
@@ -62,6 +63,10 @@ public class RoleChangeNotifier extends AbstractUntypedActor implements AutoClos
getSender().tell(new RegisterRoleChangeListenerReply(), getSelf());
+ if(latestLeaderStateChanged != null) {
+ getSender().tell(latestLeaderStateChanged, getSelf());
+ }
+
if (latestRoleChangeNotification != null) {
getSender().tell(latestRoleChangeNotification, getSelf());
}
@@ -81,6 +86,12 @@ public class RoleChangeNotifier extends AbstractUntypedActor implements AutoClos
for (ActorRef listener: registeredListeners.values()) {
listener.tell(latestRoleChangeNotification, getSelf());
}
+ } else if (message instanceof LeaderStateChanged) {
+ latestLeaderStateChanged = (LeaderStateChanged)message;
+
+ for (ActorRef listener: registeredListeners.values()) {
+ listener.tell(latestLeaderStateChanged, getSelf());
+ }
}
}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DOMTransactionFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DOMTransactionFactory.java
new file mode 100644
index 0000000000..f2436201d8
--- /dev/null
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DOMTransactionFactory.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory;
+import org.slf4j.Logger;
+
+/**
+ * A factory for creating DOM transactions, either normal or chained.
+ *
+ * @author Thomas Pantelis
+ */
+public class DOMTransactionFactory {
+
+ private final Map transactionChains = new HashMap<>();
+ private final InMemoryDOMDataStore store;
+ private final ShardStats shardMBean;
+ private final Logger log;
+ private final String name;
+
+ public DOMTransactionFactory(InMemoryDOMDataStore store, ShardStats shardMBean, Logger log, String name) {
+ this.store = store;
+ this.shardMBean = shardMBean;
+ this.log = log;
+ this.name = name;
+ }
+
+ @SuppressWarnings("unchecked")
+ public T newTransaction(TransactionProxy.TransactionType type,
+ String transactionID, String transactionChainID) {
+
+ DOMStoreTransactionFactory factory = store;
+
+ if(!transactionChainID.isEmpty()) {
+ factory = transactionChains.get(transactionChainID);
+ if(factory == null) {
+ if(log.isDebugEnabled()) {
+ log.debug("{}: Creating transaction with ID {} from chain {}", name, transactionID,
+ transactionChainID);
+ }
+
+ DOMStoreTransactionChain transactionChain = store.createTransactionChain();
+ transactionChains.put(transactionChainID, transactionChain);
+ factory = transactionChain;
+ }
+ } else {
+ log.debug("{}: Creating transaction with ID {}", name, transactionID);
+ }
+
+ T transaction = null;
+ switch(type) {
+ case READ_ONLY:
+ transaction = (T) factory.newReadOnlyTransaction();
+ shardMBean.incrementReadOnlyTransactionCount();
+ break;
+ case READ_WRITE:
+ transaction = (T) factory.newReadWriteTransaction();
+ shardMBean.incrementReadWriteTransactionCount();
+ break;
+ case WRITE_ONLY:
+ transaction = (T) factory.newWriteOnlyTransaction();
+ shardMBean.incrementWriteOnlyTransactionCount();
+ break;
+ }
+
+ return transaction;
+ }
+
+ public void closeTransactionChain(String transactionChainID) {
+ DOMStoreTransactionChain chain =
+ transactionChains.remove(transactionChainID);
+
+ if(chain != null) {
+ chain.close();
+ }
+ }
+
+ public void closeAllTransactionChains() {
+ for(Map.Entry entry : transactionChains.entrySet()){
+ entry.getValue().close();
+ }
+
+ transactionChains.clear();
+ }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java
index 7f8a4e779d..d5142c94a6 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java
@@ -58,6 +58,7 @@ public class DatastoreContext {
private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
private String dataStoreType = UNKNOWN_DATA_STORE_TYPE;
private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
+ private boolean writeOnlyTransactionOptimizationsEnabled = false;
private DatastoreContext() {
setShardJournalRecoveryLogBatchSize(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE);
@@ -82,6 +83,7 @@ public class DatastoreContext {
this.transactionCreationInitialRateLimit = other.transactionCreationInitialRateLimit;
this.dataStoreType = other.dataStoreType;
this.shardBatchedModificationCount = other.shardBatchedModificationCount;
+ this.writeOnlyTransactionOptimizationsEnabled = other.writeOnlyTransactionOptimizationsEnabled;
setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize());
setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount());
@@ -186,6 +188,10 @@ public class DatastoreContext {
return shardBatchedModificationCount;
}
+ public boolean isWriteOnlyTransactionOptimizationsEnabled() {
+ return writeOnlyTransactionOptimizationsEnabled;
+ }
+
public static class Builder {
private final DatastoreContext datastoreContext;
private int maxShardDataChangeExecutorPoolSize =
@@ -326,6 +332,11 @@ public class DatastoreContext {
return this;
}
+ public Builder writeOnlyTransactionOptimizationsEnabled(boolean value) {
+ datastoreContext.writeOnlyTransactionOptimizationsEnabled = value;
+ return this;
+ }
+
public Builder maxShardDataChangeExecutorPoolSize(int maxShardDataChangeExecutorPoolSize) {
this.maxShardDataChangeExecutorPoolSize = maxShardDataChangeExecutorPoolSize;
return this;
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
index 15b97b0f8f..a30b6f7516 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
@@ -43,6 +43,8 @@ import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
@@ -66,7 +68,6 @@ import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
import org.opendaylight.controller.cluster.raft.RaftActor;
-import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
@@ -77,8 +78,6 @@ import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -106,7 +105,7 @@ public class Shard extends RaftActor {
private final InMemoryDOMDataStore store;
/// The name of this shard
- private final ShardIdentifier name;
+ private final String name;
private final ShardStats shardMBean;
@@ -143,16 +142,15 @@ public class Shard extends RaftActor {
private ShardRecoveryCoordinator recoveryCoordinator;
private List
*/
public class MessageCollectorActor extends UntypedActor {
- private List messages = new ArrayList<>();
+ private final List messages = new ArrayList<>();
@Override public void onReceive(Object message) throws Exception {
if(message instanceof String){
@@ -43,6 +44,10 @@ public class MessageCollectorActor extends UntypedActor {
}
}
+ public void clear() {
+ messages.clear();
+ }
+
public static List getAllMessages(ActorRef actor) throws Exception {
FiniteDuration operationDuration = Duration.create(5, TimeUnit.SECONDS);
Timeout operationTimeout = new Timeout(operationDuration);
@@ -87,4 +92,20 @@ public class MessageCollectorActor extends UntypedActor {
return output;
}
+ public static T expectFirstMatching(ActorRef actor, Class clazz) {
+ int count = 5000 / 50;
+ for(int i = 0; i < count; i++) {
+ try {
+ T message = (T) getFirstMatching(actor, clazz);
+ if(message != null) {
+ return message;
+ }
+ } catch (Exception e) {}
+
+ Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ }
+
+ Assert.fail("Did not receive message of type " + clazz);
+ return null;
+ }
}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java
index 81b6bccaf0..63878df23c 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java
@@ -12,7 +12,6 @@ import static org.junit.Assert.assertNotNull;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
-import com.google.common.base.Optional;
public class MockActorContext extends ActorContext {
@@ -36,10 +35,6 @@ public class MockActorContext extends ActorContext {
return executeRemoteOperationResponse;
}
- @Override public Optional findPrimaryShard(String shardName) {
- return Optional.absent();
- }
-
public void setExecuteShardOperationResponse(Object response){
executeShardOperationResponse = response;
}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java
index 360ac68a51..a6fdfd3ff9 100644
--- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java
+++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java
@@ -53,7 +53,16 @@ public class RemoteRpcImplementation implements DOMRpcImplementation {
@Override
public void onComplete(final Throwable failure, final Object reply) throws Throwable {
if(failure != null) {
- LOG.error("InvokeRpc failed", failure);
+
+ // When we return a failure to the caller they can choose to log it if they like
+ // so here we just do basic warn logging by default and log the stack trace only when debug
+ // is enabled
+
+ LOG.warn("InvokeRpc failed rpc = {}, identifier = {}", rpcMsg.getRpc(), rpcMsg.getIdentifier());
+
+ if(LOG.isDebugEnabled()){
+ LOG.debug("Detailed Error", failure);
+ }
final String message = String.format("Execution of RPC %s failed", rpcMsg.getRpc());
Collection errors = ((RpcErrorsException)failure).getRpcErrors();