import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree;
+import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTreeChangePublisher;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
import org.opendaylight.yangtools.concepts.Identifiable;
* to implement {@link DOMStore} contract.
*
*/
-public class InMemoryDOMDataStore extends TransactionReadyPrototype implements DOMStore, Identifiable<String>, SchemaContextListener, AutoCloseable {
+public class InMemoryDOMDataStore extends TransactionReadyPrototype implements DOMStore, Identifiable<String>, SchemaContextListener, AutoCloseable, DOMStoreTreeChangePublisher {
private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class);
private static final ListenableFuture<Void> SUCCESSFUL_FUTURE = Futures.immediateFuture(null);
private static final ListenableFuture<Boolean> CAN_COMMIT_FUTURE = Futures.immediateFuture(Boolean.TRUE);
private final AtomicLong txCounter = new AtomicLong(0);
private final QueuedNotificationManager<DataChangeListenerRegistration<?>, DOMImmutableDataChangeEvent> dataChangeListenerNotificationManager;
+ private final InMemoryDOMStoreTreeChangePublisher changePublisher;
private final ExecutorService dataChangeListenerExecutor;
private final boolean debugTransactions;
private final String name;
new QueuedNotificationManager<>(this.dataChangeListenerExecutor,
DCL_NOTIFICATION_MGR_INVOKER, maxDataChangeListenerQueueSize,
"DataChangeListenerQueueMgr");
+ changePublisher = new InMemoryDOMStoreTreeChangePublisher(this.dataChangeListenerExecutor, maxDataChangeListenerQueueSize);
}
public void setCloseable(final AutoCloseable closeable) {
};
}
+ @Override
+ public synchronized <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(final YangInstanceIdentifier treeId, final L listener) {
+ /*
+ * Make sure commit is not occurring right now. Listener has to be
+ * registered and its state capture enqueued at a consistent point.
+ */
+ return changePublisher.registerTreeChangeListener(treeId, listener, dataTree.takeSnapshot());
+ }
+
@Override
protected void transactionAborted(final SnapshotBackedWriteTransaction tx) {
LOG.debug("Tx: {} is closed.", tx.getIdentifier());
return name + "-" + txCounter.getAndIncrement();
}
+ private static void warnDebugContext(final AbstractDOMStoreTransaction<?> transaction) {
+ final Throwable ctx = transaction.getDebugContext();
+ if (ctx != null) {
+ LOG.warn("Transaction {} has been allocated in the following context", transaction.getIdentifier(), ctx);
+ }
+ }
+
private final class ThreePhaseCommitImpl implements DOMStoreThreePhaseCommitCohort {
private final SnapshotBackedWriteTransaction transaction;
private final DataTreeModification modification;
} catch (ConflictingModificationAppliedException e) {
LOG.warn("Store Tx: {} Conflicting modification for {}.", transaction.getIdentifier(),
e.getPath());
- transaction.warnDebugContext(LOG);
+ warnDebugContext(transaction);
return Futures.immediateFailedFuture(new OptimisticLockFailedException("Optimistic lock failed.", e));
} catch (DataValidationFailedException e) {
LOG.warn("Store Tx: {} Data Precondition failed for {}.", transaction.getIdentifier(),
e.getPath(), e);
- transaction.warnDebugContext(LOG);
+ warnDebugContext(transaction);
// For debugging purposes, allow dumping of the modification. Coupled with the above
// precondition log, it should allow us to understand what went on.
*/
synchronized (InMemoryDOMDataStore.this) {
dataTree.commit(candidate);
+ changePublisher.publishChange(candidate);
listenerResolver.resolve(dataChangeListenerNotificationManager);
}