Convert sal-distributed-datastore to OSGi DS
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / sharding / DistributedShardChangePublisher.java
index 5e49c0069ed4aa266588006f4e0bad882329906e..bcb73dbce2f50d418b3388aa5d2c37e145810c20 100644 (file)
@@ -5,24 +5,23 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.controller.cluster.sharding;
 
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-import javax.annotation.concurrent.GuardedBy;
+import org.checkerframework.checker.lock.qual.GuardedBy;
 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
-import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
+import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.mdsal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration;
@@ -33,15 +32,14 @@ import org.opendaylight.mdsal.dom.spi.store.DOMStoreTreeChangePublisher;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNodes;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
 import org.opendaylight.yangtools.yang.data.impl.schema.tree.SchemaValidationFailedException;
 import org.slf4j.Logger;
@@ -53,31 +51,43 @@ public class DistributedShardChangePublisher
 
     private static final Logger LOG = LoggerFactory.getLogger(DistributedShardChangePublisher.class);
 
-    private final DistributedDataStore distributedDataStore;
+    private final DistributedDataStoreInterface distributedDataStore;
     private final YangInstanceIdentifier shardPath;
 
-    // This will be useful for signaling back pressure
-    private final DataStoreClient client;
-
     private final Map<DOMDataTreeIdentifier, ChildShardContext> childShards;
 
     @GuardedBy("this")
     private final DataTree dataTree;
 
     public DistributedShardChangePublisher(final DataStoreClient client,
-                                           final DistributedDataStore distributedDataStore,
+                                           final DistributedDataStoreInterface distributedDataStore,
                                            final DOMDataTreeIdentifier prefix,
                                            final Map<DOMDataTreeIdentifier, ChildShardContext> childShards) {
-        this.client = client;
         this.distributedDataStore = distributedDataStore;
         // TODO keeping the whole dataTree thats contained in subshards doesn't seem like a good idea
         // maybe the whole listener logic would be better in the backend shards where we have direct access to the
         // dataTree and wont have to cache it redundantly.
-        this.dataTree = InMemoryDataTreeFactory.getInstance().create(
-                TreeType.valueOf(prefix.getDatastoreType().name()), prefix.getRootIdentifier());
 
-        dataTree.setSchemaContext(distributedDataStore.getActorContext().getSchemaContext());
+        final DataTreeConfiguration baseConfig;
+        switch (prefix.getDatastoreType()) {
+            case CONFIGURATION:
+                baseConfig = DataTreeConfiguration.DEFAULT_CONFIGURATION;
+                break;
+            case OPERATIONAL:
+                baseConfig = DataTreeConfiguration.DEFAULT_OPERATIONAL;
+                break;
+            default:
+                throw new UnsupportedOperationException("Unknown prefix type " + prefix.getDatastoreType());
+        }
+
+        this.dataTree = new InMemoryDataTreeFactory().create(new DataTreeConfiguration.Builder(baseConfig.getTreeType())
+                .setMandatoryNodesValidation(baseConfig.isMandatoryNodesValidationEnabled())
+                .setUniqueIndexes(baseConfig.isUniqueIndexEnabled())
+                .setRootPath(prefix.getRootIdentifier())
+                .build());
 
+        // XXX: can we guarantee that the root is present in the schemacontext?
+        this.dataTree.setEffectiveModelContext(distributedDataStore.getActorUtils().getSchemaContext());
         this.shardPath = prefix.getRootIdentifier();
         this.childShards = childShards;
     }
@@ -141,18 +151,18 @@ public class DistributedShardChangePublisher
                 findNodeFor(listenerPath.getPathArguments());
 
         // register listener in CDS
-        final ProxyRegistration proxyReg = new ProxyRegistration(distributedDataStore
-                .registerProxyListener(shardLookup, listenerPath, listener), listener);
+        ListenerRegistration<DOMDataTreeChangeListener> listenerReg = distributedDataStore
+                .registerProxyListener(shardLookup, listenerPath, listener);
 
         @SuppressWarnings("unchecked")
         final AbstractDOMDataTreeChangeListenerRegistration<L> registration =
