X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionContextImpl.java;h=3a209630c3344ca149032c2cc1d4f06b134ccf42;hb=edcc020c8fda4b13f22a31d79c13feef0b53b0ee;hp=c1f9c78e69ec683586147e01605ab7168f786e1b;hpb=3ea96ea6cf7fb77ff3c984294c81f0997c667b1f;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java index c1f9c78e69..3a209630c3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java @@ -16,6 +16,7 @@ import com.google.common.util.concurrent.SettableFuture; import java.util.List; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; +import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; import org.opendaylight.controller.cluster.datastore.messages.DataExists; import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply; @@ -40,8 +41,8 @@ import scala.concurrent.Future; public class TransactionContextImpl extends AbstractTransactionContext { private static final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class); + private final String transactionChainId; private final ActorContext actorContext; - private final String transactionPath; private final ActorSelection actor; private final boolean isTxActorLocal; private final short remoteTransactionVersion; @@ -49,12 +50,12 @@ public class TransactionContextImpl extends AbstractTransactionContext { private final OperationCompleter operationCompleter; private BatchedModifications batchedModifications; - protected TransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier, - ActorContext actorContext, SchemaContext schemaContext, - boolean isTxActorLocal, short remoteTransactionVersion, OperationCompleter operationCompleter) { + protected TransactionContextImpl(ActorSelection actor, TransactionIdentifier identifier, + String transactionChainId, ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal, + short remoteTransactionVersion, OperationCompleter operationCompleter) { super(identifier); - this.transactionPath = transactionPath; this.actor = actor; + this.transactionChainId = transactionChainId; this.actorContext = actorContext; this.isTxActorLocal = isTxActorLocal; this.remoteTransactionVersion = remoteTransactionVersion; @@ -71,6 +72,10 @@ public class TransactionContextImpl extends AbstractTransactionContext { return actor; } + protected ActorContext getActorContext() { + return actorContext; + } + protected short getRemoteTransactionVersion() { return remoteTransactionVersion; } @@ -93,21 +98,24 @@ public class TransactionContextImpl extends AbstractTransactionContext { // Send the remaining batched modifications if any. - sendBatchedModifications(); + sendAndRecordBatchedModifications(); // Send the ReadyTransaction message to the Tx actor. - final Future replyFuture = executeOperationAsync(ReadyTransaction.INSTANCE); + Future readyReplyFuture = executeOperationAsync(ReadyTransaction.INSTANCE); + return combineRecordedOperationsFutures(readyReplyFuture); + } + + protected Future combineRecordedOperationsFutures(final Future withLastReplyFuture) { // Combine all the previously recorded put/merge/delete operation reply Futures and the // ReadyTransactionReply Future into one Future. If any one fails then the combined // Future will fail. We need all prior operations and the ready operation to succeed // in order to attempt commit. - List> futureList = - Lists.newArrayListWithCapacity(recordedOperationFutures.size() + 1); + List> futureList = Lists.newArrayListWithCapacity(recordedOperationFutures.size() + 1); futureList.addAll(recordedOperationFutures); - futureList.add(replyFuture); + futureList.add(withLastReplyFuture); Future> combinedFutures = akka.dispatch.Futures.sequence(futureList, actorContext.getClientDispatcher()); @@ -129,28 +137,15 @@ public class TransactionContextImpl extends AbstractTransactionContext { // de-serializing each reply. // Note the Future get call here won't block as it's complete. - Object serializedReadyReply = replyFuture.value().get().get(); + Object serializedReadyReply = withLastReplyFuture.value().get().get(); if (serializedReadyReply instanceof ReadyTransactionReply) { return actorContext.actorSelection(((ReadyTransactionReply)serializedReadyReply).getCohortPath()); - + } else if(serializedReadyReply instanceof BatchedModificationsReply) { + return actorContext.actorSelection(((BatchedModificationsReply)serializedReadyReply).getCohortPath()); } else if(serializedReadyReply.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) { ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(serializedReadyReply); - String cohortPath = reply.getCohortPath(); - - // In Helium we used to return the local path of the actor which represented - // a remote ThreePhaseCommitCohort. The local path would then be converted to - // a remote path using this resolvePath method. To maintain compatibility with - // a Helium node we need to continue to do this conversion. - // At some point in the future when upgrades from Helium are not supported - // we could remove this code to resolvePath and just use the cohortPath as the - // resolved cohortPath - if(TransactionContextImpl.this.remoteTransactionVersion < - DataStoreVersions.HELIUM_1_VERSION) { - cohortPath = actorContext.resolvePath(transactionPath, cohortPath); - } - + String cohortPath = deserializeCohortPath(reply.getCohortPath()); return actorContext.actorSelection(cohortPath); - } else { // Throwing an exception here will fail the Future. throw new IllegalArgumentException(String.format("%s: Invalid reply type %s", @@ -160,27 +155,51 @@ public class TransactionContextImpl extends AbstractTransactionContext { }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher()); } + protected String deserializeCohortPath(String cohortPath) { + return cohortPath; + } + private void batchModification(Modification modification) { if(batchedModifications == null) { - batchedModifications = new BatchedModifications(remoteTransactionVersion); + batchedModifications = new BatchedModifications(identifier.toString(), remoteTransactionVersion, + transactionChainId); } batchedModifications.addModification(modification); if(batchedModifications.getModifications().size() >= actorContext.getDatastoreContext().getShardBatchedModificationCount()) { - sendBatchedModifications(); + sendAndRecordBatchedModifications(); } } - private void sendBatchedModifications() { + private void sendAndRecordBatchedModifications() { + Future sentFuture = sendBatchedModifications(); + if(sentFuture != null) { + recordedOperationFutures.add(sentFuture); + } + } + + protected Future sendBatchedModifications() { + return sendBatchedModifications(false); + } + + protected Future sendBatchedModifications(boolean ready) { + Future sent = null; if(batchedModifications != null) { - LOG.debug("Tx {} sending {} batched modifications", identifier, - batchedModifications.getModifications().size()); + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} sending {} batched modifications, ready: {}", identifier, + batchedModifications.getModifications().size(), ready); + } - recordedOperationFutures.add(executeOperationAsync(batchedModifications)); - batchedModifications = null; + batchedModifications.setReady(ready); + sent = executeOperationAsync(batchedModifications); + + batchedModifications = new BatchedModifications(identifier.toString(), remoteTransactionVersion, + transactionChainId); } + + return sent; } @Override @@ -212,7 +231,7 @@ public class TransactionContextImpl extends AbstractTransactionContext { // Send the remaining batched modifications if any. - sendBatchedModifications(); + sendAndRecordBatchedModifications(); // If there were any previous recorded put/merge/delete operation reply Futures then we // must wait for them to successfully complete. This is necessary to honor the read @@ -297,7 +316,7 @@ public class TransactionContextImpl extends AbstractTransactionContext { // Send the remaining batched modifications if any. - sendBatchedModifications(); + sendAndRecordBatchedModifications(); // If there were any previous recorded put/merge/delete operation reply Futures then we // must wait for them to successfully complete. This is necessary to honor the read