Bug 2055: Handle shard not initialized resiliently 94/11994/9
authortpantelis <tpanteli@brocade.com>
Sat, 4 Oct 2014 06:37:00 +0000 (02:37 -0400)
committertpantelis <tpanteli@brocade.com>
Sun, 12 Oct 2014 10:10:18 +0000 (06:10 -0400)
Added a flag to the FindLocalShard and FindPrimaryShard messages to
specify whether or not to wait for he shard to be initialized.

Modified FindLocalShard and FindPrimaryShard message handling in the
ShardManager to wait for the shard to be initialized before replying if
the flag is set in the message. This is done async by caching the
response info in the ShardInformation and sending the response when the
ActorInitialized message for the shard is received.

Modified DistributedDataStore#registerChangeListener to always create a
DataChangeListenerRegistrationProxy and moved the code that performs the
findLocalShard and RegisterChangeListener operations into
DataChangeListenerRegistrationProxy. In DataChangeListenerRegistrationProxy,
the findLocalShard operation is done async (with a long time out). On success,
the RegisterChangeListener message is sent.

Added change listener test to DistributedDataStoreIntegrationTest and
removed the DistributedDataStoreTest since most of the tests tested
registration and the bulk of that logic is now in
DataChangeListenerRegistrationProxy with associated tests. The rest of
the tests in DistributedDataStoreTest are covered by
DistributedDataStoreIntegrationTest.

Change-Id: I9f0d412801110c97eb48ecc4d0fd77ee081a7e81
Signed-off-by: tpantelis <tpanteli@brocade.com>
14 files changed:
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpCohort.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpDataChangeListenerRegistration.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/LocalShardNotFoundException.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FindLocalShard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FindPrimary.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java

index acf630e2e95598e71fdbd786da628f3524a29408..b2ae060c3d30c219615432ad59d0b495f93e4294 100644 (file)
@@ -8,14 +8,27 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
+import java.util.concurrent.TimeUnit;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.PoisonPill;
+import akka.dispatch.OnComplete;
+import akka.util.Timeout;
+import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
 import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
+import scala.concurrent.Future;
 
 /**
  * ListenerRegistrationProxy acts as a proxy for a ListenerRegistration that was done on a remote shard
@@ -24,25 +37,36 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
  * The ListenerRegistrationProxy talks to a remote ListenerRegistration actor.
  * </p>
  */
+@SuppressWarnings("rawtypes")
 public class DataChangeListenerRegistrationProxy implements ListenerRegistration {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DataChangeListenerRegistrationProxy.class);
+
+    public static final Timeout REGISTER_TIMEOUT = new Timeout(5, TimeUnit.MINUTES);
+
     private volatile ActorSelection listenerRegistrationActor;
-    private final AsyncDataChangeListener listener;
-    private final ActorRef dataChangeListenerActor;
+    private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
+    private ActorRef dataChangeListenerActor;
+    private final String shardName;
+    private final ActorContext actorContext;
     private boolean closed = false;
 
     public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
-    DataChangeListenerRegistrationProxy(
-        ActorSelection listenerRegistrationActor,
-        L listener, ActorRef dataChangeListenerActor) {
-        this.listenerRegistrationActor = listenerRegistrationActor;
+                                                              DataChangeListenerRegistrationProxy (
+            String shardName, ActorContext actorContext, L listener) {
+        this.shardName = shardName;
+        this.actorContext = actorContext;
         this.listener = listener;
-        this.dataChangeListenerActor = dataChangeListenerActor;
     }
 
-    public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
-    DataChangeListenerRegistrationProxy(
-        L listener, ActorRef dataChangeListenerActor) {
-        this(null, listener, dataChangeListenerActor);
+    @VisibleForTesting
+    ActorSelection getListenerRegistrationActor() {
+        return listenerRegistrationActor;
+    }
+
+    @VisibleForTesting
+    ActorRef getDataChangeListenerActor() {
+        return dataChangeListenerActor;
     }
 
     @Override
@@ -50,7 +74,11 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
         return listener;
     }
 
-    public void setListenerRegistrationActor(ActorSelection listenerRegistrationActor) {
+    private void setListenerRegistrationActor(ActorSelection listenerRegistrationActor) {
+        if(listenerRegistrationActor == null) {
+            return;
+        }
+
         boolean sendCloseMessage = false;
         synchronized(this) {
             if(closed) {
@@ -59,16 +87,55 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
                 this.listenerRegistrationActor = listenerRegistrationActor;
             }
         }
+
         if(sendCloseMessage) {
             listenerRegistrationActor.tell(new
                 CloseDataChangeListenerRegistration().toSerializable(), null);
         }
+    }
 
-        this.listenerRegistrationActor = listenerRegistrationActor;
+    public void init(final YangInstanceIdentifier path, final AsyncDataBroker.DataChangeScope scope) {
+
+        dataChangeListenerActor = actorContext.getActorSystem().actorOf(
+                DataChangeListener.props(listener));
+
+        Future<ActorRef> findFuture = actorContext.findLocalShardAsync(shardName, REGISTER_TIMEOUT);
+        findFuture.onComplete(new OnComplete<ActorRef>() {
+            @Override
+            public void onComplete(Throwable failure, ActorRef shard) {
+                if(failure instanceof LocalShardNotFoundException) {
+                    LOG.debug("No local shard found for {} - DataChangeListener {} at path {} " +
+                            "cannot be registered", shardName, listener, path);
+                } else if(failure != null) {
+                    LOG.error("Failed to find local shard {} - DataChangeListener {} at path {} " +
+                            "cannot be registered: {}", shardName, listener, path, failure);
+                } else {
+                    doRegistration(shard, path, scope);
+                }
+            }
+        }, actorContext.getActorSystem().dispatcher());
     }
 
-    public ActorSelection getListenerRegistrationActor() {
-        return listenerRegistrationActor;
+    private void doRegistration(ActorRef shard, final YangInstanceIdentifier path,
+            DataChangeScope scope) {
+
+        Future<Object> future = actorContext.executeOperationAsync(shard,
+                new RegisterChangeListener(path, dataChangeListenerActor.path(), scope),
+                REGISTER_TIMEOUT);
+
+        future.onComplete(new OnComplete<Object>(){
+            @Override
+            public void onComplete(Throwable failure, Object result) {
+                if(failure != null) {
+                    LOG.error("Failed to register DataChangeListener {} at path {}",
+                            listener, path.toString(), failure);
+                } else {
+                    RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result;
+                    setListenerRegistrationActor(actorContext.actorSelection(
+                            reply.getListenerRegistrationPath()));
+                }
+            }
+        }, actorContext.getActorSystem().dispatcher());
     }
 
     @Override
@@ -79,11 +146,16 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
             sendCloseMessage = !closed && listenerRegistrationActor != null;
             closed = true;
         }
+
         if(sendCloseMessage) {
-            listenerRegistrationActor.tell(new
-                CloseDataChangeListenerRegistration().toSerializable(), null);
+            listenerRegistrationActor.tell(new CloseDataChangeListenerRegistration().toSerializable(),
+                    ActorRef.noSender());
+            listenerRegistrationActor = null;
         }
 
-        dataChangeListenerActor.tell(PoisonPill.getInstance(), null);
+        if(dataChangeListenerActor != null) {
+            dataChangeListenerActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+            dataChangeListenerActor = null;
+        }
     }
 }
