X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardTransaction.java;h=2421cce26c0afa6f7a7c7cc3cf6b5da5830f6fd0;hb=4639f61a41a93d6a762af97b819d164781b0f9f8;hp=3b0e0934d9b67c6a18fde38edeb27ae7e92e52f4;hpb=e71922c94cec22e9f37648a2d04bf2eb3274cf2f;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java index 3b0e0934d9..2421cce26c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java @@ -14,38 +14,19 @@ import akka.actor.Props; import akka.actor.ReceiveTimeout; import akka.japi.Creator; import com.google.common.base.Optional; -import com.google.common.util.concurrent.CheckedFuture; -import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException; -import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory; +import com.google.common.base.Preconditions; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering; +import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.DataExists; import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply; -import org.opendaylight.controller.cluster.datastore.messages.DeleteData; -import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply; -import org.opendaylight.controller.cluster.datastore.messages.MergeData; -import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply; import org.opendaylight.controller.cluster.datastore.messages.ReadData; import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; -import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; -import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; -import org.opendaylight.controller.cluster.datastore.messages.WriteData; -import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply; -import org.opendaylight.controller.cluster.datastore.modification.CompositeModification; -import org.opendaylight.controller.cluster.datastore.modification.DeleteModification; -import org.opendaylight.controller.cluster.datastore.modification.ImmutableCompositeModification; -import org.opendaylight.controller.cluster.datastore.modification.MergeModification; -import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; -import org.opendaylight.controller.cluster.datastore.modification.WriteModification; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; /** * The ShardTransaction Actor represents a remote transaction @@ -53,210 +34,133 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; * The ShardTransaction Actor delegates all actions to DOMDataReadWriteTransaction *

*

- * Even though the DOMStore and the DOMStoreTransactionChain implement multiple types of transactions - * the ShardTransaction Actor only works with read-write transactions. This is just to keep the logic simple. At this - * time there are no known advantages for creating a read-only or write-only transaction which may change over time - * at which point we can optimize things in the distributed store as well. - *

- *

* Handles Messages
* ----------------
*

  • {@link org.opendaylight.controller.cluster.datastore.messages.ReadData} - *
  • {@link org.opendaylight.controller.cluster.datastore.messages.WriteData} - *
  • {@link org.opendaylight.controller.cluster.datastore.messages.MergeData} - *
  • {@link org.opendaylight.controller.cluster.datastore.messages.DeleteData} - *
  • {@link org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction} *
  • {@link org.opendaylight.controller.cluster.datastore.messages.CloseTransaction} *

    */ -public abstract class ShardTransaction extends AbstractUntypedActor { - +public abstract class ShardTransaction extends AbstractUntypedActorWithMetering { private final ActorRef shardActor; - protected final SchemaContext schemaContext; - private final String shardName; - - - private final MutableCompositeModification modification = new MutableCompositeModification(); + private final ShardStats shardStats; + private final TransactionIdentifier transactionID; - protected ShardTransaction(ActorRef shardActor, SchemaContext schemaContext, - String shardName) { + protected ShardTransaction(ActorRef shardActor, ShardStats shardStats, TransactionIdentifier transactionID) { + super("shard-tx"); //actor name override used for metering. This does not change the "real" actor name this.shardActor = shardActor; - this.schemaContext = schemaContext; - this.shardName = shardName; + this.shardStats = shardStats; + this.transactionID = Preconditions.checkNotNull(transactionID); } - public static Props props(DOMStoreTransaction transaction, ActorRef shardActor, - SchemaContext schemaContext,DatastoreContext datastoreContext, String shardName) { - return Props.create(new ShardTransactionCreator(transaction, shardActor, schemaContext, - datastoreContext, shardName)); + public static Props props(TransactionType type, AbstractShardDataTreeTransaction transaction, ActorRef shardActor, + DatastoreContext datastoreContext, ShardStats shardStats) { + return Props.create(new ShardTransactionCreator(type, transaction, shardActor, datastoreContext, shardStats)); } - protected abstract DOMStoreTransaction getDOMStoreTransaction(); + protected abstract AbstractShardDataTreeTransaction getDOMStoreTransaction(); + + protected ActorRef getShardActor() { + return shardActor; + } + + protected final TransactionIdentifier getTransactionID() { + return transactionID; + } @Override - public void handleReceive(Object message) throws Exception { - if (message.getClass().equals(CloseTransaction.SERIALIZABLE_CLASS)) { + public void handleReceive(Object message) { + if (CloseTransaction.isSerializedType(message)) { closeTransaction(true); - } else if (message instanceof GetCompositedModification) { - // This is here for testing only - getSender().tell(new GetCompositeModificationReply( - new ImmutableCompositeModification(modification)), getSelf()); } else if (message instanceof ReceiveTimeout) { - LOG.debug("Got ReceiveTimeout for inactivity - closing Tx"); + LOG.debug("Got ReceiveTimeout for inactivity - closing transaction {}", transactionID); closeTransaction(false); } else { - throw new UnknownMessageException(message); + unknownMessage(message); } } + protected boolean returnCloseTransactionReply() { + return true; + } + private void closeTransaction(boolean sendReply) { - getDOMStoreTransaction().close(); + getDOMStoreTransaction().abort(); - if(sendReply) { - getSender().tell(new CloseTransactionReply().toSerializable(), getSelf()); + if(sendReply && returnCloseTransactionReply()) { + getSender().tell(new CloseTransactionReply(), getSelf()); } getSelf().tell(PoisonPill.getInstance(), getSelf()); } - protected void readData(DOMStoreReadTransaction transaction,ReadData message) { - final ActorRef sender = getSender(); - final ActorRef self = getSelf(); - final YangInstanceIdentifier path = message.getPath(); - final CheckedFuture>, ReadFailedException> future = - transaction.read(path); - - future.addListener(new Runnable() { - @Override - public void run() { - try { - Optional> optional = future.checkedGet(); - if (optional.isPresent()) { - sender.tell(new ReadDataReply(schemaContext,optional.get()).toSerializable(), self); - } else { - sender.tell(new ReadDataReply(schemaContext,null).toSerializable(), self); - } - } catch (Exception e) { - ShardMBeanFactory.getShardStatsMBean(shardName).incrementFailedReadTransactionsCount(); - sender.tell(new akka.actor.Status.Failure(e), self); - } - - } - }, getContext().dispatcher()); - } - - protected void dataExists(DOMStoreReadTransaction transaction, DataExists message) { - final YangInstanceIdentifier path = message.getPath(); - - try { - Boolean exists = transaction.exists(path).checkedGet(); - getSender().tell(new DataExistsReply(exists).toSerializable(), getSelf()); - } catch (ReadFailedException e) { - getSender().tell(new akka.actor.Status.Failure(e),getSelf()); + private boolean checkClosed(AbstractShardDataTreeTransaction transaction) { + final boolean ret = transaction.isClosed(); + if (ret) { + shardStats.incrementFailedReadTransactionsCount(); + getSender().tell(new akka.actor.Status.Failure(new ReadFailedException("Transaction is closed")), getSelf()); } - + return ret; } - protected void writeData(DOMStoreWriteTransaction transaction, WriteData message) { - modification.addModification( - new WriteModification(message.getPath(), message.getData(),schemaContext)); - LOG.debug("writeData at path : " + message.getPath().toString()); - - try { - transaction.write(message.getPath(), message.getData()); - getSender().tell(new WriteDataReply().toSerializable(), getSelf()); - }catch(Exception e){ - getSender().tell(new akka.actor.Status.Failure(e), getSelf()); + protected void readData(AbstractShardDataTreeTransaction transaction, ReadData message) { + if (checkClosed(transaction)) { + return; } - } - protected void mergeData(DOMStoreWriteTransaction transaction, MergeData message) { - modification.addModification( - new MergeModification(message.getPath(), message.getData(), schemaContext)); - LOG.debug("mergeData at path : " + message.getPath().toString()); - try { - transaction.merge(message.getPath(), message.getData()); - getSender().tell(new MergeDataReply().toSerializable(), getSelf()); - }catch(Exception e){ - getSender().tell(new akka.actor.Status.Failure(e), getSelf()); - } + final YangInstanceIdentifier path = message.getPath(); + Optional> optional = transaction.getSnapshot().readNode(path); + ReadDataReply readDataReply = new ReadDataReply(optional.orNull(), message.getVersion()); + sender().tell(readDataReply.toSerializable(), self()); } - protected void deleteData(DOMStoreWriteTransaction transaction, DeleteData message) { - LOG.debug("deleteData at path : " + message.getPath().toString()); - modification.addModification(new DeleteModification(message.getPath())); - try { - transaction.delete(message.getPath()); - getSender().tell(new DeleteDataReply().toSerializable(), getSelf()); - }catch(Exception e){ - getSender().tell(new akka.actor.Status.Failure(e), getSelf()); + protected void dataExists(AbstractShardDataTreeTransaction transaction, DataExists message) { + if (checkClosed(transaction)) { + return; } - } - - protected void readyTransaction(DOMStoreWriteTransaction transaction, ReadyTransaction message) { - DOMStoreThreePhaseCommitCohort cohort = transaction.ready(); - ActorRef cohortActor = getContext().actorOf( - ThreePhaseCommitCohort.props(cohort, shardActor, modification, shardName), "cohort"); - getSender() - .tell(new ReadyTransactionReply(cohortActor.path()).toSerializable(), getSelf()); + final YangInstanceIdentifier path = message.getPath(); + boolean exists = transaction.getSnapshot().readNode(path).isPresent(); + getSender().tell(new DataExistsReply(exists, message.getVersion()).toSerializable(), getSelf()); } private static class ShardTransactionCreator implements Creator { private static final long serialVersionUID = 1L; - final DOMStoreTransaction transaction; + final AbstractShardDataTreeTransaction transaction; final ActorRef shardActor; - final SchemaContext schemaContext; final DatastoreContext datastoreContext; - final String shardName; + final ShardStats shardStats; + final TransactionType type; - ShardTransactionCreator(DOMStoreTransaction transaction, ActorRef shardActor, - SchemaContext schemaContext, DatastoreContext datastoreContext, String shardName) { - this.transaction = transaction; + ShardTransactionCreator(TransactionType type, AbstractShardDataTreeTransaction transaction, ActorRef shardActor, + DatastoreContext datastoreContext, ShardStats shardStats) { + this.transaction = Preconditions.checkNotNull(transaction); this.shardActor = shardActor; - this.shardName = shardName; - this.schemaContext = schemaContext; + this.shardStats = shardStats; this.datastoreContext = datastoreContext; + this.type = type; } @Override public ShardTransaction create() throws Exception { - ShardTransaction tx; - if(transaction instanceof DOMStoreReadWriteTransaction) { - tx = new ShardReadWriteTransaction((DOMStoreReadWriteTransaction)transaction, - shardActor, schemaContext, shardName); - } else if(transaction instanceof DOMStoreReadTransaction) { - tx = new ShardReadTransaction((DOMStoreReadTransaction)transaction, shardActor, - schemaContext, shardName); - } else { - tx = new ShardWriteTransaction((DOMStoreWriteTransaction)transaction, - shardActor, schemaContext, shardName); + final ShardTransaction tx; + switch (type) { + case READ_ONLY: + tx = new ShardReadTransaction(transaction, shardActor, shardStats); + break; + case READ_WRITE: + tx = new ShardReadWriteTransaction((ReadWriteShardDataTreeTransaction)transaction, shardActor, shardStats); + break; + case WRITE_ONLY: + tx = new ShardWriteTransaction((ReadWriteShardDataTreeTransaction)transaction, shardActor, shardStats); + break; + default: + throw new IllegalArgumentException("Unhandled transaction type " + type); } tx.getContext().setReceiveTimeout(datastoreContext.getShardTransactionIdleTimeout()); return tx; } } - - // These classes are in here for test purposes only - - static class GetCompositedModification { - } - - - static class GetCompositeModificationReply { - private final CompositeModification modification; - - - GetCompositeModificationReply(CompositeModification modification) { - this.modification = modification; - } - - - public CompositeModification getModification() { - return modification; - } - } }