}
/**
- * Check queue timeouts and return true if a timeout has occurred.
+ * Check this queue for timeout and initiate reconnection if that happened. If the queue has not made progress
+ * in {@link #NO_PROGRESS_TIMEOUT_NANOS} nanoseconds, it will be aborted.
*
- * @return True if a timeout occurred
- * @throws NoProgressException if the queue failed to make progress for an extended
- * time.
+ * @param current Current behavior
+ * @return Next behavior to use
*/
@VisibleForTesting
final ClientActorBehavior<T> runTimer(final ClientActorBehavior<T> current) {
}
lastProgress = readTime();
- maybeEntry.get().complete(envelope.getMessage());
+
+ final TransmittedConnectionEntry entry = maybeEntry.get();
+ LOG.debug("Completing {} with {}", entry, envelope);
+ entry.complete(envelope.getMessage());
// We have freed up a slot, try to transmit something
final int toSend = remoteMaxMessages() - inflight.size();
import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
import org.opendaylight.controller.cluster.access.concepts.FailureEnvelope;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.access.concepts.RequestException;
import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
return onCommand(command);
}
- private void onResponse(final ResponseEnvelope<?> response) {
- final WritableIdentifier id = response.getMessage().getTarget();
-
- // FIXME: this will need to be updated for other Request/Response types to extract cookie
- Preconditions.checkArgument(id instanceof TransactionIdentifier);
- final TransactionIdentifier txId = (TransactionIdentifier) id;
+ private static long extractCookie(final WritableIdentifier id) {
+ if (id instanceof TransactionIdentifier) {
+ return ((TransactionIdentifier) id).getHistoryId().getCookie();
+ } else if (id instanceof LocalHistoryIdentifier) {
+ return ((LocalHistoryIdentifier) id).getCookie();
+ } else {
+ throw new IllegalArgumentException("Unhandled identifier " + id);
+ }
+ }
- final AbstractClientConnection<T> connection = connections.get(txId.getHistoryId().getCookie());
+ private void onResponse(final ResponseEnvelope<?> response) {
+ final long cookie = extractCookie(response.getMessage().getTarget());
+ final AbstractClientConnection<T> connection = connections.get(cookie);
if (connection != null) {
connection.receiveResponse(response);
} else {
package org.opendaylight.controller.cluster.access.client;
import com.google.common.annotations.Beta;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.MoreObjects.ToStringHelper;
import com.google.common.base.Preconditions;
import java.util.function.Consumer;
import org.opendaylight.controller.cluster.access.concepts.Request;
final long getEnqueuedTicks() {
return enqueuedTicks;
}
+
+ @Override
+ public final String toString() {
+ return addToStringAttributes(MoreObjects.toStringHelper(this)).toString();
+ }
+
+ ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
+ return toStringHelper.add("request", request);
+ }
}
*/
package org.opendaylight.controller.cluster.access.client;
+import com.google.common.base.MoreObjects.ToStringHelper;
+
/**
* A {@link ConnectionEntry} which has been transmitted. It holds additional information about the last transmission.
*
return txTicks;
}
+ @Override
+ ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
+ return super.addToStringAttributes(toStringHelper).add("sessionId", sessionId).add("txSequence", txSequence);
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import akka.actor.ActorRef;
+import akka.util.Timeout;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.base.Verify;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.access.client.AbstractClientActor;
+import org.opendaylight.controller.cluster.access.client.ClientActorContext;
+import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
+import org.opendaylight.controller.cluster.common.actor.ExplicitAsk;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import scala.Function1;
+import scala.concurrent.Await;
+import scala.concurrent.duration.Duration;
+
+/**
+ * Common superclass for data store client actors. It keeps the {@link ActorContext} needed to instantiate
+ * the concrete {@link AbstractDataStoreClientBehavior} and exposes the blocking
+ * {@link #getDistributedDataStoreClient(ActorRef, long, TimeUnit)} bridge through which non-actor (POJO)
+ * code obtains a {@link DataStoreClient} handle from a running actor instance.
+ */
+public abstract class AbstractDataStoreClientActor extends AbstractClientActor {
+ // Message factory for ExplicitAsk: wraps the supplied reply-to reference in a GetClientRequest
+ private static final Function1<ActorRef, ?> GET_CLIENT_FACTORY = ExplicitAsk.toScala(t -> new GetClientRequest(t));
+
+ private final ActorContext actorContext;
+
+ AbstractDataStoreClientActor(final FrontendIdentifier frontendId, final ActorContext actorContext) {
+ super(frontendId);
+ this.actorContext = Preconditions.checkNotNull(actorContext);
+ }
+
+ @Override
+ protected final AbstractDataStoreClientBehavior initialBehavior(final ClientActorContext context) {
+ // Delegate to the subclass hook, verifying it did not return null
+ return Verify.verifyNotNull(initialBehavior(context, actorContext));
+ }
+
+ // Subclass hook: produce the concrete behavior (e.g. module-sharded or single-shard)
+ abstract AbstractDataStoreClientBehavior initialBehavior(ClientActorContext context, ActorContext actorContext);
+
+ /**
+ * Synchronously request the {@link DataStoreClient} from the specified client actor. This call blocks
+ * the invoking thread until the actor responds or the ask times out.
+ *
+ * @param actor client actor to query
+ * @param timeout maximum time to wait for the response
+ * @param unit time unit of the timeout argument
+ * @return the client instance reported by the actor
+ */
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ public static DataStoreClient getDistributedDataStoreClient(@Nonnull final ActorRef actor,
+ final long timeout, final TimeUnit unit) {
+ try {
+ return (DataStoreClient) Await.result(ExplicitAsk.ask(actor, GET_CLIENT_FACTORY,
+ Timeout.apply(timeout, unit)), Duration.Inf());
+ } catch (Exception e) {
+ // Rethrow as unchecked while preserving the original exception as the cause
+ throw Throwables.propagate(e);
+ }
+ }
+}
*
* <p>
* This class is not visible outside of this package because it breaks the actor containment. Services provided to
- * Java world outside of actor containment are captured in {@link DistributedDataStoreClient}.
+ * Java world outside of actor containment are captured in {@link DataStoreClient}.
*
* <p>
- * IMPORTANT: this class breaks actor containment via methods implementing {@link DistributedDataStoreClient} contract.
+ * IMPORTANT: this class breaks actor containment via methods implementing {@link DataStoreClient} contract.
* When touching internal state, be mindful of the execution context from which execution context, Actor
* or POJO, is the state being accessed or modified.
*
* @author Robert Varga
*/
abstract class AbstractDataStoreClientBehavior extends ClientActorBehavior<ShardBackendInfo>
- implements DistributedDataStoreClient {
+ implements DataStoreClient {
private static final Logger LOG = LoggerFactory.getLogger(AbstractDataStoreClientBehavior.class);
private final Map<LocalHistoryIdentifier, ClientLocalHistory> histories = new ConcurrentHashMap<>();
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import akka.actor.ActorRef;
+import akka.util.Timeout;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.primitives.UnsignedLong;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.client.BackendInfoResolver;
+import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest;
+import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
+import org.opendaylight.controller.cluster.common.actor.ExplicitAsk;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Function1;
+import scala.compat.java8.FutureConverters;
+
+/**
+ * Base class for {@link BackendInfoResolver} implementations which resolve shard leaders via
+ * {@link ActorContext}. It provides the shared connect machinery: locating the primary shard actor,
+ * sending a {@link ConnectClientRequest} and converting the response into a {@link ShardBackendInfo}.
+ *
+ * @author Robert Varga
+ */
+@ThreadSafe
+abstract class AbstractShardBackendResolver extends BackendInfoResolver<ShardBackendInfo> {
+ /**
+ * Tracker for a single in-progress or completed backend resolution. The resolved value is cached
+ * in {@link #result} once the stage completes successfully, so callers can compare stale info
+ * against the latest known result without blocking on the stage.
+ */
+ static final class ShardState {
+ private final CompletionStage<ShardBackendInfo> stage;
+ @GuardedBy("this")
+ private ShardBackendInfo result;
+
+ ShardState(final CompletionStage<ShardBackendInfo> stage) {
+ this.stage = Preconditions.checkNotNull(stage);
+ stage.whenComplete(this::onStageResolved);
+ }
+
+ @Nonnull CompletionStage<ShardBackendInfo> getStage() {
+ return stage;
+ }
+
+ @Nullable synchronized ShardBackendInfo getResult() {
+ return result;
+ }
+
+ private synchronized void onStageResolved(final ShardBackendInfo result, final Throwable failure) {
+ if (failure == null) {
+ this.result = Preconditions.checkNotNull(result);
+ } else {
+ LOG.warn("Failed to resolve shard", failure);
+ }
+ }
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractShardBackendResolver.class);
+
+ /**
+ * Fall-over-dead timeout. If we do not make progress in this long, just fall over and propagate the failure.
+ * All users are expected to fail, possibly attempting to recover by restarting. It is fair to remain
+ * non-operational.
+ */
+ // TODO: maybe make this configurable somehow?
+ private static final Timeout DEAD_TIMEOUT = Timeout.apply(15, TimeUnit.MINUTES);
+
+ private final AtomicLong nextSessionId = new AtomicLong();
+ private final Function1<ActorRef, ?> connectFunction;
+ private final ActorContext actorContext;
+
+ // FIXME: we really need just ActorContext.findPrimaryShardAsync()
+ AbstractShardBackendResolver(final ClientIdentifier clientId, final ActorContext actorContext) {
+ this.actorContext = Preconditions.checkNotNull(actorContext);
+ this.connectFunction = ExplicitAsk.toScala(t -> new ConnectClientRequest(clientId, t, ABIVersion.BORON,
+ ABIVersion.current()));
+ }
+
+ // Drop any cached primary-shard information for the named shard, forcing a fresh lookup
+ protected final void flushCache(final String shardName) {
+ actorContext.getPrimaryShardInfoCache().remove(shardName);
+ }
+
+ /**
+ * Start an asynchronous resolution of the backend for the specified shard.
+ *
+ * @param shardName name of the shard to resolve
+ * @param cookie cookie to record in the resulting {@link ShardBackendInfo}
+ * @return a {@link ShardState} tracking the resolution
+ */
+ protected final ShardState resolveBackendInfo(final String shardName, final long cookie) {
+ LOG.debug("Resolving cookie {} to shard {}", cookie, shardName);
+
+ return new ShardState(FutureConverters.toJava(actorContext.findPrimaryShardAsync(shardName)).thenCompose(i -> {
+ LOG.debug("Looking up primary info for {} from {}", shardName, i);
+ return FutureConverters.toJava(ExplicitAsk.ask(i.getPrimaryShardActor(), connectFunction, DEAD_TIMEOUT));
+ }).thenApply(response -> {
+ if (response instanceof RequestFailure) {
+ final RequestFailure<?, ?> failure = (RequestFailure<?, ?>) response;
+ LOG.debug("Connect request failed {}", failure, failure.getCause());
+ throw Throwables.propagate(failure.getCause());
+ }
+
+ LOG.debug("Resolved backend information to {}", response);
+
+ // Guava Preconditions templates use %s placeholders, not SLF4J-style {}
+ Preconditions.checkArgument(response instanceof ConnectClientSuccess, "Unhandled response %s", response);
+ final ConnectClientSuccess success = (ConnectClientSuccess) response;
+
+ return new ShardBackendInfo(success.getBackend(),
+ nextSessionId.getAndIncrement(), success.getVersion(), shardName, UnsignedLong.fromLongBits(cookie),
+ success.getDataTree(), success.getMaxMessages());
+ }));
+ }
+}
* @author Robert Varga
*/
@Beta
-public interface DistributedDataStoreClient extends Identifiable<ClientIdentifier>, AutoCloseable {
+public interface DataStoreClient extends Identifiable<ClientIdentifier>, AutoCloseable {
@Override
@Nonnull ClientIdentifier getIdentifier();
*/
package org.opendaylight.controller.cluster.databroker.actors.dds;
-import akka.actor.ActorRef;
import akka.actor.Props;
-import akka.util.Timeout;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.access.client.AbstractClientActor;
import org.opendaylight.controller.cluster.access.client.ClientActorContext;
import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
import org.opendaylight.controller.cluster.access.concepts.FrontendType;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
-import org.opendaylight.controller.cluster.common.actor.ExplicitAsk;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import scala.Function1;
-import scala.concurrent.Await;
-import scala.concurrent.duration.Duration;
/**
* A {@link AbstractClientActor} which acts as the point of contact for DistributedDataStore.
*
* @author Robert Varga
*/
-public final class DistributedDataStoreClientActor extends AbstractClientActor {
- private static final Function1<ActorRef, ?> GET_CLIENT_FACTORY = ExplicitAsk.toScala(t -> new GetClientRequest(t));
-
- private final ActorContext actorContext;
-
+public final class DistributedDataStoreClientActor extends AbstractDataStoreClientActor {
private DistributedDataStoreClientActor(final FrontendIdentifier frontendId, final ActorContext actorContext) {
- super(frontendId);
- this.actorContext = Preconditions.checkNotNull(actorContext);
+ super(frontendId, actorContext);
}
@Override
- protected DistributedDataStoreClientBehavior initialBehavior(final ClientActorContext context) {
+ AbstractDataStoreClientBehavior initialBehavior(final ClientActorContext context, final ActorContext actorContext) {
return new DistributedDataStoreClientBehavior(context, actorContext);
}
return Props.create(DistributedDataStoreClientActor.class,
() -> new DistributedDataStoreClientActor(frontendId, ctx));
}
-
- @SuppressWarnings("checkstyle:IllegalCatch")
- public static DistributedDataStoreClient getDistributedDataStoreClient(@Nonnull final ActorRef actor,
- final long timeout, final TimeUnit unit) {
- try {
- return (DistributedDataStoreClient) Await.result(ExplicitAsk.ask(actor, GET_CLIENT_FACTORY,
- Timeout.apply(timeout, unit)), Duration.Inf());
- } catch (Exception e) {
- throw Throwables.propagate(e);
- }
- }
}
import com.google.common.base.Preconditions;
/**
- * Request the ClientIdentifier from a particular actor. Response is an instance of {@link DistributedDataStoreClient}.
+ * Request the ClientIdentifier from a particular actor. Response is an instance of {@link DataStoreClient}.
*
* @author Robert Varga
*/
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeSnapshot;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModificationCursor;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
private final TransactionIdentifier identifier;
private CursorAwareDataTreeModification modification;
+ private CursorAwareDataTreeSnapshot sealedModification;
LocalProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier,
final CursorAwareDataTreeModification modification) {
@Override
void doSeal() {
modification.ready();
+ sealedModification = modification;
}
DataTreeSnapshot getSnapshot() {
- return modification;
+ Preconditions.checkState(sealedModification != null, "Proxy %s is not sealed yet", identifier);
+ return sealedModification;
}
private void applyModifyTransactionRequest(final ModifyTransactionRequest request,
*/
package org.opendaylight.controller.cluster.databroker.actors.dds;
-import akka.actor.ActorRef;
-import akka.util.Timeout;
import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
import com.google.common.collect.BiMap;
import com.google.common.collect.ImmutableBiMap;
import com.google.common.collect.ImmutableBiMap.Builder;
-import com.google.common.primitives.UnsignedLong;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
-import org.opendaylight.controller.cluster.access.ABIVersion;
import org.opendaylight.controller.cluster.access.client.BackendInfoResolver;
-import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest;
-import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess;
import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
-import org.opendaylight.controller.cluster.common.actor.ExplicitAsk;
import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.Function1;
-import scala.compat.java8.FutureConverters;
/**
* {@link BackendInfoResolver} implementation for static shard configuration based on ShardManager. Each string-named
*
* @author Robert Varga
*/
-@SuppressFBWarnings(value = "NP_NONNULL_PARAM_VIOLATION",
- justification = "Pertains to the NULL_FUTURE field below. Null is allowed and is intended")
@ThreadSafe
-final class ModuleShardBackendResolver extends BackendInfoResolver<ShardBackendInfo> {
- private static final class Entry {
- private final CompletionStage<ShardBackendInfo> stage;
- @GuardedBy("this")
- private ShardBackendInfo result;
-
- Entry(final CompletionStage<ShardBackendInfo> stage) {
- this.stage = Preconditions.checkNotNull(stage);
- stage.whenComplete(this::onStageResolved);
- }
-
- @Nonnull CompletionStage<ShardBackendInfo> getStage() {
- return stage;
- }
-
- synchronized @Nullable ShardBackendInfo getResult() {
- return result;
- }
-
- private synchronized void onStageResolved(final ShardBackendInfo result, final Throwable failure) {
- if (failure == null) {
- this.result = Preconditions.checkNotNull(result);
- } else {
- LOG.warn("Failed to resolve shard", failure);
- }
- }
- }
-
- private static final CompletableFuture<ShardBackendInfo> NULL_FUTURE = CompletableFuture.completedFuture(null);
+final class ModuleShardBackendResolver extends AbstractShardBackendResolver {
private static final Logger LOG = LoggerFactory.getLogger(ModuleShardBackendResolver.class);
- /**
- * Fall-over-dead timeout. If we do not make progress in this long, just fall over and propagate the failure.
- * All users are expected to fail, possibly attempting to recover by restarting. It is fair to remain
- * non-operational.
- */
- // TODO: maybe make this configurable somehow?
- private static final Timeout DEAD_TIMEOUT = Timeout.apply(15, TimeUnit.MINUTES);
-
- private final ConcurrentMap<Long, Entry> backends = new ConcurrentHashMap<>();
- private final AtomicLong nextSessionId = new AtomicLong();
- private final Function1<ActorRef, ?> connectFunction;
+ private final ConcurrentMap<Long, ShardState> backends = new ConcurrentHashMap<>();
private final ActorContext actorContext;
@GuardedBy("this")
// FIXME: we really need just ActorContext.findPrimaryShardAsync()
ModuleShardBackendResolver(final ClientIdentifier clientId, final ActorContext actorContext) {
+ super(clientId, actorContext);
this.actorContext = Preconditions.checkNotNull(actorContext);
- this.connectFunction = ExplicitAsk.toScala(t -> new ConnectClientRequest(clientId, t, ABIVersion.BORON,
- ABIVersion.current()));
}
Long resolveShardForPath(final YangInstanceIdentifier path) {
return cookie;
}
- private CompletionStage<ShardBackendInfo> resolveBackendInfo(final Long cookie) {
+ private ShardState resolveBackendInfo(final Long cookie) {
final String shardName = shards.inverse().get(cookie);
if (shardName == null) {
LOG.warn("Failing request for non-existent cookie {}", cookie);
- return NULL_FUTURE;
+ return null;
}
LOG.debug("Resolving cookie {} to shard {}", cookie, shardName);
- return FutureConverters.toJava(actorContext.findPrimaryShardAsync(shardName)).thenCompose(info -> {
- LOG.debug("Looking up primary info for {} from {}", shardName, info);
- return FutureConverters.toJava(ExplicitAsk.ask(info.getPrimaryShardActor(), connectFunction, DEAD_TIMEOUT));
- }).thenApply(response -> {
- if (response instanceof RequestFailure) {
- final RequestFailure<?, ?> failure = (RequestFailure<?, ?>) response;
- LOG.debug("Connect request failed {}", failure, failure.getCause());
- throw Throwables.propagate(failure.getCause());
- }
-
- LOG.debug("Resolved backend information to {}", response);
-
- Preconditions.checkArgument(response instanceof ConnectClientSuccess, "Unhandled response {}", response);
- final ConnectClientSuccess success = (ConnectClientSuccess) response;
-
- return new ShardBackendInfo(success.getBackend(),
- nextSessionId.getAndIncrement(), success.getVersion(), shardName, UnsignedLong.fromLongBits(cookie),
- success.getDataTree(), success.getMaxMessages());
- });
+ return resolveBackendInfo(shardName, cookie);
}
@Override
- public CompletionStage<? extends ShardBackendInfo> getBackendInfo(final Long cookie) {
- return backends.computeIfAbsent(cookie, key -> new Entry(resolveBackendInfo(key))).getStage();
+ public CompletionStage<ShardBackendInfo> getBackendInfo(final Long cookie) {
+ return backends.computeIfAbsent(cookie, this::resolveBackendInfo).getStage();
}
@Override
- public CompletionStage<? extends ShardBackendInfo> refreshBackendInfo(final Long cookie,
+ public CompletionStage<ShardBackendInfo> refreshBackendInfo(final Long cookie,
final ShardBackendInfo staleInfo) {
- final Entry existing = backends.get(cookie);
+ final ShardState existing = backends.get(cookie);
if (existing != null) {
if (!staleInfo.equals(existing.getResult())) {
return existing.getStage();
}
LOG.debug("Invalidating backend information {}", staleInfo);
- actorContext.getPrimaryShardInfoCache().remove(staleInfo.getShardName());
+ flushCache(staleInfo.getShardName());
- LOG.trace("Invalidated cache %s -> %s", Long.toUnsignedString(cookie), staleInfo);
+ LOG.trace("Invalidated cache %s", staleInfo);
backends.remove(cookie, existing);
}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import akka.actor.Props;
+import com.google.common.base.Preconditions;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.access.client.AbstractClientActor;
+import org.opendaylight.controller.cluster.access.client.ClientActorContext;
+import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.FrontendType;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+
+/**
+ * A {@link AbstractClientActor} which acts as the point of contact for a data store backed by a single,
+ * fixed shard. Its behavior connects only to the shard named at construction time.
+ *
+ * @author Robert Varga
+ */
+public final class SimpleDataStoreClientActor extends AbstractDataStoreClientActor {
+ // Name of the single backend shard this client talks to
+ private final String shardName;
+
+ private SimpleDataStoreClientActor(final FrontendIdentifier frontendId, final ActorContext actorContext,
+ final String shardName) {
+ super(frontendId, actorContext);
+ this.shardName = Preconditions.checkNotNull(shardName);
+ }
+
+ @Override
+ AbstractDataStoreClientBehavior initialBehavior(final ClientActorContext context, final ActorContext actorContext) {
+ return new SimpleDataStoreClientBehavior(context, actorContext, shardName);
+ }
+
+ /**
+ * Create {@link Props} for instantiating this actor.
+ *
+ * @param memberName cluster member name, used to build the frontend identifier
+ * @param storeName logical data store name; the frontend type becomes "datastore-" + storeName
+ * @param ctx actor context used to locate the backend shard
+ * @param shardName name of the single shard to connect to
+ * @return Props for the actor
+ */
+ public static Props props(@Nonnull final MemberName memberName, @Nonnull final String storeName,
+ final ActorContext ctx, final String shardName) {
+ final String name = "datastore-" + storeName;
+ final FrontendIdentifier frontendId = FrontendIdentifier.create(memberName, FrontendType.forName(name));
+ return Props.create(SimpleDataStoreClientActor.class,
+ () -> new SimpleDataStoreClientActor(frontendId, ctx, shardName));
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import org.opendaylight.controller.cluster.access.client.ClientActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+/**
+ * {@link AbstractDataStoreClientBehavior} which connects to a single shard only. All paths resolve to the
+ * same fixed cookie (zero), which the {@link SimpleShardBackendResolver} maps to the configured shard.
+ *
+ * @author Robert Varga
+ */
+final class SimpleDataStoreClientBehavior extends AbstractDataStoreClientBehavior {
+ // Pre-boxed instance
+ private static final Long ZERO = Long.valueOf(0);
+
+ private SimpleDataStoreClientBehavior(final ClientActorContext context,
+ final SimpleShardBackendResolver resolver) {
+ super(context, resolver);
+ }
+
+ SimpleDataStoreClientBehavior(final ClientActorContext context, final ActorContext actorContext,
+ final String shardName) {
+ this(context, new SimpleShardBackendResolver(context.getIdentifier(), actorContext, shardName));
+ }
+
+ @Override
+ Long resolveShardForPath(final YangInstanceIdentifier path) {
+ // Single-shard deployment: the path is irrelevant, everything maps to cookie 0
+ return ZERO;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import com.google.common.base.Preconditions;
+import java.util.concurrent.CompletionStage;
+import javax.annotation.concurrent.ThreadSafe;
+import org.opendaylight.controller.cluster.access.client.BackendInfoResolver;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link BackendInfoResolver} implementation for static shard configuration based on ShardManager. Unlike the full
+ * {@link ModuleShardBackendResolver}, this resolver is used in situations where the client corresponds exactly to one
+ * backend shard, e.g. there is only one fixed cookie assigned and the operation path is not consulted at all.
+ *
+ * @author Robert Varga
+ */
+@ThreadSafe
+final class SimpleShardBackendResolver extends AbstractShardBackendResolver {
+ private static final Logger LOG = LoggerFactory.getLogger(SimpleShardBackendResolver.class);
+
+ // Name of the single shard this resolver serves
+ private final String shardName;
+
+ // Current resolution state; null when no resolution has been started or after invalidation
+ private volatile ShardState state;
+
+ // FIXME: we really need just ActorContext.findPrimaryShardAsync()
+ SimpleShardBackendResolver(final ClientIdentifier clientId, final ActorContext actorContext,
+ final String shardName) {
+ super(clientId, actorContext);
+ this.shardName = Preconditions.checkNotNull(shardName);
+ }
+
+ private CompletionStage<ShardBackendInfo> getBackendInfo(final long cookie) {
+ // Only cookie 0 is valid in a single-shard configuration
+ Preconditions.checkArgument(cookie == 0);
+
+ // Double-checked locking on the volatile state field: start a resolution only once
+ ShardState local = state;
+ if (local == null) {
+ synchronized (this) {
+ local = state;
+ if (local == null) {
+ local = resolveBackendInfo(shardName, 0);
+ state = local;
+ }
+ }
+ }
+
+ return local.getStage();
+ }
+
+ @Override
+ public CompletionStage<ShardBackendInfo> getBackendInfo(final Long cookie) {
+ return getBackendInfo(cookie.longValue());
+ }
+
+ @Override
+ public CompletionStage<? extends ShardBackendInfo> refreshBackendInfo(final Long cookie,
+ final ShardBackendInfo staleInfo) {
+
+ final ShardState existing = state;
+ if (existing != null) {
+ if (!staleInfo.equals(existing.getResult())) {
+ // Somebody else already resolved fresher information, hand that out
+ return existing.getStage();
+ }
+
+ synchronized (this) {
+ LOG.debug("Invalidating backend information {}", staleInfo);
+ flushCache(shardName);
+ // SLF4J placeholders are {} -- %s would have printed literally without substitution
+ LOG.trace("Invalidated cache {}", staleInfo);
+ state = null;
+ }
+ }
+
+ return getBackendInfo(cookie);
+ }
+}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
-import org.opendaylight.controller.cluster.databroker.actors.dds.DistributedDataStoreClient;
+import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
import org.opendaylight.controller.cluster.databroker.actors.dds.DistributedDataStoreClientActor;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
private final CountDownLatch waitTillReadyCountDownLatch = new CountDownLatch(1);
private final ClientIdentifier identifier;
- private final DistributedDataStoreClient client;
+ private final DataStoreClient client;
private final TransactionContextFactory txContextFactory;