import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
}
synchronized DataTreeCandidate applyChanges(final YangInstanceIdentifier listenerPath,
- final Collection<DataTreeCandidate> changes) {
+ final Collection<DataTreeCandidate> changes) throws DataValidationFailedException {
final DataTreeModification modification = dataTree.takeSnapshot().newModification();
for (final DataTreeCandidate change : changes) {
try {
final DataTreeCandidate candidate;
- try {
- dataTree.validate(modification);
- } catch (final DataValidationFailedException e) {
- LOG.error("Validation failed for built modification, modification {}, current data tree: {}",
- modification, dataTree, e);
- throw new RuntimeException("Notification validation failed", e);
- }
+ dataTree.validate(modification);
// strip nodes we dont need since this listener doesn't have to be registered at the root of the DataTree
candidate = dataTree.prepare(modification);
private final YangInstanceIdentifier listenerPath;
private final DOMDataTreeChangeListener delegate;
-
private final Map<YangInstanceIdentifier, ListenerRegistration<DOMDataTreeChangeListener>> registrations =
new ConcurrentHashMap<>();
+ @GuardedBy("this")
+ private final Collection<DataTreeCandidate> stashedDataTreeCandidates = new LinkedList<>();
+
DOMDataTreeListenerWithSubshards(final YangInstanceIdentifier listenerPath,
final DOMDataTreeChangeListener delegate) {
this.listenerPath = Preconditions.checkNotNull(listenerPath);
}
@Override
- public void onDataTreeChanged(@Nonnull final Collection<DataTreeCandidate> changes) {
+ public synchronized void onDataTreeChanged(@Nonnull final Collection<DataTreeCandidate> changes) {
LOG.debug("Received data changed {}", changes);
- applyChanges(listenerPath, changes);
+
+ if (!stashedDataTreeCandidates.isEmpty()) {
+ LOG.debug("Adding stashed subshards' changes {}", stashedDataTreeCandidates);
+ changes.addAll(stashedDataTreeCandidates);
+ stashedDataTreeCandidates.clear();
+ }
+
+ try {
+ applyChanges(listenerPath, changes);
+ } catch (final DataValidationFailedException e) {
+ // TODO should we fail here? What if stashed changes
+ // (changes from subshards) got ahead more than one generation
+ // from current shard. Than we can fail to apply this changes
+ // upon current data tree, but once we get respective changes
+ // from current shard, we can apply also changes from
+ // subshards.
+ //
+ // However, we can loose ability to notice and report some
+ // errors then. For example, we cannot detect potential lost
+ // changes from current shard.
+ LOG.error("Validation failed for modification built from changes {}, current data tree: {}",
+ changes, dataTree, e);
+ throw new RuntimeException("Notification validation failed", e);
+ }
+
delegate.onDataTreeChanged(changes);
}
final List<DataTreeCandidate> newCandidates = changes.stream()
.map(candidate -> DataTreeCandidates.newDataTreeCandidate(changeId, candidate.getRootNode()))
.collect(Collectors.toList());
- delegate.onDataTreeChanged(Collections.singleton(applyChanges(listenerPath, newCandidates)));
+
+ try {
+ delegate.onDataTreeChanged(Collections.singleton(applyChanges(listenerPath, newCandidates)));
+ } catch (final DataValidationFailedException e) {
+ // We cannot apply changes from subshard to current data tree.
+ // Maybe changes from current shard haven't been applied to
+ // data tree yet. Postpone processing of these changes till we
+ // receive changes from current shard.
+ LOG.debug("Validation for modification built from subshard {} changes {} failed, current data tree {}.",
+ pathFromRoot, changes, dataTree);
+ stashedDataTreeCandidates.addAll(newCandidates);
+ }
}
void addSubshard(final ChildShardContext context) {