import static com.google.common.base.Preconditions.checkState;
import static org.opendaylight.controller.md.sal.dom.store.impl.StoreUtils.increase;
+import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode;
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode.DataChangeListenerRegistration;
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.ModificationType;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.NodeModification;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.StoreMetadataNode;
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
import org.slf4j.LoggerFactory;
import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
import com.google.common.primitives.UnsignedLong;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, SchemaContextListener {
private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class);
+ private static final InstanceIdentifier PUBLIC_ROOT_PATH = InstanceIdentifier.builder().build();
- private final AtomicLong txCounter = new AtomicLong(0);
-
- private DataAndMetadataSnapshot snapshot;
- private ModificationApplyOperation operation;
private final ListeningExecutorService executor;
private final String name;
+ private final AtomicLong txCounter = new AtomicLong(0);
+ private final ListenerRegistrationNode listenerTree;
+ private final AtomicReference<DataAndMetadataSnapshot> snapshot;
+
+ private ModificationApplyOperation operationTree;
private SchemaContext schemaContext;
public InMemoryDOMDataStore(final String name, final ListeningExecutorService executor) {
- this.executor = executor;
- this.name = name;
- this.operation = new AllwaysFailOperation();
- this.snapshot = DataAndMetadataSnapshot.createEmpty();
+ this.name = Preconditions.checkNotNull(name);
+ this.executor = Preconditions.checkNotNull(executor);
+ this.listenerTree = ListenerRegistrationNode.createRoot();
+ this.snapshot = new AtomicReference<DataAndMetadataSnapshot>(DataAndMetadataSnapshot.createEmpty());
+ this.operationTree = new AlwaysFailOperation();
}
@Override
- public String getIdentifier() {
+ public final String getIdentifier() {
return name;
}
@Override
public DOMStoreReadTransaction newReadOnlyTransaction() {
- return new SnapshotBackedReadTransaction(nextIdentifier(), snapshot);
+ return new SnapshotBackedReadTransaction(nextIdentifier(), snapshot.get());
}
@Override
public DOMStoreReadWriteTransaction newReadWriteTransaction() {
- return new SnapshotBackedReadWriteTransaction(nextIdentifier(), snapshot, this, operation);
+ return new SnapshotBackedReadWriteTransaction(nextIdentifier(), snapshot.get(), this, operationTree);
}
@Override
public DOMStoreWriteTransaction newWriteOnlyTransaction() {
- return new SnaphostBackedWriteTransaction(nextIdentifier(), snapshot, this, operation);
+ return new SnaphostBackedWriteTransaction(nextIdentifier(), snapshot.get(), this, operationTree);
}
@Override
public synchronized void onGlobalContextUpdated(final SchemaContext ctx) {
- operation = SchemaAwareApplyOperationRoot.from(ctx);
+ operationTree = SchemaAwareApplyOperationRoot.from(ctx);
schemaContext = ctx;
}
@Override
public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(
final InstanceIdentifier path, final L listener, final DataChangeScope scope) {
- return null;
+ LOG.debug("{}: Registering data change listener {} for {}",name,listener,path);
+ ListenerRegistrationNode listenerNode = listenerTree;
+ for(PathArgument arg : path.getPath()) {
+ listenerNode = listenerNode.ensureChild(arg);
+ }
+
+ /*
+ * Make sure commit is not occurring right now. Listener has to be registered and its
+ * state capture enqueued at a consistent point.
+ *
+ * FIXME: improve this to read-write lock, such that multiple listener registrations
+ * can occur simultaneously
+ */
+ final DataChangeListenerRegistration<L> reg;
+ synchronized (this) {
+ reg = listenerNode.registerDataChangeListener(path, listener, scope);
+
+ Optional<StoreMetadataNode> currentState = snapshot.get().read(path);
+ if (currentState.isPresent()) {
+ final NormalizedNode<?, ?> data = currentState.get().getData();
+
+ final DOMImmutableDataChangeEvent event = DOMImmutableDataChangeEvent.builder() //
+ .setAfter(data) //
+ .addCreated(path, data) //
+ .build();
+ executor.submit(new ChangeListenerNotifyTask(Collections.singletonList(reg), event));
+ }
+ }
+
+ return reg;
}
private synchronized DOMStoreThreePhaseCommitCohort submit(
- final SnaphostBackedWriteTransaction snaphostBackedWriteTransaction) {
- return new ThreePhaseCommitImpl(snaphostBackedWriteTransaction);
+ final SnaphostBackedWriteTransaction writeTx) {
+ LOG.debug("Tx: {} is submitted. Modifications: {}",writeTx.getIdentifier(),writeTx.getMutatedView());
+ return new ThreePhaseCommitImpl(writeTx);
}
private Object nextIdentifier() {
return name + "-" + txCounter.getAndIncrement();
}
+ private void commit(final DataAndMetadataSnapshot currentSnapshot,
+ final StoreMetadataNode newDataTree, final Iterable<ChangeListenerNotifyTask> listenerTasks) {
+ LOG.debug("Updating Store snaphot version: {} with version:{}",currentSnapshot.getMetadataTree().getSubtreeVersion(),newDataTree.getSubtreeVersion());
+
+ if(LOG.isTraceEnabled()) {
+ LOG.trace("Data Tree is {}",StoreUtils.toStringTree(newDataTree));
+ }
+
+ final DataAndMetadataSnapshot newSnapshot = DataAndMetadataSnapshot.builder() //
+ .setMetadataTree(newDataTree) //
+ .setSchemaContext(schemaContext) //
+ .build();
+
+ /*
+ * The commit has to occur atomically with regard to listener registrations.
+ */
+ synchronized (this) {
+ final boolean success = snapshot.compareAndSet(currentSnapshot, newSnapshot);
+ checkState(success, "Store snapshot and transaction snapshot differ. This should never happen.");
+
+ for (ChangeListenerNotifyTask task : listenerTasks) {
+ executor.submit(task);
+ }
+ }
+ }
+
private static class SnapshotBackedReadTransaction implements DOMStoreReadTransaction {
private DataAndMetadataSnapshot stableSnapshot;
public SnapshotBackedReadTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot) {
this.identifier = identifier;
this.stableSnapshot = snapshot;
+ LOG.debug("ReadOnly Tx: {} allocated with snapshot {}",identifier,snapshot.getMetadataTree().getSubtreeVersion());
+
}
@Override
this.identifier = identifier;
mutableTree = MutableDataTree.from(snapshot, applyOper);
this.store = store;
+ LOG.debug("Write Tx: {} allocated with snapshot {}",identifier,snapshot.getMetadataTree().getSubtreeVersion());
}
@Override
@Override
public synchronized DOMStoreThreePhaseCommitCohort ready() {
ready = true;
- LOG.debug("Store transaction: {} : Ready",getIdentifier());
+ LOG.debug("Store transaction: {} : Ready", getIdentifier());
mutableTree.seal();
return store.submit(this);
}
private DataAndMetadataSnapshot storeSnapshot;
private Optional<StoreMetadataNode> proposedSubtree;
+ private Iterable<ChangeListenerNotifyTask> listenerTasks;
public ThreePhaseCommitImpl(final SnaphostBackedWriteTransaction writeTransaction) {
this.transaction = writeTransaction;
@Override
public ListenableFuture<Boolean> canCommit() {
- final DataAndMetadataSnapshot snapshotCapture = snapshot;
- final ModificationApplyOperation snapshotOperation = operation;
+ final DataAndMetadataSnapshot snapshotCapture = snapshot.get();
+ final ModificationApplyOperation snapshotOperation = operationTree;
return executor.submit(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
- boolean applicable = snapshotOperation.isApplicable(modification, Optional.of(snapshotCapture.getMetadataTree()));
- LOG.debug("Store Transcation: {} : canCommit : {}",transaction.getIdentifier(),applicable);
+ boolean applicable = snapshotOperation.isApplicable(modification,
+ Optional.of(snapshotCapture.getMetadataTree()));
+ LOG.debug("Store Transcation: {} : canCommit : {}", transaction.getIdentifier(), applicable);
return applicable;
}
});
@Override
public ListenableFuture<Void> preCommit() {
- storeSnapshot = snapshot;
+ storeSnapshot = snapshot.get();
+ if(modification.getModificationType() == ModificationType.UNMODIFIED) {
+ return Futures.immediateFuture(null);
+ }
return executor.submit(new Callable<Void>() {
+
+
@Override
public Void call() throws Exception {
StoreMetadataNode metadataTree = storeSnapshot.getMetadataTree();
- proposedSubtree = operation.apply(modification, Optional.of(metadataTree),increase(metadataTree.getSubtreeVersion()));
+
+ proposedSubtree = operationTree.apply(modification, Optional.of(metadataTree),
+ increase(metadataTree.getSubtreeVersion()));
+
+ listenerTasks = DataChangeEventResolver.create() //
+ .setRootPath(PUBLIC_ROOT_PATH) //
+ .setBeforeRoot(Optional.of(metadataTree)) //
+ .setAfterRoot(proposedSubtree) //
+ .setModificationRoot(modification) //
+ .setListenerRoot(listenerTree) //
+ .resolve();
+
return null;
}
});
@Override
public ListenableFuture<Void> commit() {
- checkState(proposedSubtree != null);
- checkState(storeSnapshot != null);
+ if(modification.getModificationType() == ModificationType.UNMODIFIED) {
+ return Futures.immediateFuture(null);
+ }
+
+ checkState(proposedSubtree != null,"Proposed subtree must be computed");
+ checkState(storeSnapshot != null,"Proposed subtree must be computed");
// return ImmediateFuture<>;
- InMemoryDOMDataStore.this.commit(storeSnapshot, proposedSubtree);
+ InMemoryDOMDataStore.this.commit(storeSnapshot, proposedSubtree.get(),listenerTasks);
return Futures.<Void> immediateFuture(null);
}
}
- private synchronized void commit(final DataAndMetadataSnapshot storeSnapshot,
- final Optional<StoreMetadataNode> proposedSubtree) {
- //LOG.info("Updating Store snaphot.");
- checkState(snapshot == storeSnapshot, "Store snapshot and transaction snapshot differs");
- snapshot = DataAndMetadataSnapshot.builder().setMetadataTree(proposedSubtree.get())
- .setSchemaContext(schemaContext).build();
- }
-
- private class AllwaysFailOperation implements ModificationApplyOperation {
+ private static final class AlwaysFailOperation implements ModificationApplyOperation {
@Override
public Optional<StoreMetadataNode> apply(final NodeModification modification,
- final Optional<StoreMetadataNode> storeMeta,final UnsignedLong subtreeVersion) {
+ final Optional<StoreMetadataNode> storeMeta, final UnsignedLong subtreeVersion) {
throw new IllegalStateException("Schema Context is not available.");
}