import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionChainIdentifier;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
private static final AtomicReferenceFieldUpdater<TransactionChainProxy, State> STATE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(TransactionChainProxy.class, State.class, "currentState");
- private final String transactionChainId;
+ private final TransactionChainIdentifier transactionChainId;
private final TransactionContextFactory parent;
private volatile State currentState = IDLE_STATE;
TransactionChainProxy(final TransactionContextFactory parent) {
super(parent.getActorContext());
- transactionChainId = parent.getActorContext().getCurrentMemberName() + "-txn-chain-" + CHAIN_COUNTER.incrementAndGet();
+
+ transactionChainId = new TransactionChainIdentifier(parent.getActorContext().getCurrentMemberName(), CHAIN_COUNTER.incrementAndGet());
this.parent = parent;
}
public String getTransactionChainId() {
- return transactionChainId;
+ return transactionChainId.toString();
}
@Override
currentState = CLOSED_STATE;
// Send a close transaction chain request to each and every shard
- getActorContext().broadcast(new CloseTransactionChain(transactionChainId).toSerializable());
- parent.removeTransactionChain(this);
+ getActorContext().broadcast(new CloseTransactionChain(transactionChainId.toString()).toSerializable());
}
private TransactionProxy allocateWriteTransaction(final TransactionType type) {
return ret;
}
- @Override
- protected DataTree dataTreeForFactory(final LocalTransactionChain factory) {
- return factory.getDataTree();
- }
-
/**
* This method is overridden to ensure the previous Tx's ready operations complete
* before we initiate the next Tx in the chain to avoid creation failures if the
* previous Tx's ready operations haven't completed yet.
*/
@Override
- protected Future<PrimaryShardInfo> findPrimaryShard(final String shardName) {
+ protected Future<PrimaryShardInfo> findPrimaryShard(final String shardName, final String txId) {
// Read current state atomically
final State localState = currentState;
// There are no outstanding futures, shortcut
final Future<?> previous = localState.previousFuture();
if (previous == null) {
- return parent.findPrimaryShard(shardName);
+ return parent.findPrimaryShard(shardName, txId);
}
- LOG.debug("Waiting for ready futures for on chain {}", getTransactionChainId());
+ final String previousTransactionId;
+
+ if(localState instanceof Pending){
+ previousTransactionId = ((Pending) localState).getIdentifier().toString();
+ LOG.debug("Tx: {} - waiting for ready futures with pending Tx {}", txId, previousTransactionId);
+ } else {
+ previousTransactionId = "";
+ LOG.debug("Waiting for ready futures on chain {}", getTransactionChainId());
+ }
// Add a callback for completion of the combined Futures.
final Promise<PrimaryShardInfo> returnPromise = akka.dispatch.Futures.promise();
public void onComplete(final Throwable failure, final Object notUsed) {
if (failure != null) {
// A Ready Future failed so fail the returned Promise.
+ LOG.error("Tx: {} - ready future failed for previous Tx {}", txId, previousTransactionId);
returnPromise.failure(failure);
} else {
- LOG.debug("Previous Tx readied - proceeding to FindPrimaryShard on chain {}",
- getTransactionChainId());
+ LOG.debug("Tx: {} - previous Tx {} readied - proceeding to FindPrimaryShard",
+ txId, previousTransactionId);
// Send the FindPrimaryShard message and use the resulting Future to complete the
// returned Promise.
- returnPromise.completeWith(parent.findPrimaryShard(shardName));
+ returnPromise.completeWith(parent.findPrimaryShard(shardName, txId));
}
}
};
@Override
protected TransactionIdentifier nextIdentifier() {
- return TransactionIdentifier.create(getMemberName(), TX_COUNTER.getAndIncrement(), transactionChainId);
+ return transactionChainId.newTransactionIdentifier();
}
}