Bug 8116 - Make DistributedShardChangePublisher agnostic to data tree change events... 61/56061/2
authorJakub Morvay <jmorvay@cisco.com>
Mon, 24 Apr 2017 13:21:29 +0000 (15:21 +0200)
committerTom Pantelis <tompantelis@gmail.com>
Thu, 27 Apr 2017 10:56:33 +0000 (10:56 +0000)
DistributedShardChangePublisher allows for registering DCTLs on
DistributedShardFrontend. Internally, DistributedShardChangePublisher
sets up DataTreeChangeListenerProxies on respective backend shard and
also on all of its backend subshards. Upon receiving data tree change
events from backend shards, DistributedShardChangePublisher updates
its own data tree. With the help of this tree, it finally constructs
data tree chnage events for registered DCTLs.

DistributedShardChangePublisher relies on specific ordering of backend
shards data tree change events. If it receives subshard's data tree
change event prior to current shard data tree change event, updating
internal data tree can fail. Subshard's data tree change event can
expect some changes from its parent shard.

Clearly, we don't have control on ordering of these events. Do not rely
on this. If we cannot apply subshard's change to data tree, cache it
and try to apply it once we have also its parent's change.

Change-Id: I3bd9b2d217d01974bce02465529c6cdbf8c3d633
Signed-off-by: Jakub Morvay <jmorvay@cisco.com>
(cherry picked from commit b73d37a30e750c6ef7a6f6614f3b00a01b1fdd4c)

opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardChangePublisher.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeTest.java

index 8ff08e1615b922e4fc3a54839cf66c826144fab2..0eef92ad145cb1d3c8497dc972c695f7f3ee1645 100644 (file)
@@ -14,6 +14,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -206,7 +207,7 @@ public class DistributedShardChangePublisher
     }
 
     synchronized DataTreeCandidate applyChanges(final YangInstanceIdentifier listenerPath,
-            final Collection<DataTreeCandidate> changes) {
+            final Collection<DataTreeCandidate> changes) throws DataValidationFailedException {
         final DataTreeModification modification = dataTree.takeSnapshot().newModification();
         for (final DataTreeCandidate change : changes) {
             try {
@@ -220,13 +221,7 @@ public class DistributedShardChangePublisher
 
         final DataTreeCandidate candidate;
 
-        try {
-            dataTree.validate(modification);
-        } catch (final DataValidationFailedException e) {
-            LOG.error("Validation failed for built modification, modification {}, current data tree: {}",
-                    modification, dataTree, e);
-            throw new RuntimeException("Notification validation failed", e);
-        }
+        dataTree.validate(modification);
 
         // strip nodes we dont need since this listener doesn't have to be registered at the root of the DataTree
         candidate = dataTree.prepare(modification);
@@ -251,10 +246,12 @@ public class DistributedShardChangePublisher
 
         private final YangInstanceIdentifier listenerPath;
         private final DOMDataTreeChangeListener delegate;
-
         private final Map<YangInstanceIdentifier, ListenerRegistration<DOMDataTreeChangeListener>> registrations =
                 new ConcurrentHashMap<>();
 
+        @GuardedBy("this")
+        private final Collection<DataTreeCandidate> stashedDataTreeCandidates = new LinkedList<>();
+
         DOMDataTreeListenerWithSubshards(final YangInstanceIdentifier listenerPath,
                                          final DOMDataTreeChangeListener delegate) {
             this.listenerPath = Preconditions.checkNotNull(listenerPath);
@@ -262,9 +259,33 @@ public class DistributedShardChangePublisher
         }
 
         @Override
-        public void onDataTreeChanged(@Nonnull final Collection<DataTreeCandidate> changes) {
+        public synchronized void onDataTreeChanged(@Nonnull final Collection<DataTreeCandidate> changes) {
             LOG.debug("Received data changed {}", changes);
-            applyChanges(listenerPath, changes);
+
+            if (!stashedDataTreeCandidates.isEmpty()) {
+                LOG.debug("Adding stashed subshards' changes {}", stashedDataTreeCandidates);
+                changes.addAll(stashedDataTreeCandidates);
+                stashedDataTreeCandidates.clear();
+            }
+
+            try {
+                applyChanges(listenerPath, changes);
+            } catch (final DataValidationFailedException e) {
+                // TODO should we fail here? What if stashed changes
+                // (changes from subshards) got ahead more than one generation
+                // from current shard. Than we can fail to apply this changes
+                // upon current data tree, but once we get respective changes
+                // from current shard, we can apply also changes from
+                // subshards.
+                //
+                // However, we can loose ability to notice and report some
+                // errors then. For example, we cannot detect potential lost
+                // changes from current shard.
+                LOG.error("Validation failed for modification built from changes {}, current data tree: {}",
+                        changes, dataTree, e);
+                throw new RuntimeException("Notification validation failed", e);
+            }
+
             delegate.onDataTreeChanged(changes);
         }
 
@@ -276,7 +297,18 @@ public class DistributedShardChangePublisher
             final List<DataTreeCandidate> newCandidates = changes.stream()
                     .map(candidate -> DataTreeCandidates.newDataTreeCandidate(changeId, candidate.getRootNode()))
                     .collect(Collectors.toList());
-            delegate.onDataTreeChanged(Collections.singleton(applyChanges(listenerPath, newCandidates)));
+
+            try {
+                delegate.onDataTreeChanged(Collections.singleton(applyChanges(listenerPath, newCandidates)));
+            } catch (final DataValidationFailedException e) {
+                // We cannot apply changes from subshard to current data tree.
+                // Maybe changes from current shard haven't been applied to
+                // data tree yet. Postpone processing of these changes till we
+                // receive changes from current shard.
+                LOG.debug("Validation for modification built from subshard {} changes {} failed, current data tree {}.",
+                        pathFromRoot, changes, dataTree);
+                stashedDataTreeCandidates.addAll(newCandidates);
+            }
         }
 
         void addSubshard(final ChildShardContext context) {
index 41329641d165b9bf35683824bc80bb36d58d5b5a..6cf7e7bb23d99b0fce061a7ef4aadd34d609c83b 100644 (file)
@@ -41,7 +41,6 @@ import java.util.concurrent.CompletionStage;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
@@ -335,7 +334,6 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest {
     }
 
     // top level shard at TEST element, with subshards on each outer-list map entry
-    @Ignore("https://bugs.opendaylight.org/show_bug.cgi?id=8116")
     @Test
     public void testMultipleShardLevels() throws Exception {
         initEmptyDatastores();
@@ -397,8 +395,7 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest {
         leaderShardFactory.registerListener(mockedDataTreeListener, Collections.singletonList(TEST_ID),
                 true, Collections.emptyList());
 
-        // need 6 invocations, first initial thats from the parent shard, and then each individual subshard
-        verify(mockedDataTreeListener, timeout(20000).times(6)).onDataTreeChanged(captorForChanges.capture(),
+        verify(mockedDataTreeListener, timeout(35000).atLeast(2)).onDataTreeChanged(captorForChanges.capture(),
                 captorForSubtrees.capture());
         verifyNoMoreInteractions(mockedDataTreeListener);
         final List<Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>>> allSubtrees = captorForSubtrees.getAllValues();