Implement DataChangeListener 25/8425/5
authorMoiz Raja <moraja@cisco.com>
Fri, 27 Jun 2014 02:28:52 +0000 (19:28 -0700)
committerEd Warnicke <eaw@cisco.com>
Fri, 4 Jul 2014 20:26:44 +0000 (20:26 +0000)
- Add a DataChangeReply to confirm receipt of a DataChange
- Created an AbstractUntypedActor class from which all actors extend. Currently the only thing this base
  class does is uniformly log incoming messages. Going forward will automatically register actors for DeathWatch and such.

Change-Id: Ibcc4179597023aa37b95641c0b666b3c650dc370
Signed-off-by: Moiz Raja <moraja@cisco.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractUntypedActor.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistration.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataChangedReply.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerTest.java [new file with mode: 0644]

diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractUntypedActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractUntypedActor.java
new file mode 100644 (file)
index 0000000..aae468f
--- /dev/null
@@ -0,0 +1,26 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.UntypedActor;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
+
+public abstract class AbstractUntypedActor extends UntypedActor {
+    protected final LoggingAdapter LOG =
+        Logging.getLogger(getContext().system(), this);
+
+    @Override public void onReceive(Object message) throws Exception {
+        LOG.debug("Received message {}", message);
+        handleReceive(message);
+        LOG.debug("Done handling message {}", message);
+    }
+
+    protected abstract void handleReceive(Object message) throws Exception;
+}
index ba09d04..fd4f9f7 100644 (file)
@@ -9,19 +9,41 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.Props;
-import akka.actor.UntypedActor;
 import akka.japi.Creator;
