import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.NodeModification;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.StoreMetadataNode;
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.TreeNodeUtils;
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, SchemaContextListener {
private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class);
+ private static final InstanceIdentifier PUBLIC_ROOT_PATH = InstanceIdentifier.builder().build();
+
+ private final ListeningExecutorService executor;
+ private final String name;
private final AtomicLong txCounter = new AtomicLong(0);
private DataAndMetadataSnapshot snapshot;
- private ModificationApplyOperation operation;
+ private ModificationApplyOperation operationTree;
+ private final ListenerRegistrationNode listenerTree;
+
- private final ListeningExecutorService executor;
- private final String name;
private SchemaContext schemaContext;
public InMemoryDOMDataStore(final String name, final ListeningExecutorService executor) {
this.executor = executor;
this.name = name;
- this.operation = new AllwaysFailOperation();
+ this.operationTree = new AllwaysFailOperation();
this.snapshot = DataAndMetadataSnapshot.createEmpty();
+ this.listenerTree = ListenerRegistrationNode.createRoot();
}
@Override
@Override
public DOMStoreReadWriteTransaction newReadWriteTransaction() {
- return new SnapshotBackedReadWriteTransaction(nextIdentifier(), snapshot, this, operation);
+ return new SnapshotBackedReadWriteTransaction(nextIdentifier(), snapshot, this, operationTree);
}
@Override
public DOMStoreWriteTransaction newWriteOnlyTransaction() {
- return new SnaphostBackedWriteTransaction(nextIdentifier(), snapshot, this, operation);
+ return new SnaphostBackedWriteTransaction(nextIdentifier(), snapshot, this, operationTree);
}
@Override
public synchronized void onGlobalContextUpdated(final SchemaContext ctx) {
- operation = SchemaAwareApplyOperationRoot.from(ctx);
+ operationTree = SchemaAwareApplyOperationRoot.from(ctx);
schemaContext = ctx;
}
@Override
public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(
final InstanceIdentifier path, final L listener, final DataChangeScope scope) {
- return null;
+
+ Optional<ListenerRegistrationNode> listenerNode = TreeNodeUtils.findNode(listenerTree, path);
+ checkState(listenerNode.isPresent());
+ synchronized (listener) {
+ notifyInitialState(path, listener);
+ }
+ return listenerNode.get().registerDataChangeListener(listener, scope);
+ }
+
+ private void notifyInitialState(final InstanceIdentifier path,
+ final AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> listener) {
+ Optional<StoreMetadataNode> currentState = snapshot.read(path);
+ try {
+ if (currentState.isPresent()) {
+ NormalizedNode<?, ?> data = currentState.get().getData();
+ listener.onDataChanged(DOMImmutableDataChangeEvent.builder() //
+ .setAfter(data) //
+ .addCreated(path, data) //
+ .build() //
+ );
+ }
+ } catch (Exception e) {
+ LOG.error("Unhandled exception encountered when invoking listener {}", listener, e);
+ }
+
}
private synchronized DOMStoreThreePhaseCommitCohort submit(
return name + "-" + txCounter.getAndIncrement();
}
+ private synchronized void commit(final DataAndMetadataSnapshot currentSnapshot,
+ final StoreMetadataNode newDataTree, final Iterable<ChangeListenerNotifyTask> listenerTasks) {
+ LOG.debug("Updating Store snaphot version: {} with version:{}",currentSnapshot.getMetadataTree().getSubtreeVersion(),newDataTree.getSubtreeVersion());
+ checkState(snapshot == currentSnapshot, "Store snapshot and transaction snapshot differs");
+ snapshot = DataAndMetadataSnapshot.builder() //
+ .setMetadataTree(newDataTree) //
+ .setSchemaContext(schemaContext) //
+ .build();
+
+ for(ChangeListenerNotifyTask task : listenerTasks) {
+ executor.submit(task);
+ }
+
+ }
+
private static class SnapshotBackedReadTransaction implements DOMStoreReadTransaction {
private DataAndMetadataSnapshot stableSnapshot;
@Override
public synchronized DOMStoreThreePhaseCommitCohort ready() {
ready = true;
- LOG.debug("Store transaction: {} : Ready",getIdentifier());
+ LOG.debug("Store transaction: {} : Ready", getIdentifier());
mutableTree.seal();
return store.submit(this);
}
private DataAndMetadataSnapshot storeSnapshot;
private Optional<StoreMetadataNode> proposedSubtree;
+ private Iterable<ChangeListenerNotifyTask> listenerTasks;
public ThreePhaseCommitImpl(final SnaphostBackedWriteTransaction writeTransaction) {
this.transaction = writeTransaction;
@Override
public ListenableFuture<Boolean> canCommit() {
final DataAndMetadataSnapshot snapshotCapture = snapshot;
- final ModificationApplyOperation snapshotOperation = operation;
+ final ModificationApplyOperation snapshotOperation = operationTree;
return executor.submit(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
- boolean applicable = snapshotOperation.isApplicable(modification, Optional.of(snapshotCapture.getMetadataTree()));
- LOG.debug("Store Transcation: {} : canCommit : {}",transaction.getIdentifier(),applicable);
+ boolean applicable = snapshotOperation.isApplicable(modification,
+ Optional.of(snapshotCapture.getMetadataTree()));
+ LOG.debug("Store Transcation: {} : canCommit : {}", transaction.getIdentifier(), applicable);
return applicable;
}
});
storeSnapshot = snapshot;
return executor.submit(new Callable<Void>() {
+
+
@Override
public Void call() throws Exception {
StoreMetadataNode metadataTree = storeSnapshot.getMetadataTree();
- proposedSubtree = operation.apply(modification, Optional.of(metadataTree),increase(metadataTree.getSubtreeVersion()));
+
+ proposedSubtree = operationTree.apply(modification, Optional.of(metadataTree),
+ increase(metadataTree.getSubtreeVersion()));
+
+
+ listenerTasks = DataChangeEventResolver.create() //
+ .setRootPath(PUBLIC_ROOT_PATH) //
+ .setBeforeRoot(Optional.of(metadataTree)) //
+ .setAfterRoot(proposedSubtree) //
+ .setModificationRoot(modification) //
+ .setListenerRoot(listenerTree) //
+ .resolve();
+
return null;
}
});
checkState(proposedSubtree != null);
checkState(storeSnapshot != null);
// return ImmediateFuture<>;
- InMemoryDOMDataStore.this.commit(storeSnapshot, proposedSubtree);
+ InMemoryDOMDataStore.this.commit(storeSnapshot, proposedSubtree.get(),listenerTasks);
return Futures.<Void> immediateFuture(null);
}
}
- private synchronized void commit(final DataAndMetadataSnapshot storeSnapshot,
- final Optional<StoreMetadataNode> proposedSubtree) {
- //LOG.info("Updating Store snaphot.");
- checkState(snapshot == storeSnapshot, "Store snapshot and transaction snapshot differs");
- snapshot = DataAndMetadataSnapshot.builder().setMetadataTree(proposedSubtree.get())
- .setSchemaContext(schemaContext).build();
- }
-
private class AllwaysFailOperation implements ModificationApplyOperation {
@Override
public Optional<StoreMetadataNode> apply(final NodeModification modification,
- final Optional<StoreMetadataNode> storeMeta,final UnsignedLong subtreeVersion) {
+ final Optional<StoreMetadataNode> storeMeta, final UnsignedLong subtreeVersion) {
throw new IllegalStateException("Schema Context is not available.");
}