DataChangeScope scope) {
Future<Object> future = actorContext.executeOperationAsync(shard,
- new RegisterChangeListener(path, dataChangeListenerActor.path(), scope),
+ new RegisterChangeListener(path, dataChangeListenerActor, scope),
actorContext.getDatastoreContext().getShardInitializationTimeout());
future.onComplete(new OnComplete<Object>(){
LOG.debug("{}: registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
persistenceId(), listenerRegistration.path());
- getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()), getSelf());
+ getSender().tell(new RegisterChangeListenerReply(listenerRegistration), getSelf());
}
private ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
package org.opendaylight.controller.cluster.datastore.messages;
import akka.actor.ActorPath;
+import akka.actor.ActorRef;
import akka.actor.ActorSystem;
+import akka.serialization.Serialization;
import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
import org.opendaylight.controller.protobuff.messages.registration.ListenerRegistrationMessages;
ListenerRegistrationMessages.RegisterChangeListener.class;
private final YangInstanceIdentifier path;
- private final ActorPath dataChangeListenerPath;
+ private final ActorRef dataChangeListener;
private final AsyncDataBroker.DataChangeScope scope;
public RegisterChangeListener(YangInstanceIdentifier path,
- ActorPath dataChangeListenerPath,
+ ActorRef dataChangeListener,
AsyncDataBroker.DataChangeScope scope) {
this.path = path;
- this.dataChangeListenerPath = dataChangeListenerPath;
+ this.dataChangeListener = dataChangeListener;
this.scope = scope;
}
}
public ActorPath getDataChangeListenerPath() {
- return dataChangeListenerPath;
+ return dataChangeListener.path();
}
public ListenerRegistrationMessages.RegisterChangeListener toSerializable() {
return ListenerRegistrationMessages.RegisterChangeListener.newBuilder()
.setInstanceIdentifierPath(InstanceIdentifierUtils.toSerializable(path))
- .setDataChangeListenerActorPath(dataChangeListenerPath.toString())
+ .setDataChangeListenerActorPath(Serialization.serializedActorPath(dataChangeListener))
.setDataChangeScope(scope.ordinal()).build();
}
- public static RegisterChangeListener fromSerializable(ActorSystem actorSystem,Object serializable){
+ public static RegisterChangeListener fromSerializable(ActorSystem actorSystem, Object serializable){
ListenerRegistrationMessages.RegisterChangeListener o = (ListenerRegistrationMessages.RegisterChangeListener) serializable;
return new RegisterChangeListener(InstanceIdentifierUtils.fromSerializable(o.getInstanceIdentifierPath()),
- actorSystem.actorFor(o.getDataChangeListenerActorPath()).path(),
+ actorSystem.provider().resolveActorRef(o.getDataChangeListenerActorPath()),
AsyncDataBroker.DataChangeScope.values()[o.getDataChangeScope()]);
}
package org.opendaylight.controller.cluster.datastore.messages;
import akka.actor.ActorPath;
+import akka.actor.ActorRef;
import akka.actor.ActorSystem;
+import akka.serialization.Serialization;
import org.opendaylight.controller.protobuff.messages.registration.ListenerRegistrationMessages;
public class RegisterChangeListenerReply implements SerializableMessage{
public static final Class<ListenerRegistrationMessages.RegisterChangeListenerReply> SERIALIZABLE_CLASS =
ListenerRegistrationMessages.RegisterChangeListenerReply.class;
- private final ActorPath listenerRegistrationPath;
+ private final ActorRef listenerRegistration;
- public RegisterChangeListenerReply(final ActorPath listenerRegistrationPath) {
- this.listenerRegistrationPath = listenerRegistrationPath;
+ public RegisterChangeListenerReply(final ActorRef listenerRegistration) {
+ this.listenerRegistration = listenerRegistration;
}
public ActorPath getListenerRegistrationPath() {
- return listenerRegistrationPath;
+ return listenerRegistration.path();
}
@Override
public ListenerRegistrationMessages.RegisterChangeListenerReply toSerializable() {
return ListenerRegistrationMessages.RegisterChangeListenerReply.newBuilder()
- .setListenerRegistrationPath(listenerRegistrationPath.toString()).build();
+ .setListenerRegistrationPath(Serialization.serializedActorPath(listenerRegistration)).build();
}
public static RegisterChangeListenerReply fromSerializable(final ActorSystem actorSystem,final Object serializable){
ListenerRegistrationMessages.RegisterChangeListenerReply o = (ListenerRegistrationMessages.RegisterChangeListenerReply) serializable;
return new RegisterChangeListenerReply(
- actorSystem.actorFor(o.getListenerRegistrationPath()).path()
+ actorSystem.provider().resolveActorRef(o.getListenerRegistrationPath())
);
}
}
Assert.assertEquals("getPath", path, registerMsg.getPath());
Assert.assertEquals("getScope", scope, registerMsg.getScope());
- reply(new RegisterChangeListenerReply(getRef().path()));
+ reply(new RegisterChangeListenerReply(getRef()));
for(int i = 0; (i < 20 * 5) && proxy.getListenerRegistrationActor() == null; i++) {
Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
@Override
public Future<Object> answer(InvocationOnMock invocation) {
proxy.close();
- return Futures.successful((Object)new RegisterChangeListenerReply(getRef().path()));
+ return Futures.successful((Object)new RegisterChangeListenerReply(getRef()));
}
};
"testRegisterChangeListener-DataChangeListener");
shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
- dclActor.path(), AsyncDataBroker.DataChangeScope.BASE), getRef());
+ dclActor, AsyncDataBroker.DataChangeScope.BASE), getRef());
RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
RegisterChangeListenerReply.class);
onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
// Now send the RegisterChangeListener and wait for the reply.
- shard.tell(new RegisterChangeListener(path, dclActor.path(),
+ shard.tell(new RegisterChangeListener(path, dclActor,
AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
--- /dev/null
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import static junit.framework.TestCase.assertEquals;
+import akka.actor.Actor;
+import akka.serialization.Serialization;
+import akka.testkit.TestActorRef;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
+import org.opendaylight.controller.cluster.raft.TestActorFactory;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import org.opendaylight.controller.protobuff.messages.registration.ListenerRegistrationMessages;
+
+public class RegisterChangeListenerReplyTest extends AbstractActorTest {
+
+ private TestActorFactory factory;
+
+
+ @Before
+ public void setUp(){
+ factory = new TestActorFactory(getSystem());
+ }
+
+ @After
+ public void shutDown(){
+ factory.close();
+ }
+
+ @Test
+ public void testToSerializable(){
+ TestActorRef<Actor> testActor = factory.createTestActor(MessageCollectorActor.props());
+
+ RegisterChangeListenerReply registerChangeListenerReply = new RegisterChangeListenerReply(testActor);
+
+ ListenerRegistrationMessages.RegisterChangeListenerReply serialized
+ = registerChangeListenerReply.toSerializable();
+
+ assertEquals(Serialization.serializedActorPath(testActor), serialized.getListenerRegistrationPath());
+ }
+
+ @Test
+ public void testFromSerializable(){
+ TestActorRef<Actor> testActor = factory.createTestActor(MessageCollectorActor.props());
+
+ RegisterChangeListenerReply registerChangeListenerReply = new RegisterChangeListenerReply(testActor);
+
+ ListenerRegistrationMessages.RegisterChangeListenerReply serialized
+ = registerChangeListenerReply.toSerializable();
+
+
+ RegisterChangeListenerReply fromSerialized
+ = RegisterChangeListenerReply.fromSerializable(getSystem(), serialized);
+
+ assertEquals(testActor.path().toString(), fromSerialized.getListenerRegistrationPath().toString());
+ }
+
+}
\ No newline at end of file
--- /dev/null
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import static junit.framework.TestCase.assertEquals;
+import akka.actor.Actor;
+import akka.serialization.Serialization;
+import akka.testkit.TestActorRef;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
+import org.opendaylight.controller.cluster.raft.TestActorFactory;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
+import org.opendaylight.controller.protobuff.messages.registration.ListenerRegistrationMessages;
+
+public class RegisterChangeListenerTest extends AbstractActorTest {
+
+ private TestActorFactory factory;
+
+ @Before
+ public void setUp(){
+ factory = new TestActorFactory(getSystem());
+ }
+
+ @After
+ public void shutDown(){
+ factory.close();
+ }
+
+ @Test
+ public void testToSerializable(){
+ TestActorRef<Actor> testActor = factory.createTestActor(MessageCollectorActor.props());
+ RegisterChangeListener registerChangeListener = new RegisterChangeListener(TestModel.TEST_PATH, testActor
+ , AsyncDataBroker.DataChangeScope.BASE);
+
+ ListenerRegistrationMessages.RegisterChangeListener serialized
+ = registerChangeListener.toSerializable();
+
+ NormalizedNodeMessages.InstanceIdentifier path = serialized.getInstanceIdentifierPath();
+
+ assertEquals("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test", path.getCode(0));
+ assertEquals(Serialization.serializedActorPath(testActor), serialized.getDataChangeListenerActorPath());
+ assertEquals(AsyncDataBroker.DataChangeScope.BASE.ordinal(), serialized.getDataChangeScope());
+
+ }
+
+ @Test
+ public void testFromSerializable(){
+ TestActorRef<Actor> testActor = factory.createTestActor(MessageCollectorActor.props());
+ RegisterChangeListener registerChangeListener = new RegisterChangeListener(TestModel.TEST_PATH, testActor
+ , AsyncDataBroker.DataChangeScope.SUBTREE);
+
+ ListenerRegistrationMessages.RegisterChangeListener serialized
+ = registerChangeListener.toSerializable();
+
+
+ RegisterChangeListener fromSerialized = RegisterChangeListener.fromSerializable(getSystem(), serialized);
+
+ assertEquals(TestModel.TEST_PATH, registerChangeListener.getPath());
+ assertEquals(testActor.path().toString(), fromSerialized.getDataChangeListenerPath().toString());
+ assertEquals(AsyncDataBroker.DataChangeScope.SUBTREE, fromSerialized.getScope());
+
+
+ }
+}
\ No newline at end of file