index f6c31aab04c76a376e18dad96dae95aa9cffb637..2c73807dca858f754dd4521a390e0777d6ff60d6 100644 (file)
@@ -8,16 +8,10 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
-import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
-import akka.dispatch.OnComplete;
-import akka.util.Timeout;
-import com.google.common.base.Optional;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
-import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
@@ -34,7 +28,6 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
 
 /**
  *
@@ -83,39 +76,11 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
 
         String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
 
-        Optional<ActorRef> shard = actorContext.findLocalShard(shardName);
-
-        //if shard is NOT local
-        if (!shard.isPresent()) {
-            LOG.debug("No local shard for shardName {} was found so returning a noop registration", shardName);
-            return new NoOpDataChangeListenerRegistration(listener);
-        }
-        //if shard is local
-        ActorRef dataChangeListenerActor = actorContext.getActorSystem().actorOf(DataChangeListener.props(listener));
-        Future future = actorContext.executeOperationAsync(shard.get(),
-                new RegisterChangeListener(path, dataChangeListenerActor.path(), scope),
-                new Timeout(actorContext.getOperationDuration().$times(REGISTER_DATA_CHANGE_LISTENER_TIMEOUT_FACTOR)));
-
         final DataChangeListenerRegistrationProxy listenerRegistrationProxy =
-                new DataChangeListenerRegistrationProxy(listener, dataChangeListenerActor);
-
-        future.onComplete(new OnComplete() {
-
-            @Override
-            public void onComplete(Throwable failure, Object result)
-                    throws Throwable {
-                if (failure != null) {
-                    LOG.error("Failed to register listener at path " + path.toString(), failure);
-                    return;
-                }
-                RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result;
-                listenerRegistrationProxy.setListenerRegistrationActor(actorContext
-                        .actorSelection(reply.getListenerRegistrationPath()));
-            }
-        }, actorContext.getActorSystem().dispatcher());
+                new DataChangeListenerRegistrationProxy(shardName, actorContext, listener);
+        listenerRegistrationProxy.init(path, scope);
 
         return listenerRegistrationProxy;
-
     }
 
     @Override
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpCohort.java
deleted file mode 100644 (file)
index eb28159..0000000
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore;
-
-import akka.actor.UntypedActor;
-import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
-
-public class NoOpCohort extends UntypedActor {
-
-    @Override public void onReceive(Object message) throws Exception {
-        if (message.getClass().equals(CanCommitTransaction.SERIALIZABLE_CLASS)) {
-            getSender().tell(new CanCommitTransactionReply(false).toSerializable(), getSelf());
-        } else if (message.getClass().equals(PreCommitTransaction.SERIALIZABLE_CLASS)) {
-            getSender().tell(
-                new PreCommitTransactionReply().toSerializable(),
-                getSelf());
-        } else if (message.getClass().equals(CommitTransaction.SERIALIZABLE_CLASS)) {
-            getSender().tell(new CommitTransactionReply().toSerializable(), getSelf());
-        } else if (message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) {
-            getSender().tell(new AbortTransactionReply().toSerializable(), getSelf());
-        } else {
-            throw new Exception ("Not recognized message received,message="+message);
-        }
-
-    }
-}
-
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpDataChangeListenerRegistration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpDataChangeListenerRegistration.java
deleted file mode 100644 (file)
index 14af31e..0000000
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore;
-
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-
-/**
- * When a consumer registers a data change listener and no local shard is
- * available to register that listener with then we return an instance of
- * NoOpDataChangeListenerRegistration
- *
- * <p>
- *
- * The NoOpDataChangeListenerRegistration as it's name suggests does
- * nothing when an operation is invoked on it
- */
-public class NoOpDataChangeListenerRegistration
-    implements ListenerRegistration {
-
-    private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>
-        listener;
-
-    public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> NoOpDataChangeListenerRegistration(
-        AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener) {
-
-        this.listener = listener;
-    }
-
-    @Override
-    public AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> getInstance() {
-        return listener;
-    }
-
-    @Override public void close() {
-
-    }
-}
index d0bb3d3b69824d18b2acdb08a4defb50d31953b2..d831b7c1844006fca5bea575ee20d656c4b616c4 100644 (file)
@@ -217,6 +217,10 @@ public class Shard extends RaftActor {
 
         if (message instanceof RecoveryFailure){
             LOG.error(((RecoveryFailure) message).cause(), "Recovery failed because of this cause");
+
+            // Even though recovery failed, we still need to finish our recovery, eg send the
+            // ActorInitialized message and start the txCommitTimeoutCheckSchedule.
+            onRecoveryComplete();
         } else {
             super.onReceiveRecover(message);
         }
@@ -699,12 +703,15 @@ public class Shard extends RaftActor {
         //notify shard manager
         getContext().parent().tell(new ActorInitialized(), getSelf());
 
-        // Schedule a message to be periodically sent to check if the current in-progress
-        // transaction should be expired and aborted.
-        FiniteDuration period = Duration.create(transactionCommitTimeout / 3, TimeUnit.MILLISECONDS);
-        txCommitTimeoutCheckSchedule = getContext().system().scheduler().schedule(
-                period, period, getSelf(),
-                TX_COMMIT_TIMEOUT_CHECK_MESSAGE, getContext().dispatcher(), ActorRef.noSender());
+        // Being paranoid here - this method should only be called once but just in case...
+        if(txCommitTimeoutCheckSchedule == null) {
+            // Schedule a message to be periodically sent to check if the current in-progress
+            // transaction should be expired and aborted.
+            FiniteDuration period = Duration.create(transactionCommitTimeout / 3, TimeUnit.MILLISECONDS);
+            txCommitTimeoutCheckSchedule = getContext().system().scheduler().schedule(
+                    period, period, getSelf(),
+                    TX_COMMIT_TIMEOUT_CHECK_MESSAGE, getContext().dispatcher(), ActorRef.noSender());
+        }
     }
 
     @Override
index 157f1cb3771cd71ddd1ddf14d2541bef3a0aefc3..c7c382c82ffa2fc6db82664995d404320674c447 100644 (file)
@@ -25,6 +25,7 @@ import akka.persistence.RecoveryFailure;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
+import com.google.common.collect.Lists;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
@@ -163,7 +164,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         LOG.debug("Initializing shard [{}]", shardName);
         ShardInformation shardInformation = localShards.get(shardName);
         if (shardInformation != null) {
-            shardInformation.setShardInitialized(true);
+            shardInformation.setActorInitialized();
         }
     }
 
