X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionContextImpl.java;h=c722918c5cfed8ad7062e63911ae60fa45aad7fa;hp=b9900889b1125e8ac229f89af9b4f64551209b8b;hb=3c13136499bf20d1fee22bed16e6a86a37af13cf;hpb=2bfa2e8e41e625006069e631b08a668c86b1ba75 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java index b9900889b1..c722918c5c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java @@ -1,5 +1,6 @@ /* * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, @@ -14,13 +15,11 @@ import com.google.common.base.Optional; import com.google.common.util.concurrent.SettableFuture; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; -import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; import org.opendaylight.controller.cluster.datastore.messages.DataExists; import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply; import org.opendaylight.controller.cluster.datastore.messages.ReadData; import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; -import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage; import org.opendaylight.controller.cluster.datastore.modification.DeleteModification; @@ -47,6 +46,7 @@ public class TransactionContextImpl extends AbstractTransactionContext { private final OperationCompleter operationCompleter; private BatchedModifications batchedModifications; + private int totalBatchedModificationsSent; protected TransactionContextImpl(ActorSelection actor, TransactionIdentifier identifier, String transactionChainId, ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal, @@ -93,15 +93,11 @@ public class TransactionContextImpl extends AbstractTransactionContext { public Future readyTransaction() { LOG.debug("Tx {} readyTransaction called", getIdentifier()); - // Send the remaining batched modifications if any. + // Send the remaining batched modifications, if any, with the ready flag set. - sendBatchedModifications(); - - // Send the ReadyTransaction message to the Tx actor. - - Future readyReplyFuture = executeOperationAsync(ReadyTransaction.INSTANCE); + Future lastModificationsFuture = sendBatchedModifications(true); - return transformReadyReply(readyReplyFuture); + return transformReadyReply(lastModificationsFuture); } protected Future transformReadyReply(final Future readyReplyFuture) { @@ -113,33 +109,31 @@ public class TransactionContextImpl extends AbstractTransactionContext { public ActorSelection checkedApply(Object serializedReadyReply) { LOG.debug("Tx {} readyTransaction", getIdentifier()); - // At this point the rwady operation succeeded and we need to extract the cohort + // At this point the ready operation succeeded and we need to extract the cohort // actor path from the reply. - if (serializedReadyReply instanceof ReadyTransactionReply) { - return actorContext.actorSelection(((ReadyTransactionReply)serializedReadyReply).getCohortPath()); - } else if(serializedReadyReply instanceof BatchedModificationsReply) { - return actorContext.actorSelection(((BatchedModificationsReply)serializedReadyReply).getCohortPath()); - } else if(serializedReadyReply.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) { - ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(serializedReadyReply); - String cohortPath = deserializeCohortPath(reply.getCohortPath()); - return actorContext.actorSelection(cohortPath); - } else { - // Throwing an exception here will fail the Future. - throw new IllegalArgumentException(String.format("%s: Invalid reply type %s", - getIdentifier(), serializedReadyReply.getClass())); + if(ReadyTransactionReply.isSerializedType(serializedReadyReply)) { + ReadyTransactionReply readyTxReply = ReadyTransactionReply.fromSerializable(serializedReadyReply); + return actorContext.actorSelection(extractCohortPathFrom(readyTxReply)); } + + // Throwing an exception here will fail the Future. + throw new IllegalArgumentException(String.format("%s: Invalid reply type %s", + getIdentifier(), serializedReadyReply.getClass())); } }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher()); } - protected String deserializeCohortPath(String cohortPath) { - return cohortPath; + protected String extractCohortPathFrom(ReadyTransactionReply readyTxReply) { + return readyTxReply.getCohortPath(); + } + + private BatchedModifications newBatchedModifications() { + return new BatchedModifications(getIdentifier().toString(), remoteTransactionVersion, transactionChainId); } private void batchModification(Modification modification) { if(batchedModifications == null) { - batchedModifications = new BatchedModifications(getIdentifier().toString(), remoteTransactionVersion, - transactionChainId); + batchedModifications = newBatchedModifications(); } batchedModifications.addModification(modification); @@ -156,17 +150,25 @@ public class TransactionContextImpl extends AbstractTransactionContext { protected Future sendBatchedModifications(boolean ready) { Future sent = null; - if(batchedModifications != null) { + if(ready || (batchedModifications != null && !batchedModifications.getModifications().isEmpty())) { + if(batchedModifications == null) { + batchedModifications = newBatchedModifications(); + } + if(LOG.isDebugEnabled()) { LOG.debug("Tx {} sending {} batched modifications, ready: {}", getIdentifier(), batchedModifications.getModifications().size(), ready); } batchedModifications.setReady(ready); + batchedModifications.setTotalMessagesSent(++totalBatchedModificationsSent); sent = executeOperationAsync(batchedModifications); - batchedModifications = new BatchedModifications(getIdentifier().toString(), remoteTransactionVersion, - transactionChainId); + if(ready) { + batchedModifications = null; + } else { + batchedModifications = newBatchedModifications(); + } } return sent;