X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fsharding%2FDistributedShardChangePublisher.java;h=0eef92ad145cb1d3c8497dc972c695f7f3ee1645;hb=b712eb01354ddb5878008e2a2e8f03fb19b92555;hp=8ff08e1615b922e4fc3a54839cf66c826144fab2;hpb=ac919f21651e87b9652d02d7924f53e7e2b30471;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardChangePublisher.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardChangePublisher.java index 8ff08e1615..0eef92ad14 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardChangePublisher.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardChangePublisher.java @@ -14,6 +14,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -206,7 +207,7 @@ public class DistributedShardChangePublisher } synchronized DataTreeCandidate applyChanges(final YangInstanceIdentifier listenerPath, - final Collection changes) { + final Collection changes) throws DataValidationFailedException { final DataTreeModification modification = dataTree.takeSnapshot().newModification(); for (final DataTreeCandidate change : changes) { try { @@ -220,13 +221,7 @@ public class DistributedShardChangePublisher final DataTreeCandidate candidate; - try { - dataTree.validate(modification); - } catch (final DataValidationFailedException e) { - LOG.error("Validation failed for built modification, modification {}, current data tree: {}", - modification, dataTree, e); - throw new RuntimeException("Notification validation failed", e); - } + dataTree.validate(modification); // strip nodes we dont need since this listener doesn't have to be registered at the root of the DataTree candidate = dataTree.prepare(modification); @@ -251,10 +246,12 @@ public class DistributedShardChangePublisher private final YangInstanceIdentifier listenerPath; private final DOMDataTreeChangeListener delegate; - private final Map> registrations = new ConcurrentHashMap<>(); + @GuardedBy("this") + private final Collection stashedDataTreeCandidates = new LinkedList<>(); + DOMDataTreeListenerWithSubshards(final YangInstanceIdentifier listenerPath, final DOMDataTreeChangeListener delegate) { this.listenerPath = Preconditions.checkNotNull(listenerPath); @@ -262,9 +259,33 @@ public class DistributedShardChangePublisher } @Override - public void onDataTreeChanged(@Nonnull final Collection changes) { + public synchronized void onDataTreeChanged(@Nonnull final Collection changes) { LOG.debug("Received data changed {}", changes); - applyChanges(listenerPath, changes); + + if (!stashedDataTreeCandidates.isEmpty()) { + LOG.debug("Adding stashed subshards' changes {}", stashedDataTreeCandidates); + changes.addAll(stashedDataTreeCandidates); + stashedDataTreeCandidates.clear(); + } + + try { + applyChanges(listenerPath, changes); + } catch (final DataValidationFailedException e) { + // TODO should we fail here? What if stashed changes + // (changes from subshards) got ahead more than one generation + // from current shard. Than we can fail to apply this changes + // upon current data tree, but once we get respective changes + // from current shard, we can apply also changes from + // subshards. + // + // However, we can loose ability to notice and report some + // errors then. For example, we cannot detect potential lost + // changes from current shard. + LOG.error("Validation failed for modification built from changes {}, current data tree: {}", + changes, dataTree, e); + throw new RuntimeException("Notification validation failed", e); + } + delegate.onDataTreeChanged(changes); } @@ -276,7 +297,18 @@ public class DistributedShardChangePublisher final List newCandidates = changes.stream() .map(candidate -> DataTreeCandidates.newDataTreeCandidate(changeId, candidate.getRootNode())) .collect(Collectors.toList()); - delegate.onDataTreeChanged(Collections.singleton(applyChanges(listenerPath, newCandidates))); + + try { + delegate.onDataTreeChanged(Collections.singleton(applyChanges(listenerPath, newCandidates))); + } catch (final DataValidationFailedException e) { + // We cannot apply changes from subshard to current data tree. + // Maybe changes from current shard haven't been applied to + // data tree yet. Postpone processing of these changes till we + // receive changes from current shard. + LOG.debug("Validation for modification built from subshard {} changes {} failed, current data tree {}.", + pathFromRoot, changes, dataTree); + stashedDataTreeCandidates.addAll(newCandidates); + } } void addSubshard(final ChildShardContext context) {