package org.opendaylight.controller.md.sal.dom.store.impl;
import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.ForwardingDOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
-final class ChainedTransactionCommitImpl extends ForwardingDOMStoreThreePhaseCommitCohort {
- private final SnapshotBackedWriteTransaction<String> transaction;
- private final DOMStoreThreePhaseCommitCohort delegate;
+final class ChainedTransactionCommitImpl extends InMemoryDOMStoreThreePhaseCommitCohort {
private final DOMStoreTransactionChainImpl txChain;
- ChainedTransactionCommitImpl(final SnapshotBackedWriteTransaction<String> transaction,
- final DOMStoreThreePhaseCommitCohort delegate, final DOMStoreTransactionChainImpl txChain) {
- this.transaction = Preconditions.checkNotNull(transaction);
- this.delegate = Preconditions.checkNotNull(delegate);
+ ChainedTransactionCommitImpl(final InMemoryDOMDataStore store, final SnapshotBackedWriteTransaction<String> transaction,
+ final DataTreeModification modification, final DOMStoreTransactionChainImpl txChain) {
+ super(store, transaction, modification);
this.txChain = Preconditions.checkNotNull(txChain);
}
- @Override
- protected DOMStoreThreePhaseCommitCohort delegate() {
- return delegate;
- }
-
@Override
public ListenableFuture<Void> commit() {
- ListenableFuture<Void> commitFuture = super.commit();
- Futures.addCallback(commitFuture, new FutureCallback<Void>() {
- @Override
- public void onFailure(final Throwable t) {
- txChain.transactionFailed(transaction, t);
- }
-
- @Override
- public void onSuccess(final Void result) {
- txChain.transactionCommited(transaction);
- }
- });
- return commitFuture;
+ ListenableFuture<Void> ret = super.commit();
+ txChain.transactionCommited(getTransaction());
+ return ret;
}
}
\ No newline at end of file
}
@Override
- protected DOMStoreThreePhaseCommitCohort createCohort(final SnapshotBackedWriteTransaction<String> tx, final DataTreeModification tree) {
- return new ChainedTransactionCommitImpl(tx, store.transactionReady(tx, tree), this);
+ protected DOMStoreThreePhaseCommitCohort createCohort(final SnapshotBackedWriteTransaction<String> tx, final DataTreeModification modification) {
+ return new ChainedTransactionCommitImpl(store, tx, modification, this);
}
@Override
return store.getDebugTransactions();
}
- void transactionFailed(final SnapshotBackedWriteTransaction<String> transaction, final Throwable cause) {
- super.onTransactionFailed(transaction, cause);
- }
-
void transactionCommited(final SnapshotBackedWriteTransaction<String> transaction) {
super.onTransactionCommited(transaction);
}
*/
package org.opendaylight.controller.md.sal.dom.store.impl;
-import static com.google.common.base.Preconditions.checkState;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree;
-import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager.Invoker;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.ConflictingModificationAppliedException;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
*/
public class InMemoryDOMDataStore extends TransactionReadyPrototype<String> implements DOMStore, Identifiable<String>, SchemaContextListener, AutoCloseable, DOMStoreTreeChangePublisher {
private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class);
- private static final ListenableFuture<Void> SUCCESSFUL_FUTURE = Futures.immediateFuture(null);
- private static final ListenableFuture<Boolean> CAN_COMMIT_FUTURE = Futures.immediateFuture(Boolean.TRUE);
private static final Invoker<DataChangeListenerRegistration<?>, DOMImmutableDataChangeEvent> DCL_NOTIFICATION_MGR_INVOKER =
new Invoker<DataChangeListenerRegistration<?>, DOMImmutableDataChangeEvent>() {
}
@Override
- protected DOMStoreThreePhaseCommitCohort transactionReady(final SnapshotBackedWriteTransaction<String> tx, final DataTreeModification tree) {
- LOG.debug("Tx: {} is submitted. Modifications: {}", tx.getIdentifier(), tree);
- return new ThreePhaseCommitImpl(tx, tree);
+ protected DOMStoreThreePhaseCommitCohort transactionReady(final SnapshotBackedWriteTransaction<String> tx, final DataTreeModification modification) {
+ LOG.debug("Tx: {} is submitted. Modifications: {}", tx.getIdentifier(), modification);
+ return new InMemoryDOMStoreThreePhaseCommitCohort(this, tx, modification);
}
String nextIdentifier() {
return name + "-" + txCounter.getAndIncrement();
}
- private static void warnDebugContext(final AbstractDOMStoreTransaction<?> transaction) {
- final Throwable ctx = transaction.getDebugContext();
- if (ctx != null) {
- LOG.warn("Transaction {} has been allocated in the following context", transaction.getIdentifier(), ctx);
- }
+ void validate(final DataTreeModification modification) throws DataValidationFailedException {
+ dataTree.validate(modification);
}
- private final class ThreePhaseCommitImpl implements DOMStoreThreePhaseCommitCohort {
- private final SnapshotBackedWriteTransaction<String> transaction;
- private final DataTreeModification modification;
-
- private ResolveDataChangeEventsTask listenerResolver;
- private DataTreeCandidate candidate;
-
- public ThreePhaseCommitImpl(final SnapshotBackedWriteTransaction<String> writeTransaction, final DataTreeModification modification) {
- this.transaction = writeTransaction;
- this.modification = modification;
- }
-
- @Override
- public ListenableFuture<Boolean> canCommit() {
- try {
- dataTree.validate(modification);
- LOG.debug("Store Transaction: {} can be committed", transaction.getIdentifier());
- return CAN_COMMIT_FUTURE;
- } catch (ConflictingModificationAppliedException e) {
- LOG.warn("Store Tx: {} Conflicting modification for {}.", transaction.getIdentifier(),
- e.getPath());
- warnDebugContext(transaction);
- return Futures.immediateFailedFuture(new OptimisticLockFailedException("Optimistic lock failed.", e));
- } catch (DataValidationFailedException e) {
- LOG.warn("Store Tx: {} Data Precondition failed for {}.", transaction.getIdentifier(),
- e.getPath(), e);
- warnDebugContext(transaction);
-
- // For debugging purposes, allow dumping of the modification. Coupled with the above
- // precondition log, it should allow us to understand what went on.
- LOG.trace("Store Tx: {} modifications: {} tree: {}", modification, dataTree);
-
- return Futures.immediateFailedFuture(new TransactionCommitFailedException("Data did not pass validation.", e));
- } catch (Exception e) {
- LOG.warn("Unexpected failure in validation phase", e);
- return Futures.immediateFailedFuture(e);
- }
- }
-
- @Override
- public ListenableFuture<Void> preCommit() {
- try {
- candidate = dataTree.prepare(modification);
- listenerResolver = ResolveDataChangeEventsTask.create(candidate, listenerTree);
- return SUCCESSFUL_FUTURE;
- } catch (Exception e) {
- LOG.warn("Unexpected failure in pre-commit phase", e);
- return Futures.immediateFailedFuture(e);
- }
- }
-
- @Override
- public ListenableFuture<Void> abort() {
- candidate = null;
- return SUCCESSFUL_FUTURE;
- }
-
- @Override
- public ListenableFuture<Void> commit() {
- checkState(candidate != null, "Proposed subtree must be computed");
-
- /*
- * The commit has to occur atomically with regard to listener
- * registrations.
- */
- synchronized (InMemoryDOMDataStore.this) {
- dataTree.commit(candidate);
- changePublisher.publishChange(candidate);
- listenerResolver.resolve(dataChangeListenerNotificationManager);
- }
+ DataTreeCandidate prepare(final DataTreeModification modification) {
+ return dataTree.prepare(modification);
+ }
- return SUCCESSFUL_FUTURE;
- }
+ synchronized void commit(final DataTreeCandidate candidate) {
+ dataTree.commit(candidate);
+ changePublisher.publishChange(candidate);
+ ResolveDataChangeEventsTask.create(candidate, listenerTree).resolve(dataChangeListenerNotificationManager);
}
}
--- /dev/null
+package org.opendaylight.controller.md.sal.dom.store.impl;
+
+import static com.google.common.base.Preconditions.checkState;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.ConflictingModificationAppliedException;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class InMemoryDOMStoreThreePhaseCommitCohort implements DOMStoreThreePhaseCommitCohort {
+ private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMStoreThreePhaseCommitCohort.class);
+ private static final ListenableFuture<Void> SUCCESSFUL_FUTURE = Futures.immediateFuture(null);
+ private static final ListenableFuture<Boolean> CAN_COMMIT_FUTURE = Futures.immediateFuture(Boolean.TRUE);
+ private final SnapshotBackedWriteTransaction<String> transaction;
+ private final DataTreeModification modification;
+ private final InMemoryDOMDataStore store;
+ private DataTreeCandidate candidate;
+
+ public InMemoryDOMStoreThreePhaseCommitCohort(final InMemoryDOMDataStore store, final SnapshotBackedWriteTransaction<String> writeTransaction, final DataTreeModification modification) {
+ this.transaction = Preconditions.checkNotNull(writeTransaction);
+ this.modification = Preconditions.checkNotNull(modification);
+ this.store = Preconditions.checkNotNull(store);
+ }
+
+ private static void warnDebugContext(final AbstractDOMStoreTransaction<?> transaction) {
+ final Throwable ctx = transaction.getDebugContext();
+ if (ctx != null) {
+ LOG.warn("Transaction {} has been allocated in the following context", transaction.getIdentifier(), ctx);
+ }
+ }
+
+ @Override
+ public final ListenableFuture<Boolean> canCommit() {
+ try {
+ store.validate(modification);
+ LOG.debug("Store Transaction: {} can be committed", getTransaction().getIdentifier());
+ return CAN_COMMIT_FUTURE;
+ } catch (ConflictingModificationAppliedException e) {
+ LOG.warn("Store Tx: {} Conflicting modification for {}.", getTransaction().getIdentifier(),
+ e.getPath());
+ warnDebugContext(getTransaction());
+ return Futures.immediateFailedFuture(new OptimisticLockFailedException("Optimistic lock failed.", e));
+ } catch (DataValidationFailedException e) {
+ LOG.warn("Store Tx: {} Data Precondition failed for {}.", getTransaction().getIdentifier(),
+ e.getPath(), e);
+ warnDebugContext(getTransaction());
+
+ // For debugging purposes, allow dumping of the modification. Coupled with the above
+ // precondition log, it should allow us to understand what went on.
+ LOG.trace("Store Tx: {} modifications: {} tree: {}", modification, store);
+
+ return Futures.immediateFailedFuture(new TransactionCommitFailedException("Data did not pass validation.", e));
+ } catch (Exception e) {
+ LOG.warn("Unexpected failure in validation phase", e);
+ return Futures.immediateFailedFuture(e);
+ }
+ }
+
+ @Override
+ public final ListenableFuture<Void> preCommit() {
+ try {
+ candidate = store.prepare(modification);
+ return SUCCESSFUL_FUTURE;
+ } catch (Exception e) {
+ LOG.warn("Unexpected failure in pre-commit phase", e);
+ return Futures.immediateFailedFuture(e);
+ }
+ }
+
+ @Override
+ public final ListenableFuture<Void> abort() {
+ candidate = null;
+ return SUCCESSFUL_FUTURE;
+ }
+
+ protected final SnapshotBackedWriteTransaction<String> getTransaction() {
+ return transaction;
+ }
+
+ @Override
+ public ListenableFuture<Void> commit() {
+ checkState(candidate != null, "Proposed subtree must be computed");
+
+ /*
+ * The commit has to occur atomically with regard to listener
+ * registrations.
+ */
+ store.commit(candidate);
+ return SUCCESSFUL_FUTURE;
+ }
+}
\ No newline at end of file