import akka.actor.Props;
import akka.pattern.ExplicitAskSupport;
import akka.util.Timeout;
+import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.access.concepts.FrontendType;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.actors.client.AbstractClientActor;
-import org.opendaylight.controller.cluster.datastore.actors.client.ClientActorBehavior;
import org.opendaylight.controller.cluster.datastore.actors.client.ClientActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import scala.Function1;
import scala.concurrent.Await;
import scala.concurrent.duration.Duration;
}
};
- private DistributedDataStoreClientActor(final FrontendIdentifier frontendId) {
+ private final ActorContext actorContext;
+
+ private DistributedDataStoreClientActor(final FrontendIdentifier frontendId, final ActorContext actorContext) {
super(frontendId);
+ this.actorContext = Preconditions.checkNotNull(actorContext);
}
@Override
- protected ClientActorBehavior initialBehavior(final ClientActorContext context) {
- return new DistributedDataStoreClientBehavior(context);
+ protected DistributedDataStoreClientBehavior initialBehavior(final ClientActorContext context) {
+ return new DistributedDataStoreClientBehavior(context, actorContext);
}
- public static Props props(final @Nonnull MemberName memberName, @Nonnull final String storeName) {
+ public static Props props(final @Nonnull MemberName memberName, @Nonnull final String storeName, final ActorContext ctx) {
final String name = "DistributedDataStore:storeName='" + storeName + "'";
final FrontendIdentifier frontendId = FrontendIdentifier.create(memberName, FrontendType.forName(name));
- return Props.create(DistributedDataStoreClientActor.class, () -> new DistributedDataStoreClientActor(frontendId));
+ return Props.create(DistributedDataStoreClientActor.class,
+ () -> new DistributedDataStoreClientActor(frontendId, ctx));
}
public static DistributedDataStoreClient getDistributedDataStoreClient(final @Nonnull ActorRef actor,
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.datastore.actors.client.ClientActorBehavior;
import org.opendaylight.controller.cluster.datastore.actors.client.ClientActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
final class DistributedDataStoreClientBehavior extends ClientActorBehavior implements DistributedDataStoreClient {
private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStoreClientBehavior.class);
+ private final ModuleShardBackendResolver resolver;
private long nextHistoryId;
- DistributedDataStoreClientBehavior(final ClientActorContext context) {
+ DistributedDataStoreClientBehavior(final ClientActorContext context, final ActorContext actorContext) {
super(context);
+ resolver = new ModuleShardBackendResolver(actorContext);
}
//
public void close() {
context().executeInActor(this::shutdown);
}
+
+ @Override
+ protected ModuleShardBackendResolver resolver() {
+ return resolver;
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import akka.util.Timeout;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
+import com.google.common.primitives.UnsignedLong;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.cluster.datastore.actors.client.BackendInfo;
+import org.opendaylight.controller.cluster.datastore.actors.client.BackendInfoResolver;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.compat.java8.FutureConverters;
+
+/**
+ * {@link BackendInfoResolver} implementation for static shard configuration based on ShardManager. Each string-named
+ * shard is assigned a single cookie and this mapping is stored in a bidirectional map. Information about corresponding
+ * shard leader is resolved via {@link ActorContext}. The product of resolution is {@link ShardBackendInfo}.
+ *
+ * @author Robert Varga
+ */
+final class ModuleShardBackendResolver extends BackendInfoResolver<ShardBackendInfo> {
+ private static final Logger LOG = LoggerFactory.getLogger(ModuleShardBackendResolver.class);
+ /**
+ * Fall-over-dead timeout. If we do not make progress in this long, just fall over and propagate the failure.
+ * All users are expected to fail, possibly attempting to recover by restarting. It is fair to remain
+ * non-operational.
+ */
+ // TODO: maybe make this configurable somehow?
+ private static final Timeout DEAD_TIMEOUT = Timeout.apply(15, TimeUnit.MINUTES);
+
+ private final ActorContext actorContext;
+
+ private volatile BiMap<String, Long> shards = ImmutableBiMap.of();
+
+ // FIXME: we really need just ActorContext.findPrimaryShardAsync()
+ ModuleShardBackendResolver(final ActorContext actorContext) {
+ this.actorContext = Preconditions.checkNotNull(actorContext);
+ }
+
+ @Override
+ protected void invalidateBackendInfo(final CompletionStage<? extends BackendInfo> info) {
+ LOG.trace("Initiated invalidation of backend information {}", info);
+ info.thenAccept(this::invalidate);
+ }
+
+ private void invalidate(final BackendInfo result) {
+ Preconditions.checkArgument(result instanceof ShardBackendInfo);
+ LOG.debug("Invalidating backend information {}", result);
+ actorContext.getPrimaryShardInfoCache().remove(((ShardBackendInfo)result).getShardName());
+ }
+
+ @Override
+ protected CompletionStage<ShardBackendInfo> resolveBackendInfo(final Long cookie) {
+ final String shardName = shards.inverse().get(cookie);
+ if (shardName == null) {
+ LOG.warn("Failing request for non-existent cookie {}", cookie);
+ return CompletableFuture.completedFuture(null);
+ }
+
+ LOG.debug("Resolving cookie {} to shard {}", cookie, shardName);
+ return FutureConverters.toJava(actorContext.findPrimaryShardAsync(shardName))
+ .thenApply(o -> createBackendInfo(o, shardName, cookie));
+ }
+
+ private static ABIVersion toABIVersion(final short version) {
+ switch (version) {
+ case DataStoreVersions.BORON_VERSION:
+ return ABIVersion.BORON;
+ }
+
+ throw new IllegalArgumentException("Unsupported version " + version);
+ }
+
+ private static ShardBackendInfo createBackendInfo(final Object result, final String shardName, final Long cookie) {
+ Preconditions.checkArgument(result instanceof PrimaryShardInfo);
+ final PrimaryShardInfo info = (PrimaryShardInfo) result;
+
+ LOG.debug("Creating backend information for {}", info);
+ return new ShardBackendInfo(info.getPrimaryShardActor().resolveOne(DEAD_TIMEOUT).value().get().get(),
+ toABIVersion(info.getPrimaryShardVersion()), shardName, UnsignedLong.fromLongBits(cookie),
+ info.getLocalShardDataTree());
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import akka.actor.ActorRef;
+import com.google.common.base.MoreObjects.ToStringHelper;
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.UnsignedLong;
+import java.util.Optional;
+import javax.annotation.concurrent.ThreadSafe;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.datastore.actors.client.BackendInfo;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+
+/**
+ * Combined backend tracking. Aside from usual {@link BackendInfo}, this object also tracks the cookie assigned
+ * to the shard. This assignment remains constant for as long as the client is not restarted.
+ *
+ * @author Robert Varga
+ */
+@ThreadSafe
+final class ShardBackendInfo extends BackendInfo {
+ private final Optional<DataTree> dataTree;
+ private final UnsignedLong cookie;
+ private final String shardName;
+
+ ShardBackendInfo(final ActorRef actor, final ABIVersion version, final String shardName, final UnsignedLong cookie,
+ final Optional<DataTree> dataTree) {
+ super(actor, version);
+ this.shardName = Preconditions.checkNotNull(shardName);
+ this.cookie = Preconditions.checkNotNull(cookie);
+ this.dataTree = Preconditions.checkNotNull(dataTree);
+ }
+
+ UnsignedLong getCookie() {
+ return cookie;
+ }
+
+ Optional<DataTree> getDataTree() {
+ return dataTree;
+ }
+
+ String getShardName() {
+ return shardName;
+ }
+
+ LocalHistoryIdentifier brandHistory(final LocalHistoryIdentifier id) {
+ Preconditions.checkArgument(id.getCookie() == 0, "History %s is already branded", id);
+ return new LocalHistoryIdentifier(id.getClientId(), id.getHistoryId(), cookie.longValue());
+ }
+
+ @Override
+ protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
+ return super.addToStringAttributes(toStringHelper).add("cookie", cookie).add("shard", shardName);
+ }
+}
private final TransactionContextFactory txContextFactory;
- public DistributedDataStore(ActorSystem actorSystem, ClusterWrapper cluster,
- Configuration configuration, DatastoreContextFactory datastoreContextFactory,
- DatastoreSnapshot restoreFromSnapshot) {
+ public DistributedDataStore(final ActorSystem actorSystem, final ClusterWrapper cluster,
+ final Configuration configuration, final DatastoreContextFactory datastoreContextFactory,
+ final DatastoreSnapshot restoreFromSnapshot) {
Preconditions.checkNotNull(actorSystem, "actorSystem should not be null");
Preconditions.checkNotNull(cluster, "cluster should not be null");
Preconditions.checkNotNull(configuration, "configuration should not be null");
Preconditions.checkNotNull(datastoreContextFactory, "datastoreContextFactory should not be null");
- final Props clientProps = DistributedDataStoreClientActor.props(cluster.getCurrentMemberName(),
- datastoreContextFactory.getBaseDatastoreContext().getDataStoreName());
- final ActorRef clientActor = actorSystem.actorOf(clientProps);
- try {
- client = DistributedDataStoreClientActor.getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
- } catch (Exception e) {
- LOG.error("Failed to get actor for {}", clientProps, e);
- clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
- throw Throwables.propagate(e);
- }
-
- identifier = client.getIdentifier();
- LOG.debug("Distributed data store client {} started", identifier);
-
String shardManagerId = ShardManagerIdentifier.builder()
.type(datastoreContextFactory.getBaseDatastoreContext().getDataStoreName()).build().toString();
actorContext = new ActorContext(actorSystem, createShardManager(actorSystem, creator, shardDispatcher,
shardManagerId), cluster, configuration, datastoreContextFactory.getBaseDatastoreContext(), primaryShardInfoCache);
+ final Props clientProps = DistributedDataStoreClientActor.props(cluster.getCurrentMemberName(),
+ datastoreContextFactory.getBaseDatastoreContext().getDataStoreName(), actorContext);
+ final ActorRef clientActor = actorSystem.actorOf(clientProps);
+ try {
+ client = DistributedDataStoreClientActor.getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ LOG.error("Failed to get actor for {}", clientProps, e);
+ clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ throw Throwables.propagate(e);
+ }
+
+ identifier = client.getIdentifier();
+ LOG.debug("Distributed data store client {} started", identifier);
+
this.waitTillReadyTimeInMillis =
actorContext.getDatastoreContext().getShardLeaderElectionTimeout().duration().toMillis() * READY_WAIT_FACTOR;
}
@VisibleForTesting
- DistributedDataStore(ActorContext actorContext, ClientIdentifier identifier) {
+ DistributedDataStore(final ActorContext actorContext, final ClientIdentifier identifier) {
this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
this.client = null;
this.identifier = Preconditions.checkNotNull(identifier);
actorContext.getDatastoreContext().getShardLeaderElectionTimeout().duration().toMillis() * READY_WAIT_FACTOR;
}
- public void setCloseable(AutoCloseable closeable) {
+ public void setCloseable(final AutoCloseable closeable) {
this.closeable = closeable;
}
@Override
public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
ListenerRegistration<L> registerChangeListener(
- final YangInstanceIdentifier path, L listener,
- AsyncDataBroker.DataChangeScope scope) {
+ final YangInstanceIdentifier path, final L listener,
+ final AsyncDataBroker.DataChangeScope scope) {
Preconditions.checkNotNull(path, "path should not be null");
Preconditions.checkNotNull(listener, "listener should not be null");
}
@Override
- public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(YangInstanceIdentifier treeId, L listener) {
+ public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(final YangInstanceIdentifier treeId, final L listener) {
Preconditions.checkNotNull(treeId, "treeId should not be null");
Preconditions.checkNotNull(listener, "listener should not be null");
@Override
public <C extends DOMDataTreeCommitCohort> DOMDataTreeCommitCohortRegistration<C> registerCommitCohort(
- DOMDataTreeIdentifier subtree, C cohort) {
+ final DOMDataTreeIdentifier subtree, final C cohort) {
YangInstanceIdentifier treeId =
Preconditions.checkNotNull(subtree, "subtree should not be null").getRootIdentifier();
Preconditions.checkNotNull(cohort, "listener should not be null");
}
@Override
- public void onGlobalContextUpdated(SchemaContext schemaContext) {
+ public void onGlobalContextUpdated(final SchemaContext schemaContext) {
actorContext.setSchemaContext(schemaContext);
}
@Override
- public void onDatastoreContextUpdated(DatastoreContextFactory contextFactory) {
+ public void onDatastoreContextUpdated(final DatastoreContextFactory contextFactory) {
LOG.info("DatastoreContext updated for data store {}", actorContext.getDataStoreName());
actorContext.setDatastoreContext(contextFactory);
}
}
- private static ActorRef createShardManager(ActorSystem actorSystem, ShardManagerCreator creator,
- String shardDispatcher, String shardManagerId) {
+ private static ActorRef createShardManager(final ActorSystem actorSystem, final ShardManagerCreator creator,
+ final String shardDispatcher, final String shardManagerId) {
Exception lastException = null;
for(int i=0;i<100;i++) {
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.actors.client;
+
+import akka.actor.ActorRef;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.MoreObjects.ToStringHelper;
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+
+/**
+ * Basic information about how to talk to the backend. ClientActorBehavior uses this information to dispatch requests
+ * to the backend.
+ *
+ * This class is not final so concrete actor behavior implementations may subclass it and track more information about
+ * the backend. The {@link #hashCode()} and {@link #equals(Object)} methods are made final to ensure subclasses compare
+ * on identity.
+ *
+ * @author Robert Varga
+ */
+public class BackendInfo {
+ private final ABIVersion version;
+ private final ActorRef actor;
+
+ protected BackendInfo(final ActorRef actor, final ABIVersion version) {
+ this.version = Preconditions.checkNotNull(version);
+ this.actor = Preconditions.checkNotNull(actor);
+ }
+
+ public final ActorRef getActor() {
+ return actor;
+ }
+
+ public final ABIVersion getVersion() {
+ return version;
+ }
+
+ @Override
+ public final int hashCode() {
+ return super.hashCode();
+ }
+
+ @Override
+ public final boolean equals(final Object obj) {
+ return super.equals(obj);
+ }
+
+ @Override
+ public final String toString() {
+ return addToStringAttributes(MoreObjects.toStringHelper(this)).toString();
+ }
+
+ protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
+ return toStringHelper.add("actor", actor).add("version", version);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.actors.client;
+
+import akka.actor.ActorRef;
+import com.google.common.base.Preconditions;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.ThreadSafe;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Caching resolver which resolves a cookie to a leader {@link ActorRef}. This class needs to be specialized by the
+ * client. It is used by {@link ClientActorBehavior} for request dispatch. Results are cached until they are invalidated
+ * by either the client actor (when a message timeout is detected) and by the specific frontend (on explicit
+ * invalidation or when updated information becomes available).
+ *
+ * @author Robert Varga
+ */
+@ThreadSafe
+public abstract class BackendInfoResolver<T extends BackendInfo> {
+ private static final Logger LOG = LoggerFactory.getLogger(BackendInfoResolver.class);
+ private final ConcurrentMap<Long, CompletionStage<T>> backends = new ConcurrentHashMap<>();
+
+ // This is what the client needs to start processing. For as long as we do not have this, we should not complete
+ // this stage until we have this information
+ public final CompletionStage<? extends T> getBackendInfo(final long cookie) {
+ return backends.computeIfAbsent(cookie, this::resolveBackendInfo);
+ }
+
+ /**
+ * Invalidate a particular instance of {@link BackendInfo}, typically as a response to a request timing out. If
+ * the provided information is not the one currently cached this method does nothing.
+ *
+ * @param cookie Backend cookie
+ * @param info Previous information to be invalidated
+ */
+ public final void invalidateBackend(final long cookie, final @Nonnull CompletionStage<? extends BackendInfo> info) {
+ if (backends.remove(cookie, Preconditions.checkNotNull(info))) {
+ LOG.trace("Invalidated cache %s -> %s", Long.toUnsignedString(cookie), info);
+ invalidateBackendInfo(info);
+ }
+ }
+
+ /**
+ * Request new resolution of a particular backend identified by a cookie. This method is invoked when a client
+ * requests information which is not currently cached.
+ *
+ * @param cookie Backend cookie
+ * @return A {@link CompletionStage} resulting in information about the backend
+ */
+ protected abstract @Nonnull CompletionStage<T> resolveBackendInfo(final @Nonnull Long cookie);
+
+ /**
+ * Invalidate previously-resolved shard information. This method is invoked when a timeout is detected
+ * and the information may need to be refreshed.
+ *
+ * @param info Previous promise of backend information
+ */
+ protected abstract void invalidateBackendInfo(@Nonnull CompletionStage<? extends BackendInfo> info);
+}
* @return Next behavior to use, null if this actor should shut down.
*/
protected abstract @Nullable ClientActorBehavior onCommand(@Nonnull Object command);
+
+ /**
+ * Override this method to provide a backend resolver instance.
+ *
+ * @return
+ */
+ protected abstract @Nonnull BackendInfoResolver<?> resolver();
}