import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
+import java.util.Collections;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.sal.common.util.Rpcs;
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
-public class DOMDataBrokerImpl implements DOMDataBroker {
+public class DOMDataBrokerImpl implements DOMDataBroker, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(DOMDataBrokerImpl.class);
private static final Logger COORDINATOR_LOG = LoggerFactory.getLogger(CommitCoordination.class);
private final ImmutableMap<LogicalDatastoreType, DOMStore> datastores;
private final ListeningExecutorService executor;
+ private final AtomicLong txNum = new AtomicLong();
public DOMDataBrokerImpl(final ImmutableMap<LogicalDatastoreType, DOMStore> datastores,
final ListeningExecutorService executor) {
}
private Object newTransactionIdentifier() {
- return new Object();
+ return "DOM-" + txNum.getAndIncrement();
}
@Override
private ListenableFuture<RpcResult<TransactionStatus>> submit(
final WriteTransactionImpl<? extends DOMStoreWriteTransaction> transaction) {
+ LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
return executor.submit(new CommitCoordination(transaction));
}
this.broker = broker;
}
- public Iterable<DOMStoreThreePhaseCommitCohort> ready() {
+ public synchronized Iterable<DOMStoreThreePhaseCommitCohort> ready() {
checkState(cohorts == null, "Transaction was already marked as ready.");
ImmutableList.Builder<DOMStoreThreePhaseCommitCohort> cohortsBuilder = ImmutableList.builder();
for (DOMStoreWriteTransaction subTx : getSubtransactions()) {
@Override
public void merge(final LogicalDatastoreType store, final InstanceIdentifier path,
final NormalizedNode<?, ?> data) {
- // TODO Auto-generated method stub
- throw new UnsupportedOperationException("Not implemented yet.");
+ getSubtransaction(store).merge(path,data);
}
@Override
@Override
public RpcResult<TransactionStatus> call() throws Exception {
- Boolean canCommit = canCommit().get();
+ try {
+ Boolean canCommit = canCommit().get();
- if (canCommit) {
- try {
- preCommit().get();
+ if (canCommit) {
try {
- commit().get();
+ preCommit().get();
+ try {
+ commit().get();
+ COORDINATOR_LOG.debug("Tx: {} Is commited.", transaction.getIdentifier());
+ return Rpcs.getRpcResult(true, TransactionStatus.COMMITED,
+ Collections.<RpcError> emptySet());
+
+ } catch (InterruptedException | ExecutionException e) {
+ COORDINATOR_LOG.error("Tx: {} Error during commit", transaction.getIdentifier(), e);
+ }
+
} catch (InterruptedException | ExecutionException e) {
- // ERROR
+ COORDINATOR_LOG.warn("Tx: {} Error during preCommit, starting Abort",
+ transaction.getIdentifier(), e);
}
-
- } catch (InterruptedException | ExecutionException e) {
+ } else {
+ COORDINATOR_LOG.info("Tx: {} Did not pass canCommit phase.", transaction.getIdentifier());
abort().get();
}
- } else {
+ } catch (InterruptedException | ExecutionException e) {
+ COORDINATOR_LOG.warn("Tx: {} Error during canCommit, starting Abort", transaction.getIdentifier(), e);
+
+ }
+ try {
abort().get();
+ } catch (InterruptedException | ExecutionException e) {
+ COORDINATOR_LOG.error("Tx: {} Error during abort", transaction.getIdentifier(), e);
}
- return null;
+ return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.<RpcError> emptySet());
}
public ListenableFuture<Void> preCommit() {
}
+ @Override
+ public void close() throws Exception {
+
+ }
+
}