Merge "Bug 2055: Handle shard not initialized resiliently"
authorMoiz Raja <moraja@cisco.com>
Mon, 27 Oct 2014 17:14:58 +0000 (17:14 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Mon, 27 Oct 2014 17:14:58 +0000 (17:14 +0000)
14 files changed:
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpCohort.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpDataChangeListenerRegistration.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/LocalShardNotFoundException.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FindLocalShard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FindPrimary.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java

index acf630e2e95598e71fdbd786da628f3524a29408..b2ae060c3d30c219615432ad59d0b495f93e4294 100644 (file)
@@ -8,14 +8,27 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
+import java.util.concurrent.TimeUnit;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.PoisonPill;
+import akka.dispatch.OnComplete;
+import akka.util.Timeout;
+import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
 import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
+import scala.concurrent.Future;
 
 /**
  * ListenerRegistrationProxy acts as a proxy for a ListenerRegistration that was done on a remote shard
@@ -24,25 +37,36 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
  * The ListenerRegistrationProxy talks to a remote ListenerRegistration actor.
  * </p>
  */
+@SuppressWarnings("rawtypes")
 public class DataChangeListenerRegistrationProxy implements ListenerRegistration {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DataChangeListenerRegistrationProxy.class);
+
+    public static final Timeout REGISTER_TIMEOUT = new Timeout(5, TimeUnit.MINUTES);
+
     private volatile ActorSelection listenerRegistrationActor;
-    private final AsyncDataChangeListener listener;
-    private final ActorRef dataChangeListenerActor;
+    private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
+    private ActorRef dataChangeListenerActor;
+    private final String shardName;
+    private final ActorContext actorContext;
     private boolean closed = false;
 
     public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
-    DataChangeListenerRegistrationProxy(
-        ActorSelection listenerRegistrationActor,
-        L listener, ActorRef dataChangeListenerActor) {
-        this.listenerRegistrationActor = listenerRegistrationActor;
+                                                              DataChangeListenerRegistrationProxy (
+            String shardName, ActorContext actorContext, L listener) {
+        this.shardName = shardName;
+        this.actorContext = actorContext;
         this.listener = listener;
-        this.dataChangeListenerActor = dataChangeListenerActor;
     }
 
-    public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
-    DataChangeListenerRegistrationProxy(
-        L listener, ActorRef dataChangeListenerActor) {
-        this(null, listener, dataChangeListenerActor);
+    @VisibleForTesting
+    ActorSelection getListenerRegistrationActor() {
+        return listenerRegistrationActor;
+    }
+
+    @VisibleForTesting
+    ActorRef getDataChangeListenerActor() {
+        return dataChangeListenerActor;
     }
 
     @Override
@@ -50,7 +74,11 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
         return listener;
     }
 
-    public void setListenerRegistrationActor(ActorSelection listenerRegistrationActor) {
+    private void setListenerRegistrationActor(ActorSelection listenerRegistrationActor) {
+        if(listenerRegistrationActor == null) {
+            return;
+        }
+
         boolean sendCloseMessage = false;
         synchronized(this) {
             if(closed) {
@@ -59,16 +87,55 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
                 this.listenerRegistrationActor = listenerRegistrationActor;
             }
         }
+
         if(sendCloseMessage) {
             listenerRegistrationActor.tell(new
                 CloseDataChangeListenerRegistration().toSerializable(), null);
         }
+    }
 
-        this.listenerRegistrationActor = listenerRegistrationActor;
+    public void init(final YangInstanceIdentifier path, final AsyncDataBroker.DataChangeScope scope) {
+
+        dataChangeListenerActor = actorContext.getActorSystem().actorOf(
+                DataChangeListener.props(listener));
+
+        Future<ActorRef> findFuture = actorContext.findLocalShardAsync(shardName, REGISTER_TIMEOUT);
+        findFuture.onComplete(new OnComplete<ActorRef>() {
+            @Override
+            public void onComplete(Throwable failure, ActorRef shard) {
+                if(failure instanceof LocalShardNotFoundException) {
+                    LOG.debug("No local shard found for {} - DataChangeListener {} at path {} " +
+                            "cannot be registered", shardName, listener, path);
+                } else if(failure != null) {
+                    LOG.error("Failed to find local shard {} - DataChangeListener {} at path {} " +
+                            "cannot be registered: {}", shardName, listener, path, failure);
+                } else {
+                    doRegistration(shard, path, scope);
+                }
+            }
+        }, actorContext.getActorSystem().dispatcher());
     }
 
-    public ActorSelection getListenerRegistrationActor() {
-        return listenerRegistrationActor;
+    private void doRegistration(ActorRef shard, final YangInstanceIdentifier path,
+            DataChangeScope scope) {
+
+        Future<Object> future = actorContext.executeOperationAsync(shard,
+                new RegisterChangeListener(path, dataChangeListenerActor.path(), scope),
+                REGISTER_TIMEOUT);
+
+        future.onComplete(new OnComplete<Object>(){
+            @Override
+            public void onComplete(Throwable failure, Object result) {
+                if(failure != null) {
+                    LOG.error("Failed to register DataChangeListener {} at path {}",
+                            listener, path.toString(), failure);
+                } else {
+                    RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result;
+                    setListenerRegistrationActor(actorContext.actorSelection(
+                            reply.getListenerRegistrationPath()));
+                }
+            }
+        }, actorContext.getActorSystem().dispatcher());
     }
 
     @Override
@@ -79,11 +146,16 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
             sendCloseMessage = !closed && listenerRegistrationActor != null;
             closed = true;
         }
+
         if(sendCloseMessage) {
-            listenerRegistrationActor.tell(new
-                CloseDataChangeListenerRegistration().toSerializable(), null);
+            listenerRegistrationActor.tell(new CloseDataChangeListenerRegistration().toSerializable(),
+                    ActorRef.noSender());
+            listenerRegistrationActor = null;
         }
 
-        dataChangeListenerActor.tell(PoisonPill.getInstance(), null);
+        if(dataChangeListenerActor != null) {
+            dataChangeListenerActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+            dataChangeListenerActor = null;
+        }
     }
 }
index f6c31aab04c76a376e18dad96dae95aa9cffb637..2c73807dca858f754dd4521a390e0777d6ff60d6 100644 (file)
@@ -8,16 +8,10 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
-import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
-import akka.dispatch.OnComplete;
-import akka.util.Timeout;
-import com.google.common.base.Optional;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
-import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
@@ -34,7 +28,6 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
 
 /**
  *
@@ -83,39 +76,11 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
 
         String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
 
-        Optional<ActorRef> shard = actorContext.findLocalShard(shardName);
-
-        //if shard is NOT local
-        if (!shard.isPresent()) {
-            LOG.debug("No local shard for shardName {} was found so returning a noop registration", shardName);
-            return new NoOpDataChangeListenerRegistration(listener);
-        }
-        //if shard is local
-        ActorRef dataChangeListenerActor = actorContext.getActorSystem().actorOf(DataChangeListener.props(listener));
-        Future future = actorContext.executeOperationAsync(shard.get(),
-                new RegisterChangeListener(path, dataChangeListenerActor.path(), scope),
-                new Timeout(actorContext.getOperationDuration().$times(REGISTER_DATA_CHANGE_LISTENER_TIMEOUT_FACTOR)));
-
         final DataChangeListenerRegistrationProxy listenerRegistrationProxy =
-                new DataChangeListenerRegistrationProxy(listener, dataChangeListenerActor);
-
-        future.onComplete(new OnComplete() {
-
-            @Override
-            public void onComplete(Throwable failure, Object result)
-                    throws Throwable {
-                if (failure != null) {
-                    LOG.error("Failed to register listener at path " + path.toString(), failure);
-                    return;
-                }
-                RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result;
-                listenerRegistrationProxy.setListenerRegistrationActor(actorContext
-                        .actorSelection(reply.getListenerRegistrationPath()));
-            }
-        }, actorContext.getActorSystem().dispatcher());
+                new DataChangeListenerRegistrationProxy(shardName, actorContext, listener);
+        listenerRegistrationProxy.init(path, scope);
 
         return listenerRegistrationProxy;
-
     }
 
     @Override
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpCohort.java
deleted file mode 100644 (file)
index eb28159..0000000
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore;
-
-import akka.actor.UntypedActor;
-import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
-
-public class NoOpCohort extends UntypedActor {
-
-    @Override public void onReceive(Object message) throws Exception {
-        if (message.getClass().equals(CanCommitTransaction.SERIALIZABLE_CLASS)) {
-            getSender().tell(new CanCommitTransactionReply(false).toSerializable(), getSelf());
-        } else if (message.getClass().equals(PreCommitTransaction.SERIALIZABLE_CLASS)) {
-            getSender().tell(
-                new PreCommitTransactionReply().toSerializable(),
-                getSelf());
-        } else if (message.getClass().equals(CommitTransaction.SERIALIZABLE_CLASS)) {
-            getSender().tell(new CommitTransactionReply().toSerializable(), getSelf());
-        } else if (message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) {
-            getSender().tell(new AbortTransactionReply().toSerializable(), getSelf());
-        } else {
-            throw new Exception ("Not recognized message received,message="+message);
-        }
-
-    }
-}
-
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpDataChangeListenerRegistration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpDataChangeListenerRegistration.java
deleted file mode 100644 (file)
index 14af31e..0000000
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore;
-
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-
-/**
- * When a consumer registers a data change listener and no local shard is
- * available to register that listener with then we return an instance of
- * NoOpDataChangeListenerRegistration
- *
- * <p>
- *
- * The NoOpDataChangeListenerRegistration as it's name suggests does
- * nothing when an operation is invoked on it
- */
-public class NoOpDataChangeListenerRegistration
-    implements ListenerRegistration {
-
-    private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>
-        listener;
-
-    public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> NoOpDataChangeListenerRegistration(
-        AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener) {
-
-        this.listener = listener;
-    }
-
-    @Override
-    public AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> getInstance() {
-        return listener;
-    }
-
-    @Override public void close() {
-
-    }
-}
index d0bb3d3b69824d18b2acdb08a4defb50d31953b2..d831b7c1844006fca5bea575ee20d656c4b616c4 100644 (file)
@@ -217,6 +217,10 @@ public class Shard extends RaftActor {
 
         if (message instanceof RecoveryFailure){
             LOG.error(((RecoveryFailure) message).cause(), "Recovery failed because of this cause");
+
+            // Even though recovery failed, we still need to finish our recovery, eg send the
+            // ActorInitialized message and start the txCommitTimeoutCheckSchedule.
+            onRecoveryComplete();
         } else {
             super.onReceiveRecover(message);
         }
@@ -699,12 +703,15 @@ public class Shard extends RaftActor {
         //notify shard manager
         getContext().parent().tell(new ActorInitialized(), getSelf());
 
-        // Schedule a message to be periodically sent to check if the current in-progress
-        // transaction should be expired and aborted.
-        FiniteDuration period = Duration.create(transactionCommitTimeout / 3, TimeUnit.MILLISECONDS);
-        txCommitTimeoutCheckSchedule = getContext().system().scheduler().schedule(
-                period, period, getSelf(),
-                TX_COMMIT_TIMEOUT_CHECK_MESSAGE, getContext().dispatcher(), ActorRef.noSender());
+        // Being paranoid here - this method should only be called once but just in case...
+        if(txCommitTimeoutCheckSchedule == null) {
+            // Schedule a message to be periodically sent to check if the current in-progress
+            // transaction should be expired and aborted.
+            FiniteDuration period = Duration.create(transactionCommitTimeout / 3, TimeUnit.MILLISECONDS);
+            txCommitTimeoutCheckSchedule = getContext().system().scheduler().schedule(
+                    period, period, getSelf(),
+                    TX_COMMIT_TIMEOUT_CHECK_MESSAGE, getContext().dispatcher(), ActorRef.noSender());
+        }
     }
 
     @Override
index 157f1cb3771cd71ddd1ddf14d2541bef3a0aefc3..c7c382c82ffa2fc6db82664995d404320674c447 100644 (file)
@@ -25,6 +25,7 @@ import akka.persistence.RecoveryFailure;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
+import com.google.common.collect.Lists;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
@@ -163,7 +164,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         LOG.debug("Initializing shard [{}]", shardName);
         ShardInformation shardInformation = localShards.get(shardName);
         if (shardInformation != null) {
-            shardInformation.setShardInitialized(true);
+            shardInformation.setActorInitialized();
         }
     }
 
@@ -192,7 +193,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return;
         }
 
