Merge "Implement DataChangeListener"
authorEd Warnicke <eaw@cisco.com>
Fri, 4 Jul 2014 21:46:23 +0000 (21:46 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Fri, 4 Jul 2014 21:46:23 +0000 (21:46 +0000)
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractUntypedActor.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistration.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataChangedReply.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerTest.java [new file with mode: 0644]

diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractUntypedActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractUntypedActor.java
new file mode 100644 (file)
index 0000000..aae468f
--- /dev/null
@@ -0,0 +1,26 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.UntypedActor;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
+
+public abstract class AbstractUntypedActor extends UntypedActor {
+    protected final LoggingAdapter LOG =
+        Logging.getLogger(getContext().system(), this);
+
+    @Override public void onReceive(Object message) throws Exception {
+        LOG.debug("Received message {}", message);
+        handleReceive(message);
+        LOG.debug("Done handling message {}", message);
+    }
+
+    protected abstract void handleReceive(Object message) throws Exception;
+}
index ba09d0402530921667915eae11103302cfcfb2ea..fd4f9f75b5bd801aaa1fd9484c3e0b01a8d522ae 100644 (file)
@@ -9,19 +9,41 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.Props;
-import akka.actor.UntypedActor;
 import akka.japi.Creator;
+import org.opendaylight.controller.cluster.datastore.messages.DataChanged;
+import org.opendaylight.controller.cluster.datastore.messages.DataChangedReply;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
-public class DataChangeListener extends UntypedActor {
-    @Override public void onReceive(Object message) throws Exception {
-        throw new UnsupportedOperationException("onReceive");
+public class DataChangeListener extends AbstractUntypedActor {
+    private final AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> listener;
+
+    public DataChangeListener(
+        AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> listener) {
+        this.listener = listener;
+    }
+
+    @Override public void handleReceive(Object message) throws Exception {
+        if(message instanceof DataChanged){
+            DataChanged reply = (DataChanged) message;
+            AsyncDataChangeEvent<InstanceIdentifier, NormalizedNode<?, ?>>
+                change = reply.getChange();
+            this.listener.onDataChanged(change);
+
+            if(getSender() != null){
+                getSender().tell(new DataChangedReply(), getSelf());
+            }
+
+        }
     }
 
-    public static Props props() {
+    public static Props props(final AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> listener) {
         return Props.create(new Creator<DataChangeListener>() {
             @Override
             public DataChangeListener create() throws Exception {
-                return new DataChangeListener();
+                return new DataChangeListener(listener);
             }
 
         });
index e90d53c69c2947cb7a1a7fa072c3e3fdf1892c0c..c2eab0df440689ef22fb6743a03449c640a8c605 100644 (file)
@@ -9,7 +9,6 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.Props;
-import akka.actor.UntypedActor;
 import akka.japi.Creator;
 import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
 import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistrationReply;
@@ -17,7 +16,7 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListene
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
-public class DataChangeListenerRegistration extends UntypedActor{
+public class DataChangeListenerRegistration extends AbstractUntypedActor{
 
   private final org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> registration;
 
@@ -27,7 +26,7 @@ public class DataChangeListenerRegistration extends UntypedActor{
   }
 
   @Override
-  public void onReceive(Object message) throws Exception {
+  public void handleReceive(Object message) throws Exception {
     if(message instanceof CloseDataChangeListenerRegistration){
       closeListenerRegistration((CloseDataChangeListenerRegistration) message);
     }
index 3c760f35b8a671be81239e48ef6936a1e065c065..58b22a9970cb452dd51f44e84b8b189f3f97e341 100644 (file)
@@ -57,7 +57,7 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
         AsyncDataBroker.DataChangeScope scope) {
 
         ActorRef dataChangeListenerActor = actorContext.getActorSystem().actorOf(
-            DataChangeListener.props());
+            DataChangeListener.props(listener));
 
         Object result = actorContext.executeShardOperation(Shard.DEFAULT_NAME,
             new RegisterChangeListener(path, dataChangeListenerActor.path(),
index 4e2369d3758596bd1217670f8f3ec5a2438db36d..79e90c3fc9f36436f12975b38fd2d89a356cf2d6 100644 (file)
@@ -12,7 +12,6 @@ import akka.actor.ActorPath;
 import akka.actor.ActorRef;
 import akka.actor.Address;
 import akka.actor.Props;
-import akka.actor.UntypedActor;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
 import akka.japi.Creator;
@@ -48,7 +47,7 @@ import java.util.Map;
  * <li> When a local shard replica comes alive
  * </p>
  */
-public class ShardManager extends UntypedActor {
+public class ShardManager extends AbstractUntypedActor {
 
   // Stores a mapping between a shard name and the address of the current primary
   private final Map<String, Address> shardNameToPrimaryAddress = new HashMap<>();
@@ -84,7 +83,7 @@ public class ShardManager extends UntypedActor {
   }
 
   @Override
-  public void onReceive(Object message) throws Exception {
+  public void handleReceive(Object message) throws Exception {
     if (message instanceof FindPrimary) {
       FindPrimary msg = ((FindPrimary) message);
       String shardName = msg.getShardName();
index a2da063e55d465ffd4c9bb256e9a257475d0e107..ff02bfbcce520be7a791a555cba318520d90e978 100644 (file)
@@ -11,7 +11,6 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
-import akka.actor.UntypedActor;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
 import akka.japi.Creator;
@@ -65,7 +64,7 @@ import java.util.concurrent.ExecutionException;
  * <li> {@link org.opendaylight.controller.cluster.datastore.messages.CloseTransaction}
  * </p>
  */
-public class ShardTransaction extends UntypedActor {
+public class ShardTransaction extends AbstractUntypedActor {
 
     private final ActorRef shardActor;
 
@@ -120,9 +119,7 @@ public class ShardTransaction extends UntypedActor {
 
 
     @Override
-    public void onReceive(Object message) throws Exception {
-        log.debug("Received message {}", message);
-
+    public void handleReceive(Object message) throws Exception {
         if (message instanceof ReadData) {
             readData((ReadData) message);
         } else if (message instanceof WriteData) {
index 6c14f1d8d78560f202344b237703e284d144bd54..57c935b0ad8d045d3325fc28d412503d744b4a65 100644 (file)
@@ -10,7 +10,6 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
 import akka.actor.Props;
-import akka.actor.UntypedActor;
 import akka.japi.Creator;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChainReply;
@@ -22,7 +21,7 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 /**
  * The ShardTransactionChain Actor represents a remote TransactionChain
  */
-public class ShardTransactionChain extends UntypedActor{
+public class ShardTransactionChain extends AbstractUntypedActor{
 
   private final DOMStoreTransactionChain chain;
 
@@ -31,7 +30,7 @@ public class ShardTransactionChain extends UntypedActor{
   }
 
   @Override
-  public void onReceive(Object message) throws Exception {
+  public void handleReceive(Object message) throws Exception {
     if(message instanceof CreateTransaction){
       DOMStoreReadWriteTransaction transaction = chain.newReadWriteTransaction();
       ActorRef transactionActor = getContext().actorOf(ShardTransaction.props(chain, transaction, getContext().parent()));
index b10bf1d9fc6b3a353a68c55d960c5955f5323aa1..060c9d6b509ab26f481a59c492da753ab70a2400 100644 (file)
@@ -11,7 +11,6 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
-import akka.actor.UntypedActor;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
 import akka.japi.Creator;
@@ -29,7 +28,7 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCoh
 
 import java.util.concurrent.ExecutionException;
 
-public class ThreePhaseCommitCohort extends UntypedActor {
+public class ThreePhaseCommitCohort extends AbstractUntypedActor {
     private final DOMStoreThreePhaseCommitCohort cohort;
     private final ActorRef shardActor;
     private final CompositeModification modification;
@@ -58,9 +57,7 @@ public class ThreePhaseCommitCohort extends UntypedActor {
 
 
     @Override
-    public void onReceive(Object message) throws Exception {
-        log.debug("Received message {}", message);
-
+    public void handleReceive(Object message) throws Exception {
         if (message instanceof CanCommitTransaction) {
             canCommit((CanCommitTransaction) message);
         } else if (message instanceof PreCommitTransaction) {
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataChangedReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataChangedReply.java
new file mode 100644 (file)
index 0000000..3531021
--- /dev/null
@@ -0,0 +1,12 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+public class DataChangedReply {
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerTest.java
new file mode 100644 (file)
index 0000000..6f0816b
--- /dev/null
@@ -0,0 +1,102 @@
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.messages.DataChanged;
+import org.opendaylight.controller.cluster.datastore.messages.DataChangedReply;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertTrue;
+
+public class DataChangeListenerTest extends AbstractActorTest {
+
+    private static class MockDataChangedEvent implements AsyncDataChangeEvent<InstanceIdentifier, NormalizedNode<?, ?>> {
+
+        @Override
+        public Map<InstanceIdentifier, NormalizedNode<?, ?>> getCreatedData() {
+            throw new UnsupportedOperationException("getCreatedData");
+        }
+
+        @Override
+        public Map<InstanceIdentifier, NormalizedNode<?, ?>> getUpdatedData() {
+            throw new UnsupportedOperationException("getUpdatedData");
+        }
+
+        @Override public Set<InstanceIdentifier> getRemovedPaths() {
+            throw new UnsupportedOperationException("getRemovedPaths");
+        }
+
+        @Override
+        public Map<InstanceIdentifier, ? extends NormalizedNode<?, ?>> getOriginalData() {
+            throw new UnsupportedOperationException("getOriginalData");
+        }
+
+        @Override public NormalizedNode<?, ?> getOriginalSubtree() {
+            throw new UnsupportedOperationException("getOriginalSubtree");
+        }
+
+        @Override public NormalizedNode<?, ?> getUpdatedSubtree() {
+            throw new UnsupportedOperationException("getUpdatedSubtree");
+        }
+    }
+
+    private class MockDataChangeListener implements AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> {
+        private boolean gotIt = false;
+
+        @Override public void onDataChanged(
+            AsyncDataChangeEvent<InstanceIdentifier, NormalizedNode<?, ?>> change) {
+            gotIt = true;
+        }
+
+        public boolean gotIt() {
+            return gotIt;
+        }
+    }
+
+    @Test
+    public void testDataChanged(){
+        new JavaTestKit(getSystem()) {{
+            final MockDataChangeListener listener = new MockDataChangeListener();
+            final Props props = DataChangeListener.props(listener);
+            final ActorRef subject =
+                getSystem().actorOf(props, "testDataChanged");
+
+            new Within(duration("1 seconds")) {
+                protected void run() {
+
+                    subject.tell(
+                        new DataChanged(new MockDataChangedEvent()),
+                        getRef());
+
+                    final Boolean out = new ExpectMsg<Boolean>("dataChanged") {
+                        // do not put code outside this method, will run afterwards
+                        protected Boolean match(Object in) {
+                            if (in instanceof DataChangedReply) {
+                                DataChangedReply reply =
+                                    (DataChangedReply) in;
+                                return true;
+                            } else {
+                                throw noMatch();
+                            }
+                        }
+                    }.get(); // this extracts the received message
+
+                    assertTrue(out);
+                    assertTrue(listener.gotIt());
+                    // Will wait for the rest of the 3 seconds
+                    expectNoMsg();
+                }
+
+
+            };
+        }};
+    }
+}