final TransactionContextWrapper transactionContextWrapper =
new TransactionContextWrapper(parent.getIdentifier(), actorContext);
- Future<PrimaryShardInfo> findPrimaryFuture = findPrimaryShard(shardName);
+ Future<PrimaryShardInfo> findPrimaryFuture = findPrimaryShard(shardName, parent.getIdentifier().toString());
if(findPrimaryFuture.isCompleted()) {
Try<PrimaryShardInfo> maybe = findPrimaryFuture.value().get();
if(maybe.isSuccess()) {
* @param shardName Shard name
* @return Future containing shard information.
*/
- protected abstract Future<PrimaryShardInfo> findPrimaryShard(String shardName);
+ protected abstract Future<PrimaryShardInfo> findPrimaryShard(String shardName, String txId);
/**
* Create local transaction factory for specified shard, backed by specified shard leader
* previous Tx's ready operations haven't completed yet.
*/
@Override
- protected Future<PrimaryShardInfo> findPrimaryShard(final String shardName) {
+ protected Future<PrimaryShardInfo> findPrimaryShard(final String shardName, final String txId) {
// Read current state atomically
final State localState = currentState;
// There are no outstanding futures, shortcut
final Future<?> previous = localState.previousFuture();
if (previous == null) {
- return parent.findPrimaryShard(shardName);
+ return parent.findPrimaryShard(shardName, txId);
}
final String previousTransactionId;
if(localState instanceof Pending){
previousTransactionId = ((Pending) localState).getIdentifier().toString();
- LOG.debug("Waiting for ready futures with pending Tx {}", previousTransactionId);
+ LOG.debug("Tx: {} - waiting for ready futures with pending Tx {}", txId, previousTransactionId);
} else {
previousTransactionId = "";
LOG.debug("Waiting for ready futures on chain {}", getTransactionChainId());
public void onComplete(final Throwable failure, final Object notUsed) {
if (failure != null) {
// A Ready Future failed so fail the returned Promise.
- LOG.error("Ready future failed for Tx {}", previousTransactionId);
+ LOG.error("Tx: {} - ready future failed for previous Tx {}", txId, previousTransactionId);
returnPromise.failure(failure);
} else {
- LOG.debug("Previous Tx {} readied - proceeding to FindPrimaryShard",
- previousTransactionId);
+ LOG.debug("Tx: {} - previous Tx {} readied - proceeding to FindPrimaryShard",
+ txId, previousTransactionId);
// Send the FindPrimaryShard message and use the resulting Future to complete the
// returned Promise.
- returnPromise.completeWith(parent.findPrimaryShard(shardName));
+ returnPromise.completeWith(parent.findPrimaryShard(shardName, txId));
}
}
};
}
@Override
- protected Future<PrimaryShardInfo> findPrimaryShard(final String shardName) {
+ protected Future<PrimaryShardInfo> findPrimaryShard(final String shardName, final String txId) {
return getActorContext().findPrimaryShardAsync(shardName);
}
*
* @return A new snapshot
*/
- protected abstract DataTreeSnapshot getSnapshot();
+ protected abstract DataTreeSnapshot getSnapshot(Object transactionId);
}
private static final class Idle extends State {
}
@Override
- protected DataTreeSnapshot getSnapshot() {
+ protected DataTreeSnapshot getSnapshot(Object transactionId) {
return chain.takeSnapshot();
}
}
}
@Override
- protected DataTreeSnapshot getSnapshot() {
+ protected DataTreeSnapshot getSnapshot(Object transactionId) {
final DataTreeSnapshot ret = snapshot;
- Preconditions.checkState(ret != null, "Previous transaction %s is not ready yet", transaction.getIdentifier());
+ Preconditions.checkState(ret != null, "Could not get snapshot for transaction %s - previous transaction %s is not ready yet",
+ transactionId, transaction.getIdentifier());
return ret;
}
}
@Override
- protected DataTreeSnapshot getSnapshot() {
+ protected DataTreeSnapshot getSnapshot(Object transactionId) {
throw new IllegalStateException(message);
}
}
state = idleState;
}
- private Entry<State, DataTreeSnapshot> getSnapshot() {
+ private Entry<State, DataTreeSnapshot> getSnapshot(T transactionId) {
final State localState = state;
- return new SimpleEntry<>(localState, localState.getSnapshot());
+ return new SimpleEntry<>(localState, localState.getSnapshot(transactionId));
}
private boolean recordTransaction(final State expected, final DOMStoreWriteTransaction transaction) {
}
protected DOMStoreReadTransaction newReadOnlyTransaction(T transactionId) {
- final Entry<State, DataTreeSnapshot> entry = getSnapshot();
+ final Entry<State, DataTreeSnapshot> entry = getSnapshot(transactionId);
return SnapshotBackedTransactions.newReadTransaction(transactionId, getDebugTransactions(), entry.getValue());
}
DOMStoreReadWriteTransaction ret;
do {
- entry = getSnapshot();
+ entry = getSnapshot(transactionId);
ret = new SnapshotBackedReadWriteTransaction<T>(transactionId, getDebugTransactions(), entry.getValue(), this);
} while (!recordTransaction(entry.getKey(), ret));
DOMStoreWriteTransaction ret;
do {
- entry = getSnapshot();
+ entry = getSnapshot(transactionId);
ret = new SnapshotBackedWriteTransaction<T>(transactionId, getDebugTransactions(), entry.getValue(), this);
} while (!recordTransaction(entry.getKey(), ret));