X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardTransaction.java;h=737f57bf5d7314536f5fc20b1cbc45167f1b6e97;hb=83140d53722ad77dd804f7b4d761a673110b83b3;hp=f747afa7867ac8f7f6434987045315b98bc9cca5;hpb=18a4539ad844c05fcd30373efa43f873aca4c142;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java index f747afa786..737f57bf5d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java @@ -9,8 +9,8 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; +import akka.actor.PoisonPill; import akka.actor.Props; -import akka.actor.UntypedActor; import akka.event.Logging; import akka.event.LoggingAdapter; import akka.japi.Creator; @@ -36,8 +36,10 @@ import org.opendaylight.controller.cluster.datastore.modification.MutableComposi import org.opendaylight.controller.cluster.datastore.modification.WriteModification; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; -import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; import java.util.concurrent.ExecutionException; @@ -63,9 +65,15 @@ import java.util.concurrent.ExecutionException; *
  • {@link org.opendaylight.controller.cluster.datastore.messages.CloseTransaction} *

    */ -public class ShardTransaction extends UntypedActor { +public class ShardTransaction extends AbstractUntypedActor { private final ActorRef shardActor; + private final SchemaContext schemaContext; + + // FIXME : see below + // If transactionChain is not null then this transaction is part of a + // transactionChain. Not really clear as to what that buys us + private final DOMStoreTransactionChain transactionChain; private final DOMStoreReadWriteTransaction transaction; @@ -76,50 +84,70 @@ public class ShardTransaction extends UntypedActor { Logging.getLogger(getContext().system(), this); public ShardTransaction(DOMStoreReadWriteTransaction transaction, - ActorRef shardActor) { + ActorRef shardActor, SchemaContext schemaContext) { + this(null, transaction, shardActor, schemaContext); + } + + public ShardTransaction(DOMStoreTransactionChain transactionChain, DOMStoreReadWriteTransaction transaction, + ActorRef shardActor, SchemaContext schemaContext) { + this.transactionChain = transactionChain; this.transaction = transaction; this.shardActor = shardActor; + this.schemaContext = schemaContext; } + public static Props props(final DOMStoreReadWriteTransaction transaction, - final ActorRef shardActor) { + final ActorRef shardActor, final SchemaContext schemaContext) { return Props.create(new Creator() { @Override public ShardTransaction create() throws Exception { - return new ShardTransaction(transaction, shardActor); + return new ShardTransaction(transaction, shardActor, schemaContext); } }); } + public static Props props(final DOMStoreTransactionChain transactionChain, final DOMStoreReadWriteTransaction transaction, + final ActorRef shardActor, final SchemaContext schemaContext) { + return Props.create(new Creator() { + + @Override + public ShardTransaction create() throws Exception { + return new ShardTransaction(transactionChain, transaction, shardActor, schemaContext); + } + }); + } + + @Override - public void onReceive(Object message) throws Exception { - log.debug("Received message {}", message); - - if (message instanceof ReadData) { - readData((ReadData) message); - } else if (message instanceof WriteData) { - writeData((WriteData) message); - } else if (message instanceof MergeData) { - mergeData((MergeData) message); - } else if (message instanceof DeleteData) { - deleteData((DeleteData) message); - } else if (message instanceof ReadyTransaction) { - readyTransaction((ReadyTransaction) message); - } else if (message instanceof CloseTransaction) { - closeTransaction((CloseTransaction) message); + public void handleReceive(Object message) throws Exception { + if (ReadData.SERIALIZABLE_CLASS.equals(message.getClass())) { + readData(ReadData.fromSerializable(message)); + } else if (WriteData.SERIALIZABLE_CLASS.equals(message.getClass())) { + writeData(WriteData.fromSerializable(message, schemaContext)); + } else if (MergeData.SERIALIZABLE_CLASS.equals(message.getClass())) { + mergeData(MergeData.fromSerializable(message, schemaContext)); + } else if (DeleteData.SERIALIZABLE_CLASS.equals(message.getClass())) { + deleteData(DeleteData.fromSerizalizable(message)); + } else if (ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) { + readyTransaction(new ReadyTransaction()); + } else if (message.getClass().equals(CloseTransaction.SERIALIZABLE_CLASS)) { + closeTransaction(new CloseTransaction()); } else if (message instanceof GetCompositedModification) { // This is here for testing only getSender().tell(new GetCompositeModificationReply( new ImmutableCompositeModification(modification)), getSelf()); + }else{ + throw new Exception ("Shard:handleRecieve received an unknown message"+message); } } private void readData(ReadData message) { final ActorRef sender = getSender(); final ActorRef self = getSelf(); - final InstanceIdentifier path = message.getPath(); + final YangInstanceIdentifier path = message.getPath(); final ListenableFuture>> future = transaction.read(path); @@ -129,9 +157,9 @@ public class ShardTransaction extends UntypedActor { try { Optional> optional = future.get(); if (optional.isPresent()) { - sender.tell(new ReadDataReply(optional.get()), self); + sender.tell(new ReadDataReply(schemaContext,optional.get()).toSerializable(), self); } else { - //TODO : Need to decide what to do here + sender.tell(new ReadDataReply(schemaContext,null).toSerializable(), self); } } catch (InterruptedException | ExecutionException e) { log.error(e, @@ -146,36 +174,39 @@ public class ShardTransaction extends UntypedActor { private void writeData(WriteData message) { modification.addModification( - new WriteModification(message.getPath(), message.getData())); + new WriteModification(message.getPath(), message.getData(),schemaContext)); + LOG.debug("writeData at path : " + message.getPath().toString()); transaction.write(message.getPath(), message.getData()); - getSender().tell(new WriteDataReply(), getSelf()); + getSender().tell(new WriteDataReply().toSerializable(), getSelf()); } private void mergeData(MergeData message) { modification.addModification( - new MergeModification(message.getPath(), message.getData())); + new MergeModification(message.getPath(), message.getData(), schemaContext)); + LOG.debug("mergeData at path : " + message.getPath().toString()); transaction.merge(message.getPath(), message.getData()); - getSender().tell(new MergeDataReply(), getSelf()); + getSender().tell(new MergeDataReply().toSerializable(), getSelf()); } private void deleteData(DeleteData message) { modification.addModification(new DeleteModification(message.getPath())); transaction.delete(message.getPath()); - getSender().tell(new DeleteDataReply(), getSelf()); + getSender().tell(new DeleteDataReply().toSerializable(), getSelf()); } private void readyTransaction(ReadyTransaction message) { DOMStoreThreePhaseCommitCohort cohort = transaction.ready(); ActorRef cohortActor = getContext().actorOf( - ThreePhaseCommitCohort.props(cohort, shardActor, modification)); + ThreePhaseCommitCohort.props(cohort, shardActor, modification), "cohort"); getSender() - .tell(new ReadyTransactionReply(cohortActor.path()), getSelf()); + .tell(new ReadyTransactionReply(cohortActor.path()).toSerializable(), getSelf()); } private void closeTransaction(CloseTransaction message) { transaction.close(); - getSender().tell(new CloseTransactionReply(), getSelf()); + getSender().tell(new CloseTransactionReply().toSerializable(), getSelf()); + getSelf().tell(PoisonPill.getInstance(), getSelf()); }