X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fsharding%2FDistributedShardChangePublisher.java;fp=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fsharding%2FDistributedShardChangePublisher.java;h=0000000000000000000000000000000000000000;hp=db8e1edebde2e6a2d3f1b72253e6d5a1f27e263e;hb=5f587c3e2bfabc09fec49463d04a6fbeba414e9c;hpb=a35607c5040d0fd561529fde3032c9f49393deeb diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardChangePublisher.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardChangePublisher.java deleted file mode 100644 index db8e1edebd..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardChangePublisher.java +++ /dev/null @@ -1,320 +0,0 @@ -/* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.sharding; - -import static com.google.common.base.Preconditions.checkState; -import static java.util.Objects.requireNonNull; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Collectors; -import org.checkerframework.checker.lock.qual.GuardedBy; -import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient; -import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface; -import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener; -import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; -import org.opendaylight.mdsal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration; -import org.opendaylight.mdsal.dom.spi.AbstractRegistrationTree; -import org.opendaylight.mdsal.dom.spi.RegistrationTreeNode; -import org.opendaylight.mdsal.dom.spi.shard.ChildShardContext; -import org.opendaylight.mdsal.dom.spi.store.DOMStoreTreeChangePublisher; -import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNodes; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; -import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; -import org.opendaylight.yangtools.yang.data.impl.schema.tree.SchemaValidationFailedException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@Deprecated(forRemoval = true) -public class DistributedShardChangePublisher - extends AbstractRegistrationTree> - implements DOMStoreTreeChangePublisher { - - private static final Logger LOG = LoggerFactory.getLogger(DistributedShardChangePublisher.class); - - private final DistributedDataStoreInterface distributedDataStore; - private final YangInstanceIdentifier shardPath; - - private final Map childShards; - - @GuardedBy("this") - private final DataTree dataTree; - - public DistributedShardChangePublisher(final DataStoreClient client, - final DistributedDataStoreInterface distributedDataStore, - final DOMDataTreeIdentifier prefix, - final Map childShards) { - this.distributedDataStore = distributedDataStore; - // TODO keeping the whole dataTree thats contained in subshards doesn't seem like a good idea - // maybe the whole listener logic would be better in the backend shards where we have direct access to the - // dataTree and wont have to cache it redundantly. - - final DataTreeConfiguration baseConfig; - switch (prefix.getDatastoreType()) { - case CONFIGURATION: - baseConfig = DataTreeConfiguration.DEFAULT_CONFIGURATION; - break; - case OPERATIONAL: - baseConfig = DataTreeConfiguration.DEFAULT_OPERATIONAL; - break; - default: - throw new UnsupportedOperationException("Unknown prefix type " + prefix.getDatastoreType()); - } - - this.dataTree = new InMemoryDataTreeFactory().create(new DataTreeConfiguration.Builder(baseConfig.getTreeType()) - .setMandatoryNodesValidation(baseConfig.isMandatoryNodesValidationEnabled()) - .setUniqueIndexes(baseConfig.isUniqueIndexEnabled()) - .setRootPath(prefix.getRootIdentifier()) - .build()); - - // XXX: can we guarantee that the root is present in the schemacontext? - this.dataTree.setEffectiveModelContext(distributedDataStore.getActorUtils().getSchemaContext()); - this.shardPath = prefix.getRootIdentifier(); - this.childShards = childShards; - } - - protected void registrationRemoved(final AbstractDOMDataTreeChangeListenerRegistration registration) { - LOG.debug("Closing registration {}", registration); - } - - @Override - public AbstractDOMDataTreeChangeListenerRegistration - registerTreeChangeListener(final YangInstanceIdentifier path, final L listener) { - takeLock(); - try { - return setupListenerContext(path, listener); - } finally { - releaseLock(); - } - } - - private AbstractDOMDataTreeChangeListenerRegistration - setupListenerContext(final YangInstanceIdentifier listenerPath, final L listener) { - // we need to register the listener registration path based on the shards root - // we have to strip the shard path from the listener path and then register - YangInstanceIdentifier strippedIdentifier = listenerPath; - if (!shardPath.isEmpty()) { - strippedIdentifier = YangInstanceIdentifier.create(stripShardPath(shardPath, listenerPath)); - } - - final DOMDataTreeListenerWithSubshards subshardListener = - new DOMDataTreeListenerWithSubshards(strippedIdentifier, listener); - final AbstractDOMDataTreeChangeListenerRegistration reg = - setupContextWithoutSubshards(listenerPath, strippedIdentifier, subshardListener); - - for (final ChildShardContext maybeAffected : childShards.values()) { - if (listenerPath.contains(maybeAffected.getPrefix().getRootIdentifier())) { - // consumer has initialDataChangeEvent subshard somewhere on lower level - // register to the notification manager with snapshot and forward child notifications to parent - LOG.debug("Adding new subshard{{}} to listener at {}", maybeAffected.getPrefix(), listenerPath); - subshardListener.addSubshard(maybeAffected); - } else if (maybeAffected.getPrefix().getRootIdentifier().contains(listenerPath)) { - // bind path is inside subshard - // TODO can this happen? seems like in ShardedDOMDataTree we are - // already registering to the lowest shard possible - throw new UnsupportedOperationException("Listener should be registered directly " - + "into initialDataChangeEvent subshard"); - } - } - - return reg; - } - - private AbstractDOMDataTreeChangeListenerRegistration - setupContextWithoutSubshards(final YangInstanceIdentifier shardLookup, - final YangInstanceIdentifier listenerPath, - final DOMDataTreeListenerWithSubshards listener) { - - LOG.debug("Registering root listener full path: {}, path inside shard: {}", shardLookup, listenerPath); - - // register in the shard tree - final RegistrationTreeNode> node = - findNodeFor(listenerPath.getPathArguments()); - - // register listener in CDS - ListenerRegistration listenerReg = distributedDataStore - .registerProxyListener(shardLookup, listenerPath, listener); - - @SuppressWarnings("unchecked") - final AbstractDOMDataTreeChangeListenerRegistration registration = - new AbstractDOMDataTreeChangeListenerRegistration<>((L) listener) { - @Override - protected void removeRegistration() { - listener.close(); - DistributedShardChangePublisher.this.removeRegistration(node, this); - registrationRemoved(this); - listenerReg.close(); - } - }; - addRegistration(node, registration); - - return registration; - } - - private static Iterable stripShardPath(final YangInstanceIdentifier shardPath, - final YangInstanceIdentifier listenerPath) { - if (shardPath.isEmpty()) { - return listenerPath.getPathArguments(); - } - - final List listenerPathArgs = new ArrayList<>(listenerPath.getPathArguments()); - final Iterator shardIter = shardPath.getPathArguments().iterator(); - final Iterator listenerIter = listenerPathArgs.iterator(); - - while (shardIter.hasNext()) { - if (shardIter.next().equals(listenerIter.next())) { - listenerIter.remove(); - } else { - break; - } - } - - return listenerPathArgs; - } - - synchronized DataTreeCandidate applyChanges(final YangInstanceIdentifier listenerPath, - final Collection changes) throws DataValidationFailedException { - final DataTreeModification modification = dataTree.takeSnapshot().newModification(); - for (final DataTreeCandidate change : changes) { - try { - DataTreeCandidates.applyToModification(modification, change); - } catch (SchemaValidationFailedException e) { - LOG.error("Validation failed", e); - } - } - - modification.ready(); - - final DataTreeCandidate candidate; - - dataTree.validate(modification); - - // strip nodes we dont need since this listener doesn't have to be registered at the root of the DataTree - candidate = dataTree.prepare(modification); - dataTree.commit(candidate); - - - DataTreeCandidateNode modifiedChild = candidate.getRootNode(); - - for (final PathArgument pathArgument : listenerPath.getPathArguments()) { - modifiedChild = modifiedChild.getModifiedChild(pathArgument).orElse(null); - } - - - if (modifiedChild == null) { - modifiedChild = DataTreeCandidateNodes.empty(dataTree.getRootPath().getLastPathArgument()); - } - - return DataTreeCandidates.newDataTreeCandidate(dataTree.getRootPath(), modifiedChild); - } - - - private final class DOMDataTreeListenerWithSubshards implements DOMDataTreeChangeListener { - - private final YangInstanceIdentifier listenerPath; - private final DOMDataTreeChangeListener delegate; - private final Map> registrations = - new ConcurrentHashMap<>(); - - @GuardedBy("this") - private final Collection stashedDataTreeCandidates = new LinkedList<>(); - - DOMDataTreeListenerWithSubshards(final YangInstanceIdentifier listenerPath, - final DOMDataTreeChangeListener delegate) { - this.listenerPath = requireNonNull(listenerPath); - this.delegate = requireNonNull(delegate); - } - - @Override - public synchronized void onDataTreeChanged(final Collection changes) { - LOG.debug("Received data changed {}", changes); - - if (!stashedDataTreeCandidates.isEmpty()) { - LOG.debug("Adding stashed subshards' changes {}", stashedDataTreeCandidates); - changes.addAll(stashedDataTreeCandidates); - stashedDataTreeCandidates.clear(); - } - - try { - applyChanges(listenerPath, changes); - } catch (final DataValidationFailedException e) { - // TODO should we fail here? What if stashed changes - // (changes from subshards) got ahead more than one generation - // from current shard. Than we can fail to apply this changes - // upon current data tree, but once we get respective changes - // from current shard, we can apply also changes from - // subshards. - // - // However, we can loose ability to notice and report some - // errors then. For example, we cannot detect potential lost - // changes from current shard. - LOG.error("Validation failed for modification built from changes {}, current data tree: {}", - changes, dataTree, e); - throw new RuntimeException("Notification validation failed", e); - } - - delegate.onDataTreeChanged(changes); - } - - synchronized void onDataTreeChanged(final YangInstanceIdentifier pathFromRoot, - final Collection changes) { - final YangInstanceIdentifier changeId = - YangInstanceIdentifier.create(stripShardPath(dataTree.getRootPath(), pathFromRoot)); - - final List newCandidates = changes.stream() - .map(candidate -> DataTreeCandidates.newDataTreeCandidate(changeId, candidate.getRootNode())) - .collect(Collectors.toList()); - - try { - delegate.onDataTreeChanged(Collections.singleton(applyChanges(listenerPath, newCandidates))); - } catch (final DataValidationFailedException e) { - // We cannot apply changes from subshard to current data tree. - // Maybe changes from current shard haven't been applied to - // data tree yet. Postpone processing of these changes till we - // receive changes from current shard. - LOG.debug("Validation for modification built from subshard {} changes {} failed, current data tree {}.", - pathFromRoot, changes, dataTree, e); - stashedDataTreeCandidates.addAll(newCandidates); - } - } - - void addSubshard(final ChildShardContext context) { - checkState(context.getShard() instanceof DOMStoreTreeChangePublisher, - "All subshards that are initialDataChangeEvent part of ListenerContext need to be listenable"); - - final DOMStoreTreeChangePublisher listenableShard = (DOMStoreTreeChangePublisher) context.getShard(); - // since this is going into subshard we want to listen for ALL changes in the subshard - registrations.put(context.getPrefix().getRootIdentifier(), - listenableShard.registerTreeChangeListener( - context.getPrefix().getRootIdentifier(), changes -> onDataTreeChanged( - context.getPrefix().getRootIdentifier(), changes))); - } - - void close() { - for (final ListenerRegistration registration : registrations.values()) { - registration.close(); - } - registrations.clear(); - } - } -}