import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;
+import javax.annotation.concurrent.GuardedBy;
+
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
/**
* In-memory DOM Data Store
*
- * Implementation of {@link DOMStore} which uses {@link DataTree}
- * and other classes such as {@link SnapshotBackedWriteTransaction}.
+ * Implementation of {@link DOMStore} which uses {@link DataTree} and other
+ * classes such as {@link SnapshotBackedWriteTransaction}.
* {@link SnapshotBackedReadTransaction} and {@link ResolveDataChangeEventsTask}
* to implement {@link DOMStore} contract.
*
*/
-public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, SchemaContextListener, TransactionReadyPrototype {
+public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, SchemaContextListener,
+ TransactionReadyPrototype {
private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class);
private final DataTree dataTree = InMemoryDataTreeFactory.getInstance().create();
private final ListenerTree listenerTree = ListenerTree.create();
private class DOMStoreTransactionChainImpl implements DOMStoreTransactionChain, TransactionReadyPrototype {
- private SnapshotBackedWriteTransaction previousOutstandingTx;
+ @GuardedBy("this")
+ private SnapshotBackedWriteTransaction latestOutstandingTx;
+
+ private boolean chainFailed = false;
+
+ private void checkFailed() {
+ Preconditions.checkState(!chainFailed, "Transaction chain is failed.");
+ }
@Override
public synchronized DOMStoreReadTransaction newReadOnlyTransaction() {
final DataTreeSnapshot snapshot;
- if(previousOutstandingTx != null) {
- checkState(previousOutstandingTx.isReady(), "Previous transaction in chain must be ready.");
- snapshot = previousOutstandingTx.getMutatedView();
+ checkFailed();
+ if (latestOutstandingTx != null) {
+ checkState(latestOutstandingTx.isReady(), "Previous transaction in chain must be ready.");
+ snapshot = latestOutstandingTx.getMutatedView();
} else {
snapshot = dataTree.takeSnapshot();
}
@Override
public synchronized DOMStoreReadWriteTransaction newReadWriteTransaction() {
final DataTreeSnapshot snapshot;
- if(previousOutstandingTx != null) {
- checkState(previousOutstandingTx.isReady(), "Previous transaction in chain must be ready.");
- snapshot = previousOutstandingTx.getMutatedView();
+ checkFailed();
+ if (latestOutstandingTx != null) {
+ checkState(latestOutstandingTx.isReady(), "Previous transaction in chain must be ready.");
+ snapshot = latestOutstandingTx.getMutatedView();
} else {
- snapshot = dataTree.takeSnapshot().newModification();
+ snapshot = dataTree.takeSnapshot();
}
- SnapshotBackedReadWriteTransaction ret = new SnapshotBackedReadWriteTransaction(nextIdentifier(), snapshot,this);
+ final SnapshotBackedReadWriteTransaction ret = new SnapshotBackedReadWriteTransaction(nextIdentifier(),
+ snapshot, this);
+ latestOutstandingTx = ret;
return ret;
}
@Override
public synchronized DOMStoreWriteTransaction newWriteOnlyTransaction() {
final DataTreeSnapshot snapshot;
- if(previousOutstandingTx != null) {
- checkState(previousOutstandingTx.isReady(), "Previous transaction in chain must be ready.");
- snapshot = previousOutstandingTx.getMutatedView();
+ checkFailed();
+ if (latestOutstandingTx != null) {
+ checkState(latestOutstandingTx.isReady(), "Previous transaction in chain must be ready.");
+ snapshot = latestOutstandingTx.getMutatedView();
} else {
- snapshot = dataTree.takeSnapshot().newModification();
+ snapshot = dataTree.takeSnapshot();
}
- SnapshotBackedWriteTransaction ret =new SnapshotBackedWriteTransaction(nextIdentifier(), snapshot,this);
+ final SnapshotBackedWriteTransaction ret = new SnapshotBackedWriteTransaction(nextIdentifier(), snapshot,
+ this);
+ latestOutstandingTx = ret;
return ret;
}
@Override
public DOMStoreThreePhaseCommitCohort ready(final SnapshotBackedWriteTransaction tx) {
DOMStoreThreePhaseCommitCohort storeCohort = InMemoryDOMDataStore.this.ready(tx);
- // FIXME: We probably want to add Transaction Chain cohort
- return storeCohort;
+ return new ChainedTransactionCommitImpl(tx, storeCohort, this);
}
@Override
public void close() {
- // TODO Auto-generated method stub
}
+ protected synchronized void onTransactionFailed(final SnapshotBackedWriteTransaction transaction,
+ final Throwable t) {
+ chainFailed = true;
+
+ }
+
+ public synchronized void onTransactionCommited(final SnapshotBackedWriteTransaction transaction) {
+ // If commited transaction is latestOutstandingTx we clear
+ // latestOutstandingTx
+ // field in order to base new transactions on Datastore Data Tree
+ // directly.
+ if (transaction.equals(latestOutstandingTx)) {
+ latestOutstandingTx = null;
+ }
+ }
+
+ }
+
+ private static class ChainedTransactionCommitImpl implements DOMStoreThreePhaseCommitCohort {
+
+ private final SnapshotBackedWriteTransaction transaction;
+ private final DOMStoreThreePhaseCommitCohort delegate;
+
+ private final DOMStoreTransactionChainImpl txChain;
+
+ protected ChainedTransactionCommitImpl(final SnapshotBackedWriteTransaction transaction,
+ final DOMStoreThreePhaseCommitCohort delegate, final DOMStoreTransactionChainImpl txChain) {
+ super();
+ this.transaction = transaction;
+ this.delegate = delegate;
+ this.txChain = txChain;
+ }
+
+ @Override
+ public ListenableFuture<Boolean> canCommit() {
+ return delegate.canCommit();
+ }
+
+ @Override
+ public ListenableFuture<Void> preCommit() {
+ return delegate.preCommit();
+ }
+
+ @Override
+ public ListenableFuture<Void> abort() {
+ return delegate.abort();
+ }
+
+ @Override
+ public ListenableFuture<Void> commit() {
+ ListenableFuture<Void> commitFuture = delegate.commit();
+ Futures.addCallback(commitFuture, new FutureCallback<Void>() {
+ @Override
+ public void onFailure(final Throwable t) {
+ txChain.onTransactionFailed(transaction, t);
+ }
+
+ @Override
+ public void onSuccess(final Void result) {
+ txChain.onTransactionCommited(transaction);
+ }
+
+ });
+ return commitFuture;
+ }
+
}
private class ThreePhaseCommitImpl implements DOMStoreThreePhaseCommitCohort {
LOG.debug("Store Transaction: {} can be committed", transaction.getIdentifier());
return true;
} catch (DataPreconditionFailedException e) {
- LOG.warn("Store Tx: {} Data Precondition failed for {}.",transaction.getIdentifier(),e.getPath(),e);
+ LOG.warn("Store Tx: {} Data Precondition failed for {}.", transaction.getIdentifier(),
+ e.getPath(), e);
return false;
}
}