import com.google.common.util.concurrent.SettableFuture;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
import org.opendaylight.controller.cluster.datastore.messages.ReadData;
import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
public Future<ActorSelection> readyTransaction() {
LOG.debug("Tx {} readyTransaction called", getIdentifier());
- // Send the remaining batched modifications if any.
+ // Send the remaining batched modifications, if any, with the ready flag set.
- sendBatchedModifications();
-
- // Send the ReadyTransaction message to the Tx actor.
-
- Future<Object> readyReplyFuture = executeOperationAsync(ReadyTransaction.INSTANCE);
+ Future<Object> lastModificationsFuture = sendBatchedModifications(true);
- return transformReadyReply(readyReplyFuture);
+ return transformReadyReply(lastModificationsFuture);
}
protected Future<ActorSelection> transformReadyReply(final Future<Object> readyReplyFuture) {
public ActorSelection checkedApply(Object serializedReadyReply) {
LOG.debug("Tx {} readyTransaction", getIdentifier());
- // At this point the rwady operation succeeded and we need to extract the cohort
+ // At this point the ready operation succeeded and we need to extract the cohort
// actor path from the reply.
- if (serializedReadyReply instanceof ReadyTransactionReply) {
- return actorContext.actorSelection(((ReadyTransactionReply)serializedReadyReply).getCohortPath());
- } else if(serializedReadyReply instanceof BatchedModificationsReply) {
- return actorContext.actorSelection(((BatchedModificationsReply)serializedReadyReply).getCohortPath());
- } else if(serializedReadyReply.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) {
- ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(serializedReadyReply);
- String cohortPath = deserializeCohortPath(reply.getCohortPath());
- return actorContext.actorSelection(cohortPath);
- } else {
- // Throwing an exception here will fail the Future.
- throw new IllegalArgumentException(String.format("%s: Invalid reply type %s",
- getIdentifier(), serializedReadyReply.getClass()));
+ if(ReadyTransactionReply.isSerializedType(serializedReadyReply)) {
+ ReadyTransactionReply readyTxReply = ReadyTransactionReply.fromSerializable(serializedReadyReply);
+ return actorContext.actorSelection(extractCohortPathFrom(readyTxReply));
}
+
+ // Throwing an exception here will fail the Future.
+ throw new IllegalArgumentException(String.format("%s: Invalid reply type %s",
+ getIdentifier(), serializedReadyReply.getClass()));
}
}, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
}
- protected String deserializeCohortPath(String cohortPath) {
- return cohortPath;
+ protected String extractCohortPathFrom(ReadyTransactionReply readyTxReply) {
+ return readyTxReply.getCohortPath();
+ }
+
+ private BatchedModifications newBatchedModifications() {
+ return new BatchedModifications(getIdentifier().toString(), remoteTransactionVersion, transactionChainId);
}
private void batchModification(Modification modification) {
if(batchedModifications == null) {
- batchedModifications = new BatchedModifications(getIdentifier().toString(), remoteTransactionVersion,
- transactionChainId);
+ batchedModifications = newBatchedModifications();
}
batchedModifications.addModification(modification);
protected Future<Object> sendBatchedModifications(boolean ready) {
Future<Object> sent = null;
- if(batchedModifications != null) {
+ if(ready || (batchedModifications != null && !batchedModifications.getModifications().isEmpty())) {
+ if(batchedModifications == null) {
+ batchedModifications = newBatchedModifications();
+ }
+
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {} sending {} batched modifications, ready: {}", getIdentifier(),
batchedModifications.getModifications().size(), ready);
batchedModifications.setReady(ready);
sent = executeOperationAsync(batchedModifications);
- batchedModifications = new BatchedModifications(getIdentifier().toString(), remoteTransactionVersion,
- transactionChainId);
+ if(ready) {
+ batchedModifications = null;
+ } else {
+ batchedModifications = newBatchedModifications();
+ }
}
return sent;