-        sendResponse(shardInformation, new Supplier<Object>() {
+        sendResponse(shardInformation, message.isWaitUntilInitialized(), new Supplier<Object>() {
             @Override
             public Object get() {
                 return new LocalShardFound(shardInformation.getActor());
@@ -200,9 +201,22 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         });
     }
 
-    private void sendResponse(ShardInformation shardInformation,  Supplier<Object> messageSupplier) {
-        if (shardInformation.getActor() == null || !shardInformation.isShardInitialized()) {
-            getSender().tell(new ActorNotInitialized(), getSelf());
+    private void sendResponse(ShardInformation shardInformation, boolean waitUntilInitialized,
+            final Supplier<Object> messageSupplier) {
+        if (!shardInformation.isShardInitialized()) {
+            if(waitUntilInitialized) {
+                final ActorRef sender = getSender();
+                final ActorRef self = self();
+                shardInformation.addRunnableOnInitialized(new Runnable() {
+                    @Override
+                    public void run() {
+                        sender.tell(messageSupplier.get(), self);
+                    }
+                });
+            } else {
+                getSender().tell(new ActorNotInitialized(), getSelf());
+            }
+
             return;
         }
 
@@ -277,7 +291,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         // First see if the there is a local replica for the shard
         final ShardInformation info = localShards.get(shardName);
         if (info != null) {
-            sendResponse(info, new Supplier<Object>() {
+            sendResponse(info, message.isWaitUntilInitialized(), new Supplier<Object>() {
                 @Override
                 public Object get() {
                     return new PrimaryFound(info.getActorPath().toString()).toSerializable();
@@ -422,7 +436,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         private ActorRef actor;
         private ActorPath actorPath;
         private final Map<ShardIdentifier, String> peerAddresses;
-        private boolean shardInitialized = false; // flag that determines if the actor is ready for business
+
+        // flag that determines if the actor is ready for business
+        private boolean actorInitialized = false;
+
+        private final List<Runnable> runnablesOnInitialized = Lists.newArrayList();
 
         private ShardInformation(String shardName, ShardIdentifier shardId,
                 Map<ShardIdentifier, String> peerAddresses) {
@@ -474,11 +492,21 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
 
         boolean isShardInitialized() {
-            return shardInitialized;
+            return getActor() != null && actorInitialized;
+        }
+
+        void setActorInitialized() {
+            this.actorInitialized = true;
+
+            for(Runnable runnable: runnablesOnInitialized) {
+                runnable.run();
+            }
+
+            runnablesOnInitialized.clear();
         }
 
-        void setShardInitialized(boolean shardInitialized) {
-            this.shardInitialized = shardInitialized;
+        void addRunnableOnInitialized(Runnable runnable) {
+            runnablesOnInitialized.add(runnable);
         }
     }
 
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/LocalShardNotFoundException.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/LocalShardNotFoundException.java
new file mode 100644 (file)
index 0000000..7a976b8
--- /dev/null
@@ -0,0 +1,21 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.exceptions;
+
+/**
+ * Exception thrown when attempting to find a local shard but it doesn't exist.
+ *
+ * @author Thomas Pantelis
+ */
+public class LocalShardNotFoundException extends RuntimeException {
+    private static final long serialVersionUID = 1L;
+
+    public LocalShardNotFoundException(String message){
+        super(message);
+    }
+}
index c415db6efe2fd3da9677db7d16239408253ff2e2..b6560a8347d806e62d1717bea1bab5600238cec2 100644 (file)
@@ -14,12 +14,18 @@ package org.opendaylight.controller.cluster.datastore.messages;
  */
 public class FindLocalShard {
     private final String shardName;
+    private final boolean waitUntilInitialized;
 
-    public FindLocalShard(String shardName) {
+    public FindLocalShard(String shardName, boolean waitUntilInitialized) {
         this.shardName = shardName;
+        this.waitUntilInitialized = waitUntilInitialized;
     }
 
     public String getShardName() {
         return shardName;
     }
+
+    public boolean isWaitUntilInitialized() {
+        return waitUntilInitialized;
+    }
 }
index f5a6a348415308377d755011b133796659d3f27d..a34330bcf6864c26799aae0c6a48990dee1f6f82 100644 (file)
@@ -15,26 +15,33 @@ import com.google.common.base.Preconditions;
  *
  */
 public class FindPrimary implements SerializableMessage{
-  public static final Class SERIALIZABLE_CLASS = FindPrimary.class;
+    public static final Class<FindPrimary> SERIALIZABLE_CLASS = FindPrimary.class;
+
     private final String shardName;
+    private final boolean waitUntilInitialized;
 
-    public FindPrimary(String shardName){
+    public FindPrimary(String shardName, boolean waitUntilInitialized){
 
         Preconditions.checkNotNull(shardName, "shardName should not be null");
 
         this.shardName = shardName;
+        this.waitUntilInitialized = waitUntilInitialized;
     }
 
     public String getShardName() {
         return shardName;
     }
 
-  @Override
-  public Object toSerializable() {
-    return this;
-  }
+    public boolean isWaitUntilInitialized() {
+        return waitUntilInitialized;
+    }
 
-  public static FindPrimary fromSerializable(Object message){
-    return (FindPrimary) message;
-  }
+    @Override
+    public Object toSerializable() {
+        return this;
+    }
+
+    public static FindPrimary fromSerializable(Object message){
+        return (FindPrimary) message;
+    }
 }
index 314ae916de1444349816988089011d82b7cd9c8b..0a1e80b0cbaea069f3a75cb558bad130d7562dce 100644 (file)
@@ -13,17 +13,21 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
+import akka.dispatch.Mapper;
 import akka.util.Timeout;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.Configuration;
+import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
+import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
 import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -115,14 +119,14 @@ public class ActorContext {
     }
 
     /**
-     * Finds a local shard given it's shard name and return it's ActorRef
+     * Finds a local shard given its shard name and return it's ActorRef
      *
      * @param shardName the name of the local shard that needs to be found
      * @return a reference to a local shard actor which represents the shard
      *         specified by the shardName
      */
     public Optional<ActorRef> findLocalShard(String shardName) {
-        Object result = executeOperation(shardManager, new FindLocalShard(shardName));
+        Object result = executeOperation(shardManager, new FindLocalShard(shardName, false));
 
         if (result instanceof LocalShardFound) {
             LocalShardFound found = (LocalShardFound) result;
@@ -133,9 +137,40 @@ public class ActorContext {
         return Optional.absent();
     }
 
+    /**
+     * Finds a local shard async given its shard name and return a Future from which to obtain the
+     * ActorRef.
+     *
+     * @param shardName the name of the local shard that needs to be found
+     */
+    public Future<ActorRef> findLocalShardAsync( final String shardName, Timeout timeout) {
+        Future<Object> future = executeOperationAsync(shardManager,
+                new FindLocalShard(shardName, true), timeout);
+
+        return future.map(new Mapper<Object, ActorRef>() {
+            @Override
+            public ActorRef checkedApply(Object response) throws Throwable {
+                if(response instanceof LocalShardFound) {
+                    LocalShardFound found = (LocalShardFound)response;
+                    LOG.debug("Local shard found {}", found.getPath());
+                    return found.getPath();
+                } else if(response instanceof ActorNotInitialized) {
+                    throw new NotInitializedException(
+                            String.format("Found local shard for %s but it's not initialized yet.",
+                                    shardName));
+                } else if(response instanceof LocalShardNotFound) {
+                    throw new LocalShardNotFoundException(
+                            String.format("Local shard for %s does not exist.", shardName));
+                }
+
+                throw new UnknownMessageException(String.format(
+                        "FindLocalShard returned unkown response: %s", response));
+            }
+        }, getActorSystem().dispatcher());
+    }
 
     private String findPrimaryPathOrNull(String shardName) {
-        Object result = executeOperation(shardManager, new FindPrimary(shardName).toSerializable());
+        Object result = executeOperation(shardManager, new FindPrimary(shardName, false).toSerializable());
 
         if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
             PrimaryFound found = PrimaryFound.fromSerializable(result);
index aaf080bdf7d8d50de4a3f31713143389994872a6..c27993f97b9fef669c03f0c378e83f8474944e96 100644 (file)
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
 package org.opendaylight.controller.cluster.datastore;
 
+import java.util.concurrent.TimeUnit;
 import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
 import akka.actor.Props;
-import junit.framework.Assert;
+import akka.actor.Terminated;
+import akka.dispatch.ExecutionContexts;
+import akka.dispatch.Futures;
+import akka.testkit.JavaTestKit;
+import akka.util.Timeout;
+import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
-import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
-import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
-import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.Uninterruptibles;
+import scala.concurrent.ExecutionContextExecutor;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.eq;
 
-import java.util.List;
+/**
+ * Unit tests for DataChangeListenerRegistrationProxy.
+ *
+ * @author Thomas Pantelis
+ */
+public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest {
 
-import static junit.framework.TestCase.assertEquals;
-import static junit.framework.TestCase.assertNotNull;
-import static junit.framework.TestCase.assertTrue;
+    @SuppressWarnings("unchecked")
+    private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> mockListener =
+            Mockito.mock(AsyncDataChangeListener.class);
 
-public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest{
+    @Test
+    public void testGetInstance() throws Exception {
+        DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
+                "shard", Mockito.mock(ActorContext.class), mockListener);
+
+        Assert.assertEquals(mockListener, proxy.getInstance());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test(timeout=10000)
+    public void testSuccessfulRegistration() {
+        new JavaTestKit(getSystem()) {{
+            ActorContext actorContext = new ActorContext(getSystem(), getRef(),
+                    mock(ClusterWrapper.class), mock(Configuration.class));
+
+            final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
+                    "shard-1", actorContext, mockListener);
+
+            final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
+            final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE;
+            new Thread() {
+                @Override
+                public void run() {
+                    proxy.init(path, scope);
+                }
+
+            }.start();
+
+            FiniteDuration timeout = duration("5 seconds");
+            FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
+            Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
+
+            reply(new LocalShardFound(getRef()));
+
+            RegisterChangeListener registerMsg = expectMsgClass(timeout, RegisterChangeListener.class);
+            Assert.assertEquals("getPath", path, registerMsg.getPath());
+            Assert.assertEquals("getScope", scope, registerMsg.getScope());
+
+            reply(new RegisterChangeListenerReply(getRef().path()));
+
+            for(int i = 0; (i < 20 * 5) && proxy.getListenerRegistrationActor() == null; i++) {
+                Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+            }
+
+            Assert.assertEquals("getListenerRegistrationActor", getSystem().actorSelection(getRef().path()),
+                    proxy.getListenerRegistrationActor());
+
+            watch(proxy.getDataChangeListenerActor());
 
-    private ActorRef dataChangeListenerActor = getSystem().actorOf(Props.create(DoNothingActor.class));
+            proxy.close();
 
-    private static class MockDataChangeListener implements
-        AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> {
+            // The listener registration actor should get a Close message
+            expectMsgClass(timeout, CloseDataChangeListenerRegistration.SERIALIZABLE_CLASS);
 
-        @Override public void onDataChanged(
-            AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
-            throw new UnsupportedOperationException("onDataChanged");
-        }
+            // The DataChangeListener actor should be terminated
+            expectMsgClass(timeout, Terminated.class);
+
+            proxy.close();
+
+            expectNoMsg();
+        }};
     }
 
-    @Test
-    public void testGetInstance() throws Exception {
-        final Props props = Props.create(MessageCollectorActor.class);
-        final ActorRef actorRef = getSystem().actorOf(props);
+    @Test(timeout=10000)
+    public void testLocalShardNotFound() {
+        new JavaTestKit(getSystem()) {{
+            ActorContext actorContext = new ActorContext(getSystem(), getRef(),
+                    mock(ClusterWrapper.class), mock(Configuration.class));
 
-        MockDataChangeListener listener =
-            new MockDataChangeListener();
-        DataChangeListenerRegistrationProxy proxy =
-            new DataChangeListenerRegistrationProxy(
-                getSystem().actorSelection(actorRef.path()),
-                listener, dataChangeListenerActor);
+            final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
+                    "shard-1", actorContext, mockListener);
 
-        Assert.assertEquals(listener, proxy.getInstance());
+            final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
+            final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE;
+            new Thread() {
+                @Override
+                public void run() {
+                    proxy.init(path, scope);
+                }
 
+            }.start();
+
+            FiniteDuration timeout = duration("5 seconds");
+            FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
+            Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
+
+            reply(new LocalShardNotFound("shard-1"));
+
+            expectNoMsg(duration("1 seconds"));
+        }};
     }
 
-    @Test
-    public void testClose() throws Exception {
-        final Props props = Props.create(MessageCollectorActor.class);
-        final ActorRef actorRef = getSystem().actorOf(props);
+    @Test(timeout=10000)
+    public void testLocalShardNotInitialized() {
+        new JavaTestKit(getSystem()) {{
+            ActorContext actorContext = new ActorContext(getSystem(), getRef(),
+                    mock(ClusterWrapper.class), mock(Configuration.class));
 
-        DataChangeListenerRegistrationProxy proxy =
-            new DataChangeListenerRegistrationProxy(
-                getSystem().actorSelection(actorRef.path()),
-                new MockDataChangeListener(), dataChangeListenerActor);
+            final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
+                    "shard-1", actorContext, mockListener);
 
-        proxy.close();
+            final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
+            final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE;
+            new Thread() {
+                @Override
+                public void run() {
+                    proxy.init(path, scope);
+                }
+
+            }.start();
+
+            FiniteDuration timeout = duration("5 seconds");
+            FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
+            Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
+
+            reply(new ActorNotInitialized());
+
+            new Within(duration("1 seconds")) {
+                @Override
+                protected void run() {
+                    expectNoMsg();
+                }
+            };
+        }};
+    }
+
+    @Test
+    public void testFailedRegistration() {
+        new JavaTestKit(getSystem()) {{
+            ActorSystem mockActorSystem = mock(ActorSystem.class);
 
-        //Check if it was received by the remote actor
-        ActorContext
-            testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)),new MockClusterWrapper(), new MockConfiguration());
-        Object messages = testContext
-            .executeOperation(actorRef, "messages");
+            ActorRef mockActor = getSystem().actorOf(Props.create(DoNothingActor.class),
+                    "testFailedRegistration");
+            doReturn(mockActor).when(mockActorSystem).actorOf(any(Props.class));
+            ExecutionContextExecutor executor = ExecutionContexts.fromExecutor(
+                    MoreExecutors.sameThreadExecutor());
+            doReturn(executor).when(mockActorSystem).dispatcher();
 
-        assertNotNull(messages);
+            ActorContext actorContext = mock(ActorContext.class);
 
-        assertTrue(messages instanceof List);
+            String shardName = "shard-1";
+            final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
+                    shardName, actorContext, mockListener);
 
-        List<Object> listMessages = (List<Object>) messages;
+            doReturn(mockActorSystem).when(actorContext).getActorSystem();
+            doReturn(duration("5 seconds")).when(actorContext).getOperationDuration();
+            doReturn(Futures.successful(getRef())).when(actorContext).findLocalShardAsync(eq(shardName),
+                    any(Timeout.class));
+            doReturn(Futures.failed(new RuntimeException("mock"))).
+                    when(actorContext).executeOperationAsync(any(ActorRef.class),
+                            any(Object.class), any(Timeout.class));
 
-        assertEquals(1, listMessages.size());
+            proxy.init(YangInstanceIdentifier.of(TestModel.TEST_QNAME),
+                    AsyncDataBroker.DataChangeScope.ONE);
 
-        assertTrue(listMessages.get(0).getClass()
-            .equals(CloseDataChangeListenerRegistration.SERIALIZABLE_CLASS));
+            Assert.assertEquals("getListenerRegistrationActor", null,
+                    proxy.getListenerRegistrationActor());
+        }};
     }
 
+    @SuppressWarnings("unchecked")
     @Test
-    public void testCloseWhenRegistrationIsNull() throws Exception {
-        final Props props = Props.create(MessageCollectorActor.class);
-        final ActorRef actorRef = getSystem().actorOf(props);
+    public void testCloseBeforeRegistration() {
+        new JavaTestKit(getSystem()) {{
+            ActorContext actorContext = mock(ActorContext.class);
 
-        DataChangeListenerRegistrationProxy proxy =
-            new DataChangeListenerRegistrationProxy(
-                new MockDataChangeListener(), dataChangeListenerActor);
+            String shardName = "shard-1";
+            final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
+                    shardName, actorContext, mockListener);
 
-        proxy.close();
+            doReturn(getSystem()).when(actorContext).getActorSystem();
+            doReturn(getSystem().actorSelection(getRef().path())).
+                    when(actorContext).actorSelection(getRef().path());
+            doReturn(duration("5 seconds")).when(actorContext).getOperationDuration();
+            doReturn(Futures.successful(getRef())).when(actorContext).findLocalShardAsync(eq(shardName),
+                    any(Timeout.class));
 
-        //Check if it was received by the remote actor
-        ActorContext
-            testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)),new MockClusterWrapper(), new MockConfiguration());
-        Object messages = testContext
-            .executeOperation(actorRef, "messages");
+            Answer<Future<Object>> answer = new Answer<Future<Object>>() {
+                @Override
+                public Future<Object> answer(InvocationOnMock invocation) {
+                    proxy.close();
+                    return Futures.successful((Object)new RegisterChangeListenerReply(getRef().path()));
+                }
+            };
 
-        assertNotNull(messages);
+            doAnswer(answer).when(actorContext).executeOperationAsync(any(ActorRef.class),
+                    any(Object.class), any(Timeout.class));
 
-        assertTrue(messages instanceof List);
+            proxy.init(YangInstanceIdentifier.of(TestModel.TEST_QNAME),
+                    AsyncDataBroker.DataChangeScope.ONE);
 
-        List<Object> listMessages = (List<Object>) messages;
+            expectMsgClass(duration("5 seconds"), CloseDataChangeListenerRegistration.SERIALIZABLE_CLASS);
 
-        assertEquals(0, listMessages.size());
+            Assert.assertEquals("getListenerRegistrationActor", null,
+                    proxy.getListenerRegistrationActor());
+        }};
     }
 }
index 395021d361c6d4b210fcf29bf20707802b6c590a..d35c36fb0a2c883649ec7fcccc42ac981f20e256 100644 (file)
@@ -4,9 +4,11 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Uninterruptibles;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
@@ -14,15 +16,21 @@ import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
@@ -204,6 +212,68 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
         }};
     }
 
+    @Test
+    public void testChangeListenerRegistration() throws Exception{
+        new IntegrationTestKit(getSystem()) {{
+            DistributedDataStore dataStore =
+                    setupDistributedDataStore("testChangeListenerRegistration", "test-1");
+
+            final List<AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>>>
+                                                                changeList = Lists.newArrayList();
+            final CountDownLatch changeLatch = new CountDownLatch(3);
+            AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
+                    new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
+                @Override
+                public void onDataChanged(AsyncDataChangeEvent<YangInstanceIdentifier,
+                                                               NormalizedNode<?, ?>> change) {
+                    changeList.add(change);
+                    changeLatch.countDown();
+                }
+            };
+
+            ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
+                    listenerReg = dataStore.registerChangeListener(TestModel.TEST_PATH, listener,
+                            DataChangeScope.SUBTREE);
+
+            assertNotNull("registerChangeListener returned null", listenerReg);
+
+            testWriteTransaction(dataStore, TestModel.TEST_PATH,
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+            testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
+                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
+
+            YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
+                    nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
+            testWriteTransaction(dataStore, listPath,
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
+
+            assertEquals("Change notifications complete", true,
+                    Uninterruptibles.awaitUninterruptibly(changeLatch, 5, TimeUnit.SECONDS));
+
+            assertTrue("Change 1 does not contain " + TestModel.TEST_PATH,
+                    changeList.get(0).getCreatedData().containsKey(TestModel.TEST_PATH));
+
+            assertTrue("Change 2 does not contain " + TestModel.OUTER_LIST_PATH,
+                    changeList.get(1).getCreatedData().containsKey(TestModel.OUTER_LIST_PATH));
+
+            assertTrue("Change 3 does not contain " + listPath,
+                    changeList.get(2).getCreatedData().containsKey(listPath));
+
+            listenerReg.close();
+
+            testWriteTransaction(dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
+                    nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
+
+            Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+
+            assertEquals("Received unexpected change after close", 3, changeList.size());
+
+            cleanup(dataStore);
+        }};
+    }
+
     class IntegrationTestKit extends ShardTestKit {
 
         IntegrationTestKit(ActorSystem actorSystem) {
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java
deleted file mode 100644 (file)
index 00243ea..0000000
+++ /dev/null
@@ -1,244 +0,0 @@
-package org.opendaylight.controller.cluster.datastore;
-
-import akka.actor.ActorPath;
-import akka.actor.ActorRef;
-import akka.actor.ActorSelection;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.dispatch.ExecutionContexts;
-import akka.dispatch.Futures;
-import akka.util.Timeout;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.MoreExecutors;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
-import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
-import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
-import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
-import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
-import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import scala.concurrent.ExecutionContextExecutor;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-import java.util.concurrent.TimeUnit;
-import static junit.framework.TestCase.assertEquals;
-import static junit.framework.TestCase.assertNull;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-public class DistributedDataStoreTest extends AbstractActorTest{
-
-    private DistributedDataStore distributedDataStore;
-    private MockActorContext mockActorContext;
-    private ActorRef doNothingActorRef;
-
-    @Before
-    public void setUp() throws Exception {
-        ShardStrategyFactory.setConfiguration(new MockConfiguration());
-        final Props props = Props.create(DoNothingActor.class);
-
-        doNothingActorRef = getSystem().actorOf(props);
-
-        mockActorContext = new MockActorContext(getSystem(), doNothingActorRef);
-        distributedDataStore = new DistributedDataStore(mockActorContext);
-        distributedDataStore.onGlobalContextUpdated(
-            TestModel.createTestContext());
-
-        // Make CreateTransactionReply as the default response. Will need to be
-        // tuned if a specific test requires some other response
-        mockActorContext.setExecuteShardOperationResponse(
-            CreateTransactionReply.newBuilder()
-                .setTransactionActorPath(doNothingActorRef.path().toString())
-                .setTransactionId("txn-1 ")
-                .build());
-    }
-
-    @After
-    public void tearDown() throws Exception {
-
-    }
-
-    @SuppressWarnings("resource")
-    @Test
-    public void testConstructor(){
-        ActorSystem actorSystem = mock(ActorSystem.class);
-
-        new DistributedDataStore(actorSystem, "config",
-            mock(ClusterWrapper.class), mock(Configuration.class),
-            DatastoreContext.newBuilder().build());
-
-        verify(actorSystem).actorOf(any(Props.class), eq("shardmanager-config"));
-    }
-
-    @Test
-    public void testRegisterChangeListenerWhenShardIsNotLocal() throws Exception {
-
-        ListenerRegistration registration =
-                distributedDataStore.registerChangeListener(TestModel.TEST_PATH, new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
-                    @Override
-                    public void onDataChanged(AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
-                        throw new UnsupportedOperationException("onDataChanged");
-                    }
-                }, AsyncDataBroker.DataChangeScope.BASE);
-
-        // Since we do not expect the shard to be local registration will return a NoOpRegistration
-        assertTrue(registration instanceof NoOpDataChangeListenerRegistration);
-
-        assertNotNull(registration);
-    }
-
-    @Test
-    public void testRegisterChangeListenerWhenShardIsLocal() throws Exception {
-        ActorContext actorContext = mock(ActorContext.class);
-
-        distributedDataStore = new DistributedDataStore(actorContext);
-        distributedDataStore.onGlobalContextUpdated(TestModel.createTestContext());
-
-        Future future = mock(Future.class);
-        when(actorContext.getOperationDuration()).thenReturn(FiniteDuration.apply(5, TimeUnit.SECONDS));
-        when(actorContext.getActorSystem()).thenReturn(getSystem());
-        when(actorContext.findLocalShard(anyString())).thenReturn(Optional.of(doNothingActorRef));
-        when(actorContext
-                .executeOperationAsync(eq(doNothingActorRef), anyObject(), any(Timeout.class))).thenReturn(future);
-
-        ListenerRegistration registration =
-            distributedDataStore.registerChangeListener(TestModel.TEST_PATH,
-                mock(AsyncDataChangeListener.class),
-                AsyncDataBroker.DataChangeScope.BASE);
-
-        assertNotNull(registration);
-
-        assertEquals(DataChangeListenerRegistrationProxy.class, registration.getClass());
-    }
-
-    @Test
-    public void testRegisterChangeListenerWhenSuccessfulReplyReceived() throws Exception {
-        ActorContext actorContext = mock(ActorContext.class);
-
-        distributedDataStore = new DistributedDataStore(actorContext);
-        distributedDataStore.onGlobalContextUpdated(
-            TestModel.createTestContext());
-
-        ExecutionContextExecutor executor = ExecutionContexts.fromExecutor(MoreExecutors.sameThreadExecutor());
-
-        // Make Future successful
-        Future f = Futures.successful(new RegisterChangeListenerReply(doNothingActorRef.path()));
-
-        // Setup the mocks
-        ActorSystem actorSystem = mock(ActorSystem.class);
-        ActorSelection actorSelection = mock(ActorSelection.class);
-
-        when(actorContext.getOperationDuration()).thenReturn(FiniteDuration.apply(5, TimeUnit.SECONDS));
-        when(actorSystem.dispatcher()).thenReturn(executor);
-        when(actorSystem.actorOf(any(Props.class))).thenReturn(doNothingActorRef);
-        when(actorContext.getActorSystem()).thenReturn(actorSystem);
-        when(actorContext.findLocalShard(anyString())).thenReturn(Optional.of(doNothingActorRef));
-        when(actorContext
-            .executeOperationAsync(eq(doNothingActorRef), anyObject(), any(Timeout.class))).thenReturn(f);
-        when(actorContext.actorSelection(any(ActorPath.class))).thenReturn(actorSelection);
-
-        ListenerRegistration registration =
-            distributedDataStore.registerChangeListener(TestModel.TEST_PATH,
-                mock(AsyncDataChangeListener.class),
-                AsyncDataBroker.DataChangeScope.BASE);
-
-        assertNotNull(registration);
-
-        assertEquals(DataChangeListenerRegistrationProxy.class, registration.getClass());
-
-        ActorSelection listenerRegistrationActor =
-            ((DataChangeListenerRegistrationProxy) registration).getListenerRegistrationActor();
-
-        assertNotNull(listenerRegistrationActor);
-
-        assertEquals(actorSelection, listenerRegistrationActor);
-    }
-
-    @Test
-    public void testRegisterChangeListenerWhenSuccessfulReplyFailed() throws Exception {
-        ActorContext actorContext = mock(ActorContext.class);
-
-        distributedDataStore = new DistributedDataStore(actorContext);
-        distributedDataStore.onGlobalContextUpdated(
-            TestModel.createTestContext());
-
-        ExecutionContextExecutor executor = ExecutionContexts.fromExecutor(MoreExecutors.sameThreadExecutor());
-
-        // Make Future fail
-        Future f = Futures.failed(new IllegalArgumentException());
-
-        // Setup the mocks
-        ActorSystem actorSystem = mock(ActorSystem.class);
-        ActorSelection actorSelection = mock(ActorSelection.class);
-
-        when(actorContext.getOperationDuration()).thenReturn(FiniteDuration.apply(5, TimeUnit.SECONDS));
-        when(actorSystem.dispatcher()).thenReturn(executor);
-        when(actorSystem.actorOf(any(Props.class))).thenReturn(doNothingActorRef);
-        when(actorContext.getActorSystem()).thenReturn(actorSystem);
-        when(actorContext.findLocalShard(anyString())).thenReturn(Optional.of(doNothingActorRef));
-        when(actorContext
-            .executeOperationAsync(eq(doNothingActorRef), anyObject(), any(Timeout.class))).thenReturn(f);
-        when(actorContext.actorSelection(any(ActorPath.class))).thenReturn(actorSelection);
-
-        ListenerRegistration registration =
-            distributedDataStore.registerChangeListener(TestModel.TEST_PATH,
-                mock(AsyncDataChangeListener.class),
-                AsyncDataBroker.DataChangeScope.BASE);
-
-        assertNotNull(registration);
-
-        assertEquals(DataChangeListenerRegistrationProxy.class, registration.getClass());
-
-        ActorSelection listenerRegistrationActor =
-            ((DataChangeListenerRegistrationProxy) registration).getListenerRegistrationActor();
-
-        assertNull(listenerRegistrationActor);
-
-    }
-
-
-    @Test
-    public void testCreateTransactionChain() throws Exception {
-        final DOMStoreTransactionChain transactionChain = distributedDataStore.createTransactionChain();
-        assertNotNull(transactionChain);
-    }
-
-    @Test
-    public void testNewReadOnlyTransaction() throws Exception {
-        final DOMStoreReadTransaction transaction = distributedDataStore.newReadOnlyTransaction();
-        assertNotNull(transaction);
-    }
-
-    @Test
-    public void testNewWriteOnlyTransaction() throws Exception {
-        final DOMStoreWriteTransaction transaction = distributedDataStore.newWriteOnlyTransaction();
-        assertNotNull(transaction);
-    }
-
-    @Test
-    public void testNewReadWriteTransaction() throws Exception {
-        final DOMStoreReadWriteTransaction transaction = distributedDataStore.newReadWriteTransaction();
-        assertNotNull(transaction);
-    }
-}
index 5022d97997dfad32ef29ae16865a7e7dc3c2b6e1..c04dcf1534506609865e95e7b7a6789d3ab676eb 100644 (file)
@@ -2,9 +2,11 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
 import akka.actor.Props;
+import akka.pattern.Patterns;
 import akka.persistence.RecoveryCompleted;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
+import akka.util.Timeout;
 import akka.japi.Creator;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
@@ -29,6 +31,8 @@ import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
 import java.net.URI;
 import java.util.Collection;
 import java.util.HashSet;
@@ -76,7 +80,7 @@ public class ShardManagerTest extends AbstractActorTest {
 
             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
 
-            shardManager.tell(new FindPrimary("non-existent").toSerializable(), getRef());
+            shardManager.tell(new FindPrimary("non-existent", false).toSerializable(), getRef());
 
             expectMsgEquals(duration("5 seconds"),
                     new PrimaryNotFound("non-existent").toSerializable());
@@ -91,25 +95,44 @@ public class ShardManagerTest extends AbstractActorTest {
             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
             shardManager.tell(new ActorInitialized(), mockShardActor);
 
-            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME).toSerializable(), getRef());
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
 
             expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
         }};
     }
 
     @Test
-    public void testOnReceiveFindPrimaryForNotInitialzedShard() throws Exception {
+    public void testOnReceiveFindPrimaryForNotInitializedShard() throws Exception {
         new JavaTestKit(getSystem()) {{
             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
 
             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
 
-            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME).toSerializable(), getRef());
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
 
             expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
         }};
     }
 
+    @Test
+    public void testOnReceiveFindPrimaryWaitForShardInitialized() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+
+            shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
+            // We're passing waitUntilInitialized = true to FindPrimary so the response should be
+            // delayed until we send ActorInitialized.
+            Future<Object> future = Patterns.ask(shardManager, new FindPrimary(Shard.DEFAULT_NAME, true),
+                    new Timeout(5, TimeUnit.SECONDS));
+
+            shardManager.tell(new ActorInitialized(), mockShardActor);
+
+            Object resp = Await.result(future, duration("5 seconds"));
+            assertTrue("Expected: PrimaryFound, Actual: " + resp, resp instanceof PrimaryFound);
+        }};
+    }
+
     @Test
     public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
         new JavaTestKit(getSystem()) {{
@@ -117,7 +140,7 @@ public class ShardManagerTest extends AbstractActorTest {
 
             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
 
-            shardManager.tell(new FindLocalShard("non-existent"), getRef());
+            shardManager.tell(new FindLocalShard("non-existent", false), getRef());
 
             LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
 
@@ -133,7 +156,7 @@ public class ShardManagerTest extends AbstractActorTest {
             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
             shardManager.tell(new ActorInitialized(), mockShardActor);
 
-            shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME), getRef());
+            shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
 
             LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
 
@@ -148,14 +171,32 @@ public class ShardManagerTest extends AbstractActorTest {
             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
 
             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
-            //shardManager.tell(new ActorInitialized(), mockShardActor);
 
-            shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME), getRef());
+            shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
 
             expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
         }};
     }
 
+    @Test
+    public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+
+            shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
+            // We're passing waitUntilInitialized = true to FindLocalShard so the response should be
+            // delayed until we send ActorInitialized.
+            Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
+                    new Timeout(5, TimeUnit.SECONDS));
+
+            shardManager.tell(new ActorInitialized(), mockShardActor);
+
+            Object resp = Await.result(future, duration("5 seconds"));
+            assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
+        }};
+    }
+
     @Test
     public void testOnReceiveMemberUp() throws Exception {
         new JavaTestKit(getSystem()) {{
@@ -163,7 +204,7 @@ public class ShardManagerTest extends AbstractActorTest {
 
             MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
 
-            shardManager.tell(new FindPrimary("astronauts").toSerializable(), getRef());
+            shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
 
             PrimaryFound found = PrimaryFound.fromSerializable(expectMsgClass(duration("5 seconds"),
                     PrimaryFound.SERIALIZABLE_CLASS));
@@ -180,13 +221,13 @@ public class ShardManagerTest extends AbstractActorTest {
 
             MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
 
-            shardManager.tell(new FindPrimary("astronauts").toSerializable(), getRef());
+            shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
 
             expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
 
             MockClusterWrapper.sendMemberRemoved(shardManager, "member-2", getRef().path().toString());
 
-            shardManager.tell(new FindPrimary("astronauts").toSerializable(), getRef());
+            shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
 
             expectMsgClass(duration("5 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
         }};