From 0175a376323f6c916b5a4340a27751ebef22fc83 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Tue, 19 May 2020 16:37:47 +0200 Subject: [PATCH] Add support for root DTCL listening on all shards in DS MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Devide DTCLProxy into DTCLSingleShardProxy, DTCLMultiShardProxy and DTCLPrefixShardProxy to address the different registration mechanisms used in all three cases. JIRA: CONTROLLER-1932 Change-Id: I48732577f26fa5844b69a2feaddb02fe53909da7 Signed-off-by: Tibor Král Signed-off-by: Robert Varga --- .../AbstractDOMBrokerWriteTransaction.java | 4 +- .../cluster/datastore/AbstractDataStore.java | 18 +- .../DataTreeChangeListenerActor.java | 17 +- .../RootDataTreeChangeListenerActor.java | 138 +++++++++++ .../RootDataTreeChangeListenerProxy.java | 227 ++++++++++++++++++ 5 files changed, 394 insertions(+), 10 deletions(-) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RootDataTreeChangeListenerActor.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RootDataTreeChangeListenerProxy.java diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractDOMBrokerWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractDOMBrokerWriteTransaction.java index 2e66571cea..39703f0f67 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractDOMBrokerWriteTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractDOMBrokerWriteTransaction.java @@ -84,8 +84,10 @@ public abstract class AbstractDOMBrokerWriteTransaction data) { checkArgument(data != null, "Attempted to store null data at %s", path); final PathArgument lastArg = path.getLastPathArgument(); - checkArgument(lastArg == data.getIdentifier() || lastArg != null && lastArg.equals(data.getIdentifier()), + if (lastArg != null) { + checkArgument(lastArg.equals(data.getIdentifier()), "Instance identifier references %s but data identifier is %s", lastArg, data); + } } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataStore.java index cddf234ffc..8537c85698 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataStore.java @@ -7,6 +7,7 @@ */ package org.opendaylight.controller.cluster.datastore; +import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; import akka.actor.ActorRef; @@ -17,6 +18,7 @@ import com.google.common.annotations.Beta; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Throwables; import com.google.common.util.concurrent.Uninterruptibles; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -159,6 +161,21 @@ public abstract class AbstractDataStore implements DistributedDataStoreInterface requireNonNull(treeId, "treeId should not be null"); requireNonNull(listener, "listener should not be null"); + /* + * We need to potentially deal with multi-shard composition for registration targeting the root of the data + * store. If that is the case, we delegate to a more complicated setup invol + */ + if (treeId.isEmpty()) { + // User is targeting root of the datastore. If there is more than one shard, we have to register with them + // all and perform data composition. + final Set shardNames = actorUtils.getConfiguration().getAllShardNames(); + if (shardNames.size() > 1) { + checkArgument(listener instanceof ClusteredDOMDataTreeChangeListener, + "Cannot listen on root without non-clustered listener %s", listener); + return new RootDataTreeChangeListenerProxy<>(actorUtils, listener, shardNames); + } + } + final String shardName = actorUtils.getShardStrategyFactory().getStrategy(treeId).findShard(treeId); LOG.debug("Registering tree listener: {} for tree: {} shard: {}", listener, treeId, shardName); @@ -169,7 +186,6 @@ public abstract class AbstractDataStore implements DistributedDataStoreInterface return listenerRegistrationProxy; } - @Override public DOMDataTreeCommitCohortRegistration registerCommitCohort( final DOMDataTreeIdentifier subtree, final C cohort) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerActor.java index 8df53b276a..5533fc8fad 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerActor.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerActor.java @@ -24,25 +24,26 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; * Proxy actor which acts as a facade to the user-provided listener. Responsible for decapsulating * DataTreeChanged messages and dispatching their context to the user. */ -final class DataTreeChangeListenerActor extends AbstractUntypedActor { +class DataTreeChangeListenerActor extends AbstractUntypedActor { private final DOMDataTreeChangeListener listener; private final YangInstanceIdentifier registeredPath; + private boolean notificationsEnabled = false; private long notificationCount; private String logContext = ""; - private DataTreeChangeListenerActor(final DOMDataTreeChangeListener listener, + DataTreeChangeListenerActor(final DOMDataTreeChangeListener listener, final YangInstanceIdentifier registeredPath) { this.listener = requireNonNull(listener); this.registeredPath = requireNonNull(registeredPath); } @Override - protected void handleReceive(final Object message) { + protected final void handleReceive(final Object message) { if (message instanceof DataTreeChanged) { - dataChanged((DataTreeChanged)message); + dataTreeChanged((DataTreeChanged) message); } else if (message instanceof OnInitialData) { - onInitialData(); + onInitialData((OnInitialData) message); } else if (message instanceof EnableNotification) { enableNotification((EnableNotification) message); } else if (message instanceof GetInfo) { @@ -54,7 +55,7 @@ final class DataTreeChangeListenerActor extends AbstractUntypedActor { } @SuppressWarnings("checkstyle:IllegalCatch") - private void onInitialData() { + void onInitialData(final OnInitialData message) { LOG.debug("{}: Notifying onInitialData to listener {}", logContext, listener); try { @@ -65,7 +66,7 @@ final class DataTreeChangeListenerActor extends AbstractUntypedActor { } @SuppressWarnings("checkstyle:IllegalCatch") - private void dataChanged(final DataTreeChanged message) { + void dataTreeChanged(final DataTreeChanged message) { // Do nothing if notifications are not enabled if (!notificationsEnabled) { LOG.debug("{}: Notifications not enabled for listener {} - dropping change notification", @@ -99,7 +100,7 @@ final class DataTreeChangeListenerActor extends AbstractUntypedActor { listener); } - public static Props props(final DOMDataTreeChangeListener listener, final YangInstanceIdentifier registeredPath) { + static Props props(final DOMDataTreeChangeListener listener, final YangInstanceIdentifier registeredPath) { return Props.create(DataTreeChangeListenerActor.class, listener, registeredPath); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RootDataTreeChangeListenerActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RootDataTreeChangeListenerActor.java new file mode 100644 index 0000000000..963dea98af --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RootDataTreeChangeListenerActor.java @@ -0,0 +1,138 @@ +/* + * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import static com.google.common.base.Verify.verify; +import static com.google.common.base.Verify.verifyNotNull; + +import akka.actor.ActorRef; +import akka.actor.Props; +import com.google.common.collect.Iterables; +import java.util.ArrayDeque; +import java.util.Collection; +import java.util.Deque; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.Map; +import org.opendaylight.controller.cluster.datastore.messages.DataTreeChanged; +import org.opendaylight.controller.cluster.datastore.messages.OnInitialData; +import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNodes; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates; +import org.opendaylight.yangtools.yang.data.impl.schema.Builders; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.DataContainerNodeBuilder; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; + +final class RootDataTreeChangeListenerActor extends DataTreeChangeListenerActor { + private final int shardCount; + + // Initial messages, retaining order in which we have received them + private Map initialMessages = new LinkedHashMap<>(); + private Deque otherMessages = new ArrayDeque<>(); + + private RootDataTreeChangeListenerActor(final DOMDataTreeChangeListener listener, final int shardCount) { + super(listener, YangInstanceIdentifier.empty()); + this.shardCount = shardCount; + } + + @Override + void onInitialData(final OnInitialData message) { + final ActorRef sender = getSender(); + verifyNotNull(initialMessages, "Received OnInitialData from %s after initial convergence", sender); + + final Object prev = initialMessages.put(sender, message); + verify(prev == null, "Received OnInitialData from %s after %s", sender, prev); + checkInitialConvergence(); + } + + @Override + void dataTreeChanged(final DataTreeChanged message) { + if (initialMessages == null) { + super.dataTreeChanged(message); + } else { + processMessage(message); + } + } + + private void processMessage(final DataTreeChanged message) { + // Put the message into initial messages if we do not have a message from that actor yet. If we do, just stash + // it to other messages for later processing. + if (initialMessages.putIfAbsent(getSender(), message) == null) { + checkInitialConvergence(); + } else { + otherMessages.addLast(message); + } + } + + private void checkInitialConvergence() { + if (initialMessages.size() != shardCount) { + // We do not have initial state from all shards yet + return; + } + + /* + * We need to make-pretend that the data coming into the listener is coming from a single logical entity, where + * ordering is partially guaranteed (on shard boundaries). The data layout in shards is such that each DataTree + * is rooted at YangInstanceIdentifier.empty(), but their contents vary: + * + * 1) non-default shards contain immediate children of root from one module + * 2) default shard contains everything else + * 3) there is no overlap between shards + * + * When we subscribe to each of the shards, each of them will report root as being written, which is an accurate + * view from each shard's perspective, but it does not reflect the aggregate reality. + * + * Construct an overall NormalizedNode view of the entire datastore by combining first-level children from all + * reported initial state reports, report that node as written and then report any additional deltas. + */ + final Deque initialChanges = new ArrayDeque<>(); + final DataContainerNodeBuilder rootBuilder = Builders.containerBuilder() + .withNodeIdentifier(NodeIdentifier.create(SchemaContext.NAME)); + for (Object message : initialMessages.values()) { + if (message instanceof DataTreeChanged) { + final Collection changes = ((DataTreeChanged) message).getChanges(); + final DataTreeCandidate initial; + if (changes.size() != 1) { + final Iterator it = changes.iterator(); + initial = it.next(); + // Append to changes to report as initial. This should not be happening (often?). + it.forEachRemaining(initialChanges::addLast); + } else { + initial = Iterables.get(changes, 0); + } + + final NormalizedNode root = initial.getRootNode().getDataAfter().orElseThrow(); + verify(root instanceof ContainerNode, "Unexpected root node %s", root); + ((ContainerNode) root).getValue().forEach(rootBuilder::withChild); + } + } + // We will not be intercepting any other messages, allow initial state to be reclaimed as soon as possible + initialMessages = null; + + // Prepend combined initial changed and report initial changes and clear the map + initialChanges.addFirst(DataTreeCandidates.newDataTreeCandidate(YangInstanceIdentifier.empty(), + DataTreeCandidateNodes.written(rootBuilder.build()))); + super.dataTreeChanged(new DataTreeChanged(initialChanges)); + + // Now go through all messages we have held back and report them. Note we are removing them from the queue + // to allow them to be reclaimed as soon as possible. + for (DataTreeChanged message = otherMessages.poll(); message != null; message = otherMessages.poll()) { + super.dataTreeChanged(message); + } + otherMessages = null; + } + + static Props props(final DOMDataTreeChangeListener instance, final int shardCount) { + return Props.create(RootDataTreeChangeListenerActor.class, shardCount); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RootDataTreeChangeListenerProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RootDataTreeChangeListenerProxy.java new file mode 100644 index 0000000000..6f4a5f1d06 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RootDataTreeChangeListenerProxy.java @@ -0,0 +1,227 @@ +/* + * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import static com.google.common.base.Verify.verify; +import static com.google.common.base.Verify.verifyNotNull; +import static java.util.Objects.requireNonNull; + +import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import akka.actor.PoisonPill; +import akka.dispatch.OnComplete; +import com.google.common.collect.Maps; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import org.checkerframework.checker.lock.qual.GuardedBy; +import org.checkerframework.checker.lock.qual.Holding; +import org.eclipse.jdt.annotation.NonNull; +import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration; +import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener; +import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNotificationListenerReply; +import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; +import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener; +import org.opendaylight.yangtools.concepts.AbstractListenerRegistration; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class RootDataTreeChangeListenerProxy + extends AbstractListenerRegistration { + private abstract static class State { + + } + + private static final class ResolveShards extends State { + final Map localShards = new HashMap<>(); + final int shardCount; + + ResolveShards(final int shardCount) { + this.shardCount = shardCount; + } + } + + private static final class Subscribed extends State { + final List subscriptions; + final ActorRef dtclActor; + + Subscribed(final ActorRef dtclActor, final int shardCount) { + this.dtclActor = requireNonNull(dtclActor); + subscriptions = new ArrayList<>(shardCount); + } + } + + private static final class Terminated extends State { + + } + + private static final Logger LOG = LoggerFactory.getLogger(RootDataTreeChangeListenerProxy.class); + + private final ActorUtils actorUtils; + + @GuardedBy("this") + private State state; + + RootDataTreeChangeListenerProxy(final ActorUtils actorUtils, final @NonNull L listener, + final Set shardNames) { + super(listener); + this.actorUtils = requireNonNull(actorUtils); + this.state = new ResolveShards(shardNames.size()); + + for (String shardName : shardNames) { + actorUtils.findLocalShardAsync(shardName).onComplete(new OnComplete() { + @Override + public void onComplete(final Throwable failure, final ActorRef success) { + onFindLocalShardComplete(shardName, failure, success); + } + }, actorUtils.getClientDispatcher()); + } + } + + @Override + protected synchronized void removeRegistration() { + if (state instanceof Terminated) { + // Trivial case: we have already terminated on a failure, so this is a no-op + } else if (state instanceof ResolveShards) { + // Simple case: just mark the fact we were closed, terminating when resolution finishes + state = new Terminated(); + } else if (state instanceof Subscribed) { + terminate((Subscribed) state); + } else { + throw new IllegalStateException("Unhandled close in state " + state); + } + } + + @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD", + justification = "https://github.com/spotbugs/spotbugs/issues/811") + private synchronized void onFindLocalShardComplete(final String shardName, final Throwable failure, + final ActorRef shard) { + if (state instanceof ResolveShards) { + localShardsResolved((ResolveShards) state, shardName, failure, shard); + } else { + LOG.debug("{}: lookup for shard {} turned into a noop on state {}", logContext(), shardName, state); + } + } + + @Holding("this") + private void localShardsResolved(final ResolveShards current, final String shardName, final Throwable failure, + final ActorRef shard) { + final Object result = failure != null ? failure : verifyNotNull(shard); + LOG.debug("{}: lookup for shard {} resulted in {}", logContext(), shardName, result); + current.localShards.put(shardName, result); + + if (current.localShards.size() == current.shardCount) { + // We have all the responses we need + if (current.localShards.values().stream().anyMatch(Throwable.class::isInstance)) { + reportFailure(current.localShards); + } else { + subscribeToShards(current.localShards); + } + } + } + + @Holding("this") + private void reportFailure(final Map localShards) { + for (Entry entry : Maps.filterValues(localShards, Throwable.class::isInstance).entrySet()) { + final Throwable cause = (Throwable) entry.getValue(); + LOG.error("{}: Failed to find local shard {}, cannot register {} at root", logContext(), entry.getKey(), + getInstance(), cause); + } + state = new Terminated(); + } + + @Holding("this") + private void subscribeToShards(final Map localShards) { + // Safety check before we start doing anything + for (Entry entry : localShards.entrySet()) { + final Object obj = entry.getValue(); + verify(obj instanceof ActorRef, "Unhandled response %s for shard %s", obj, entry.getKey()); + } + + // Instantiate the DTCL actor and update state + final ActorRef dtclActor = actorUtils.getActorSystem().actorOf( + RootDataTreeChangeListenerActor.props(getInstance(), localShards.size()) + .withDispatcher(actorUtils.getNotificationDispatcherPath())); + state = new Subscribed(dtclActor, localShards.size()); + + // Subscribe to all shards + final RegisterDataTreeChangeListener regMessage = new RegisterDataTreeChangeListener( + YangInstanceIdentifier.empty(), dtclActor, true); + for (Entry entry : localShards.entrySet()) { + // Do not retain references to localShards + final String shardName = entry.getKey(); + final ActorRef shard = (ActorRef) entry.getValue(); + + actorUtils.executeOperationAsync(shard, regMessage, + actorUtils.getDatastoreContext().getShardInitializationTimeout()).onComplete(new OnComplete<>() { + @Override + public void onComplete(final Throwable failure, final Object result) { + onShardSubscribed(shardName, failure, result); + } + }, actorUtils.getClientDispatcher()); + } + } + + @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD", + justification = "https://github.com/spotbugs/spotbugs/issues/811") + private synchronized void onShardSubscribed(final String shardName, final Throwable failure, final Object result) { + if (state instanceof Subscribed) { + final Subscribed current = (Subscribed) state; + if (failure != null) { + LOG.error("{}: Shard {} failed to subscribe, terminating listener {}", logContext(), + shardName,getInstance(), failure); + terminate(current); + } else { + onSuccessfulSubscription(current, shardName, (RegisterDataTreeNotificationListenerReply) result); + } + } else { + terminateSubscription(shardName, failure, result); + } + } + + @Holding("this") + private void onSuccessfulSubscription(final Subscribed current, final String shardName, + final RegisterDataTreeNotificationListenerReply reply) { + final ActorSelection regActor = actorUtils.actorSelection(reply.getListenerRegistrationPath()); + LOG.debug("{}: Shard {} subscribed at {}", logContext(), shardName, regActor); + current.subscriptions.add(regActor); + } + + @Holding("this") + private void terminate(final Subscribed current) { + // Terminate the listener + current.dtclActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); + // Terminate all subscriptions + for (ActorSelection regActor : current.subscriptions) { + regActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), ActorRef.noSender()); + } + state = new Terminated(); + } + + // This method should not modify internal state + private void terminateSubscription(final String shardName, final Throwable failure, final Object result) { + if (failure == null) { + final ActorSelection regActor = actorUtils.actorSelection( + ((RegisterDataTreeNotificationListenerReply) result).getListenerRegistrationPath()); + LOG.debug("{}: Shard {} registered late, terminating subscription at {}", logContext(), shardName, + regActor); + regActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), ActorRef.noSender()); + } else { + LOG.debug("{}: Shard {} reported late failure", logContext(), shardName, failure); + } + } + + private String logContext() { + return actorUtils.getDatastoreContext().getLogicalStoreType().toString(); + } +} -- 2.36.6