X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=inline;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardTransaction.java;h=af25df13d2865ba867d6a529d3ad947101b1510a;hb=8fe43e73969a8f189b76dd021c9e84146473a596;hp=5289ad33bfb2b2bc3a5008c0211ffccc3b077721;hpb=8600573a11d7a0fa1c03e6c3f014473096e6a97c;p=controller.git
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
index 5289ad33bf..af25df13d2 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
@@ -15,7 +15,7 @@ import akka.actor.ReceiveTimeout;
import akka.japi.Creator;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.CheckedFuture;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
@@ -39,12 +39,6 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
* The ShardTransaction Actor delegates all actions to DOMDataReadWriteTransaction
*
*
- * Even though the DOMStore and the DOMStoreTransactionChain implement multiple types of transactions
- * the ShardTransaction Actor only works with read-write transactions. This is just to keep the logic simple. At this
- * time there are no known advantages for creating a read-only or write-only transaction which may change over time
- * at which point we can optimize things in the distributed store as well.
- *
- *
* Handles Messages
* ----------------
*
{@link org.opendaylight.controller.cluster.datastore.messages.ReadData}
@@ -55,27 +49,31 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
* {@link org.opendaylight.controller.cluster.datastore.messages.CloseTransaction}
*
*/
-public abstract class ShardTransaction extends AbstractUntypedActor {
+public abstract class ShardTransaction extends AbstractUntypedActorWithMetering {
+
+ protected static final boolean SERIALIZED_REPLY = true;
private final ActorRef shardActor;
private final SchemaContext schemaContext;
private final ShardStats shardStats;
private final String transactionID;
- protected static final boolean SERIALIZED_REPLY = true;
+ private final short clientTxVersion;
protected ShardTransaction(ActorRef shardActor, SchemaContext schemaContext,
- ShardStats shardStats, String transactionID) {
+ ShardStats shardStats, String transactionID, short clientTxVersion) {
+ super("shard-tx"); //actor name override used for metering. This does not change the "real" actor name
this.shardActor = shardActor;
this.schemaContext = schemaContext;
this.shardStats = shardStats;
this.transactionID = transactionID;
+ this.clientTxVersion = clientTxVersion;
}
public static Props props(DOMStoreTransaction transaction, ActorRef shardActor,
SchemaContext schemaContext,DatastoreContext datastoreContext, ShardStats shardStats,
- String transactionID) {
+ String transactionID, short txnClientVersion) {
return Props.create(new ShardTransactionCreator(transaction, shardActor, schemaContext,
- datastoreContext, shardStats, transactionID));
+ datastoreContext, shardStats, transactionID, txnClientVersion));
}
protected abstract DOMStoreTransaction getDOMStoreTransaction();
@@ -92,6 +90,10 @@ public abstract class ShardTransaction extends AbstractUntypedActor {
return schemaContext;
}
+ protected short getClientTxVersion() {
+ return clientTxVersion;
+ }
+
@Override
public void handleReceive(Object message) throws Exception {
if (message.getClass().equals(CloseTransaction.SERIALIZABLE_CLASS)) {
@@ -106,41 +108,36 @@ public abstract class ShardTransaction extends AbstractUntypedActor {
}
}
+ protected boolean returnCloseTransactionReply() {
+ return true;
+ }
+
private void closeTransaction(boolean sendReply) {
getDOMStoreTransaction().close();
- if(sendReply) {
- getSender().tell(new CloseTransactionReply().toSerializable(), getSelf());
+ if(sendReply && returnCloseTransactionReply()) {
+ getSender().tell(CloseTransactionReply.INSTANCE.toSerializable(), getSelf());
}
getSelf().tell(PoisonPill.getInstance(), getSelf());
}
- protected void readData(DOMStoreReadTransaction transaction, ReadData message, final boolean returnSerialized) {
- final ActorRef sender = getSender();
- final ActorRef self = getSelf();
- final YangInstanceIdentifier path = message.getPath();
- final CheckedFuture>, ReadFailedException> future =
- transaction.read(path);
-
-
- future.addListener(new Runnable() {
- @Override
- public void run() {
- try {
- Optional> optional = future.checkedGet();
- ReadDataReply readDataReply = new ReadDataReply(schemaContext, optional.orNull());
+ protected void readData(DOMStoreReadTransaction transaction, ReadData message,
+ final boolean returnSerialized) {
- sender.tell((returnSerialized ? readDataReply.toSerializable():
- readDataReply), self);
+ final YangInstanceIdentifier path = message.getPath();
+ try {
+ final CheckedFuture>, ReadFailedException> future = transaction.read(path);
+ Optional> optional = future.checkedGet();
+ ReadDataReply readDataReply = new ReadDataReply(optional.orNull());
- } catch (Exception e) {
- shardStats.incrementFailedReadTransactionsCount();
- sender.tell(new akka.actor.Status.Failure(e), self);
- }
+ sender().tell((returnSerialized ? readDataReply.toSerializable(clientTxVersion): readDataReply), self());
- }
- }, getContext().dispatcher());
+ } catch (Exception e) {
+ LOG.debug(String.format("Unexpected error reading path %s", path), e);
+ shardStats.incrementFailedReadTransactionsCount();
+ sender().tell(new akka.actor.Status.Failure(e), self());
+ }
}
protected void dataExists(DOMStoreReadTransaction transaction, DataExists message,
@@ -168,16 +165,18 @@ public abstract class ShardTransaction extends AbstractUntypedActor {
final DatastoreContext datastoreContext;
final ShardStats shardStats;
final String transactionID;
+ final short txnClientVersion;
ShardTransactionCreator(DOMStoreTransaction transaction, ActorRef shardActor,
SchemaContext schemaContext, DatastoreContext datastoreContext,
- ShardStats shardStats, String transactionID) {
+ ShardStats shardStats, String transactionID, short txnClientVersion) {
this.transaction = transaction;
this.shardActor = shardActor;
this.shardStats = shardStats;
this.schemaContext = schemaContext;
this.datastoreContext = datastoreContext;
this.transactionID = transactionID;
+ this.txnClientVersion = txnClientVersion;
}
@Override
@@ -185,13 +184,13 @@ public abstract class ShardTransaction extends AbstractUntypedActor {
ShardTransaction tx;
if(transaction instanceof DOMStoreReadWriteTransaction) {
tx = new ShardReadWriteTransaction((DOMStoreReadWriteTransaction)transaction,
- shardActor, schemaContext, shardStats, transactionID);
+ shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
} else if(transaction instanceof DOMStoreReadTransaction) {
tx = new ShardReadTransaction((DOMStoreReadTransaction)transaction, shardActor,
- schemaContext, shardStats, transactionID);
+ schemaContext, shardStats, transactionID, txnClientVersion);
} else {
tx = new ShardWriteTransaction((DOMStoreWriteTransaction)transaction,
- shardActor, schemaContext, shardStats, transactionID);
+ shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
}
tx.getContext().setReceiveTimeout(datastoreContext.getShardTransactionIdleTimeout());