From: Tony Tkacik Date: Wed, 11 Mar 2015 06:13:38 +0000 (+0000) Subject: Merge "Bug 2412: Expose Mountpoints on proper path" X-Git-Tag: release/lithium~426 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=commitdiff_plain;h=0187700726503922bd771575a9382106c87c8df8;hp=026f5db7c93f7317f4e0c6c22c93be502364fcf7;p=controller.git Merge "Bug 2412: Expose Mountpoints on proper path" --- diff --git a/features/mdsal/pom.xml b/features/mdsal/pom.xml index 5b5c1b94e0..bd0a99d9ca 100644 --- a/features/mdsal/pom.xml +++ b/features/mdsal/pom.xml @@ -300,6 +300,10 @@ xml config + + org.opendaylight.controller + netconf-ssh + org.opendaylight.controller.model diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/FollowerInitialSyncUpStatus.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/FollowerInitialSyncUpStatus.java index 3ce1f5d1e8..72b5ac9515 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/FollowerInitialSyncUpStatus.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/FollowerInitialSyncUpStatus.java @@ -18,12 +18,18 @@ package org.opendaylight.controller.cluster.raft.base.messages; */ public class FollowerInitialSyncUpStatus { private final boolean initialSyncDone; + private final String name; - public FollowerInitialSyncUpStatus(boolean initialSyncDone){ + public FollowerInitialSyncUpStatus(boolean initialSyncDone, String name){ this.initialSyncDone = initialSyncDone; + this.name = name; } public boolean isInitialSyncDone() { return initialSyncDone; } + + public String getName() { + return name; + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index ef5f11e37a..e814cd000d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -476,4 +476,8 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { } } + protected String getId(){ + return context.getId(); + } + } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index 618865cb88..0f251a3012 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -352,7 +352,7 @@ public class Follower extends AbstractRaftActorBehavior { return snapshotTracker; } - private static class InitialSyncStatusTracker { + private class InitialSyncStatusTracker { private static final long INVALID_LOG_INDEX = -2L; private long initialLeaderCommit = INVALID_LOG_INDEX; @@ -374,10 +374,10 @@ public class Follower extends AbstractRaftActorBehavior { if(!initialSyncUpDone){ if(initialLeaderCommit == INVALID_LOG_INDEX){ - actor.tell(new FollowerInitialSyncUpStatus(false), ActorRef.noSender()); + actor.tell(new FollowerInitialSyncUpStatus(false, getId()), ActorRef.noSender()); initialLeaderCommit = leaderCommit; } else if(commitIndex >= initialLeaderCommit){ - actor.tell(new FollowerInitialSyncUpStatus(true), ActorRef.noSender()); + actor.tell(new FollowerInitialSyncUpStatus(true, getId()), ActorRef.noSender()); initialSyncUpDone = true; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationCompleter.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationCompleter.java index 80aa3793c1..ec867dda0b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationCompleter.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationCompleter.java @@ -12,7 +12,7 @@ import com.google.common.base.Preconditions; import java.util.concurrent.Semaphore; import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply; -final class OperationCompleter extends OnComplete { +public final class OperationCompleter extends OnComplete { private final Semaphore operationLimiter; OperationCompleter(Semaphore operationLimiter){ diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 52b4652de6..e704e42465 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -67,6 +67,7 @@ import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListe import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier; import org.opendaylight.controller.cluster.raft.RaftActor; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload; @@ -295,6 +296,9 @@ public class Shard extends RaftActor { onDatastoreContext((DatastoreContext)message); } else if(message instanceof RegisterRoleChangeListener){ roleChangeNotifier.get().forward(message, context()); + } else if (message instanceof FollowerInitialSyncUpStatus){ + shardMBean.setFollowerInitialSyncStatus(((FollowerInitialSyncUpStatus) message).isInitialSyncDone()); + context().parent().tell(message, self()); } else { super.onReceiveCommand(message); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index c441afb497..136c6813ea 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -41,6 +41,7 @@ import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersisten import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo; +import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean; import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; @@ -55,6 +56,7 @@ import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; import org.opendaylight.controller.cluster.raft.RaftState; +import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; @@ -166,16 +168,31 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { ignoreMessage(message); } else if(message instanceof DatastoreContext) { onDatastoreContext((DatastoreContext)message); - } else if(message instanceof RoleChangeNotification){ + } else if(message instanceof RoleChangeNotification) { onRoleChangeNotification((RoleChangeNotification) message); + } else if(message instanceof FollowerInitialSyncUpStatus){ + onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message); } else{ unknownMessage(message); } } - private void onRoleChangeNotification(RoleChangeNotification message) { - RoleChangeNotification roleChanged = message; + private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) { + LOG.info("Received follower initial sync status for {} status sync done {}", status.getName(), + status.isInitialSyncDone()); + + ShardInformation shardInformation = findShardInformation(status.getName()); + + if(shardInformation != null) { + shardInformation.setFollowerSyncStatus(status.isInitialSyncDone()); + + mBean.setSyncStatus(isInSync()); + } + + } + + private void onRoleChangeNotification(RoleChangeNotification roleChanged) { LOG.info("Received role changed for {} from {} to {}", roleChanged.getMemberId(), roleChanged.getOldRole(), roleChanged.getNewRole()); @@ -189,6 +206,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { waitTillReadyCountdownLatch.countDown(); } + + mBean.setSyncStatus(isInSync()); } } @@ -214,6 +233,15 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return isReady; } + private boolean isInSync(){ + for (ShardInformation info : localShards.values()) { + if(!info.isInSync()){ + return false; + } + } + return true; + } + private void onActorInitialized(Object message) { final ActorRef sender = getSender(); @@ -519,6 +547,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return dataPersistenceProvider; } + @VisibleForTesting + ShardManagerInfoMBean getMBean(){ + return mBean; + } + private class ShardInformation { private final ShardIdentifier shardId; private final String shardName; @@ -529,6 +562,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { // flag that determines if the actor is ready for business private boolean actorInitialized = false; + private boolean followerSyncStatus = false; + private final List runnablesOnInitialized = Lists.newArrayList(); private String role ; @@ -607,6 +642,20 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return this.role; } + public void setFollowerSyncStatus(boolean syncStatus){ + this.followerSyncStatus = syncStatus; + } + + public boolean isInSync(){ + if(RaftState.Follower.name().equals(this.role)){ + return followerSyncStatus; + } else if(RaftState.Leader.name().equals(this.role)){ + return true; + } + + return false; + } + } private static class ShardManagerCreator implements Creator { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java index ee3a5cc825..58ac1d8b82 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java @@ -175,45 +175,47 @@ public class TransactionChainProxy implements DOMStoreTransactionChain { /** * This method is overridden to ensure the previous Tx's ready operations complete - * before we create the next shard Tx in the chain to avoid creation failures if the + * before we initiate the next Tx in the chain to avoid creation failures if the * previous Tx's ready operations haven't completed yet. */ @Override - protected Future sendCreateTransaction(final ActorSelection shard, - final Object serializedCreateMessage) { - + protected Future sendFindPrimaryShardAsync(final String shardName) { // Check if there are any previous ready Futures, otherwise let the super class handle it. if(previousReadyFutures.isEmpty()) { - return super.sendCreateTransaction(shard, serializedCreateMessage); + return super.sendFindPrimaryShardAsync(shardName); + } + + if(LOG.isDebugEnabled()) { + LOG.debug("Waiting for {} previous ready futures for Tx {} on chain {}", + previousReadyFutures.size(), getIdentifier(), getTransactionChainId()); } // Combine the ready Futures into 1. Future> combinedFutures = akka.dispatch.Futures.sequence( - previousReadyFutures, getActorContext().getActorSystem().dispatcher()); + previousReadyFutures, getActorContext().getClientDispatcher()); // Add a callback for completion of the combined Futures. - final Promise createTxPromise = akka.dispatch.Futures.promise(); + final Promise returnPromise = akka.dispatch.Futures.promise(); OnComplete> onComplete = new OnComplete>() { @Override public void onComplete(Throwable failure, Iterable notUsed) { if(failure != null) { // A Ready Future failed so fail the returned Promise. - createTxPromise.failure(failure); + returnPromise.failure(failure); } else { - LOG.debug("Previous Tx readied - sending CreateTransaction for {} on chain {}", + LOG.debug("Previous Tx readied - sending FindPrimaryShard for {} on chain {}", getIdentifier(), getTransactionChainId()); - // Send the CreateTx message and use the resulting Future to complete the + // Send the FindPrimaryShard message and use the resulting Future to complete the // returned Promise. - createTxPromise.completeWith(getActorContext().executeOperationAsync(shard, - serializedCreateMessage)); + returnPromise.completeWith(ChainedTransactionProxy.super.sendFindPrimaryShardAsync(shardName)); } } }; - combinedFutures.onComplete(onComplete, getActorContext().getActorSystem().dispatcher()); + combinedFutures.onComplete(onComplete, getActorContext().getClientDispatcher()); - return createTxPromise.future(); + return returnPromise.future(); } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java index 1e222e4c0a..c1f9c78e69 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java @@ -37,7 +37,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Future; -class TransactionContextImpl extends AbstractTransactionContext { +public class TransactionContextImpl extends AbstractTransactionContext { private static final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class); private final ActorContext actorContext; @@ -49,7 +49,7 @@ class TransactionContextImpl extends AbstractTransactionContext { private final OperationCompleter operationCompleter; private BatchedModifications batchedModifications; - TransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier, + protected TransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier, ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal, short remoteTransactionVersion, OperationCompleter operationCompleter) { super(identifier); @@ -153,8 +153,8 @@ class TransactionContextImpl extends AbstractTransactionContext { } else { // Throwing an exception here will fail the Future. - throw new IllegalArgumentException(String.format("Invalid reply type %s", - serializedReadyReply.getClass())); + throw new IllegalArgumentException(String.format("%s: Invalid reply type %s", + identifier, serializedReadyReply.getClass())); } } }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index 0bc82af335..64b9086c25 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import javax.annotation.concurrent.GuardedBy; +import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; @@ -425,18 +426,6 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { protected void onTransactionReady(List> cohortFutures) { } - /** - * Method called to send a CreateTransaction message to a shard. - * - * @param shard the shard actor to send to - * @param serializedCreateMessage the serialized message to send - * @return the response Future - */ - protected Future sendCreateTransaction(ActorSelection shard, - Object serializedCreateMessage) { - return actorContext.executeOperationAsync(shard, serializedCreateMessage); - } - @Override public Object getIdentifier() { return this.identifier; @@ -465,14 +454,17 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { return ShardStrategyFactory.getStrategy(path).findShard(path); } + protected Future sendFindPrimaryShardAsync(String shardName) { + return actorContext.findPrimaryShardAsync(shardName); + } + private TransactionFutureCallback getOrCreateTxFutureCallback(YangInstanceIdentifier path) { String shardName = shardNameFromIdentifier(path); TransactionFutureCallback txFutureCallback = txFutureCallbackMap.get(shardName); if(txFutureCallback == null) { - Future findPrimaryFuture = actorContext.findPrimaryShardAsync(shardName); + Future findPrimaryFuture = sendFindPrimaryShardAsync(shardName); - final TransactionFutureCallback newTxFutureCallback = - new TransactionFutureCallback(shardName); + final TransactionFutureCallback newTxFutureCallback = new TransactionFutureCallback(shardName); txFutureCallback = newTxFutureCallback; txFutureCallbackMap.put(shardName, txFutureCallback); @@ -598,10 +590,11 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { * Performs a CreateTransaction try async. */ private void tryCreateTransaction() { - Future createTxFuture = sendCreateTransaction(primaryShard, - new CreateTransaction(identifier.toString(), - TransactionProxy.this.transactionType.ordinal(), - getTransactionChainId()).toSerializable()); + Object serializedCreateMessage = new CreateTransaction(identifier.toString(), + TransactionProxy.this.transactionType.ordinal(), + getTransactionChainId()).toSerializable(); + + Future createTxFuture = actorContext.executeOperationAsync(primaryShard, serializedCreateMessage); createTxFuture.onComplete(this, actorContext.getClientDispatcher()); } @@ -731,7 +724,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { return new TransactionContextImpl(transactionPath, transactionActor, identifier, actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter); } else { - return new LegacyTransactionContextImpl(transactionPath, transactionActor, identifier, + return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, identifier, actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LegacyTransactionContextImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java similarity index 83% rename from opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LegacyTransactionContextImpl.java rename to opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java index 65d82b73d9..e407c7cc47 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LegacyTransactionContextImpl.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java @@ -5,9 +5,11 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.controller.cluster.datastore; +package org.opendaylight.controller.cluster.datastore.compat; import akka.actor.ActorSelection; +import org.opendaylight.controller.cluster.datastore.OperationCompleter; +import org.opendaylight.controller.cluster.datastore.TransactionContextImpl; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.DeleteData; import org.opendaylight.controller.cluster.datastore.messages.MergeData; @@ -23,9 +25,9 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; * * @author Thomas Pantelis */ -class LegacyTransactionContextImpl extends TransactionContextImpl { +public class PreLithiumTransactionContextImpl extends TransactionContextImpl { - LegacyTransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier, + public PreLithiumTransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier, ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal, short remoteTransactionVersion, OperationCompleter operationCompleter) { super(transactionPath, actor, identifier, actorContext, schemaContext, isTxActorLocal, diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStats.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStats.java index 945ae0a478..6222d3be09 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStats.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStats.java @@ -67,6 +67,8 @@ public class ShardStats extends AbstractMXBean implements ShardStatsMXBean { private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); + private boolean followerInitialSyncStatus = false; + public ShardStats(final String shardName, final String mxBeanType) { super(shardName, mxBeanType, JMX_CATEGORY_SHARD); } @@ -276,4 +278,13 @@ public class ShardStats extends AbstractMXBean implements ShardStatsMXBean { public void setDataStore(final InMemoryDOMDataStore store) { setNotificationManager(store.getDataChangeListenerNotificationManager()); } + + public void setFollowerInitialSyncStatus(boolean followerInitialSyncStatus) { + this.followerInitialSyncStatus = followerInitialSyncStatus; + } + + @Override + public boolean getFollowerInitialSyncStatus() { + return followerInitialSyncStatus; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStatsMXBean.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStatsMXBean.java index 9698f6fe22..0281cdd8ce 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStatsMXBean.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStatsMXBean.java @@ -53,4 +53,6 @@ public interface ShardStatsMXBean { void resetTransactionCounters(); long getInMemoryJournalDataSize(); + + boolean getFollowerInitialSyncStatus(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shardmanager/ShardManagerInfo.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shardmanager/ShardManagerInfo.java index 99c8daf87d..8adc8b24b2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shardmanager/ShardManagerInfo.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shardmanager/ShardManagerInfo.java @@ -18,6 +18,8 @@ public class ShardManagerInfo extends AbstractMXBean implements ShardManagerInfo private final List localShards; + private boolean syncStatus = false; + public ShardManagerInfo(String name, String mxBeanType, List localShards) { super(name, mxBeanType, JMX_CATEGORY_SHARD_MANAGER); this.localShards = localShards; @@ -36,4 +38,13 @@ public class ShardManagerInfo extends AbstractMXBean implements ShardManagerInfo public List getLocalShards() { return localShards; } + + @Override + public boolean getSyncStatus() { + return this.syncStatus; + } + + public void setSyncStatus(boolean syncStatus){ + this.syncStatus = syncStatus; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shardmanager/ShardManagerInfoMBean.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shardmanager/ShardManagerInfoMBean.java index 28ccc4f0b3..b64ba74782 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shardmanager/ShardManagerInfoMBean.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shardmanager/ShardManagerInfoMBean.java @@ -12,4 +12,5 @@ import java.util.List; public interface ShardManagerInfoMBean { List getLocalShards(); + boolean getSyncStatus(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java new file mode 100644 index 0000000000..4896b059c7 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java @@ -0,0 +1,408 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.argThat; +import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.dispatch.Futures; +import akka.testkit.JavaTestKit; +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.CheckedFuture; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.mockito.ArgumentCaptor; +import org.mockito.ArgumentMatcher; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; +import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType; +import org.opendaylight.controller.cluster.datastore.TransactionProxyTest.TestException; +import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; +import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply; +import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; +import org.opendaylight.controller.cluster.datastore.messages.DataExists; +import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply; +import org.opendaylight.controller.cluster.datastore.messages.ReadData; +import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; +import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; +import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; +import org.opendaylight.controller.cluster.datastore.modification.AbstractModification; +import org.opendaylight.controller.cluster.datastore.modification.Modification; +import org.opendaylight.controller.cluster.datastore.modification.WriteModification; +import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy; +import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor; +import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration; +import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; + +/** + * Abstract base class for TransactionProxy unit tests. + * + * @author Thomas Pantelis + */ +public abstract class AbstractTransactionProxyTest { + protected final Logger log = LoggerFactory.getLogger(getClass()); + + private static ActorSystem system; + + private final Configuration configuration = new MockConfiguration(); + + @Mock + protected ActorContext mockActorContext; + + private SchemaContext schemaContext; + + @Mock + private ClusterWrapper mockClusterWrapper; + + protected final String memberName = "mock-member"; + + protected final Builder dataStoreContextBuilder = DatastoreContext.newBuilder().operationTimeoutInSeconds(2). + shardBatchedModificationCount(1); + + @BeforeClass + public static void setUpClass() throws IOException { + + Config config = ConfigFactory.parseMap(ImmutableMap.builder(). + put("akka.actor.default-dispatcher.type", + "akka.testkit.CallingThreadDispatcherConfigurator").build()). + withFallback(ConfigFactory.load()); + system = ActorSystem.create("test", config); + } + + @AfterClass + public static void tearDownClass() throws IOException { + JavaTestKit.shutdownActorSystem(system); + system = null; + } + + @Before + public void setUp(){ + MockitoAnnotations.initMocks(this); + + schemaContext = TestModel.createTestContext(); + + doReturn(getSystem()).when(mockActorContext).getActorSystem(); + doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher(); + doReturn(memberName).when(mockActorContext).getCurrentMemberName(); + doReturn(schemaContext).when(mockActorContext).getSchemaContext(); + doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper(); + doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper(); + doReturn(dataStoreContextBuilder.build()).when(mockActorContext).getDatastoreContext(); + doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit(); + + ShardStrategyFactory.setConfiguration(configuration); + } + + protected ActorSystem getSystem() { + return system; + } + + protected CreateTransaction eqCreateTransaction(final String memberName, + final TransactionType type) { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + if(CreateTransaction.SERIALIZABLE_CLASS.equals(argument.getClass())) { + CreateTransaction obj = CreateTransaction.fromSerializable(argument); + return obj.getTransactionId().startsWith(memberName) && + obj.getTransactionType() == type.ordinal(); + } + + return false; + } + }; + + return argThat(matcher); + } + + protected DataExists eqSerializedDataExists() { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + return DataExists.SERIALIZABLE_CLASS.equals(argument.getClass()) && + DataExists.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH); + } + }; + + return argThat(matcher); + } + + protected DataExists eqDataExists() { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + return (argument instanceof DataExists) && + ((DataExists)argument).getPath().equals(TestModel.TEST_PATH); + } + }; + + return argThat(matcher); + } + + protected ReadData eqSerializedReadData() { + return eqSerializedReadData(TestModel.TEST_PATH); + } + + protected ReadData eqSerializedReadData(final YangInstanceIdentifier path) { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + return ReadData.SERIALIZABLE_CLASS.equals(argument.getClass()) && + ReadData.fromSerializable(argument).getPath().equals(path); + } + }; + + return argThat(matcher); + } + + protected ReadData eqReadData() { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + return (argument instanceof ReadData) && + ((ReadData)argument).getPath().equals(TestModel.TEST_PATH); + } + }; + + return argThat(matcher); + } + + protected Future readySerializedTxReply(String path) { + return Futures.successful((Object)new ReadyTransactionReply(path).toSerializable()); + } + + protected Future readyTxReply(String path) { + return Futures.successful((Object)new ReadyTransactionReply(path)); + } + + protected Future readSerializedDataReply(NormalizedNode data, + short transactionVersion) { + return Futures.successful(new ReadDataReply(data, transactionVersion).toSerializable()); + } + + protected Future readSerializedDataReply(NormalizedNode data) { + return readSerializedDataReply(data, DataStoreVersions.CURRENT_VERSION); + } + + protected Future readDataReply(NormalizedNode data) { + return Futures.successful(new ReadDataReply(data, DataStoreVersions.CURRENT_VERSION)); + } + + protected Future dataExistsSerializedReply(boolean exists) { + return Futures.successful(new DataExistsReply(exists).toSerializable()); + } + + protected Future dataExistsReply(boolean exists) { + return Futures.successful(new DataExistsReply(exists)); + } + + protected Future batchedModificationsReply(int count) { + return Futures.successful(new BatchedModificationsReply(count)); + } + + protected Future incompleteFuture(){ + return mock(Future.class); + } + + protected ActorSelection actorSelection(ActorRef actorRef) { + return getSystem().actorSelection(actorRef.path()); + } + + protected void expectBatchedModifications(ActorRef actorRef, int count) { + doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), isA(BatchedModifications.class)); + } + + protected void expectBatchedModifications(int count) { + doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), isA(BatchedModifications.class)); + } + + protected void expectIncompleteBatchedModifications() { + doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), isA(BatchedModifications.class)); + } + + protected void expectReadyTransaction(ActorRef actorRef) { + doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS)); + } + + protected void expectFailedBatchedModifications(ActorRef actorRef) { + doReturn(Futures.failed(new TestException())).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), isA(BatchedModifications.class)); + } + + protected CreateTransactionReply createTransactionReply(ActorRef actorRef, int transactionVersion){ + return CreateTransactionReply.newBuilder() + .setTransactionActorPath(actorRef.path().toString()) + .setTransactionId("txn-1") + .setMessageVersion(transactionVersion) + .build(); + } + + protected ActorRef setupActorContextWithoutInitialCreateTransaction(ActorSystem actorSystem) { + ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); + log.info("Created mock shard actor {}", actorRef); + + doReturn(actorSystem.actorSelection(actorRef.path())). + when(mockActorContext).actorSelection(actorRef.path().toString()); + + doReturn(Futures.successful(actorSystem.actorSelection(actorRef.path()))). + when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); + + doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString()); + + doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit(); + + return actorRef; + } + + protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, + TransactionType type, int transactionVersion) { + ActorRef shardActorRef = setupActorContextWithoutInitialCreateTransaction(actorSystem); + + return setupActorContextWithInitialCreateTransaction(actorSystem, type, transactionVersion, + memberName, shardActorRef); + } + + protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, + TransactionType type, int transactionVersion, String prefix, ActorRef shardActorRef) { + + ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); + log.info("Created mock shard Tx actor {}", txActorRef); + + doReturn(actorSystem.actorSelection(txActorRef.path())).when(mockActorContext).actorSelection( + txActorRef.path().toString()); + + doReturn(Futures.successful(createTransactionReply(txActorRef, transactionVersion))).when(mockActorContext). + executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())), + eqCreateTransaction(prefix, type)); + + return txActorRef; + } + + protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) { + return setupActorContextWithInitialCreateTransaction(actorSystem, type, DataStoreVersions.CURRENT_VERSION); + } + + + protected void propagateReadFailedExceptionCause(CheckedFuture future) + throws Throwable { + + try { + future.checkedGet(5, TimeUnit.SECONDS); + fail("Expected ReadFailedException"); + } catch(ReadFailedException e) { + throw e.getCause(); + } + } + + protected List captureBatchedModifications(ActorRef actorRef) { + ArgumentCaptor batchedModificationsCaptor = + ArgumentCaptor.forClass(BatchedModifications.class); + verify(mockActorContext, Mockito.atLeastOnce()).executeOperationAsync( + eq(actorSelection(actorRef)), batchedModificationsCaptor.capture()); + + List batchedModifications = filterCaptured( + batchedModificationsCaptor, BatchedModifications.class); + return batchedModifications; + } + + protected List filterCaptured(ArgumentCaptor captor, Class type) { + List captured = new ArrayList<>(); + for(T c: captor.getAllValues()) { + if(type.isInstance(c)) { + captured.add(c); + } + } + + return captured; + } + + protected void verifyOneBatchedModification(ActorRef actorRef, Modification expected) { + List batchedModifications = captureBatchedModifications(actorRef); + assertEquals("Captured BatchedModifications count", 1, batchedModifications.size()); + + verifyBatchedModifications(batchedModifications.get(0), expected); + } + + protected void verifyBatchedModifications(Object message, Modification... expected) { + assertEquals("Message type", BatchedModifications.class, message.getClass()); + BatchedModifications batchedModifications = (BatchedModifications)message; + assertEquals("BatchedModifications size", expected.length, batchedModifications.getModifications().size()); + for(int i = 0; i < batchedModifications.getModifications().size(); i++) { + Modification actual = batchedModifications.getModifications().get(i); + assertEquals("Modification type", expected[i].getClass(), actual.getClass()); + assertEquals("getPath", ((AbstractModification)expected[i]).getPath(), + ((AbstractModification)actual).getPath()); + if(actual instanceof WriteModification) { + assertEquals("getData", ((WriteModification)expected[i]).getData(), + ((WriteModification)actual).getData()); + } + } + } + + protected void verifyCohortFutures(ThreePhaseCommitCohortProxy proxy, + Object... expReplies) throws Exception { + assertEquals("getReadyOperationFutures size", expReplies.length, + proxy.getCohortFutures().size()); + + int i = 0; + for( Future future: proxy.getCohortFutures()) { + assertNotNull("Ready operation Future is null", future); + + Object expReply = expReplies[i++]; + if(expReply instanceof ActorSelection) { + ActorSelection actual = Await.result(future, Duration.create(5, TimeUnit.SECONDS)); + assertEquals("Cohort actor path", expReply, actual); + } else { + try { + Await.result(future, Duration.create(5, TimeUnit.SECONDS)); + fail("Expected exception from ready operation Future"); + } catch(Exception e) { + assertTrue(String.format("Expected exception type %s. Actual %s", + expReply, e.getClass()), ((Class)expReply).isInstance(e)); + } + } + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java index f0cdacc9ef..c005751380 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java @@ -20,8 +20,10 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; import com.google.common.util.concurrent.Uninterruptibles; import java.net.URI; +import java.util.Arrays; import java.util.Collection; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -48,6 +50,7 @@ import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper; import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration; import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; import org.opendaylight.controller.cluster.raft.RaftState; +import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier; import org.opendaylight.yangtools.yang.model.api.SchemaContext; @@ -469,6 +472,132 @@ public class ShardManagerTest extends AbstractActorTest { } + @Test + public void testByDefaultSyncStatusIsFalse() throws Exception{ + final Props persistentProps = ShardManager.props( + new MockClusterWrapper(), + new MockConfiguration(), + DatastoreContext.newBuilder().persistent(true).build(), ready); + final TestActorRef shardManager = + TestActorRef.create(getSystem(), persistentProps); + + ShardManager shardManagerActor = shardManager.underlyingActor(); + + assertEquals(false, shardManagerActor.getMBean().getSyncStatus()); + } + + @Test + public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception{ + final Props persistentProps = ShardManager.props( + new MockClusterWrapper(), + new MockConfiguration(), + DatastoreContext.newBuilder().persistent(true).build(), ready); + final TestActorRef shardManager = + TestActorRef.create(getSystem(), persistentProps); + + ShardManager shardManagerActor = shardManager.underlyingActor(); + shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown", + RaftState.Follower.name(), RaftState.Leader.name())); + + assertEquals(true, shardManagerActor.getMBean().getSyncStatus()); + } + + @Test + public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception{ + final Props persistentProps = ShardManager.props( + new MockClusterWrapper(), + new MockConfiguration(), + DatastoreContext.newBuilder().persistent(true).build(), ready); + final TestActorRef shardManager = + TestActorRef.create(getSystem(), persistentProps); + + ShardManager shardManagerActor = shardManager.underlyingActor(); + shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown", + RaftState.Follower.name(), RaftState.Candidate.name())); + + assertEquals(false, shardManagerActor.getMBean().getSyncStatus()); + + // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate + shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown")); + + assertEquals(false, shardManagerActor.getMBean().getSyncStatus()); + } + + @Test + public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception{ + final Props persistentProps = ShardManager.props( + new MockClusterWrapper(), + new MockConfiguration(), + DatastoreContext.newBuilder().persistent(true).build(), ready); + final TestActorRef shardManager = + TestActorRef.create(getSystem(), persistentProps); + + ShardManager shardManagerActor = shardManager.underlyingActor(); + shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown", + RaftState.Candidate.name(), RaftState.Follower.name())); + + // Initially will be false + assertEquals(false, shardManagerActor.getMBean().getSyncStatus()); + + // Send status true will make sync status true + shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown")); + + assertEquals(true, shardManagerActor.getMBean().getSyncStatus()); + + // Send status false will make sync status false + shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-default-unknown")); + + assertEquals(false, shardManagerActor.getMBean().getSyncStatus()); + + } + + @Test + public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception{ + final Props persistentProps = ShardManager.props( + new MockClusterWrapper(), + new MockConfiguration() { + @Override + public List getMemberShardNames(String memberName) { + return Arrays.asList("default", "astronauts"); + } + }, + DatastoreContext.newBuilder().persistent(true).build(), ready); + final TestActorRef shardManager = + TestActorRef.create(getSystem(), persistentProps); + + ShardManager shardManagerActor = shardManager.underlyingActor(); + + // Initially will be false + assertEquals(false, shardManagerActor.getMBean().getSyncStatus()); + + // Make default shard leader + shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown", + RaftState.Follower.name(), RaftState.Leader.name())); + + // default = Leader, astronauts is unknown so sync status remains false + assertEquals(false, shardManagerActor.getMBean().getSyncStatus()); + + // Make astronauts shard leader as well + shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown", + RaftState.Follower.name(), RaftState.Leader.name())); + + // Now sync status should be true + assertEquals(true, shardManagerActor.getMBean().getSyncStatus()); + + // Make astronauts a Follower + shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown", + RaftState.Leader.name(), RaftState.Follower.name())); + + // Sync status is not true + assertEquals(false, shardManagerActor.getMBean().getSyncStatus()); + + // Make the astronauts follower sync status true + shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-astronauts-unknown")); + + // Sync status is now true + assertEquals(true, shardManagerActor.getMBean().getSyncStatus()); + + } private static class TestShardManager extends ShardManager { private final CountDownLatch recoveryComplete = new CountDownLatch(1); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 7dfbd668b8..d930b2519e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -84,6 +84,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; +import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload; @@ -1618,7 +1619,25 @@ public class ShardTest extends AbstractActorTest { List allMatching = MessageCollectorActor.getAllMatching(listener, RegisterRoleChangeListenerReply.class); assertEquals(1, allMatching.size()); - }}; + } + }; + } + + @Test + public void testFollowerInitialSyncStatus() throws Exception { + final TestActorRef shard = TestActorRef.create(getSystem(), + newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), + "testFollowerInitialSyncStatus"); + + shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational")); + + assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus()); + + shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational")); + + assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus()); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java index 88ab0dd292..4f00ed5f4b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java @@ -10,47 +10,44 @@ package org.opendaylight.controller.cluster.datastore; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; import static org.mockito.Matchers.anyObject; +import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.isA; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE; +import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY; +import akka.actor.ActorRef; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; +import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; +import org.opendaylight.controller.cluster.datastore.modification.WriteModification; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; -import org.opendaylight.controller.cluster.datastore.utils.MockActorContext; +import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import scala.concurrent.Promise; -public class TransactionChainProxyTest extends AbstractActorTest{ - ActorContext actorContext = null; - SchemaContext schemaContext = mock(SchemaContext.class); - - @Mock - ActorContext mockActorContext; - - @Before - public void setUp() { - MockitoAnnotations.initMocks(this); - - actorContext = new MockActorContext(getSystem()); - actorContext.setSchemaContext(schemaContext); - - doReturn(schemaContext).when(mockActorContext).getSchemaContext(); - doReturn(DatastoreContext.newBuilder().build()).when(mockActorContext).getDatastoreContext(); - } +public class TransactionChainProxyTest extends AbstractTransactionProxyTest { @SuppressWarnings("resource") @Test public void testNewReadOnlyTransaction() throws Exception { - DOMStoreTransaction dst = new TransactionChainProxy(actorContext).newReadOnlyTransaction(); + DOMStoreTransaction dst = new TransactionChainProxy(mockActorContext).newReadOnlyTransaction(); Assert.assertTrue(dst instanceof DOMStoreReadTransaction); } @@ -58,7 +55,7 @@ public class TransactionChainProxyTest extends AbstractActorTest{ @SuppressWarnings("resource") @Test public void testNewReadWriteTransaction() throws Exception { - DOMStoreTransaction dst = new TransactionChainProxy(actorContext).newReadWriteTransaction(); + DOMStoreTransaction dst = new TransactionChainProxy(mockActorContext).newReadWriteTransaction(); Assert.assertTrue(dst instanceof DOMStoreReadWriteTransaction); } @@ -66,18 +63,16 @@ public class TransactionChainProxyTest extends AbstractActorTest{ @SuppressWarnings("resource") @Test public void testNewWriteOnlyTransaction() throws Exception { - DOMStoreTransaction dst = new TransactionChainProxy(actorContext).newWriteOnlyTransaction(); + DOMStoreTransaction dst = new TransactionChainProxy(mockActorContext).newWriteOnlyTransaction(); Assert.assertTrue(dst instanceof DOMStoreWriteTransaction); } @Test public void testClose() throws Exception { - ActorContext context = mock(ActorContext.class); + new TransactionChainProxy(mockActorContext).close(); - new TransactionChainProxy(context).close(); - - verify(context, times(1)).broadcast(anyObject()); + verify(mockActorContext, times(1)).broadcast(anyObject()); } @Test @@ -115,4 +110,93 @@ public class TransactionChainProxyTest extends AbstractActorTest{ verify(mockActorContext, times(0)).acquireTxCreationPermit(); } + + /** + * Tests 2 successive chained read-write transactions and verifies the second transaction isn't + * initiated until the first one completes its read future. + */ + @Test + public void testChainedReadWriteTransactions() throws Exception { + TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext); + + ActorRef txActorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); + + expectBatchedModifications(txActorRef1, 1); + + Promise readyReplyPromise1 = akka.dispatch.Futures.promise(); + doReturn(readyReplyPromise1.future()).when(mockActorContext).executeOperationAsync( + eq(actorSelection(txActorRef1)), isA(ReadyTransaction.SERIALIZABLE_CLASS)); + + DOMStoreWriteTransaction writeTx1 = txChainProxy.newReadWriteTransaction(); + + NormalizedNode writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + writeTx1.write(TestModel.TEST_PATH, writeNode1); + + writeTx1.ready(); + + verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1)); + + String tx2MemberName = "tx2MemberName"; + doReturn(tx2MemberName).when(mockActorContext).getCurrentMemberName(); + ActorRef shardActorRef2 = setupActorContextWithoutInitialCreateTransaction(getSystem()); + ActorRef txActorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE, + DataStoreVersions.CURRENT_VERSION, tx2MemberName, shardActorRef2); + + expectBatchedModifications(txActorRef2, 1); + + final NormalizedNode writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME); + + final DOMStoreWriteTransaction writeTx2 = txChainProxy.newReadWriteTransaction(); + + final AtomicReference caughtEx = new AtomicReference<>(); + final CountDownLatch write2Complete = new CountDownLatch(1); + new Thread() { + @Override + public void run() { + try { + writeTx2.write(TestModel.OUTER_LIST_PATH, writeNode2); + } catch (Exception e) { + caughtEx.set(e); + } finally { + write2Complete.countDown(); + } + } + }.start(); + + assertEquals("Tx 2 write should've completed", true, write2Complete.await(5, TimeUnit.SECONDS)); + + if(caughtEx.get() != null) { + throw caughtEx.get(); + } + + try { + verify(mockActorContext, never()).executeOperationAsync(eq(getSystem().actorSelection(shardActorRef2.path())), + eqCreateTransaction(tx2MemberName, READ_WRITE)); + } catch (AssertionError e) { + fail("Tx 2 should not have initiated until the Tx 1's ready future completed"); + } + + readyReplyPromise1.success(readySerializedTxReply(txActorRef1.path().toString()).value().get().get()); + + verify(mockActorContext, timeout(5000)).executeOperationAsync(eq(getSystem().actorSelection(shardActorRef2.path())), + eqCreateTransaction(tx2MemberName, READ_WRITE)); + } + + @Test(expected=IllegalStateException.class) + public void testChainedWriteTransactionsWithPreviousTxNotReady() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); + + expectBatchedModifications(actorRef, 1); + + TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext); + + DOMStoreWriteTransaction writeTx1 = txChainProxy.newWriteOnlyTransaction(); + + NormalizedNode writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + writeTx1.write(TestModel.TEST_PATH, writeNode1); + + NormalizedNode writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME); + + txChainProxy.newWriteOnlyTransaction(); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index abfe7eae22..8278d3cffc 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -6,11 +6,9 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isA; import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY; @@ -21,78 +19,44 @@ import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.actor.Props; import akka.dispatch.Futures; -import akka.testkit.JavaTestKit; import com.google.common.base.Optional; -import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Uninterruptibles; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import java.io.IOException; -import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import org.junit.AfterClass; import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Test; -import org.mockito.ArgumentCaptor; -import org.mockito.ArgumentMatcher; import org.mockito.InOrder; -import org.mockito.Mock; import org.mockito.Mockito; -import org.mockito.MockitoAnnotations; -import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; -import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType; import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; -import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; -import org.opendaylight.controller.cluster.datastore.messages.DataExists; -import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply; -import org.opendaylight.controller.cluster.datastore.messages.DeleteData; -import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply; -import org.opendaylight.controller.cluster.datastore.messages.MergeData; -import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply; -import org.opendaylight.controller.cluster.datastore.messages.ReadData; -import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; -import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; -import org.opendaylight.controller.cluster.datastore.messages.WriteData; -import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply; -import org.opendaylight.controller.cluster.datastore.modification.AbstractModification; import org.opendaylight.controller.cluster.datastore.modification.DeleteModification; import org.opendaylight.controller.cluster.datastore.modification.MergeModification; -import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy; -import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; -import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor; -import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration; import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; -import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.Promise; import scala.concurrent.duration.Duration; @SuppressWarnings("resource") -public class TransactionProxyTest { +public class TransactionProxyTest extends AbstractTransactionProxyTest { @SuppressWarnings("serial") static class TestException extends RuntimeException { @@ -102,291 +66,6 @@ public class TransactionProxyTest { CheckedFuture invoke(TransactionProxy proxy) throws Exception; } - private static ActorSystem system; - - private final Configuration configuration = new MockConfiguration(); - - @Mock - private ActorContext mockActorContext; - - private SchemaContext schemaContext; - - @Mock - private ClusterWrapper mockClusterWrapper; - - private final String memberName = "mock-member"; - - private final Builder dataStoreContextBuilder = DatastoreContext.newBuilder().operationTimeoutInSeconds(2). - shardBatchedModificationCount(1); - - @BeforeClass - public static void setUpClass() throws IOException { - - Config config = ConfigFactory.parseMap(ImmutableMap.builder(). - put("akka.actor.default-dispatcher.type", - "akka.testkit.CallingThreadDispatcherConfigurator").build()). - withFallback(ConfigFactory.load()); - system = ActorSystem.create("test", config); - } - - @AfterClass - public static void tearDownClass() throws IOException { - JavaTestKit.shutdownActorSystem(system); - system = null; - } - - @Before - public void setUp(){ - MockitoAnnotations.initMocks(this); - - schemaContext = TestModel.createTestContext(); - - doReturn(getSystem()).when(mockActorContext).getActorSystem(); - doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher(); - doReturn(memberName).when(mockActorContext).getCurrentMemberName(); - doReturn(schemaContext).when(mockActorContext).getSchemaContext(); - doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper(); - doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper(); - doReturn(dataStoreContextBuilder.build()).when(mockActorContext).getDatastoreContext(); - doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit(); - - ShardStrategyFactory.setConfiguration(configuration); - } - - private ActorSystem getSystem() { - return system; - } - - private CreateTransaction eqCreateTransaction(final String memberName, - final TransactionType type) { - ArgumentMatcher matcher = new ArgumentMatcher() { - @Override - public boolean matches(Object argument) { - if(CreateTransaction.SERIALIZABLE_CLASS.equals(argument.getClass())) { - CreateTransaction obj = CreateTransaction.fromSerializable(argument); - return obj.getTransactionId().startsWith(memberName) && - obj.getTransactionType() == type.ordinal(); - } - - return false; - } - }; - - return argThat(matcher); - } - - private DataExists eqSerializedDataExists() { - ArgumentMatcher matcher = new ArgumentMatcher() { - @Override - public boolean matches(Object argument) { - return DataExists.SERIALIZABLE_CLASS.equals(argument.getClass()) && - DataExists.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH); - } - }; - - return argThat(matcher); - } - - private DataExists eqDataExists() { - ArgumentMatcher matcher = new ArgumentMatcher() { - @Override - public boolean matches(Object argument) { - return (argument instanceof DataExists) && - ((DataExists)argument).getPath().equals(TestModel.TEST_PATH); - } - }; - - return argThat(matcher); - } - - private ReadData eqSerializedReadData() { - return eqSerializedReadData(TestModel.TEST_PATH); - } - - private ReadData eqSerializedReadData(final YangInstanceIdentifier path) { - ArgumentMatcher matcher = new ArgumentMatcher() { - @Override - public boolean matches(Object argument) { - return ReadData.SERIALIZABLE_CLASS.equals(argument.getClass()) && - ReadData.fromSerializable(argument).getPath().equals(path); - } - }; - - return argThat(matcher); - } - - private ReadData eqReadData() { - ArgumentMatcher matcher = new ArgumentMatcher() { - @Override - public boolean matches(Object argument) { - return (argument instanceof ReadData) && - ((ReadData)argument).getPath().equals(TestModel.TEST_PATH); - } - }; - - return argThat(matcher); - } - - private WriteData eqLegacyWriteData(final NormalizedNode nodeToWrite) { - ArgumentMatcher matcher = new ArgumentMatcher() { - @Override - public boolean matches(Object argument) { - if(ShardTransactionMessages.WriteData.class.equals(argument.getClass())) { - WriteData obj = WriteData.fromSerializable(argument); - return obj.getPath().equals(TestModel.TEST_PATH) && obj.getData().equals(nodeToWrite); - } - - return false; - } - }; - - return argThat(matcher); - } - - private MergeData eqLegacyMergeData(final NormalizedNode nodeToWrite) { - ArgumentMatcher matcher = new ArgumentMatcher() { - @Override - public boolean matches(Object argument) { - if(ShardTransactionMessages.MergeData.class.equals(argument.getClass())) { - MergeData obj = MergeData.fromSerializable(argument); - return obj.getPath().equals(TestModel.TEST_PATH) && obj.getData().equals(nodeToWrite); - } - - return false; - } - }; - - return argThat(matcher); - } - - private DeleteData eqLegacyDeleteData(final YangInstanceIdentifier expPath) { - ArgumentMatcher matcher = new ArgumentMatcher() { - @Override - public boolean matches(Object argument) { - return ShardTransactionMessages.DeleteData.class.equals(argument.getClass()) && - DeleteData.fromSerializable(argument).getPath().equals(expPath); - } - }; - - return argThat(matcher); - } - - private Future readySerializedTxReply(String path) { - return Futures.successful((Object)new ReadyTransactionReply(path).toSerializable()); - } - - private Future readyTxReply(String path) { - return Futures.successful((Object)new ReadyTransactionReply(path)); - } - - private Future readSerializedDataReply(NormalizedNode data, - short transactionVersion) { - return Futures.successful(new ReadDataReply(data, transactionVersion).toSerializable()); - } - - private Future readSerializedDataReply(NormalizedNode data) { - return readSerializedDataReply(data, DataStoreVersions.CURRENT_VERSION); - } - - private Future readDataReply(NormalizedNode data) { - return Futures.successful(new ReadDataReply(data, DataStoreVersions.CURRENT_VERSION)); - } - - private Future dataExistsSerializedReply(boolean exists) { - return Futures.successful(new DataExistsReply(exists).toSerializable()); - } - - private Future dataExistsReply(boolean exists) { - return Futures.successful(new DataExistsReply(exists)); - } - - private Future batchedModificationsReply(int count) { - return Futures.successful(new BatchedModificationsReply(count)); - } - - private Future incompleteFuture(){ - return mock(Future.class); - } - - private ActorSelection actorSelection(ActorRef actorRef) { - return getSystem().actorSelection(actorRef.path()); - } - - private void expectBatchedModifications(ActorRef actorRef, int count) { - doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), isA(BatchedModifications.class)); - } - - private void expectBatchedModifications(int count) { - doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync( - any(ActorSelection.class), isA(BatchedModifications.class)); - } - - private void expectIncompleteBatchedModifications() { - doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync( - any(ActorSelection.class), isA(BatchedModifications.class)); - } - - private void expectReadyTransaction(ActorRef actorRef) { - doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS)); - } - - private void expectFailedBatchedModifications(ActorRef actorRef) { - doReturn(Futures.failed(new TestException())).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), isA(BatchedModifications.class)); - } - - private CreateTransactionReply createTransactionReply(ActorRef actorRef, int transactionVersion){ - return CreateTransactionReply.newBuilder() - .setTransactionActorPath(actorRef.path().toString()) - .setTransactionId("txn-1") - .setMessageVersion(transactionVersion) - .build(); - } - - private ActorRef setupActorContextWithoutInitialCreateTransaction(ActorSystem actorSystem) { - ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); - doReturn(actorSystem.actorSelection(actorRef.path())). - when(mockActorContext).actorSelection(actorRef.path().toString()); - - doReturn(Futures.successful(actorSystem.actorSelection(actorRef.path()))). - when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); - - doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString()); - - doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit(); - - return actorRef; - } - - private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, - TransactionType type, int transactionVersion) { - ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(actorSystem); - - doReturn(Futures.successful(createTransactionReply(actorRef, transactionVersion))).when(mockActorContext). - executeOperationAsync(eq(actorSystem.actorSelection(actorRef.path())), - eqCreateTransaction(memberName, type)); - - return actorRef; - } - - private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) { - return setupActorContextWithInitialCreateTransaction(actorSystem, type, DataStoreVersions.CURRENT_VERSION); - } - - - private void propagateReadFailedExceptionCause(CheckedFuture future) - throws Throwable { - - try { - future.checkedGet(5, TimeUnit.SECONDS); - fail("Expected ReadFailedException"); - } catch(ReadFailedException e) { - throw e.getCause(); - } - } - @Test public void testRead() throws Exception { ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY); @@ -840,31 +519,6 @@ public class TransactionProxyTest { BatchedModificationsReply.class); } - private void verifyCohortFutures(ThreePhaseCommitCohortProxy proxy, - Object... expReplies) throws Exception { - assertEquals("getReadyOperationFutures size", expReplies.length, - proxy.getCohortFutures().size()); - - int i = 0; - for( Future future: proxy.getCohortFutures()) { - assertNotNull("Ready operation Future is null", future); - - Object expReply = expReplies[i++]; - if(expReply instanceof ActorSelection) { - ActorSelection actual = Await.result(future, Duration.create(5, TimeUnit.SECONDS)); - assertEquals("Cohort actor path", expReply, actual); - } else { - // Expecting exception. - try { - Await.result(future, Duration.create(5, TimeUnit.SECONDS)); - fail("Expected exception from ready operation Future"); - } catch(Exception e) { - // Expected - } - } - } - } - @Test public void testReady() throws Exception { ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); @@ -898,74 +552,6 @@ public class TransactionProxyTest { isA(BatchedModifications.class)); } - private ActorRef testCompatibilityWithHeliumVersion(short version) throws Exception { - ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), - READ_WRITE, version); - - NormalizedNode testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - - doReturn(readSerializedDataReply(testNode, version)).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), eqSerializedReadData()); - - doReturn(Futures.successful(new WriteDataReply().toSerializable(version))).when(mockActorContext). - executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyWriteData(testNode)); - - doReturn(Futures.successful(new MergeDataReply().toSerializable(version))).when(mockActorContext). - executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyMergeData(testNode)); - - doReturn(Futures.successful(new DeleteDataReply().toSerializable(version))).when(mockActorContext). - executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyDeleteData(TestModel.TEST_PATH)); - - expectReadyTransaction(actorRef); - - doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()), - eq(actorRef.path().toString())); - - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); - - Optional> readOptional = transactionProxy.read(TestModel.TEST_PATH). - get(5, TimeUnit.SECONDS); - - assertEquals("NormalizedNode isPresent", true, readOptional.isPresent()); - assertEquals("Response NormalizedNode", testNode, readOptional.get()); - - transactionProxy.write(TestModel.TEST_PATH, testNode); - - transactionProxy.merge(TestModel.TEST_PATH, testNode); - - transactionProxy.delete(TestModel.TEST_PATH); - - DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); - - assertTrue(ready instanceof ThreePhaseCommitCohortProxy); - - ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; - - verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), - ShardTransactionMessages.WriteDataReply.class, ShardTransactionMessages.MergeDataReply.class, - ShardTransactionMessages.DeleteDataReply.class); - - verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path())); - - return actorRef; - } - - @Test - public void testCompatibilityWithBaseHeliumVersion() throws Exception { - ActorRef actorRef = testCompatibilityWithHeliumVersion(DataStoreVersions.BASE_HELIUM_VERSION); - - verify(mockActorContext).resolvePath(eq(actorRef.path().toString()), - eq(actorRef.path().toString())); - } - - @Test - public void testCompatibilityWithHeliumR1Version() throws Exception { - ActorRef actorRef = testCompatibilityWithHeliumVersion(DataStoreVersions.HELIUM_1_VERSION); - - verify(mockActorContext, Mockito.never()).resolvePath(eq(actorRef.path().toString()), - eq(actorRef.path().toString())); - } - @Test public void testReadyWithRecordingOperationFailure() throws Exception { ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); @@ -1777,49 +1363,4 @@ public class TransactionProxyTest { verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class); } - - private List captureBatchedModifications(ActorRef actorRef) { - ArgumentCaptor batchedModificationsCaptor = - ArgumentCaptor.forClass(BatchedModifications.class); - verify(mockActorContext, Mockito.atLeastOnce()).executeOperationAsync( - eq(actorSelection(actorRef)), batchedModificationsCaptor.capture()); - - List batchedModifications = filterCaptured( - batchedModificationsCaptor, BatchedModifications.class); - return batchedModifications; - } - - private List filterCaptured(ArgumentCaptor captor, Class type) { - List captured = new ArrayList<>(); - for(T c: captor.getAllValues()) { - if(type.isInstance(c)) { - captured.add(c); - } - } - - return captured; - } - - private void verifyOneBatchedModification(ActorRef actorRef, Modification expected) { - List batchedModifications = captureBatchedModifications(actorRef); - assertEquals("Captured BatchedModifications count", 1, batchedModifications.size()); - - verifyBatchedModifications(batchedModifications.get(0), expected); - } - - private void verifyBatchedModifications(Object message, Modification... expected) { - assertEquals("Message type", BatchedModifications.class, message.getClass()); - BatchedModifications batchedModifications = (BatchedModifications)message; - assertEquals("BatchedModifications size", expected.length, batchedModifications.getModifications().size()); - for(int i = 0; i < batchedModifications.getModifications().size(); i++) { - Modification actual = batchedModifications.getModifications().get(i); - assertEquals("Modification type", expected[i].getClass(), actual.getClass()); - assertEquals("getPath", ((AbstractModification)expected[i]).getPath(), - ((AbstractModification)actual).getPath()); - if(actual instanceof WriteModification) { - assertEquals("getData", ((WriteModification)expected[i]).getData(), - ((WriteModification)actual).getData()); - } - } - } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java new file mode 100644 index 0000000000..08c32c9a54 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java @@ -0,0 +1,157 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.compat; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.argThat; +import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.verify; +import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE; +import akka.actor.ActorRef; +import akka.dispatch.Futures; +import com.google.common.base.Optional; +import java.util.concurrent.TimeUnit; +import org.junit.Test; +import org.mockito.ArgumentMatcher; +import org.mockito.Mockito; +import org.opendaylight.controller.cluster.datastore.AbstractTransactionProxyTest; +import org.opendaylight.controller.cluster.datastore.DataStoreVersions; +import org.opendaylight.controller.cluster.datastore.ThreePhaseCommitCohortProxy; +import org.opendaylight.controller.cluster.datastore.TransactionProxy; +import org.opendaylight.controller.cluster.datastore.messages.DeleteData; +import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply; +import org.opendaylight.controller.cluster.datastore.messages.MergeData; +import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply; +import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; +import org.opendaylight.controller.cluster.datastore.messages.WriteData; +import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply; +import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; + +/** + * Unit tests for backwards compatibility with pre-Lithium versions. + * + * @author Thomas Pantelis + */ +public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest { + + private WriteData eqLegacyWriteData(final NormalizedNode nodeToWrite) { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + if(ShardTransactionMessages.WriteData.class.equals(argument.getClass())) { + WriteData obj = WriteData.fromSerializable(argument); + return obj.getPath().equals(TestModel.TEST_PATH) && obj.getData().equals(nodeToWrite); + } + + return false; + } + }; + + return argThat(matcher); + } + + private MergeData eqLegacyMergeData(final NormalizedNode nodeToWrite) { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + if(ShardTransactionMessages.MergeData.class.equals(argument.getClass())) { + MergeData obj = MergeData.fromSerializable(argument); + return obj.getPath().equals(TestModel.TEST_PATH) && obj.getData().equals(nodeToWrite); + } + + return false; + } + }; + + return argThat(matcher); + } + + private DeleteData eqLegacyDeleteData(final YangInstanceIdentifier expPath) { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + return ShardTransactionMessages.DeleteData.class.equals(argument.getClass()) && + DeleteData.fromSerializable(argument).getPath().equals(expPath); + } + }; + + return argThat(matcher); + } + + private ActorRef testCompatibilityWithHeliumVersion(short version) throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE, version); + + NormalizedNode testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(readSerializedDataReply(testNode, version)).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedReadData(TestModel.TEST_PATH)); + + doReturn(Futures.successful(new WriteDataReply().toSerializable(version))).when(mockActorContext). + executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyWriteData(testNode)); + + doReturn(Futures.successful(new MergeDataReply().toSerializable(version))).when(mockActorContext). + executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyMergeData(testNode)); + + doReturn(Futures.successful(new DeleteDataReply().toSerializable(version))).when(mockActorContext). + executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyDeleteData(TestModel.TEST_PATH)); + + doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS)); + + doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()), + eq(actorRef.path().toString())); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); + + Optional> readOptional = transactionProxy.read(TestModel.TEST_PATH). + get(5, TimeUnit.SECONDS); + + assertEquals("NormalizedNode isPresent", true, readOptional.isPresent()); + assertEquals("Response NormalizedNode", testNode, readOptional.get()); + + transactionProxy.write(TestModel.TEST_PATH, testNode); + + transactionProxy.merge(TestModel.TEST_PATH, testNode); + + transactionProxy.delete(TestModel.TEST_PATH); + + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); + + assertTrue(ready instanceof ThreePhaseCommitCohortProxy); + + ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + + verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path())); + + return actorRef; + } + + @Test + public void testCompatibilityWithBaseHeliumVersion() throws Exception { + ActorRef actorRef = testCompatibilityWithHeliumVersion(DataStoreVersions.BASE_HELIUM_VERSION); + + verify(mockActorContext).resolvePath(eq(actorRef.path().toString()), + eq(actorRef.path().toString())); + } + + @Test + public void testCompatibilityWithHeliumR1Version() throws Exception { + ActorRef actorRef = testCompatibilityWithHeliumVersion(DataStoreVersions.HELIUM_1_VERSION); + + verify(mockActorContext, Mockito.never()).resolvePath(eq(actorRef.path().toString()), + eq(actorRef.path().toString())); + } +}