X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fsharding%2FDistributedShardChangePublisher.java;h=8981988196ce5786edf0e40e2c74defc590235a2;hb=b9711f17a53a4fad48197df6c39b58e4faadc862;hp=8ff08e1615b922e4fc3a54839cf66c826144fab2;hpb=ac919f21651e87b9652d02d7924f53e7e2b30471;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardChangePublisher.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardChangePublisher.java index 8ff08e1615..8981988196 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardChangePublisher.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardChangePublisher.java @@ -5,22 +5,21 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.sharding; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; + import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; -import javax.annotation.concurrent.GuardedBy; +import org.checkerframework.checker.lock.qual.GuardedBy; import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient; import org.opendaylight.controller.cluster.datastore.AbstractDataStore; import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener; @@ -33,15 +32,14 @@ import org.opendaylight.mdsal.dom.spi.store.DOMStoreTreeChangePublisher; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNodes; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; -import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType; -import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType; import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; import org.opendaylight.yangtools.yang.data.impl.schema.tree.SchemaValidationFailedException; import org.slf4j.Logger; @@ -56,9 +54,6 @@ public class DistributedShardChangePublisher private final AbstractDataStore distributedDataStore; private final YangInstanceIdentifier shardPath; - // This will be useful for signaling back pressure - private final DataStoreClient client; - private final Map childShards; @GuardedBy("this") @@ -68,16 +63,31 @@ public class DistributedShardChangePublisher final AbstractDataStore distributedDataStore, final DOMDataTreeIdentifier prefix, final Map childShards) { - this.client = client; this.distributedDataStore = distributedDataStore; // TODO keeping the whole dataTree thats contained in subshards doesn't seem like a good idea // maybe the whole listener logic would be better in the backend shards where we have direct access to the // dataTree and wont have to cache it redundantly. - this.dataTree = InMemoryDataTreeFactory.getInstance().create( - TreeType.valueOf(prefix.getDatastoreType().name()), prefix.getRootIdentifier()); - dataTree.setSchemaContext(distributedDataStore.getActorContext().getSchemaContext()); + final DataTreeConfiguration baseConfig; + switch (prefix.getDatastoreType()) { + case CONFIGURATION: + baseConfig = DataTreeConfiguration.DEFAULT_CONFIGURATION; + break; + case OPERATIONAL: + baseConfig = DataTreeConfiguration.DEFAULT_OPERATIONAL; + break; + default: + throw new UnsupportedOperationException("Unknown prefix type " + prefix.getDatastoreType()); + } + + this.dataTree = new InMemoryDataTreeFactory().create(new DataTreeConfiguration.Builder(baseConfig.getTreeType()) + .setMandatoryNodesValidation(baseConfig.isMandatoryNodesValidationEnabled()) + .setUniqueIndexes(baseConfig.isUniqueIndexEnabled()) + .setRootPath(prefix.getRootIdentifier()) + .build()); + // XXX: can we guarantee that the root is present in the schemacontext? + this.dataTree.setEffectiveModelContext(distributedDataStore.getActorUtils().getSchemaContext()); this.shardPath = prefix.getRootIdentifier(); this.childShards = childShards; } @@ -141,18 +151,18 @@ public class DistributedShardChangePublisher findNodeFor(listenerPath.getPathArguments()); // register listener in CDS - final ProxyRegistration proxyReg = new ProxyRegistration(distributedDataStore - .registerProxyListener(shardLookup, listenerPath, listener), listener); + ListenerRegistration listenerReg = distributedDataStore + .registerProxyListener(shardLookup, listenerPath, listener); @SuppressWarnings("unchecked") final AbstractDOMDataTreeChangeListenerRegistration registration = - new AbstractDOMDataTreeChangeListenerRegistration((L) listener) { + new AbstractDOMDataTreeChangeListenerRegistration<>((L) listener) { @Override protected void removeRegistration() { listener.close(); DistributedShardChangePublisher.this.removeRegistration(node, this); registrationRemoved(this); - proxyReg.close(); + listenerReg.close(); } }; addRegistration(node, registration); @@ -181,38 +191,14 @@ public class DistributedShardChangePublisher return listenerPathArgs; } - private static class ProxyRegistration implements ListenerRegistration { - - private final ListenerRegistration proxy; - private final DOMDataTreeChangeListener listener; - - private ProxyRegistration( - final ListenerRegistration< - org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener> proxy, - final DOMDataTreeChangeListener listener) { - this.proxy = proxy; - this.listener = listener; - } - - @Override - public DOMDataTreeChangeListener getInstance() { - return listener; - } - - @Override - public void close() { - proxy.close(); - } - } - synchronized DataTreeCandidate applyChanges(final YangInstanceIdentifier listenerPath, - final Collection changes) { + final Collection changes) throws DataValidationFailedException { final DataTreeModification modification = dataTree.takeSnapshot().newModification(); for (final DataTreeCandidate change : changes) { try { DataTreeCandidates.applyToModification(modification, change); } catch (SchemaValidationFailedException e) { - LOG.error("Validation failed {}", e); + LOG.error("Validation failed", e); } } @@ -220,13 +206,7 @@ public class DistributedShardChangePublisher final DataTreeCandidate candidate; - try { - dataTree.validate(modification); - } catch (final DataValidationFailedException e) { - LOG.error("Validation failed for built modification, modification {}, current data tree: {}", - modification, dataTree, e); - throw new RuntimeException("Notification validation failed", e); - } + dataTree.validate(modification); // strip nodes we dont need since this listener doesn't have to be registered at the root of the DataTree candidate = dataTree.prepare(modification); @@ -236,11 +216,12 @@ public class DistributedShardChangePublisher DataTreeCandidateNode modifiedChild = candidate.getRootNode(); for (final PathArgument pathArgument : listenerPath.getPathArguments()) { - modifiedChild = modifiedChild.getModifiedChild(pathArgument); + modifiedChild = modifiedChild.getModifiedChild(pathArgument).orElse(null); } + if (modifiedChild == null) { - modifiedChild = new EmptyDataTreeCandidateNode(dataTree.getRootPath().getLastPathArgument()); + modifiedChild = DataTreeCandidateNodes.empty(dataTree.getRootPath().getLastPathArgument()); } return DataTreeCandidates.newDataTreeCandidate(dataTree.getRootPath(), modifiedChild); @@ -251,20 +232,46 @@ public class DistributedShardChangePublisher private final YangInstanceIdentifier listenerPath; private final DOMDataTreeChangeListener delegate; - private final Map> registrations = new ConcurrentHashMap<>(); + @GuardedBy("this") + private final Collection stashedDataTreeCandidates = new LinkedList<>(); + DOMDataTreeListenerWithSubshards(final YangInstanceIdentifier listenerPath, final DOMDataTreeChangeListener delegate) { - this.listenerPath = Preconditions.checkNotNull(listenerPath); - this.delegate = Preconditions.checkNotNull(delegate); + this.listenerPath = requireNonNull(listenerPath); + this.delegate = requireNonNull(delegate); } @Override - public void onDataTreeChanged(@Nonnull final Collection changes) { + public synchronized void onDataTreeChanged(final Collection changes) { LOG.debug("Received data changed {}", changes); - applyChanges(listenerPath, changes); + + if (!stashedDataTreeCandidates.isEmpty()) { + LOG.debug("Adding stashed subshards' changes {}", stashedDataTreeCandidates); + changes.addAll(stashedDataTreeCandidates); + stashedDataTreeCandidates.clear(); + } + + try { + applyChanges(listenerPath, changes); + } catch (final DataValidationFailedException e) { + // TODO should we fail here? What if stashed changes + // (changes from subshards) got ahead more than one generation + // from current shard. Than we can fail to apply this changes + // upon current data tree, but once we get respective changes + // from current shard, we can apply also changes from + // subshards. + // + // However, we can loose ability to notice and report some + // errors then. For example, we cannot detect potential lost + // changes from current shard. + LOG.error("Validation failed for modification built from changes {}, current data tree: {}", + changes, dataTree, e); + throw new RuntimeException("Notification validation failed", e); + } + delegate.onDataTreeChanged(changes); } @@ -276,11 +283,22 @@ public class DistributedShardChangePublisher final List newCandidates = changes.stream() .map(candidate -> DataTreeCandidates.newDataTreeCandidate(changeId, candidate.getRootNode())) .collect(Collectors.toList()); - delegate.onDataTreeChanged(Collections.singleton(applyChanges(listenerPath, newCandidates))); + + try { + delegate.onDataTreeChanged(Collections.singleton(applyChanges(listenerPath, newCandidates))); + } catch (final DataValidationFailedException e) { + // We cannot apply changes from subshard to current data tree. + // Maybe changes from current shard haven't been applied to + // data tree yet. Postpone processing of these changes till we + // receive changes from current shard. + LOG.debug("Validation for modification built from subshard {} changes {} failed, current data tree {}.", + pathFromRoot, changes, dataTree, e); + stashedDataTreeCandidates.addAll(newCandidates); + } } void addSubshard(final ChildShardContext context) { - Preconditions.checkState(context.getShard() instanceof DOMStoreTreeChangePublisher, + checkState(context.getShard() instanceof DOMStoreTreeChangePublisher, "All subshards that are initialDataChangeEvent part of ListenerContext need to be listenable"); final DOMStoreTreeChangePublisher listenableShard = (DOMStoreTreeChangePublisher) context.getShard(); @@ -298,49 +316,4 @@ public class DistributedShardChangePublisher registrations.clear(); } } - - private static final class EmptyDataTreeCandidateNode implements DataTreeCandidateNode { - - private final PathArgument identifier; - - EmptyDataTreeCandidateNode(final PathArgument identifier) { - this.identifier = Preconditions.checkNotNull(identifier, "Identifier should not be null"); - } - - @Nonnull - @Override - public PathArgument getIdentifier() { - return identifier; - } - - @Nonnull - @Override - public Collection getChildNodes() { - return Collections.emptySet(); - } - - @Nullable - @Override - public DataTreeCandidateNode getModifiedChild(final PathArgument identifier) { - return null; - } - - @Nonnull - @Override - public ModificationType getModificationType() { - return ModificationType.UNMODIFIED; - } - - @Nonnull - @Override - public Optional> getDataAfter() { - return Optional.absent(); - } - - @Nonnull - @Override - public Optional> getDataBefore() { - return Optional.absent(); - } - } }