BUG-5280: add SimpleDataStoreClientBehavior 92/48292/32
authorRobert Varga <rovarga@cisco.com>
Sat, 12 Nov 2016 03:58:46 +0000 (04:58 +0100)
committerTom Pantelis <tpanteli@brocade.com>
Mon, 28 Nov 2016 15:45:45 +0000 (15:45 +0000)
Module-based sharding has a more complex run-time
strategy than the CDT sharding, which instantiates
a client-per-shard.

Create a dedicated behavior and resolver to take
advantage of this simplification.

Change-Id: I289e0c8d914f1ab9a9d8992b4f3a7bd4451af3f9
Signed-off-by: Robert Varga <rovarga@cisco.com>
17 files changed:
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractReceivingClientConnection.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectionEntry.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmittedConnectionEntry.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientActor.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientBehavior.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractShardBackendResolver.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DataStoreClient.java [moved from opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClient.java with 93% similarity]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientActor.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/GetClientRequest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SimpleDataStoreClientActor.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SimpleDataStoreClientBehavior.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SimpleShardBackendResolver.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java

index 170b150..0e9382d 100644 (file)
@@ -157,11 +157,11 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
     }
 
     /**
-     * Check queue timeouts and return true if a timeout has occurred.
+     * Check this queue for timeout and initiate reconnection if that happened. If the queue has not made progress
+     * in {@link #NO_PROGRESS_TIMEOUT_NANOS} nanoseconds, it will be aborted.
      *
-     * @return True if a timeout occurred
-     * @throws NoProgressException if the queue failed to make progress for an extended
-     *                             time.
+     * @param current Current behavior
+     * @return Next behavior to use
      */
     @VisibleForTesting
     final ClientActorBehavior<T> runTimer(final ClientActorBehavior<T> current) {
index 180ac94..85ca5fe 100644 (file)
@@ -108,7 +108,10 @@ abstract class AbstractReceivingClientConnection<T extends BackendInfo> extends
         }
 
         lastProgress = readTime();
-        maybeEntry.get().complete(envelope.getMessage());
+
+        final TransmittedConnectionEntry entry = maybeEntry.get();
+        LOG.debug("Completing {} with {}", entry, envelope);
+        entry.complete(envelope.getMessage());
 
         // We have freed up a slot, try to transmit something
         final int toSend = remoteMaxMessages() - inflight.size();
index e4b73b1..97d312c 100644 (file)
@@ -16,6 +16,7 @@ import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.FailureEnvelope;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
 import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
@@ -100,14 +101,19 @@ public abstract class ClientActorBehavior<T extends BackendInfo> extends
         return onCommand(command);
     }
 
