/*
* Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
import com.google.common.util.concurrent.SettableFuture;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
import org.opendaylight.controller.cluster.datastore.messages.ReadData;
import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
public class TransactionContextImpl extends AbstractTransactionContext {
private static final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class);
- private final String transactionChainId;
private final ActorContext actorContext;
private final ActorSelection actor;
private final boolean isTxActorLocal;
private final OperationCompleter operationCompleter;
private BatchedModifications batchedModifications;
+ private int totalBatchedModificationsSent;
protected TransactionContextImpl(ActorSelection actor, TransactionIdentifier identifier,
- String transactionChainId, ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal,
+ ActorContext actorContext, boolean isTxActorLocal,
short remoteTransactionVersion, OperationCompleter operationCompleter) {
super(identifier);
this.actor = actor;
- this.transactionChainId = transactionChainId;
this.actorContext = actorContext;
this.isTxActorLocal = isTxActorLocal;
this.remoteTransactionVersion = remoteTransactionVersion;
}
@Override
- public Future<ActorSelection> readyTransaction() {
- LOG.debug("Tx {} readyTransaction called", getIdentifier());
+ public boolean supportsDirectCommit() {
+ return true;
+ }
- // Send the remaining batched modifications if any.
+ @Override
+ public Future<Object> directCommit() {
+ LOG.debug("Tx {} directCommit called", getIdentifier());
- sendBatchedModifications();
+ // Send the remaining batched modifications, if any, with the ready flag set.
+
+ return sendBatchedModifications(true, true);
+ }
+
+ @Override
+ public Future<ActorSelection> readyTransaction() {
+ LOG.debug("Tx {} readyTransaction called", getIdentifier());
- // Send the ReadyTransaction message to the Tx actor.
+ // Send the remaining batched modifications, if any, with the ready flag set.
- Future<Object> readyReplyFuture = executeOperationAsync(ReadyTransaction.INSTANCE);
+ Future<Object> lastModificationsFuture = sendBatchedModifications(true, false);
- return transformReadyReply(readyReplyFuture);
+ return transformReadyReply(lastModificationsFuture);
}
protected Future<ActorSelection> transformReadyReply(final Future<Object> readyReplyFuture) {
public ActorSelection checkedApply(Object serializedReadyReply) {
LOG.debug("Tx {} readyTransaction", getIdentifier());
- // At this point the rwady operation succeeded and we need to extract the cohort
+ // At this point the ready operation succeeded and we need to extract the cohort
// actor path from the reply.
- if (serializedReadyReply instanceof ReadyTransactionReply) {
- return actorContext.actorSelection(((ReadyTransactionReply)serializedReadyReply).getCohortPath());
- } else if(serializedReadyReply instanceof BatchedModificationsReply) {
- return actorContext.actorSelection(((BatchedModificationsReply)serializedReadyReply).getCohortPath());
- } else if(serializedReadyReply.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) {
- ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(serializedReadyReply);
- String cohortPath = deserializeCohortPath(reply.getCohortPath());
- return actorContext.actorSelection(cohortPath);
- } else {
- // Throwing an exception here will fail the Future.
- throw new IllegalArgumentException(String.format("%s: Invalid reply type %s",
- getIdentifier(), serializedReadyReply.getClass()));
+ if(ReadyTransactionReply.isSerializedType(serializedReadyReply)) {
+ ReadyTransactionReply readyTxReply = ReadyTransactionReply.fromSerializable(serializedReadyReply);
+ return actorContext.actorSelection(extractCohortPathFrom(readyTxReply));
}
+
+ // Throwing an exception here will fail the Future.
+ throw new IllegalArgumentException(String.format("%s: Invalid reply type %s",
+ getIdentifier(), serializedReadyReply.getClass()));
}
}, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
}
- protected String deserializeCohortPath(String cohortPath) {
- return cohortPath;
+ protected String extractCohortPathFrom(ReadyTransactionReply readyTxReply) {
+ return readyTxReply.getCohortPath();
+ }
+
+ private BatchedModifications newBatchedModifications() {
+ return new BatchedModifications(getIdentifier().toString(), remoteTransactionVersion, getIdentifier().getChainId());
}
private void batchModification(Modification modification) {
if(batchedModifications == null) {
- batchedModifications = new BatchedModifications(getIdentifier().toString(), remoteTransactionVersion,
- transactionChainId);
+ batchedModifications = newBatchedModifications();
}
batchedModifications.addModification(modification);
}
protected Future<Object> sendBatchedModifications() {
- return sendBatchedModifications(false);
+ return sendBatchedModifications(false, false);
}
- protected Future<Object> sendBatchedModifications(boolean ready) {
+ protected Future<Object> sendBatchedModifications(boolean ready, boolean doCommitOnReady) {
Future<Object> sent = null;
- if(batchedModifications != null) {
+ if(ready || (batchedModifications != null && !batchedModifications.getModifications().isEmpty())) {
+ if(batchedModifications == null) {
+ batchedModifications = newBatchedModifications();
+ }
+
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {} sending {} batched modifications, ready: {}", getIdentifier(),
batchedModifications.getModifications().size(), ready);
}
batchedModifications.setReady(ready);
+ batchedModifications.setDoCommitOnReady(doCommitOnReady);
+ batchedModifications.setTotalMessagesSent(++totalBatchedModificationsSent);
sent = executeOperationAsync(batchedModifications);
- batchedModifications = new BatchedModifications(getIdentifier().toString(), remoteTransactionVersion,
- transactionChainId);
+ if(ready) {
+ batchedModifications = null;
+ } else {
+ batchedModifications = newBatchedModifications();
+ }
}
return sent;