From 32b322afd58f120a78208c939a01422aa224d0cf Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Sat, 12 Nov 2016 04:58:46 +0100 Subject: [PATCH] BUG-5280: add SimpleDataStoreClientBehavior Module-based sharding has a more complex run-time strategy than the CDT sharding, which instantiates a client-per-shard. Create a dedicated behavior and resolver to take advantage of this simplification. Change-Id: I289e0c8d914f1ab9a9d8992b4f3a7bd4451af3f9 Signed-off-by: Robert Varga --- .../client/AbstractClientConnection.java | 8 +- .../AbstractReceivingClientConnection.java | 5 +- .../access/client/ClientActorBehavior.java | 20 +-- .../access/client/ConnectionEntry.java | 11 ++ .../client/TransmittedConnectionEntry.java | 6 + .../dds/AbstractDataStoreClientActor.java | 53 ++++++++ .../dds/AbstractDataStoreClientBehavior.java | 6 +- .../dds/AbstractShardBackendResolver.java | 119 ++++++++++++++++++ ...aStoreClient.java => DataStoreClient.java} | 2 +- .../dds/DistributedDataStoreClientActor.java | 31 +---- .../actors/dds/GetClientRequest.java | 2 +- .../actors/dds/LocalProxyTransaction.java | 6 +- .../dds/ModuleShardBackendResolver.java | 100 ++------------- .../dds/SimpleDataStoreClientActor.java | 46 +++++++ .../dds/SimpleDataStoreClientBehavior.java | 37 ++++++ .../dds/SimpleShardBackendResolver.java | 83 ++++++++++++ .../datastore/DistributedDataStore.java | 4 +- 17 files changed, 403 insertions(+), 136 deletions(-) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientActor.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractShardBackendResolver.java rename opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/{DistributedDataStoreClient.java => DataStoreClient.java} (93%) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SimpleDataStoreClientActor.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SimpleDataStoreClientBehavior.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SimpleShardBackendResolver.java diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java index 170b1507a9..0e9382dbba 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java @@ -157,11 +157,11 @@ public abstract class AbstractClientConnection { } /** - * Check queue timeouts and return true if a timeout has occurred. + * Check this queue for timeout and initiate reconnection if that happened. If the queue has not made progress + * in {@link #NO_PROGRESS_TIMEOUT_NANOS} nanoseconds, it will be aborted. * - * @return True if a timeout occurred - * @throws NoProgressException if the queue failed to make progress for an extended - * time. + * @param current Current behavior + * @return Next behavior to use */ @VisibleForTesting final ClientActorBehavior runTimer(final ClientActorBehavior current) { diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractReceivingClientConnection.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractReceivingClientConnection.java index 180ac942a3..85ca5fee65 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractReceivingClientConnection.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractReceivingClientConnection.java @@ -108,7 +108,10 @@ abstract class AbstractReceivingClientConnection extends } lastProgress = readTime(); - maybeEntry.get().complete(envelope.getMessage()); + + final TransmittedConnectionEntry entry = maybeEntry.get(); + LOG.debug("Completing {} with {}", entry, envelope); + entry.complete(envelope.getMessage()); // We have freed up a slot, try to transmit something final int toSend = remoteMaxMessages() - inflight.size(); diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java index e4b73b14f0..97d312ce39 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java @@ -16,6 +16,7 @@ import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; import org.opendaylight.controller.cluster.access.concepts.FailureEnvelope; +import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.RequestFailure; import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope; @@ -100,14 +101,19 @@ public abstract class ClientActorBehavior extends return onCommand(command); } - private void onResponse(final ResponseEnvelope response) { - final WritableIdentifier id = response.getMessage().getTarget(); - - // FIXME: this will need to be updated for other Request/Response types to extract cookie - Preconditions.checkArgument(id instanceof TransactionIdentifier); - final TransactionIdentifier txId = (TransactionIdentifier) id; + private static long extractCookie(final WritableIdentifier id) { + if (id instanceof TransactionIdentifier) { + return ((TransactionIdentifier) id).getHistoryId().getCookie(); + } else if (id instanceof LocalHistoryIdentifier) { + return ((LocalHistoryIdentifier) id).getCookie(); + } else { + throw new IllegalArgumentException("Unhandled identifier " + id); + } + } - final AbstractClientConnection connection = connections.get(txId.getHistoryId().getCookie()); + private void onResponse(final ResponseEnvelope response) { + final long cookie = extractCookie(response.getMessage().getTarget()); + final AbstractClientConnection connection = connections.get(cookie); if (connection != null) { connection.receiveResponse(response); } else { diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectionEntry.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectionEntry.java index 64586f05ee..547537764a 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectionEntry.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectionEntry.java @@ -8,6 +8,8 @@ package org.opendaylight.controller.cluster.access.client; import com.google.common.annotations.Beta; +import com.google.common.base.MoreObjects; +import com.google.common.base.MoreObjects.ToStringHelper; import com.google.common.base.Preconditions; import java.util.function.Consumer; import org.opendaylight.controller.cluster.access.concepts.Request; @@ -51,4 +53,13 @@ public class ConnectionEntry implements Immutable { final long getEnqueuedTicks() { return enqueuedTicks; } + + @Override + public final String toString() { + return addToStringAttributes(MoreObjects.toStringHelper(this)).toString(); + } + + ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) { + return toStringHelper.add("request", request); + } } diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmittedConnectionEntry.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmittedConnectionEntry.java index 34cbd49536..c14df6ecf3 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmittedConnectionEntry.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmittedConnectionEntry.java @@ -7,6 +7,8 @@ */ package org.opendaylight.controller.cluster.access.client; +import com.google.common.base.MoreObjects.ToStringHelper; + /** * A {@link ConnectionEntry} which has been transmitted. It holds additional information about the last transmission. * @@ -37,4 +39,8 @@ final class TransmittedConnectionEntry extends ConnectionEntry { return txTicks; } + @Override + ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) { + return super.addToStringAttributes(toStringHelper).add("sessionId", sessionId).add("txSequence", txSequence); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientActor.java new file mode 100644 index 0000000000..5e7fade8eb --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientActor.java @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.databroker.actors.dds; + +import akka.actor.ActorRef; +import akka.util.Timeout; +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.base.Verify; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nonnull; +import org.opendaylight.controller.cluster.access.client.AbstractClientActor; +import org.opendaylight.controller.cluster.access.client.ClientActorContext; +import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier; +import org.opendaylight.controller.cluster.common.actor.ExplicitAsk; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import scala.Function1; +import scala.concurrent.Await; +import scala.concurrent.duration.Duration; + +public abstract class AbstractDataStoreClientActor extends AbstractClientActor { + private static final Function1 GET_CLIENT_FACTORY = ExplicitAsk.toScala(t -> new GetClientRequest(t)); + + private final ActorContext actorContext; + + AbstractDataStoreClientActor(final FrontendIdentifier frontendId, final ActorContext actorContext) { + super(frontendId); + this.actorContext = Preconditions.checkNotNull(actorContext); + } + + @Override + protected final AbstractDataStoreClientBehavior initialBehavior(final ClientActorContext context) { + return Verify.verifyNotNull(initialBehavior(context, actorContext)); + } + + abstract AbstractDataStoreClientBehavior initialBehavior(ClientActorContext context, ActorContext actorContext); + + @SuppressWarnings("checkstyle:IllegalCatch") + public static DataStoreClient getDistributedDataStoreClient(@Nonnull final ActorRef actor, + final long timeout, final TimeUnit unit) { + try { + return (DataStoreClient) Await.result(ExplicitAsk.ask(actor, GET_CLIENT_FACTORY, + Timeout.apply(timeout, unit)), Duration.Inf()); + } catch (Exception e) { + throw Throwables.propagate(e); + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientBehavior.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientBehavior.java index a84715c843..5a34b3b77e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientBehavior.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientBehavior.java @@ -33,10 +33,10 @@ import org.slf4j.LoggerFactory; * *

