From: Robert Varga Date: Tue, 19 May 2020 14:37:47 +0000 (+0200) Subject: Add support for root DTCL listening on all shards in DS X-Git-Tag: v2.0.1~15 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=0175a376323f6c916b5a4340a27751ebef22fc83 Add support for root DTCL listening on all shards in DS Devide DTCLProxy into DTCLSingleShardProxy, DTCLMultiShardProxy and DTCLPrefixShardProxy to address the different registration mechanisms used in all three cases. JIRA: CONTROLLER-1932 Change-Id: I48732577f26fa5844b69a2feaddb02fe53909da7 Signed-off-by: Tibor Král Signed-off-by: Robert Varga --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractDOMBrokerWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractDOMBrokerWriteTransaction.java index 2e66571cea..39703f0f67 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractDOMBrokerWriteTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractDOMBrokerWriteTransaction.java @@ -84,8 +84,10 @@ public abstract class AbstractDOMBrokerWriteTransaction data) { checkArgument(data != null, "Attempted to store null data at %s", path); final PathArgument lastArg = path.getLastPathArgument(); - checkArgument(lastArg == data.getIdentifier() || lastArg != null && lastArg.equals(data.getIdentifier()), + if (lastArg != null) { + checkArgument(lastArg.equals(data.getIdentifier()), "Instance identifier references %s but data identifier is %s", lastArg, data); + } } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataStore.java index cddf234ffc..8537c85698 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataStore.java @@ -7,6 +7,7 @@ */ package org.opendaylight.controller.cluster.datastore; +import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; import akka.actor.ActorRef; @@ -17,6 +18,7 @@ import com.google.common.annotations.Beta; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Throwables; import com.google.common.util.concurrent.Uninterruptibles; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -159,6 +161,21 @@ public abstract class AbstractDataStore implements DistributedDataStoreInterface requireNonNull(treeId, "treeId should not be null"); requireNonNull(listener, "listener should not be null"); + /* + * We need to potentially deal with multi-shard composition for registration targeting the root of the data + * store. If that is the case, we delegate to a more complicated setup invol + */ + if (treeId.isEmpty()) { + // User is targeting root of the datastore. If there is more than one shard, we have to register with them + // all and perform data composition. + final Set shardNames = actorUtils.getConfiguration().getAllShardNames(); + if (shardNames.size() > 1) { + checkArgument(listener instanceof ClusteredDOMDataTreeChangeListener, + "Cannot listen on root without non-clustered listener %s", listener); + return new RootDataTreeChangeListenerProxy<>(actorUtils, listener, shardNames); + } + } + final String shardName = actorUtils.getShardStrategyFactory().getStrategy(treeId).findShard(treeId); LOG.debug("Registering tree listener: {} for tree: {} shard: {}", listener, treeId, shardName); @@ -169,7 +186,6 @@ public abstract class AbstractDataStore implements DistributedDataStoreInterface return listenerRegistrationProxy; } - @Override public DOMDataTreeCommitCohortRegistration registerCommitCohort( final DOMDataTreeIdentifier subtree, final C cohort) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerActor.java index 8df53b276a..5533fc8fad 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerActor.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerActor.java @@ -24,25 +24,26 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; * Proxy actor which acts as a facade to the user-provided listener. Responsible for decapsulating * DataTreeChanged messages and dispatching their context to the user. */ -final class DataTreeChangeListenerActor extends AbstractUntypedActor { +class DataTreeChangeListenerActor extends AbstractUntypedActor { private final DOMDataTreeChangeListener listener; private final YangInstanceIdentifier registeredPath; + private boolean notificationsEnabled = false; private long notificationCount; private String logContext = ""; - private DataTreeChangeListenerActor(final DOMDataTreeChangeListener listener, + DataTreeChangeListenerActor(final DOMDataTreeChangeListener listener, final YangInstanceIdentifier registeredPath) { this.listener = requireNonNull(listener); this.registeredPath = requireNonNull(registeredPath); } @Override - protected void handleReceive(final Object message) { + protected final void handleReceive(final Object message) { if (message instanceof DataTreeChanged) { - dataChanged((DataTreeChanged)message); + dataTreeChanged((DataTreeChanged) message); } else if (message instanceof OnInitialData) { - onInitialData(); + onInitialData((OnInitialData) message); } else if (message instanceof EnableNotification) { enableNotification((EnableNotification) message); } else if (message instanceof GetInfo) { @@ -54,7 +55,7 @@ final class DataTreeChangeListenerActor extends AbstractUntypedActor { } @SuppressWarnings("checkstyle:IllegalCatch") - private void onInitialData() { + void onInitialData(final OnInitialData message) { LOG.debug("{}: Notifying onInitialData to listener {}", logContext, listener); try { @@ -65,7 +66,7 @@ final class DataTreeChangeListenerActor extends AbstractUntypedActor { } @SuppressWarnings("checkstyle:IllegalCatch") - private void dataChanged(final DataTreeChanged message) { + void dataTreeChanged(final DataTreeChanged message) { // Do nothing if notifications are not enabled if (!notificationsEnabled) { LOG.debug("{}: Notifications not enabled for listener {} - dropping change notification", @@ -99,7 +100,7 @@ final class DataTreeChangeListenerActor extends AbstractUntypedActor { listener); } - public static Props props(final DOMDataTreeChangeListener listener, final YangInstanceIdentifier registeredPath) { + static Props props(final DOMDataTreeChangeListener listener, final YangInstanceIdentifier registeredPath) { return Props.create(DataTreeChangeListenerActor.class, listener, registeredPath); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RootDataTreeChangeListenerActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RootDataTreeChangeListenerActor.java new file mode 100644 index 0000000000..963dea98af --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RootDataTreeChangeListenerActor.java @@ -0,0 +1,138 @@ +/* + * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import static com.google.common.base.Verify.verify; +import static com.google.common.base.Verify.verifyNotNull; + +import akka.actor.ActorRef; +import akka.actor.Props; +import com.google.common.collect.Iterables; +import java.util.ArrayDeque; +import java.util.Collection; +import java.util.Deque; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.Map; +import org.opendaylight.controller.cluster.datastore.messages.DataTreeChanged; +import org.opendaylight.controller.cluster.datastore.messages.OnInitialData; +import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNodes; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates; +import org.opendaylight.yangtools.yang.data.impl.schema.Builders; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.DataContainerNodeBuilder; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; + +final class RootDataTreeChangeListenerActor extends DataTreeChangeListenerActor { + private final int shardCount; + + // Initial messages, retaining order in which we have received them + private Map initialMessages = new LinkedHashMap<>(); + private Deque otherMessages = new ArrayDeque<>(); + + private RootDataTreeChangeListenerActor(final DOMDataTreeChangeListener listener, final int shardCount) { + super(listener, YangInstanceIdentifier.empty()); + this.shardCount = shardCount; + } + + @Override + void onInitialData(final OnInitialData message) { + final ActorRef sender = getSender(); + verifyNotNull(initialMessages, "Received OnInitialData from %s after initial convergence", sender); + + final Object prev = initialMessages.put(sender, message); + verify(prev == null, "Received OnInitialData from %s after %s", sender, prev); + checkInitialConvergence(); + } + + @Override + void dataTreeChanged(final DataTreeChanged message) { + if (initialMessages == null) { + super.dataTreeChanged(message); + } else { + processMessage(message); + } + } + + private void processMessage(final DataTreeChanged message) { + // Put the message into initial messages if we do not have a message from that actor yet. If we do, just stash + // it to other messages for later processing. + if (initialMessages.putIfAbsent(getSender(), message) == null) { + checkInitialConvergence(); + } else { + otherMessages.addLast(message); + } + } + + private void checkInitialConvergence() { + if (initialMessages.size() != shardCount) { + // We do not have initial state from all shards yet + return; + } + + /* + * We need to make-pretend that the data coming into the listener is coming from a single logical entity, where + * ordering is partially guaranteed (on shard boundaries). The data layout in shards is such that each DataTree + * is rooted at YangInstanceIdentifier.empty(), but their contents vary: + * + * 1) non-default shards contain immediate children of root from one module + * 2) default shard contains everything else + * 3) there is no overlap between shards + * + * When we subscribe to each of the shards, each of them will report root as being written, which is an accurate + * view from each shard's perspective, but it does not reflect the aggregate reality. + * + * Construct an overall NormalizedNode view of the entire datastore by combining first-level children from all + * reported initial state reports, report that node as written and then report any additional deltas. + */ + final Deque initialChanges = new ArrayDeque<>(); + final DataContainerNodeBuilder rootBuilder = Builders.containerBuilder() + .withNodeIdentifier(NodeIdentifier.create(SchemaContext.NAME)); + for (Object message : initialMessages.values()) { + if (message instanceof DataTreeChanged) { + final Collection changes = ((DataTreeChanged) message).getChanges(); + final DataTreeCandidate initial; + if (changes.size() != 1) { + final Iterator it = changes.iterator(); + initial = it.next(); + // Append to changes to report as initial. This should not be happening (often?). + it.forEachRemaining(initialChanges::addLast); + } else { + initial = Iterables.get(changes, 0); + } + + final NormalizedNode root = initial.getRootNode().getDataAfter().orElseThrow(); + verify(root instanceof ContainerNode, "Unexpected root node %s", root); + ((ContainerNode) root).getValue().forEach(rootBuilder::withChild); + } + } + // We will not be intercepting any other messages, allow initial state to be reclaimed as soon as possible + initialMessages = null; + + // Prepend combined initial changed and report initial changes and clear the map + initialChanges.addFirst(DataTreeCandidates.newDataTreeCandidate(YangInstanceIdentifier.empty(), + DataTreeCandidateNodes.written(rootBuilder.build()))); + super.dataTreeChanged(new DataTreeChanged(initialChanges)); + + // Now go through all messages we have held back and report them. Note we are removing them from the queue + // to allow them to be reclaimed as soon as possible. + for (DataTreeChanged message = otherMessages.poll(); message != null; message = otherMessages.poll()) { + super.dataTreeChanged(message); + } + otherMessages = null; + } + + static Props props(final DOMDataTreeChangeListener instance, final int shardCount) { + return Props.create(RootDataTreeChangeListenerActor.class, shardCount); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RootDataTreeChangeListenerProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RootDataTreeChangeListenerProxy.java new file mode 100644 index 0000000000..6f4a5f1d06 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RootDataTreeChangeListenerProxy.java @@ -0,0 +1,227 @@ +/* + * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import static com.google.common.base.Verify.verify; +import static com.google.common.base.Verify.verifyNotNull; +import static java.util.Objects.requireNonNull; + +import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import akka.actor.PoisonPill; +import akka.dispatch.OnComplete; +import com.google.common.collect.Maps; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import org.checkerframework.checker.lock.qual.GuardedBy; +import org.checkerframework.checker.lock.qual.Holding; +import org.eclipse.jdt.annotation.NonNull; +import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration; +import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener; +import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNotificationListenerReply; +import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; +import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener; +import org.opendaylight.yangtools.concepts.AbstractListenerRegistration; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class RootDataTreeChangeListenerProxy + extends AbstractListenerRegistration { + private abstract static class State { + + } + + private static final class ResolveShards extends State { + final Map localShards = new HashMap<>(); + final int shardCount; + + ResolveShards(final int shardCount) { + this.shardCount = shardCount; + } + } + + private static final class Subscribed extends State { + final List subscriptions; + final ActorRef dtclActor; + + Subscribed(final ActorRef dtclActor, final int shardCount) { + this.dtclActor = requireNonNull(dtclActor); + subscriptions = new ArrayList<>(shardCount); + } + } + + private static final class Terminated extends State { + + } + + private static final Logger LOG = LoggerFactory.getLogger(RootDataTreeChangeListenerProxy.class); + + private final ActorUtils actorUtils; + + @GuardedBy("this") + private State state; + + RootDataTreeChangeListenerProxy(final ActorUtils actorUtils, final @NonNull L listener, + final Set shardNames) { + super(listener); + this.actorUtils = requireNonNull(actorUtils); + this.state = new ResolveShards(shardNames.size()); + + for (String shardName : shardNames) { + actorUtils.findLocalShardAsync(shardName).onComplete(new OnComplete() { + @Override + public void onComplete(final Throwable failure, final ActorRef success) { + onFindLocalShardComplete(shardName, failure, success); + } + }, actorUtils.getClientDispatcher()); + } + } + + @Override + protected synchronized void removeRegistration() { + if (state instanceof Terminated) { + // Trivial case: we have already terminated on a failure, so this is a no-op + } else if (state instanceof ResolveShards) { + // Simple case: just mark the fact we were closed, terminating when resolution finishes + state = new Terminated(); + } else if (state instanceof Subscribed) { + terminate((Subscribed) state); + } else { + throw new IllegalStateException("Unhandled close in state " + state); + } + } + + @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD", + justification = "https://github.com/spotbugs/spotbugs/issues/811") + private synchronized void onFindLocalShardComplete(final String shardName, final Throwable failure, + final ActorRef shard) { + if (state instanceof ResolveShards) { + localShardsResolved((ResolveShards) state, shardName, failure, shard); + } else { + LOG.debug("{}: lookup for shard {} turned into a noop on state {}", logContext(), shardName, state); + } + } + + @Holding("this") + private void localShardsResolved(final ResolveShards current, final String shardName, final Throwable failure, + final ActorRef shard) { + final Object result = failure != null ? failure : verifyNotNull(shard); + LOG.debug("{}: lookup for shard {} resulted in {}", logContext(), shardName, result); + current.localShards.put(shardName, result); + + if (current.localShards.size() == current.shardCount) { + // We have all the responses we need + if (current.localShards.values().stream().anyMatch(Throwable.class::isInstance)) { + reportFailure(current.localShards); + } else { + subscribeToShards(current.localShards); + } + } + } + + @Holding("this") + private void reportFailure(final Map localShards) { + for (Entry entry : Maps.filterValues(localShards, Throwable.class::isInstance).entrySet()) { + final Throwable cause = (Throwable) entry.getValue(); + LOG.error("{}: Failed to find local shard {}, cannot register {} at root", logContext(), entry.getKey(), + getInstance(), cause); + } + state = new Terminated(); + } + + @Holding("this") + private void subscribeToShards(final Map localShards) { + // Safety check before we start doing anything + for (Entry entry : localShards.entrySet()) { + final Object obj = entry.getValue(); + verify(obj instanceof ActorRef, "Unhandled response %s for shard %s", obj, entry.getKey()); + } + + // Instantiate the DTCL actor and update state + final ActorRef dtclActor = actorUtils.getActorSystem().actorOf( + RootDataTreeChangeListenerActor.props(getInstance(), localShards.size()) + .withDispatcher(actorUtils.getNotificationDispatcherPath())); + state = new Subscribed(dtclActor, localShards.size()); + + // Subscribe to all shards + final RegisterDataTreeChangeListener regMessage = new RegisterDataTreeChangeListener( + YangInstanceIdentifier.empty(), dtclActor, true); + for (Entry entry : localShards.entrySet()) { + // Do not retain references to localShards + final String shardName = entry.getKey(); + final ActorRef shard = (ActorRef) entry.getValue(); + + actorUtils.executeOperationAsync(shard, regMessage, + actorUtils.getDatastoreContext().getShardInitializationTimeout()).onComplete(new OnComplete<>() { + @Override + public void onComplete(final Throwable failure, final Object result) { + onShardSubscribed(shardName, failure, result); + } + }, actorUtils.getClientDispatcher()); + } + } + + @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD", + justification = "https://github.com/spotbugs/spotbugs/issues/811") + private synchronized void onShardSubscribed(final String shardName, final Throwable failure, final Object result) { + if (state instanceof Subscribed) { + final Subscribed current = (Subscribed) state; + if (failure != null) { + LOG.error("{}: Shard {} failed to subscribe, terminating listener {}", logContext(), + shardName,getInstance(), failure); + terminate(current); + } else { + onSuccessfulSubscription(current, shardName, (RegisterDataTreeNotificationListenerReply) result); + } + } else { + terminateSubscription(shardName, failure, result); + } + } + + @Holding("this") + private void onSuccessfulSubscription(final Subscribed current, final String shardName, + final RegisterDataTreeNotificationListenerReply reply) { + final ActorSelection regActor = actorUtils.actorSelection(reply.getListenerRegistrationPath()); + LOG.debug("{}: Shard {} subscribed at {}", logContext(), shardName, regActor); + current.subscriptions.add(regActor); + } + + @Holding("this") + private void terminate(final Subscribed current) { + // Terminate the listener + current.dtclActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); + // Terminate all subscriptions + for (ActorSelection regActor : current.subscriptions) { + regActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), ActorRef.noSender()); + } + state = new Terminated(); + } + + // This method should not modify internal state + private void terminateSubscription(final String shardName, final Throwable failure, final Object result) { + if (failure == null) { + final ActorSelection regActor = actorUtils.actorSelection( + ((RegisterDataTreeNotificationListenerReply) result).getListenerRegistrationPath()); + LOG.debug("{}: Shard {} registered late, terminating subscription at {}", logContext(), shardName, + regActor); + regActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), ActorRef.noSender()); + } else { + LOG.debug("{}: Shard {} reported late failure", logContext(), shardName, failure); + } + } + + private String logContext() { + return actorUtils.getDatastoreContext().getLogicalStoreType().toString(); + } +}