From: Tom Pantelis Date: Wed, 25 Mar 2015 18:43:11 +0000 (+0000) Subject: Merge "Do not use ActorSystem.actorFor as it is deprecated" X-Git-Tag: release/lithium~350 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=4349b034606957d3e876b82b14a292e6739a986a;hp=fc89f3b66b3e5db73e09792bbecebf3156d70d20 Merge "Do not use ActorSystem.actorFor as it is deprecated" --- diff --git a/opendaylight/adsal/features/nsf/src/main/resources/features.xml b/opendaylight/adsal/features/nsf/src/main/resources/features.xml index 3985a69302..56271eb699 100644 --- a/opendaylight/adsal/features/nsf/src/main/resources/features.xml +++ b/opendaylight/adsal/features/nsf/src/main/resources/features.xml @@ -14,8 +14,8 @@ odl-adsal-all - odl-nsf-controller-managers - odl-adsal-controller-northbound + odl-nsf-managers + odl-adsal-northbound @@ -44,43 +44,6 @@ mvn:org.opendaylight.controller/topologymanager/${topologymanager.version} mvn:org.opendaylight.controller/topologymanager.shell/${topologymanager.shell.version} - - mvn:org.opendaylight.controller/hosttracker/${hosttracker.api.version} - mvn:org.opendaylight.controller/hosttracker.implementation/${hosttracker.implementation.version} - mvn:org.opendaylight.controller/hosttracker.shell/${hosttracker.shell.version} - - mvn:org.opendaylight.controller/forwarding.staticrouting/${forwarding.staticrouting} - - mvn:org.opendaylight.controller.thirdparty/net.sf.jung2/2.0.1 - mvn:org.opendaylight.controller/routing.dijkstra_implementation/${routing.dijkstra_implementation.version} - - - - odl-base-all - odl-adsal-all - mvn:org.opendaylight.controller/usermanager/${usermanager.version} - mvn:org.opendaylight.controller/usermanager.implementation/${usermanager.version} - - mvn:org.opendaylight.controller/appauth/${appauth.version} - - mvn:org.opendaylight.controller/connectionmanager/${connectionmanager.version} - mvn:org.opendaylight.controller/connectionmanager.implementation/${connectionmanager.version} - - mvn:org.opendaylight.controller/containermanager/${containermanager.version} - mvn:org.opendaylight.controller/containermanager.implementation/${containermanager.version} - - mvn:org.opendaylight.controller/statisticsmanager/${statisticsmanager.version} - mvn:org.opendaylight.controller/statisticsmanager.implementation/${statisticsmanager.implementation.version} - - mvn:org.opendaylight.controller/switchmanager/${switchmanager.api.version} - mvn:org.opendaylight.controller/switchmanager.implementation/${switchmanager.implementation.version} - - mvn:org.opendaylight.controller/forwardingrulesmanager/${forwardingrulesmanager.version} - mvn:org.opendaylight.controller/forwardingrulesmanager.implementation/${forwardingrulesmanager.implementation.version} - - mvn:org.opendaylight.controller/topologymanager/${topologymanager.version} - mvn:org.opendaylight.controller/topologymanager.shell/${topologymanager.shell.version} - mvn:org.opendaylight.controller/hosttracker/${hosttracker.api.version} mvn:org.opendaylight.controller/hosttracker.implementation/${hosttracker.implementation.version} mvn:org.opendaylight.controller/hosttracker.shell/${hosttracker.shell.version} @@ -117,26 +80,4 @@ mvn:org.opendaylight.controller/topology.northbound/${topology.northbound.version} mvn:org.opendaylight.controller/usermanager.northbound/${usermanager.northbound.version} - - - odl-base-all - odl-nsf-managers - mvn:org.ow2.asm/asm-all/${asm.version} - mvn:org.opendaylight.controller/bundlescanner/${bundlescanner.api.version} - 
mvn:org.opendaylight.controller/bundlescanner.implementation/${bundlescanner.implementation.version} - mvn:org.opendaylight.controller/commons.northbound/${northbound.commons.version} - mvn:org.opendaylight.controller/connectionmanager.northbound/${connectionmanager.version} - mvn:org.opendaylight.controller/flowprogrammer.northbound/${flowprogrammer.northbound.version} - mvn:org.opendaylight.controller/hosttracker.northbound/${hosttracker.northbound.version} - mvn:org.opendaylight.controller/networkconfig.bridgedomain.northbound/${networkconfig.bridgedomain.northbound.version} - mvn:org.eclipse.persistence/org.eclipse.persistence.antlr/${eclipse.persistence.version} - mvn:org.eclipse.persistence/org.eclipse.persistence.core/${eclipse.persistence.version} - mvn:org.eclipse.persistence/org.eclipse.persistence.moxy/${eclipse.persistence.version} - mvn:org.opendaylight.controller/forwarding.staticrouting.northbound/${forwarding.staticrouting.northbound.version} - mvn:org.opendaylight.controller/statistics.northbound/${statistics.northbound.version} - mvn:org.opendaylight.controller/subnets.northbound/${subnets.northbound.version} - mvn:org.opendaylight.controller/switchmanager.northbound/${switchmanager.northbound.version} - mvn:org.opendaylight.controller/topology.northbound/${topology.northbound.version} - mvn:org.opendaylight.controller/usermanager.northbound/${usermanager.northbound.version} - diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java index 603f34bac9..076d1b2fc7 100644 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java @@ -8,16 +8,11 @@ package org.opendaylight.controller.messagebus.app.impl; -import com.google.common.base.Optional; -import com.google.common.util.concurrent.Futures; - -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.regex.Pattern; import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.binding.api.DataChangeListener; @@ -57,7 +52,10 @@ import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.regex.Pattern; +import com.google.common.base.Optional; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; public class EventSourceTopology implements EventAggregatorService, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(EventSourceTopology.class); @@ -82,11 +80,10 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl private final RpcRegistration aggregatorRpcReg; private final EventSourceService eventSourceService; private final RpcProviderRegistry rpcRegistry; - private final ExecutorService executorService; public EventSourceTopology(final DataBroker dataBroker, final RpcProviderRegistry rpcRegistry) { + this.dataBroker = dataBroker; - this.executorService = 
Executors.newCachedThreadPool(); this.rpcRegistry = rpcRegistry; aggregatorRpcReg = rpcRegistry.addRpcImplementation(EventAggregatorService.class, this); eventSourceService = rpcRegistry.getRpcService(EventSourceService.class); @@ -94,14 +91,17 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl final TopologyEventSource topologySource = new TopologyEventSourceBuilder().build(); final TopologyTypes1 topologyTypeAugment = new TopologyTypes1Builder().setTopologyEventSource(topologySource).build(); putData(OPERATIONAL, TOPOLOGY_TYPE_PATH, topologyTypeAugment); + } private void putData(final LogicalDatastoreType store, - final InstanceIdentifier path, final T data) { + final InstanceIdentifier path, + final T data){ final WriteTransaction tx = dataBroker.newWriteOnlyTransaction(); tx.put(store, path, data, true); tx.submit(); + } private void insert(final KeyedInstanceIdentifier sourcePath, final Node node) { @@ -112,7 +112,34 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl } private void notifyExistingNodes(final Pattern nodeIdPatternRegex, final EventSourceTopic eventSourceTopic){ - executorService.execute(new NotifyAllNodeExecutor(dataBroker, nodeIdPatternRegex, eventSourceTopic)); + + final ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction(); + + final CheckedFuture, ReadFailedException> future = tx.read(OPERATIONAL, EVENT_SOURCE_TOPOLOGY_PATH); + + Futures.addCallback(future, new FutureCallback>(){ + + @Override + public void onSuccess(Optional data) { + if(data.isPresent()) { + final List nodes = data.get().getNode(); + for (final Node node : nodes) { + if (nodeIdPatternRegex.matcher(node.getNodeId().getValue()).matches()) { + eventSourceTopic.notifyNode(EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey())); + } + } + } + tx.close(); + } + + @Override + public void onFailure(Throwable t) { + LOG.error("Can not notify existing nodes {}", t); + tx.close(); + } + + }); + } @Override @@ -164,45 +191,4 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl // FIXME: Return registration object. 
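The notifyExistingNodes() change above replaces the executor-based NotifyAllNodeExecutor (removed just below) with a non-blocking datastore read whose result is handled in a Guava FutureCallback. A minimal sketch of that read-and-callback pattern, with the generic types restored and the broker, regex pattern and topic references assumed from the surrounding class:

    // Sketch only: non-blocking read of the event-source topology, then notify matching nodes.
    // dataBroker, nodeIdPatternRegex, eventSourceTopic and EVENT_SOURCE_TOPOLOGY_PATH are the
    // fields/constants of EventSourceTopology shown in this diff.
    final ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction();
    final CheckedFuture<Optional<Topology>, ReadFailedException> future =
            tx.read(LogicalDatastoreType.OPERATIONAL, EVENT_SOURCE_TOPOLOGY_PATH);

    Futures.addCallback(future, new FutureCallback<Optional<Topology>>() {
        @Override
        public void onSuccess(final Optional<Topology> data) {
            if (data.isPresent() && data.get().getNode() != null) {
                for (final Node node : data.get().getNode()) {
                    if (nodeIdPatternRegex.matcher(node.getNodeId().getValue()).matches()) {
                        eventSourceTopic.notifyNode(
                                EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey()));
                    }
                }
            }
            tx.close(); // release the read transaction on success
        }

        @Override
        public void onFailure(final Throwable t) {
            LOG.error("Could not notify existing nodes", t);
            tx.close(); // release the read transaction on failure as well
        }
    });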
} - private class NotifyAllNodeExecutor implements Runnable { - - private final EventSourceTopic topic; - private final DataBroker dataBroker; - private final Pattern nodeIdPatternRegex; - - public NotifyAllNodeExecutor(final DataBroker dataBroker, final Pattern nodeIdPatternRegex, final EventSourceTopic topic) { - this.topic = topic; - this.dataBroker = dataBroker; - this.nodeIdPatternRegex = nodeIdPatternRegex; - } - - @Override - public void run() { - //# Code reader note: Context of Node type is NetworkTopology - final List nodes = snapshot(); - for (final Node node : nodes) { - if (nodeIdPatternRegex.matcher(node.getNodeId().getValue()).matches()) { - topic.notifyNode(EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey())); - } - } - } - - private List snapshot() { - try (ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction();) { - - final Optional data = tx.read(OPERATIONAL, EVENT_SOURCE_TOPOLOGY_PATH).checkedGet(); - - if(data.isPresent()) { - final List nodeList = data.get().getNode(); - if(nodeList != null) { - return nodeList; - } - } - return Collections.emptyList(); - } catch (final ReadFailedException e) { - LOG.error("Unable to retrieve node list.", e); - return Collections.emptyList(); - } - } - } } diff --git a/opendaylight/md-sal/sal-akka-raft/pom.xml b/opendaylight/md-sal/sal-akka-raft/pom.xml index 1f99a52ed5..0ec83c86b3 100644 --- a/opendaylight/md-sal/sal-akka-raft/pom.xml +++ b/opendaylight/md-sal/sal-akka-raft/pom.xml @@ -59,6 +59,11 @@ commons-io + + org.apache.commons + commons-lang3 + + com.typesafe.akka akka-slf4j_${scala.version} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java index 0beccd1b2b..07b6b617aa 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java @@ -91,4 +91,10 @@ public interface FollowerLogInformation { */ long timeSinceLastActivity(); + /** + * This method checks if it is ok to replicate + * + * @return true if it is ok to replicate + */ + boolean okToReplicate(); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java index 288a540344..15063cff5b 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java @@ -26,6 +26,11 @@ public class FollowerLogInformationImpl implements FollowerLogInformation { private volatile long matchIndex; + private long lastReplicatedIndex = -1L; + + private final Stopwatch lastReplicatedStopwatch = Stopwatch.createUnstarted(); + + public FollowerLogInformationImpl(String id, long matchIndex, RaftActorContext context) { this.id = id; this.nextIndex = context.getCommitIndex(); @@ -110,6 +115,28 @@ public class FollowerLogInformationImpl implements FollowerLogInformation { return stopwatch.elapsed(TimeUnit.MILLISECONDS); } + @Override + public boolean okToReplicate() { + // Return false if we are trying to send duplicate 
data before the heartbeat interval + if(getNextIndex() == lastReplicatedIndex){ + if(lastReplicatedStopwatch.elapsed(TimeUnit.MILLISECONDS) < context.getConfigParams() + .getHeartBeatInterval().toMillis()){ + return false; + } + } + + resetLastReplicated(); + return true; + } + + private void resetLastReplicated(){ + lastReplicatedIndex = getNextIndex(); + if(lastReplicatedStopwatch.isRunning()){ + lastReplicatedStopwatch.reset(); + } + lastReplicatedStopwatch.start(); + } + @Override public String toString() { StringBuilder builder = new StringBuilder(); @@ -120,6 +147,4 @@ public class FollowerLogInformationImpl implements FollowerLogInformation { .append(context.getConfigParams().getElectionTimeOutInterval().toMillis()).append("]"); return builder.toString(); } - - } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 77ff47d0ad..aa72485187 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -17,14 +17,21 @@ import akka.persistence.SaveSnapshotSuccess; import akka.persistence.SnapshotOffer; import akka.persistence.SnapshotSelectionCriteria; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Objects; import com.google.common.base.Optional; import com.google.common.base.Stopwatch; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; import com.google.protobuf.ByteString; import java.io.Serializable; +import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.time.DurationFormatUtils; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor; +import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RoleChanged; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; @@ -34,11 +41,15 @@ import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; +import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader; import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior; import org.opendaylight.controller.cluster.raft.behaviors.Follower; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; +import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo; +import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState; +import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; @@ -119,6 +130,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private int currentRecoveryBatchCount; + private final BehaviorStateHolder reusableBehaviorStateHolder = new BehaviorStateHolder(); + public RaftActor(String id, Map peerAddresses) { this(id, peerAddresses, Optional.absent()); } @@ -297,9 +310,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } protected void changeCurrentBehavior(RaftActorBehavior newBehavior){ - RaftActorBehavior oldBehavior = currentBehavior; + reusableBehaviorStateHolder.init(currentBehavior); currentBehavior = newBehavior; - handleBehaviorChange(oldBehavior, currentBehavior); + handleBehaviorChange(reusableBehaviorStateHolder, currentBehavior); } @Override public void handleCommand(Object message) { @@ -384,31 +397,84 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } else if (message instanceof CaptureSnapshotReply){ handleCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot()); - + } else if(message instanceof GetOnDemandRaftState) { + onGetOnDemandRaftStats(); } else { - RaftActorBehavior oldBehavior = currentBehavior; + reusableBehaviorStateHolder.init(currentBehavior); + currentBehavior = currentBehavior.handleMessage(getSender(), message); - handleBehaviorChange(oldBehavior, currentBehavior); + handleBehaviorChange(reusableBehaviorStateHolder, currentBehavior); + } + } + + private void onGetOnDemandRaftStats() { + // Debugging message to retrieve raft stats. + + OnDemandRaftState.Builder builder = OnDemandRaftState.builder() + .commitIndex(context.getCommitIndex()) + .currentTerm(context.getTermInformation().getCurrentTerm()) + .inMemoryJournalDataSize(replicatedLog.dataSize()) + .inMemoryJournalLogSize(replicatedLog.size()) + .isSnapshotCaptureInitiated(context.isSnapshotCaptureInitiated()) + .lastApplied(context.getLastApplied()) + .lastIndex(replicatedLog.lastIndex()) + .lastTerm(replicatedLog.lastTerm()) + .leader(getLeaderId()) + .raftState(currentBehavior.state().toString()) + .replicatedToAllIndex(currentBehavior.getReplicatedToAllIndex()) + .snapshotIndex(replicatedLog.getSnapshotIndex()) + .snapshotTerm(replicatedLog.getSnapshotTerm()) + .votedFor(context.getTermInformation().getVotedFor()) + .peerAddresses(ImmutableMap.copyOf(context.getPeerAddresses())); + + ReplicatedLogEntry lastLogEntry = getLastLogEntry(); + if (lastLogEntry != null) { + builder.lastLogIndex(lastLogEntry.getIndex()); + builder.lastLogTerm(lastLogEntry.getTerm()); } + + if(currentBehavior instanceof AbstractLeader) { + AbstractLeader leader = (AbstractLeader)currentBehavior; + Collection followerIds = leader.getFollowerIds(); + List followerInfoList = Lists.newArrayListWithCapacity(followerIds.size()); + for(String id: followerIds) { + final FollowerLogInformation info = leader.getFollower(id); + followerInfoList.add(new FollowerInfo(id, info.getNextIndex(), info.getMatchIndex(), + info.isFollowerActive(), DurationFormatUtils.formatDurationHMS(info.timeSinceLastActivity()))); + } + + builder.followerInfoList(followerInfoList); + } + + sender().tell(builder.build(), self()); + } - private void handleBehaviorChange(RaftActorBehavior oldBehavior, RaftActorBehavior currentBehavior) { + private void handleBehaviorChange(BehaviorStateHolder oldBehaviorState, RaftActorBehavior currentBehavior) { + RaftActorBehavior oldBehavior = oldBehaviorState.getBehavior(); + if (oldBehavior != currentBehavior){ onStateChanged(); } - String oldBehaviorLeaderId = 
oldBehavior == null? null : oldBehavior.getLeaderId(); - String oldBehaviorState = oldBehavior == null? null : oldBehavior.state().name(); + String oldBehaviorLeaderId = oldBehavior == null ? null : oldBehaviorState.getLeaderId(); + String oldBehaviorStateName = oldBehavior == null ? null : oldBehavior.state().name(); // it can happen that the state has not changed but the leader has changed. - onLeaderChanged(oldBehaviorLeaderId, currentBehavior.getLeaderId()); + Optional roleChangeNotifier = getRoleChangeNotifier(); + if(!Objects.equal(oldBehaviorLeaderId, currentBehavior.getLeaderId())) { + if(roleChangeNotifier.isPresent()) { + roleChangeNotifier.get().tell(new LeaderStateChanged(getId(), currentBehavior.getLeaderId()), getSelf()); + } + + onLeaderChanged(oldBehaviorLeaderId, currentBehavior.getLeaderId()); + } - if (getRoleChangeNotifier().isPresent() && + if (roleChangeNotifier.isPresent() && (oldBehavior == null || (oldBehavior.state() != currentBehavior.state()))) { - getRoleChangeNotifier().get().tell( - new RoleChanged(getId(), oldBehaviorState , currentBehavior.state().name()), - getSelf()); + roleChangeNotifier.get().tell(new RoleChanged(getId(), oldBehaviorStateName , + currentBehavior.state().name()), getSelf()); } } @@ -998,4 +1064,21 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { return currentBehavior; } + private static class BehaviorStateHolder { + private RaftActorBehavior behavior; + private String leaderId; + + void init(RaftActorBehavior behavior) { + this.behavior = behavior; + this.leaderId = behavior != null ? behavior.getLeaderId() : null; + } + + RaftActorBehavior getBehavior() { + return behavior; + } + + String getLeaderId() { + return leaderId; + } + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index a4753a4fe4..a63c62fa30 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -134,7 +134,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { * * @return Collection of follower IDs */ - protected final Collection getFollowerIds() { + public final Collection getFollowerIds() { return followerToLog.keySet(); } @@ -460,6 +460,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { if (followerActor != null) { long followerNextIndex = followerLogInformation.getNextIndex(); boolean isFollowerActive = followerLogInformation.isFollowerActive(); + boolean sendAppendEntries = false; + List entries = Collections.EMPTY_LIST; if (mapFollowerToSnapshot.get(followerId) != null) { // if install snapshot is in process , then sent next chunk if possible @@ -467,8 +469,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { sendSnapshotChunk(followerActor, followerId); } else if(sendHeartbeat) { // we send a heartbeat even if we have not received a reply for the last chunk - sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(), - Collections.emptyList(), followerId); + sendAppendEntries = true; } } else { long leaderLastIndex = context.getReplicatedLog().lastIndex(); @@ -485,10 +486,10 @@ public abstract class AbstractLeader extends 
AbstractRaftActorBehavior { followerNextIndex, followerId); // FIXME : Sending one entry at a time - final List entries = context.getReplicatedLog().getFrom(followerNextIndex, 1); - - sendAppendEntriesToFollower(followerActor, followerNextIndex, entries, followerId); - + if(followerLogInformation.okToReplicate()) { + entries = context.getReplicatedLog().getFrom(followerNextIndex, 1); + sendAppendEntries = true; + } } else if (isFollowerActive && followerNextIndex >= 0 && leaderLastIndex > followerNextIndex && !context.isSnapshotCaptureInitiated()) { // if the followers next index is not present in the leaders log, and @@ -503,19 +504,21 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } // Send heartbeat to follower whenever install snapshot is initiated. - sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(), - Collections.emptyList(), followerId); - + sendAppendEntries = true; initiateCaptureSnapshot(followerId, followerNextIndex); } else if(sendHeartbeat) { - //we send an AppendEntries, even if the follower is inactive + // we send an AppendEntries, even if the follower is inactive // in-order to update the followers timestamp, in case it becomes active again - sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(), - Collections.emptyList(), followerId); + sendAppendEntries = true; } } + + if(sendAppendEntries) { + sendAppendEntriesToFollower(followerActor, followerNextIndex, + entries, followerId); + } } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index 0f251a3012..bdd459ecff 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -96,6 +96,19 @@ public class Follower extends AbstractRaftActorBehavior { // to make it easier to read. Before refactoring ensure tests // cover the code properly + if (snapshotTracker != null) { + // if snapshot install is in progress, follower should just acknowledge append entries with a reply. + AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true, + lastIndex(), lastTerm()); + + if(LOG.isDebugEnabled()) { + LOG.debug("{}: snapshot install is in progress, replying immediately with {}", logName(), reply); + } + sender.tell(reply, actor()); + + return this; + } + // 1. Reply false if term < currentTerm (§5.1) // This is handled in the appendEntries method of the base class diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/FollowerInfo.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/FollowerInfo.java new file mode 100644 index 0000000000..5d2c56a117 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/FollowerInfo.java @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. 
+ * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft.client.messages; + +import java.beans.ConstructorProperties; + +/** + * A bean class containing a snapshot of information for a follower returned from GetOnDemandRaftStats. + * + * @author Thomas Pantelis + */ +public class FollowerInfo { + private final String id; + private final long nextIndex; + private final long matchIndex; + private final boolean isActive; + private final String timeSinceLastActivity; + + @ConstructorProperties({"id","nextIndex", "matchIndex", "isActive", "timeSinceLastActivity"}) + public FollowerInfo(String id, long nextIndex, long matchIndex, boolean isActive, String timeSinceLastActivity) { + this.id = id; + this.nextIndex = nextIndex; + this.matchIndex = matchIndex; + this.isActive = isActive; + this.timeSinceLastActivity = timeSinceLastActivity; + } + + public String getId() { + return id; + } + + public long getNextIndex() { + return nextIndex; + } + + public long getMatchIndex() { + return matchIndex; + } + + public boolean isActive() { + return isActive; + } + + public String getTimeSinceLastActivity() { + return timeSinceLastActivity; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/GetOnDemandRaftState.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/GetOnDemandRaftState.java new file mode 100644 index 0000000000..be043861fb --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/GetOnDemandRaftState.java @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft.client.messages; + +/** + * Local message sent to a RaftActor to obtain a snapshot of statistical information. Returns an + * OnDemandRaftState instance. + * + * @author Thomas Pantelis + */ +public class GetOnDemandRaftState { + public static final GetOnDemandRaftState INSTANCE = new GetOnDemandRaftState(); + + private GetOnDemandRaftState() { + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/OnDemandRaftState.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/OnDemandRaftState.java new file mode 100644 index 0000000000..8c2986f6d1 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/OnDemandRaftState.java @@ -0,0 +1,216 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. 
+ * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft.client.messages; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + * The response to a GetOnDemandRaftState message, + * + * @author Thomas Pantelis + */ +public class OnDemandRaftState { + private long lastLogIndex = -1L; + private long lastLogTerm = -1L; + private long currentTerm = -1L; + private long commitIndex = -1L; + private long lastApplied = -1L; + private long lastIndex = -1L; + private long lastTerm = -1L; + private long snapshotIndex = -1L; + private long snapshotTerm = -1L; + private long replicatedToAllIndex = -1L; + private long inMemoryJournalDataSize; + private long inMemoryJournalLogSize; + private String leader; + private String raftState; + private String votedFor; + private boolean isSnapshotCaptureInitiated; + + private List followerInfoList = Collections.emptyList(); + private Map peerAddresses = Collections.emptyMap(); + + private OnDemandRaftState() { + } + + public static Builder builder() { + return new Builder(); + } + + public long getLastLogIndex() { + return lastLogIndex; + } + + public long getLastLogTerm() { + return lastLogTerm; + } + + public long getCurrentTerm() { + return currentTerm; + } + + public long getCommitIndex() { + return commitIndex; + } + + public long getLastApplied() { + return lastApplied; + } + + public long getLastIndex() { + return lastIndex; + } + + public long getLastTerm() { + return lastTerm; + } + + public long getSnapshotIndex() { + return snapshotIndex; + } + + public long getSnapshotTerm() { + return snapshotTerm; + } + + public long getReplicatedToAllIndex() { + return replicatedToAllIndex; + } + + public long getInMemoryJournalDataSize() { + return inMemoryJournalDataSize; + } + + public long getInMemoryJournalLogSize() { + return inMemoryJournalLogSize; + } + + public String getLeader() { + return leader; + } + + public String getRaftState() { + return raftState; + } + + public String getVotedFor() { + return votedFor; + } + + public boolean isSnapshotCaptureInitiated() { + return isSnapshotCaptureInitiated; + } + + public List getFollowerInfoList() { + return followerInfoList; + } + + public Map getPeerAddresses() { + return peerAddresses; + } + + public static class Builder { + private final OnDemandRaftState stats = new OnDemandRaftState(); + + public Builder lastLogIndex(long value) { + stats.lastLogIndex = value; + return this; + } + + public Builder lastLogTerm(long value) { + stats.lastLogTerm = value; + return this; + } + + public Builder currentTerm(long value) { + stats.currentTerm = value; + return this; + } + + public Builder commitIndex(long value) { + stats.commitIndex = value; + return this; + } + + public Builder lastApplied(long value) { + stats.lastApplied = value; + return this; + } + + public Builder lastIndex(long value) { + stats.lastIndex = value; + return this; + } + + public Builder lastTerm(long value) { + stats.lastTerm = value; + return this; + } + + public Builder snapshotIndex(long value) { + stats.snapshotIndex = value; + return this; + } + + public Builder snapshotTerm(long value) { + stats.snapshotTerm = value; + return this; + } + + public Builder replicatedToAllIndex(long value) { + stats.replicatedToAllIndex = value; + return this; + } + + public Builder 
inMemoryJournalDataSize(long value) { + stats.inMemoryJournalDataSize = value; + return this; + } + + public Builder inMemoryJournalLogSize(long value) { + stats.inMemoryJournalLogSize = value; + return this; + } + + public Builder leader(String value) { + stats.leader = value; + return this; + } + + public Builder raftState(String value) { + stats.raftState = value; + return this; + } + + public Builder votedFor(String value) { + stats.votedFor = value; + return this; + } + + public Builder followerInfoList(List followerInfoList) { + stats.followerInfoList = followerInfoList; + return this; + } + + public Builder peerAddresses(Map peerAddresses) { + stats.peerAddresses = peerAddresses; + return this; + } + + public Builder isSnapshotCaptureInitiated(boolean value) { + stats.isSnapshotCaptureInitiated = value; + return this; + } + + public OnDemandRaftState build() { + return stats; + } + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImplTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImplTest.java index 5be9030f59..bdfd69ec11 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImplTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImplTest.java @@ -60,4 +60,24 @@ public class FollowerLogInformationImplTest { stopwatch.stop(); return stopwatch.elapsed(TimeUnit.MILLISECONDS); } + + @Test + public void testOkToReplicate(){ + MockRaftActorContext context = new MockRaftActorContext(); + context.setCommitIndex(9); + FollowerLogInformation followerLogInformation = + new FollowerLogInformationImpl( + "follower1", 10, context); + + assertTrue(followerLogInformation.okToReplicate()); + assertFalse(followerLogInformation.okToReplicate()); + + // wait for 150 milliseconds and it should work again + Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS); + assertTrue(followerLogInformation.okToReplicate()); + + //increment next index and try immediately and it should work again + followerLogInformation.incrNextIndex(); + assertTrue(followerLogInformation.okToReplicate()); + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index b192b7cd24..34932c7249 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -54,6 +54,7 @@ import org.junit.Before; import org.junit.Test; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor; +import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RoleChanged; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; @@ -64,6 +65,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotRep import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import 
org.opendaylight.controller.cluster.raft.behaviors.Follower; import org.opendaylight.controller.cluster.raft.behaviors.Leader; +import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; @@ -944,7 +946,8 @@ public class RaftActorTest extends AbstractActorTest { @Test public void testRaftRoleChangeNotifier() throws Exception { new JavaTestKit(getSystem()) {{ - ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class)); + TestActorRef notifierActor = factory.createTestActor( + Props.create(MessageCollectorActor.class)); MessageCollectorActor.waitUntilReady(notifierActor); DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); @@ -954,20 +957,10 @@ public class RaftActorTest extends AbstractActorTest { String persistenceId = factory.generateActorId("notifier-"); - factory.createTestActor(MockRaftActor.props(persistenceId, + TestActorRef raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId, Collections.emptyMap(), Optional.of(config), notifierActor), persistenceId); - List matches = null; - for(int i = 0; i < 5000 / heartBeatInterval; i++) { - matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class); - assertNotNull(matches); - if(matches.size() == 3) { - break; - } - Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS); - } - - assertEquals(3, matches.size()); + List matches = MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3); // check if the notifier got a role change from null to Follower RoleChanged raftRoleChanged = matches.get(0); @@ -986,6 +979,41 @@ public class RaftActorTest extends AbstractActorTest { assertEquals(persistenceId, raftRoleChanged.getMemberId()); assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole()); assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole()); + + LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching( + notifierActor, LeaderStateChanged.class); + + assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId()); + + notifierActor.underlyingActor().clear(); + + MockRaftActor raftActor = raftActorRef.underlyingActor(); + final String newLeaderId = "new-leader"; + Follower follower = new Follower(raftActor.getRaftActorContext()) { + @Override + public RaftActorBehavior handleMessage(ActorRef sender, Object message) { + leaderId = newLeaderId; + return this; + } + }; + + raftActor.changeCurrentBehavior(follower); + + leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class); + assertEquals(persistenceId, leaderStateChange.getMemberId()); + assertEquals(null, leaderStateChange.getLeaderId()); + + raftRoleChanged = MessageCollectorActor.expectFirstMatching(notifierActor, RoleChanged.class); + assertEquals(RaftState.Leader.name(), raftRoleChanged.getOldRole()); + assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole()); + + notifierActor.underlyingActor().clear(); + + raftActor.handleCommand("any"); + + leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class); + assertEquals(persistenceId, leaderStateChange.getMemberId()); + assertEquals(newLeaderId, leaderStateChange.getLeaderId()); }}; } diff --git 
a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java index 29fb613327..75509bae51 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java @@ -2,8 +2,13 @@ package org.opendaylight.controller.cluster.raft.behaviors; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; import akka.actor.ActorRef; import akka.actor.Props; import akka.testkit.TestActorRef; @@ -577,12 +582,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { follower = createBehavior(context); - HashMap followerSnapshot = new HashMap<>(); - followerSnapshot.put("1", "A"); - followerSnapshot.put("2", "B"); - followerSnapshot.put("3", "C"); - - ByteString bsSnapshot = toByteString(followerSnapshot); + ByteString bsSnapshot = createSnapshot(); int offset = 0; int snapshotLength = bsSnapshot.size(); int chunkSize = 50; @@ -627,6 +627,57 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker()); } + + /** + * Verify that when an AppendEntries is sent to a follower during a snapshot install + * the Follower short-circuits the processing of the AppendEntries message. + * + * @throws Exception + */ + @Test + public void testReceivingAppendEntriesDuringInstallSnapshot() throws Exception { + logStart("testReceivingAppendEntriesDuringInstallSnapshot"); + + MockRaftActorContext context = createActorContext(); + + follower = createBehavior(context); + + ByteString bsSnapshot = createSnapshot(); + int snapshotLength = bsSnapshot.size(); + int chunkSize = 50; + int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 
1 : 0); + int lastIncludedIndex = 1; + + // Check that snapshot installation is not in progress + assertNull(((Follower) follower).getSnapshotTracker()); + + // Make sure that we have more than 1 chunk to send + assertTrue(totalChunks > 1); + + // Send an install snapshot with the first chunk to start the process of installing a snapshot + ByteString chunkData = getNextChunk(bsSnapshot, 0, chunkSize); + follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1, + chunkData, 1, totalChunks)); + + // Check if snapshot installation is in progress now + assertNotNull(((Follower) follower).getSnapshotTracker()); + + // Send an append entry + AppendEntries appendEntries = mock(AppendEntries.class); + doReturn(context.getTermInformation().getCurrentTerm()).when(appendEntries).getTerm(); + + follower.handleMessage(leaderActor, appendEntries); + + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + assertEquals(context.getReplicatedLog().lastIndex(), reply.getLogLastIndex()); + assertEquals(context.getReplicatedLog().lastTerm(), reply.getLogLastTerm()); + assertEquals(context.getTermInformation().getCurrentTerm(), reply.getTerm()); + + // We should not hit the code that needs to look at prevLogIndex because we are short circuiting + verify(appendEntries, never()).getPrevLogIndex(); + + } + @Test public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() throws Exception { logStart("testInitialSyncUpWithHandleInstallSnapshot"); @@ -635,12 +686,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { follower = createBehavior(context); - HashMap followerSnapshot = new HashMap<>(); - followerSnapshot.put("1", "A"); - followerSnapshot.put("2", "B"); - followerSnapshot.put("3", "C"); - - ByteString bsSnapshot = toByteString(followerSnapshot); + ByteString bsSnapshot = createSnapshot(); int offset = 0; int snapshotLength = bsSnapshot.size(); int chunkSize = 50; @@ -692,12 +738,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { follower = createBehavior(context); - HashMap followerSnapshot = new HashMap<>(); - followerSnapshot.put("1", "A"); - followerSnapshot.put("2", "B"); - followerSnapshot.put("3", "C"); - - ByteString bsSnapshot = toByteString(followerSnapshot); + ByteString bsSnapshot = createSnapshot(); InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1, getNextChunk(bsSnapshot, 10, 50), 3, 3); @@ -746,6 +787,15 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { new MockRaftActorContext.MockPayload(data)); } + private ByteString createSnapshot(){ + HashMap followerSnapshot = new HashMap<>(); + followerSnapshot.put("1", "A"); + followerSnapshot.put("2", "B"); + followerSnapshot.put("3", "C"); + + return toByteString(followerSnapshot); + } + @Override protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext, ActorRef actorRef, RaftRPC rpc) throws Exception { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index 3f085df8dc..383ebefd36 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ 
-119,6 +119,15 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm()); } + + private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){ + MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo"); + MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry( + 1, index, payload); + actorContext.getReplicatedLog().append(newEntry); + return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry)); + } + @Test public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception { logStart("testHandleReplicateMessageSendAppendEntriesToFollower"); @@ -145,8 +154,7 @@ public class LeaderTest extends AbstractLeaderTest { MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry( 1, lastIndex + 1, payload); actorContext.getReplicatedLog().append(newEntry); - RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, - new Replicate(null, null, newEntry)); + RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex+1); // State should not change assertTrue(raftBehavior instanceof Leader); @@ -160,6 +168,218 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData()); } + @Test + public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception { + logStart("testHandleReplicateMessageSendAppendEntriesToFollower"); + + MockRaftActorContext actorContext = createActorContextWithFollower(); + actorContext.setConfigParams(new DefaultConfigParamsImpl() { + @Override + public FiniteDuration getHeartBeatInterval() { + return FiniteDuration.apply(5, TimeUnit.SECONDS); + } + }); + + long term = 1; + actorContext.getTermInformation().update(term, ""); + + leader = new Leader(actorContext); + + // Leader will send an immediate heartbeat - ignore it. + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + // The follower would normally reply - simulate that explicitly here. 
+ long lastIndex = actorContext.getReplicatedLog().lastIndex(); + leader.handleMessage(followerActor, new AppendEntriesReply( + FOLLOWER_ID, term, true, lastIndex, term)); + assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); + + followerActor.underlyingActor().clear(); + + for(int i=0;i<5;i++) { + sendReplicate(actorContext, lastIndex+i+1); + } + + List allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class); + // We expect only 1 message to be sent because of two reasons, + // - an append entries reply was not received + // - the heartbeat interval has not expired + // In this scenario if multiple messages are sent they would likely be duplicates + assertEquals("The number of append entries collected should be 1", 1, allMessages.size()); + } + + @Test + public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception { + logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries"); + + MockRaftActorContext actorContext = createActorContextWithFollower(); + actorContext.setConfigParams(new DefaultConfigParamsImpl() { + @Override + public FiniteDuration getHeartBeatInterval() { + return FiniteDuration.apply(5, TimeUnit.SECONDS); + } + }); + + long term = 1; + actorContext.getTermInformation().update(term, ""); + + leader = new Leader(actorContext); + + // Leader will send an immediate heartbeat - ignore it. + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + // The follower would normally reply - simulate that explicitly here. + long lastIndex = actorContext.getReplicatedLog().lastIndex(); + leader.handleMessage(followerActor, new AppendEntriesReply( + FOLLOWER_ID, term, true, lastIndex, term)); + assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); + + followerActor.underlyingActor().clear(); + + for(int i=0;i<3;i++) { + sendReplicate(actorContext, lastIndex+i+1); + leader.handleMessage(followerActor, new AppendEntriesReply( + FOLLOWER_ID, term, true, lastIndex + i + 1, term)); + + } + + for(int i=3;i<5;i++) { + sendReplicate(actorContext, lastIndex + i + 1); + } + + List allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class); + // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would + // get sent to the follower - but not the 5th + assertEquals("The number of append entries collected should be 4", 4, allMessages.size()); + + for(int i=0;i<4;i++) { + long expected = allMessages.get(i).getEntries().get(0).getIndex(); + assertEquals(expected, i+2); + } + } + + @Test + public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception { + logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat"); + + MockRaftActorContext actorContext = createActorContextWithFollower(); + actorContext.setConfigParams(new DefaultConfigParamsImpl() { + @Override + public FiniteDuration getHeartBeatInterval() { + return FiniteDuration.apply(500, TimeUnit.MILLISECONDS); + } + }); + + long term = 1; + actorContext.getTermInformation().update(term, ""); + + leader = new Leader(actorContext); + + // Leader will send an immediate heartbeat - ignore it. + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + // The follower would normally reply - simulate that explicitly here. 
+ long lastIndex = actorContext.getReplicatedLog().lastIndex(); + leader.handleMessage(followerActor, new AppendEntriesReply( + FOLLOWER_ID, term, true, lastIndex, term)); + assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); + + followerActor.underlyingActor().clear(); + + sendReplicate(actorContext, lastIndex+1); + + // Wait slightly longer than heartbeat duration + Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS); + + leader.handleMessage(leaderActor, new SendHeartBeat()); + + List allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class); + assertEquals("The number of append entries collected should be 2", 2, allMessages.size()); + + assertEquals(1, allMessages.get(0).getEntries().size()); + assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex()); + assertEquals(1, allMessages.get(1).getEntries().size()); + assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex()); + + } + + @Test + public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception { + logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed"); + + MockRaftActorContext actorContext = createActorContextWithFollower(); + actorContext.setConfigParams(new DefaultConfigParamsImpl() { + @Override + public FiniteDuration getHeartBeatInterval() { + return FiniteDuration.apply(100, TimeUnit.MILLISECONDS); + } + }); + + long term = 1; + actorContext.getTermInformation().update(term, ""); + + leader = new Leader(actorContext); + + // Leader will send an immediate heartbeat - ignore it. + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + // The follower would normally reply - simulate that explicitly here. + long lastIndex = actorContext.getReplicatedLog().lastIndex(); + leader.handleMessage(followerActor, new AppendEntriesReply( + FOLLOWER_ID, term, true, lastIndex, term)); + assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); + + followerActor.underlyingActor().clear(); + + for(int i=0;i<3;i++) { + Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS); + leader.handleMessage(leaderActor, new SendHeartBeat()); + } + + List allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class); + assertEquals("The number of append entries collected should be 3", 3, allMessages.size()); + } + + @Test + public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception { + logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate"); + + MockRaftActorContext actorContext = createActorContextWithFollower(); + actorContext.setConfigParams(new DefaultConfigParamsImpl() { + @Override + public FiniteDuration getHeartBeatInterval() { + return FiniteDuration.apply(100, TimeUnit.MILLISECONDS); + } + }); + + long term = 1; + actorContext.getTermInformation().update(term, ""); + + leader = new Leader(actorContext); + + // Leader will send an immediate heartbeat - ignore it. + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + // The follower would normally reply - simulate that explicitly here. 
+ long lastIndex = actorContext.getReplicatedLog().lastIndex(); + leader.handleMessage(followerActor, new AppendEntriesReply( + FOLLOWER_ID, term, true, lastIndex, term)); + assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); + + followerActor.underlyingActor().clear(); + + Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS); + leader.handleMessage(leaderActor, new SendHeartBeat()); + sendReplicate(actorContext, lastIndex+1); + + List allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class); + assertEquals("The number of append entries collected should be 2", 2, allMessages.size()); + + assertEquals(0, allMessages.get(0).getEntries().size()); + assertEquals(1, allMessages.get(1).getEntries().size()); + } + + @Test public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception { logStart("testHandleReplicateMessageWhenThereAreNoFollowers"); diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/LeaderStateChanged.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/LeaderStateChanged.java new file mode 100644 index 0000000000..ec35b03b0a --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/LeaderStateChanged.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.notifications; + +import java.io.Serializable; + +/** + * A message initiated internally from the RaftActor when some state of a leader has changed + * + * @author Thomas Pantelis + */ +public class LeaderStateChanged implements Serializable { + private static final long serialVersionUID = 1L; + + private final String memberId; + private final String leaderId; + + public LeaderStateChanged(String memberId, String leaderId) { + this.memberId = memberId; + this.leaderId = leaderId; + } + + public String getMemberId() { + return memberId; + } + + public String getLeaderId() { + return leaderId; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("LeaderStateChanged [memberId=").append(memberId).append(", leaderId=").append(leaderId) + .append("]"); + return builder.toString(); + } +} diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChangeNotifier.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChangeNotifier.java index d065f6d211..598dfb1fe8 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChangeNotifier.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/notifications/RoleChangeNotifier.java @@ -17,16 +17,17 @@ import java.util.Map; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; /** - * The RoleChangeNotifier is responsible for receiving Raft role change messages and notifying + * The RoleChangeNotifier is responsible for receiving Raft role and 
leader state change messages and notifying * the listeners (within the same node), which are registered with it. *
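For illustration, a listener that registers with this notifier could be a plain Akka actor along the following lines. This is a sketch, not part of the patch: the actor path used for registration is made up, and only the messages shown in this change set (RegisterRoleChangeListener, RoleChangeNotification and the new LeaderStateChanged) are assumed.

    import akka.actor.UntypedActor;
    import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
    import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
    import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class ShardStateListener extends UntypedActor {
        private static final Logger LOG = LoggerFactory.getLogger(ShardStateListener.class);

        @Override
        public void preStart() {
            // Hypothetical notifier path; the notifier replies with RegisterRoleChangeListenerReply
            // and replays its latest cached RoleChangeNotification/LeaderStateChanged, if any.
            getContext().actorSelection("/user/shardmanager/shard-default-notifier")
                    .tell(new RegisterRoleChangeListener(), getSelf());
        }

        @Override
        public void onReceive(Object message) {
            if (message instanceof LeaderStateChanged) {
                LeaderStateChanged changed = (LeaderStateChanged) message;
                LOG.info("Shard {} now reports leader {}", changed.getMemberId(), changed.getLeaderId());
            } else if (message instanceof RoleChangeNotification) {
                RoleChangeNotification notification = (RoleChangeNotification) message;
                LOG.info("Shard {} changed role from {} to {}", notification.getMemberId(),
                        notification.getOldRole(), notification.getNewRole());
            } else {
                unhandled(message);
            }
        }
    }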
* The RoleChangeNotifier is instantiated by the Shard and injected into the RaftActor. */ public class RoleChangeNotifier extends AbstractUntypedActor implements AutoCloseable { - private String memberId; - private Map registeredListeners = Maps.newHashMap(); + private final String memberId; + private final Map registeredListeners = Maps.newHashMap(); private RoleChangeNotification latestRoleChangeNotification = null; + private LeaderStateChanged latestLeaderStateChanged; public RoleChangeNotifier(String memberId) { this.memberId = memberId; @@ -62,6 +63,10 @@ public class RoleChangeNotifier extends AbstractUntypedActor implements AutoClos getSender().tell(new RegisterRoleChangeListenerReply(), getSelf()); + if(latestLeaderStateChanged != null) { + getSender().tell(latestLeaderStateChanged, getSelf()); + } + if (latestRoleChangeNotification != null) { getSender().tell(latestRoleChangeNotification, getSelf()); } @@ -81,6 +86,12 @@ public class RoleChangeNotifier extends AbstractUntypedActor implements AutoClos for (ActorRef listener: registeredListeners.values()) { listener.tell(latestRoleChangeNotification, getSelf()); } + } else if (message instanceof LeaderStateChanged) { + latestLeaderStateChanged = (LeaderStateChanged)message; + + for (ActorRef listener: registeredListeners.values()) { + listener.tell(latestLeaderStateChanged, getSelf()); + } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DOMTransactionFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DOMTransactionFactory.java new file mode 100644 index 0000000000..f2436201d8 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DOMTransactionFactory.java @@ -0,0 +1,96 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import java.util.HashMap; +import java.util.Map; +import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; +import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory; +import org.slf4j.Logger; + +/** + * A factory for creating DOM transactions, either normal or chained. 
+ * + * @author Thomas Pantelis + */ +public class DOMTransactionFactory { + + private final Map transactionChains = new HashMap<>(); + private final InMemoryDOMDataStore store; + private final ShardStats shardMBean; + private final Logger log; + private final String name; + + public DOMTransactionFactory(InMemoryDOMDataStore store, ShardStats shardMBean, Logger log, String name) { + this.store = store; + this.shardMBean = shardMBean; + this.log = log; + this.name = name; + } + + @SuppressWarnings("unchecked") + public T newTransaction(TransactionProxy.TransactionType type, + String transactionID, String transactionChainID) { + + DOMStoreTransactionFactory factory = store; + + if(!transactionChainID.isEmpty()) { + factory = transactionChains.get(transactionChainID); + if(factory == null) { + if(log.isDebugEnabled()) { + log.debug("{}: Creating transaction with ID {} from chain {}", name, transactionID, + transactionChainID); + } + + DOMStoreTransactionChain transactionChain = store.createTransactionChain(); + transactionChains.put(transactionChainID, transactionChain); + factory = transactionChain; + } + } else { + log.debug("{}: Creating transaction with ID {}", name, transactionID); + } + + T transaction = null; + switch(type) { + case READ_ONLY: + transaction = (T) factory.newReadOnlyTransaction(); + shardMBean.incrementReadOnlyTransactionCount(); + break; + case READ_WRITE: + transaction = (T) factory.newReadWriteTransaction(); + shardMBean.incrementReadWriteTransactionCount(); + break; + case WRITE_ONLY: + transaction = (T) factory.newWriteOnlyTransaction(); + shardMBean.incrementWriteOnlyTransactionCount(); + break; + } + + return transaction; + } + + public void closeTransactionChain(String transactionChainID) { + DOMStoreTransactionChain chain = + transactionChains.remove(transactionChainID); + + if(chain != null) { + chain.close(); + } + } + + public void closeAllTransactionChains() { + for(Map.Entry entry : transactionChains.entrySet()){ + entry.getValue().close(); + } + + transactionChains.clear(); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java index 7f8a4e779d..d5142c94a6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java @@ -58,6 +58,7 @@ public class DatastoreContext { private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl(); private String dataStoreType = UNKNOWN_DATA_STORE_TYPE; private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT; + private boolean writeOnlyTransactionOptimizationsEnabled = false; private DatastoreContext() { setShardJournalRecoveryLogBatchSize(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE); @@ -82,6 +83,7 @@ public class DatastoreContext { this.transactionCreationInitialRateLimit = other.transactionCreationInitialRateLimit; this.dataStoreType = other.dataStoreType; this.shardBatchedModificationCount = other.shardBatchedModificationCount; + this.writeOnlyTransactionOptimizationsEnabled = other.writeOnlyTransactionOptimizationsEnabled; setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize()); 
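As a quick illustration of how the new factory is meant to be used (a sketch, not part of this patch; it assumes an InMemoryDOMDataStore, ShardStats bean and Logger are already at hand, and the transaction/chain ids are made up):

    // Illustrative usage of DOMTransactionFactory; 'store', 'shardMBean' and 'LOG' are assumed.
    DOMTransactionFactory factory =
            new DOMTransactionFactory(store, shardMBean, LOG, "member-1-shard-inventory");

    // The first transaction for "chain-1" lazily creates and caches the transaction chain.
    DOMStoreWriteTransaction writeTx = factory.newTransaction(
            TransactionProxy.TransactionType.WRITE_ONLY, "tx-1", "chain-1");

    // An empty chain id means the transaction is created directly from the store.
    DOMStoreReadTransaction readTx = factory.newTransaction(
            TransactionProxy.TransactionType.READ_ONLY, "tx-2", "");

    // Close a single chain when the front-end is done with it, or all of them
    // when the shard loses leadership (as Shard.onStateChanged does below).
    factory.closeTransactionChain("chain-1");
    factory.closeAllTransactionChains();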
setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount()); @@ -186,6 +188,10 @@ public class DatastoreContext { return shardBatchedModificationCount; } + public boolean isWriteOnlyTransactionOptimizationsEnabled() { + return writeOnlyTransactionOptimizationsEnabled; + } + public static class Builder { private final DatastoreContext datastoreContext; private int maxShardDataChangeExecutorPoolSize = @@ -326,6 +332,11 @@ public class DatastoreContext { return this; } + public Builder writeOnlyTransactionOptimizationsEnabled(boolean value) { + datastoreContext.writeOnlyTransactionOptimizationsEnabled = value; + return this; + } + public Builder maxShardDataChangeExecutorPoolSize(int maxShardDataChangeExecutorPoolSize) { this.maxShardDataChangeExecutorPoolSize = maxShardDataChangeExecutorPoolSize; return this; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 15b97b0f8f..a30b6f7516 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -43,6 +43,8 @@ import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; +import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; +import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain; import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; @@ -66,7 +68,6 @@ import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier; import org.opendaylight.controller.cluster.raft.RaftActor; -import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload; @@ -77,8 +78,6 @@ import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -106,7 +105,7 @@ public class Shard 
extends RaftActor { private final InMemoryDOMDataStore store; /// The name of this shard - private final ShardIdentifier name; + private final String name; private final ShardStats shardMBean; @@ -143,16 +142,15 @@ public class Shard extends RaftActor { private ShardRecoveryCoordinator recoveryCoordinator; private List currentLogRecoveryBatch; - private final Map transactionChains = new HashMap<>(); + private final DOMTransactionFactory transactionFactory; private final String txnDispatcherPath; - protected Shard(final ShardIdentifier name, final Map peerAddresses, + protected Shard(final ShardIdentifier name, final Map peerAddresses, final DatastoreContext datastoreContext, final SchemaContext schemaContext) { - super(name.toString(), mapPeerAddresses(peerAddresses), - Optional.of(datastoreContext.getShardRaftConfig())); + super(name.toString(), new HashMap<>(peerAddresses), Optional.of(datastoreContext.getShardRaftConfig())); - this.name = name; + this.name = name.toString(); this.datastoreContext = datastoreContext; this.schemaContext = schemaContext; this.dataPersistenceProvider = (datastoreContext.isPersistent()) @@ -173,13 +171,17 @@ public class Shard extends RaftActor { shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(), datastoreContext.getDataStoreMXBeanType()); shardMBean.setNotificationManager(store.getDataChangeListenerNotificationManager()); + shardMBean.setShardActor(getSelf()); if (isMetricsCaptureEnabled()) { getContext().become(new MeteringBehavior(this)); } - commitCoordinator = new ShardCommitCoordinator(TimeUnit.SECONDS.convert(1, TimeUnit.MINUTES), - datastoreContext.getShardTransactionCommitQueueCapacity(), LOG, name.toString()); + transactionFactory = new DOMTransactionFactory(store, shardMBean, LOG, this.name); + + commitCoordinator = new ShardCommitCoordinator(transactionFactory, + TimeUnit.SECONDS.convert(5, TimeUnit.MINUTES), + datastoreContext.getShardTransactionCommitQueueCapacity(), self(), LOG, this.name); setTransactionCommitTimeout(); @@ -195,20 +197,8 @@ public class Shard extends RaftActor { datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS); } - private static Map mapPeerAddresses( - final Map peerAddresses) { - Map map = new HashMap<>(); - - for (Map.Entry entry : peerAddresses - .entrySet()) { - map.put(entry.getKey().toString(), entry.getValue()); - } - - return map; - } - public static Props props(final ShardIdentifier name, - final Map peerAddresses, + final Map peerAddresses, final DatastoreContext datastoreContext, final SchemaContext schemaContext) { Preconditions.checkNotNull(name, "name should not be null"); Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null"); @@ -272,6 +262,8 @@ public class Shard extends RaftActor { try { if (CreateTransaction.SERIALIZABLE_CLASS.isInstance(message)) { handleCreateTransaction(message); + } else if (BatchedModifications.class.isInstance(message)) { + handleBatchedModifications((BatchedModifications)message); } else if (message instanceof ForwardedReadyTransaction) { handleForwardedReadyTransaction((ForwardedReadyTransaction) message); } else if (CanCommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) { @@ -374,9 +366,10 @@ public class Shard extends RaftActor { // currently uses a same thread executor anyway. 
cohortEntry.getCohort().preCommit().get(); - // If we do not have any followers and we are not using persistence we can - // apply modification to the state immediately - if(!hasFollowers() && !persistence().isRecoveryApplicable()){ + // If we do not have any followers and we are not using persistence + // or if cohortEntry has no modifications + // we can apply modification to the state immediately + if((!hasFollowers() && !persistence().isRecoveryApplicable()) || (!cohortEntry.hasModifications())){ applyModificationToState(getSender(), transactionID, cohortEntry.getModification()); } else { Shard.this.persistData(getSender(), transactionID, @@ -450,6 +443,47 @@ public class Shard extends RaftActor { commitCoordinator.handleCanCommit(canCommit, getSender(), self()); } + private void handleBatchedModifications(BatchedModifications batched) { + // This message is sent to prepare the modificationsa transaction directly on the Shard as an + // optimization to avoid the extra overhead of a separate ShardTransaction actor. On the last + // BatchedModifications message, the caller sets the ready flag in the message indicating + // modifications are complete. The reply contains the cohort actor path (this actor) for the caller + // to initiate the 3-phase commit. This also avoids the overhead of sending an additional + // ReadyTransaction message. + + // If we're not the leader then forward to the leader. This is a safety measure - we shouldn't + // normally get here if we're not the leader as the front-end (TransactionProxy) should determine + // the primary/leader shard. However with timing and caching on the front-end, there's a small + // window where it could have a stale leader during leadership transitions. + // + if(isLeader()) { + try { + BatchedModificationsReply reply = commitCoordinator.handleTransactionModifications(batched); + sender().tell(reply, self()); + } catch (Exception e) { + LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(), + batched.getTransactionID(), e); + getSender().tell(new akka.actor.Status.Failure(e), getSelf()); + } + } else { + ActorSelection leader = getLeader(); + if(leader != null) { + // TODO: what if this is not the first batch and leadership changed in between batched messages? + // We could check if the commitCoordinator already has a cached entry and forward all the previous + // batched modifications. + LOG.debug("{}: Forwarding BatchedModifications to leader {}", persistenceId(), leader); + leader.forward(batched, getContext()); + } else { + // TODO: rather than throwing an immediate exception, we could schedule a timer to try again to make + // it more resilient in case we're in the process of electing a new leader. + getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format( + "Could not find the leader for shard %s. This typically happens" + + " when the system is coming up or recovering and a leader is being elected. Try again" + + " later.", persistenceId()))), getSelf()); + } + } + } + private void handleForwardedReadyTransaction(ForwardedReadyTransaction ready) { LOG.debug("{}: Readying transaction {}, client version {}", persistenceId(), ready.getTransactionID(), ready.getTxnClientVersion()); @@ -458,7 +492,7 @@ public class Shard extends RaftActor { // commitCoordinator in preparation for the subsequent three phase commit initiated by // the front-end. 
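From the front-end's side, the flow described in the handleBatchedModifications comments above looks roughly like the sketch below. This is illustrative only, not code from this patch: the transaction id, chain id, modification and shard ActorSelection are assumed to exist already, and the ask timeout is arbitrary.

    // Illustrative only - not part of this patch.
    BatchedModifications batched = new BatchedModifications(
            txId.toString(), DataStoreVersions.LITHIUM_VERSION, txChainId);
    batched.addModification(modification);   // one or more write/merge/delete Modifications
    batched.setReady(true);                  // last batch: ask the shard to ready the Tx

    // scala.concurrent.Future via Akka's ask pattern; 5000 ms is an arbitrary timeout.
    Future<Object> replyFuture = Patterns.ask(shardActorSelection, batched, 5000);
    // On success the reply is a BatchedModificationsReply whose getCohortPath()
    // points back at the Shard actor, which then drives the 3-phase commit.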
commitCoordinator.transactionReady(ready.getTransactionID(), ready.getCohort(), - ready.getModification()); + (MutableCompositeModification) ready.getModification()); // Return our actor path as we'll handle the three phase commit, except if the Tx client // version < 1 (Helium-1 version). This means the Tx was initiated by a base Helium version @@ -535,56 +569,18 @@ public class Shard extends RaftActor { } private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) { - DOMStoreTransactionChain chain = - transactionChains.remove(closeTransactionChain.getTransactionChainId()); - - if(chain != null) { - chain.close(); - } + transactionFactory.closeTransactionChain(closeTransactionChain.getTransactionChainId()); } private ActorRef createTypedTransactionActor(int transactionType, ShardTransactionIdentifier transactionId, String transactionChainId, short clientVersion ) { - DOMStoreTransactionFactory factory = store; - - if(!transactionChainId.isEmpty()) { - factory = transactionChains.get(transactionChainId); - if(factory == null){ - DOMStoreTransactionChain transactionChain = store.createTransactionChain(); - transactionChains.put(transactionChainId, transactionChain); - factory = transactionChain; - } - } - - if(this.schemaContext == null) { - throw new IllegalStateException("SchemaContext is not set"); - } - - if (transactionType == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) { - - shardMBean.incrementWriteOnlyTransactionCount(); + DOMStoreTransaction transaction = transactionFactory.newTransaction( + TransactionProxy.TransactionType.fromInt(transactionType), transactionId.toString(), + transactionChainId); - return createShardTransaction(factory.newWriteOnlyTransaction(), transactionId, clientVersion); - - } else if (transactionType == TransactionProxy.TransactionType.READ_WRITE.ordinal()) { - - shardMBean.incrementReadWriteTransactionCount(); - - return createShardTransaction(factory.newReadWriteTransaction(), transactionId, clientVersion); - - } else if (transactionType == TransactionProxy.TransactionType.READ_ONLY.ordinal()) { - - shardMBean.incrementReadOnlyTransactionCount(); - - return createShardTransaction(factory.newReadOnlyTransaction(), transactionId, clientVersion); - - } else { - throw new IllegalArgumentException( - "Shard="+name + ":CreateTransaction message has unidentified transaction type=" - + transactionType); - } + return createShardTransaction(transaction, transactionId, clientVersion); } private ActorRef createShardTransaction(DOMStoreTransaction transaction, ShardTransactionIdentifier transactionId, @@ -792,7 +788,6 @@ public class Shard extends RaftActor { recoveryCoordinator = null; currentLogRecoveryBatch = null; - updateJournalStats(); //notify shard manager getContext().parent().tell(new ActorInitialized(), getSelf()); @@ -831,9 +826,6 @@ public class Shard extends RaftActor { persistenceId(), data, data.getClass().getClassLoader(), CompositeModificationPayload.class.getClassLoader()); } - - updateJournalStats(); - } private void applyModificationToState(ActorRef clientActor, String identifier, Object modification) { @@ -851,19 +843,6 @@ public class Shard extends RaftActor { } } - private void updateJournalStats() { - ReplicatedLogEntry lastLogEntry = getLastLogEntry(); - - if (lastLogEntry != null) { - shardMBean.setLastLogIndex(lastLogEntry.getIndex()); - shardMBean.setLastLogTerm(lastLogEntry.getTerm()); - } - - shardMBean.setCommitIndex(getCommitIndex()); - shardMBean.setLastApplied(getLastApplied()); - 
shardMBean.setInMemoryJournalDataSize(getRaftActorContext().getReplicatedLog().dataSize()); - } - @Override protected void createSnapshot() { // Create a transaction actor. We are really going to treat the transaction as a worker @@ -921,21 +900,15 @@ public class Shard extends RaftActor { delayedListenerRegistrations.clear(); } - shardMBean.setRaftState(getRaftState().name()); - shardMBean.setCurrentTerm(getCurrentTerm()); - // If this actor is no longer the leader close all the transaction chains - if(!isLeader){ - for(Map.Entry entry : transactionChains.entrySet()){ - if(LOG.isDebugEnabled()) { - LOG.debug( - "{}: onStateChanged: Closing transaction chain {} because shard {} is no longer the leader", - persistenceId(), entry.getKey(), getId()); - } - entry.getValue().close(); + if(!isLeader) { + if(LOG.isDebugEnabled()) { + LOG.debug( + "{}: onStateChanged: Closing all transaction chains because shard {} is no longer the leader", + persistenceId(), getId()); } - transactionChains.clear(); + transactionFactory.closeAllTransactionChains(); } } @@ -944,12 +917,8 @@ public class Shard extends RaftActor { return dataPersistenceProvider; } - @Override protected void onLeaderChanged(final String oldLeader, final String newLeader) { - shardMBean.setLeader(newLeader); - } - @Override public String persistenceId() { - return this.name.toString(); + return this.name; } @VisibleForTesting @@ -957,16 +926,22 @@ public class Shard extends RaftActor { return dataPersistenceProvider; } + @VisibleForTesting + ShardCommitCoordinator getCommitCoordinator() { + return commitCoordinator; + } + + private static class ShardCreator implements Creator { private static final long serialVersionUID = 1L; final ShardIdentifier name; - final Map peerAddresses; + final Map peerAddresses; final DatastoreContext datastoreContext; final SchemaContext schemaContext; - ShardCreator(final ShardIdentifier name, final Map peerAddresses, + ShardCreator(final ShardIdentifier name, final Map peerAddresses, final DatastoreContext datastoreContext, final SchemaContext schemaContext) { this.name = name; this.peerAddresses = peerAddresses; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java index 951bc22545..54f15fcb4b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java @@ -9,16 +9,26 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; import akka.actor.Status; +import akka.serialization.Serialization; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalCause; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; +import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply; 
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply; import org.opendaylight.controller.cluster.datastore.modification.Modification; +import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.slf4j.Logger; /** @@ -28,10 +38,17 @@ import org.slf4j.Logger; */ public class ShardCommitCoordinator { + // Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts. + public interface CohortDecorator { + DOMStoreThreePhaseCommitCohort decorate(String transactionID, DOMStoreThreePhaseCommitCohort actual); + } + private final Cache cohortCache; private CohortEntry currentCohortEntry; + private final DOMTransactionFactory transactionFactory; + private final Queue queuedCohortEntries; private int queueCapacity; @@ -40,14 +57,33 @@ public class ShardCommitCoordinator { private final String name; - public ShardCommitCoordinator(long cacheExpiryTimeoutInSec, int queueCapacity, Logger log, - String name) { - cohortCache = CacheBuilder.newBuilder().expireAfterAccess( - cacheExpiryTimeoutInSec, TimeUnit.SECONDS).build(); + private final String shardActorPath; + + private final RemovalListener cacheRemovalListener = + new RemovalListener() { + @Override + public void onRemoval(RemovalNotification notification) { + if(notification.getCause() == RemovalCause.EXPIRED) { + log.warn("{}: Transaction {} was timed out of the cache", name, notification.getKey()); + } + } + }; + + // This is a hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts. + private CohortDecorator cohortDecorator; + + public ShardCommitCoordinator(DOMTransactionFactory transactionFactory, + long cacheExpiryTimeoutInSec, int queueCapacity, ActorRef shardActor, Logger log, String name) { this.queueCapacity = queueCapacity; this.log = log; this.name = name; + this.transactionFactory = transactionFactory; + + shardActorPath = Serialization.serializedActorPath(shardActor); + + cohortCache = CacheBuilder.newBuilder().expireAfterAccess(cacheExpiryTimeoutInSec, TimeUnit.SECONDS). + removalListener(cacheRemovalListener).build(); // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls // since this should only be accessed on the shard's dispatcher. @@ -59,19 +95,62 @@ public class ShardCommitCoordinator { } /** - * This method caches a cohort entry for the given transactions ID in preparation for the - * subsequent 3-phase commit. + * This method is called to ready a transaction that was prepared by ShardTransaction actor. It caches + * the prepared cohort entry for the given transactions ID in preparation for the subsequent 3-phase commit. 
* * @param transactionID the ID of the transaction * @param cohort the cohort to participate in the transaction commit - * @param modification the modification made by the transaction + * @param modification the modifications made by the transaction */ public void transactionReady(String transactionID, DOMStoreThreePhaseCommitCohort cohort, - Modification modification) { + MutableCompositeModification modification) { cohortCache.put(transactionID, new CohortEntry(transactionID, cohort, modification)); } + /** + * This method handles a BatchedModifications message for a transaction being prepared directly on the + * Shard actor instead of via a ShardTransaction actor. If there's no currently cached + * DOMStoreWriteTransaction, one is created. The batched modifications are applied to the write Tx. If + * the BatchedModifications is ready to commit then a DOMStoreThreePhaseCommitCohort is created. + * + * @param batched the BatchedModifications + * @param shardActor the transaction's shard actor + * + * @throws ExecutionException if an error occurs loading the cache + */ + public BatchedModificationsReply handleTransactionModifications(BatchedModifications batched) + throws ExecutionException { + CohortEntry cohortEntry = cohortCache.getIfPresent(batched.getTransactionID()); + if(cohortEntry == null) { + cohortEntry = new CohortEntry(batched.getTransactionID(), + transactionFactory.newTransaction( + TransactionProxy.TransactionType.WRITE_ONLY, batched.getTransactionID(), + batched.getTransactionChainID())); + cohortCache.put(batched.getTransactionID(), cohortEntry); + } + + if(log.isDebugEnabled()) { + log.debug("{}: Applying {} batched modifications for Tx {}", name, + batched.getModifications().size(), batched.getTransactionID()); + } + + cohortEntry.applyModifications(batched.getModifications()); + + String cohortPath = null; + if(batched.isReady()) { + if(log.isDebugEnabled()) { + log.debug("{}: Readying Tx {}, client version {}", name, + batched.getTransactionID(), batched.getVersion()); + } + + cohortEntry.ready(cohortDecorator); + cohortPath = shardActorPath; + } + + return new BatchedModificationsReply(batched.getModifications().size(), cohortPath); + } + /** * This method handles the canCommit phase for a transaction. 
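Since the CohortDecorator hook introduced in this patch exists purely for tests, a unit test built on it might install a decorator roughly as follows (a sketch; it assumes the test lives in the same package and obtains the coordinator, for example via the Shard.getCommitCoordinator() accessor added in this patch):

    // Illustrative only - not part of this patch.
    final Map<String, DOMStoreThreePhaseCommitCohort> capturedCohorts = new HashMap<>();

    coordinator.setCohortDecorator(new ShardCommitCoordinator.CohortDecorator() {
        @Override
        public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
                DOMStoreThreePhaseCommitCohort actual) {
            // Remember (or wrap/spy) the real cohort so the test can verify the
            // canCommit/preCommit/commit calls made for this transaction.
            capturedCohorts.put(transactionID, actual);
            return actual;
        }
    });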
* @@ -215,19 +294,33 @@ public class ShardCommitCoordinator { } } + @VisibleForTesting + void setCohortDecorator(CohortDecorator cohortDecorator) { + this.cohortDecorator = cohortDecorator; + } + + static class CohortEntry { private final String transactionID; - private final DOMStoreThreePhaseCommitCohort cohort; - private final Modification modification; + private DOMStoreThreePhaseCommitCohort cohort; + private final MutableCompositeModification compositeModification; + private final DOMStoreWriteTransaction transaction; private ActorRef canCommitSender; private ActorRef shard; private long lastAccessTime; + CohortEntry(String transactionID, DOMStoreWriteTransaction transaction) { + this.compositeModification = new MutableCompositeModification(); + this.transaction = transaction; + this.transactionID = transactionID; + } + CohortEntry(String transactionID, DOMStoreThreePhaseCommitCohort cohort, - Modification modification) { + MutableCompositeModification compositeModification) { this.transactionID = transactionID; this.cohort = cohort; - this.modification = modification; + this.compositeModification = compositeModification; + this.transaction = null; } void updateLastAccessTime() { @@ -246,8 +339,26 @@ public class ShardCommitCoordinator { return cohort; } - Modification getModification() { - return modification; + MutableCompositeModification getModification() { + return compositeModification; + } + + void applyModifications(Iterable modifications) { + for(Modification modification: modifications) { + compositeModification.addModification(modification); + modification.apply(transaction); + } + } + + void ready(CohortDecorator cohortDecorator) { + Preconditions.checkState(cohort == null, "cohort was already set"); + + cohort = transaction.ready(); + + if(cohortDecorator != null) { + // Call the hook for unit tests. 
+ cohort = cohortDecorator.decorate(transactionID, cohort); + } } ActorRef getCanCommitSender() { @@ -265,5 +376,9 @@ public class ShardCommitCoordinator { void setShard(ActorRef shard) { this.shard = shard; } + + boolean hasModifications(){ + return compositeModification.getModifications().size() > 0; + } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index 136c6813ea..bc4c825351 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -11,6 +11,7 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorPath; import akka.actor.ActorRef; import akka.actor.Address; +import akka.actor.Cancellable; import akka.actor.OneForOneStrategy; import akka.actor.Props; import akka.actor.SupervisorStrategy; @@ -20,24 +21,28 @@ import akka.japi.Function; import akka.japi.Procedure; import akka.persistence.RecoveryCompleted; import akka.persistence.RecoveryFailure; +import akka.serialization.Serialization; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Objects; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; +import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo; @@ -53,6 +58,7 @@ import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; +import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; import org.opendaylight.controller.cluster.raft.RaftState; @@ -74,7 +80,7 @@ import scala.concurrent.duration.Duration; */ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { - private final Logger LOG = LoggerFactory.getLogger(getClass()); + private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class); // Stores a mapping between a member name and the address of the member // Member names look 
like "member-1", "member-2" etc and are as specified @@ -172,15 +178,45 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { onRoleChangeNotification((RoleChangeNotification) message); } else if(message instanceof FollowerInitialSyncUpStatus){ onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message); - } else{ + } else if(message instanceof ShardNotInitializedTimeout) { + onShardNotInitializedTimeout((ShardNotInitializedTimeout)message); + } else if(message instanceof LeaderStateChanged) { + onLeaderStateChanged((LeaderStateChanged)message); + } else { unknownMessage(message); } } + private void onLeaderStateChanged(LeaderStateChanged leaderStateChanged) { + LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged); + + ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId()); + if(shardInformation != null) { + shardInformation.setLeaderId(leaderStateChanged.getLeaderId()); + } else { + LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId()); + } + } + + private void onShardNotInitializedTimeout(ShardNotInitializedTimeout message) { + ShardInformation shardInfo = message.getShardInfo(); + + LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(), + shardInfo.getShardId()); + + shardInfo.removeOnShardInitialized(message.getOnShardInitialized()); + + if(!shardInfo.isShardInitialized()) { + message.getSender().tell(new ActorNotInitialized(), getSelf()); + } else { + message.getSender().tell(createNoShardLeaderException(shardInfo.shardId), getSelf()); + } + } + private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) { - LOG.info("Received follower initial sync status for {} status sync done {}", status.getName(), - status.isInitialSyncDone()); + LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(), + status.getName(), status.isInitialSyncDone()); ShardInformation shardInformation = findShardInformation(status.getName()); @@ -193,7 +229,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } private void onRoleChangeNotification(RoleChangeNotification roleChanged) { - LOG.info("Received role changed for {} from {} to {}", roleChanged.getMemberId(), + LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(), roleChanged.getOldRole(), roleChanged.getNewRole()); ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId()); @@ -201,8 +237,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { shardInformation.setRole(roleChanged.getNewRole()); if (isReady()) { - LOG.info("All Shards are ready - data store {} is ready, available count is {}", type, - waitTillReadyCountdownLatch.getCount()); + LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}", + persistenceId(), type, waitTillReadyCountdownLatch.getCount()); waitTillReadyCountdownLatch.countDown(); } @@ -225,7 +261,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private boolean isReady() { boolean isReady = true; for (ShardInformation info : localShards.values()) { - if(RaftState.Candidate.name().equals(info.getRole()) || Strings.isNullOrEmpty(info.getRole())){ + if(!info.isShardReady()){ isReady = false; break; } @@ -256,14 +292,18 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { if 
(shardId.getShardName() == null) { return; } + markShardAsInitialized(shardId.getShardName()); } private void markShardAsInitialized(String shardName) { LOG.debug("Initializing shard [{}]", shardName); + ShardInformation shardInformation = localShards.get(shardName); if (shardInformation != null) { shardInformation.setActorInitialized(); + + shardInformation.getActor().tell(new RegisterRoleChangeListener(), self()); } } @@ -300,7 +340,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return; } - sendResponse(shardInformation, message.isWaitUntilInitialized(), new Supplier() { + sendResponse(shardInformation, message.isWaitUntilInitialized(), false, new Supplier() { @Override public Object get() { return new LocalShardFound(shardInformation.getActor()); @@ -308,20 +348,36 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { }); } - private void sendResponse(ShardInformation shardInformation, boolean waitUntilInitialized, - final Supplier messageSupplier) { - if (!shardInformation.isShardInitialized()) { - if(waitUntilInitialized) { + private void sendResponse(ShardInformation shardInformation, boolean doWait, + boolean wantShardReady, final Supplier messageSupplier) { + if (!shardInformation.isShardInitialized() || (wantShardReady && !shardInformation.isShardReadyWithLeaderId())) { + if(doWait) { final ActorRef sender = getSender(); final ActorRef self = self(); - shardInformation.addRunnableOnInitialized(new Runnable() { + + Runnable replyRunnable = new Runnable() { @Override public void run() { sender.tell(messageSupplier.get(), self); } - }); - } else { + }; + + OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) : + new OnShardInitialized(replyRunnable); + + shardInformation.addOnShardInitialized(onShardInitialized); + + Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce( + datastoreContext.getShardInitializationTimeout().duration(), getSelf(), + new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender), + getContext().dispatcher(), getSelf()); + + onShardInitialized.setTimeoutSchedule(timeoutSchedule); + + } else if (!shardInformation.isShardInitialized()) { getSender().tell(new ActorNotInitialized(), getSelf()); + } else { + getSender().tell(createNoShardLeaderException(shardInformation.shardId), getSelf()); } return; @@ -330,6 +386,12 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { getSender().tell(messageSupplier.get(), getSelf()); } + private NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) { + return new NoShardLeaderException(String.format( + "Could not find a leader for shard %s. This typically happens when the system is coming up or " + + "recovering and a leader is being elected. 
Try again later.", shardId)); + } + private void memberRemoved(ClusterEvent.MemberRemoved message) { memberNameToAddress.remove(message.member().roles().head()); } @@ -341,8 +403,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { for(ShardInformation info : localShards.values()){ String shardName = info.getShardName(); - info.updatePeerAddress(getShardIdentifier(memberName, shardName), - getShardActorPath(shardName, memberName)); + info.updatePeerAddress(getShardIdentifier(memberName, shardName).toString(), + getShardActorPath(shardName, memberName), getSelf()); } } @@ -384,13 +446,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { LOG.debug("Sending new SchemaContext to Shards"); for (ShardInformation info : localShards.values()) { if (info.getActor() == null) { - info.setActor(getContext().actorOf(Shard.props(info.getShardId(), - info.getPeerAddresses(), datastoreContext, schemaContext) - .withDispatcher(shardDispatcherPath), info.getShardId().toString())); + info.setActor(newShardActor(schemaContext, info)); } else { info.getActor().tell(message, getSelf()); } - info.getActor().tell(new RegisterRoleChangeListener(), self()); } } @@ -402,16 +461,29 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } + @VisibleForTesting + protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) { + return getContext().actorOf(Shard.props(info.getShardId(), + info.getPeerAddresses(), datastoreContext, schemaContext) + .withDispatcher(shardDispatcherPath), info.getShardId().toString()); + } + private void findPrimary(FindPrimary message) { - String shardName = message.getShardName(); + final String shardName = message.getShardName(); // First see if the there is a local replica for the shard final ShardInformation info = localShards.get(shardName); if (info != null) { - sendResponse(info, message.isWaitUntilInitialized(), new Supplier() { + sendResponse(info, message.isWaitUntilReady(), true, new Supplier() { @Override public Object get() { - return new PrimaryFound(info.getActorPath().toString()).toSerializable(); + Object found = new PrimaryFound(info.getSerializedLeaderActor()).toSerializable(); + + if(LOG.isDebugEnabled()) { + LOG.debug("{}: Found primary for {}: {}", shardName, found); + } + + return found; } }); @@ -481,7 +553,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { List localShardActorNames = new ArrayList<>(); for(String shardName : memberShardNames){ ShardIdentifier shardId = getShardIdentifier(memberName, shardName); - Map peerAddresses = getPeerAddresses(shardName); + Map peerAddresses = getPeerAddresses(shardName); localShardActorNames.add(shardId.toString()); localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses)); } @@ -496,22 +568,19 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { * @param shardName * @return */ - private Map getPeerAddresses(String shardName){ + private Map getPeerAddresses(String shardName){ - Map peerAddresses = new HashMap<>(); + Map peerAddresses = new HashMap<>(); - List members = - this.configuration.getMembersFromShardName(shardName); + List members = this.configuration.getMembersFromShardName(shardName); String currentMemberName = this.cluster.getCurrentMemberName(); for(String memberName : members){ if(!currentMemberName.equals(memberName)){ - ShardIdentifier shardId = getShardIdentifier(memberName, - shardName); - String path = - 
getShardActorPath(shardName, currentMemberName); - peerAddresses.put(shardId, path); + ShardIdentifier shardId = getShardIdentifier(memberName, shardName); + String path = getShardActorPath(shardName, currentMemberName); + peerAddresses.put(shardId.toString(), path); } } return peerAddresses; @@ -552,23 +621,25 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return mBean; } - private class ShardInformation { + @VisibleForTesting + protected static class ShardInformation { private final ShardIdentifier shardId; private final String shardName; private ActorRef actor; private ActorPath actorPath; - private final Map peerAddresses; + private final Map peerAddresses; // flag that determines if the actor is ready for business private boolean actorInitialized = false; private boolean followerSyncStatus = false; - private final List runnablesOnInitialized = Lists.newArrayList(); + private final Set onShardInitializedSet = Sets.newHashSet(); private String role ; + private String leaderId; private ShardInformation(String shardName, ShardIdentifier shardId, - Map peerAddresses) { + Map peerAddresses) { this.shardName = shardName; this.shardId = shardId; this.peerAddresses = peerAddresses; @@ -595,11 +666,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return shardId; } - Map getPeerAddresses() { + Map getPeerAddresses() { return peerAddresses; } - void updatePeerAddress(ShardIdentifier peerId, String peerAddress){ + void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){ LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress); if(peerAddresses.containsKey(peerId)){ @@ -611,42 +682,87 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { peerId, peerAddress, actor.path()); } - actor.tell(new PeerAddressResolved(peerId, peerAddress), getSelf()); + actor.tell(new PeerAddressResolved(peerId.toString(), peerAddress), sender); } + + notifyOnShardInitializedCallbacks(); } } + boolean isShardReady() { + return !RaftState.Candidate.name().equals(role) && !Strings.isNullOrEmpty(role); + } + + boolean isShardReadyWithLeaderId() { + return isShardReady() && (isLeader() || peerAddresses.containsKey(leaderId)); + } + boolean isShardInitialized() { return getActor() != null && actorInitialized; } + boolean isLeader() { + return Objects.equal(leaderId, shardId.toString()); + } + + String getSerializedLeaderActor() { + if(isLeader()) { + return Serialization.serializedActorPath(getActor()); + } else { + return peerAddresses.get(leaderId); + } + } + void setActorInitialized() { + LOG.debug("Shard {} is initialized", shardId); + this.actorInitialized = true; - for(Runnable runnable: runnablesOnInitialized) { - runnable.run(); + notifyOnShardInitializedCallbacks(); + } + + private void notifyOnShardInitializedCallbacks() { + if(onShardInitializedSet.isEmpty()) { + return; } - runnablesOnInitialized.clear(); + boolean ready = isShardReadyWithLeaderId(); + + if(LOG.isDebugEnabled()) { + LOG.debug("Shard {} is {} - notifying {} OnShardInitialized callbacks", shardId, + ready ? 
"ready" : "initialized", onShardInitializedSet.size()); + } + + Iterator iter = onShardInitializedSet.iterator(); + while(iter.hasNext()) { + OnShardInitialized onShardInitialized = iter.next(); + if(!(onShardInitialized instanceof OnShardReady) || ready) { + iter.remove(); + onShardInitialized.getTimeoutSchedule().cancel(); + onShardInitialized.getReplyRunnable().run(); + } + } } - void addRunnableOnInitialized(Runnable runnable) { - runnablesOnInitialized.add(runnable); + void addOnShardInitialized(OnShardInitialized onShardInitialized) { + onShardInitializedSet.add(onShardInitialized); } - public void setRole(String newRole) { - this.role = newRole; + void removeOnShardInitialized(OnShardInitialized onShardInitialized) { + onShardInitializedSet.remove(onShardInitialized); } - public String getRole(){ - return this.role; + void setRole(String newRole) { + this.role = newRole; + + notifyOnShardInitializedCallbacks(); } - public void setFollowerSyncStatus(boolean syncStatus){ + void setFollowerSyncStatus(boolean syncStatus){ this.followerSyncStatus = syncStatus; } - public boolean isInSync(){ + boolean isInSync(){ if(RaftState.Follower.name().equals(this.role)){ return followerSyncStatus; } else if(RaftState.Leader.name().equals(this.role)){ @@ -656,6 +772,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return false; } + void setLeaderId(String leaderId) { + this.leaderId = leaderId; + + notifyOnShardInitializedCallbacks(); + } } private static class ShardManagerCreator implements Creator { @@ -680,6 +801,57 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } + private static class OnShardInitialized { + private final Runnable replyRunnable; + private Cancellable timeoutSchedule; + + OnShardInitialized(Runnable replyRunnable) { + this.replyRunnable = replyRunnable; + } + + Runnable getReplyRunnable() { + return replyRunnable; + } + + Cancellable getTimeoutSchedule() { + return timeoutSchedule; + } + + void setTimeoutSchedule(Cancellable timeoutSchedule) { + this.timeoutSchedule = timeoutSchedule; + } + } + + private static class OnShardReady extends OnShardInitialized { + OnShardReady(Runnable replyRunnable) { + super(replyRunnable); + } + } + + private static class ShardNotInitializedTimeout { + private final ActorRef sender; + private final ShardInformation shardInfo; + private final OnShardInitialized onShardInitialized; + + ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) { + this.sender = sender; + this.shardInfo = shardInfo; + this.onShardInitialized = onShardInitialized; + } + + ActorRef getSender() { + return sender; + } + + ShardInformation getShardInfo() { + return shardInfo; + } + + OnShardInitialized getOnShardInitialized() { + return onShardInitialized; + } + } + static class SchemaContextModules implements Serializable { private static final long serialVersionUID = -8884620101025936590L; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java index c1f9c78e69..3a209630c3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java +++ 
b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java @@ -16,6 +16,7 @@ import com.google.common.util.concurrent.SettableFuture; import java.util.List; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; +import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; import org.opendaylight.controller.cluster.datastore.messages.DataExists; import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply; @@ -40,8 +41,8 @@ import scala.concurrent.Future; public class TransactionContextImpl extends AbstractTransactionContext { private static final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class); + private final String transactionChainId; private final ActorContext actorContext; - private final String transactionPath; private final ActorSelection actor; private final boolean isTxActorLocal; private final short remoteTransactionVersion; @@ -49,12 +50,12 @@ public class TransactionContextImpl extends AbstractTransactionContext { private final OperationCompleter operationCompleter; private BatchedModifications batchedModifications; - protected TransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier, - ActorContext actorContext, SchemaContext schemaContext, - boolean isTxActorLocal, short remoteTransactionVersion, OperationCompleter operationCompleter) { + protected TransactionContextImpl(ActorSelection actor, TransactionIdentifier identifier, + String transactionChainId, ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal, + short remoteTransactionVersion, OperationCompleter operationCompleter) { super(identifier); - this.transactionPath = transactionPath; this.actor = actor; + this.transactionChainId = transactionChainId; this.actorContext = actorContext; this.isTxActorLocal = isTxActorLocal; this.remoteTransactionVersion = remoteTransactionVersion; @@ -71,6 +72,10 @@ public class TransactionContextImpl extends AbstractTransactionContext { return actor; } + protected ActorContext getActorContext() { + return actorContext; + } + protected short getRemoteTransactionVersion() { return remoteTransactionVersion; } @@ -93,21 +98,24 @@ public class TransactionContextImpl extends AbstractTransactionContext { // Send the remaining batched modifications if any. - sendBatchedModifications(); + sendAndRecordBatchedModifications(); // Send the ReadyTransaction message to the Tx actor. - final Future replyFuture = executeOperationAsync(ReadyTransaction.INSTANCE); + Future readyReplyFuture = executeOperationAsync(ReadyTransaction.INSTANCE); + return combineRecordedOperationsFutures(readyReplyFuture); + } + + protected Future combineRecordedOperationsFutures(final Future withLastReplyFuture) { // Combine all the previously recorded put/merge/delete operation reply Futures and the // ReadyTransactionReply Future into one Future. If any one fails then the combined // Future will fail. We need all prior operations and the ready operation to succeed // in order to attempt commit. 
- List> futureList = - Lists.newArrayListWithCapacity(recordedOperationFutures.size() + 1); + List> futureList = Lists.newArrayListWithCapacity(recordedOperationFutures.size() + 1); futureList.addAll(recordedOperationFutures); - futureList.add(replyFuture); + futureList.add(withLastReplyFuture); Future> combinedFutures = akka.dispatch.Futures.sequence(futureList, actorContext.getClientDispatcher()); @@ -129,28 +137,15 @@ public class TransactionContextImpl extends AbstractTransactionContext { // de-serializing each reply. // Note the Future get call here won't block as it's complete. - Object serializedReadyReply = replyFuture.value().get().get(); + Object serializedReadyReply = withLastReplyFuture.value().get().get(); if (serializedReadyReply instanceof ReadyTransactionReply) { return actorContext.actorSelection(((ReadyTransactionReply)serializedReadyReply).getCohortPath()); - + } else if(serializedReadyReply instanceof BatchedModificationsReply) { + return actorContext.actorSelection(((BatchedModificationsReply)serializedReadyReply).getCohortPath()); } else if(serializedReadyReply.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) { ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(serializedReadyReply); - String cohortPath = reply.getCohortPath(); - - // In Helium we used to return the local path of the actor which represented - // a remote ThreePhaseCommitCohort. The local path would then be converted to - // a remote path using this resolvePath method. To maintain compatibility with - // a Helium node we need to continue to do this conversion. - // At some point in the future when upgrades from Helium are not supported - // we could remove this code to resolvePath and just use the cohortPath as the - // resolved cohortPath - if(TransactionContextImpl.this.remoteTransactionVersion < - DataStoreVersions.HELIUM_1_VERSION) { - cohortPath = actorContext.resolvePath(transactionPath, cohortPath); - } - + String cohortPath = deserializeCohortPath(reply.getCohortPath()); return actorContext.actorSelection(cohortPath); - } else { // Throwing an exception here will fail the Future. 
throw new IllegalArgumentException(String.format("%s: Invalid reply type %s", @@ -160,27 +155,51 @@ public class TransactionContextImpl extends AbstractTransactionContext { }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher()); } + protected String deserializeCohortPath(String cohortPath) { + return cohortPath; + } + private void batchModification(Modification modification) { if(batchedModifications == null) { - batchedModifications = new BatchedModifications(remoteTransactionVersion); + batchedModifications = new BatchedModifications(identifier.toString(), remoteTransactionVersion, + transactionChainId); } batchedModifications.addModification(modification); if(batchedModifications.getModifications().size() >= actorContext.getDatastoreContext().getShardBatchedModificationCount()) { - sendBatchedModifications(); + sendAndRecordBatchedModifications(); } } - private void sendBatchedModifications() { + private void sendAndRecordBatchedModifications() { + Future sentFuture = sendBatchedModifications(); + if(sentFuture != null) { + recordedOperationFutures.add(sentFuture); + } + } + + protected Future sendBatchedModifications() { + return sendBatchedModifications(false); + } + + protected Future sendBatchedModifications(boolean ready) { + Future sent = null; if(batchedModifications != null) { - LOG.debug("Tx {} sending {} batched modifications", identifier, - batchedModifications.getModifications().size()); + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} sending {} batched modifications, ready: {}", identifier, + batchedModifications.getModifications().size(), ready); + } - recordedOperationFutures.add(executeOperationAsync(batchedModifications)); - batchedModifications = null; + batchedModifications.setReady(ready); + sent = executeOperationAsync(batchedModifications); + + batchedModifications = new BatchedModifications(identifier.toString(), remoteTransactionVersion, + transactionChainId); } + + return sent; } @Override @@ -212,7 +231,7 @@ public class TransactionContextImpl extends AbstractTransactionContext { // Send the remaining batched modifications if any. - sendBatchedModifications(); + sendAndRecordBatchedModifications(); // If there were any previous recorded put/merge/delete operation reply Futures then we // must wait for them to successfully complete. This is necessary to honor the read @@ -297,7 +316,7 @@ public class TransactionContextImpl extends AbstractTransactionContext { // Send the remaining batched modifications if any. - sendBatchedModifications(); + sendAndRecordBatchedModifications(); // If there were any previous recorded put/merge/delete operation reply Futures then we // must wait for them to successfully complete. 
This is necessary to honor the read diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index 64b9086c25..64f914b19f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -70,7 +70,19 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { public static enum TransactionType { READ_ONLY, WRITE_ONLY, - READ_WRITE + READ_WRITE; + + public static TransactionType fromInt(int type) { + if(type == WRITE_ONLY.ordinal()) { + return WRITE_ONLY; + } else if(type == READ_WRITE.ordinal()) { + return READ_WRITE; + } else if(type == READ_ONLY.ordinal()) { + return READ_ONLY; + } else { + throw new IllegalArgumentException("In TransactionType enum value" + type); + } + } } static final Mapper SAME_FAILURE_TRANSFORMER = @@ -473,7 +485,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { @Override public void onComplete(Throwable failure, ActorSelection primaryShard) { if(failure != null) { - newTxFutureCallback.onComplete(failure, null); + newTxFutureCallback.createTransactionContext(failure, null); } else { newTxFutureCallback.setPrimaryShard(primaryShard); } @@ -550,10 +562,21 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { * Sets the target primary shard and initiates a CreateTransaction try. */ void setPrimaryShard(ActorSelection primaryShard) { - LOG.debug("Tx {} Primary shard found - trying create transaction", identifier); - this.primaryShard = primaryShard; - tryCreateTransaction(); + + if(transactionType == TransactionType.WRITE_ONLY && + actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) { + LOG.debug("Tx {} Primary shard {} found - creating WRITE_ONLY transaction context", + identifier, primaryShard); + + // For write-only Tx's we prepare the transaction modifications directly on the shard actor + // to avoid the overhead of creating a separate transaction actor. + // FIXME: can't assume the shard version is LITHIUM_VERSION - need to obtain it somehow. + executeTxOperatonsOnComplete(createValidTransactionContext(this.primaryShard, + this.primaryShard.path().toString(), DataStoreVersions.LITHIUM_VERSION)); + } else { + tryCreateTransaction(); + } } /** @@ -563,7 +586,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { boolean invokeOperation = true; synchronized(txOperationsOnComplete) { if(transactionContext == null) { - LOG.debug("Tx {} Adding operation on complete {}", identifier); + LOG.debug("Tx {} Adding operation on complete", identifier); invokeOperation = false; txOperationsOnComplete.add(operation); @@ -590,6 +613,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { * Performs a CreateTransaction try async. 
*/ private void tryCreateTransaction() { + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} Primary shard {} found - trying create transaction", identifier, primaryShard); + } + Object serializedCreateMessage = new CreateTransaction(identifier.toString(), TransactionProxy.this.transactionType.ordinal(), getTransactionChainId()).toSerializable(); @@ -621,6 +648,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } } + createTransactionContext(failure, response); + } + + private void createTransactionContext(Throwable failure, Object response) { // Mainly checking for state violation here to perform a volatile read of "initialized" to // ensure updates to operationLimter et al are visible to this thread (ie we're doing // "piggy-back" synchronization here). @@ -636,8 +667,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { // TransactionContext until after we've executed all cached TransactionOperations. TransactionContext localTransactionContext; if(failure != null) { - LOG.debug("Tx {} Creating NoOpTransaction because of error: {}", identifier, - failure.getMessage()); + LOG.debug("Tx {} Creating NoOpTransaction because of error", identifier, failure); localTransactionContext = new NoOpTransactionContext(failure, identifier, operationLimiter); } else if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) { @@ -687,11 +717,14 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } private TransactionContext createValidTransactionContext(CreateTransactionReply reply) { - String transactionPath = reply.getTransactionPath(); - LOG.debug("Tx {} Received {}", identifier, reply); - ActorSelection transactionActor = actorContext.actorSelection(transactionPath); + return createValidTransactionContext(actorContext.actorSelection(reply.getTransactionPath()), + reply.getTransactionPath(), reply.getVersion()); + } + + private TransactionContext createValidTransactionContext(ActorSelection transactionActor, + String transactionPath, short remoteTransactionVersion) { if (transactionType == TransactionType.READ_ONLY) { // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference @@ -720,12 +753,17 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { // Check if TxActor is created in the same node boolean isTxActorLocal = actorContext.isPathLocal(transactionPath); - if(reply.getVersion() >= DataStoreVersions.LITHIUM_VERSION) { - return new TransactionContextImpl(transactionPath, transactionActor, identifier, - actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter); - } else { + if(remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) { return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, identifier, - actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter); + transactionChainId, actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, + operationCompleter); + } else if (transactionType == TransactionType.WRITE_ONLY && + actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) { + return new WriteOnlyTransactionContextImpl(transactionActor, identifier, transactionChainId, + actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter); + } else { + return new TransactionContextImpl(transactionActor, identifier, transactionChainId, + actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, 
operationCompleter); } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/WriteOnlyTransactionContextImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/WriteOnlyTransactionContextImpl.java new file mode 100644 index 0000000000..3b4a190a9e --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/WriteOnlyTransactionContextImpl.java @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.ActorSelection; +import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.Future; + +/** + * Context for a write-only transaction. + * + * @author Thomas Pantelis + */ +public class WriteOnlyTransactionContextImpl extends TransactionContextImpl { + private static final Logger LOG = LoggerFactory.getLogger(WriteOnlyTransactionContextImpl.class); + + public WriteOnlyTransactionContextImpl(ActorSelection actor, TransactionIdentifier identifier, + String transactionChainId, ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal, + short remoteTransactionVersion, OperationCompleter operationCompleter) { + super(actor, identifier, transactionChainId, actorContext, schemaContext, isTxActorLocal, + remoteTransactionVersion, operationCompleter); + } + + @Override + public Future readyTransaction() { + LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending", + identifier, recordedOperationFutures.size()); + + // Send the remaining batched modifications if any. 
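The new WriteOnlyTransactionContextImpl readies the transaction by flagging the final batch instead of sending a separate ready message. A small sketch of the batching and override pattern, with CompletableFuture standing in for the Scala futures and every name hypothetical; the batch limit stands in for shardBatchedModificationCount.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class TxContextSketch {
    private final List<Object> batch = new ArrayList<>();
    private final int batchLimit = 100;   // stands in for shardBatchedModificationCount

    void addModification(Object modification) {
        batch.add(modification);
        if (batch.size() >= batchLimit) {
            sendBatch(false);              // flush a full batch; its future would be recorded
        }
    }

    CompletableFuture<Object> sendBatch(boolean ready) {
        batch.clear();
        return CompletableFuture.completedFuture(ready ? "last batch, ready" : "batch");
    }

    CompletableFuture<Object> readyTransaction() {
        // Default path: flush what is pending, then an explicit ready message follows.
        return sendBatch(false);
    }
}

class WriteOnlyTxContextSketch extends TxContextSketch {
    @Override
    CompletableFuture<Object> readyTransaction() {
        // Write-only path: the final batch itself carries the ready flag,
        // saving one message round trip to the shard.
        return sendBatch(true);
    }
}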
+ + Future lastModificationsFuture = sendBatchedModifications(true); + + return combineRecordedOperationsFutures(lastModificationsFuture); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java index e407c7cc47..ccfb329692 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java @@ -8,16 +8,21 @@ package org.opendaylight.controller.cluster.datastore.compat; import akka.actor.ActorSelection; +import org.opendaylight.controller.cluster.datastore.DataStoreVersions; import org.opendaylight.controller.cluster.datastore.OperationCompleter; import org.opendaylight.controller.cluster.datastore.TransactionContextImpl; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.DeleteData; import org.opendaylight.controller.cluster.datastore.messages.MergeData; +import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.WriteData; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.Future; /** * Implementation of TransactionContextImpl used when talking to a pre-Lithium controller that doesn't @@ -26,12 +31,16 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; * @author Thomas Pantelis */ public class PreLithiumTransactionContextImpl extends TransactionContextImpl { + private static final Logger LOG = LoggerFactory.getLogger(PreLithiumTransactionContextImpl.class); + + private final String transactionPath; public PreLithiumTransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier, - ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal, + String transactionChainId, ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal, short remoteTransactionVersion, OperationCompleter operationCompleter) { - super(transactionPath, actor, identifier, actorContext, schemaContext, isTxActorLocal, - remoteTransactionVersion, operationCompleter); + super(actor, identifier, transactionChainId, actorContext, schemaContext, isTxActorLocal, + remoteTransactionVersion, operationCompleter); + this.transactionPath = transactionPath; } @Override @@ -51,4 +60,32 @@ public class PreLithiumTransactionContextImpl extends TransactionContextImpl { recordedOperationFutures.add(executeOperationAsync( new WriteData(path, data, getRemoteTransactionVersion()))); } + + @Override + public Future readyTransaction() { + LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending", + identifier, recordedOperationFutures.size()); + + // Send the ReadyTransaction message to the Tx actor. 
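The pre-Lithium context that follows keeps the old explicit ReadyTransaction message and only rewrites the returned cohort path for peers older than Helium-1. A hedged sketch of that version gate; the constant's value and the path join are assumptions standing in for DataStoreVersions and ActorContext.resolvePath.

class CohortPathCompatSketch {
    static final short HELIUM_1_VERSION = 1;   // assumed value, for illustration only

    static String deserializeCohortPath(short remoteVersion, String transactionPath, String cohortPath) {
        if (remoteVersion < HELIUM_1_VERSION) {
            // Pre-Helium-1 peers return a local cohort path that must be resolved
            // against the remote transaction actor's path.
            return transactionPath + "/" + cohortPath;   // stand-in for resolvePath
        }
        return cohortPath;
    }
}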
+ + Future lastReplyFuture = executeOperationAsync(ReadyTransaction.INSTANCE); + + return combineRecordedOperationsFutures(lastReplyFuture); + } + + @Override + protected String deserializeCohortPath(String cohortPath) { + // In base Helium we used to return the local path of the actor which represented + // a remote ThreePhaseCommitCohort. The local path would then be converted to + // a remote path using this resolvePath method. To maintain compatibility with + // a Helium node we need to continue to do this conversion. + // At some point in the future when upgrades from Helium are not supported + // we could remove this code to resolvePath and just use the cohortPath as the + // resolved cohortPath + if(getRemoteTransactionVersion() < DataStoreVersions.HELIUM_1_VERSION) { + return getActorContext().resolvePath(transactionPath, cohortPath); + } + + return cohortPath; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardTransactionIdentifier.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardTransactionIdentifier.java index d1f9495d86..fa1525c574 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardTransactionIdentifier.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardTransactionIdentifier.java @@ -12,10 +12,14 @@ import com.google.common.base.Preconditions; public class ShardTransactionIdentifier { private final String remoteTransactionId; + private final String stringRepresentation; public ShardTransactionIdentifier(String remoteTransactionId) { this.remoteTransactionId = Preconditions.checkNotNull(remoteTransactionId, "remoteTransactionId should not be null"); + + stringRepresentation = new StringBuilder(remoteTransactionId.length() + 6).append("shard-"). 
+ append(remoteTransactionId).toString(); } public String getRemoteTransactionId() { @@ -46,9 +50,7 @@ public class ShardTransactionIdentifier { } @Override public String toString() { - final StringBuilder sb = new StringBuilder(); - sb.append("shard-").append(remoteTransactionId); - return sb.toString(); + return stringRepresentation; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStats.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStats.java index 577a03c3a3..fb59b7643f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStats.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStats.java @@ -8,10 +8,21 @@ package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard; +import akka.actor.ActorRef; +import akka.pattern.Patterns; +import akka.util.Timeout; +import com.google.common.base.Stopwatch; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import java.text.SimpleDateFormat; import java.util.Date; import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo; +import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState; +import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean; import org.opendaylight.controller.md.sal.common.util.jmx.QueuedNotificationManagerMXBeanImpl; import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStats; @@ -19,6 +30,9 @@ import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStatsMXB import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; import org.opendaylight.yangtools.util.concurrent.ListenerNotificationQueueStats; import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.Await; /** * Maintains statistics for a shard. 
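The ShardStats changes below stop pushing raft statistics into the bean and instead ask the shard actor on demand, caching the snapshot per shard for two seconds and timing the retrieval. A rough Guava-cache sketch of that scheme, with a plain supplier standing in for the Patterns.ask/Await.result call.

import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

final class OnDemandStateCacheSketch {
    // At most one ask per shard every two seconds; JMX getters read from this cache.
    private static final Cache<String, Object> CACHE =
            CacheBuilder.newBuilder().expireAfterWrite(2, TimeUnit.SECONDS).<String, Object>build();

    static Object getState(String shardName, Supplier<Object> askShard) {
        Object state = CACHE.getIfPresent(shardName);
        if (state == null) {
            state = askShard.get();    // stands in for Patterns.ask(shardActor, ...) + Await.result
            CACHE.put(shardName, state);
        }
        return state;
    }
}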
@@ -28,6 +42,13 @@ import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager; public class ShardStats extends AbstractMXBean implements ShardStatsMXBean { public static String JMX_CATEGORY_SHARD = "Shards"; + private static final Logger LOG = LoggerFactory.getLogger(ShardStats.class); + + private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); + + private static final Cache onDemandRaftStateCache = + CacheBuilder.newBuilder().expireAfterWrite(2, TimeUnit.SECONDS).build(); + private long committedTransactionsCount; private long readOnlyTransactionCount; @@ -36,20 +57,6 @@ public class ShardStats extends AbstractMXBean implements ShardStatsMXBean { private long readWriteTransactionCount; - private String leader; - - private String raftState; - - private long lastLogTerm = -1L; - - private long lastLogIndex = -1L; - - private long currentTerm = -1L; - - private long commitIndex = -1L; - - private long lastApplied = -1L; - private long lastCommittedTransactionTime; private long failedTransactionsCount; @@ -62,12 +69,13 @@ public class ShardStats extends AbstractMXBean implements ShardStatsMXBean { private QueuedNotificationManagerMXBeanImpl notificationManagerStatsBean; - private long dataSize = 0; + private boolean followerInitialSyncStatus = false; - private final SimpleDateFormat sdf = - new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); + private ActorRef shardActor; - private boolean followerInitialSyncStatus = false; + private String statRetrievalError; + + private String statRetrievalTime; public ShardStats(final String shardName, final String mxBeanType) { super(shardName, mxBeanType, JMX_CATEGORY_SHARD); @@ -80,6 +88,38 @@ public class ShardStats extends AbstractMXBean implements ShardStatsMXBean { this.notificationExecutorStatsBean = ThreadExecutorStatsMXBeanImpl.create(manager.getExecutor()); } + public void setShardActor(ActorRef shardActor) { + this.shardActor = shardActor; + } + + private OnDemandRaftState getOnDemandRaftState() { + String name = getShardName(); + OnDemandRaftState state = onDemandRaftStateCache.getIfPresent(name); + if(state == null) { + statRetrievalError = null; + statRetrievalTime = null; + + if(shardActor != null) { + Timeout timeout = new Timeout(10, TimeUnit.SECONDS); + try { + Stopwatch timer = Stopwatch.createStarted(); + + state = (OnDemandRaftState) Await.result(Patterns.ask(shardActor, + GetOnDemandRaftState.INSTANCE, timeout), timeout.duration()); + + statRetrievalTime = timer.stop().toString(); + onDemandRaftStateCache.put(name, state); + } catch (Exception e) { + statRetrievalError = e.toString(); + } + } + + state = state != null ? 
state : OnDemandRaftState.builder().build(); + } + + return state; + } + @Override public String getShardName() { return getMBeanName(); @@ -92,12 +132,12 @@ public class ShardStats extends AbstractMXBean implements ShardStatsMXBean { @Override public String getLeader() { - return leader; + return getOnDemandRaftState().getLeader(); } @Override public String getRaftState() { - return raftState; + return getOnDemandRaftState().getRaftState(); } @Override @@ -117,33 +157,67 @@ public class ShardStats extends AbstractMXBean implements ShardStatsMXBean { @Override public long getLastLogIndex() { - return lastLogIndex; + return getOnDemandRaftState().getLastLogIndex(); } @Override public long getLastLogTerm() { - return lastLogTerm; + return getOnDemandRaftState().getLastLogTerm(); } @Override public long getCurrentTerm() { - return currentTerm; + return getOnDemandRaftState().getCurrentTerm(); } @Override public long getCommitIndex() { - return commitIndex; + return getOnDemandRaftState().getCommitIndex(); } @Override public long getLastApplied() { - return lastApplied; + return getOnDemandRaftState().getLastApplied(); } @Override - public String getLastCommittedTransactionTime() { + public long getLastIndex() { + return getOnDemandRaftState().getLastIndex(); + } - return sdf.format(new Date(lastCommittedTransactionTime)); + @Override + public long getLastTerm() { + return getOnDemandRaftState().getLastTerm(); + } + + @Override + public long getSnapshotIndex() { + return getOnDemandRaftState().getSnapshotIndex(); + } + + @Override + public long getSnapshotTerm() { + return getOnDemandRaftState().getSnapshotTerm(); + } + + @Override + public long getReplicatedToAllIndex() { + return getOnDemandRaftState().getReplicatedToAllIndex(); + } + + @Override + public String getVotedFor() { + return getOnDemandRaftState().getVotedFor(); + } + + @Override + public boolean isSnapshotCaptureInitiated() { + return getOnDemandRaftState().isSnapshotCaptureInitiated(); + } + + @Override + public String getLastCommittedTransactionTime() { + return DATE_FORMAT.format(new Date(lastCommittedTransactionTime)); } @Override @@ -190,45 +264,18 @@ public class ShardStats extends AbstractMXBean implements ShardStatsMXBean { return ++abortTransactionsCount; } - public void setLeader(final String leader) { - this.leader = leader; - } - - public void setRaftState(final String raftState) { - this.raftState = raftState; - } - - public void setLastLogTerm(final long lastLogTerm) { - this.lastLogTerm = lastLogTerm; - } - - public void setLastLogIndex(final long lastLogIndex) { - this.lastLogIndex = lastLogIndex; - } - - public void setCurrentTerm(final long currentTerm) { - this.currentTerm = currentTerm; - } - - public void setCommitIndex(final long commitIndex) { - this.commitIndex = commitIndex; - } - - public void setLastApplied(final long lastApplied) { - this.lastApplied = lastApplied; - } - public void setLastCommittedTransactionTime(final long lastCommittedTransactionTime) { this.lastCommittedTransactionTime = lastCommittedTransactionTime; } - public void setInMemoryJournalDataSize(long dataSize){ - this.dataSize = dataSize; + @Override + public long getInMemoryJournalDataSize(){ + return getOnDemandRaftState().getInMemoryJournalDataSize(); } @Override - public long getInMemoryJournalDataSize(){ - return dataSize; + public long getInMemoryJournalLogSize() { + return getOnDemandRaftState().getInMemoryJournalLogSize(); } @Override @@ -287,4 +334,36 @@ public class ShardStats extends AbstractMXBean implements 
ShardStatsMXBean { public boolean getFollowerInitialSyncStatus() { return followerInitialSyncStatus; } + + @Override + public List getFollowerInfo() { + return getOnDemandRaftState().getFollowerInfoList(); + } + + @Override + public String getPeerAddresses() { + StringBuilder builder = new StringBuilder(); + int i = 0; + for(Map.Entry e: getOnDemandRaftState().getPeerAddresses().entrySet()) { + if(i++ > 0) { + builder.append(", "); + } + + builder.append(e.getKey()).append(": ").append(e.getValue()); + } + + return builder.toString(); + } + + @Override + public String getStatRetrievalTime() { + getOnDemandRaftState(); + return statRetrievalTime; + } + + @Override + public String getStatRetrievalError() { + getOnDemandRaftState(); + return statRetrievalError; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStatsMXBean.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStatsMXBean.java index 0281cdd8ce..1c0c83b699 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStatsMXBean.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStatsMXBean.java @@ -1,7 +1,7 @@ package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard; import java.util.List; - +import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo; import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStats; import org.opendaylight.yangtools.util.concurrent.ListenerNotificationQueueStats; @@ -12,6 +12,10 @@ public interface ShardStatsMXBean { String getShardName(); + String getStatRetrievalTime(); + + String getStatRetrievalError(); + long getCommittedTransactionsCount(); long getReadOnlyTransactionCount(); @@ -30,6 +34,16 @@ public interface ShardStatsMXBean { long getLastApplied(); + long getLastIndex(); + + long getLastTerm(); + + long getSnapshotIndex(); + + long getSnapshotTerm(); + + long getReplicatedToAllIndex(); + String getLastCommittedTransactionTime(); long getFailedTransactionsCount(); @@ -42,6 +56,10 @@ public interface ShardStatsMXBean { String getRaftState(); + String getVotedFor(); + + boolean isSnapshotCaptureInitiated(); + ThreadExecutorStats getDataStoreExecutorStats(); ThreadExecutorStats getNotificationMgrExecutorStats(); @@ -54,5 +72,11 @@ public interface ShardStatsMXBean { long getInMemoryJournalDataSize(); + long getInMemoryJournalLogSize(); + boolean getFollowerInitialSyncStatus(); + + List getFollowerInfo(); + + String getPeerAddresses(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModifications.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModifications.java index 670641f6ac..a9ce94b033 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModifications.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModifications.java @@ -7,6 +7,10 @@ */ package org.opendaylight.controller.cluster.datastore.messages; +import com.google.common.base.Preconditions; +import 
java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; /** @@ -17,15 +21,61 @@ import org.opendaylight.controller.cluster.datastore.modification.MutableComposi public class BatchedModifications extends MutableCompositeModification implements SerializableMessage { private static final long serialVersionUID = 1L; + private boolean ready; + private String transactionID; + private String transactionChainID; + public BatchedModifications() { } - public BatchedModifications(short version) { + public BatchedModifications(String transactionID, short version, String transactionChainID) { super(version); + this.transactionID = Preconditions.checkNotNull(transactionID, "transactionID can't be null"); + this.transactionChainID = transactionChainID != null ? transactionChainID : ""; + } + + public boolean isReady() { + return ready; + } + + public void setReady(boolean ready) { + this.ready = ready; + } + + public String getTransactionID() { + return transactionID; + } + + public String getTransactionChainID() { + return transactionChainID; + } + + @Override + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + super.readExternal(in); + transactionID = in.readUTF(); + transactionChainID = in.readUTF(); + ready = in.readBoolean(); + } + + @Override + public void writeExternal(ObjectOutput out) throws IOException { + super.writeExternal(out); + out.writeUTF(transactionID); + out.writeUTF(transactionChainID); + out.writeBoolean(ready); } @Override public Object toSerializable() { return this; } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("BatchedModifications [transactionID=").append(transactionID).append(", ready=").append(ready) + .append(", modifications size=").append(getModifications().size()).append("]"); + return builder.toString(); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsReply.java index 33c5733fdb..a10c6ac3fb 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsReply.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsReply.java @@ -19,7 +19,11 @@ import java.io.ObjectOutput; public class BatchedModificationsReply extends VersionedExternalizableMessage { private static final long serialVersionUID = 1L; + private static final byte COHORT_PATH_NOT_PRESENT = 0; + private static final byte COHORT_PATH_PRESENT = 1; + private int numBatched; + private String cohortPath; public BatchedModificationsReply() { } @@ -28,25 +32,52 @@ public class BatchedModificationsReply extends VersionedExternalizableMessage { this.numBatched = numBatched; } + public BatchedModificationsReply(int numBatched, String cohortPath) { + this.numBatched = numBatched; + this.cohortPath = cohortPath; + } public int getNumBatched() { return numBatched; } + public String getCohortPath() { + return cohortPath; + } + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { super.readExternal(in); numBatched = in.readInt(); + + 
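Both messages here rely on Externalizable symmetry: readExternal must mirror writeExternal field for field, and the optional cohortPath that follows is guarded by a presence byte. A self-contained sketch of that encoding, with field names borrowed only for illustration.

import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;

class OptionalFieldSketch implements Externalizable {
    private static final byte ABSENT = 0;
    private static final byte PRESENT = 1;

    private int numBatched;
    private String cohortPath;   // may be null

    public OptionalFieldSketch() { }   // public no-arg constructor required by Externalizable

    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
        out.writeInt(numBatched);
        if (cohortPath != null) {
            out.writeByte(PRESENT);
            out.writeUTF(cohortPath);
        } else {
            out.writeByte(ABSENT);
        }
    }

    @Override
    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        numBatched = in.readInt();
        if (in.readByte() == PRESENT) {
            cohortPath = in.readUTF();
        }
    }
}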
if(in.readByte() == COHORT_PATH_PRESENT) { + cohortPath = in.readUTF(); + } } @Override public void writeExternal(ObjectOutput out) throws IOException { super.writeExternal(out); out.writeInt(numBatched); + + if(cohortPath != null) { + out.writeByte(COHORT_PATH_PRESENT); + out.writeUTF(cohortPath); + } else { + out.writeByte(COHORT_PATH_NOT_PRESENT); + } } @Override public Object toSerializable() { return this; } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("BatchedModificationsReply [numBatched=").append(numBatched).append(", cohortPath=") + .append(cohortPath).append("]"); + return builder.toString(); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FindPrimary.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FindPrimary.java index a34330bcf6..d51d6800a2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FindPrimary.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FindPrimary.java @@ -18,22 +18,22 @@ public class FindPrimary implements SerializableMessage{ public static final Class SERIALIZABLE_CLASS = FindPrimary.class; private final String shardName; - private final boolean waitUntilInitialized; + private final boolean waitUntilReady; - public FindPrimary(String shardName, boolean waitUntilInitialized){ + public FindPrimary(String shardName, boolean waitUntilReady){ Preconditions.checkNotNull(shardName, "shardName should not be null"); this.shardName = shardName; - this.waitUntilInitialized = waitUntilInitialized; + this.waitUntilReady = waitUntilReady; } public String getShardName() { return shardName; } - public boolean isWaitUntilInitialized() { - return waitUntilInitialized; + public boolean isWaitUntilReady() { + return waitUntilReady; } @Override @@ -44,4 +44,12 @@ public class FindPrimary implements SerializableMessage{ public static FindPrimary fromSerializable(Object message){ return (FindPrimary) message; } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("FindPrimary [shardName=").append(shardName).append(", waitUntilReady=").append(waitUntilReady) + .append("]"); + return builder.toString(); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PeerAddressResolved.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PeerAddressResolved.java index 346519ed5a..82f3649939 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PeerAddressResolved.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PeerAddressResolved.java @@ -8,18 +8,17 @@ package org.opendaylight.controller.cluster.datastore.messages; -import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; public class PeerAddressResolved { - private final ShardIdentifier peerId; + private final String peerId; private final String peerAddress; - public PeerAddressResolved(ShardIdentifier peerId, String peerAddress) { + public PeerAddressResolved(String peerId, String peerAddress) { 
this.peerId = peerId; this.peerAddress = peerAddress; } - public ShardIdentifier getPeerId() { + public String getPeerId() { return peerId; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/VersionedExternalizableMessage.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/VersionedExternalizableMessage.java index 2a660fa4b2..b34737be54 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/VersionedExternalizableMessage.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/VersionedExternalizableMessage.java @@ -11,6 +11,7 @@ import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; +import org.opendaylight.controller.cluster.datastore.DataStoreVersions; /** * Abstract base class for a versioned Externalizable message. @@ -20,7 +21,7 @@ import java.io.ObjectOutput; public abstract class VersionedExternalizableMessage implements Externalizable, SerializableMessage { private static final long serialVersionUID = 1L; - private short version; + private short version = DataStoreVersions.CURRENT_VERSION; public VersionedExternalizableMessage() { } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index 0fb09d8231..6f9bb7fc9f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -17,6 +17,7 @@ import akka.actor.Address; import akka.actor.PoisonPill; import akka.dispatch.Futures; import akka.dispatch.Mapper; +import akka.dispatch.OnComplete; import akka.pattern.AskTimeoutException; import akka.util.Timeout; import com.codahale.metrics.JmxReporter; @@ -35,6 +36,7 @@ import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import org.opendaylight.controller.cluster.datastore.Configuration; import org.opendaylight.controller.cluster.datastore.DatastoreContext; import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException; +import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException; @@ -98,8 +100,9 @@ public class ActorContext { private final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).inDomain(DOMAIN).build(); private final int transactionOutstandingOperationLimit; private Timeout transactionCommitOperationTimeout; + private Timeout shardInitializationTimeout; private final Dispatchers dispatchers; - private final Cache> primaryShardActorSelectionCache; + private Cache> primaryShardActorSelectionCache; private volatile SchemaContext schemaContext; private volatile boolean updated; @@ -121,14 +124,6 @@ public class ActorContext { this.dispatchers = new 
Dispatchers(actorSystem.dispatchers()); setCachedProperties(); - primaryShardActorSelectionCache = CacheBuilder.newBuilder() - .expireAfterWrite(datastoreContext.getShardLeaderElectionTimeout().duration().toMillis(), TimeUnit.MILLISECONDS) - .build(); - - operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS); - operationTimeout = new Timeout(operationDuration); - transactionCommitOperationTimeout = new Timeout(Duration.create(getDatastoreContext().getShardTransactionCommitTimeoutInSeconds(), - TimeUnit.SECONDS)); Address selfAddress = clusterWrapper.getSelfAddress(); if (selfAddress != null && !selfAddress.host().isEmpty()) { @@ -150,6 +145,12 @@ public class ActorContext { transactionCommitOperationTimeout = new Timeout(Duration.create( datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS)); + + shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2)); + + primaryShardActorSelectionCache = CacheBuilder.newBuilder() + .expireAfterWrite(datastoreContext.getShardLeaderElectionTimeout().duration().toMillis(), TimeUnit.MILLISECONDS) + .build(); } public DatastoreContext getDatastoreContext() { @@ -202,28 +203,13 @@ public class ActorContext { return schemaContext; } - /** - * Finds the primary shard for the given shard name - * - * @param shardName - * @return - */ - public Optional findPrimaryShard(String shardName) { - String path = findPrimaryPathOrNull(shardName); - if (path == null){ - return Optional.absent(); - } - return Optional.of(actorSystem.actorSelection(path)); - } - public Future findPrimaryShardAsync(final String shardName) { Future ret = primaryShardActorSelectionCache.getIfPresent(shardName); if(ret != null){ return ret; } Future future = executeOperationAsync(shardManager, - new FindPrimary(shardName, true).toSerializable(), - datastoreContext.getShardInitializationTimeout()); + new FindPrimary(shardName, true).toSerializable(), shardInitializationTimeout); return future.transform(new Mapper() { @Override @@ -242,6 +228,8 @@ public class ActorContext { } else if(response instanceof PrimaryNotFound) { throw new PrimaryNotFoundException( String.format("No primary shard found for %S.", shardName)); + } else if(response instanceof NoShardLeaderException) { + throw (NoShardLeaderException)response; } throw new UnknownMessageException(String.format( @@ -277,7 +265,7 @@ public class ActorContext { */ public Future findLocalShardAsync( final String shardName) { Future future = executeOperationAsync(shardManager, - new FindLocalShard(shardName, true), datastoreContext.getShardInitializationTimeout()); + new FindLocalShard(shardName, true), shardInitializationTimeout); return future.map(new Mapper() { @Override @@ -301,26 +289,6 @@ public class ActorContext { }, getClientDispatcher()); } - private String findPrimaryPathOrNull(String shardName) { - Object result = executeOperation(shardManager, new FindPrimary(shardName, false).toSerializable()); - - if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) { - PrimaryFound found = PrimaryFound.fromSerializable(result); - - LOG.debug("Primary found {}", found.getPrimaryPath()); - return found.getPrimaryPath(); - - } else if (result.getClass().equals(ActorNotInitialized.class)){ - throw new NotInitializedException( - String.format("Found primary shard[%s] but its not initialized yet. 
Please try again later", shardName) - ); - - } else { - return null; - } - } - - /** * Executes an operation on a local actor and wait for it's response * @@ -428,16 +396,21 @@ public class ActorContext { * * @param message */ - public void broadcast(Object message){ - for(String shardName : configuration.getAllShardNames()){ - - Optional primary = findPrimaryShard(shardName); - if (primary.isPresent()) { - primary.get().tell(message, ActorRef.noSender()); - } else { - LOG.warn("broadcast failed to send message {} to shard {}. Primary not found", - message.getClass().getSimpleName(), shardName); - } + public void broadcast(final Object message){ + for(final String shardName : configuration.getAllShardNames()){ + + Future primaryFuture = findPrimaryShardAsync(shardName); + primaryFuture.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, ActorSelection primaryShard) { + if(failure != null) { + LOG.warn("broadcast failed to send message {} to shard {}: {}", + message.getClass().getSimpleName(), shardName, failure); + } else { + primaryShard.tell(message, ActorRef.noSender()); + } + } + }, getClientDispatcher()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java index 3ac61f2371..378bc717f4 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java @@ -30,6 +30,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -86,7 +87,7 @@ public abstract class AbstractShardTest extends AbstractActorTest{ } protected Props newShardProps() { - return Shard.props(shardID, Collections.emptyMap(), + return Shard.props(shardID, Collections.emptyMap(), newDatastoreContext(), SCHEMA_CONTEXT); } @@ -101,7 +102,7 @@ public abstract class AbstractShardTest extends AbstractActorTest{ Creator creator = new Creator() { @Override public Shard create() throws Exception { - return new Shard(shardID, Collections.emptyMap(), + return new Shard(shardID, Collections.emptyMap(), newDatastoreContext(), SCHEMA_CONTEXT) { @Override protected void onRecoveryComplete() { @@ -153,16 +154,17 @@ public abstract class AbstractShardTest extends AbstractActorTest{ shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); } - protected void verifyLastLogIndex(TestActorRef shard, long expectedValue) { + protected void verifyLastApplied(TestActorRef shard, long expectedValue) { + long lastApplied = -1; for(int i = 0; i < 20 * 5; i++) { - long lastLogIndex = shard.underlyingActor().getShardMBean().getLastLogIndex(); - if(lastLogIndex == expectedValue) { - break; + lastApplied = shard.underlyingActor().getShardMBean().getLastApplied(); + if(lastApplied == expectedValue) { + return; } Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); } - assertEquals("Last log index", expectedValue, shard.underlyingActor().getShardMBean().getLastLogIndex()); + Assert.fail(String.format("Expected last applied: %d, Actual: %d", 
expectedValue, lastApplied)); } protected NormalizedNode readStore(final InMemoryDOMDataStore store) throws ReadFailedException { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java index 4896b059c7..c6c5486ee3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java @@ -94,8 +94,7 @@ public abstract class AbstractTransactionProxyTest { protected final String memberName = "mock-member"; - protected final Builder dataStoreContextBuilder = DatastoreContext.newBuilder().operationTimeoutInSeconds(2). - shardBatchedModificationCount(1); + protected final Builder dataStoreContextBuilder = DatastoreContext.newBuilder().operationTimeoutInSeconds(2); @BeforeClass public static void setUpClass() throws IOException { @@ -251,6 +250,13 @@ public abstract class AbstractTransactionProxyTest { eq(actorSelection(actorRef)), isA(BatchedModifications.class)); } + protected void expectBatchedModificationsReady(ActorRef actorRef, int count) { + Future replyFuture = Futures.successful( + new BatchedModificationsReply(count, actorRef.path().toString())); + doReturn(replyFuture).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), isA(BatchedModifications.class)); + } + protected void expectBatchedModifications(int count) { doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync( any(ActorSelection.class), isA(BatchedModifications.class)); @@ -307,15 +313,21 @@ public abstract class AbstractTransactionProxyTest { protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type, int transactionVersion, String prefix, ActorRef shardActorRef) { - ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); - log.info("Created mock shard Tx actor {}", txActorRef); + ActorRef txActorRef; + if(type == TransactionType.WRITE_ONLY && transactionVersion >= DataStoreVersions.LITHIUM_VERSION && + dataStoreContextBuilder.build().isWriteOnlyTransactionOptimizationsEnabled()) { + txActorRef = shardActorRef; + } else { + txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); + log.info("Created mock shard Tx actor {}", txActorRef); - doReturn(actorSystem.actorSelection(txActorRef.path())).when(mockActorContext).actorSelection( - txActorRef.path().toString()); + doReturn(actorSystem.actorSelection(txActorRef.path())). + when(mockActorContext).actorSelection(txActorRef.path().toString()); - doReturn(Futures.successful(createTransactionReply(txActorRef, transactionVersion))).when(mockActorContext). - executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())), - eqCreateTransaction(prefix, type)); + doReturn(Futures.successful(createTransactionReply(txActorRef, transactionVersion))).when(mockActorContext). 
+ executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())), + eqCreateTransaction(prefix, type)); + } return txActorRef; } @@ -358,17 +370,18 @@ public abstract class AbstractTransactionProxyTest { return captured; } - protected void verifyOneBatchedModification(ActorRef actorRef, Modification expected) { + protected void verifyOneBatchedModification(ActorRef actorRef, Modification expected, boolean expIsReady) { List batchedModifications = captureBatchedModifications(actorRef); assertEquals("Captured BatchedModifications count", 1, batchedModifications.size()); - verifyBatchedModifications(batchedModifications.get(0), expected); + verifyBatchedModifications(batchedModifications.get(0), expIsReady, expected); } - protected void verifyBatchedModifications(Object message, Modification... expected) { + protected void verifyBatchedModifications(Object message, boolean expIsReady, Modification... expected) { assertEquals("Message type", BatchedModifications.class, message.getClass()); BatchedModifications batchedModifications = (BatchedModifications)message; assertEquals("BatchedModifications size", expected.length, batchedModifications.getModifications().size()); + assertEquals("isReady", expIsReady, batchedModifications.isReady()); for(int i = 0; i < batchedModifications.getModifications().size(); i++) { Modification actual = batchedModifications.getModifications().get(i); assertEquals("Modification type", expected[i].getClass(), actual.getClass()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java index 54a9e2dd94..fdc7e664c2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java @@ -147,8 +147,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { }}; } - @Test - public void testTransactionWritesWithShardNotInitiallyReady() throws Exception{ + private void testTransactionWritesWithShardNotInitiallyReady(final boolean writeOnly) throws Exception { new IntegrationTestKit(getSystem()) {{ String testName = "testTransactionWritesWithShardNotInitiallyReady"; String shardName = "test-1"; @@ -163,7 +162,8 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { // Create the write Tx - final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); + final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() : + dataStore.newReadWriteTransaction(); assertNotNull("newReadWriteTransaction returned null", writeTx); // Do some modification operations and ready the Tx on a separate thread. 
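The verifyLastApplied helper earlier in this change polls the shard MBean until the expected value appears or the retries run out, then fails with both values. The same retry shape against a generic supplier; the helper name and timings are illustrative only.

import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;
import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.Assert;

final class PollAssertSketch {
    static void awaitValue(LongSupplier actual, long expected) {
        long last = -1;
        for (int i = 0; i < 20 * 5; i++) {
            last = actual.getAsLong();
            if (last == expected) {
                return;
            }
            Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
        }
        Assert.fail(String.format("Expected value: %d, Actual: %d", expected, last));
    }
}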
@@ -239,7 +239,18 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { } @Test - public void testTransactionReadsWithShardNotInitiallyReady() throws Exception{ + public void testWriteOnlyTransactionWithShardNotInitiallyReady() throws Exception { + datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true); + testTransactionWritesWithShardNotInitiallyReady(true); + } + + @Test + public void testReadWriteTransactionWithShardNotInitiallyReady() throws Exception { + testTransactionWritesWithShardNotInitiallyReady(false); + } + + @Test + public void testTransactionReadsWithShardNotInitiallyReady() throws Exception { new IntegrationTestKit(getSystem()) {{ String testName = "testTransactionReadsWithShardNotInitiallyReady"; String shardName = "test-1"; @@ -454,8 +465,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { }}; } - @Test(expected=NoShardLeaderException.class) - public void testTransactionCommitFailureWithNoShardLeader() throws Throwable{ + private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly) throws Throwable { new IntegrationTestKit(getSystem()) {{ String testName = "testTransactionCommitFailureWithNoShardLeader"; String shardName = "test-1"; @@ -464,6 +474,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { // by setting the election timeout, which is based on the heartbeat interval, really high. datastoreContextBuilder.shardHeartbeatIntervalInMillis(30000); + datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS); // Set the leader election timeout low for the test. @@ -473,7 +484,8 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { // Create the write Tx. - final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); + final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() : + dataStore.newReadWriteTransaction(); assertNotNull("newReadWriteTransaction returned null", writeTx); // Do some modifications and ready the Tx on a separate thread. 
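The integration tests are refactored into private helpers parameterized by a writeOnly flag, wrapped by thin @Test methods, one of which also enables the write-only transaction optimizations. A skeletal illustration of that structure; class and method names are placeholders.

import static org.junit.Assert.assertNotNull;
import org.junit.Test;

public class TxKindTestSketch {

    private void runCommitScenario(boolean writeOnly) throws Exception {
        Object writeTx = writeOnly ? newWriteOnlyTransaction() : newReadWriteTransaction();
        assertNotNull("transaction", writeTx);
        // ... perform modifications and ready the transaction ...
    }

    @Test
    public void testWriteOnlyVariant() throws Exception {
        // would also set writeOnlyTransactionOptimizationsEnabled(true) on the context builder
        runCommitScenario(true);
    }

    @Test
    public void testReadWriteVariant() throws Exception {
        runCommitScenario(false);
    }

    private Object newWriteOnlyTransaction() { return new Object(); }
    private Object newReadWriteTransaction() { return new Object(); }
}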
@@ -522,6 +534,17 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { }}; } + @Test(expected=NoShardLeaderException.class) + public void testWriteOnlyTransactionCommitFailureWithNoShardLeader() throws Throwable { + datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true); + testTransactionCommitFailureWithNoShardLeader(true); + } + + @Test(expected=NoShardLeaderException.class) + public void testReadWriteTransactionCommitFailureWithNoShardLeader() throws Throwable { + testTransactionCommitFailureWithNoShardLeader(false); + } + @Test public void testTransactionAbort() throws Exception{ System.setProperty("shard.persistent", "true"); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/RoleChangeNotifierTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/RoleChangeNotifierTest.java index 4e61260550..1ab03b216c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/RoleChangeNotifierTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/RoleChangeNotifierTest.java @@ -1,21 +1,22 @@ package org.opendaylight.controller.cluster.datastore; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import akka.actor.ActorRef; import akka.actor.Props; import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; import org.junit.Test; import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor; +import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply; import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier; import org.opendaylight.controller.cluster.notifications.RoleChanged; import org.opendaylight.controller.cluster.raft.RaftState; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; public class RoleChangeNotifierTest extends AbstractActorTest { @@ -51,8 +52,6 @@ public class RoleChangeNotifierTest extends AbstractActorTest { TestActorRef notifierTestActorRef = TestActorRef.create( getSystem(), RoleChangeNotifier.getProps(memberId), memberId); - RoleChangeNotifier roleChangeNotifier = notifierTestActorRef.underlyingActor(); - notifierTestActorRef.tell(new RoleChanged(memberId, RaftState.Candidate.name(), RaftState.Leader.name()), shardActor); // no notification should be sent as listener has not yet registered @@ -74,6 +73,32 @@ public class RoleChangeNotifierTest extends AbstractActorTest { }}; } + + @Test + public void testHandleLeaderStateChanged() throws Exception { + new JavaTestKit(getSystem()) {{ + String actorId = "testHandleLeaderStateChanged"; + TestActorRef notifierTestActorRef = TestActorRef.create( + getSystem(), RoleChangeNotifier.getProps(actorId), actorId); + + notifierTestActorRef.tell(new LeaderStateChanged("member1", "leader1"), ActorRef.noSender()); + + // listener registers after the sate has been changed, ensure we sent the latest state change after a reply + notifierTestActorRef.tell(new 
RegisterRoleChangeListener(), getRef()); + + expectMsgClass(RegisterRoleChangeListenerReply.class); + + LeaderStateChanged leaderStateChanged = expectMsgClass(LeaderStateChanged.class); + assertEquals("getMemberId", "member1", leaderStateChanged.getMemberId()); + assertEquals("getLeaderId", "leader1", leaderStateChanged.getLeaderId()); + + notifierTestActorRef.tell(new LeaderStateChanged("member1", "leader2"), ActorRef.noSender()); + + leaderStateChanged = expectMsgClass(LeaderStateChanged.class); + assertEquals("getMemberId", "member1", leaderStateChanged.getMemberId()); + assertEquals("getLeaderId", "leader2", leaderStateChanged.getLeaderId()); + }}; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java index 99417076bf..ae7a4f96c5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java @@ -34,6 +34,7 @@ import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized; @@ -44,9 +45,11 @@ import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; -import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor; +import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor; import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper; import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration; +import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; +import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; @@ -56,6 +59,7 @@ import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.Await; import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; public class ShardManagerTest extends AbstractActorTest { private static int ID_COUNTER = 1; @@ -66,7 +70,10 @@ public class ShardManagerTest extends AbstractActorTest { @Mock private static CountDownLatch ready; - private static ActorRef mockShardActor; + private static TestActorRef mockShardActor; + + private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder(). 
+ dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS); @Before public void setUp() { @@ -75,9 +82,11 @@ public class ShardManagerTest extends AbstractActorTest { InMemoryJournal.clear(); if(mockShardActor == null) { - String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1","config").toString(); - mockShardActor = getSystem().actorOf(Props.create(DoNothingActor.class), name); + String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString(); + mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), name); } + + mockShardActor.underlyingActor().clear(); } @After @@ -86,44 +95,93 @@ public class ShardManagerTest extends AbstractActorTest { } private Props newShardMgrProps() { - DatastoreContext.Builder builder = DatastoreContext.newBuilder(); - builder.dataStoreType(shardMrgIDSuffix); return ShardManager.props(new MockClusterWrapper(), new MockConfiguration(), - builder.build(), ready); + datastoreContextBuilder.build(), ready); + } + + private Props newPropsShardMgrWithMockShardActor() { + Creator creator = new Creator() { + private static final long serialVersionUID = 1L; + @Override + public ShardManager create() throws Exception { + return new ShardManager(new MockClusterWrapper(), new MockConfiguration(), + datastoreContextBuilder.build(), ready) { + @Override + protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) { + return mockShardActor; + } + }; + } + }; + + return Props.create(new DelegatingShardManagerCreator(creator)); } @Test public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shardManager.tell(new FindPrimary("non-existent", false).toSerializable(), getRef()); - expectMsgEquals(duration("5 seconds"), - new PrimaryNotFound("non-existent").toSerializable()); + expectMsgEquals(duration("5 seconds"), new PrimaryNotFound("non-existent").toSerializable()); }}; } @Test - public void testOnReceiveFindPrimaryForExistentShard() throws Exception { + public void testOnReceiveFindPrimaryForLocalLeaderShard() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shardManager.tell(new ActorInitialized(), mockShardActor); + shardManager.tell(new LeaderStateChanged(memberId, memberId), getRef()); + + MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class); + shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(), + RaftState.Leader.name())), mockShardActor); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef()); - expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS); + PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS); + assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), + primaryFound.getPrimaryPath().contains("member-1-shard-default")); }}; } @Test - public void 
testOnReceiveFindPrimaryForNotInitializedShard() throws Exception { + public void testOnReceiveFindPrimaryForNonLocalLeaderShard() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); + + String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix; + MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString()); + + String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager.tell(new RoleChangeNotification(memberId1, + RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor); + shardManager.tell(new LeaderStateChanged(memberId1, memberId2), mockShardActor); + + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef()); + + PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS); + assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), + primaryFound.getPrimaryPath().contains("member-2-shard-default")); + }}; + } + + @Test + public void testOnReceiveFindPrimaryForUninitializedShard() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef()); @@ -132,28 +190,129 @@ public class ShardManagerTest extends AbstractActorTest { } @Test - public void testOnReceiveFindPrimaryWaitForShardInitialized() throws Exception { + public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); + + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef()); + + expectMsgClass(duration("5 seconds"), NoShardLeaderException.class); + }}; + } + + @Test + public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); + + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager.tell(new RoleChangeNotification(memberId, + RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor); + + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef()); + + expectMsgClass(duration("5 seconds"), NoShardLeaderException.class); + + shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor); + + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef()); + + PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS); + assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), + primaryFound.getPrimaryPath().contains("member-1-shard-default")); + 
}}; + } + + @Test + public void testOnReceiveFindPrimaryWaitForShardLeader() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); // We're passing waitUntilInitialized = true to FindPrimary so the response should be - // delayed until we send ActorInitialized. - Future future = Patterns.ask(shardManager, new FindPrimary(Shard.DEFAULT_NAME, true), - new Timeout(5, TimeUnit.SECONDS)); + // delayed until we send ActorInitialized and RoleChangeNotification. + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef()); + + expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS)); shardManager.tell(new ActorInitialized(), mockShardActor); - Object resp = Await.result(future, duration("5 seconds")); - assertTrue("Expected: PrimaryFound, Actual: " + resp, resp instanceof PrimaryFound); + expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS)); + + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager.tell(new RoleChangeNotification(memberId, + RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor); + + expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS)); + + shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor); + + PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS); + assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), + primaryFound.getPrimaryPath().contains("member-1-shard-default")); + + expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS)); + }}; + } + + @Test + public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef()); + + expectMsgClass(duration("2 seconds"), ActorNotInitialized.class); + + shardManager.tell(new ActorInitialized(), mockShardActor); + + expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS)); + }}; + } + + @Test + public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); + shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, + null, RaftState.Candidate.name()), mockShardActor); + + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef()); + + expectMsgClass(duration("2 seconds"), NoShardLeaderException.class); + }}; + } + + @Test + public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); + + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef()); + + expectMsgClass(duration("2 
seconds"), NoShardLeaderException.class); }}; } @Test public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); @@ -168,7 +327,7 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testOnReceiveFindLocalShardForExistentShard() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shardManager.tell(new ActorInitialized(), mockShardActor); @@ -185,7 +344,7 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef()); @@ -196,7 +355,7 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); @@ -439,14 +598,11 @@ public class ShardManagerTest extends AbstractActorTest { public void testRoleChangeNotificationReleaseReady() throws Exception { new JavaTestKit(getSystem()) { { - final Props persistentProps = ShardManager.props( - new MockClusterWrapper(), - new MockConfiguration(), - DatastoreContext.newBuilder().persistent(true).build(), ready); - final TestActorRef shardManager = - TestActorRef.create(getSystem(), persistentProps); + TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); - shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown", RaftState.Candidate.name(), RaftState.Leader.name())); + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification( + memberId, RaftState.Candidate.name(), RaftState.Leader.name())); verify(ready, times(1)).countDown(); @@ -457,14 +613,10 @@ public class ShardManagerTest extends AbstractActorTest { public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception { new JavaTestKit(getSystem()) { { - final Props persistentProps = ShardManager.props( - new MockClusterWrapper(), - new MockConfiguration(), - DatastoreContext.newBuilder().persistent(true).build(), ready); - final TestActorRef shardManager = - TestActorRef.create(getSystem(), persistentProps); + TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); - shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification("unknown", RaftState.Candidate.name(), RaftState.Leader.name())); + shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification( + "unknown", RaftState.Candidate.name(), 
RaftState.Leader.name())); verify(ready, never()).countDown(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index a787ab46f2..3e0bc42397 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -10,6 +10,7 @@ import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION; import akka.actor.ActorRef; +import akka.actor.ActorSelection; import akka.actor.PoisonPill; import akka.actor.Props; import akka.dispatch.Dispatchers; @@ -35,13 +36,15 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import org.junit.Test; import org.mockito.InOrder; import org.opendaylight.controller.cluster.DataPersistenceProvider; -import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; +import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; @@ -49,10 +52,13 @@ import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionR import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved; +import org.opendaylight.controller.cluster.datastore.messages.ReadData; +import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; +import org.opendaylight.controller.cluster.datastore.modification.DeleteModification; import org.opendaylight.controller.cluster.datastore.modification.MergeModification; import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload; @@ -82,11 +88,13 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; import 
org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory; +import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -96,6 +104,7 @@ import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; public class ShardTest extends AbstractShardTest { + @Test public void testRegisterChangeListener() throws Exception { new ShardTestKit(getSystem()) {{ @@ -149,7 +158,7 @@ public class ShardTest extends AbstractShardTest { @Override public Shard create() throws Exception { - return new Shard(shardID, Collections.emptyMap(), + return new Shard(shardID, Collections.emptyMap(), newDatastoreContext(), SCHEMA_CONTEXT) { @Override public void onReceiveCommand(final Object message) throws Exception { @@ -277,7 +286,7 @@ public class ShardTest extends AbstractShardTest { final CountDownLatch recoveryComplete = new CountDownLatch(1); class TestShard extends Shard { TestShard() { - super(shardID, Collections.singletonMap(shardID, null), + super(shardID, Collections.singletonMap(shardID.toString(), null), newDatastoreContext(), SCHEMA_CONTEXT); } @@ -308,7 +317,7 @@ public class ShardTest extends AbstractShardTest { Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS)); String address = "akka://foobar"; - shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID, address)); + shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address)); assertEquals("getPeerAddresses", address, ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString())); @@ -422,42 +431,42 @@ public class ShardTest extends AbstractShardTest { waitUntilLeader(shard); - // Setup 3 simulated transactions with mock cohorts backed by real cohorts. 
- - InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore(); - - String transactionID1 = "tx1"; - MutableCompositeModification modification1 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, - TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1); + final String transactionID1 = "tx1"; + final String transactionID2 = "tx2"; + final String transactionID3 = "tx3"; - String transactionID2 = "tx2"; - MutableCompositeModification modification2 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, - TestModel.OUTER_LIST_PATH, - ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), - modification2); + final AtomicReference mockCohort1 = new AtomicReference<>(); + final AtomicReference mockCohort2 = new AtomicReference<>(); + final AtomicReference mockCohort3 = new AtomicReference<>(); + ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() { + @Override + public DOMStoreThreePhaseCommitCohort decorate(String transactionID, DOMStoreThreePhaseCommitCohort actual) { + if(transactionID.equals(transactionID1)) { + mockCohort1.set(createDelegatingMockCohort("cohort1", actual)); + return mockCohort1.get(); + } else if(transactionID.equals(transactionID2)) { + mockCohort2.set(createDelegatingMockCohort("cohort2", actual)); + return mockCohort2.get(); + } else { + mockCohort3.set(createDelegatingMockCohort("cohort3", actual)); + return mockCohort3.get(); + } + } + }; - String transactionID3 = "tx3"; - MutableCompositeModification modification3 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore, - YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) - .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), - ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), - modification3); + shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator); long timeoutSec = 5; final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS); final Timeout timeout = new Timeout(duration); - // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent - // by the ShardTransaction. + // Send a BatchedModifications message for the first transaction. - shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, - cohort1, modification1, true), getRef()); - ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable( - expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS)); - assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath()); + shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); + BatchedModificationsReply batchedReply = expectMsgClass(duration, BatchedModificationsReply.class); + assertEquals("getCohortPath", shard.path().toString(), batchedReply.getCohortPath()); + assertEquals("getNumBatched", 1, batchedReply.getNumBatched()); // Send the CanCommitTransaction message for the first Tx. 
@@ -466,15 +475,16 @@ public class ShardTest extends AbstractShardTest { expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS)); assertEquals("Can commit", true, canCommitReply.getCanCommit()); - // Send the ForwardedReadyTransaction for the next 2 Tx's. + // Send BatchedModifications for the next 2 Tx's. - shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, - cohort2, modification2, true), getRef()); - expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); + shard.tell(newBatchedModifications(transactionID2, TestModel.OUTER_LIST_PATH, + ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true), getRef()); + expectMsgClass(duration, BatchedModificationsReply.class); - shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION, - cohort3, modification3, true), getRef()); - expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); + shard.tell(newBatchedModifications(transactionID3, YangInstanceIdentifier.builder( + TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), + ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef()); + expectMsgClass(duration, BatchedModificationsReply.class); // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and // processed after the first Tx completes. @@ -567,16 +577,16 @@ public class ShardTest extends AbstractShardTest { assertEquals("Commits complete", true, done); - InOrder inOrder = inOrder(cohort1, cohort2, cohort3); - inOrder.verify(cohort1).canCommit(); - inOrder.verify(cohort1).preCommit(); - inOrder.verify(cohort1).commit(); - inOrder.verify(cohort2).canCommit(); - inOrder.verify(cohort2).preCommit(); - inOrder.verify(cohort2).commit(); - inOrder.verify(cohort3).canCommit(); - inOrder.verify(cohort3).preCommit(); - inOrder.verify(cohort3).commit(); + InOrder inOrder = inOrder(mockCohort1.get(), mockCohort2.get(), mockCohort3.get()); + inOrder.verify(mockCohort1.get()).canCommit(); + inOrder.verify(mockCohort1.get()).preCommit(); + inOrder.verify(mockCohort1.get()).commit(); + inOrder.verify(mockCohort2.get()).canCommit(); + inOrder.verify(mockCohort2.get()).preCommit(); + inOrder.verify(mockCohort2.get()).commit(); + inOrder.verify(mockCohort3.get()).canCommit(); + inOrder.verify(mockCohort3.get()).preCommit(); + inOrder.verify(mockCohort3.get()).commit(); // Verify data in the data store. 
@@ -593,40 +603,68 @@ public class ShardTest extends AbstractShardTest { assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent()); assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue()); - verifyLastLogIndex(shard, 2); + verifyLastApplied(shard, 2); shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; } + private BatchedModifications newBatchedModifications(String transactionID, YangInstanceIdentifier path, + NormalizedNode data, boolean ready) { + return newBatchedModifications(transactionID, null, path, data, ready); + } + + private BatchedModifications newBatchedModifications(String transactionID, String transactionChainID, + YangInstanceIdentifier path, NormalizedNode data, boolean ready) { + BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID); + batched.addModification(new WriteModification(path, data)); + batched.setReady(ready); + return batched; + } + + @SuppressWarnings("unchecked") @Test - public void testCommitWithPersistenceDisabled() throws Throwable { - dataStoreContextBuilder.persistent(false); + public void testMultipleBatchedModifications() throws Throwable { new ShardTestKit(getSystem()) {{ final TestActorRef shard = TestActorRef.create(getSystem(), newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), - "testCommitPhaseFailure"); + "testMultipleBatchedModifications"); waitUntilLeader(shard); - InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore(); + final String transactionID = "tx"; + FiniteDuration duration = duration("5 seconds"); - // Setup a simulated transactions with a mock cohort. + final AtomicReference mockCohort = new AtomicReference<>(); + ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() { + @Override + public DOMStoreThreePhaseCommitCohort decorate(String txID, DOMStoreThreePhaseCommitCohort actual) { + if(mockCohort.get() == null) { + mockCohort.set(createDelegatingMockCohort("cohort", actual)); + } - String transactionID = "tx"; - MutableCompositeModification modification = new MutableCompositeModification(); - NormalizedNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore, - TestModel.TEST_PATH, containerNode, modification); + return mockCohort.get(); + } + }; - FiniteDuration duration = duration("5 seconds"); + shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator); + + // Send a BatchedModifications to start a transaction. - // Simulate the ForwardedReadyTransaction messages that would be sent - // by the ShardTransaction. + shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef()); + expectMsgClass(duration, BatchedModificationsReply.class); - shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, - cohort, modification, true), getRef()); - expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); + // Send a couple more BatchedModifications. 
+ + shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH, + ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false), getRef()); + expectMsgClass(duration, BatchedModificationsReply.class); + + shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder( + TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), + ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef()); + expectMsgClass(duration, BatchedModificationsReply.class); // Send the CanCommitTransaction message. @@ -640,10 +678,153 @@ public class ShardTest extends AbstractShardTest { shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef()); expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS); - InOrder inOrder = inOrder(cohort); - inOrder.verify(cohort).canCommit(); - inOrder.verify(cohort).preCommit(); - inOrder.verify(cohort).commit(); + InOrder inOrder = inOrder(mockCohort.get()); + inOrder.verify(mockCohort.get()).canCommit(); + inOrder.verify(mockCohort.get()).preCommit(); + inOrder.verify(mockCohort.get()).commit(); + + // Verify data in the data store. + + NormalizedNode outerList = readStore(shard, TestModel.OUTER_LIST_PATH); + assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList); + assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable", + outerList.getValue() instanceof Iterable); + Object entry = ((Iterable)outerList.getValue()).iterator().next(); + assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode", + entry instanceof MapEntryNode); + MapEntryNode mapEntry = (MapEntryNode)entry; + Optional> idLeaf = + mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME)); + assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent()); + assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue()); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + }}; + } + + @Test + public void testBatchedModificationsOnTransactionChain() throws Throwable { + new ShardTestKit(getSystem()) {{ + final TestActorRef shard = TestActorRef.create(getSystem(), + newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), + "testBatchedModificationsOnTransactionChain"); + + waitUntilLeader(shard); + + String transactionChainID = "txChain"; + String transactionID1 = "tx1"; + String transactionID2 = "tx2"; + + FiniteDuration duration = duration("5 seconds"); + + // Send a BatchedModifications to start a chained write transaction and ready it. + + ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + YangInstanceIdentifier path = TestModel.TEST_PATH; + shard.tell(newBatchedModifications(transactionID1, transactionChainID, path, + containerNode, true), getRef()); + expectMsgClass(duration, BatchedModificationsReply.class); + + // Create a read Tx on the same chain. 
+ + shard.tell(new CreateTransaction(transactionID2, TransactionProxy.TransactionType.READ_ONLY.ordinal() , + transactionChainID).toSerializable(), getRef()); + + CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class); + + getSystem().actorSelection(createReply.getTransactionActorPath()).tell(new ReadData(path), getRef()); + ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class); + assertEquals("Read node", containerNode, readReply.getNormalizedNode()); + + // Commit the write transaction. + + shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef()); + CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable( + expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS)); + assertEquals("Can commit", true, canCommitReply.getCanCommit()); + + shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef()); + expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS); + + // Verify data in the data store. + + NormalizedNode actualNode = readStore(shard, path); + assertEquals("Stored node", containerNode, actualNode); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + }}; + } + + @Test + public void testOnBatchedModificationsWhenNotLeader() { + final AtomicBoolean overrideLeaderCalls = new AtomicBoolean(); + new ShardTestKit(getSystem()) {{ + Creator creator = new Creator() { + @Override + public Shard create() throws Exception { + return new Shard(shardID, Collections.emptyMap(), + newDatastoreContext(), SCHEMA_CONTEXT) { + @Override + protected boolean isLeader() { + return overrideLeaderCalls.get() ? false : super.isLeader(); + } + + @Override + protected ActorSelection getLeader() { + return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path()) : + super.getLeader(); + } + }; + } + }; + + TestActorRef shard = TestActorRef.create(getSystem(), + Props.create(new DelegatingShardCreator(creator)), "testOnBatchedModificationsWhenNotLeader"); + + waitUntilLeader(shard); + + overrideLeaderCalls.set(true); + + BatchedModifications batched = new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, ""); + + shard.tell(batched, ActorRef.noSender()); + + expectMsgEquals(batched); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + }}; + } + + @Test + public void testCommitWithPersistenceDisabled() throws Throwable { + dataStoreContextBuilder.persistent(false); + new ShardTestKit(getSystem()) {{ + final TestActorRef shard = TestActorRef.create(getSystem(), + newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), + "testCommitWithPersistenceDisabled"); + + waitUntilLeader(shard); + + String transactionID = "tx"; + FiniteDuration duration = duration("5 seconds"); + + // Send a BatchedModifications to start a transaction. + + NormalizedNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, true), getRef()); + expectMsgClass(duration, BatchedModificationsReply.class); + + // Send the CanCommitTransaction message. + + shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef()); + CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable( + expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS)); + assertEquals("Can commit", true, canCommitReply.getCanCommit()); + + // Send the CommitTransaction message.
+ + shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef()); + expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS); NormalizedNode actualNode = readStore(shard, TestModel.TEST_PATH); assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode); @@ -652,6 +833,117 @@ public class ShardTest extends AbstractShardTest { }}; } + @Test + public void testCommitWhenTransactionHasNoModifications(){ + // Note that persistence is enabled which would normally result in the entry getting written to the journal + // but here that need not happen + new ShardTestKit(getSystem()) { + { + final TestActorRef shard = TestActorRef.create(getSystem(), + newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), + "testCommitWhenTransactionHasNoModifications"); + + waitUntilLeader(shard); + + String transactionID = "tx1"; + MutableCompositeModification modification = new MutableCompositeModification(); + DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit(); + doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit(); + doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit(); + + FiniteDuration duration = duration("5 seconds"); + + // Simulate the ForwardedReadyTransaction messages that would be sent + // by the ShardTransaction. + + shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, + cohort, modification, true), getRef()); + expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); + + // Send the CanCommitTransaction message. + + shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef()); + CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable( + expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS)); + assertEquals("Can commit", true, canCommitReply.getCanCommit()); + + shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef()); + expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class); + + InOrder inOrder = inOrder(cohort); + inOrder.verify(cohort).canCommit(); + inOrder.verify(cohort).preCommit(); + inOrder.verify(cohort).commit(); + + // Use MBean for verification + // Committed transaction count should increase as usual + assertEquals(1,shard.underlyingActor().getShardMBean().getCommittedTransactionsCount()); + + // Commit index should not advance because this does not go into the journal + assertEquals(-1, shard.underlyingActor().getShardMBean().getCommitIndex()); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + + } + }; + } + + @Test + public void testCommitWhenTransactionHasModifications(){ + new ShardTestKit(getSystem()) { + { + final TestActorRef shard = TestActorRef.create(getSystem(), + newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), + "testCommitWhenTransactionHasModifications"); + + waitUntilLeader(shard); + + String transactionID = "tx1"; + MutableCompositeModification modification = new MutableCompositeModification(); + modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build())); + DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit(); + doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit(); + 
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit(); + + FiniteDuration duration = duration("5 seconds"); + + // Simulate the ForwardedReadyTransaction messages that would be sent + // by the ShardTransaction. + + shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, + cohort, modification, true), getRef()); + expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); + + // Send the CanCommitTransaction message. + + shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef()); + CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable( + expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS)); + assertEquals("Can commit", true, canCommitReply.getCanCommit()); + + shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef()); + expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class); + + InOrder inOrder = inOrder(cohort); + inOrder.verify(cohort).canCommit(); + inOrder.verify(cohort).preCommit(); + inOrder.verify(cohort).commit(); + + // Use MBean for verification + // Committed transaction count should increase as usual + assertEquals(1,shard.underlyingActor().getShardMBean().getCommittedTransactionsCount()); + + // Commit index should advance as we do not have an empty modification + assertEquals(0, shard.underlyingActor().getShardMBean().getCommitIndex()); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + + } + }; + } + @Test public void testCommitPhaseFailure() throws Throwable { new ShardTestKit(getSystem()) {{ @@ -661,34 +953,40 @@ public class ShardTest extends AbstractShardTest { waitUntilLeader(shard); - // Setup 2 simulated transactions with mock cohorts. The first one fails in the - // commit phase. + // Setup 2 mock cohorts. The first one fails in the commit phase. - String transactionID1 = "tx1"; - MutableCompositeModification modification1 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + final String transactionID1 = "tx1"; + final DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit(); doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit(); doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit(); - String transactionID2 = "tx2"; - MutableCompositeModification modification2 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2"); + final String transactionID2 = "tx2"; + final DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit(); + ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() { + @Override + public DOMStoreThreePhaseCommitCohort decorate(String transactionID, + DOMStoreThreePhaseCommitCohort actual) { + return transactionID1.equals(transactionID) ? cohort1 : cohort2; + } + }; + + shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator); + FiniteDuration duration = duration("5 seconds"); final Timeout timeout = new Timeout(duration); - // Simulate the ForwardedReadyTransaction messages that would be sent - // by the ShardTransaction. 
+ // Send BatchedModifications to start and ready each transaction. - shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, - cohort1, modification1, true), getRef()); - expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); + shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); + expectMsgClass(duration, BatchedModificationsReply.class); - shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, - cohort2, modification2, true), getRef()); - expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); + shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); + expectMsgClass(duration, BatchedModificationsReply.class); // Send the CanCommitTransaction message for the first Tx. @@ -741,19 +1039,27 @@ public class ShardTest extends AbstractShardTest { waitUntilLeader(shard); String transactionID = "tx1"; - MutableCompositeModification modification = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + final DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit(); doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit(); + ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() { + @Override + public DOMStoreThreePhaseCommitCohort decorate(String transactionID, + DOMStoreThreePhaseCommitCohort actual) { + return cohort; + } + }; + + shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator); + FiniteDuration duration = duration("5 seconds"); - // Simulate the ForwardedReadyTransaction messages that would be sent - // by the ShardTransaction. + // Send BatchedModifications to start and ready a transaction. - shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, - cohort, modification, true), getRef()); - expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); + shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); + expectMsgClass(duration, BatchedModificationsReply.class); // Send the CanCommitTransaction message. @@ -788,16 +1094,24 @@ public class ShardTest extends AbstractShardTest { final FiniteDuration duration = duration("5 seconds"); String transactionID = "tx1"; - MutableCompositeModification modification = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + final DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit(); - // Simulate the ForwardedReadyTransaction messages that would be sent - // by the ShardTransaction. 
+ ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() { + @Override + public DOMStoreThreePhaseCommitCohort decorate(String transactionID, + DOMStoreThreePhaseCommitCohort actual) { + return cohort; + } + }; + + shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator); + + // Send BatchedModifications to start and ready a transaction. - shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, - cohort, modification, true), getRef()); - expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); + shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); + expectMsgClass(duration, BatchedModificationsReply.class); // Send the CanCommitTransaction message. @@ -841,14 +1155,9 @@ public class ShardTest extends AbstractShardTest { } }; - MutableCompositeModification modification = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore, - TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), - modification, preCommit); - - shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, - cohort, modification, true), getRef()); - expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); + shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); + expectMsgClass(duration, BatchedModificationsReply.class); shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef()); CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable( @@ -882,42 +1191,26 @@ public class ShardTest extends AbstractShardTest { final FiniteDuration duration = duration("5 seconds"); - InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore(); - writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); writeToStore(shard, TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()); - // Create 1st Tx - will timeout + // Create and ready the 1st Tx - will timeout String transactionID1 = "tx1"; - MutableCompositeModification modification1 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, - YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) - .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), - ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), - modification1); + shard.tell(newBatchedModifications(transactionID1, YangInstanceIdentifier.builder( + TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), + ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef()); + expectMsgClass(duration, BatchedModificationsReply.class); - // Create 2nd Tx + // Create and ready the 2nd Tx - String transactionID2 = "tx3"; - MutableCompositeModification modification2 = new MutableCompositeModification(); + String transactionID2 = "tx2"; YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) - .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(); - DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore, - listNodePath, - 
ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2), - modification2); - - // Ready the Tx's - - shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, - cohort1, modification1, true), getRef()); - expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); - - shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, - cohort2, modification2, true), getRef()); - expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); + .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(); + shard.tell(newBatchedModifications(transactionID2, listNodePath, + ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2), true), getRef()); + expectMsgClass(duration, BatchedModificationsReply.class); // canCommit 1st Tx. We don't send the commit so it should timeout. @@ -954,38 +1247,23 @@ public class ShardTest extends AbstractShardTest { final FiniteDuration duration = duration("5 seconds"); - InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore(); - String transactionID1 = "tx1"; - MutableCompositeModification modification1 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, - TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1); - String transactionID2 = "tx2"; - MutableCompositeModification modification2 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, - TestModel.OUTER_LIST_PATH, - ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), - modification2); - String transactionID3 = "tx3"; - MutableCompositeModification modification3 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore, - TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3); - // Ready the Tx's + // Send a BatchedModifications to start transactions and ready them. - shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, - cohort1, modification1, true), getRef()); - expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); + shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); + expectMsgClass(duration, BatchedModificationsReply.class); - shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, - cohort2, modification2, true), getRef()); - expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); + shard.tell(newBatchedModifications(transactionID2,TestModel.OUTER_LIST_PATH, + ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true), getRef()); + expectMsgClass(duration, BatchedModificationsReply.class); - shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION, - cohort3, modification3, true), getRef()); - expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); + shard.tell(newBatchedModifications(transactionID3, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); + expectMsgClass(duration, BatchedModificationsReply.class); // canCommit 1st Tx. @@ -1030,30 +1308,37 @@ public class ShardTest extends AbstractShardTest { // Setup 2 simulated transactions with mock cohorts. The first one will be aborted. 
- String transactionID1 = "tx1"; - MutableCompositeModification modification1 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + final String transactionID1 = "tx1"; + final DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit(); doReturn(Futures.immediateFuture(null)).when(cohort1).abort(); - String transactionID2 = "tx2"; - MutableCompositeModification modification2 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2"); + final String transactionID2 = "tx2"; + final DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit(); FiniteDuration duration = duration("5 seconds"); final Timeout timeout = new Timeout(duration); - // Simulate the ForwardedReadyTransaction messages that would be sent - // by the ShardTransaction. + ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() { + @Override + public DOMStoreThreePhaseCommitCohort decorate(String transactionID, + DOMStoreThreePhaseCommitCohort actual) { + return transactionID1.equals(transactionID) ? cohort1 : cohort2; + } + }; - shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, - cohort1, modification1, true), getRef()); - expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); + shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator); - shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, - cohort2, modification2, true), getRef()); - expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); + // Send BatchedModifications to start and ready each transaction. + + shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); + expectMsgClass(duration, BatchedModificationsReply.class); + + shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); + expectMsgClass(duration, BatchedModificationsReply.class); // Send the CanCommitTransaction message for the first Tx. @@ -1141,7 +1426,7 @@ public class ShardTest extends AbstractShardTest { Creator creator = new Creator() { @Override public Shard create() throws Exception { - return new Shard(shardID, Collections.emptyMap(), + return new Shard(shardID, Collections.emptyMap(), newDatastoreContext(), SCHEMA_CONTEXT) { DelegatingPersistentDataProvider delegating; @@ -1242,13 +1527,13 @@ public class ShardTest extends AbstractShardTest { final DatastoreContext persistentContext = DatastoreContext.newBuilder(). shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build(); - final Props persistentProps = Shard.props(shardID, Collections.emptyMap(), + final Props persistentProps = Shard.props(shardID, Collections.emptyMap(), persistentContext, SCHEMA_CONTEXT); final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder(). 
shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build(); - final Props nonPersistentProps = Shard.props(shardID, Collections.emptyMap(), + final Props nonPersistentProps = Shard.props(shardID, Collections.emptyMap(), nonPersistentContext, SCHEMA_CONTEXT); new ShardTestKit(getSystem()) {{ @@ -1267,6 +1552,7 @@ public class ShardTest extends AbstractShardTest { shard2.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; + } @Test diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java index 09a4532b53..c3fef611e3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java @@ -60,7 +60,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { } private ActorRef createShard(){ - return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.emptyMap(), datastoreContext, + return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.emptyMap(), datastoreContext, TestModel.createTestContext())); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java index c6b5cb4402..e63ace3e2c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java @@ -79,7 +79,7 @@ public class ShardTransactionTest extends AbstractActorTest { private ActorRef createShard(){ return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, - Collections.emptyMap(), datastoreContext, TestModel.createTestContext())); + Collections.emptyMap(), datastoreContext, TestModel.createTestContext())); } private ActorRef newTransactionActor(DOMStoreTransaction transaction, String name) { @@ -372,7 +372,7 @@ public class ShardTransactionTest extends AbstractActorTest { YangInstanceIdentifier deletePath = TestModel.TEST_PATH; - BatchedModifications batched = new BatchedModifications(DataStoreVersions.CURRENT_VERSION); + BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null); batched.addModification(new WriteModification(writePath, writeData)); batched.addModification(new MergeModification(mergePath, mergeData)); batched.addModification(new DeleteModification(deletePath)); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java index 4f00ed5f4b..acba775445 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java +++ 
b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java @@ -29,8 +29,11 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.junit.Assert; import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; +import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; +import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; @@ -111,6 +114,74 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest { verify(mockActorContext, times(0)).acquireTxCreationPermit(); } + /** + * Tests 2 successive chained write-only transactions and verifies the second transaction isn't + * initiated until the first one completes its read future. + */ + @Test + public void testChainedWriteOnlyTransactions() throws Exception { + dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true); + + TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext); + + ActorRef txActorRef1 = setupActorContextWithoutInitialCreateTransaction(getSystem()); + + Promise batchedReplyPromise1 = akka.dispatch.Futures.promise(); + doReturn(batchedReplyPromise1.future()).when(mockActorContext).executeOperationAsync( + eq(actorSelection(txActorRef1)), isA(BatchedModifications.class)); + + DOMStoreWriteTransaction writeTx1 = txChainProxy.newWriteOnlyTransaction(); + + NormalizedNode writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + writeTx1.write(TestModel.TEST_PATH, writeNode1); + + writeTx1.ready(); + + verify(mockActorContext, times(1)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); + + verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1), true); + + ActorRef txActorRef2 = setupActorContextWithoutInitialCreateTransaction(getSystem()); + + expectBatchedModifications(txActorRef2, 1); + + final NormalizedNode writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME); + + final DOMStoreWriteTransaction writeTx2 = txChainProxy.newWriteOnlyTransaction(); + + final AtomicReference caughtEx = new AtomicReference<>(); + final CountDownLatch write2Complete = new CountDownLatch(1); + new Thread() { + @Override + public void run() { + try { + writeTx2.write(TestModel.OUTER_LIST_PATH, writeNode2); + } catch (Exception e) { + caughtEx.set(e); + } finally { + write2Complete.countDown(); + } + } + }.start(); + + assertEquals("Tx 2 write should've completed", true, write2Complete.await(5, TimeUnit.SECONDS)); + + if(caughtEx.get() != null) { + throw caughtEx.get(); + } + + try { + verify(mockActorContext, times(1)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); + } catch (AssertionError e) { + fail("Tx 2 should not have initiated until the Tx 1's ready future completed"); + } + + batchedReplyPromise1.success(new BatchedModificationsReply(1, txActorRef1.path().toString())); + + // Tx 2 should've proceeded to find the primary shard. 
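+ // Completing batchedReplyPromise1 above simulates the shard's BatchedModificationsReply for Tx 1,
+ // which completes Tx 1's ready future and releases Tx 2's queued write; the timeout() verification
+ // below allows up to 5 seconds for Tx 2's resulting findPrimaryShardAsync lookup to be observed.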
+ verify(mockActorContext, timeout(5000).times(2)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); + } + /** * Tests 2 successive chained read-write transactions and verifies the second transaction isn't * initiated until the first one completes its read future. @@ -134,7 +205,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest { writeTx1.ready(); - verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1)); + verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1), false); String tx2MemberName = "tx2MemberName"; doReturn(tx2MemberName).when(mockActorContext).getCurrentMemberName(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index 8278d3cffc..265ec59f1c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -9,6 +9,7 @@ import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isA; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY; @@ -31,6 +32,9 @@ import org.junit.Assert; import org.junit.Test; import org.mockito.InOrder; import org.mockito.Mockito; +import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType; +import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; +import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; @@ -384,24 +388,18 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testWrite() throws Exception { + dataStoreContextBuilder.shardBatchedModificationCount(1); ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); expectBatchedModifications(actorRef, 1); - expectReadyTransaction(actorRef); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); - // This sends the batched modification. - transactionProxy.ready(); - - verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite)); - - verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), - BatchedModificationsReply.class); + verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false); } @Test @@ -456,7 +454,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { // This sends the batched modification. 
transactionProxy.ready(); - verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite)); + verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false); verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), BatchedModificationsReply.class); @@ -479,48 +477,36 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testMerge() throws Exception { + dataStoreContextBuilder.shardBatchedModificationCount(1); ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); expectBatchedModifications(actorRef, 1); - expectReadyTransaction(actorRef); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite); - // This sends the batched modification. - transactionProxy.ready(); - - verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite)); - - verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), - BatchedModificationsReply.class); + verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite), false); } @Test public void testDelete() throws Exception { + dataStoreContextBuilder.shardBatchedModificationCount(1); ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); expectBatchedModifications(actorRef, 1); - expectReadyTransaction(actorRef); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); transactionProxy.delete(TestModel.TEST_PATH); - // This sends the batched modification. 
- transactionProxy.ready(); - - verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH)); - - verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), - BatchedModificationsReply.class); + verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH), false); } @Test - public void testReady() throws Exception { + public void testReadyWithReadWrite() throws Exception { ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); @@ -550,18 +536,91 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)), isA(BatchedModifications.class)); + + verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)), + isA(ReadyTransaction.SERIALIZABLE_CLASS)); + } + + @Test + public void testReadyWithWriteOnlyAndLastBatchPending() throws Exception { + dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true); + + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + expectBatchedModificationsReady(actorRef, 1); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); + + assertTrue(ready instanceof ThreePhaseCommitCohortProxy); + + ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + + verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures()); + + verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path())); + + List batchedModifications = captureBatchedModifications(actorRef); + assertEquals("Captured BatchedModifications count", 1, batchedModifications.size()); + + verifyBatchedModifications(batchedModifications.get(0), true, + new WriteModification(TestModel.TEST_PATH, nodeToWrite)); + + verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)), + isA(ReadyTransaction.SERIALIZABLE_CLASS)); + } + + @Test + public void testReadyWithWriteOnlyAndLastBatchEmpty() throws Exception { + dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + expectBatchedModificationsReady(actorRef, 1); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); + + assertTrue(ready instanceof ThreePhaseCommitCohortProxy); + + ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + + verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), + BatchedModificationsReply.class); + + verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path())); + + List batchedModifications = captureBatchedModifications(actorRef); + assertEquals("Captured BatchedModifications count", 2, batchedModifications.size()); + + verifyBatchedModifications(batchedModifications.get(0), false, + new WriteModification(TestModel.TEST_PATH, nodeToWrite)); 
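+ // With shardBatchedModificationCount set to 1, the write above was already flushed in its own
+ // non-ready batch, so ready() has nothing left to send and emits a second, empty
+ // BatchedModifications carrying only the ready flag, as verified next.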
+ + verifyBatchedModifications(batchedModifications.get(1), true); + + verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)), + isA(ReadyTransaction.SERIALIZABLE_CLASS)); } @Test public void testReadyWithRecordingOperationFailure() throws Exception { + dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); expectFailedBatchedModifications(actorRef); - expectReadyTransaction(actorRef); - doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); @@ -581,15 +640,13 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testReadyWithReplyFailure() throws Exception { + dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - expectBatchedModifications(actorRef, 1); - - doReturn(Futures.failed(new TestException())).when(mockActorContext). - executeOperationAsync(eq(actorSelection(actorRef)), - isA(ReadyTransaction.SERIALIZABLE_CLASS)); + expectFailedBatchedModifications(actorRef); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); @@ -601,17 +658,11 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; - verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), - BatchedModificationsReply.class); - verifyCohortFutures(proxy, TestException.class); } - @Test - public void testReadyWithInitialCreateTransactionFailure() throws Exception { - - doReturn(Futures.failed(new PrimaryNotFoundException("mock"))).when( - mockActorContext).findPrimaryShardAsync(anyString()); + private void testWriteOnlyTxWithFindPrimaryShardFailure(Exception toThrow) throws Exception { + doReturn(Futures.failed(toThrow)).when(mockActorContext).findPrimaryShardAsync(anyString()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); @@ -629,20 +680,36 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; - verifyCohortFutures(proxy, PrimaryNotFoundException.class); + verifyCohortFutures(proxy, toThrow.getClass()); + } + + @Test + public void testWriteOnlyTxWithPrimaryNotFoundException() throws Exception { + testWriteOnlyTxWithFindPrimaryShardFailure(new PrimaryNotFoundException("mock")); + } + + @Test + public void testWriteOnlyTxWithNotInitializedException() throws Exception { + testWriteOnlyTxWithFindPrimaryShardFailure(new NotInitializedException("mock")); + } + + @Test + public void testWriteOnlyTxWithNoShardLeaderException() throws Exception { + testWriteOnlyTxWithFindPrimaryShardFailure(new NoShardLeaderException("mock")); } @Test public void testReadyWithInvalidReplyMessageType() throws Exception { + dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true); ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - 
expectBatchedModifications(actorRef, 1); + //expectBatchedModifications(actorRef, 1); doReturn(Futures.successful(new Object())).when(mockActorContext). executeOperationAsync(eq(actorSelection(actorRef)), - isA(ReadyTransaction.SERIALIZABLE_CLASS)); + isA(BatchedModifications.class)); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); @@ -657,17 +724,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { verifyCohortFutures(proxy, IllegalArgumentException.class); } - @Test - public void testUnusedTransaction() throws Exception { - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); - - DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); - - assertEquals("canCommit", true, ready.canCommit().get()); - ready.preCommit().get(); - ready.commit().get(); - } - @Test public void testGetIdentifier() { setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY); @@ -711,24 +767,8 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { */ @Test public void testLocalTxActorRead() throws Exception { - ActorSystem actorSystem = getSystem(); - ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); - - doReturn(actorSystem.actorSelection(shardActorRef.path())). - when(mockActorContext).actorSelection(shardActorRef.path().toString()); - - doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))). - when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); - - String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor"; - CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder() - .setTransactionId("txn-1").setTransactionActorPath(actorPath).build(); - - doReturn(Futures.successful(createTransactionReply)).when(mockActorContext). - executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())), - eqCreateTransaction(memberName, READ_ONLY)); - - doReturn(true).when(mockActorContext).isPathLocal(actorPath); + setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY); + doReturn(true).when(mockActorContext).isPathLocal(anyString()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,READ_ONLY); @@ -764,40 +804,20 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testLocalTxActorReady() throws Exception { - ActorSystem actorSystem = getSystem(); - ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); - - doReturn(actorSystem.actorSelection(shardActorRef.path())). - when(mockActorContext).actorSelection(shardActorRef.path().toString()); - - doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))). - when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); - - String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor"; - CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder(). - setTransactionId("txn-1").setTransactionActorPath(actorPath). - setMessageVersion(DataStoreVersions.CURRENT_VERSION).build(); - - doReturn(Futures.successful(createTransactionReply)).when(mockActorContext). 
- executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())), - eqCreateTransaction(memberName, WRITE_ONLY)); - - doReturn(true).when(mockActorContext).isPathLocal(actorPath); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); + doReturn(true).when(mockActorContext).isPathLocal(anyString()); doReturn(batchedModificationsReply(1)).when(mockActorContext).executeOperationAsync( any(ActorSelection.class), isA(BatchedModifications.class)); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); - verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), - BatchedModificationsReply.class); - // testing ready - doReturn(readyTxReply(shardActorRef.path().toString())).when(mockActorContext).executeOperationAsync( - any(ActorSelection.class), isA(ReadyTransaction.class)); + doReturn(readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), isA(ReadyTransaction.class)); DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); @@ -805,7 +825,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; - verifyCohortFutures(proxy, getSystem().actorSelection(shardActorRef.path())); + verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path())); } private static interface TransactionProxyOperation { @@ -875,20 +895,23 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))). when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); } else { - doReturn(Futures.failed(new Exception("not found"))) + doReturn(Futures.failed(new PrimaryNotFoundException("test"))) .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); } - String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor"; + ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); + String actorPath = txActorRef.path().toString(); CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder(). setTransactionId("txn-1").setTransactionActorPath(actorPath). setMessageVersion(DataStoreVersions.CURRENT_VERSION).build(); + doReturn(actorSystem.actorSelection(actorPath)).when(mockActorContext).actorSelection(actorPath); + doReturn(Futures.successful(createTransactionReply)).when(mockActorContext). 
executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())), eqCreateTransaction(memberName, READ_WRITE)); - doReturn(true).when(mockActorContext).isPathLocal(actorPath); + doReturn(true).when(mockActorContext).isPathLocal(anyString()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); @@ -921,6 +944,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testWriteThrottlingWhenShardFound(){ + dataStoreContextBuilder.shardBatchedModificationCount(1); throttleOperation(new TransactionProxyOperation() { @Override public void run(TransactionProxy transactionProxy) { @@ -938,6 +962,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testWriteThrottlingWhenShardNotFound(){ // Confirm that there is no throttling when the Shard is not found + dataStoreContextBuilder.shardBatchedModificationCount(1); completeOperation(new TransactionProxyOperation() { @Override public void run(TransactionProxy transactionProxy) { @@ -956,6 +981,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testWriteCompletion(){ + dataStoreContextBuilder.shardBatchedModificationCount(1); completeOperation(new TransactionProxyOperation() { @Override public void run(TransactionProxy transactionProxy) { @@ -972,7 +998,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testMergeThrottlingWhenShardFound(){ - + dataStoreContextBuilder.shardBatchedModificationCount(1); throttleOperation(new TransactionProxyOperation() { @Override public void run(TransactionProxy transactionProxy) { @@ -989,7 +1015,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testMergeThrottlingWhenShardNotFound(){ - + dataStoreContextBuilder.shardBatchedModificationCount(1); completeOperation(new TransactionProxyOperation() { @Override public void run(TransactionProxy transactionProxy) { @@ -1006,6 +1032,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testMergeCompletion(){ + dataStoreContextBuilder.shardBatchedModificationCount(1); completeOperation(new TransactionProxyOperation() { @Override public void run(TransactionProxy transactionProxy) { @@ -1054,6 +1081,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testDeleteCompletion(){ + dataStoreContextBuilder.shardBatchedModificationCount(1); completeOperation(new TransactionProxyOperation() { @Override public void run(TransactionProxy transactionProxy) { @@ -1210,13 +1238,11 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { }, 2, true); } - @Test - public void testModificationOperationBatching() throws Throwable { + private void testModificationOperationBatching(TransactionType type) throws Exception { int shardBatchedModificationCount = 3; - doReturn(dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount).build()). 
- when(mockActorContext).getDatastoreContext(); + dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount); - ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), type); expectBatchedModifications(actorRef, shardBatchedModificationCount); @@ -1243,7 +1269,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { YangInstanceIdentifier deletePath1 = TestModel.TEST_PATH; YangInstanceIdentifier deletePath2 = TestModel.OUTER_LIST_PATH; - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, type); transactionProxy.write(writePath1, writeNode1); transactionProxy.write(writePath2, writeNode2); @@ -1260,24 +1286,46 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { List batchedModifications = captureBatchedModifications(actorRef); assertEquals("Captured BatchedModifications count", 3, batchedModifications.size()); - verifyBatchedModifications(batchedModifications.get(0), new WriteModification(writePath1, writeNode1), + verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1), new WriteModification(writePath2, writeNode2), new DeleteModification(deletePath1)); - verifyBatchedModifications(batchedModifications.get(1), new MergeModification(mergePath1, mergeNode1), + verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1), new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3)); - verifyBatchedModifications(batchedModifications.get(2), new MergeModification(mergePath3, mergeNode3), + boolean optimizedWriteOnly = type == WRITE_ONLY && dataStoreContextBuilder.build().isWriteOnlyTransactionOptimizationsEnabled(); + verifyBatchedModifications(batchedModifications.get(2), optimizedWriteOnly, new MergeModification(mergePath3, mergeNode3), new DeleteModification(deletePath2)); - verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), - BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class); + if(optimizedWriteOnly) { + verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), + BatchedModificationsReply.class, BatchedModificationsReply.class); + } else { + verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), + BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class); + } + } + + @Test + public void testReadWriteModificationOperationBatching() throws Throwable { + testModificationOperationBatching(READ_WRITE); + } + + @Test + public void testWriteOnlyModificationOperationBatching() throws Throwable { + testModificationOperationBatching(WRITE_ONLY); + } + + @Test + public void testOptimizedWriteOnlyModificationOperationBatching() throws Throwable { + dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true); + testModificationOperationBatching(WRITE_ONLY); } @Test public void testModificationOperationBatchingWithInterleavedReads() throws Throwable { + int shardBatchedModificationCount = 10; - doReturn(dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount).build()). 
- when(mockActorContext).getDatastoreContext(); + dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount); ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); @@ -1333,13 +1381,13 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { List batchedModifications = captureBatchedModifications(actorRef); assertEquals("Captured BatchedModifications count", 3, batchedModifications.size()); - verifyBatchedModifications(batchedModifications.get(0), new WriteModification(writePath1, writeNode1), + verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1), new WriteModification(writePath2, writeNode2)); - verifyBatchedModifications(batchedModifications.get(1), new MergeModification(mergePath1, mergeNode1), + verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1), new MergeModification(mergePath2, mergeNode2)); - verifyBatchedModifications(batchedModifications.get(2), new DeleteModification(deletePath)); + verifyBatchedModifications(batchedModifications.get(2), false, new DeleteModification(deletePath)); InOrder inOrder = Mockito.inOrder(mockActorContext); inOrder.verify(mockActorContext).executeOperationAsync( diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java index 471009d4df..cc860eafc7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java @@ -211,7 +211,7 @@ public class PreLithiumShardTest extends AbstractShardTest { new ShardTestKit(getSystem()) {{ final TestActorRef shard = TestActorRef.create(getSystem(), newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), - "testConcurrentThreePhaseCommits"); + "testPreLithiumConcurrentThreePhaseCommits"); waitUntilLeader(shard); @@ -386,7 +386,7 @@ public class PreLithiumShardTest extends AbstractShardTest { assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent()); assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue()); - verifyLastLogIndex(shard, 2); + verifyLastApplied(shard, 2); shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java index 08c32c9a54..2980f83564 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java @@ -15,10 +15,12 @@ import static org.mockito.Matchers.isA; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.verify; import static 
org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE; +import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY; import akka.actor.ActorRef; import akka.dispatch.Futures; import com.google.common.base.Optional; import java.util.concurrent.TimeUnit; +import org.junit.Ignore; import org.junit.Test; import org.mockito.ArgumentMatcher; import org.mockito.Mockito; @@ -154,4 +156,36 @@ public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest verify(mockActorContext, Mockito.never()).resolvePath(eq(actorRef.path().toString()), eq(actorRef.path().toString())); } + + @Test + @Ignore + // FIXME: disabled until we can get the primary shard version from the ShardManager as we now skip + // creating transaction actors for write-only Tx's. + public void testWriteOnlyCompatibilityWithHeliumR2Version() throws Exception { + short version = DataStoreVersions.HELIUM_2_VERSION; + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, version); + + NormalizedNode testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(Futures.successful(new WriteDataReply().toSerializable(version))).when(mockActorContext). + executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyWriteData(testNode)); + + doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS)); + + doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()), + eq(actorRef.path().toString())); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); + + transactionProxy.write(TestModel.TEST_PATH, testNode); + + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); + + assertTrue(ready instanceof ThreePhaseCommitCohortProxy); + + ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + + verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path())); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/ShardTransactionHeliumBackwardsCompatibilityTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/ShardTransactionHeliumBackwardsCompatibilityTest.java index 2a29d2c089..e206e69cda 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/ShardTransactionHeliumBackwardsCompatibilityTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/ShardTransactionHeliumBackwardsCompatibilityTest.java @@ -59,7 +59,7 @@ public class ShardTransactionHeliumBackwardsCompatibilityTest extends AbstractAc SchemaContext schemaContext = TestModel.createTestContext(); Props shardProps = Shard.props(ShardIdentifier.builder().memberName("member-1"). 
shardName("inventory").type("config").build(), - Collections.emptyMap(), + Collections.emptyMap(), DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).build(), schemaContext).withDispatcher(Dispatchers.DefaultDispatcherId()); @@ -133,7 +133,7 @@ public class ShardTransactionHeliumBackwardsCompatibilityTest extends AbstractAc SchemaContext schemaContext = TestModel.createTestContext(); Props shardProps = Shard.props(ShardIdentifier.builder().memberName("member-1"). shardName("inventory").type("config").build(), - Collections.emptyMap(), + Collections.emptyMap(), DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).build(), schemaContext).withDispatcher(Dispatchers.DefaultDispatcherId()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsTest.java index 15d2eea598..c4027ad2a5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsTest.java @@ -41,15 +41,19 @@ public class BatchedModificationsTest { YangInstanceIdentifier deletePath = TestModel.TEST_PATH; - BatchedModifications batched = new BatchedModifications(DataStoreVersions.CURRENT_VERSION); + BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, "txChain"); batched.addModification(new WriteModification(writePath, writeData)); batched.addModification(new MergeModification(mergePath, mergeData)); batched.addModification(new DeleteModification(deletePath)); + batched.setReady(true); BatchedModifications clone = (BatchedModifications) SerializationUtils.clone( (Serializable) batched.toSerializable()); assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, clone.getVersion()); + assertEquals("getTransactionID", "tx1", clone.getTransactionID()); + assertEquals("getTransactionChainID", "txChain", clone.getTransactionChainID()); + assertEquals("isReady", true, clone.isReady()); assertEquals("getModifications size", 3, clone.getModifications().size()); @@ -66,6 +70,20 @@ public class BatchedModificationsTest { DeleteModification delete = (DeleteModification)clone.getModifications().get(2); assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, delete.getVersion()); assertEquals("getPath", deletePath, delete.getPath()); + + // Test with different params. 
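+ // A null transaction chain ID is expected to round-trip as an empty string, and a message that
+ // was never marked ready should deserialize with isReady() false and no modifications.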
+ + batched = new BatchedModifications("tx2", (short)10, null); + + clone = (BatchedModifications) SerializationUtils.clone((Serializable) batched.toSerializable()); + + assertEquals("getVersion", 10, clone.getVersion()); + assertEquals("getTransactionID", "tx2", clone.getTransactionID()); + assertEquals("getTransactionChainID", "", clone.getTransactionChainID()); + assertEquals("isReady", false, clone.isReady()); + + assertEquals("getModifications size", 0, clone.getModifications().size()); + } @Test @@ -73,5 +91,11 @@ public class BatchedModificationsTest { BatchedModificationsReply clone = (BatchedModificationsReply) SerializationUtils.clone( (Serializable) new BatchedModificationsReply(100).toSerializable()); assertEquals("getNumBatched", 100, clone.getNumBatched()); + assertEquals("getCohortPath", null, clone.getCohortPath()); + + clone = (BatchedModificationsReply) SerializationUtils.clone( + (Serializable) new BatchedModificationsReply(50, "cohort path").toSerializable()); + assertEquals("getNumBatched", 50, clone.getNumBatched()); + assertEquals("getCohortPath", "cohort path", clone.getCohortPath()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java index 6bd732e038..2746bcf982 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java @@ -20,8 +20,12 @@ import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; import akka.util.Timeout; import com.google.common.base.Optional; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import com.google.common.util.concurrent.Uninterruptibles; import com.typesafe.config.ConfigFactory; +import java.util.Arrays; +import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.commons.lang.time.StopWatch; import org.junit.Assert; @@ -30,14 +34,18 @@ import org.opendaylight.controller.cluster.datastore.AbstractActorTest; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import org.opendaylight.controller.cluster.datastore.Configuration; import org.opendaylight.controller.cluster.datastore.DatastoreContext; +import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; +import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; @@ -45,10 +53,16 @@ import 
scala.concurrent.duration.FiniteDuration; public class ActorContextTest extends AbstractActorTest{ + static final Logger log = LoggerFactory.getLogger(ActorContextTest.class); + + private static class TestMessage { + } + private static class MockShardManager extends UntypedActor { private final boolean found; private final ActorRef actorRef; + private final Map findPrimaryResponses = Maps.newHashMap(); private MockShardManager(boolean found, ActorRef actorRef){ @@ -57,6 +71,18 @@ public class ActorContextTest extends AbstractActorTest{ } @Override public void onReceive(Object message) throws Exception { + if(message instanceof FindPrimary) { + FindPrimary fp = (FindPrimary)message; + Object resp = findPrimaryResponses.get(fp.getShardName()); + if(resp == null) { + log.error("No expected FindPrimary response found for shard name {}", fp.getShardName()); + } else { + getSender().tell(resp, getSelf()); + } + + return; + } + if(found){ getSender().tell(new LocalShardFound(actorRef), getSelf()); } else { @@ -64,15 +90,28 @@ public class ActorContextTest extends AbstractActorTest{ } } + void addFindPrimaryResp(String shardName, Object resp) { + findPrimaryResponses.put(shardName, resp); + } + private static Props props(final boolean found, final ActorRef actorRef){ return Props.create(new MockShardManagerCreator(found, actorRef) ); } + private static Props props(){ + return Props.create(new MockShardManagerCreator() ); + } + @SuppressWarnings("serial") private static class MockShardManagerCreator implements Creator { final boolean found; final ActorRef actorRef; + MockShardManagerCreator() { + this.found = false; + this.actorRef = null; + } + MockShardManagerCreator(boolean found, ActorRef actorRef) { this.found = found; this.actorRef = actorRef; @@ -287,18 +326,15 @@ public class ActorContextTest extends AbstractActorTest{ @Test public void testRateLimiting(){ - DatastoreContext mockDataStoreContext = mock(DatastoreContext.class); - - doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); - doReturn("config").when(mockDataStoreContext).getDataStoreType(); - doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout(); + DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config"). 
+ transactionCreationInitialRateLimit(155L).build(); ActorContext actorContext = new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class), - mock(Configuration.class), mockDataStoreContext); + mock(Configuration.class), dataStoreContext); // Check that the initial value is being picked up from DataStoreContext - assertEquals(mockDataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15); + assertEquals(dataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15); actorContext.setTxCreationLimit(1.0); @@ -320,16 +356,9 @@ public class ActorContextTest extends AbstractActorTest{ @Test public void testClientDispatcherIsGlobalDispatcher(){ - - DatastoreContext mockDataStoreContext = mock(DatastoreContext.class); - - doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); - doReturn("config").when(mockDataStoreContext).getDataStoreType(); - doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout(); - ActorContext actorContext = new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class), - mock(Configuration.class), mockDataStoreContext); + mock(Configuration.class), DatastoreContext.newBuilder().build()); assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher()); @@ -337,18 +366,11 @@ public class ActorContextTest extends AbstractActorTest{ @Test public void testClientDispatcherIsNotGlobalDispatcher(){ - - DatastoreContext mockDataStoreContext = mock(DatastoreContext.class); - - doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); - doReturn("config").when(mockDataStoreContext).getDataStoreType(); - doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout(); - ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf")); ActorContext actorContext = new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class), - mock(Configuration.class), mockDataStoreContext); + mock(Configuration.class), DatastoreContext.newBuilder().build()); assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher()); @@ -388,15 +410,12 @@ public class ActorContextTest extends AbstractActorTest{ TestActorRef shardManager = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class)); - DatastoreContext mockDataStoreContext = mock(DatastoreContext.class); - - doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); - doReturn("config").when(mockDataStoreContext).getDataStoreType(); - doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout(); + DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config"). 
+ shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build(); ActorContext actorContext = new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class), - mock(Configuration.class), mockDataStoreContext) { + mock(Configuration.class), dataStoreContext) { @Override protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout) { return Futures.successful((Object) new PrimaryFound("akka://test-system/test")); @@ -431,15 +450,12 @@ public class ActorContextTest extends AbstractActorTest{ TestActorRef shardManager = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class)); - DatastoreContext mockDataStoreContext = mock(DatastoreContext.class); - - doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); - doReturn("config").when(mockDataStoreContext).getDataStoreType(); - doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout(); + DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config"). + shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build(); ActorContext actorContext = new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class), - mock(Configuration.class), mockDataStoreContext) { + mock(Configuration.class), dataStoreContext) { @Override protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout) { return Futures.successful((Object) new PrimaryNotFound("foobar")); @@ -459,7 +475,6 @@ public class ActorContextTest extends AbstractActorTest{ Future cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar"); assertNull(cached); - } @Test @@ -468,15 +483,12 @@ public class ActorContextTest extends AbstractActorTest{ TestActorRef shardManager = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class)); - DatastoreContext mockDataStoreContext = mock(DatastoreContext.class); - - doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); - doReturn("config").when(mockDataStoreContext).getDataStoreType(); - doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout(); + DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config"). 
+ shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build(); ActorContext actorContext = new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class), - mock(Configuration.class), mockDataStoreContext) { + mock(Configuration.class), dataStoreContext) { @Override protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout) { return Futures.successful((Object) new ActorNotInitialized()); @@ -496,7 +508,49 @@ public class ActorContextTest extends AbstractActorTest{ Future cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar"); assertNull(cached); + } + + @Test + public void testBroadcast() { + new JavaTestKit(getSystem()) {{ + ActorRef shardActorRef1 = getSystem().actorOf(Props.create(MessageCollectorActor.class)); + ActorRef shardActorRef2 = getSystem().actorOf(Props.create(MessageCollectorActor.class)); + + TestActorRef shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props()); + MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor(); + shardManagerActor.addFindPrimaryResp("shard1", new PrimaryFound(shardActorRef1.path().toString()).toSerializable()); + shardManagerActor.addFindPrimaryResp("shard2", new PrimaryFound(shardActorRef2.path().toString()).toSerializable()); + shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found")); + + Configuration mockConfig = mock(Configuration.class); + doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))). + when(mockConfig).getAllShardNames(); + ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef, + mock(ClusterWrapper.class), mockConfig, + DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build()); + + actorContext.broadcast(new TestMessage()); + + expectFirstMatching(shardActorRef1, TestMessage.class); + expectFirstMatching(shardActorRef2, TestMessage.class); + }}; } + private T expectFirstMatching(ActorRef actor, Class clazz) { + int count = 5000 / 50; + for(int i = 0; i < count; i++) { + try { + T message = (T) MessageCollectorActor.getFirstMatching(actor, clazz); + if(message != null) { + return message; + } + } catch (Exception e) {} + + Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); + } + + Assert.fail("Did not receive message of type " + clazz); + return null; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MessageCollectorActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MessageCollectorActor.java index 4bd0ad818f..d62c9dbc28 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MessageCollectorActor.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MessageCollectorActor.java @@ -10,13 +10,14 @@ package org.opendaylight.controller.cluster.datastore.utils; import akka.actor.ActorRef; import akka.actor.UntypedActor; - import akka.pattern.Patterns; import akka.util.Timeout; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Uninterruptibles; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; +import org.junit.Assert; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; @@ -31,7 +32,7 @@ import 
scala.concurrent.duration.FiniteDuration; *
*/ public class MessageCollectorActor extends UntypedActor { - private List messages = new ArrayList<>(); + private final List messages = new ArrayList<>(); @Override public void onReceive(Object message) throws Exception { if(message instanceof String){ @@ -43,6 +44,10 @@ public class MessageCollectorActor extends UntypedActor { } } + public void clear() { + messages.clear(); + } + public static List getAllMessages(ActorRef actor) throws Exception { FiniteDuration operationDuration = Duration.create(5, TimeUnit.SECONDS); Timeout operationTimeout = new Timeout(operationDuration); @@ -87,4 +92,20 @@ public class MessageCollectorActor extends UntypedActor { return output; } + public static T expectFirstMatching(ActorRef actor, Class clazz) { + int count = 5000 / 50; + for(int i = 0; i < count; i++) { + try { + T message = (T) getFirstMatching(actor, clazz); + if(message != null) { + return message; + } + } catch (Exception e) {} + + Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); + } + + Assert.fail("Did not receive message of type " + clazz); + return null; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java index 81b6bccaf0..63878df23c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java @@ -12,7 +12,6 @@ import static org.junit.Assert.assertNotNull; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.ActorSystem; -import com.google.common.base.Optional; public class MockActorContext extends ActorContext { @@ -36,10 +35,6 @@ public class MockActorContext extends ActorContext { return executeRemoteOperationResponse; } - @Override public Optional findPrimaryShard(String shardName) { - return Optional.absent(); - } - public void setExecuteShardOperationResponse(Object response){ executeShardOperationResponse = response; } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java index 360ac68a51..a6fdfd3ff9 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java @@ -53,7 +53,16 @@ public class RemoteRpcImplementation implements DOMRpcImplementation { @Override public void onComplete(final Throwable failure, final Object reply) throws Throwable { if(failure != null) { - LOG.error("InvokeRpc failed", failure); + + // When we return a failure to the caller they can choose to log it if they like + // so here we just do basic warn logging by default and log the stack trace only when debug + // is enabled + + LOG.warn("InvokeRpc failed rpc = {}, identifier = {}", rpcMsg.getRpc(), rpcMsg.getIdentifier()); + + if(LOG.isDebugEnabled()){ + LOG.debug("Detailed Error", failure); + } final String message = String.format("Execution of RPC %s failed", rpcMsg.getRpc()); Collection 
errors = ((RpcErrorsException)failure).getRpcErrors();