+import org.opendaylight.controller.cluster.datastore.messages.DataChanged;
+import org.opendaylight.controller.cluster.datastore.messages.DataChangedReply;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
-public class DataChangeListener extends UntypedActor {
-    @Override public void onReceive(Object message) throws Exception {
-        throw new UnsupportedOperationException("onReceive");
+public class DataChangeListener extends AbstractUntypedActor {
+    private final AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> listener;
+
+    public DataChangeListener(
+        AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> listener) {
+        this.listener = listener;
+    }
+
+    @Override public void handleReceive(Object message) throws Exception {
+        if(message instanceof DataChanged){
+            DataChanged reply = (DataChanged) message;
+            AsyncDataChangeEvent<InstanceIdentifier, NormalizedNode<?, ?>>
+                change = reply.getChange();
+            this.listener.onDataChanged(change);
+
+            if(getSender() != null){
+                getSender().tell(new DataChangedReply(), getSelf());
+            }
+
+        }
     }
 
-    public static Props props() {
+    public static Props props(final AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> listener) {
         return Props.create(new Creator<DataChangeListener>() {
             @Override
             public DataChangeListener create() throws Exception {
-                return new DataChangeListener();
+                return new DataChangeListener(listener);
             }
 
         });
index e90d53c..c2eab0d 100644 (file)
@@ -9,7 +9,6 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.Props;
-import akka.actor.UntypedActor;
 import akka.japi.Creator;
 import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
 import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistrationReply;
@@ -17,7 +16,7 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListene
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
-public class DataChangeListenerRegistration extends UntypedActor{
+public class DataChangeListenerRegistration extends AbstractUntypedActor{
 
   private final org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> registration;
 
@@ -27,7 +26,7 @@ public class DataChangeListenerRegistration extends UntypedActor{
   }
 
   @Override
-  public void onReceive(Object message) throws Exception {
+  public void handleReceive(Object message) throws Exception {
     if(message instanceof CloseDataChangeListenerRegistration){
       closeListenerRegistration((CloseDataChangeListenerRegistration) message);
     }
index 3c760f3..58b22a9 100644 (file)
@@ -57,7 +57,7 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
         AsyncDataBroker.DataChangeScope scope) {
 
         ActorRef dataChangeListenerActor = actorContext.getActorSystem().actorOf(
-            DataChangeListener.props());
+            DataChangeListener.props(listener));
 
         Object result = actorContext.executeShardOperation(Shard.DEFAULT_NAME,
             new RegisterChangeListener(path, dataChangeListenerActor.path(),
index 4e2369d..79e90c3 100644 (file)
@@ -12,7 +12,6 @@ import akka.actor.ActorPath;
 import akka.actor.ActorRef;
 import akka.actor.Address;
 import akka.actor.Props;
-import akka.actor.UntypedActor;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
 import akka.japi.Creator;
@@ -48,7 +47,7 @@ import java.util.Map;
  * <li> When a local shard replica comes alive
  * </p>
  */
-public class ShardManager extends UntypedActor {
+public class ShardManager extends AbstractUntypedActor {
 
   // Stores a mapping between a shard name and the address of the current primary
   private final Map<String, Address> shardNameToPrimaryAddress = new HashMap<>();
@@ -84,7 +83,7 @@ public class ShardManager extends UntypedActor {
   }
 
   @Override
-  public void onReceive(Object message) throws Exception {
+  public void handleReceive(Object message) throws Exception {
     if (message instanceof FindPrimary) {
       FindPrimary msg = ((FindPrimary) message);
       String shardName = msg.getShardName();
index a2da063..ff02bfb 100644 (file)
@@ -11,7 +11,6 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
-import akka.actor.UntypedActor;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
 import akka.japi.Creator;
@@ -65,7 +64,7 @@ import java.util.concurrent.ExecutionException;
  * <li> {@link org.opendaylight.controller.cluster.datastore.messages.CloseTransaction}
  * </p>
  */
-public class ShardTransaction extends UntypedActor {
+public class ShardTransaction extends AbstractUntypedActor {
 
     private final ActorRef shardActor;
 
@@ -120,9 +119,7 @@ public class ShardTransaction extends UntypedActor {
 
 
     @Override
-    public void onReceive(Object message) throws Exception {
-        log.debug("Received message {}", message);
-
+    public void handleReceive(Object message) throws Exception {
         if (message instanceof ReadData) {
             readData((ReadData) message);
         } else if (message instanceof WriteData) {
index 6c14f1d..57c935b 100644 (file)
@@ -10,7 +10,6 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
 import akka.actor.Props;
-import akka.actor.UntypedActor;
 import akka.japi.Creator;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChainReply;
@@ -22,7 +21,7 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 /**
  * The ShardTransactionChain Actor represents a remote TransactionChain
  */
-public class ShardTransactionChain extends UntypedActor{
+public class ShardTransactionChain extends AbstractUntypedActor{
 
   private final DOMStoreTransactionChain chain;
 
@@ -31,7 +30,7 @@ public class ShardTransactionChain extends UntypedActor{
   }
 
   @Override
-  public void onReceive(Object message) throws Exception {
+  public void handleReceive(Object message) throws Exception {
     if(message instanceof CreateTransaction){
       DOMStoreReadWriteTransaction transaction = chain.newReadWriteTransaction();
       ActorRef transactionActor = getContext().actorOf(ShardTransaction.props(chain, transaction, getContext().parent()));
index b10bf1d..060c9d6 100644 (file)
@@ -11,7 +11,6 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
-import akka.actor.UntypedActor;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
 import akka.japi.Creator;
@@ -29,7 +28,7 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCoh
 
 import java.util.concurrent.ExecutionException;
 
-public class ThreePhaseCommitCohort extends UntypedActor {
+public class ThreePhaseCommitCohort extends AbstractUntypedActor {
     private final DOMStoreThreePhaseCommitCohort cohort;
     private final ActorRef shardActor;
     private final CompositeModification modification;
@@ -58,9 +57,7 @@ public class ThreePhaseCommitCohort extends UntypedActor {
 
 
     @Override
-    public void onReceive(Object message) throws Exception {
-        log.debug("Received message {}", message);
-
+    public void handleReceive(Object message) throws Exception {
         if (message instanceof CanCommitTransaction) {
             canCommit((CanCommitTransaction) message);
         } else if (message instanceof PreCommitTransaction) {
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataChangedReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataChangedReply.java
new file mode 100644 (file)
index 0000000..3531021
--- /dev/null
@@ -0,0 +1,12 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+public class DataChangedReply {
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerTest.java
new file mode 100644 (file)
index 0000000..6f0816b
--- /dev/null
@@ -0,0 +1,102 @@
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.messages.DataChanged;
+import org.opendaylight.controller.cluster.datastore.messages.DataChangedReply;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertTrue;
+
+public class DataChangeListenerTest extends AbstractActorTest {
+
+    private static class MockDataChangedEvent implements AsyncDataChangeEvent<InstanceIdentifier, NormalizedNode<?, ?>> {
+
+        @Override
+        public Map<InstanceIdentifier, NormalizedNode<?, ?>> getCreatedData() {
+            throw new UnsupportedOperationException("getCreatedData");
+        }
+
+        @Override
+        public Map<InstanceIdentifier, NormalizedNode<?, ?>> getUpdatedData() {
+            throw new UnsupportedOperationException("getUpdatedData");
+        }
+
+        @Override public Set<InstanceIdentifier> getRemovedPaths() {
+            throw new UnsupportedOperationException("getRemovedPaths");
+        }
+
+        @Override
+        public Map<InstanceIdentifier, ? extends NormalizedNode<?, ?>> getOriginalData() {
+            throw new UnsupportedOperationException("getOriginalData");
+        }
+
+        @Override public NormalizedNode<?, ?> getOriginalSubtree() {
+            throw new UnsupportedOperationException("getOriginalSubtree");
+        }
+
+        @Override public NormalizedNode<?, ?> getUpdatedSubtree() {
+            throw new UnsupportedOperationException("getUpdatedSubtree");
+        }
+    }
+
+    private class MockDataChangeListener implements AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> {
+        private boolean gotIt = false;
+
+        @Override public void onDataChanged(
+            AsyncDataChangeEvent<InstanceIdentifier, NormalizedNode<?, ?>> change) {
+            gotIt = true;
+        }
+
+        public boolean gotIt() {
+            return gotIt;
+        }
+    }
+
+    @Test
+    public void testDataChanged(){
+        new JavaTestKit(getSystem()) {{
+            final MockDataChangeListener listener = new MockDataChangeListener();
+            final Props props = DataChangeListener.props(listener);
+            final ActorRef subject =
+                getSystem().actorOf(props, "testDataChanged");
+
+            new Within(duration("1 seconds")) {
+                protected void run() {
+
+                    subject.tell(
+                        new DataChanged(new MockDataChangedEvent()),
+                        getRef());
+
+                    final Boolean out = new ExpectMsg<Boolean>("dataChanged") {
+                        // do not put code outside this method, will run afterwards
+                        protected Boolean match(Object in) {
+                            if (in instanceof DataChangedReply) {
+                                DataChangedReply reply =
+                                    (DataChangedReply) in;
+                                return true;
+                            } else {
+                                throw noMatch();
+                            }
+                        }
+                    }.get(); // this extracts the received message
+
+                    assertTrue(out);
+                    assertTrue(listener.gotIt());
+                    // Will wait for the rest of the 3 seconds
+                    expectNoMsg();
+                }
+
+
+            };
+        }};
+    }
+}