From: Moiz Raja Date: Mon, 8 Sep 2014 00:39:45 +0000 (-0700) Subject: BUG 1735 Registering a data change listener should be asynchronous X-Git-Tag: release/helium~108^2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=9f61e98b036119694dfef0759a7cafc56aae6e86 BUG 1735 Registering a data change listener should be asynchronous Change-Id: I1a819976afe6ca44ac811ee30d305fbe76bb8acf Signed-off-by: Moiz Raja --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java index e3cdbb4ee1..acf630e2e9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java @@ -25,9 +25,10 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; *

*/ public class DataChangeListenerRegistrationProxy implements ListenerRegistration { - private final ActorSelection listenerRegistrationActor; + private volatile ActorSelection listenerRegistrationActor; private final AsyncDataChangeListener listener; private final ActorRef dataChangeListenerActor; + private boolean closed = false; public >> DataChangeListenerRegistrationProxy( @@ -38,14 +39,51 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration this.dataChangeListenerActor = dataChangeListenerActor; } + public >> + DataChangeListenerRegistrationProxy( + L listener, ActorRef dataChangeListenerActor) { + this(null, listener, dataChangeListenerActor); + } + @Override public Object getInstance() { return listener; } + public void setListenerRegistrationActor(ActorSelection listenerRegistrationActor) { + boolean sendCloseMessage = false; + synchronized(this) { + if(closed) { + sendCloseMessage = true; + } else { + this.listenerRegistrationActor = listenerRegistrationActor; + } + } + if(sendCloseMessage) { + listenerRegistrationActor.tell(new + CloseDataChangeListenerRegistration().toSerializable(), null); + } + + this.listenerRegistrationActor = listenerRegistrationActor; + } + + public ActorSelection getListenerRegistrationActor() { + return listenerRegistrationActor; + } + @Override public void close() { - listenerRegistrationActor.tell(new CloseDataChangeListenerRegistration().toSerializable(), null); + + boolean sendCloseMessage; + synchronized(this) { + sendCloseMessage = !closed && listenerRegistrationActor != null; + closed = true; + } + if(sendCloseMessage) { + listenerRegistrationActor.tell(new + CloseDataChangeListenerRegistration().toSerializable(), null); + } + dataChangeListenerActor.tell(PoisonPill.getInstance(), null); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java index db01d51535..bf541d95de 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java @@ -10,9 +10,9 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; import akka.actor.ActorSystem; - +import akka.dispatch.OnComplete; +import akka.util.Timeout; import com.google.common.base.Preconditions; - import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply; @@ -32,6 +32,7 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.opendaylight.yangtools.yang.model.api.SchemaContextListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.Future; /** * @@ -39,6 +40,7 @@ import org.slf4j.LoggerFactory; public class DistributedDataStore implements DOMStore, SchemaContextListener, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class); + public static final int REGISTER_DATA_CHANGE_LISTENER_TIMEOUT_FACTOR = 24; // 24 times the usual operation timeout private final ActorContext actorContext; @@ -69,7 +71,7 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au @Override public >> ListenerRegistration registerChangeListener( - YangInstanceIdentifier path, L listener, + final YangInstanceIdentifier path, L listener, AsyncDataBroker.DataChangeScope scope) { Preconditions.checkNotNull(path, "path should not be null"); @@ -82,14 +84,29 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au String shardName = ShardStrategyFactory.getStrategy(path).findShard(path); - Object result = actorContext.executeLocalShardOperation(shardName, - new RegisterChangeListener(path, dataChangeListenerActor.path(), scope)); - - if (result != null) { - RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result; - return new DataChangeListenerRegistrationProxy(actorContext - .actorSelection(reply.getListenerRegistrationPath()), listener, - dataChangeListenerActor); + Future future = actorContext.executeLocalShardOperationAsync(shardName, + new RegisterChangeListener(path, dataChangeListenerActor.path(), scope), + new Timeout(actorContext.getOperationDuration().$times( + REGISTER_DATA_CHANGE_LISTENER_TIMEOUT_FACTOR))); + + if (future != null) { + final DataChangeListenerRegistrationProxy listenerRegistrationProxy = + new DataChangeListenerRegistrationProxy(listener, dataChangeListenerActor); + + future.onComplete(new OnComplete(){ + + @Override public void onComplete(Throwable failure, Object result) + throws Throwable { + if(failure != null){ + LOG.error("Failed to register listener at path " + path.toString(), failure); + return; + } + RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result; + listenerRegistrationProxy.setListenerRegistrationActor(actorContext + .actorSelection(reply.getListenerRegistrationPath())); + } + }, actorContext.getActorSystem().dispatcher()); + return listenerRegistrationProxy; } LOG.debug( diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index c989b275df..7b5588cb19 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -13,8 +13,8 @@ import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.actor.PoisonPill; +import akka.pattern.Patterns; import akka.util.Timeout; - import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import org.opendaylight.controller.cluster.datastore.Configuration; import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; @@ -27,7 +27,6 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; @@ -266,6 +265,30 @@ public class ActorContext { } + /** + * Execute an operation on the the local shard only asynchronously + * + *

+ * This method first finds the address of the local shard if any. It then + * executes the operation on it. + *

+ * + * @param shardName the name of the shard on which the operation needs to be executed + * @param message the message that needs to be sent to the shard + * @param timeout the amount of time that this method should wait for a response before timing out + * @return null if the shard could not be located else a future on which the caller can wait + * + */ + public Future executeLocalShardOperationAsync(String shardName, Object message, Timeout timeout) { + ActorRef local = findLocalShard(shardName); + if(local == null){ + return null; + } + return Patterns.ask(local, message, timeout); + } + + + public void shutdown() { shardManager.tell(PoisonPill.getInstance(), null); actorSystem.shutdown(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java index 3d0aaa0082..ab3ff795d3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java @@ -17,6 +17,10 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import java.util.List; +import static junit.framework.TestCase.assertEquals; +import static junit.framework.TestCase.assertNotNull; +import static junit.framework.TestCase.assertTrue; + public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest{ private ActorRef dataChangeListenerActor = getSystem().actorOf(Props.create(DoNothingActor.class)); @@ -64,14 +68,41 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest{ Object messages = testContext .executeLocalOperation(actorRef, "messages"); - Assert.assertNotNull(messages); + assertNotNull(messages); - Assert.assertTrue(messages instanceof List); + assertTrue(messages instanceof List); List listMessages = (List) messages; - Assert.assertEquals(1, listMessages.size()); + assertEquals(1, listMessages.size()); + + assertTrue(listMessages.get(0).getClass() + .equals(CloseDataChangeListenerRegistration.SERIALIZABLE_CLASS)); + } + + @Test + public void testCloseWhenRegistrationIsNull() throws Exception { + final Props props = Props.create(MessageCollectorActor.class); + final ActorRef actorRef = getSystem().actorOf(props); + + DataChangeListenerRegistrationProxy proxy = + new DataChangeListenerRegistrationProxy( + new MockDataChangeListener(), dataChangeListenerActor); + + proxy.close(); + + //Check if it was received by the remote actor + ActorContext + testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)),new MockClusterWrapper(), new MockConfiguration()); + Object messages = testContext + .executeLocalOperation(actorRef, "messages"); + + assertNotNull(messages); + + assertTrue(messages instanceof List); + + List listMessages = (List) messages; - Assert.assertTrue(listMessages.get(0).getClass().equals(CloseDataChangeListenerRegistration.SERIALIZABLE_CLASS)); + assertEquals(0, listMessages.size()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java index aeb47de888..08c3ea9602 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java @@ -1,14 +1,20 @@ package org.opendaylight.controller.cluster.datastore; +import akka.actor.ActorPath; import akka.actor.ActorRef; +import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.actor.Props; - +import akka.dispatch.ExecutionContexts; +import akka.dispatch.Futures; +import akka.util.Timeout; +import com.google.common.util.concurrent.MoreExecutors; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply; import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor; import org.opendaylight.controller.cluster.datastore.utils.MockActorContext; import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration; @@ -24,13 +30,23 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import scala.concurrent.ExecutionContextExecutor; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +import java.util.concurrent.TimeUnit; +import static junit.framework.TestCase.assertEquals; +import static junit.framework.TestCase.assertNull; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class DistributedDataStoreTest extends AbstractActorTest{ @@ -95,20 +111,108 @@ public class DistributedDataStoreTest extends AbstractActorTest{ @Test public void testRegisterChangeListenerWhenShardIsLocal() throws Exception { + ActorContext actorContext = mock(ActorContext.class); + + distributedDataStore = new DistributedDataStore(actorContext); + distributedDataStore.onGlobalContextUpdated(TestModel.createTestContext()); - mockActorContext.setExecuteLocalShardOperationResponse(new RegisterChangeListenerReply(doNothingActorRef.path())); + Future future = mock(Future.class); + when(actorContext.getOperationDuration()).thenReturn(FiniteDuration.apply(5, TimeUnit.SECONDS)); + when(actorContext.getActorSystem()).thenReturn(getSystem()); + when(actorContext + .executeLocalShardOperationAsync(anyString(), anyObject(), any(Timeout.class))).thenReturn(future); ListenerRegistration registration = - distributedDataStore.registerChangeListener(TestModel.TEST_PATH, new AsyncDataChangeListener>() { - @Override - public void onDataChanged(AsyncDataChangeEvent> change) { - throw new UnsupportedOperationException("onDataChanged"); - } - }, AsyncDataBroker.DataChangeScope.BASE); + distributedDataStore.registerChangeListener(TestModel.TEST_PATH, + mock(AsyncDataChangeListener.class), + AsyncDataBroker.DataChangeScope.BASE); - assertTrue(registration instanceof DataChangeListenerRegistrationProxy); + assertNotNull(registration); + + assertEquals(DataChangeListenerRegistrationProxy.class, registration.getClass()); + } + + @Test + public void testRegisterChangeListenerWhenSuccessfulReplyReceived() throws Exception { + ActorContext actorContext = mock(ActorContext.class); + + distributedDataStore = new DistributedDataStore(actorContext); + distributedDataStore.onGlobalContextUpdated( + TestModel.createTestContext()); + + ExecutionContextExecutor executor = ExecutionContexts.fromExecutor(MoreExecutors.sameThreadExecutor()); + + // Make Future successful + Future f = Futures.successful(new RegisterChangeListenerReply(doNothingActorRef.path())); + + // Setup the mocks + ActorSystem actorSystem = mock(ActorSystem.class); + ActorSelection actorSelection = mock(ActorSelection.class); + + when(actorContext.getOperationDuration()).thenReturn(FiniteDuration.apply(5, TimeUnit.SECONDS)); + when(actorSystem.dispatcher()).thenReturn(executor); + when(actorSystem.actorOf(any(Props.class))).thenReturn(doNothingActorRef); + when(actorContext.getActorSystem()).thenReturn(actorSystem); + when(actorContext + .executeLocalShardOperationAsync(anyString(), anyObject(), any(Timeout.class))).thenReturn(f); + when(actorContext.actorSelection(any(ActorPath.class))).thenReturn(actorSelection); + + ListenerRegistration registration = + distributedDataStore.registerChangeListener(TestModel.TEST_PATH, + mock(AsyncDataChangeListener.class), + AsyncDataBroker.DataChangeScope.BASE); assertNotNull(registration); + + assertEquals(DataChangeListenerRegistrationProxy.class, registration.getClass()); + + ActorSelection listenerRegistrationActor = + ((DataChangeListenerRegistrationProxy) registration).getListenerRegistrationActor(); + + assertNotNull(listenerRegistrationActor); + + assertEquals(actorSelection, listenerRegistrationActor); + } + + @Test + public void testRegisterChangeListenerWhenSuccessfulReplyFailed() throws Exception { + ActorContext actorContext = mock(ActorContext.class); + + distributedDataStore = new DistributedDataStore(actorContext); + distributedDataStore.onGlobalContextUpdated( + TestModel.createTestContext()); + + ExecutionContextExecutor executor = ExecutionContexts.fromExecutor(MoreExecutors.sameThreadExecutor()); + + // Make Future fail + Future f = Futures.failed(new IllegalArgumentException()); + + // Setup the mocks + ActorSystem actorSystem = mock(ActorSystem.class); + ActorSelection actorSelection = mock(ActorSelection.class); + + when(actorContext.getOperationDuration()).thenReturn(FiniteDuration.apply(5, TimeUnit.SECONDS)); + when(actorSystem.dispatcher()).thenReturn(executor); + when(actorSystem.actorOf(any(Props.class))).thenReturn(doNothingActorRef); + when(actorContext.getActorSystem()).thenReturn(actorSystem); + when(actorContext + .executeLocalShardOperationAsync(anyString(), anyObject(), any(Timeout.class))).thenReturn(f); + when(actorContext.actorSelection(any(ActorPath.class))).thenReturn(actorSelection); + + ListenerRegistration registration = + distributedDataStore.registerChangeListener(TestModel.TEST_PATH, + mock(AsyncDataChangeListener.class), + AsyncDataBroker.DataChangeScope.BASE); + + assertNotNull(registration); + + assertEquals(DataChangeListenerRegistrationProxy.class, registration.getClass()); + + ActorSelection listenerRegistrationActor = + ((DataChangeListenerRegistrationProxy) registration).getListenerRegistrationActor(); + + assertNull(listenerRegistrationActor); + }