}
@Override public void handleReceive(Object message) throws Exception {
- if(message.getClass().equals(DataChanged.SERIALIZABLE_CLASS)){
+ if(message instanceof DataChanged){
dataChanged(message);
} else if(message instanceof EnableNotification){
enableNotification((EnableNotification) message);
return;
}
- DataChanged reply = DataChanged.fromSerialize(schemaContext,message, pathId);
+ DataChanged reply = (DataChanged) message;
AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>>
change = reply.getChange();
this.listener.onDataChanged(change);
if(getSender() != null){
- getSender().tell(new DataChangedReply().toSerializable(), getSelf());
+ getSender().tell(new DataChangedReply(), getSelf());
}
}
@Override public void onDataChanged(
AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
- dataChangeListenerActor.tell(new DataChanged(schemaContext,change).toSerializable(), null);
+ dataChangeListenerActor.tell(new DataChanged(schemaContext,change), null);
}
}
package org.opendaylight.controller.cluster.datastore;
-import java.util.concurrent.Executors;
-
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
-
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
+import java.util.concurrent.Executors;
/**
*
Object result = actorContext.executeLocalShardOperation(shardName,
new RegisterChangeListener(path, dataChangeListenerActor.path(),
- scope).toSerializable(),
+ scope),
ActorContext.ASK_DURATION
);
if (result != null) {
- RegisterChangeListenerReply reply = RegisterChangeListenerReply
- .fromSerializable(actorContext.getActorSystem(), result);
+ RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result;
return new DataChangeListenerRegistrationProxy(actorContext
.actorSelection(reply.getListenerRegistrationPath()), listener,
dataChangeListenerActor);
} else if(getLeader() != null){
getLeader().forward(message, getContext());
}
- } else if (message.getClass().equals(RegisterChangeListener.SERIALIZABLE_CLASS)) {
- registerChangeListener(RegisterChangeListener.fromSerializable(getContext().system(), message));
+ } else if (message instanceof RegisterChangeListener) {
+ registerChangeListener((RegisterChangeListener) message);
} else if (message instanceof UpdateSchemaContext) {
updateSchemaContext((UpdateSchemaContext) message);
} else if (message instanceof ForwardedCommitTransaction) {
LOG.debug("registerDataChangeListener sending reply, listenerRegistrationPath = " + listenerRegistration.path().toString());
getSender()
- .tell(new RegisterChangeListenerReply(listenerRegistration.path()).toSerializable(),
+ .tell(new RegisterChangeListenerReply(listenerRegistration.path()),
getSelf());
}
Assert.assertEquals(1, listMessages.size());
- Assert.assertTrue(listMessages.get(0).getClass().equals(DataChanged.SERIALIZABLE_CLASS));
+ Assert.assertTrue(listMessages.get(0).getClass().equals(DataChanged.class));
}
}
subject.tell(new EnableNotification(true), getRef());
subject.tell(
- new DataChanged(CompositeModel.createTestContext(),new MockDataChangedEvent()).toSerializable(),
+ new DataChanged(CompositeModel.createTestContext(),new MockDataChangedEvent()),
getRef());
final Boolean out = new ExpectMsg<Boolean>(duration("800 millis"), "dataChanged") {
// do not put code outside this method, will run afterwards
protected Boolean match(Object in) {
- if (in != null && in.getClass().equals(DataChangedReply.SERIALIZABLE_CLASS)) {
+ if (in != null && in.getClass().equals(DataChangedReply.class)) {
return true;
} else {
protected void run() {
subject.tell(
- new DataChanged(CompositeModel.createTestContext(),new MockDataChangedEvent()).toSerializable(),
+ new DataChanged(CompositeModel.createTestContext(),new MockDataChangedEvent()),
getRef());
expectNoMsg();
@org.junit.Test
public void testRegisterChangeListenerWhenShardIsLocal() throws Exception {
- mockActorContext.setExecuteLocalShardOperationResponse(new RegisterChangeListenerReply(doNothingActorRef.path()).toSerializable());
+ mockActorContext.setExecuteLocalShardOperationResponse(new RegisterChangeListenerReply(doNothingActorRef.path()));
ListenerRegistration registration =
distributedDataStore.registerChangeListener(TestModel.TEST_PATH, new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
getRef());
subject.tell(new RegisterChangeListener(TestModel.TEST_PATH,
- getRef().path(), AsyncDataBroker.DataChangeScope.BASE).toSerializable(),
+ getRef().path(), AsyncDataBroker.DataChangeScope.BASE),
getRef());
final Boolean notificationEnabled = new ExpectMsg<Boolean>("enable notification") {
final String out = new ExpectMsg<String>("match hint") {
// do not put code outside this method, will run afterwards
protected String match(Object in) {
- if (in.getClass().equals(RegisterChangeListenerReply.SERIALIZABLE_CLASS)) {
+ if (in.getClass().equals(RegisterChangeListenerReply.class)) {
RegisterChangeListenerReply reply =
- RegisterChangeListenerReply.fromSerializable(getSystem(),in);
+ (RegisterChangeListenerReply) in;
return reply.getListenerRegistrationPath()
.toString();
} else {