import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionChainIdentifier;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
private static final AtomicReferenceFieldUpdater<TransactionChainProxy, State> STATE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(TransactionChainProxy.class, State.class, "currentState");
- private final String transactionChainId;
+ private final TransactionChainIdentifier transactionChainId;
private final TransactionContextFactory parent;
private volatile State currentState = IDLE_STATE;
TransactionChainProxy(final TransactionContextFactory parent) {
super(parent.getActorContext());
- transactionChainId = parent.getActorContext().getCurrentMemberName() + "-txn-chain-" + CHAIN_COUNTER.incrementAndGet();
+
+ transactionChainId = new TransactionChainIdentifier(parent.getActorContext().getCurrentMemberName(), CHAIN_COUNTER.incrementAndGet());
this.parent = parent;
}
public String getTransactionChainId() {
- return transactionChainId;
+ return transactionChainId.toString();
}
@Override
currentState = CLOSED_STATE;
// Send a close transaction chain request to each and every shard
- getActorContext().broadcast(new CloseTransactionChain(transactionChainId).toSerializable());
+ getActorContext().broadcast(new CloseTransactionChain(transactionChainId.toString()).toSerializable());
}
private TransactionProxy allocateWriteTransaction(final TransactionType type) {
return parent.findPrimaryShard(shardName);
}
- LOG.debug("Waiting for ready futures for on chain {}", getTransactionChainId());
+ final String previousTransactionId;
+
+ if(localState instanceof Pending){
+ previousTransactionId = ((Pending) localState).getIdentifier().toString();
+ LOG.debug("Waiting for ready futures with pending Tx {}", previousTransactionId);
+ } else {
+ previousTransactionId = "";
+ LOG.debug("Waiting for ready futures on chain {}", getTransactionChainId());
+ }
// Add a callback for completion of the combined Futures.
final Promise<PrimaryShardInfo> returnPromise = akka.dispatch.Futures.promise();
public void onComplete(final Throwable failure, final Object notUsed) {
if (failure != null) {
// A Ready Future failed so fail the returned Promise.
+ LOG.error("Ready future failed for Tx {}", previousTransactionId);
returnPromise.failure(failure);
} else {
- LOG.debug("Previous Tx readied - proceeding to FindPrimaryShard on chain {}",
- getTransactionChainId());
+ LOG.debug("Previous Tx {} readied - proceeding to FindPrimaryShard",
+ previousTransactionId);
// Send the FindPrimaryShard message and use the resulting Future to complete the
// returned Promise.
@Override
protected TransactionIdentifier nextIdentifier() {
- return TransactionIdentifier.create(getMemberName(), TX_COUNTER.getAndIncrement(), transactionChainId);
+ return transactionChainId.newTransactionIdentifier();
}
}