* This class is not visible outside of this package because it breaks the actor containment. Services provided to - * Java world outside of actor containment are captured in {@link DistributedDataStoreClient}. + * Java world outside of actor containment are captured in {@link DataStoreClient}. * *

- * IMPORTANT: this class breaks actor containment via methods implementing {@link DistributedDataStoreClient} contract. + * IMPORTANT: this class breaks actor containment via methods implementing {@link DataStoreClient} contract. * When touching internal state, be mindful of the execution context from which execution context, Actor * or POJO, is the state being accessed or modified. * @@ -59,7 +59,7 @@ import org.slf4j.LoggerFactory; * @author Robert Varga */ abstract class AbstractDataStoreClientBehavior extends ClientActorBehavior - implements DistributedDataStoreClient { + implements DataStoreClient { private static final Logger LOG = LoggerFactory.getLogger(AbstractDataStoreClientBehavior.class); private final Map histories = new ConcurrentHashMap<>(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractShardBackendResolver.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractShardBackendResolver.java new file mode 100644 index 0000000000..a81cf337ac --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractShardBackendResolver.java @@ -0,0 +1,119 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.databroker.actors.dds; + +import akka.actor.ActorRef; +import akka.util.Timeout; +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.primitives.UnsignedLong; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; +import org.opendaylight.controller.cluster.access.ABIVersion; +import org.opendaylight.controller.cluster.access.client.BackendInfoResolver; +import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest; +import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess; +import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; +import org.opendaylight.controller.cluster.access.concepts.RequestFailure; +import org.opendaylight.controller.cluster.common.actor.ExplicitAsk; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Function1; +import scala.compat.java8.FutureConverters; + +/** + * {@link BackendInfoResolver} implementation for static shard configuration based on ShardManager. Each string-named + * shard is assigned a single cookie and this mapping is stored in a bidirectional map. Information about corresponding + * shard leader is resolved via {@link ActorContext}. The product of resolution is {@link ShardBackendInfo}. + * + * @author Robert Varga + */ +@ThreadSafe +abstract class AbstractShardBackendResolver extends BackendInfoResolver { + static final class ShardState { + private final CompletionStage stage; + @GuardedBy("this") + private ShardBackendInfo result; + + ShardState(final CompletionStage stage) { + this.stage = Preconditions.checkNotNull(stage); + stage.whenComplete(this::onStageResolved); + } + + @Nonnull CompletionStage getStage() { + return stage; + } + + @Nullable synchronized ShardBackendInfo getResult() { + return result; + } + + private synchronized void onStageResolved(final ShardBackendInfo result, final Throwable failure) { + if (failure == null) { + this.result = Preconditions.checkNotNull(result); + } else { + LOG.warn("Failed to resolve shard", failure); + } + } + } + + private static final Logger LOG = LoggerFactory.getLogger(AbstractShardBackendResolver.class); + + /** + * Fall-over-dead timeout. If we do not make progress in this long, just fall over and propagate the failure. + * All users are expected to fail, possibly attempting to recover by restarting. It is fair to remain + * non-operational. + */ + // TODO: maybe make this configurable somehow? + private static final Timeout DEAD_TIMEOUT = Timeout.apply(15, TimeUnit.MINUTES); + + private final AtomicLong nextSessionId = new AtomicLong(); + private final Function1 connectFunction; + private final ActorContext actorContext; + + // FIXME: we really need just ActorContext.findPrimaryShardAsync() + AbstractShardBackendResolver(final ClientIdentifier clientId, final ActorContext actorContext) { + this.actorContext = Preconditions.checkNotNull(actorContext); + this.connectFunction = ExplicitAsk.toScala(t -> new ConnectClientRequest(clientId, t, ABIVersion.BORON, + ABIVersion.current())); + } + + protected final void flushCache(final String shardName) { + actorContext.getPrimaryShardInfoCache().remove(shardName); + } + + protected final ShardState resolveBackendInfo(final String shardName, final long cookie) { + LOG.debug("Resolving cookie {} to shard {}", cookie, shardName); + + return new ShardState(FutureConverters.toJava(actorContext.findPrimaryShardAsync(shardName)).thenCompose(i -> { + LOG.debug("Looking up primary info for {} from {}", shardName, i); + return FutureConverters.toJava(ExplicitAsk.ask(i.getPrimaryShardActor(), connectFunction, DEAD_TIMEOUT)); + }).thenApply(response -> { + if (response instanceof RequestFailure) { + final RequestFailure failure = (RequestFailure) response; + LOG.debug("Connect request failed {}", failure, failure.getCause()); + throw Throwables.propagate(failure.getCause()); + } + + LOG.debug("Resolved backend information to {}", response); + + Preconditions.checkArgument(response instanceof ConnectClientSuccess, "Unhandled response {}", response); + final ConnectClientSuccess success = (ConnectClientSuccess) response; + + return new ShardBackendInfo(success.getBackend(), + nextSessionId.getAndIncrement(), success.getVersion(), shardName, UnsignedLong.fromLongBits(cookie), + success.getDataTree(), success.getMaxMessages()); + })); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClient.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DataStoreClient.java similarity index 93% rename from opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClient.java rename to opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DataStoreClient.java index 273fd89919..63dc87b4b5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClient.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DataStoreClient.java @@ -22,7 +22,7 @@ import org.opendaylight.yangtools.concepts.Identifiable; * @author Robert Varga */ @Beta -public interface DistributedDataStoreClient extends Identifiable, AutoCloseable { +public interface DataStoreClient extends Identifiable, AutoCloseable { @Override @Nonnull ClientIdentifier getIdentifier(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientActor.java index 45875baa15..69e0d5db50 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientActor.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientActor.java @@ -7,41 +7,27 @@ */ package org.opendaylight.controller.cluster.databroker.actors.dds; -import akka.actor.ActorRef; import akka.actor.Props; -import akka.util.Timeout; -import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; -import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; import org.opendaylight.controller.cluster.access.client.AbstractClientActor; import org.opendaylight.controller.cluster.access.client.ClientActorContext; import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier; import org.opendaylight.controller.cluster.access.concepts.FrontendType; import org.opendaylight.controller.cluster.access.concepts.MemberName; -import org.opendaylight.controller.cluster.common.actor.ExplicitAsk; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; -import scala.Function1; -import scala.concurrent.Await; -import scala.concurrent.duration.Duration; /** * A {@link AbstractClientActor} which acts as the point of contact for DistributedDataStore. * * @author Robert Varga */ -public final class DistributedDataStoreClientActor extends AbstractClientActor { - private static final Function1 GET_CLIENT_FACTORY = ExplicitAsk.toScala(t -> new GetClientRequest(t)); - - private final ActorContext actorContext; - +public final class DistributedDataStoreClientActor extends AbstractDataStoreClientActor { private DistributedDataStoreClientActor(final FrontendIdentifier frontendId, final ActorContext actorContext) { - super(frontendId); - this.actorContext = Preconditions.checkNotNull(actorContext); + super(frontendId, actorContext); } @Override - protected DistributedDataStoreClientBehavior initialBehavior(final ClientActorContext context) { + AbstractDataStoreClientBehavior initialBehavior(final ClientActorContext context, final ActorContext actorContext) { return new DistributedDataStoreClientBehavior(context, actorContext); } @@ -52,15 +38,4 @@ public final class DistributedDataStoreClientActor extends AbstractClientActor { return Props.create(DistributedDataStoreClientActor.class, () -> new DistributedDataStoreClientActor(frontendId, ctx)); } - - @SuppressWarnings("checkstyle:IllegalCatch") - public static DistributedDataStoreClient getDistributedDataStoreClient(@Nonnull final ActorRef actor, - final long timeout, final TimeUnit unit) { - try { - return (DistributedDataStoreClient) Await.result(ExplicitAsk.ask(actor, GET_CLIENT_FACTORY, - Timeout.apply(timeout, unit)), Duration.Inf()); - } catch (Exception e) { - throw Throwables.propagate(e); - } - } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/GetClientRequest.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/GetClientRequest.java index 2b6785496e..db91424747 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/GetClientRequest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/GetClientRequest.java @@ -11,7 +11,7 @@ import akka.actor.ActorRef; import com.google.common.base.Preconditions; /** - * Request the ClientIdentifier from a particular actor. Response is an instance of {@link DistributedDataStoreClient}. + * Request the ClientIdentifier from a particular actor. Response is an instance of {@link DataStoreClient}. * * @author Robert Varga */ diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java index 7b652f474b..3869e66d3e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java @@ -33,6 +33,7 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification; +import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeSnapshot; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModificationCursor; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; @@ -61,6 +62,7 @@ final class LocalProxyTransaction extends AbstractProxyTransaction { private final TransactionIdentifier identifier; private CursorAwareDataTreeModification modification; + private CursorAwareDataTreeSnapshot sealedModification; LocalProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier, final CursorAwareDataTreeModification modification) { @@ -125,10 +127,12 @@ final class LocalProxyTransaction extends AbstractProxyTransaction { @Override void doSeal() { modification.ready(); + sealedModification = modification; } DataTreeSnapshot getSnapshot() { - return modification; + Preconditions.checkState(sealedModification != null, "Proxy %s is not sealed yet", identifier); + return sealedModification; } private void applyModifyTransactionRequest(final ModifyTransactionRequest request, diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java index 9e6485b296..dde2292241 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java @@ -7,39 +7,22 @@ */ package org.opendaylight.controller.cluster.databroker.actors.dds; -import akka.actor.ActorRef; -import akka.util.Timeout; import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; import com.google.common.collect.BiMap; import com.google.common.collect.ImmutableBiMap; import com.google.common.collect.ImmutableBiMap.Builder; -import com.google.common.primitives.UnsignedLong; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; -import org.opendaylight.controller.cluster.access.ABIVersion; import org.opendaylight.controller.cluster.access.client.BackendInfoResolver; -import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest; -import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess; import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; -import org.opendaylight.controller.cluster.access.concepts.RequestFailure; -import org.opendaylight.controller.cluster.common.actor.ExplicitAsk; import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.Function1; -import scala.compat.java8.FutureConverters; /** * {@link BackendInfoResolver} implementation for static shard configuration based on ShardManager. Each string-named @@ -48,51 +31,11 @@ import scala.compat.java8.FutureConverters; * * @author Robert Varga */ -@SuppressFBWarnings(value = "NP_NONNULL_PARAM_VIOLATION", - justification = "Pertains to the NULL_FUTURE field below. Null is allowed and is intended") @ThreadSafe -final class ModuleShardBackendResolver extends BackendInfoResolver { - private static final class Entry { - private final CompletionStage stage; - @GuardedBy("this") - private ShardBackendInfo result; - - Entry(final CompletionStage stage) { - this.stage = Preconditions.checkNotNull(stage); - stage.whenComplete(this::onStageResolved); - } - - @Nonnull CompletionStage getStage() { - return stage; - } - - synchronized @Nullable ShardBackendInfo getResult() { - return result; - } - - private synchronized void onStageResolved(final ShardBackendInfo result, final Throwable failure) { - if (failure == null) { - this.result = Preconditions.checkNotNull(result); - } else { - LOG.warn("Failed to resolve shard", failure); - } - } - } - - private static final CompletableFuture NULL_FUTURE = CompletableFuture.completedFuture(null); +final class ModuleShardBackendResolver extends AbstractShardBackendResolver { private static final Logger LOG = LoggerFactory.getLogger(ModuleShardBackendResolver.class); - /** - * Fall-over-dead timeout. If we do not make progress in this long, just fall over and propagate the failure. - * All users are expected to fail, possibly attempting to recover by restarting. It is fair to remain - * non-operational. - */ - // TODO: maybe make this configurable somehow? - private static final Timeout DEAD_TIMEOUT = Timeout.apply(15, TimeUnit.MINUTES); - - private final ConcurrentMap backends = new ConcurrentHashMap<>(); - private final AtomicLong nextSessionId = new AtomicLong(); - private final Function1 connectFunction; + private final ConcurrentMap backends = new ConcurrentHashMap<>(); private final ActorContext actorContext; @GuardedBy("this") @@ -102,9 +45,8 @@ final class ModuleShardBackendResolver extends BackendInfoResolver new ConnectClientRequest(clientId, t, ABIVersion.BORON, - ABIVersion.current())); } Long resolveShardForPath(final YangInstanceIdentifier path) { @@ -127,54 +69,36 @@ final class ModuleShardBackendResolver extends BackendInfoResolver resolveBackendInfo(final Long cookie) { + private ShardState resolveBackendInfo(final Long cookie) { final String shardName = shards.inverse().get(cookie); if (shardName == null) { LOG.warn("Failing request for non-existent cookie {}", cookie); - return NULL_FUTURE; + return null; } LOG.debug("Resolving cookie {} to shard {}", cookie, shardName); - return FutureConverters.toJava(actorContext.findPrimaryShardAsync(shardName)).thenCompose(info -> { - LOG.debug("Looking up primary info for {} from {}", shardName, info); - return FutureConverters.toJava(ExplicitAsk.ask(info.getPrimaryShardActor(), connectFunction, DEAD_TIMEOUT)); - }).thenApply(response -> { - if (response instanceof RequestFailure) { - final RequestFailure failure = (RequestFailure) response; - LOG.debug("Connect request failed {}", failure, failure.getCause()); - throw Throwables.propagate(failure.getCause()); - } - - LOG.debug("Resolved backend information to {}", response); - - Preconditions.checkArgument(response instanceof ConnectClientSuccess, "Unhandled response {}", response); - final ConnectClientSuccess success = (ConnectClientSuccess) response; - - return new ShardBackendInfo(success.getBackend(), - nextSessionId.getAndIncrement(), success.getVersion(), shardName, UnsignedLong.fromLongBits(cookie), - success.getDataTree(), success.getMaxMessages()); - }); + return resolveBackendInfo(shardName, cookie); } @Override - public CompletionStage getBackendInfo(final Long cookie) { - return backends.computeIfAbsent(cookie, key -> new Entry(resolveBackendInfo(key))).getStage(); + public CompletionStage getBackendInfo(final Long cookie) { + return backends.computeIfAbsent(cookie, this::resolveBackendInfo).getStage(); } @Override - public CompletionStage refreshBackendInfo(final Long cookie, + public CompletionStage refreshBackendInfo(final Long cookie, final ShardBackendInfo staleInfo) { - final Entry existing = backends.get(cookie); + final ShardState existing = backends.get(cookie); if (existing != null) { if (!staleInfo.equals(existing.getResult())) { return existing.getStage(); } LOG.debug("Invalidating backend information {}", staleInfo); - actorContext.getPrimaryShardInfoCache().remove(staleInfo.getShardName()); + flushCache(staleInfo.getShardName()); - LOG.trace("Invalidated cache %s -> %s", Long.toUnsignedString(cookie), staleInfo); + LOG.trace("Invalidated cache %s", staleInfo); backends.remove(cookie, existing); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SimpleDataStoreClientActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SimpleDataStoreClientActor.java new file mode 100644 index 0000000000..7068b274a1 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SimpleDataStoreClientActor.java @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.databroker.actors.dds; + +import akka.actor.Props; +import com.google.common.base.Preconditions; +import javax.annotation.Nonnull; +import org.opendaylight.controller.cluster.access.client.AbstractClientActor; +import org.opendaylight.controller.cluster.access.client.ClientActorContext; +import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier; +import org.opendaylight.controller.cluster.access.concepts.FrontendType; +import org.opendaylight.controller.cluster.access.concepts.MemberName; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; + +/** + * A {@link AbstractClientActor} which acts as the point of contact for DistributedDataStore. + * + * @author Robert Varga + */ +public final class SimpleDataStoreClientActor extends AbstractDataStoreClientActor { + private final String shardName; + + private SimpleDataStoreClientActor(final FrontendIdentifier frontendId, final ActorContext actorContext, + final String shardName) { + super(frontendId, actorContext); + this.shardName = Preconditions.checkNotNull(shardName); + } + + @Override + AbstractDataStoreClientBehavior initialBehavior(final ClientActorContext context, final ActorContext actorContext) { + return new SimpleDataStoreClientBehavior(context, actorContext, shardName); + } + + public static Props props(@Nonnull final MemberName memberName, @Nonnull final String storeName, + final ActorContext ctx, final String shardName) { + final String name = "datastore-" + storeName; + final FrontendIdentifier frontendId = FrontendIdentifier.create(memberName, FrontendType.forName(name)); + return Props.create(SimpleDataStoreClientActor.class, + () -> new SimpleDataStoreClientActor(frontendId, ctx, shardName)); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SimpleDataStoreClientBehavior.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SimpleDataStoreClientBehavior.java new file mode 100644 index 0000000000..d6818d3382 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SimpleDataStoreClientBehavior.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.databroker.actors.dds; + +import org.opendaylight.controller.cluster.access.client.ClientActorContext; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; + +/** + * {@link AbstractDataStoreClientBehavior} which connects to a single shard only. + * + * @author Robert Varga + */ +final class SimpleDataStoreClientBehavior extends AbstractDataStoreClientBehavior { + // Pre-boxed instance + private static final Long ZERO = Long.valueOf(0); + + private SimpleDataStoreClientBehavior(final ClientActorContext context, + final SimpleShardBackendResolver resolver) { + super(context, resolver); + } + + SimpleDataStoreClientBehavior(final ClientActorContext context, final ActorContext actorContext, + final String shardName) { + this(context, new SimpleShardBackendResolver(context.getIdentifier(), actorContext, shardName)); + } + + @Override + Long resolveShardForPath(final YangInstanceIdentifier path) { + return ZERO; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SimpleShardBackendResolver.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SimpleShardBackendResolver.java new file mode 100644 index 0000000000..056a1ea7e2 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SimpleShardBackendResolver.java @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.databroker.actors.dds; + +import com.google.common.base.Preconditions; +import java.util.concurrent.CompletionStage; +import javax.annotation.concurrent.ThreadSafe; +import org.opendaylight.controller.cluster.access.client.BackendInfoResolver; +import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link BackendInfoResolver} implementation for static shard configuration based on ShardManager. Unlike the full + * {@link ModuleShardBackendResolver}, this resolver is used in situations where the client corresponds exactly to one + * backend shard, e.g. there is only one fixed cookie assigned and the operation path is not consulted at all. + * + * @author Robert Varga + */ +@ThreadSafe +final class SimpleShardBackendResolver extends AbstractShardBackendResolver { + private static final Logger LOG = LoggerFactory.getLogger(SimpleShardBackendResolver.class); + + private final String shardName; + + private volatile ShardState state; + + // FIXME: we really need just ActorContext.findPrimaryShardAsync() + SimpleShardBackendResolver(final ClientIdentifier clientId, final ActorContext actorContext, + final String shardName) { + super(clientId, actorContext); + this.shardName = Preconditions.checkNotNull(shardName); + } + + private CompletionStage getBackendInfo(final long cookie) { + Preconditions.checkArgument(cookie == 0); + + ShardState local = state; + if (local == null) { + synchronized (this) { + local = state; + if (local == null) { + local = resolveBackendInfo(shardName, 0); + state = local; + } + } + } + + return local.getStage(); + } + + @Override + public CompletionStage getBackendInfo(final Long cookie) { + return getBackendInfo(cookie.longValue()); + } + + @Override + public CompletionStage refreshBackendInfo(final Long cookie, + final ShardBackendInfo staleInfo) { + + final ShardState existing = state; + if (existing != null) { + if (!staleInfo.equals(existing.getResult())) { + return existing.getStage(); + } + + synchronized (this) { + LOG.debug("Invalidating backend information {}", staleInfo); + flushCache(shardName); + LOG.trace("Invalidated cache %s", staleInfo); + state = null; + } + } + + return getBackendInfo(cookie); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java index 8bfad09ed8..4ae64ac582 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java @@ -19,7 +19,7 @@ import com.google.common.util.concurrent.Uninterruptibles; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; -import org.opendaylight.controller.cluster.databroker.actors.dds.DistributedDataStoreClient; +import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient; import org.opendaylight.controller.cluster.databroker.actors.dds.DistributedDataStoreClientActor; import org.opendaylight.controller.cluster.datastore.config.Configuration; import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier; @@ -73,7 +73,7 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche private final CountDownLatch waitTillReadyCountDownLatch = new CountDownLatch(1); private final ClientIdentifier identifier; - private final DistributedDataStoreClient client; + private final DataStoreClient client; private final TransactionContextFactory txContextFactory; -- 2.36.6