Bug 8116 - Make DistributedShardChangePublisher agnostic to data tree change events...
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / sharding / DistributedShardChangePublisher.java
index 8ff08e1615b922e4fc3a54839cf66c826144fab2..0eef92ad145cb1d3c8497dc972c695f7f3ee1645 100644 (file)
@@ -14,6 +14,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -206,7 +207,7 @@ public class DistributedShardChangePublisher
     }
 
     synchronized DataTreeCandidate applyChanges(final YangInstanceIdentifier listenerPath,
-            final Collection<DataTreeCandidate> changes) {
+            final Collection<DataTreeCandidate> changes) throws DataValidationFailedException {
         final DataTreeModification modification = dataTree.takeSnapshot().newModification();
         for (final DataTreeCandidate change : changes) {
             try {
@@ -220,13 +221,7 @@ public class DistributedShardChangePublisher
 
         final DataTreeCandidate candidate;
 
-        try {
-            dataTree.validate(modification);
-        } catch (final DataValidationFailedException e) {
-            LOG.error("Validation failed for built modification, modification {}, current data tree: {}",
-                    modification, dataTree, e);
-            throw new RuntimeException("Notification validation failed", e);
-        }
+        dataTree.validate(modification);
 
         // strip nodes we dont need since this listener doesn't have to be registered at the root of the DataTree
         candidate = dataTree.prepare(modification);
@@ -251,10 +246,12 @@ public class DistributedShardChangePublisher
 
         private final YangInstanceIdentifier listenerPath;
         private final DOMDataTreeChangeListener delegate;
-
         private final Map<YangInstanceIdentifier, ListenerRegistration<DOMDataTreeChangeListener>> registrations =
                 new ConcurrentHashMap<>();
 
+        @GuardedBy("this")
+        private final Collection<DataTreeCandidate> stashedDataTreeCandidates = new LinkedList<>();
+
         DOMDataTreeListenerWithSubshards(final YangInstanceIdentifier listenerPath,
                                          final DOMDataTreeChangeListener delegate) {
             this.listenerPath = Preconditions.checkNotNull(listenerPath);
@@ -262,9 +259,33 @@ public class DistributedShardChangePublisher
         }
 
         @Override
-        public void onDataTreeChanged(@Nonnull final Collection<DataTreeCandidate> changes) {
+        public synchronized void onDataTreeChanged(@Nonnull final Collection<DataTreeCandidate> changes) {
             LOG.debug("Received data changed {}", changes);
-            applyChanges(listenerPath, changes);
+
+            if (!stashedDataTreeCandidates.isEmpty()) {
+                LOG.debug("Adding stashed subshards' changes {}", stashedDataTreeCandidates);
+                changes.addAll(stashedDataTreeCandidates);
+                stashedDataTreeCandidates.clear();
+            }
+
+            try {
+                applyChanges(listenerPath, changes);
+            } catch (final DataValidationFailedException e) {
+                // TODO should we fail here? What if stashed changes
+                // (changes from subshards) got ahead more than one generation
+                // from current shard. Than we can fail to apply this changes
+                // upon current data tree, but once we get respective changes
+                // from current shard, we can apply also changes from
+                // subshards.
+                //
+                // However, we can loose ability to notice and report some
+                // errors then. For example, we cannot detect potential lost
+                // changes from current shard.
+                LOG.error("Validation failed for modification built from changes {}, current data tree: {}",
+                        changes, dataTree, e);
+                throw new RuntimeException("Notification validation failed", e);
+            }
+
             delegate.onDataTreeChanged(changes);
         }
 
@@ -276,7 +297,18 @@ public class DistributedShardChangePublisher
             final List<DataTreeCandidate> newCandidates = changes.stream()
                     .map(candidate -> DataTreeCandidates.newDataTreeCandidate(changeId, candidate.getRootNode()))
                     .collect(Collectors.toList());
-            delegate.onDataTreeChanged(Collections.singleton(applyChanges(listenerPath, newCandidates)));
+
+            try {
+                delegate.onDataTreeChanged(Collections.singleton(applyChanges(listenerPath, newCandidates)));
+            } catch (final DataValidationFailedException e) {
+                // We cannot apply changes from subshard to current data tree.
+                // Maybe changes from current shard haven't been applied to
+                // data tree yet. Postpone processing of these changes till we
+                // receive changes from current shard.
+                LOG.debug("Validation for modification built from subshard {} changes {} failed, current data tree {}.",
+                        pathFromRoot, changes, dataTree);
+                stashedDataTreeCandidates.addAll(newCandidates);
+            }
         }
 
         void addSubshard(final ChildShardContext context) {