X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardTransaction.java;h=737f57bf5d7314536f5fc20b1cbc45167f1b6e97;hb=83140d53722ad77dd804f7b4d761a673110b83b3;hp=e3d1e2d9d42e4968f4b81ca0ea33d40c2b7b07f7;hpb=b3d0ded2590e6a5a61055010f7b24e9a943c8d31;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java index e3d1e2d9d4..737f57bf5d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java @@ -37,8 +37,9 @@ import org.opendaylight.controller.cluster.datastore.modification.WriteModificat import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; -import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; import java.util.concurrent.ExecutionException; @@ -67,6 +68,7 @@ import java.util.concurrent.ExecutionException; public class ShardTransaction extends AbstractUntypedActor { private final ActorRef shardActor; + private final SchemaContext schemaContext; // FIXME : see below // If transactionChain is not null then this transaction is part of a @@ -82,37 +84,38 @@ public class ShardTransaction extends AbstractUntypedActor { Logging.getLogger(getContext().system(), this); public ShardTransaction(DOMStoreReadWriteTransaction transaction, - ActorRef shardActor) { - this(null, transaction, shardActor); + ActorRef shardActor, SchemaContext schemaContext) { + this(null, transaction, shardActor, schemaContext); } public ShardTransaction(DOMStoreTransactionChain transactionChain, DOMStoreReadWriteTransaction transaction, - ActorRef shardActor) { + ActorRef shardActor, SchemaContext schemaContext) { this.transactionChain = transactionChain; this.transaction = transaction; this.shardActor = shardActor; + this.schemaContext = schemaContext; } public static Props props(final DOMStoreReadWriteTransaction transaction, - final ActorRef shardActor) { + final ActorRef shardActor, final SchemaContext schemaContext) { return Props.create(new Creator() { @Override public ShardTransaction create() throws Exception { - return new ShardTransaction(transaction, shardActor); + return new ShardTransaction(transaction, shardActor, schemaContext); } }); } public static Props props(final DOMStoreTransactionChain transactionChain, final DOMStoreReadWriteTransaction transaction, - final ActorRef shardActor) { + final ActorRef shardActor, final SchemaContext schemaContext) { return Props.create(new Creator() { @Override public ShardTransaction create() throws Exception { - return new ShardTransaction(transactionChain, transaction, shardActor); + return new ShardTransaction(transactionChain, transaction, shardActor, schemaContext); } }); } @@ -120,29 +123,31 @@ public class ShardTransaction extends AbstractUntypedActor { @Override public void handleReceive(Object message) throws Exception { - if (message instanceof ReadData) { - readData((ReadData) message); - } else if (message instanceof WriteData) { - writeData((WriteData) message); - } else if (message instanceof MergeData) { - mergeData((MergeData) message); - } else if (message instanceof DeleteData) { - deleteData((DeleteData) message); - } else if (message instanceof ReadyTransaction) { - readyTransaction((ReadyTransaction) message); - } else if (message instanceof CloseTransaction) { - closeTransaction((CloseTransaction) message); + if (ReadData.SERIALIZABLE_CLASS.equals(message.getClass())) { + readData(ReadData.fromSerializable(message)); + } else if (WriteData.SERIALIZABLE_CLASS.equals(message.getClass())) { + writeData(WriteData.fromSerializable(message, schemaContext)); + } else if (MergeData.SERIALIZABLE_CLASS.equals(message.getClass())) { + mergeData(MergeData.fromSerializable(message, schemaContext)); + } else if (DeleteData.SERIALIZABLE_CLASS.equals(message.getClass())) { + deleteData(DeleteData.fromSerizalizable(message)); + } else if (ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) { + readyTransaction(new ReadyTransaction()); + } else if (message.getClass().equals(CloseTransaction.SERIALIZABLE_CLASS)) { + closeTransaction(new CloseTransaction()); } else if (message instanceof GetCompositedModification) { // This is here for testing only getSender().tell(new GetCompositeModificationReply( new ImmutableCompositeModification(modification)), getSelf()); + }else{ + throw new Exception ("Shard:handleRecieve received an unknown message"+message); } } private void readData(ReadData message) { final ActorRef sender = getSender(); final ActorRef self = getSelf(); - final InstanceIdentifier path = message.getPath(); + final YangInstanceIdentifier path = message.getPath(); final ListenableFuture>> future = transaction.read(path); @@ -152,9 +157,9 @@ public class ShardTransaction extends AbstractUntypedActor { try { Optional> optional = future.get(); if (optional.isPresent()) { - sender.tell(new ReadDataReply(optional.get()), self); + sender.tell(new ReadDataReply(schemaContext,optional.get()).toSerializable(), self); } else { - sender.tell(new ReadDataReply(null), self); + sender.tell(new ReadDataReply(schemaContext,null).toSerializable(), self); } } catch (InterruptedException | ExecutionException e) { log.error(e, @@ -169,22 +174,24 @@ public class ShardTransaction extends AbstractUntypedActor { private void writeData(WriteData message) { modification.addModification( - new WriteModification(message.getPath(), message.getData())); + new WriteModification(message.getPath(), message.getData(),schemaContext)); + LOG.debug("writeData at path : " + message.getPath().toString()); transaction.write(message.getPath(), message.getData()); - getSender().tell(new WriteDataReply(), getSelf()); + getSender().tell(new WriteDataReply().toSerializable(), getSelf()); } private void mergeData(MergeData message) { modification.addModification( - new MergeModification(message.getPath(), message.getData())); + new MergeModification(message.getPath(), message.getData(), schemaContext)); + LOG.debug("mergeData at path : " + message.getPath().toString()); transaction.merge(message.getPath(), message.getData()); - getSender().tell(new MergeDataReply(), getSelf()); + getSender().tell(new MergeDataReply().toSerializable(), getSelf()); } private void deleteData(DeleteData message) { modification.addModification(new DeleteModification(message.getPath())); transaction.delete(message.getPath()); - getSender().tell(new DeleteDataReply(), getSelf()); + getSender().tell(new DeleteDataReply().toSerializable(), getSelf()); } private void readyTransaction(ReadyTransaction message) { @@ -192,13 +199,13 @@ public class ShardTransaction extends AbstractUntypedActor { ActorRef cohortActor = getContext().actorOf( ThreePhaseCommitCohort.props(cohort, shardActor, modification), "cohort"); getSender() - .tell(new ReadyTransactionReply(cohortActor.path()), getSelf()); + .tell(new ReadyTransactionReply(cohortActor.path()).toSerializable(), getSelf()); } private void closeTransaction(CloseTransaction message) { transaction.close(); - getSender().tell(new CloseTransactionReply(), getSelf()); + getSender().tell(new CloseTransactionReply().toSerializable(), getSelf()); getSelf().tell(PoisonPill.getInstance(), getSelf()); }