X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardTransaction.java;h=737f57bf5d7314536f5fc20b1cbc45167f1b6e97;hp=835ad68bf2df2caf047055bf86eead5ce045bca2;hb=ed693440aa741fee9b94447f8404d89b4020f616;hpb=5f0488294169c571a33bdb76ff19a3ca3e1e6bb6 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java index 835ad68bf2..737f57bf5d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java @@ -37,7 +37,7 @@ import org.opendaylight.controller.cluster.datastore.modification.WriteModificat import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; -import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.model.api.SchemaContext; @@ -123,29 +123,31 @@ public class ShardTransaction extends AbstractUntypedActor { @Override public void handleReceive(Object message) throws Exception { - if (message instanceof ReadData) { - readData((ReadData) message); + if (ReadData.SERIALIZABLE_CLASS.equals(message.getClass())) { + readData(ReadData.fromSerializable(message)); } else if (WriteData.SERIALIZABLE_CLASS.equals(message.getClass())) { writeData(WriteData.fromSerializable(message, schemaContext)); } else if (MergeData.SERIALIZABLE_CLASS.equals(message.getClass())) { mergeData(MergeData.fromSerializable(message, schemaContext)); } else if (DeleteData.SERIALIZABLE_CLASS.equals(message.getClass())) { deleteData(DeleteData.fromSerizalizable(message)); - } else if (message instanceof ReadyTransaction) { - readyTransaction((ReadyTransaction) message); - } else if (message instanceof CloseTransaction) { - closeTransaction((CloseTransaction) message); + } else if (ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) { + readyTransaction(new ReadyTransaction()); + } else if (message.getClass().equals(CloseTransaction.SERIALIZABLE_CLASS)) { + closeTransaction(new CloseTransaction()); } else if (message instanceof GetCompositedModification) { // This is here for testing only getSender().tell(new GetCompositeModificationReply( new ImmutableCompositeModification(modification)), getSelf()); + }else{ + throw new Exception ("Shard:handleRecieve received an unknown message"+message); } } private void readData(ReadData message) { final ActorRef sender = getSender(); final ActorRef self = getSelf(); - final InstanceIdentifier path = message.getPath(); + final YangInstanceIdentifier path = message.getPath(); final ListenableFuture>> future = transaction.read(path); @@ -155,9 +157,9 @@ public class ShardTransaction extends AbstractUntypedActor { try { Optional> optional = future.get(); if (optional.isPresent()) { - sender.tell(new ReadDataReply(optional.get()), self); + sender.tell(new ReadDataReply(schemaContext,optional.get()).toSerializable(), self); } else { - sender.tell(new ReadDataReply(null), self); + sender.tell(new ReadDataReply(schemaContext,null).toSerializable(), self); } } catch (InterruptedException | ExecutionException e) { log.error(e, @@ -173,21 +175,23 @@ public class ShardTransaction extends AbstractUntypedActor { private void writeData(WriteData message) { modification.addModification( new WriteModification(message.getPath(), message.getData(),schemaContext)); + LOG.debug("writeData at path : " + message.getPath().toString()); transaction.write(message.getPath(), message.getData()); - getSender().tell(new WriteDataReply(), getSelf()); + getSender().tell(new WriteDataReply().toSerializable(), getSelf()); } private void mergeData(MergeData message) { modification.addModification( new MergeModification(message.getPath(), message.getData(), schemaContext)); + LOG.debug("mergeData at path : " + message.getPath().toString()); transaction.merge(message.getPath(), message.getData()); - getSender().tell(new MergeDataReply(), getSelf()); + getSender().tell(new MergeDataReply().toSerializable(), getSelf()); } private void deleteData(DeleteData message) { modification.addModification(new DeleteModification(message.getPath())); transaction.delete(message.getPath()); - getSender().tell(new DeleteDataReply(), getSelf()); + getSender().tell(new DeleteDataReply().toSerializable(), getSelf()); } private void readyTransaction(ReadyTransaction message) { @@ -195,13 +199,13 @@ public class ShardTransaction extends AbstractUntypedActor { ActorRef cohortActor = getContext().actorOf( ThreePhaseCommitCohort.props(cohort, shardActor, modification), "cohort"); getSender() - .tell(new ReadyTransactionReply(cohortActor.path()), getSelf()); + .tell(new ReadyTransactionReply(cohortActor.path()).toSerializable(), getSelf()); } private void closeTransaction(CloseTransaction message) { transaction.close(); - getSender().tell(new CloseTransactionReply(), getSelf()); + getSender().tell(new CloseTransactionReply().toSerializable(), getSelf()); getSelf().tell(PoisonPill.getInstance(), getSelf()); }