X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fsharding%2FDistributedShardChangePublisher.java;h=b4b44449c9972df3d4883d115e9e0044e01834d7;hb=77e5860f42832140d27cff8e08e90b0b2947df31;hp=5e49c0069ed4aa266588006f4e0bad882329906e;hpb=20f8f30f4bbf1e982672c1f883a6a18b0e4539de;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardChangePublisher.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardChangePublisher.java index 5e49c0069e..b4b44449c9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardChangePublisher.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardChangePublisher.java @@ -8,21 +8,22 @@ package org.opendaylight.controller.cluster.sharding; -import com.google.common.base.Optional; import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import javax.annotation.Nonnull; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient; -import org.opendaylight.controller.cluster.datastore.DistributedDataStore; +import org.opendaylight.controller.cluster.datastore.AbstractDataStore; import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.mdsal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration; @@ -38,10 +39,10 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType; -import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType; import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; import org.opendaylight.yangtools.yang.data.impl.schema.tree.SchemaValidationFailedException; import org.slf4j.Logger; @@ -53,31 +54,43 @@ public class DistributedShardChangePublisher private static final Logger LOG = LoggerFactory.getLogger(DistributedShardChangePublisher.class); - private final DistributedDataStore distributedDataStore; + private final AbstractDataStore distributedDataStore; private final YangInstanceIdentifier shardPath; - // This will be useful for signaling back pressure - private final DataStoreClient client; - private final Map childShards; @GuardedBy("this") private final DataTree dataTree; public DistributedShardChangePublisher(final DataStoreClient client, - final DistributedDataStore distributedDataStore, + final AbstractDataStore distributedDataStore, final DOMDataTreeIdentifier prefix, final Map childShards) { - this.client = client; this.distributedDataStore = distributedDataStore; // TODO keeping the whole dataTree thats contained in subshards doesn't seem like a good idea // maybe the whole listener logic would be better in the backend shards where we have direct access to the // dataTree and wont have to cache it redundantly. - this.dataTree = InMemoryDataTreeFactory.getInstance().create( - TreeType.valueOf(prefix.getDatastoreType().name()), prefix.getRootIdentifier()); - dataTree.setSchemaContext(distributedDataStore.getActorContext().getSchemaContext()); + final DataTreeConfiguration baseConfig; + switch (prefix.getDatastoreType()) { + case CONFIGURATION: + baseConfig = DataTreeConfiguration.DEFAULT_CONFIGURATION; + break; + case OPERATIONAL: + baseConfig = DataTreeConfiguration.DEFAULT_OPERATIONAL; + break; + default: + throw new UnsupportedOperationException("Unknown prefix type " + prefix.getDatastoreType()); + } + + this.dataTree = new InMemoryDataTreeFactory().create(new DataTreeConfiguration.Builder(baseConfig.getTreeType()) + .setMandatoryNodesValidation(baseConfig.isMandatoryNodesValidationEnabled()) + .setUniqueIndexes(baseConfig.isUniqueIndexEnabled()) + .setRootPath(prefix.getRootIdentifier()) + .build()); + // XXX: can we guarantee that the root is present in the schemacontext? + this.dataTree.setSchemaContext(distributedDataStore.getActorContext().getSchemaContext()); this.shardPath = prefix.getRootIdentifier(); this.childShards = childShards; } @@ -141,8 +154,8 @@ public class DistributedShardChangePublisher findNodeFor(listenerPath.getPathArguments()); // register listener in CDS - final ProxyRegistration proxyReg = new ProxyRegistration(distributedDataStore - .registerProxyListener(shardLookup, listenerPath, listener), listener); + ListenerRegistration listenerReg = distributedDataStore + .registerProxyListener(shardLookup, listenerPath, listener); @SuppressWarnings("unchecked") final AbstractDOMDataTreeChangeListenerRegistration registration = @@ -152,7 +165,7 @@ public class DistributedShardChangePublisher listener.close(); DistributedShardChangePublisher.this.removeRegistration(node, this); registrationRemoved(this); - proxyReg.close(); + listenerReg.close(); } }; addRegistration(node, registration); @@ -181,32 +194,8 @@ public class DistributedShardChangePublisher return listenerPathArgs; } - private static class ProxyRegistration implements ListenerRegistration { - - private final ListenerRegistration proxy; - private final DOMDataTreeChangeListener listener; - - private ProxyRegistration( - final ListenerRegistration< - org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener> proxy, - final DOMDataTreeChangeListener listener) { - this.proxy = proxy; - this.listener = listener; - } - - @Override - public DOMDataTreeChangeListener getInstance() { - return listener; - } - - @Override - public void close() { - proxy.close(); - } - } - synchronized DataTreeCandidate applyChanges(final YangInstanceIdentifier listenerPath, - final Collection changes) { + final Collection changes) throws DataValidationFailedException { final DataTreeModification modification = dataTree.takeSnapshot().newModification(); for (final DataTreeCandidate change : changes) { try { @@ -220,13 +209,7 @@ public class DistributedShardChangePublisher final DataTreeCandidate candidate; - try { - dataTree.validate(modification); - } catch (final DataValidationFailedException e) { - LOG.error("Validation failed for built modification, modification {}, current data tree: {}", - modification, dataTree, e); - throw new RuntimeException("Notification validation failed", e); - } + dataTree.validate(modification); // strip nodes we dont need since this listener doesn't have to be registered at the root of the DataTree candidate = dataTree.prepare(modification); @@ -251,10 +234,12 @@ public class DistributedShardChangePublisher private final YangInstanceIdentifier listenerPath; private final DOMDataTreeChangeListener delegate; - private final Map> registrations = new ConcurrentHashMap<>(); + @GuardedBy("this") + private final Collection stashedDataTreeCandidates = new LinkedList<>(); + DOMDataTreeListenerWithSubshards(final YangInstanceIdentifier listenerPath, final DOMDataTreeChangeListener delegate) { this.listenerPath = Preconditions.checkNotNull(listenerPath); @@ -262,9 +247,33 @@ public class DistributedShardChangePublisher } @Override - public void onDataTreeChanged(@Nonnull final Collection changes) { + public synchronized void onDataTreeChanged(@Nonnull final Collection changes) { LOG.debug("Received data changed {}", changes); - applyChanges(listenerPath, changes); + + if (!stashedDataTreeCandidates.isEmpty()) { + LOG.debug("Adding stashed subshards' changes {}", stashedDataTreeCandidates); + changes.addAll(stashedDataTreeCandidates); + stashedDataTreeCandidates.clear(); + } + + try { + applyChanges(listenerPath, changes); + } catch (final DataValidationFailedException e) { + // TODO should we fail here? What if stashed changes + // (changes from subshards) got ahead more than one generation + // from current shard. Than we can fail to apply this changes + // upon current data tree, but once we get respective changes + // from current shard, we can apply also changes from + // subshards. + // + // However, we can loose ability to notice and report some + // errors then. For example, we cannot detect potential lost + // changes from current shard. + LOG.error("Validation failed for modification built from changes {}, current data tree: {}", + changes, dataTree, e); + throw new RuntimeException("Notification validation failed", e); + } + delegate.onDataTreeChanged(changes); } @@ -276,7 +285,18 @@ public class DistributedShardChangePublisher final List newCandidates = changes.stream() .map(candidate -> DataTreeCandidates.newDataTreeCandidate(changeId, candidate.getRootNode())) .collect(Collectors.toList()); - delegate.onDataTreeChanged(Collections.singleton(applyChanges(listenerPath, newCandidates))); + + try { + delegate.onDataTreeChanged(Collections.singleton(applyChanges(listenerPath, newCandidates))); + } catch (final DataValidationFailedException e) { + // We cannot apply changes from subshard to current data tree. + // Maybe changes from current shard haven't been applied to + // data tree yet. Postpone processing of these changes till we + // receive changes from current shard. + LOG.debug("Validation for modification built from subshard {} changes {} failed, current data tree {}.", + pathFromRoot, changes, dataTree, e); + stashedDataTreeCandidates.addAll(newCandidates); + } } void addSubshard(final ChildShardContext context) { @@ -321,6 +341,7 @@ public class DistributedShardChangePublisher @Nullable @Override + @SuppressWarnings("checkstyle:hiddenField") public DataTreeCandidateNode getModifiedChild(final PathArgument identifier) { return null; } @@ -334,13 +355,13 @@ public class DistributedShardChangePublisher @Nonnull @Override public Optional> getDataAfter() { - return Optional.absent(); + return Optional.empty(); } @Nonnull @Override public Optional> getDataBefore() { - return Optional.absent(); + return Optional.empty(); } } }