@@ -192,7 +193,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return;
         }
 
-        sendResponse(shardInformation, new Supplier<Object>() {
+        sendResponse(shardInformation, message.isWaitUntilInitialized(), new Supplier<Object>() {
             @Override
             public Object get() {
                 return new LocalShardFound(shardInformation.getActor());
@@ -200,9 +201,22 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         });
     }
 
-    private void sendResponse(ShardInformation shardInformation,  Supplier<Object> messageSupplier) {
-        if (shardInformation.getActor() == null || !shardInformation.isShardInitialized()) {
-            getSender().tell(new ActorNotInitialized(), getSelf());
+    private void sendResponse(ShardInformation shardInformation, boolean waitUntilInitialized,
+            final Supplier<Object> messageSupplier) {
+        if (!shardInformation.isShardInitialized()) {
+            if(waitUntilInitialized) {
+                final ActorRef sender = getSender();
+                final ActorRef self = self();
+                shardInformation.addRunnableOnInitialized(new Runnable() {
+                    @Override
+                    public void run() {
+                        sender.tell(messageSupplier.get(), self);
+                    }
+                });
+            } else {
+                getSender().tell(new ActorNotInitialized(), getSelf());
+            }
+
             return;
         }
 
@@ -277,7 +291,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         // First see if the there is a local replica for the shard
         final ShardInformation info = localShards.get(shardName);
         if (info != null) {
-            sendResponse(info, new Supplier<Object>() {
+            sendResponse(info, message.isWaitUntilInitialized(), new Supplier<Object>() {
                 @Override
                 public Object get() {
                     return new PrimaryFound(info.getActorPath().toString()).toSerializable();
@@ -422,7 +436,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         private ActorRef actor;
         private ActorPath actorPath;
         private final Map<ShardIdentifier, String> peerAddresses;
-        private boolean shardInitialized = false; // flag that determines if the actor is ready for business
+
+        // flag that determines if the actor is ready for business
+        private boolean actorInitialized = false;
+
+        private final List<Runnable> runnablesOnInitialized = Lists.newArrayList();
 
         private ShardInformation(String shardName, ShardIdentifier shardId,
                 Map<ShardIdentifier, String> peerAddresses) {
@@ -474,11 +492,21 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
 
         boolean isShardInitialized() {
-            return shardInitialized;
+            return getActor() != null && actorInitialized;
+        }
+
+        void setActorInitialized() {
+            this.actorInitialized = true;
+
+            for(Runnable runnable: runnablesOnInitialized) {
+                runnable.run();
+            }
+
+            runnablesOnInitialized.clear();
         }
 
-        void setShardInitialized(boolean shardInitialized) {
-            this.shardInitialized = shardInitialized;
+        void addRunnableOnInitialized(Runnable runnable) {
+            runnablesOnInitialized.add(runnable);
         }
     }
 
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/LocalShardNotFoundException.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/LocalShardNotFoundException.java
new file mode 100644 (file)
index 0000000..7a976b8
--- /dev/null
@@ -0,0 +1,21 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.exceptions;
+
+/**
+ * Exception thrown when attempting to find a local shard but it doesn't exist.
+ *
+ * @author Thomas Pantelis
+ */
+public class LocalShardNotFoundException extends RuntimeException {
+    private static final long serialVersionUID = 1L;
+
+    public LocalShardNotFoundException(String message){
+        super(message);
+    }
+}
index c415db6efe2fd3da9677db7d16239408253ff2e2..b6560a8347d806e62d1717bea1bab5600238cec2 100644 (file)
@@ -14,12 +14,18 @@ package org.opendaylight.controller.cluster.datastore.messages;
  */
 public class FindLocalShard {
     private final String shardName;
+    private final boolean waitUntilInitialized;
 
-    public FindLocalShard(String shardName) {
+    public FindLocalShard(String shardName, boolean waitUntilInitialized) {
         this.shardName = shardName;
+        this.waitUntilInitialized = waitUntilInitialized;
     }
 
     public String getShardName() {
         return shardName;
     }
+
+    public boolean isWaitUntilInitialized() {
+        return waitUntilInitialized;
+    }
 }
index f5a6a348415308377d755011b133796659d3f27d..a34330bcf6864c26799aae0c6a48990dee1f6f82 100644 (file)
@@ -15,26 +15,33 @@ import com.google.common.base.Preconditions;
  *
  */
 public class FindPrimary implements SerializableMessage{
-  public static final Class SERIALIZABLE_CLASS = FindPrimary.class;
+    public static final Class<FindPrimary> SERIALIZABLE_CLASS = FindPrimary.class;
+
     private final String shardName;
+    private final boolean waitUntilInitialized;
 
-    public FindPrimary(String shardName){
+    public FindPrimary(String shardName, boolean waitUntilInitialized){
 
         Preconditions.checkNotNull(shardName, "shardName should not be null");
 
         this.shardName = shardName;
+        this.waitUntilInitialized = waitUntilInitialized;
     }
 
     public String getShardName() {
         return shardName;
     }
 
-  @Override
-  public Object toSerializable() {
-    return this;
-  }
+    public boolean isWaitUntilInitialized() {
+        return waitUntilInitialized;
+    }
 
-  public static FindPrimary fromSerializable(Object message){
-    return (FindPrimary) message;
-  }
+    @Override
+    public Object toSerializable() {
+        return this;
+    }
+
+    public static FindPrimary fromSerializable(Object message){
+        return (FindPrimary) message;
+    }
 }
index 314ae916de1444349816988089011d82b7cd9c8b..0a1e80b0cbaea069f3a75cb558bad130d7562dce 100644 (file)
@@ -13,17 +13,21 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
+import akka.dispatch.Mapper;
 import akka.util.Timeout;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.Configuration;
+import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
+import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
 import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -115,14 +119,14 @@ public class ActorContext {
     }
 
     /**
-     * Finds a local shard given it's shard name and return it's ActorRef
+     * Finds a local shard given its shard name and return it's ActorRef
      *
      * @param shardName the name of the local shard that needs to be found
      * @return a reference to a local shard actor which represents the shard
      *         specified by the shardName
      */
     public Optional<ActorRef> findLocalShard(String shardName) {
-        Object result = executeOperation(shardManager, new FindLocalShard(shardName));
+        Object result = executeOperation(shardManager, new FindLocalShard(shardName, false));
 
         if (result instanceof LocalShardFound) {
             LocalShardFound found = (LocalShardFound) result;
@@ -133,9 +137,40 @@ public class ActorContext {
         return Optional.absent();
     }
 
+    /**
+     * Finds a local shard async given its shard name and return a Future from which to obtain the
+     * ActorRef.
+     *
+     * @param shardName the name of the local shard that needs to be found
+     */
+    public Future<ActorRef> findLocalShardAsync( final String shardName, Timeout timeout) {
+        Future<Object> future = executeOperationAsync(shardManager,
+                new FindLocalShard(shardName, true), timeout);
+
+        return future.map(new Mapper<Object, ActorRef>() {
+            @Override
+            public ActorRef checkedApply(Object response) throws Throwable {
+                if(response instanceof LocalShardFound) {
+                    LocalShardFound found = (LocalShardFound)response;
+                    LOG.debug("Local shard found {}", found.getPath());
+                    return found.getPath();
+                } else if(response instanceof ActorNotInitialized) {
+                    throw new NotInitializedException(
+                            String.format("Found local shard for %s but it's not initialized yet.",
+                                    shardName));
+                } else if(response instanceof LocalShardNotFound) {
+                    throw new LocalShardNotFoundException(
+                            String.format("Local shard for %s does not exist.", shardName));
+                }
+
+                throw new UnknownMessageException(String.format(
+                        "FindLocalShard returned unkown response: %s", response));
+            }
+        }, getActorSystem().dispatcher());
+    }
 
     private String findPrimaryPathOrNull(String shardName) {
-        Object result = executeOperation(shardManager, new FindPrimary(shardName).toSerializable());
+        Object result = executeOperation(shardManager, new FindPrimary(shardName, false).toSerializable());
 
         if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
             PrimaryFound found = PrimaryFound.fromSerializable(result);
index aaf080bdf7d8d50de4a3f31713143389994872a6..c27993f97b9fef669c03f0c378e83f8474944e96 100644 (file)
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
 package org.opendaylight.controller.cluster.datastore;
 
+import java.util.concurrent.TimeUnit;
 import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
 import akka.actor.Props;
-import junit.framework.Assert;
+import akka.actor.Terminated;
+import akka.dispatch.ExecutionContexts;
+import akka.dispatch.Futures;
+import akka.testkit.JavaTestKit;
+import akka.util.Timeout;
+import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
-import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
-import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
-import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.Uninterruptibles;
+import scala.concurrent.ExecutionContextExecutor;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.eq;
 
-import java.util.List;
+/**
+ * Unit tests for DataChangeListenerRegistrationProxy.
+ *
+ * @author Thomas Pantelis
+ */
+public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest {
 
-import static junit.framework.TestCase.assertEquals;
-import static junit.framework.TestCase.assertNotNull;
-import static junit.framework.TestCase.assertTrue;
+    @SuppressWarnings("unchecked")
+    private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> mockListener =
+            Mockito.mock(AsyncDataChangeListener.class);
 
-public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest{
+    @Test
+    public void testGetInstance() throws Exception {
+        DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
+                "shard", Mockito.mock(ActorContext.class), mockListener);
+
+        Assert.assertEquals(mockListener, proxy.getInstance());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test(timeout=10000)
+    public void testSuccessfulRegistration() {
+        new JavaTestKit(getSystem()) {{
+            ActorContext actorContext = new ActorContext(getSystem(), getRef(),
+                    mock(ClusterWrapper.class), mock(Configuration.class));
+
+            final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
+                    "shard-1", actorContext, mockListener);
+
+            final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
+            final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE;
+            new Thread() {
+                @Override
+                public void run() {
+                    proxy.init(path, scope);
+                }
+
+            }.start();
+
+            FiniteDuration timeout = duration("5 seconds");
+            FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
+            Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
+
+            reply(new LocalShardFound(getRef()));
+
+            RegisterChangeListener registerMsg = expectMsgClass(timeout, RegisterChangeListener.class);
+            Assert.assertEquals("getPath", path, registerMsg.getPath());
+            Assert.assertEquals("getScope", scope, registerMsg.getScope());
+
+            reply(new RegisterChangeListenerReply(getRef().path()));
+
+            for(int i = 0; (i < 20 * 5) && proxy.getListenerRegistrationActor() == null; i++) {
+                Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+            }
+
+            Assert.assertEquals("getListenerRegistrationActor", getSystem().actorSelection(getRef().path()),
+                    proxy.getListenerRegistrationActor());
+
+            watch(proxy.getDataChangeListenerActor());
 
-    private ActorRef dataChangeListenerActor = getSystem().actorOf(Props.create(DoNothingActor.class));
+            proxy.close();
 
-    private static class MockDataChangeListener implements
-        AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> {
+            // The listener registration actor should get a Close message
+            expectMsgClass(timeout, CloseDataChangeListenerRegistration.SERIALIZABLE_CLASS);
 
-        @Override public void onDataChanged(
-            AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
-            throw new UnsupportedOperationException("onDataChanged");
-        }
+            // The DataChangeListener actor should be terminated
+            expectMsgClass(timeout, Terminated.class);
+
+            proxy.close();
+
+            expectNoMsg();
+        }};
     }
 
-    @Test
-    public void testGetInstance() throws Exception {
-        final Props props = Props.create(MessageCollectorActor.class);
-        final ActorRef actorRef = getSystem().actorOf(props);
+    @Test(timeout=10000)
+    public void testLocalShardNotFound() {
+        new JavaTestKit(getSystem()) {{
+            ActorContext actorContext = new ActorContext(getSystem(), getRef(),
+                    mock(ClusterWrapper.class), mock(Configuration.class));
 
-        MockDataChangeListener listener =
-            new MockDataChangeListener();
-        DataChangeListenerRegistrationProxy proxy =
-            new DataChangeListenerRegistrationProxy(
-                getSystem().actorSelection(actorRef.path()),
-                listener, dataChangeListenerActor);
+            final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
+                    "shard-1", actorContext, mockListener);
 
-        Assert.assertEquals(listener, proxy.getInstance());
+            final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
+            final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE;
+            new Thread() {
+                @Override
+                public void run() {
+                    proxy.init(path, scope);
+                }
 
+            }.start();
+
+            FiniteDuration timeout = duration("5 seconds");
+            FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
+            Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
+
+            reply(new LocalShardNotFound("shard-1"));
+
+            expectNoMsg(duration("1 seconds"));
+        }};
     }
 
-    @Test
-    public void testClose() throws Exception {
-        final Props props = Props.create(MessageCollectorActor.class);
-        final ActorRef actorRef = getSystem().actorOf(props);
+    @Test(timeout=10000)
+    public void testLocalShardNotInitialized() {
+        new JavaTestKit(getSystem()) {{
+            ActorContext actorContext = new ActorContext(getSystem(), getRef(),
+                    mock(ClusterWrapper.class), mock(Configuration.class));
 
-        DataChangeListenerRegistrationProxy proxy =
-            new DataChangeListenerRegistrationProxy(
-                getSystem().actorSelection(actorRef.path()),
-                new MockDataChangeListener(), dataChangeListenerActor);
+            final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
+                    "shard-1", actorContext, mockListener);
 
-        proxy.close();
+            final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
+            final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE;
+            new Thread() {
+                @Override
+                public void run() {
+                    proxy.init(path, scope);
+                }
+
+            }.start();
+
+            FiniteDuration timeout = duration("5 seconds");
+            FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
+            Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
+
+            reply(new ActorNotInitialized());
+
+            new Within(duration("1 seconds")) {
+                @Override
+                protected void run() {
+                    expectNoMsg();
+                }
+            };
+        }};
+    }
+
+    @Test
+    public void testFailedRegistration() {
+        new JavaTestKit(getSystem()) {{
+            ActorSystem mockActorSystem = mock(ActorSystem.class);
 
-        //Check if it was received by the remote actor
-        ActorContext
-            testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)),new MockClusterWrapper(), new MockConfiguration());
-        Object messages = testContext
-            .executeOperation(actorRef, "messages");
+            ActorRef mockActor = getSystem().actorOf(Props.create(DoNothingActor.class),
+                    "testFailedRegistration");
+            doReturn(mockActor).when(mockActorSystem).actorOf(any(Props.class));
+            ExecutionContextExecutor executor = ExecutionContexts.fromExecutor(
+                    MoreExecutors.sameThreadExecutor());
+            doReturn(executor).when(mockActorSystem).dispatcher();
 
-        assertNotNull(messages);
+            ActorContext actorContext = mock(ActorContext.class);
 
-        assertTrue(messages instanceof List);
+            String shardName = "shard-1";
+            final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
+                    shardName, actorContext, mockListener);
 
-        List<Object> listMessages = (List<Object>) messages;
+            doReturn(mockActorSystem).when(actorContext).getActorSystem();
+            doReturn(duration("5 seconds")).when(actorContext).getOperationDuration();
+            doReturn(Futures.successful(getRef())).when(actorContext).findLocalShardAsync(eq(shardName),
+                    any(Timeout.class));
+            doReturn(Futures.failed(new RuntimeException("mock"))).
+                    when(actorContext).executeOperationAsync(any(ActorRef.class),
+                            any(Object.class), any(Timeout.class));
 
-        assertEquals(1, listMessages.size());
+            proxy.init(YangInstanceIdentifier.of(TestModel.TEST_QNAME),
+                    AsyncDataBroker.DataChangeScope.ONE);
 
-        assertTrue(listMessages.get(0).getClass()
-            .equals(CloseDataChangeListenerRegistration.SERIALIZABLE_CLASS));
+            Assert.assertEquals("getListenerRegistrationActor", null,
+                    proxy.getListenerRegistrationActor());
+        }};
     }
 
+    @SuppressWarnings("unchecked")
     @Test
-    public void testCloseWhenRegistrationIsNull() throws Exception {
-        final Props props = Props.create(MessageCollectorActor.class);
-        final ActorRef actorRef = getSystem().actorOf(props);
+    public void testCloseBeforeRegistration() {
+        new JavaTestKit(getSystem()) {{
+            ActorContext actorContext = mock(ActorContext.class);
 
-        DataChangeListenerRegistrationProxy proxy =
-            new DataChangeListenerRegistrationProxy(
-                new MockDataChangeListener(), dataChangeListenerActor);
+            String shardName = "shard-1";
+            final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
+                    shardName, actorContext, mockListener);
 
-        proxy.close();
+            doReturn(getSystem()).when(actorContext).getActorSystem();
+            doReturn(getSystem().actorSelection(getRef().path())).
+                    when(actorContext).actorSelection(getRef().path());
+            doReturn(duration("5 seconds")).when(actorContext).getOperationDuration();
+            doReturn(Futures.successful(getRef())).when(actorContext).findLocalShardAsync(eq(shardName),
+                    any(Timeout.class));
 
-        //Check if it was received by the remote actor
-        ActorContext
-            testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)),new MockClusterWrapper(), new MockConfiguration());
-        Object messages = testContext
-            .executeOperation(actorRef, "messages");
+            Answer<Future<Object>> answer = new Answer<Future<Object>>() {
+                @Override
+                public Future<Object> answer(InvocationOnMock invocation) {
+                    proxy.close();
+                    return Futures.successful((Object)new RegisterChangeListenerReply(getRef().path()));
+                }
+            };
 
-        assertNotNull(messages);
+            doAnswer(answer).when(actorContext).executeOperationAsync(any(ActorRef.class),
+                    any(Object.class), any(Timeout.class));
 
-        assertTrue(messages instanceof List);
+            proxy.init(YangInstanceIdentifier.of(TestModel.TEST_QNAME),
+                    AsyncDataBroker.DataChangeScope.ONE);
 
-        List<Object> listMessages = (List<Object>) messages;
+            expectMsgClass(duration("5 seconds"), CloseDataChangeListenerRegistration.SERIALIZABLE_CLASS);
 
-        assertEquals(0, listMessages.size());
+            Assert.assertEquals("getListenerRegistrationActor", null,
+                    proxy.getListenerRegistrationActor());
+        }};
     }
 }
index 395021d361c6d4b210fcf29bf20707802b6c590a..d35c36fb0a2c883649ec7fcccc42ac981f20e256 100644 (file)
@@ -4,9 +4,11 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Uninterruptibles;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
@@ -14,15 +16,21 @@ import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
@@ -204,6 +212,68 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
         }};
     }
 
+    @Test
+    public void testChangeListenerRegistration() throws Exception{
+        new IntegrationTestKit(getSystem()) {{
+            DistributedDataStore dataStore =
+                    setupDistributedDataStore("testChangeListenerRegistration", "test-1");
+
+            final List<AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>>>
+                                                                changeList = Lists.newArrayList();
+            final CountDownLatch changeLatch = new CountDownLatch(3);
+            AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
+                    new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
+                @Override
+                public void onDataChanged(AsyncDataChangeEvent<YangInstanceIdentifier,
+                                                               NormalizedNode<?, ?>> change) {
+                    changeList.add(change);
+                    changeLatch.countDown();
+                }
+            };
+
+            ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
+                    listenerReg = dataStore.registerChangeListener(TestModel.TEST_PATH, listener,
+                            DataChangeScope.SUBTREE);
+
+            assertNotNull("registerChangeListener returned null", listenerReg);
+
+            testWriteTransaction(dataStore, TestModel.TEST_PATH,
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+            testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
+                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
+
+            YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
+                    nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
+            testWriteTransaction(dataStore, listPath,
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
+
+            assertEquals("Change notifications complete", true,
+                    Uninterruptibles.awaitUninterruptibly(changeLatch, 5, TimeUnit.SECONDS));
+
+            assertTrue("Change 1 does not contain " + TestModel.TEST_PATH,
+                    changeList.get(0).getCreatedData().containsKey(TestModel.TEST_PATH));
+
+            assertTrue("Change 2 does not contain " + TestModel.OUTER_LIST_PATH,
+                    changeList.get(1).getCreatedData().containsKey(TestModel.OUTER_LIST_PATH));
+
+            assertTrue("Change 3 does not contain " + listPath,
+                    changeList.get(2).getCreatedData().containsKey(listPath));
+
+            listenerReg.close();
+
+            testWriteTransaction(dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
+                    nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
+
+            Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+
+            assertEquals("Received unexpected change after close", 3, changeList.size());
+
+            cleanup(dataStore);
+        }};
+    }
+
     class IntegrationTestKit extends ShardTestKit {
 
         IntegrationTestKit(ActorSystem actorSystem) {
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java
deleted file mode 100644 (file)
index 00243ea..0000000
+++ /dev/null
@@ -1,244 +0,0 @@
-package org.opendaylight.controller.cluster.datastore;
-
-import akka.actor.ActorPath;
-import akka.actor.ActorRef;
-import akka.actor.ActorSelection;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.dispatch.ExecutionContexts;
-import akka.dispatch.Futures;
-import akka.util.Timeout;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.MoreExecutors;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
-import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
-import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
-import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
-import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
-import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import scala.concurrent.ExecutionContextExecutor;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-import java.util.concurrent.TimeUnit;
-import static junit.framework.TestCase.assertEquals;
-import static junit.framework.TestCase.assertNull;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-public class DistributedDataStoreTest extends AbstractActorTest{
-
-    private DistributedDataStore distributedDataStore;
-    private MockActorContext mockActorContext;
-    private ActorRef doNothingActorRef;
-
-    @Before
-    public void setUp() throws Exception {
-        ShardStrategyFactory.setConfiguration(new MockConfiguration());
-        final Props props = Props.create(DoNothingActor.class);
-
-        doNothingActorRef = getSystem().actorOf(props);
-
-        mockActorContext = new MockActorContext(getSystem(), doNothingActorRef);
-        distributedDataStore = new DistributedDataStore(mockActorContext);
-        distributedDataStore.onGlobalContextUpdated(
-            TestModel.createTestContext());
-
-        // Make CreateTransactionReply as the default response. Will need to be
-        // tuned if a specific test requires some other response
-        mockActorContext.setExecuteShardOperationResponse(
-            CreateTransactionReply.newBuilder()
-                .setTransactionActorPath(doNothingActorRef.path().toString())
-                .setTransactionId("txn-1 ")
-                .build());
-    }
-
-    @After
-    public void tearDown() throws Exception {
-
-    }
-
-    @SuppressWarnings("resource")
-    @Test
-    public void testConstructor(){
-        ActorSystem actorSystem = mock(ActorSystem.class);
-
-        new DistributedDataStore(actorSystem, "config",
-            mock(ClusterWrapper.class), mock(Configuration.class),
-            DatastoreContext.newBuilder().build());
-
-        verify(actorSystem).actorOf(any(Props.class), eq("shardmanager-config"));
-    }
-
-    @Test
-    public void testRegisterChangeListenerWhenShardIsNotLocal() throws Exception {
-
-        ListenerRegistration registration =
-                distributedDataStore.registerChangeListener(TestModel.TEST_PATH, new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
-                    @Override
-                    public void onDataChanged(AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
-                        throw new UnsupportedOperationException("onDataChanged");
-                    }
-                }, AsyncDataBroker.DataChangeScope.BASE);
-
-        // Since we do not expect the shard to be local registration will return a NoOpRegistration
-        assertTrue(registration instanceof NoOpDataChangeListenerRegistration);
-
-        assertNotNull(registration);
-    }
-
-    @Test
-    public void testRegisterChangeListenerWhenShardIsLocal() throws Exception {
-        ActorContext actorContext = mock(ActorContext.class);
-
-        distributedDataStore = new DistributedDataStore(actorContext);
-        distributedDataStore.onGlobalContextUpdated(TestModel.createTestContext());
-
-        Future future = mock(Future.class);
-        when(actorContext.getOperationDuration()).thenReturn(FiniteDuration.apply(5, TimeUnit.SECONDS));
-        when(actorContext.getActorSystem()).thenReturn(getSystem());
-        when(actorContext.findLocalShard(anyString())).thenReturn(Optional.of(doNothingActorRef));
-        when(actorContext
-                .executeOperationAsync(eq(doNothingActorRef), anyObject(), any(Timeout.class))).thenReturn(future);
-
-        ListenerRegistration registration =
-            distributedDataStore.registerChangeListener(TestModel.TEST_PATH,
-                mock(AsyncDataChangeListener.class),
-                AsyncDataBroker.DataChangeScope.BASE);
-
-        assertNotNull(registration);
-
-        assertEquals(DataChangeListenerRegistrationProxy.class, registration.getClass());
-    }
-
-    @Test
-    public void testRegisterChangeListenerWhenSuccessfulReplyReceived() throws Exception {
-        ActorContext actorContext = mock(ActorContext.class);
-
-        distributedDataStore = new DistributedDataStore(actorContext);
-        distributedDataStore.onGlobalContextUpdated(
-            TestModel.createTestContext());
-
-        ExecutionContextExecutor executor = ExecutionContexts.fromExecutor(MoreExecutors.sameThreadExecutor());
-
-        // Make Future successful
-        Future f = Futures.successful(new RegisterChangeListenerReply(doNothingActorRef.path()));
-
-        // Setup the mocks
-        ActorSystem actorSystem = mock(ActorSystem.class);
-        ActorSelection actorSelection = mock(ActorSelection.class);
-
-        when(actorContext.getOperationDuration()).thenReturn(FiniteDuration.apply(5, TimeUnit.SECONDS));
-        when(actorSystem.dispatcher()).thenReturn(executor);
-        when(actorSystem.actorOf(any(Props.class))).thenReturn(doNothingActorRef);
-        when(actorContext.getActorSystem()).thenReturn(actorSystem);
-        when(actorContext.findLocalShard(anyString())).thenReturn(Optional.of(doNothingActorRef));
-        when(actorContext
-            .executeOperationAsync(eq(doNothingActorRef), anyObject(), any(Timeout.class))).thenReturn(f);
-        when(actorContext.actorSelection(any(ActorPath.class))).thenReturn(actorSelection);
-
-        ListenerRegistration registration =
-            distributedDataStore.registerChangeListener(TestModel.TEST_PATH,
-                mock(AsyncDataChangeListener.class),
-                AsyncDataBroker.DataChangeScope.BASE);
-
-        assertNotNull(registration);
-
-        assertEquals(DataChangeListenerRegistrationProxy.class, registration.getClass());
-
-        ActorSelection listenerRegistrationActor =
-            ((DataChangeListenerRegistrationProxy) registration).getListenerRegistrationActor();
-
-        assertNotNull(listenerRegistrationActor);
-
-        assertEquals(actorSelection, listenerRegistrationActor);
-    }
-
-    @Test
-    public void testRegisterChangeListenerWhenSuccessfulReplyFailed() throws Exception {
-        ActorContext actorContext = mock(ActorContext.class);
-
-        distributedDataStore = new DistributedDataStore(actorContext);
-        distributedDataStore.onGlobalContextUpdated(
-            TestModel.createTestContext());
-
-        ExecutionContextExecutor executor = ExecutionContexts.fromExecutor(MoreExecutors.sameThreadExecutor());
-
-        // Make Future fail
-        Future f = Futures.failed(new IllegalArgumentException());
-
-        // Setup the mocks
-        ActorSystem actorSystem = mock(ActorSystem.class);
-        ActorSelection actorSelection = mock(ActorSelection.class);
-
-        when(actorContext.getOperationDuration()).thenReturn(FiniteDuration.apply(5, TimeUnit.SECONDS));
-        when(actorSystem.dispatcher()).thenReturn(executor);
-        when(actorSystem.actorOf(any(Props.class))).thenReturn(doNothingActorRef);
-        when(actorContext.getActorSystem()).thenReturn(actorSystem);
-        when(actorContext.findLocalShard(anyString())).thenReturn(Optional.of(doNothingActorRef));
-        when(actorContext
-            .executeOperationAsync(eq(doNothingActorRef), anyObject(), any(Timeout.class))).thenReturn(f);
-        when(actorContext.actorSelection(any(ActorPath.class))).thenReturn(actorSelection);
-
-        ListenerRegistration registration =
-            distributedDataStore.registerChangeListener(TestModel.TEST_PATH,
-                mock(AsyncDataChangeListener.class),
-                AsyncDataBroker.DataChangeScope.BASE);
-
-        assertNotNull(registration);
-
-        assertEquals(DataChangeListenerRegistrationProxy.class, registration.getClass());
-
-        ActorSelection listenerRegistrationActor =
-            ((DataChangeListenerRegistrationProxy) registration).getListenerRegistrationActor();
-
-        assertNull(listenerRegistrationActor);
-
-    }
-
-
-    @Test
-    public void testCreateTransactionChain() throws Exception {
-        final DOMStoreTransactionChain transactionChain = distributedDataStore.createTransactionChain();
-        assertNotNull(transactionChain);
-    }
-
-    @Test
-    public void testNewReadOnlyTransaction() throws Exception {
-        final DOMStoreReadTransaction transaction = distributedDataStore.newReadOnlyTransaction();
-        assertNotNull(transaction);
-    }
-
-    @Test
-    public void testNewWriteOnlyTransaction() throws Exception {
-        final DOMStoreWriteTransaction transaction = distributedDataStore.newWriteOnlyTransaction();
-        assertNotNull(transaction);
-    }
-
-    @Test
-    public void testNewReadWriteTransaction() throws Exception {
-        final DOMStoreReadWriteTransaction transaction = distributedDataStore.newReadWriteTransaction();
-        assertNotNull(transaction);
-    }
-}
index 5022d97997dfad32ef29ae16865a7e7dc3c2b6e1..c04dcf1534506609865e95e7b7a6789d3ab676eb 100644 (file)
@@ -2,9 +2,11 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
 import akka.actor.Props;
+import akka.pattern.Patterns;
 import akka.persistence.RecoveryCompleted;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
+import akka.util.Timeout;
 import akka.japi.Creator;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
@@ -29,6 +31,8 @@ import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
 import java.net.URI;
 import java.util.Collection;
 import java.util.HashSet;
@@ -76,7 +80,7 @@ public class ShardManagerTest extends AbstractActorTest {
 
             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
 
-            shardManager.tell(new FindPrimary("non-existent").toSerializable(), getRef());
+            shardManager.tell(new FindPrimary("non-existent", false).toSerializable(), getRef());
 
             expectMsgEquals(duration("5 seconds"),
                     new PrimaryNotFound("non-existent").toSerializable());
@@ -91,25 +95,44 @@ public class ShardManagerTest extends AbstractActorTest {
             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
             shardManager.tell(new ActorInitialized(), mockShardActor);
 
-            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME).toSerializable(), getRef());
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
 
             expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
         }};
     }
 
     @Test
-    public void testOnReceiveFindPrimaryForNotInitialzedShard() throws Exception {
+    public void testOnReceiveFindPrimaryForNotInitializedShard() throws Exception {
         new JavaTestKit(getSystem()) {{
             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
 
             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
 
-            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME).toSerializable(), getRef());
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
 
             expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
         }};
     }
 
+    @Test
+    public void testOnReceiveFindPrimaryWaitForShardInitialized() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+
+            shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
+            // We're passing waitUntilInitialized = true to FindPrimary so the response should be
+            // delayed until we send ActorInitialized.
+            Future<Object> future = Patterns.ask(shardManager, new FindPrimary(Shard.DEFAULT_NAME, true),
+                    new Timeout(5, TimeUnit.SECONDS));
+
+            shardManager.tell(new ActorInitialized(), mockShardActor);
+
+            Object resp = Await.result(future, duration("5 seconds"));
+            assertTrue("Expected: PrimaryFound, Actual: " + resp, resp instanceof PrimaryFound);
+        }};
+    }
+
     @Test
     public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
         new JavaTestKit(getSystem()) {{
@@ -117,7 +140,7 @@ public class ShardManagerTest extends AbstractActorTest {
 
             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
 
-            shardManager.tell(new FindLocalShard("non-existent"), getRef());
+            shardManager.tell(new FindLocalShard("non-existent", false), getRef());
 
             LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
 
@@ -133,7 +156,7 @@ public class ShardManagerTest extends AbstractActorTest {
             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
             shardManager.tell(new ActorInitialized(), mockShardActor);
 
-            shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME), getRef());
+            shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
 
             LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class);
 
@@ -148,14 +171,32 @@ public class ShardManagerTest extends AbstractActorTest {
             final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
 
             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
-            //shardManager.tell(new ActorInitialized(), mockShardActor);
 
-            shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME), getRef());
+            shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
 
             expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
         }};
     }
 
+    @Test
+    public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+
+            shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
+            // We're passing waitUntilInitialized = true to FindLocalShard so the response should be
+            // delayed until we send ActorInitialized.
+            Future<Object> future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true),
+                    new Timeout(5, TimeUnit.SECONDS));
+
+            shardManager.tell(new ActorInitialized(), mockShardActor);
+
+            Object resp = Await.result(future, duration("5 seconds"));
+            assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound);
+        }};
+    }
+
     @Test
     public void testOnReceiveMemberUp() throws Exception {
         new JavaTestKit(getSystem()) {{
@@ -163,7 +204,7 @@ public class ShardManagerTest extends AbstractActorTest {
 
             MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
 
-            shardManager.tell(new FindPrimary("astronauts").toSerializable(), getRef());
+            shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
 
             PrimaryFound found = PrimaryFound.fromSerializable(expectMsgClass(duration("5 seconds"),
                     PrimaryFound.SERIALIZABLE_CLASS));
@@ -180,13 +221,13 @@ public class ShardManagerTest extends AbstractActorTest {
 
             MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
 
-            shardManager.tell(new FindPrimary("astronauts").toSerializable(), getRef());
+            shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
 
             expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
 
             MockClusterWrapper.sendMemberRemoved(shardManager, "member-2", getRef().path().toString());
 
-            shardManager.tell(new FindPrimary("astronauts").toSerializable(), getRef());
+            shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
 
             expectMsgClass(duration("5 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
         }};