import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.japi.Creator;
-import org.opendaylight.controller.cluster.datastore.messages.CloseListenerRegistration;
-import org.opendaylight.controller.cluster.datastore.messages.CloseListenerRegistrationReply;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistrationReply;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-public class ListenerRegistration extends UntypedActor{
+public class DataChangeListenerRegistration extends UntypedActor{
private final org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> registration;
- public ListenerRegistration(org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> registration) {
+ public DataChangeListenerRegistration(
+ org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> registration) {
this.registration = registration;
}
@Override
public void onReceive(Object message) throws Exception {
- if(message instanceof CloseListenerRegistration){
- closeListenerRegistration((CloseListenerRegistration) message);
+ if(message instanceof CloseDataChangeListenerRegistration){
+ closeListenerRegistration((CloseDataChangeListenerRegistration) message);
}
}
public static Props props(final org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> registration){
- return Props.create(new Creator<ListenerRegistration>(){
+ return Props.create(new Creator<DataChangeListenerRegistration>(){
@Override
- public ListenerRegistration create() throws Exception {
- return new ListenerRegistration(registration);
+ public DataChangeListenerRegistration create() throws Exception {
+ return new DataChangeListenerRegistration(registration);
}
});
}
- private void closeListenerRegistration(CloseListenerRegistration message){
+ private void closeListenerRegistration(CloseDataChangeListenerRegistration message){
registration.close();
- getSender().tell(new CloseListenerRegistrationReply(), getSelf());
+ getSender().tell(new CloseDataChangeListenerRegistrationReply(), getSelf());
}
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorSelection;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+/**
+ * ListenerRegistrationProxy acts as a proxy for a ListenerRegistration that was done on a remote shard
+ * <p>
+ * Registering a DataChangeListener on the Data Store creates a new instance of the ListenerRegistrationProxy
+ * The ListenerRegistrationProxy talks to a remote ListenerRegistration actor.
+ * </p>
+ */
+public class DataChangeListenerRegistrationProxy implements ListenerRegistration {
+ private final ActorSelection listenerRegistrationActor;
+ private final AsyncDataChangeListener listener;
+
+ public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>>
+ DataChangeListenerRegistrationProxy(
+ ActorSelection listenerRegistrationActor,
+ L listener) {
+ this.listenerRegistrationActor = listenerRegistrationActor;
+ this.listener = listener;
+ }
+
+ @Override
+ public Object getInstance() {
+ return listener;
+ }
+
+ @Override
+ public void close() {
+ listenerRegistrationActor.tell(new CloseDataChangeListenerRegistration(), null);
+ }
+}
InstanceIdentifier path, L listener,
AsyncDataBroker.DataChangeScope scope) {
- ActorRef dataChangeListenerActor = actorContext.getActorSystem().actorOf(DataChangeListener.props());
+ ActorRef dataChangeListenerActor = actorContext.getActorSystem().actorOf(
+ DataChangeListener.props());
Object result = actorContext.executeShardOperation(Shard.DEFAULT_NAME,
new RegisterChangeListener(path, dataChangeListenerActor.path(),
AsyncDataBroker.DataChangeScope.BASE),
- ActorContext.ASK_DURATION);
+ ActorContext.ASK_DURATION
+ );
RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result;
- return new ListenerRegistrationProxy(reply.getListenerRegistrationPath());
+ return new DataChangeListenerRegistrationProxy(actorContext.actorSelection(reply.getListenerRegistrationPath()), listener);
}
}
@Override public void onGlobalContextUpdated(SchemaContext schemaContext) {
- actorContext.getShardManager().tell(new UpdateSchemaContext(schemaContext), null);
+ actorContext.getShardManager().tell(
+ new UpdateSchemaContext(schemaContext), null);
}
}
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore;
-
-import akka.actor.ActorPath;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-
-/**
- * ListenerRegistrationProxy acts as a proxy for a ListenerRegistration that was done on a remote shard
- *
- * Registering a DataChangeListener on the Data Store creates a new instance of the ListenerRegistrationProxy
- * The ListenerRegistrationProxy talks to a remote ListenerRegistration actor.
- */
-public class ListenerRegistrationProxy implements ListenerRegistration {
- private final ActorPath listenerRegistrationPath;
-
- public ListenerRegistrationProxy(ActorPath listenerRegistrationPath) {
-
- this.listenerRegistrationPath = listenerRegistrationPath;
- }
-
- @Override
- public Object getInstance() {
- throw new UnsupportedOperationException("getInstance");
- }
-
- @Override
- public void close() {
- throw new UnsupportedOperationException("close");
- }
-}
store.registerChangeListener(registerChangeListener.getPath(),
listener, registerChangeListener.getScope());
ActorRef listenerRegistration =
- getContext().actorOf(ListenerRegistration.props(registration));
+ getContext().actorOf(
+ DataChangeListenerRegistration.props(registration));
getSender()
.tell(new RegisterChangeListenerReply(listenerRegistration.path()),
getSelf());
package org.opendaylight.controller.cluster.datastore.messages;
-public class CloseListenerRegistration {
+public class CloseDataChangeListenerRegistration {
}
package org.opendaylight.controller.cluster.datastore.messages;
-public class CloseListenerRegistrationReply {
+public class CloseDataChangeListenerRegistrationReply {
}
--- /dev/null
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import junit.framework.Assert;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
+import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+import java.util.List;
+
+public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest{
+
+ private static class MockDataChangeListener implements
+ AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> {
+
+ @Override public void onDataChanged(
+ AsyncDataChangeEvent<InstanceIdentifier, NormalizedNode<?, ?>> change) {
+ throw new UnsupportedOperationException("onDataChanged");
+ }
+ }
+
+ @Test
+ public void testGetInstance() throws Exception {
+ final Props props = Props.create(MessageCollectorActor.class);
+ final ActorRef actorRef = getSystem().actorOf(props);
+
+ MockDataChangeListener listener =
+ new MockDataChangeListener();
+ DataChangeListenerRegistrationProxy proxy =
+ new DataChangeListenerRegistrationProxy(
+ getSystem().actorSelection(actorRef.path()),
+ listener);
+
+ Assert.assertEquals(listener, proxy.getInstance());
+
+ }
+
+ @Test
+ public void testClose() throws Exception {
+ final Props props = Props.create(MessageCollectorActor.class);
+ final ActorRef actorRef = getSystem().actorOf(props);
+
+ DataChangeListenerRegistrationProxy proxy =
+ new DataChangeListenerRegistrationProxy(
+ getSystem().actorSelection(actorRef.path()),
+ new MockDataChangeListener());
+
+ proxy.close();
+
+ //Check if it was received by the remote actor
+ ActorContext
+ testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)));
+ Object messages = testContext
+ .executeLocalOperation(actorRef, "messages",
+ ActorContext.ASK_DURATION);
+
+ Assert.assertNotNull(messages);
+
+ Assert.assertTrue(messages instanceof List);
+
+ List<Object> listMessages = (List<Object>) messages;
+
+ Assert.assertEquals(1, listMessages.size());
+
+ Assert.assertTrue(listMessages.get(0) instanceof CloseDataChangeListenerRegistration);
+ }
+}
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.junit.Test;
-import org.opendaylight.controller.cluster.datastore.messages.CloseListenerRegistration;
-import org.opendaylight.controller.cluster.datastore.messages.CloseListenerRegistrationReply;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistrationReply;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
import static org.junit.Assert.assertEquals;
-public class ListenerRegistrationTest extends AbstractActorTest {
+public class DataChangeListenerRegistrationTest extends AbstractActorTest {
private static ListeningExecutorService storeExecutor = MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
private static final InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", storeExecutor);
@Test
public void testOnReceiveCloseListenerRegistration() throws Exception {
new JavaTestKit(getSystem()) {{
- final Props props = ListenerRegistration.props(store.registerChangeListener(TestModel.TEST_PATH, noOpDataChangeListener(), AsyncDataBroker.DataChangeScope.BASE));
+ final Props props = DataChangeListenerRegistration.props(store
+ .registerChangeListener(TestModel.TEST_PATH, noOpDataChangeListener(),
+ AsyncDataBroker.DataChangeScope.BASE));
final ActorRef subject = getSystem().actorOf(props, "testCloseListenerRegistration");
new Within(duration("1 seconds")) {
protected void run() {
- subject.tell(new CloseListenerRegistration(), getRef());
+ subject.tell(new CloseDataChangeListenerRegistration(), getRef());
final String out = new ExpectMsg<String>("match hint") {
// do not put code outside this method, will run afterwards
protected String match(Object in) {
- if (in instanceof CloseListenerRegistrationReply) {
+ if (in instanceof CloseDataChangeListenerRegistrationReply) {
return "match";
} else {
throw noMatch();
};
}
-}
\ No newline at end of file
+}