X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardTransaction.java;h=678b7815693382179328a8c13adb567d80d61b13;hp=edaf93567859ee68d9f4f01282073fbc7b14e14f;hb=413bae822cdbf37f4dc16ebe14cab621953e817a;hpb=f9f6d8ed4b12e39386801f6b16b9485f5558336e diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java index edaf935678..678b781569 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java @@ -15,7 +15,7 @@ import akka.actor.ReceiveTimeout; import akka.japi.Creator; import com.google.common.base.Optional; import com.google.common.util.concurrent.CheckedFuture; -import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering; import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; @@ -55,26 +55,31 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; *
  • {@link org.opendaylight.controller.cluster.datastore.messages.CloseTransaction} *

    */ -public abstract class ShardTransaction extends AbstractUntypedActor { +public abstract class ShardTransaction extends AbstractUntypedActorWithMetering { + + protected static final boolean SERIALIZED_REPLY = true; private final ActorRef shardActor; private final SchemaContext schemaContext; private final ShardStats shardStats; private final String transactionID; + private final short clientTxVersion; protected ShardTransaction(ActorRef shardActor, SchemaContext schemaContext, - ShardStats shardStats, String transactionID) { + ShardStats shardStats, String transactionID, short clientTxVersion) { + super("shard-tx"); //actor name override used for metering. This does not change the "real" actor name this.shardActor = shardActor; this.schemaContext = schemaContext; this.shardStats = shardStats; this.transactionID = transactionID; + this.clientTxVersion = clientTxVersion; } public static Props props(DOMStoreTransaction transaction, ActorRef shardActor, SchemaContext schemaContext,DatastoreContext datastoreContext, ShardStats shardStats, - String transactionID) { + String transactionID, short txnClientVersion) { return Props.create(new ShardTransactionCreator(transaction, shardActor, schemaContext, - datastoreContext, shardStats, transactionID)); + datastoreContext, shardStats, transactionID, txnClientVersion)); } protected abstract DOMStoreTransaction getDOMStoreTransaction(); @@ -91,6 +96,10 @@ public abstract class ShardTransaction extends AbstractUntypedActor { return schemaContext; } + protected short getClientTxVersion() { + return clientTxVersion; + } + @Override public void handleReceive(Object message) throws Exception { if (message.getClass().equals(CloseTransaction.SERIALIZABLE_CLASS)) { @@ -109,13 +118,14 @@ public abstract class ShardTransaction extends AbstractUntypedActor { getDOMStoreTransaction().close(); if(sendReply) { - getSender().tell(new CloseTransactionReply().toSerializable(), getSelf()); + getSender().tell(CloseTransactionReply.INSTANCE.toSerializable(), getSelf()); } getSelf().tell(PoisonPill.getInstance(), getSelf()); } - protected void readData(DOMStoreReadTransaction transaction,ReadData message) { + protected void readData(DOMStoreReadTransaction transaction, ReadData message, + final boolean returnSerialized) { final ActorRef sender = getSender(); final ActorRef self = getSelf(); final YangInstanceIdentifier path = message.getPath(); @@ -127,11 +137,11 @@ public abstract class ShardTransaction extends AbstractUntypedActor { public void run() { try { Optional> optional = future.checkedGet(); - if (optional.isPresent()) { - sender.tell(new ReadDataReply(schemaContext,optional.get()).toSerializable(), self); - } else { - sender.tell(new ReadDataReply(schemaContext,null).toSerializable(), self); - } + ReadDataReply readDataReply = new ReadDataReply(optional.orNull()); + + sender.tell((returnSerialized ? readDataReply.toSerializable(clientTxVersion): + readDataReply), self); + } catch (Exception e) { shardStats.incrementFailedReadTransactionsCount(); sender.tell(new akka.actor.Status.Failure(e), self); @@ -141,12 +151,15 @@ public abstract class ShardTransaction extends AbstractUntypedActor { }, getContext().dispatcher()); } - protected void dataExists(DOMStoreReadTransaction transaction, DataExists message) { + protected void dataExists(DOMStoreReadTransaction transaction, DataExists message, + final boolean returnSerialized) { final YangInstanceIdentifier path = message.getPath(); try { Boolean exists = transaction.exists(path).checkedGet(); - getSender().tell(new DataExistsReply(exists).toSerializable(), getSelf()); + DataExistsReply dataExistsReply = new DataExistsReply(exists); + getSender().tell(returnSerialized ? dataExistsReply.toSerializable() : + dataExistsReply, getSelf()); } catch (ReadFailedException e) { getSender().tell(new akka.actor.Status.Failure(e),getSelf()); } @@ -163,16 +176,18 @@ public abstract class ShardTransaction extends AbstractUntypedActor { final DatastoreContext datastoreContext; final ShardStats shardStats; final String transactionID; + final short txnClientVersion; ShardTransactionCreator(DOMStoreTransaction transaction, ActorRef shardActor, SchemaContext schemaContext, DatastoreContext datastoreContext, - ShardStats shardStats, String transactionID) { + ShardStats shardStats, String transactionID, short txnClientVersion) { this.transaction = transaction; this.shardActor = shardActor; this.shardStats = shardStats; this.schemaContext = schemaContext; this.datastoreContext = datastoreContext; this.transactionID = transactionID; + this.txnClientVersion = txnClientVersion; } @Override @@ -180,13 +195,13 @@ public abstract class ShardTransaction extends AbstractUntypedActor { ShardTransaction tx; if(transaction instanceof DOMStoreReadWriteTransaction) { tx = new ShardReadWriteTransaction((DOMStoreReadWriteTransaction)transaction, - shardActor, schemaContext, shardStats, transactionID); + shardActor, schemaContext, shardStats, transactionID, txnClientVersion); } else if(transaction instanceof DOMStoreReadTransaction) { tx = new ShardReadTransaction((DOMStoreReadTransaction)transaction, shardActor, - schemaContext, shardStats, transactionID); + schemaContext, shardStats, transactionID, txnClientVersion); } else { tx = new ShardWriteTransaction((DOMStoreWriteTransaction)transaction, - shardActor, schemaContext, shardStats, transactionID); + shardActor, schemaContext, shardStats, transactionID, txnClientVersion); } tx.getContext().setReceiveTimeout(datastoreContext.getShardTransactionIdleTimeout());