X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionContextImpl.java;h=c722918c5cfed8ad7062e63911ae60fa45aad7fa;hp=c61682d8efe98cf1649bebc03b381b6afeeb1d76;hb=3c13136499bf20d1fee22bed16e6a86a37af13cf;hpb=c58a4036d120482ce18f247bb1b283193cab60d1 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java index c61682d8ef..c722918c5c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java @@ -1,5 +1,6 @@ /* * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, @@ -11,18 +12,14 @@ import akka.actor.ActorSelection; import akka.dispatch.Mapper; import akka.dispatch.OnComplete; import com.google.common.base.Optional; -import com.google.common.collect.Lists; import com.google.common.util.concurrent.SettableFuture; -import java.util.List; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; -import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; import org.opendaylight.controller.cluster.datastore.messages.DataExists; import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply; import org.opendaylight.controller.cluster.datastore.messages.ReadData; import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; -import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage; import org.opendaylight.controller.cluster.datastore.modification.DeleteModification; @@ -49,6 +46,7 @@ public class TransactionContextImpl extends AbstractTransactionContext { private final OperationCompleter operationCompleter; private BatchedModifications batchedModifications; + private int totalBatchedModificationsSent; protected TransactionContextImpl(ActorSelection actor, TransactionIdentifier identifier, String transactionChainId, ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal, @@ -93,90 +91,56 @@ public class TransactionContextImpl extends AbstractTransactionContext { @Override public Future readyTransaction() { - LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending", - getIdentifier(), recordedOperationCount()); + LOG.debug("Tx {} readyTransaction called", getIdentifier()); - // Send the remaining batched modifications if any. + // Send the remaining batched modifications, if any, with the ready flag set. - sendAndRecordBatchedModifications(); + Future lastModificationsFuture = sendBatchedModifications(true); - // Send the ReadyTransaction message to the Tx actor. - - Future readyReplyFuture = executeOperationAsync(ReadyTransaction.INSTANCE); - - return combineRecordedOperationsFutures(readyReplyFuture); + return transformReadyReply(lastModificationsFuture); } - protected Future combineRecordedOperationsFutures(final Future withLastReplyFuture) { - // Combine all the previously recorded put/merge/delete operation reply Futures and the - // ReadyTransactionReply Future into one Future. If any one fails then the combined - // Future will fail. We need all prior operations and the ready operation to succeed - // in order to attempt commit. - - List> futureList = Lists.newArrayListWithCapacity(recordedOperationCount() + 1); - copyRecordedOperationFutures(futureList); - futureList.add(withLastReplyFuture); - - Future> combinedFutures = akka.dispatch.Futures.sequence(futureList, - actorContext.getClientDispatcher()); - - // Transform the combined Future into a Future that returns the cohort actor path from - // the ReadyTransactionReply. That's the end result of the ready operation. + protected Future transformReadyReply(final Future readyReplyFuture) { + // Transform the last reply Future into a Future that returns the cohort actor path from + // the last reply message. That's the end result of the ready operation. - return combinedFutures.transform(new Mapper, ActorSelection>() { + return readyReplyFuture.transform(new Mapper() { @Override - public ActorSelection checkedApply(Iterable notUsed) { - LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded", - getIdentifier()); - - // At this point all the Futures succeeded and we need to extract the cohort - // actor path from the ReadyTransactionReply. For the recorded operations, they - // don't return any data so we're only interested that they completed - // successfully. We could be paranoid and verify the correct reply types but - // that really should never happen so it's not worth the overhead of - // de-serializing each reply. - - // Note the Future get call here won't block as it's complete. - Object serializedReadyReply = withLastReplyFuture.value().get().get(); - if (serializedReadyReply instanceof ReadyTransactionReply) { - return actorContext.actorSelection(((ReadyTransactionReply)serializedReadyReply).getCohortPath()); - } else if(serializedReadyReply instanceof BatchedModificationsReply) { - return actorContext.actorSelection(((BatchedModificationsReply)serializedReadyReply).getCohortPath()); - } else if(serializedReadyReply.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) { - ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(serializedReadyReply); - String cohortPath = deserializeCohortPath(reply.getCohortPath()); - return actorContext.actorSelection(cohortPath); - } else { - // Throwing an exception here will fail the Future. - throw new IllegalArgumentException(String.format("%s: Invalid reply type %s", - getIdentifier(), serializedReadyReply.getClass())); + public ActorSelection checkedApply(Object serializedReadyReply) { + LOG.debug("Tx {} readyTransaction", getIdentifier()); + + // At this point the ready operation succeeded and we need to extract the cohort + // actor path from the reply. + if(ReadyTransactionReply.isSerializedType(serializedReadyReply)) { + ReadyTransactionReply readyTxReply = ReadyTransactionReply.fromSerializable(serializedReadyReply); + return actorContext.actorSelection(extractCohortPathFrom(readyTxReply)); } + + // Throwing an exception here will fail the Future. + throw new IllegalArgumentException(String.format("%s: Invalid reply type %s", + getIdentifier(), serializedReadyReply.getClass())); } }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher()); } - protected String deserializeCohortPath(String cohortPath) { - return cohortPath; + protected String extractCohortPathFrom(ReadyTransactionReply readyTxReply) { + return readyTxReply.getCohortPath(); + } + + private BatchedModifications newBatchedModifications() { + return new BatchedModifications(getIdentifier().toString(), remoteTransactionVersion, transactionChainId); } private void batchModification(Modification modification) { if(batchedModifications == null) { - batchedModifications = new BatchedModifications(getIdentifier().toString(), remoteTransactionVersion, - transactionChainId); + batchedModifications = newBatchedModifications(); } batchedModifications.addModification(modification); if(batchedModifications.getModifications().size() >= actorContext.getDatastoreContext().getShardBatchedModificationCount()) { - sendAndRecordBatchedModifications(); - } - } - - private void sendAndRecordBatchedModifications() { - Future sentFuture = sendBatchedModifications(); - if(sentFuture != null) { - recordOperationFuture(sentFuture); + sendBatchedModifications(); } } @@ -186,17 +150,25 @@ public class TransactionContextImpl extends AbstractTransactionContext { protected Future sendBatchedModifications(boolean ready) { Future sent = null; - if(batchedModifications != null) { + if(ready || (batchedModifications != null && !batchedModifications.getModifications().isEmpty())) { + if(batchedModifications == null) { + batchedModifications = newBatchedModifications(); + } + if(LOG.isDebugEnabled()) { LOG.debug("Tx {} sending {} batched modifications, ready: {}", getIdentifier(), batchedModifications.getModifications().size(), ready); } batchedModifications.setReady(ready); + batchedModifications.setTotalMessagesSent(++totalBatchedModificationsSent); sent = executeOperationAsync(batchedModifications); - batchedModifications = new BatchedModifications(getIdentifier().toString(), remoteTransactionVersion, - transactionChainId); + if(ready) { + batchedModifications = null; + } else { + batchedModifications = newBatchedModifications(); + } } return sent; @@ -232,7 +204,7 @@ public class TransactionContextImpl extends AbstractTransactionContext { // Send any batched modifications. This is necessary to honor the read uncommitted semantics of the // public API contract. - sendAndRecordBatchedModifications(); + sendBatchedModifications(); OnComplete onComplete = new OnComplete() { @Override @@ -274,7 +246,7 @@ public class TransactionContextImpl extends AbstractTransactionContext { // Send any batched modifications. This is necessary to honor the read uncommitted semantics of the // public API contract. - sendAndRecordBatchedModifications(); + sendBatchedModifications(); OnComplete onComplete = new OnComplete() { @Override