import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
}
synchronized DataTreeCandidate applyChanges(final YangInstanceIdentifier listenerPath,
- final Collection<DataTreeCandidate> changes) {
+ final Collection<DataTreeCandidate> changes) throws DataValidationFailedException {
final DataTreeModification modification = dataTree.takeSnapshot().newModification();
for (final DataTreeCandidate change : changes) {
try {
final DataTreeCandidate candidate;
- try {
- dataTree.validate(modification);
- } catch (final DataValidationFailedException e) {
- LOG.error("Validation failed for built modification, modification {}, current data tree: {}",
- modification, dataTree, e);
- throw new RuntimeException("Notification validation failed", e);
- }
+ dataTree.validate(modification);
// strip nodes we dont need since this listener doesn't have to be registered at the root of the DataTree
candidate = dataTree.prepare(modification);
private final YangInstanceIdentifier listenerPath;
private final DOMDataTreeChangeListener delegate;
-
private final Map<YangInstanceIdentifier, ListenerRegistration<DOMDataTreeChangeListener>> registrations =
new ConcurrentHashMap<>();
+ @GuardedBy("this")
+ private final Collection<DataTreeCandidate> stashedDataTreeCandidates = new LinkedList<>();
+
DOMDataTreeListenerWithSubshards(final YangInstanceIdentifier listenerPath,
final DOMDataTreeChangeListener delegate) {
this.listenerPath = Preconditions.checkNotNull(listenerPath);
}
@Override
- public void onDataTreeChanged(@Nonnull final Collection<DataTreeCandidate> changes) {
+ public synchronized void onDataTreeChanged(@Nonnull final Collection<DataTreeCandidate> changes) {
LOG.debug("Received data changed {}", changes);
- applyChanges(listenerPath, changes);
+
+ if (!stashedDataTreeCandidates.isEmpty()) {
+ LOG.debug("Adding stashed subshards' changes {}", stashedDataTreeCandidates);
+ changes.addAll(stashedDataTreeCandidates);
+ stashedDataTreeCandidates.clear();
+ }
+
+ try {
+ applyChanges(listenerPath, changes);
+ } catch (final DataValidationFailedException e) {
+ // TODO should we fail here? What if stashed changes
+ // (changes from subshards) got ahead more than one generation
+ // from current shard. Than we can fail to apply this changes
+ // upon current data tree, but once we get respective changes
+ // from current shard, we can apply also changes from
+ // subshards.
+ //
+ // However, we can loose ability to notice and report some
+ // errors then. For example, we cannot detect potential lost
+ // changes from current shard.
+ LOG.error("Validation failed for modification built from changes {}, current data tree: {}",
+ changes, dataTree, e);
+ throw new RuntimeException("Notification validation failed", e);
+ }
+
delegate.onDataTreeChanged(changes);
}
final List<DataTreeCandidate> newCandidates = changes.stream()
.map(candidate -> DataTreeCandidates.newDataTreeCandidate(changeId, candidate.getRootNode()))
.collect(Collectors.toList());
- delegate.onDataTreeChanged(Collections.singleton(applyChanges(listenerPath, newCandidates)));
+
+ try {
+ delegate.onDataTreeChanged(Collections.singleton(applyChanges(listenerPath, newCandidates)));
+ } catch (final DataValidationFailedException e) {
+ // We cannot apply changes from subshard to current data tree.
+ // Maybe changes from current shard haven't been applied to
+ // data tree yet. Postpone processing of these changes till we
+ // receive changes from current shard.
+ LOG.debug("Validation for modification built from subshard {} changes {} failed, current data tree {}.",
+ pathFromRoot, changes, dataTree);
+ stashedDataTreeCandidates.addAll(newCandidates);
+ }
}
void addSubshard(final ChildShardContext context) {
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
}
// top level shard at TEST element, with subshards on each outer-list map entry
- @Ignore("https://bugs.opendaylight.org/show_bug.cgi?id=8116")
@Test
public void testMultipleShardLevels() throws Exception {
initEmptyDatastores();
leaderShardFactory.registerListener(mockedDataTreeListener, Collections.singletonList(TEST_ID),
true, Collections.emptyList());
- // need 6 invocations, first initial thats from the parent shard, and then each individual subshard
- verify(mockedDataTreeListener, timeout(20000).times(6)).onDataTreeChanged(captorForChanges.capture(),
+ verify(mockedDataTreeListener, timeout(35000).atLeast(2)).onDataTreeChanged(captorForChanges.capture(),
captorForSubtrees.capture());
verifyNoMoreInteractions(mockedDataTreeListener);
final List<Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>>> allSubtrees = captorForSubtrees.getAllValues();