-    private void onResponse(final ResponseEnvelope<?> response) {
-        final WritableIdentifier id = response.getMessage().getTarget();
-
-        // FIXME: this will need to be updated for other Request/Response types to extract cookie
-        Preconditions.checkArgument(id instanceof TransactionIdentifier);
-        final TransactionIdentifier txId = (TransactionIdentifier) id;
+    private static long extractCookie(final WritableIdentifier id) {
+        if (id instanceof TransactionIdentifier) {
+            return ((TransactionIdentifier) id).getHistoryId().getCookie();
+        } else if (id instanceof LocalHistoryIdentifier) {
+            return ((LocalHistoryIdentifier) id).getCookie();
+        } else {
+            throw new IllegalArgumentException("Unhandled identifier " + id);
+        }
+    }
 
-        final AbstractClientConnection<T> connection = connections.get(txId.getHistoryId().getCookie());
+    private void onResponse(final ResponseEnvelope<?> response) {
+        final long cookie = extractCookie(response.getMessage().getTarget());
+        final AbstractClientConnection<T> connection = connections.get(cookie);
         if (connection != null) {
             connection.receiveResponse(response);
         } else {
index 64586f0..5475377 100644 (file)
@@ -8,6 +8,8 @@
 package org.opendaylight.controller.cluster.access.client;
 
 import com.google.common.annotations.Beta;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.MoreObjects.ToStringHelper;
 import com.google.common.base.Preconditions;
 import java.util.function.Consumer;
 import org.opendaylight.controller.cluster.access.concepts.Request;
@@ -51,4 +53,13 @@ public class ConnectionEntry implements Immutable {
     final long getEnqueuedTicks() {
         return enqueuedTicks;
     }
+
+    @Override
+    public final String toString() {
+        return addToStringAttributes(MoreObjects.toStringHelper(this)).toString();
+    }
+
+    ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
+        return toStringHelper.add("request", request);
+    }
 }
index 34cbd49..c14df6e 100644 (file)
@@ -7,6 +7,8 @@
  */
 package org.opendaylight.controller.cluster.access.client;
 
+import com.google.common.base.MoreObjects.ToStringHelper;
+
 /**
  * A {@link ConnectionEntry} which has been transmitted. It holds additional information about the last transmission.
  *
@@ -37,4 +39,8 @@ final class TransmittedConnectionEntry extends ConnectionEntry {
         return txTicks;
     }
 
+    @Override
+    ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
+        return super.addToStringAttributes(toStringHelper).add("sessionId", sessionId).add("txSequence", txSequence);
+    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientActor.java
new file mode 100644 (file)
index 0000000..5e7fade
--- /dev/null
@@ -0,0 +1,53 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import akka.actor.ActorRef;
+import akka.util.Timeout;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.base.Verify;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.access.client.AbstractClientActor;
+import org.opendaylight.controller.cluster.access.client.ClientActorContext;
+import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
+import org.opendaylight.controller.cluster.common.actor.ExplicitAsk;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import scala.Function1;
+import scala.concurrent.Await;
+import scala.concurrent.duration.Duration;
+
+public abstract class AbstractDataStoreClientActor extends AbstractClientActor {
+    private static final Function1<ActorRef, ?> GET_CLIENT_FACTORY = ExplicitAsk.toScala(t -> new GetClientRequest(t));
+
+    private final ActorContext actorContext;
+
+    AbstractDataStoreClientActor(final FrontendIdentifier frontendId, final ActorContext actorContext) {
+        super(frontendId);
+        this.actorContext = Preconditions.checkNotNull(actorContext);
+    }
+
+    @Override
+    protected final AbstractDataStoreClientBehavior initialBehavior(final ClientActorContext context) {
+        return Verify.verifyNotNull(initialBehavior(context, actorContext));
+    }
+
+    abstract AbstractDataStoreClientBehavior initialBehavior(ClientActorContext context, ActorContext actorContext);
+
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    public static DataStoreClient getDistributedDataStoreClient(@Nonnull final ActorRef actor,
+            final long timeout, final TimeUnit unit) {
+        try {
+            return (DataStoreClient) Await.result(ExplicitAsk.ask(actor, GET_CLIENT_FACTORY,
+                Timeout.apply(timeout, unit)), Duration.Inf());
+        } catch (Exception e) {
+            throw Throwables.propagate(e);
+        }
+    }
+}
index a84715c..5a34b3b 100644 (file)
@@ -33,10 +33,10 @@ import org.slf4j.LoggerFactory;
  *
  * <p>
  * This class is not visible outside of this package because it breaks the actor containment. Services provided to
- * Java world outside of actor containment are captured in {@link DistributedDataStoreClient}.
+ * Java world outside of actor containment are captured in {@link DataStoreClient}.
  *
  * <p>
- * IMPORTANT: this class breaks actor containment via methods implementing {@link DistributedDataStoreClient} contract.
+ * IMPORTANT: this class breaks actor containment via methods implementing {@link DataStoreClient} contract.
  *            When touching internal state, be mindful of the execution context from which execution context, Actor
  *            or POJO, is the state being accessed or modified.
  *
@@ -59,7 +59,7 @@ import org.slf4j.LoggerFactory;
  * @author Robert Varga
  */
 abstract class AbstractDataStoreClientBehavior extends ClientActorBehavior<ShardBackendInfo>
-        implements DistributedDataStoreClient {
+        implements DataStoreClient {
     private static final Logger LOG = LoggerFactory.getLogger(AbstractDataStoreClientBehavior.class);
 
     private final Map<LocalHistoryIdentifier, ClientLocalHistory> histories = new ConcurrentHashMap<>();
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractShardBackendResolver.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractShardBackendResolver.java
new file mode 100644 (file)
index 0000000..a81cf33
--- /dev/null
@@ -0,0 +1,119 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import akka.actor.ActorRef;
+import akka.util.Timeout;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.primitives.UnsignedLong;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.client.BackendInfoResolver;
+import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest;
+import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
+import org.opendaylight.controller.cluster.common.actor.ExplicitAsk;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Function1;
+import scala.compat.java8.FutureConverters;
+
+/**
+ * {@link BackendInfoResolver} implementation for static shard configuration based on ShardManager. Each string-named
+ * shard is assigned a single cookie and this mapping is stored in a bidirectional map. Information about corresponding
+ * shard leader is resolved via {@link ActorContext}. The product of resolution is {@link ShardBackendInfo}.
+ *
+ * @author Robert Varga
+ */
+@ThreadSafe
+abstract class AbstractShardBackendResolver extends BackendInfoResolver<ShardBackendInfo> {
+    static final class ShardState {
+        private final CompletionStage<ShardBackendInfo> stage;
+        @GuardedBy("this")
+        private ShardBackendInfo result;
+
+        ShardState(final CompletionStage<ShardBackendInfo> stage) {
+            this.stage = Preconditions.checkNotNull(stage);
+            stage.whenComplete(this::onStageResolved);
+        }
+
+        @Nonnull CompletionStage<ShardBackendInfo> getStage() {
+            return stage;
+        }
+
+        @Nullable synchronized ShardBackendInfo getResult() {
+            return result;
+        }
+
+        private synchronized void onStageResolved(final ShardBackendInfo result, final Throwable failure) {
+            if (failure == null) {
+                this.result = Preconditions.checkNotNull(result);
+            } else {
+                LOG.warn("Failed to resolve shard", failure);
+            }
+        }
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractShardBackendResolver.class);
+
+    /**
+     * Fall-over-dead timeout. If we do not make progress in this long, just fall over and propagate the failure.
+     * All users are expected to fail, possibly attempting to recover by restarting. It is fair to remain
+     * non-operational.
+     */
+    // TODO: maybe make this configurable somehow?
+    private static final Timeout DEAD_TIMEOUT = Timeout.apply(15, TimeUnit.MINUTES);
+
+    private final AtomicLong nextSessionId = new AtomicLong();
+    private final Function1<ActorRef, ?> connectFunction;
+    private final ActorContext actorContext;
+
+    // FIXME: we really need just ActorContext.findPrimaryShardAsync()
+    AbstractShardBackendResolver(final ClientIdentifier clientId, final ActorContext actorContext) {
+        this.actorContext = Preconditions.checkNotNull(actorContext);
+        this.connectFunction = ExplicitAsk.toScala(t -> new ConnectClientRequest(clientId, t, ABIVersion.BORON,
+            ABIVersion.current()));
+    }
+
+    protected final void flushCache(final String shardName) {
+        actorContext.getPrimaryShardInfoCache().remove(shardName);
+    }
+
+    protected final ShardState resolveBackendInfo(final String shardName, final long cookie) {
+        LOG.debug("Resolving cookie {} to shard {}", cookie, shardName);
+
+        return new ShardState(FutureConverters.toJava(actorContext.findPrimaryShardAsync(shardName)).thenCompose(i -> {
+            LOG.debug("Looking up primary info for {} from {}", shardName, i);
+            return FutureConverters.toJava(ExplicitAsk.ask(i.getPrimaryShardActor(), connectFunction, DEAD_TIMEOUT));
+        }).thenApply(response -> {
+            if (response instanceof RequestFailure) {
+                final RequestFailure<?, ?> failure = (RequestFailure<?, ?>) response;
+                LOG.debug("Connect request failed {}", failure, failure.getCause());
+                throw Throwables.propagate(failure.getCause());
+            }
+
+            LOG.debug("Resolved backend information to {}", response);
+
+            Preconditions.checkArgument(response instanceof ConnectClientSuccess, "Unhandled response {}", response);
+            final ConnectClientSuccess success = (ConnectClientSuccess) response;
+
+            return new ShardBackendInfo(success.getBackend(),
+                nextSessionId.getAndIncrement(), success.getVersion(), shardName, UnsignedLong.fromLongBits(cookie),
+                success.getDataTree(), success.getMaxMessages());
+        }));
+    }
+}
index 45875ba..69e0d5d 100644 (file)
@@ -7,41 +7,27 @@
  */
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
-import akka.actor.ActorRef;
 import akka.actor.Props;
-import akka.util.Timeout;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import java.util.concurrent.TimeUnit;
 import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.access.client.AbstractClientActor;
 import org.opendaylight.controller.cluster.access.client.ClientActorContext;
 import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.FrontendType;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
-import org.opendaylight.controller.cluster.common.actor.ExplicitAsk;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import scala.Function1;
-import scala.concurrent.Await;
-import scala.concurrent.duration.Duration;
 
 /**
  * A {@link AbstractClientActor} which acts as the point of contact for DistributedDataStore.
  *
  * @author Robert Varga
  */
-public final class DistributedDataStoreClientActor extends AbstractClientActor {
-    private static final Function1<ActorRef, ?> GET_CLIENT_FACTORY = ExplicitAsk.toScala(t -> new GetClientRequest(t));
-
-    private final ActorContext actorContext;
-
+public final class DistributedDataStoreClientActor extends AbstractDataStoreClientActor {
     private DistributedDataStoreClientActor(final FrontendIdentifier frontendId, final ActorContext actorContext) {
-        super(frontendId);
-        this.actorContext = Preconditions.checkNotNull(actorContext);
+        super(frontendId, actorContext);
     }
 
     @Override
-    protected DistributedDataStoreClientBehavior initialBehavior(final ClientActorContext context) {
+    AbstractDataStoreClientBehavior initialBehavior(final ClientActorContext context, final ActorContext actorContext) {
         return new DistributedDataStoreClientBehavior(context, actorContext);
     }
 
@@ -52,15 +38,4 @@ public final class DistributedDataStoreClientActor extends AbstractClientActor {
         return Props.create(DistributedDataStoreClientActor.class,
             () -> new DistributedDataStoreClientActor(frontendId, ctx));
     }
-
-    @SuppressWarnings("checkstyle:IllegalCatch")
-    public static DistributedDataStoreClient getDistributedDataStoreClient(@Nonnull final ActorRef actor,
-            final long timeout, final TimeUnit unit) {
-        try {
-            return (DistributedDataStoreClient) Await.result(ExplicitAsk.ask(actor, GET_CLIENT_FACTORY,
-                Timeout.apply(timeout, unit)), Duration.Inf());
-        } catch (Exception e) {
-            throw Throwables.propagate(e);
-        }
-    }
 }
index 2b67854..db91424 100644 (file)
@@ -11,7 +11,7 @@ import akka.actor.ActorRef;
 import com.google.common.base.Preconditions;
 
 /**
- * Request the ClientIdentifier from a particular actor. Response is an instance of {@link DistributedDataStoreClient}.
+ * Request the ClientIdentifier from a particular actor. Response is an instance of {@link DataStoreClient}.
  *
  * @author Robert Varga
  */
index 7b652f4..3869e66 100644 (file)
@@ -33,6 +33,7 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeSnapshot;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModificationCursor;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
@@ -61,6 +62,7 @@ final class LocalProxyTransaction extends AbstractProxyTransaction {
     private final TransactionIdentifier identifier;
 
     private CursorAwareDataTreeModification modification;
+    private CursorAwareDataTreeSnapshot sealedModification;
 
     LocalProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier,
         final CursorAwareDataTreeModification modification) {
@@ -125,10 +127,12 @@ final class LocalProxyTransaction extends AbstractProxyTransaction {
     @Override
     void doSeal() {
         modification.ready();
+        sealedModification = modification;
     }
 
     DataTreeSnapshot getSnapshot() {
-        return modification;
+        Preconditions.checkState(sealedModification != null, "Proxy %s is not sealed yet", identifier);
+        return sealedModification;
     }
 
     private void applyModifyTransactionRequest(final ModifyTransactionRequest request,
index 9e6485b..dde2292 100644 (file)
@@ -7,39 +7,22 @@
  */
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
-import akka.actor.ActorRef;
-import akka.util.Timeout;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.ImmutableBiMap;
 import com.google.common.collect.ImmutableBiMap.Builder;
-import com.google.common.primitives.UnsignedLong;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 import javax.annotation.concurrent.ThreadSafe;
-import org.opendaylight.controller.cluster.access.ABIVersion;
 import org.opendaylight.controller.cluster.access.client.BackendInfoResolver;
-import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest;
-import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
-import org.opendaylight.controller.cluster.common.actor.ExplicitAsk;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Function1;
-import scala.compat.java8.FutureConverters;
 
 /**
  * {@link BackendInfoResolver} implementation for static shard configuration based on ShardManager. Each string-named
@@ -48,51 +31,11 @@ import scala.compat.java8.FutureConverters;
  *
  * @author Robert Varga
  */
-@SuppressFBWarnings(value = "NP_NONNULL_PARAM_VIOLATION",
-                    justification = "Pertains to the NULL_FUTURE field below. Null is allowed and is intended")
 @ThreadSafe
-final class ModuleShardBackendResolver extends BackendInfoResolver<ShardBackendInfo> {
-    private static final class Entry {
-        private final CompletionStage<ShardBackendInfo> stage;
-        @GuardedBy("this")
-        private ShardBackendInfo result;
-
-        Entry(final CompletionStage<ShardBackendInfo> stage) {
-            this.stage = Preconditions.checkNotNull(stage);
-            stage.whenComplete(this::onStageResolved);
-        }
-
-        @Nonnull CompletionStage<ShardBackendInfo> getStage() {
-            return stage;
-        }
-
-        synchronized @Nullable ShardBackendInfo getResult() {
-            return result;
-        }
-
-        private synchronized void onStageResolved(final ShardBackendInfo result, final Throwable failure) {
-            if (failure == null) {
-                this.result = Preconditions.checkNotNull(result);
-            } else {
-                LOG.warn("Failed to resolve shard", failure);
-            }
-        }
-    }
-
-    private static final CompletableFuture<ShardBackendInfo> NULL_FUTURE = CompletableFuture.completedFuture(null);
+final class ModuleShardBackendResolver extends AbstractShardBackendResolver {
     private static final Logger LOG = LoggerFactory.getLogger(ModuleShardBackendResolver.class);
 
-    /**
-     * Fall-over-dead timeout. If we do not make progress in this long, just fall over and propagate the failure.
-     * All users are expected to fail, possibly attempting to recover by restarting. It is fair to remain
-     * non-operational.
-     */
-    // TODO: maybe make this configurable somehow?
-    private static final Timeout DEAD_TIMEOUT = Timeout.apply(15, TimeUnit.MINUTES);
-
-    private final ConcurrentMap<Long, Entry> backends = new ConcurrentHashMap<>();
-    private final AtomicLong nextSessionId = new AtomicLong();
-    private final Function1<ActorRef, ?> connectFunction;
+    private final ConcurrentMap<Long, ShardState> backends = new ConcurrentHashMap<>();
     private final ActorContext actorContext;
 
     @GuardedBy("this")
@@ -102,9 +45,8 @@ final class ModuleShardBackendResolver extends BackendInfoResolver<ShardBackendI
 
     // FIXME: we really need just ActorContext.findPrimaryShardAsync()
     ModuleShardBackendResolver(final ClientIdentifier clientId, final ActorContext actorContext) {
+        super(clientId, actorContext);
         this.actorContext = Preconditions.checkNotNull(actorContext);
-        this.connectFunction = ExplicitAsk.toScala(t -> new ConnectClientRequest(clientId, t, ABIVersion.BORON,
-            ABIVersion.current()));
     }
 
     Long resolveShardForPath(final YangInstanceIdentifier path) {
@@ -127,54 +69,36 @@ final class ModuleShardBackendResolver extends BackendInfoResolver<ShardBackendI
         return cookie;
     }
 
-    private CompletionStage<ShardBackendInfo> resolveBackendInfo(final Long cookie) {
+    private ShardState resolveBackendInfo(final Long cookie) {
         final String shardName = shards.inverse().get(cookie);
         if (shardName == null) {
             LOG.warn("Failing request for non-existent cookie {}", cookie);
-            return NULL_FUTURE;
+            return null;
         }
 
         LOG.debug("Resolving cookie {} to shard {}", cookie, shardName);
 
-        return FutureConverters.toJava(actorContext.findPrimaryShardAsync(shardName)).thenCompose(info -> {
-            LOG.debug("Looking up primary info for {} from {}", shardName, info);
-            return FutureConverters.toJava(ExplicitAsk.ask(info.getPrimaryShardActor(), connectFunction, DEAD_TIMEOUT));
-        }).thenApply(response -> {
-            if (response instanceof RequestFailure) {
-                final RequestFailure<?, ?> failure = (RequestFailure<?, ?>) response;
-                LOG.debug("Connect request failed {}", failure, failure.getCause());
-                throw Throwables.propagate(failure.getCause());
-            }
-
-            LOG.debug("Resolved backend information to {}", response);
-
-            Preconditions.checkArgument(response instanceof ConnectClientSuccess, "Unhandled response {}", response);
-            final ConnectClientSuccess success = (ConnectClientSuccess) response;
-
-            return new ShardBackendInfo(success.getBackend(),
-                nextSessionId.getAndIncrement(), success.getVersion(), shardName, UnsignedLong.fromLongBits(cookie),
-                success.getDataTree(), success.getMaxMessages());
-        });
+        return resolveBackendInfo(shardName, cookie);
     }
 
     @Override
-    public CompletionStage<? extends ShardBackendInfo> getBackendInfo(final Long cookie) {
-        return backends.computeIfAbsent(cookie, key -> new Entry(resolveBackendInfo(key))).getStage();
+    public CompletionStage<ShardBackendInfo> getBackendInfo(final Long cookie) {
+        return backends.computeIfAbsent(cookie, this::resolveBackendInfo).getStage();
     }
 
     @Override
-    public CompletionStage<? extends ShardBackendInfo> refreshBackendInfo(final Long cookie,
+    public CompletionStage<ShardBackendInfo> refreshBackendInfo(final Long cookie,
             final ShardBackendInfo staleInfo) {
-        final Entry existing = backends.get(cookie);
+        final ShardState existing = backends.get(cookie);
         if (existing != null) {
             if (!staleInfo.equals(existing.getResult())) {
                 return existing.getStage();
             }
 
             LOG.debug("Invalidating backend information {}", staleInfo);
-            actorContext.getPrimaryShardInfoCache().remove(staleInfo.getShardName());
+            flushCache(staleInfo.getShardName());
 
-            LOG.trace("Invalidated cache %s -> %s", Long.toUnsignedString(cookie), staleInfo);
+            LOG.trace("Invalidated cache %s", staleInfo);
             backends.remove(cookie, existing);
         }
 
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SimpleDataStoreClientActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SimpleDataStoreClientActor.java
new file mode 100644 (file)
index 0000000..7068b27
--- /dev/null
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import akka.actor.Props;
+import com.google.common.base.Preconditions;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.access.client.AbstractClientActor;
+import org.opendaylight.controller.cluster.access.client.ClientActorContext;
+import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.FrontendType;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+
+/**
+ * A {@link AbstractClientActor} which acts as the point of contact for DistributedDataStore.
+ *
+ * @author Robert Varga
+ */
+public final class SimpleDataStoreClientActor extends AbstractDataStoreClientActor {
+    private final String shardName;
+
+    private SimpleDataStoreClientActor(final FrontendIdentifier frontendId, final ActorContext actorContext,
+            final String shardName) {
+        super(frontendId, actorContext);
+        this.shardName = Preconditions.checkNotNull(shardName);
+    }
+
+    @Override
+    AbstractDataStoreClientBehavior initialBehavior(final ClientActorContext context, final ActorContext actorContext) {
+        return new SimpleDataStoreClientBehavior(context, actorContext, shardName);
+    }
+
+    public static Props props(@Nonnull final MemberName memberName, @Nonnull final String storeName,
+            final ActorContext ctx, final String shardName) {
+        final String name = "datastore-" + storeName;
+        final FrontendIdentifier frontendId = FrontendIdentifier.create(memberName, FrontendType.forName(name));
+        return Props.create(SimpleDataStoreClientActor.class,
+            () -> new SimpleDataStoreClientActor(frontendId, ctx, shardName));
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SimpleDataStoreClientBehavior.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SimpleDataStoreClientBehavior.java
new file mode 100644 (file)
index 0000000..d6818d3
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import org.opendaylight.controller.cluster.access.client.ClientActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+/**
+ * {@link AbstractDataStoreClientBehavior} which connects to a single shard only.
+ *
+ * @author Robert Varga
+ */
+final class SimpleDataStoreClientBehavior extends AbstractDataStoreClientBehavior {
+    // Pre-boxed instance
+    private static final Long ZERO = Long.valueOf(0);
+
+    private SimpleDataStoreClientBehavior(final ClientActorContext context,
+            final SimpleShardBackendResolver resolver) {
+        super(context, resolver);
+    }
+
+    SimpleDataStoreClientBehavior(final ClientActorContext context, final ActorContext actorContext,
+            final String shardName) {
+        this(context, new SimpleShardBackendResolver(context.getIdentifier(), actorContext, shardName));
+    }
+
+    @Override
+    Long resolveShardForPath(final YangInstanceIdentifier path) {
+        return ZERO;
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SimpleShardBackendResolver.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SimpleShardBackendResolver.java
new file mode 100644 (file)
index 0000000..056a1ea
--- /dev/null
@@ -0,0 +1,83 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import com.google.common.base.Preconditions;
+import java.util.concurrent.CompletionStage;
+import javax.annotation.concurrent.ThreadSafe;
+import org.opendaylight.controller.cluster.access.client.BackendInfoResolver;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link BackendInfoResolver} implementation for static shard configuration based on ShardManager. Unlike the full
+ * {@link ModuleShardBackendResolver}, this resolver is used in situations where the client corresponds exactly to one
+ * backend shard, e.g. there is only one fixed cookie assigned and the operation path is not consulted at all.
+ *
+ * @author Robert Varga
+ */
+@ThreadSafe
+final class SimpleShardBackendResolver extends AbstractShardBackendResolver {
+    private static final Logger LOG = LoggerFactory.getLogger(SimpleShardBackendResolver.class);
+
+    private final String shardName;
+
+    private volatile ShardState state;
+
+    // FIXME: we really need just ActorContext.findPrimaryShardAsync()
+    SimpleShardBackendResolver(final ClientIdentifier clientId, final ActorContext actorContext,
+            final String shardName) {
+        super(clientId, actorContext);
+        this.shardName = Preconditions.checkNotNull(shardName);
+    }
+
+    private CompletionStage<ShardBackendInfo> getBackendInfo(final long cookie) {
+        Preconditions.checkArgument(cookie == 0);
+
+        ShardState local = state;
+        if (local == null) {
+            synchronized (this) {
+                local = state;
+                if (local == null) {
+                    local = resolveBackendInfo(shardName, 0);
+                    state = local;
+                }
+            }
+        }
+
+        return local.getStage();
+    }
+
+    @Override
+    public CompletionStage<ShardBackendInfo> getBackendInfo(final Long cookie) {
+        return getBackendInfo(cookie.longValue());
+    }
+
+    @Override
+    public CompletionStage<? extends ShardBackendInfo> refreshBackendInfo(final Long cookie,
+            final ShardBackendInfo staleInfo) {
+
+        final ShardState existing = state;
+        if (existing != null) {
+            if (!staleInfo.equals(existing.getResult())) {
+                return existing.getStage();
+            }
+
+            synchronized (this) {
+                LOG.debug("Invalidating backend information {}", staleInfo);
+                flushCache(shardName);
+                LOG.trace("Invalidated cache %s", staleInfo);
+                state = null;
+            }
+        }
+
+        return getBackendInfo(cookie);
+    }
+}
index 8bfad09..4ae64ac 100644 (file)
@@ -19,7 +19,7 @@ import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
-import org.opendaylight.controller.cluster.databroker.actors.dds.DistributedDataStoreClient;
+import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
 import org.opendaylight.controller.cluster.databroker.actors.dds.DistributedDataStoreClientActor;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
@@ -73,7 +73,7 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche
     private final CountDownLatch waitTillReadyCountDownLatch = new CountDownLatch(1);
 
     private final ClientIdentifier identifier;
-    private final DistributedDataStoreClient client;
+    private final DataStoreClient client;
 
     private final TransactionContextFactory txContextFactory;
 

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.