-            new AbstractDOMDataTreeChangeListenerRegistration<L>((L) listener) {
+            new AbstractDOMDataTreeChangeListenerRegistration<>((L) listener) {
                 @Override
                 protected void removeRegistration() {
                     listener.close();
                     DistributedShardChangePublisher.this.removeRegistration(node, this);
                     registrationRemoved(this);
-                    proxyReg.close();
+                    listenerReg.close();
                 }
             };
         addRegistration(node, registration);
@@ -181,38 +191,14 @@ public class DistributedShardChangePublisher
         return listenerPathArgs;
     }
 
-    private static class ProxyRegistration implements ListenerRegistration<DOMDataTreeChangeListener> {
-
-        private final ListenerRegistration<org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener> proxy;
-        private final DOMDataTreeChangeListener listener;
-
-        private ProxyRegistration(
-                final ListenerRegistration<
-                        org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener> proxy,
-                final DOMDataTreeChangeListener listener) {
-            this.proxy = proxy;
-            this.listener = listener;
-        }
-
-        @Override
-        public DOMDataTreeChangeListener getInstance() {
-            return listener;
-        }
-
-        @Override
-        public void close() {
-            proxy.close();
-        }
-    }
-
     synchronized DataTreeCandidate applyChanges(final YangInstanceIdentifier listenerPath,
-            final Collection<DataTreeCandidate> changes) {
+            final Collection<DataTreeCandidate> changes) throws DataValidationFailedException {
         final DataTreeModification modification = dataTree.takeSnapshot().newModification();
         for (final DataTreeCandidate change : changes) {
             try {
                 DataTreeCandidates.applyToModification(modification, change);
             } catch (SchemaValidationFailedException e) {
-                LOG.error("Validation failed {}", e);
+                LOG.error("Validation failed", e);
             }
         }
 
@@ -220,13 +206,7 @@ public class DistributedShardChangePublisher
 
         final DataTreeCandidate candidate;
 
-        try {
-            dataTree.validate(modification);
-        } catch (final DataValidationFailedException e) {
-            LOG.error("Validation failed for built modification, modification {}, current data tree: {}",
-                    modification, dataTree, e);
-            throw new RuntimeException("Notification validation failed", e);
-        }
+        dataTree.validate(modification);
 
         // strip nodes we dont need since this listener doesn't have to be registered at the root of the DataTree
         candidate = dataTree.prepare(modification);
@@ -236,11 +216,12 @@ public class DistributedShardChangePublisher
         DataTreeCandidateNode modifiedChild = candidate.getRootNode();
 
         for (final PathArgument pathArgument : listenerPath.getPathArguments()) {
-            modifiedChild = modifiedChild.getModifiedChild(pathArgument);
+            modifiedChild = modifiedChild.getModifiedChild(pathArgument).orElse(null);
         }
 
+
         if (modifiedChild == null) {
-            modifiedChild = new EmptyDataTreeCandidateNode(dataTree.getRootPath().getLastPathArgument());
+            modifiedChild = DataTreeCandidateNodes.empty(dataTree.getRootPath().getLastPathArgument());
         }
 
         return DataTreeCandidates.newDataTreeCandidate(dataTree.getRootPath(), modifiedChild);
@@ -251,20 +232,46 @@ public class DistributedShardChangePublisher
 
         private final YangInstanceIdentifier listenerPath;
         private final DOMDataTreeChangeListener delegate;
-
         private final Map<YangInstanceIdentifier, ListenerRegistration<DOMDataTreeChangeListener>> registrations =
                 new ConcurrentHashMap<>();
 
+        @GuardedBy("this")
+        private final Collection<DataTreeCandidate> stashedDataTreeCandidates = new LinkedList<>();
+
         DOMDataTreeListenerWithSubshards(final YangInstanceIdentifier listenerPath,
                                          final DOMDataTreeChangeListener delegate) {
-            this.listenerPath = Preconditions.checkNotNull(listenerPath);
-            this.delegate = Preconditions.checkNotNull(delegate);
+            this.listenerPath = requireNonNull(listenerPath);
+            this.delegate = requireNonNull(delegate);
         }
 
         @Override
-        public void onDataTreeChanged(@Nonnull final Collection<DataTreeCandidate> changes) {
+        public synchronized void onDataTreeChanged(final Collection<DataTreeCandidate> changes) {
             LOG.debug("Received data changed {}", changes);
-            applyChanges(listenerPath, changes);
+
+            if (!stashedDataTreeCandidates.isEmpty()) {
+                LOG.debug("Adding stashed subshards' changes {}", stashedDataTreeCandidates);
+                changes.addAll(stashedDataTreeCandidates);
+                stashedDataTreeCandidates.clear();
+            }
+
+            try {
+                applyChanges(listenerPath, changes);
+            } catch (final DataValidationFailedException e) {
+                // TODO should we fail here? What if stashed changes
+                // (changes from subshards) got ahead more than one generation
+                // from current shard. Than we can fail to apply this changes
+                // upon current data tree, but once we get respective changes
+                // from current shard, we can apply also changes from
+                // subshards.
+                //
+                // However, we can loose ability to notice and report some
+                // errors then. For example, we cannot detect potential lost
+                // changes from current shard.
+                LOG.error("Validation failed for modification built from changes {}, current data tree: {}",
+                        changes, dataTree, e);
+                throw new RuntimeException("Notification validation failed", e);
+            }
+
             delegate.onDataTreeChanged(changes);
         }
 
@@ -276,11 +283,22 @@ public class DistributedShardChangePublisher
             final List<DataTreeCandidate> newCandidates = changes.stream()
                     .map(candidate -> DataTreeCandidates.newDataTreeCandidate(changeId, candidate.getRootNode()))
                     .collect(Collectors.toList());
-            delegate.onDataTreeChanged(Collections.singleton(applyChanges(listenerPath, newCandidates)));
+
+            try {
+                delegate.onDataTreeChanged(Collections.singleton(applyChanges(listenerPath, newCandidates)));
+            } catch (final DataValidationFailedException e) {
+                // We cannot apply changes from subshard to current data tree.
+                // Maybe changes from current shard haven't been applied to
+                // data tree yet. Postpone processing of these changes till we
+                // receive changes from current shard.
+                LOG.debug("Validation for modification built from subshard {} changes {} failed, current data tree {}.",
+                        pathFromRoot, changes, dataTree, e);
+                stashedDataTreeCandidates.addAll(newCandidates);
+            }
         }
 
         void addSubshard(final ChildShardContext context) {
-            Preconditions.checkState(context.getShard() instanceof DOMStoreTreeChangePublisher,
+            checkState(context.getShard() instanceof DOMStoreTreeChangePublisher,
                     "All subshards that are initialDataChangeEvent part of ListenerContext need to be listenable");
 
             final DOMStoreTreeChangePublisher listenableShard = (DOMStoreTreeChangePublisher) context.getShard();
@@ -298,49 +316,4 @@ public class DistributedShardChangePublisher
             registrations.clear();
         }
     }
-
-    private static final class EmptyDataTreeCandidateNode implements DataTreeCandidateNode {
-
-        private final PathArgument identifier;
-
-        EmptyDataTreeCandidateNode(final PathArgument identifier) {
-            this.identifier = Preconditions.checkNotNull(identifier, "Identifier should not be null");
-        }
-
-        @Nonnull
-        @Override
-        public PathArgument getIdentifier() {
-            return identifier;
-        }
-
-        @Nonnull
-        @Override
-        public Collection<DataTreeCandidateNode> getChildNodes() {
-            return Collections.emptySet();
-        }
-
-        @Nullable
-        @Override
-        public DataTreeCandidateNode getModifiedChild(final PathArgument identifier) {
-            return null;
-        }
-
-        @Nonnull
-        @Override
-        public ModificationType getModificationType() {
-            return ModificationType.UNMODIFIED;
-        }
-
-        @Nonnull
-        @Override
-        public Optional<NormalizedNode<?, ?>> getDataAfter() {
-            return Optional.absent();
-        }
-
-        @Nonnull
-        @Override
-        public Optional<NormalizedNode<?, ?>> getDataBefore() {
-            return Optional.absent();
-        }
-    }
 }