From c2b14095fb5dbef1532dde5a1b40bd3b9111d019 Mon Sep 17 00:00:00 2001 From: Moiz Raja Date: Thu, 26 Jun 2014 19:28:52 -0700 Subject: [PATCH] Implement DataChangeListener - Add a DataChangeReply to confirm receipt of a DataChange - Created an AbstractUntypedActor class from which all actors extend. Currently the only thing this base class does is uniformly log incoming messages. Going forward will automatically register actors for DeathWatch and such. Change-Id: Ibcc4179597023aa37b95641c0b666b3c650dc370 Signed-off-by: Moiz Raja --- .../datastore/AbstractUntypedActor.java | 26 +++++ .../cluster/datastore/DataChangeListener.java | 34 ++++-- .../DataChangeListenerRegistration.java | 5 +- .../datastore/DistributedDataStore.java | 2 +- .../cluster/datastore/ShardManager.java | 5 +- .../cluster/datastore/ShardTransaction.java | 7 +- .../datastore/ShardTransactionChain.java | 5 +- .../datastore/ThreePhaseCommitCohort.java | 7 +- .../datastore/messages/DataChangedReply.java | 12 +++ .../datastore/DataChangeListenerTest.java | 102 ++++++++++++++++++ 10 files changed, 179 insertions(+), 26 deletions(-) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractUntypedActor.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataChangedReply.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerTest.java diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractUntypedActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractUntypedActor.java new file mode 100644 index 0000000000..aae468fbc6 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractUntypedActor.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.UntypedActor; +import akka.event.Logging; +import akka.event.LoggingAdapter; + +public abstract class AbstractUntypedActor extends UntypedActor { + protected final LoggingAdapter LOG = + Logging.getLogger(getContext().system(), this); + + @Override public void onReceive(Object message) throws Exception { + LOG.debug("Received message {}", message); + handleReceive(message); + LOG.debug("Done handling message {}", message); + } + + protected abstract void handleReceive(Object message) throws Exception; +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java index ba09d04025..fd4f9f75b5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java @@ -9,19 +9,41 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.Props; -import akka.actor.UntypedActor; import akka.japi.Creator; +import org.opendaylight.controller.cluster.datastore.messages.DataChanged; +import org.opendaylight.controller.cluster.datastore.messages.DataChangedReply; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; +import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -public class DataChangeListener extends UntypedActor { - @Override public void onReceive(Object message) throws Exception { - throw new UnsupportedOperationException("onReceive"); +public class DataChangeListener extends AbstractUntypedActor { + private final AsyncDataChangeListener> listener; + + public DataChangeListener( + AsyncDataChangeListener> listener) { + this.listener = listener; + } + + @Override public void handleReceive(Object message) throws Exception { + if(message instanceof DataChanged){ + DataChanged reply = (DataChanged) message; + AsyncDataChangeEvent> + change = reply.getChange(); + this.listener.onDataChanged(change); + + if(getSender() != null){ + getSender().tell(new DataChangedReply(), getSelf()); + } + + } } - public static Props props() { + public static Props props(final AsyncDataChangeListener> listener) { return Props.create(new Creator() { @Override public DataChangeListener create() throws Exception { - return new DataChangeListener(); + return new DataChangeListener(listener); } }); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistration.java index e90d53c69c..c2eab0df44 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistration.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistration.java @@ -9,7 +9,6 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.Props; -import akka.actor.UntypedActor; import akka.japi.Creator; import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration; import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistrationReply; @@ -17,7 +16,7 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListene import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -public class DataChangeListenerRegistration extends UntypedActor{ +public class DataChangeListenerRegistration extends AbstractUntypedActor{ private final org.opendaylight.yangtools.concepts.ListenerRegistration>> registration; @@ -27,7 +26,7 @@ public class DataChangeListenerRegistration extends UntypedActor{ } @Override - public void onReceive(Object message) throws Exception { + public void handleReceive(Object message) throws Exception { if(message instanceof CloseDataChangeListenerRegistration){ closeListenerRegistration((CloseDataChangeListenerRegistration) message); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java index 3c760f35b8..58b22a9970 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java @@ -57,7 +57,7 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au AsyncDataBroker.DataChangeScope scope) { ActorRef dataChangeListenerActor = actorContext.getActorSystem().actorOf( - DataChangeListener.props()); + DataChangeListener.props(listener)); Object result = actorContext.executeShardOperation(Shard.DEFAULT_NAME, new RegisterChangeListener(path, dataChangeListenerActor.path(), diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index 4e2369d375..79e90c3fc9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -12,7 +12,6 @@ import akka.actor.ActorPath; import akka.actor.ActorRef; import akka.actor.Address; import akka.actor.Props; -import akka.actor.UntypedActor; import akka.event.Logging; import akka.event.LoggingAdapter; import akka.japi.Creator; @@ -48,7 +47,7 @@ import java.util.Map; *
  • When a local shard replica comes alive *

    */ -public class ShardManager extends UntypedActor { +public class ShardManager extends AbstractUntypedActor { // Stores a mapping between a shard name and the address of the current primary private final Map shardNameToPrimaryAddress = new HashMap<>(); @@ -84,7 +83,7 @@ public class ShardManager extends UntypedActor { } @Override - public void onReceive(Object message) throws Exception { + public void handleReceive(Object message) throws Exception { if (message instanceof FindPrimary) { FindPrimary msg = ((FindPrimary) message); String shardName = msg.getShardName(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java index a2da063e55..ff02bfbcce 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java @@ -11,7 +11,6 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; import akka.actor.PoisonPill; import akka.actor.Props; -import akka.actor.UntypedActor; import akka.event.Logging; import akka.event.LoggingAdapter; import akka.japi.Creator; @@ -65,7 +64,7 @@ import java.util.concurrent.ExecutionException; *
  • {@link org.opendaylight.controller.cluster.datastore.messages.CloseTransaction} *

    */ -public class ShardTransaction extends UntypedActor { +public class ShardTransaction extends AbstractUntypedActor { private final ActorRef shardActor; @@ -120,9 +119,7 @@ public class ShardTransaction extends UntypedActor { @Override - public void onReceive(Object message) throws Exception { - log.debug("Received message {}", message); - + public void handleReceive(Object message) throws Exception { if (message instanceof ReadData) { readData((ReadData) message); } else if (message instanceof WriteData) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java index 6c14f1d8d7..57c935b0ad 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java @@ -10,7 +10,6 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; import akka.actor.Props; -import akka.actor.UntypedActor; import akka.japi.Creator; import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain; import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChainReply; @@ -22,7 +21,7 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; /** * The ShardTransactionChain Actor represents a remote TransactionChain */ -public class ShardTransactionChain extends UntypedActor{ +public class ShardTransactionChain extends AbstractUntypedActor{ private final DOMStoreTransactionChain chain; @@ -31,7 +30,7 @@ public class ShardTransactionChain extends UntypedActor{ } @Override - public void onReceive(Object message) throws Exception { + public void handleReceive(Object message) throws Exception { if(message instanceof CreateTransaction){ DOMStoreReadWriteTransaction transaction = chain.newReadWriteTransaction(); ActorRef transactionActor = getContext().actorOf(ShardTransaction.props(chain, transaction, getContext().parent())); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java index b10bf1d9fc..060c9d6b50 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java @@ -11,7 +11,6 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; import akka.actor.PoisonPill; import akka.actor.Props; -import akka.actor.UntypedActor; import akka.event.Logging; import akka.event.LoggingAdapter; import akka.japi.Creator; @@ -29,7 +28,7 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCoh import java.util.concurrent.ExecutionException; -public class ThreePhaseCommitCohort extends UntypedActor { +public class ThreePhaseCommitCohort extends AbstractUntypedActor { private final DOMStoreThreePhaseCommitCohort cohort; private final ActorRef shardActor; private final CompositeModification modification; @@ -58,9 +57,7 @@ public class ThreePhaseCommitCohort extends UntypedActor { @Override - public void onReceive(Object message) throws Exception { - log.debug("Received message {}", message); - + public void handleReceive(Object message) throws Exception { if (message instanceof CanCommitTransaction) { canCommit((CanCommitTransaction) message); } else if (message instanceof PreCommitTransaction) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataChangedReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataChangedReply.java new file mode 100644 index 0000000000..3531021b94 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataChangedReply.java @@ -0,0 +1,12 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.messages; + +public class DataChangedReply { +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerTest.java new file mode 100644 index 0000000000..6f0816be5c --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerTest.java @@ -0,0 +1,102 @@ +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.testkit.JavaTestKit; +import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.messages.DataChanged; +import org.opendaylight.controller.cluster.datastore.messages.DataChangedReply; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; +import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; + +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.assertTrue; + +public class DataChangeListenerTest extends AbstractActorTest { + + private static class MockDataChangedEvent implements AsyncDataChangeEvent> { + + @Override + public Map> getCreatedData() { + throw new UnsupportedOperationException("getCreatedData"); + } + + @Override + public Map> getUpdatedData() { + throw new UnsupportedOperationException("getUpdatedData"); + } + + @Override public Set getRemovedPaths() { + throw new UnsupportedOperationException("getRemovedPaths"); + } + + @Override + public Map> getOriginalData() { + throw new UnsupportedOperationException("getOriginalData"); + } + + @Override public NormalizedNode getOriginalSubtree() { + throw new UnsupportedOperationException("getOriginalSubtree"); + } + + @Override public NormalizedNode getUpdatedSubtree() { + throw new UnsupportedOperationException("getUpdatedSubtree"); + } + } + + private class MockDataChangeListener implements AsyncDataChangeListener> { + private boolean gotIt = false; + + @Override public void onDataChanged( + AsyncDataChangeEvent> change) { + gotIt = true; + } + + public boolean gotIt() { + return gotIt; + } + } + + @Test + public void testDataChanged(){ + new JavaTestKit(getSystem()) {{ + final MockDataChangeListener listener = new MockDataChangeListener(); + final Props props = DataChangeListener.props(listener); + final ActorRef subject = + getSystem().actorOf(props, "testDataChanged"); + + new Within(duration("1 seconds")) { + protected void run() { + + subject.tell( + new DataChanged(new MockDataChangedEvent()), + getRef()); + + final Boolean out = new ExpectMsg("dataChanged") { + // do not put code outside this method, will run afterwards + protected Boolean match(Object in) { + if (in instanceof DataChangedReply) { + DataChangedReply reply = + (DataChangedReply) in; + return true; + } else { + throw noMatch(); + } + } + }.get(); // this extracts the received message + + assertTrue(out); + assertTrue(listener.gotIt()); + // Will wait for the rest of the 3 seconds + expectNoMsg(); + } + + + }; + }}; + } +} -- 2.36.6