private ListenableFuture<RpcResult<TransactionStatus>> submit(
final WriteTransactionImpl<? extends DOMStoreWriteTransaction> transaction) {
- LOG.debug("Tx: {} is submitted for execution.",transaction.getIdentifier());
+ LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
return executor.submit(new CommitCoordination(transaction));
}
this.broker = broker;
}
- public Iterable<DOMStoreThreePhaseCommitCohort> ready() {
+ public synchronized Iterable<DOMStoreThreePhaseCommitCohort> ready() {
checkState(cohorts == null, "Transaction was already marked as ready.");
ImmutableList.Builder<DOMStoreThreePhaseCommitCohort> cohortsBuilder = ImmutableList.builder();
for (DOMStoreWriteTransaction subTx : getSubtransactions()) {
@Override
public void merge(final LogicalDatastoreType store, final InstanceIdentifier path,
final NormalizedNode<?, ?> data) {
- // TODO Auto-generated method stub
- throw new UnsupportedOperationException("Not implemented yet.");
+ getSubtransaction(store).merge(path,data);
}
@Override
final InstanceIdentifier path) {
return getSubtransaction(store).read(path);
}
-
- @Override
- public void merge(final LogicalDatastoreType store, final InstanceIdentifier path, final NormalizedNode<?, ?> data) {
-
- }
}
private final class CommitCoordination implements Callable<RpcResult<TransactionStatus>> {
@Override
public RpcResult<TransactionStatus> call() throws Exception {
- Boolean canCommit = canCommit().get();
try {
+ Boolean canCommit = canCommit().get();
+
if (canCommit) {
try {
preCommit().get();
try {
commit().get();
- COORDINATOR_LOG.debug("Tx: {} Is commited.",transaction.getIdentifier());
- return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.<RpcError>emptySet());
+ COORDINATOR_LOG.debug("Tx: {} Is commited.", transaction.getIdentifier());
+ return Rpcs.getRpcResult(true, TransactionStatus.COMMITED,
+ Collections.<RpcError> emptySet());
+
} catch (InterruptedException | ExecutionException e) {
COORDINATOR_LOG.error("Tx: {} Error during commit", transaction.getIdentifier(), e);
}
transaction.getIdentifier(), e);
}
} else {
- COORDINATOR_LOG.info("Tx: {} Did not pass canCommit phase.");
+ COORDINATOR_LOG.info("Tx: {} Did not pass canCommit phase.", transaction.getIdentifier());
abort().get();
}
} catch (InterruptedException | ExecutionException e) {
} catch (InterruptedException | ExecutionException e) {
COORDINATOR_LOG.error("Tx: {} Error during abort", transaction.getIdentifier(), e);
}
- return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.<RpcError>emptySet());
+ return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.<RpcError> emptySet());
}
public ListenableFuture<Void> preCommit() {