package org.opendaylight.controller.cluster.sharding;
-import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
import org.opendaylight.yangtools.yang.data.impl.schema.tree.SchemaValidationFailedException;
import org.slf4j.Logger;
private final AbstractDataStore distributedDataStore;
private final YangInstanceIdentifier shardPath;
- // This will be useful for signaling back pressure
- private final DataStoreClient client;
-
private final Map<DOMDataTreeIdentifier, ChildShardContext> childShards;
@GuardedBy("this")
final AbstractDataStore distributedDataStore,
final DOMDataTreeIdentifier prefix,
final Map<DOMDataTreeIdentifier, ChildShardContext> childShards) {
- this.client = client;
this.distributedDataStore = distributedDataStore;
// TODO keeping the whole dataTree thats contained in subshards doesn't seem like a good idea
// maybe the whole listener logic would be better in the backend shards where we have direct access to the
// dataTree and wont have to cache it redundantly.
- this.dataTree = InMemoryDataTreeFactory.getInstance().create(
- TreeType.valueOf(prefix.getDatastoreType().name()), prefix.getRootIdentifier());
- dataTree.setSchemaContext(distributedDataStore.getActorContext().getSchemaContext());
+ final DataTreeConfiguration baseConfig;
+ switch (prefix.getDatastoreType()) {
+ case CONFIGURATION:
+ baseConfig = DataTreeConfiguration.DEFAULT_CONFIGURATION;
+ break;
+ case OPERATIONAL:
+ baseConfig = DataTreeConfiguration.DEFAULT_OPERATIONAL;
+ break;
+ default:
+ throw new UnsupportedOperationException("Unknown prefix type " + prefix.getDatastoreType());
+ }
+
+ this.dataTree = new InMemoryDataTreeFactory().create(new DataTreeConfiguration.Builder(baseConfig.getTreeType())
+ .setMandatoryNodesValidation(baseConfig.isMandatoryNodesValidationEnabled())
+ .setUniqueIndexes(baseConfig.isUniqueIndexEnabled())
+ .setRootPath(prefix.getRootIdentifier())
+ .build());
+ // XXX: can we guarantee that the root is present in the schemacontext?
+ this.dataTree.setSchemaContext(distributedDataStore.getActorContext().getSchemaContext());
this.shardPath = prefix.getRootIdentifier();
this.childShards = childShards;
}
findNodeFor(listenerPath.getPathArguments());
// register listener in CDS
- final ProxyRegistration proxyReg = new ProxyRegistration(distributedDataStore
- .registerProxyListener(shardLookup, listenerPath, listener), listener);
+ ListenerRegistration<DOMDataTreeChangeListener> listenerReg = distributedDataStore
+ .registerProxyListener(shardLookup, listenerPath, listener);
@SuppressWarnings("unchecked")
final AbstractDOMDataTreeChangeListenerRegistration<L> registration =
listener.close();
DistributedShardChangePublisher.this.removeRegistration(node, this);
registrationRemoved(this);
- proxyReg.close();
+ listenerReg.close();
}
};
addRegistration(node, registration);
return listenerPathArgs;
}
- private static class ProxyRegistration implements ListenerRegistration<DOMDataTreeChangeListener> {
-
- private final ListenerRegistration<org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener> proxy;
- private final DOMDataTreeChangeListener listener;
-
- private ProxyRegistration(
- final ListenerRegistration<
- org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener> proxy,
- final DOMDataTreeChangeListener listener) {
- this.proxy = proxy;
- this.listener = listener;
- }
-
- @Override
- public DOMDataTreeChangeListener getInstance() {
- return listener;
- }
-
- @Override
- public void close() {
- proxy.close();
- }
- }
-
synchronized DataTreeCandidate applyChanges(final YangInstanceIdentifier listenerPath,
- final Collection<DataTreeCandidate> changes) {
+ final Collection<DataTreeCandidate> changes) throws DataValidationFailedException {
final DataTreeModification modification = dataTree.takeSnapshot().newModification();
for (final DataTreeCandidate change : changes) {
try {
DataTreeCandidates.applyToModification(modification, change);
} catch (SchemaValidationFailedException e) {
- LOG.error("Validation failed {}", e);
+ LOG.error("Validation failed", e);
}
}
final DataTreeCandidate candidate;
- try {
- dataTree.validate(modification);
- } catch (final DataValidationFailedException e) {
- LOG.error("Validation failed for built modification, modification {}, current data tree: {}",
- modification, dataTree, e);
- throw new RuntimeException("Notification validation failed", e);
- }
+ dataTree.validate(modification);
// strip nodes we dont need since this listener doesn't have to be registered at the root of the DataTree
candidate = dataTree.prepare(modification);
private final YangInstanceIdentifier listenerPath;
private final DOMDataTreeChangeListener delegate;
-
private final Map<YangInstanceIdentifier, ListenerRegistration<DOMDataTreeChangeListener>> registrations =
new ConcurrentHashMap<>();
+ @GuardedBy("this")
+ private final Collection<DataTreeCandidate> stashedDataTreeCandidates = new LinkedList<>();
+
DOMDataTreeListenerWithSubshards(final YangInstanceIdentifier listenerPath,
final DOMDataTreeChangeListener delegate) {
this.listenerPath = Preconditions.checkNotNull(listenerPath);
}
@Override
- public void onDataTreeChanged(@Nonnull final Collection<DataTreeCandidate> changes) {
+ public synchronized void onDataTreeChanged(@Nonnull final Collection<DataTreeCandidate> changes) {
LOG.debug("Received data changed {}", changes);
- applyChanges(listenerPath, changes);
+
+ if (!stashedDataTreeCandidates.isEmpty()) {
+ LOG.debug("Adding stashed subshards' changes {}", stashedDataTreeCandidates);
+ changes.addAll(stashedDataTreeCandidates);
+ stashedDataTreeCandidates.clear();
+ }
+
+ try {
+ applyChanges(listenerPath, changes);
+ } catch (final DataValidationFailedException e) {
+ // TODO should we fail here? What if stashed changes
+ // (changes from subshards) got ahead more than one generation
+ // from current shard. Than we can fail to apply this changes
+ // upon current data tree, but once we get respective changes
+ // from current shard, we can apply also changes from
+ // subshards.
+ //
+ // However, we can loose ability to notice and report some
+ // errors then. For example, we cannot detect potential lost
+ // changes from current shard.
+ LOG.error("Validation failed for modification built from changes {}, current data tree: {}",
+ changes, dataTree, e);
+ throw new RuntimeException("Notification validation failed", e);
+ }
+
delegate.onDataTreeChanged(changes);
}
final List<DataTreeCandidate> newCandidates = changes.stream()
.map(candidate -> DataTreeCandidates.newDataTreeCandidate(changeId, candidate.getRootNode()))
.collect(Collectors.toList());
- delegate.onDataTreeChanged(Collections.singleton(applyChanges(listenerPath, newCandidates)));
+
+ try {
+ delegate.onDataTreeChanged(Collections.singleton(applyChanges(listenerPath, newCandidates)));
+ } catch (final DataValidationFailedException e) {
+ // We cannot apply changes from subshard to current data tree.
+ // Maybe changes from current shard haven't been applied to
+ // data tree yet. Postpone processing of these changes till we
+ // receive changes from current shard.
+ LOG.debug("Validation for modification built from subshard {} changes {} failed, current data tree {}.",
+ pathFromRoot, changes, dataTree, e);
+ stashedDataTreeCandidates.addAll(newCandidates);
+ }
}
void addSubshard(final ChildShardContext context) {
@Nullable
@Override
+ @SuppressWarnings("checkstyle:hiddenField")
public DataTreeCandidateNode getModifiedChild(final PathArgument identifier) {
return null;
}
@Nonnull
@Override
public Optional<NormalizedNode<?, ?>> getDataAfter() {
- return Optional.absent();
+ return Optional.empty();
}
@Nonnull
@Override
public Optional<NormalizedNode<?, ?>> getDataBefore() {
- return Optional.absent();
+ return Optional.empty();
}
}
}