X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardTransaction.java;h=7a0b19742e7843c979bd21d35eb5c28cafd5dfc8;hb=c6618cb0df8fc41ccfcb7578b11f5ecfa0403f39;hp=a2da063e55d465ffd4c9bb256e9a257475d0e107;hpb=8a31e147100aa64b6090d856b2efe7ef50021555;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java index a2da063e55..7a0b19742e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java @@ -11,7 +11,6 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; import akka.actor.PoisonPill; import akka.actor.Props; -import akka.actor.UntypedActor; import akka.event.Logging; import akka.event.LoggingAdapter; import akka.japi.Creator; @@ -40,6 +39,7 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCoh import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; import java.util.concurrent.ExecutionException; @@ -65,9 +65,10 @@ import java.util.concurrent.ExecutionException; *
  • {@link org.opendaylight.controller.cluster.datastore.messages.CloseTransaction} *

    */ -public class ShardTransaction extends UntypedActor { +public class ShardTransaction extends AbstractUntypedActor { private final ActorRef shardActor; + private final SchemaContext schemaContext; // FIXME : see below // If transactionChain is not null then this transaction is part of a @@ -83,54 +84,53 @@ public class ShardTransaction extends UntypedActor { Logging.getLogger(getContext().system(), this); public ShardTransaction(DOMStoreReadWriteTransaction transaction, - ActorRef shardActor) { - this(null, transaction, shardActor); + ActorRef shardActor, SchemaContext schemaContext) { + this(null, transaction, shardActor, schemaContext); } public ShardTransaction(DOMStoreTransactionChain transactionChain, DOMStoreReadWriteTransaction transaction, - ActorRef shardActor) { + ActorRef shardActor, SchemaContext schemaContext) { this.transactionChain = transactionChain; this.transaction = transaction; this.shardActor = shardActor; + this.schemaContext = schemaContext; } public static Props props(final DOMStoreReadWriteTransaction transaction, - final ActorRef shardActor) { + final ActorRef shardActor, final SchemaContext schemaContext) { return Props.create(new Creator() { @Override public ShardTransaction create() throws Exception { - return new ShardTransaction(transaction, shardActor); + return new ShardTransaction(transaction, shardActor, schemaContext); } }); } public static Props props(final DOMStoreTransactionChain transactionChain, final DOMStoreReadWriteTransaction transaction, - final ActorRef shardActor) { + final ActorRef shardActor, final SchemaContext schemaContext) { return Props.create(new Creator() { @Override public ShardTransaction create() throws Exception { - return new ShardTransaction(transactionChain, transaction, shardActor); + return new ShardTransaction(transactionChain, transaction, shardActor, schemaContext); } }); } @Override - public void onReceive(Object message) throws Exception { - log.debug("Received message {}", message); - - if (message instanceof ReadData) { - readData((ReadData) message); - } else if (message instanceof WriteData) { - writeData((WriteData) message); - } else if (message instanceof MergeData) { - mergeData((MergeData) message); - } else if (message instanceof DeleteData) { - deleteData((DeleteData) message); + public void handleReceive(Object message) throws Exception { + if (ReadData.SERIALIZABLE_CLASS.equals(message.getClass())) { + readData(ReadData.fromSerializable(message)); + } else if (WriteData.SERIALIZABLE_CLASS.equals(message.getClass())) { + writeData(WriteData.fromSerializable(message, schemaContext)); + } else if (MergeData.SERIALIZABLE_CLASS.equals(message.getClass())) { + mergeData(MergeData.fromSerializable(message, schemaContext)); + } else if (DeleteData.SERIALIZABLE_CLASS.equals(message.getClass())) { + deleteData(DeleteData.fromSerizalizable(message)); } else if (message instanceof ReadyTransaction) { readyTransaction((ReadyTransaction) message); } else if (message instanceof CloseTransaction) { @@ -139,6 +139,8 @@ public class ShardTransaction extends UntypedActor { // This is here for testing only getSender().tell(new GetCompositeModificationReply( new ImmutableCompositeModification(modification)), getSelf()); + }else{ + throw new Exception ("handleRecieve received an unknown mesages"+message); } } @@ -155,9 +157,9 @@ public class ShardTransaction extends UntypedActor { try { Optional> optional = future.get(); if (optional.isPresent()) { - sender.tell(new ReadDataReply(optional.get()), self); + sender.tell(new ReadDataReply(schemaContext,optional.get()).toSerializable(), self); } else { - sender.tell(new ReadDataReply(null), self); + sender.tell(new ReadDataReply(schemaContext,null).toSerializable(), self); } } catch (InterruptedException | ExecutionException e) { log.error(e, @@ -172,14 +174,14 @@ public class ShardTransaction extends UntypedActor { private void writeData(WriteData message) { modification.addModification( - new WriteModification(message.getPath(), message.getData())); + new WriteModification(message.getPath(), message.getData(),schemaContext)); transaction.write(message.getPath(), message.getData()); getSender().tell(new WriteDataReply(), getSelf()); } private void mergeData(MergeData message) { modification.addModification( - new MergeModification(message.getPath(), message.getData())); + new MergeModification(message.getPath(), message.getData(), schemaContext)); transaction.merge(message.getPath(), message.getData()); getSender().tell(new MergeDataReply(), getSelf()); } @@ -193,7 +195,7 @@ public class ShardTransaction extends UntypedActor { private void readyTransaction(ReadyTransaction message) { DOMStoreThreePhaseCommitCohort cohort = transaction.ready(); ActorRef cohortActor = getContext().actorOf( - ThreePhaseCommitCohort.props(cohort, shardActor, modification)); + ThreePhaseCommitCohort.props(cohort, shardActor, modification), "cohort"); getSender() .tell(new ReadyTransactionReply(cohortActor.path()), getSelf());