X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardTransaction.java;h=835ad68bf2df2caf047055bf86eead5ce045bca2;hb=5f0488294169c571a33bdb76ff19a3ca3e1e6bb6;hp=a2da063e55d465ffd4c9bb256e9a257475d0e107;hpb=8a31e147100aa64b6090d856b2efe7ef50021555;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java index a2da063e55..835ad68bf2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java @@ -11,7 +11,6 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; import akka.actor.PoisonPill; import akka.actor.Props; -import akka.actor.UntypedActor; import akka.event.Logging; import akka.event.LoggingAdapter; import akka.japi.Creator; @@ -40,6 +39,7 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCoh import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; import java.util.concurrent.ExecutionException; @@ -65,9 +65,10 @@ import java.util.concurrent.ExecutionException; *
  • {@link org.opendaylight.controller.cluster.datastore.messages.CloseTransaction} *

    */ -public class ShardTransaction extends UntypedActor { +public class ShardTransaction extends AbstractUntypedActor { private final ActorRef shardActor; + private final SchemaContext schemaContext; // FIXME : see below // If transactionChain is not null then this transaction is part of a @@ -83,54 +84,53 @@ public class ShardTransaction extends UntypedActor { Logging.getLogger(getContext().system(), this); public ShardTransaction(DOMStoreReadWriteTransaction transaction, - ActorRef shardActor) { - this(null, transaction, shardActor); + ActorRef shardActor, SchemaContext schemaContext) { + this(null, transaction, shardActor, schemaContext); } public ShardTransaction(DOMStoreTransactionChain transactionChain, DOMStoreReadWriteTransaction transaction, - ActorRef shardActor) { + ActorRef shardActor, SchemaContext schemaContext) { this.transactionChain = transactionChain; this.transaction = transaction; this.shardActor = shardActor; + this.schemaContext = schemaContext; } public static Props props(final DOMStoreReadWriteTransaction transaction, - final ActorRef shardActor) { + final ActorRef shardActor, final SchemaContext schemaContext) { return Props.create(new Creator() { @Override public ShardTransaction create() throws Exception { - return new ShardTransaction(transaction, shardActor); + return new ShardTransaction(transaction, shardActor, schemaContext); } }); } public static Props props(final DOMStoreTransactionChain transactionChain, final DOMStoreReadWriteTransaction transaction, - final ActorRef shardActor) { + final ActorRef shardActor, final SchemaContext schemaContext) { return Props.create(new Creator() { @Override public ShardTransaction create() throws Exception { - return new ShardTransaction(transactionChain, transaction, shardActor); + return new ShardTransaction(transactionChain, transaction, shardActor, schemaContext); } }); } @Override - public void onReceive(Object message) throws Exception { - log.debug("Received message {}", message); - + public void handleReceive(Object message) throws Exception { if (message instanceof ReadData) { readData((ReadData) message); - } else if (message instanceof WriteData) { - writeData((WriteData) message); - } else if (message instanceof MergeData) { - mergeData((MergeData) message); - } else if (message instanceof DeleteData) { - deleteData((DeleteData) message); + } else if (WriteData.SERIALIZABLE_CLASS.equals(message.getClass())) { + writeData(WriteData.fromSerializable(message, schemaContext)); + } else if (MergeData.SERIALIZABLE_CLASS.equals(message.getClass())) { + mergeData(MergeData.fromSerializable(message, schemaContext)); + } else if (DeleteData.SERIALIZABLE_CLASS.equals(message.getClass())) { + deleteData(DeleteData.fromSerizalizable(message)); } else if (message instanceof ReadyTransaction) { readyTransaction((ReadyTransaction) message); } else if (message instanceof CloseTransaction) { @@ -172,14 +172,14 @@ public class ShardTransaction extends UntypedActor { private void writeData(WriteData message) { modification.addModification( - new WriteModification(message.getPath(), message.getData())); + new WriteModification(message.getPath(), message.getData(),schemaContext)); transaction.write(message.getPath(), message.getData()); getSender().tell(new WriteDataReply(), getSelf()); } private void mergeData(MergeData message) { modification.addModification( - new MergeModification(message.getPath(), message.getData())); + new MergeModification(message.getPath(), message.getData(), schemaContext)); transaction.merge(message.getPath(), message.getData()); getSender().tell(new MergeDataReply(), getSelf()); } @@ -193,7 +193,7 @@ public class ShardTransaction extends UntypedActor { private void readyTransaction(ReadyTransaction message) { DOMStoreThreePhaseCommitCohort cohort = transaction.ready(); ActorRef cohortActor = getContext().actorOf( - ThreePhaseCommitCohort.props(cohort, shardActor, modification)); + ThreePhaseCommitCohort.props(cohort, shardActor, modification), "cohort"); getSender() .tell(new ReadyTransactionReply(cohortActor.path()), getSelf());