import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.File;
import java.io.FileInputStream;
@After
public void tearDown() {
- for(MemberNode m: memberNodes) {
+ for (MemberNode m : Lists.reverse(memberNodes)) {
m.cleanup();
}
+ memberNodes.clear();
}
@Test
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import akka.actor.ActorRef;
+import com.google.common.annotations.Beta;
+import com.google.common.base.Preconditions;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+
+/**
+ * Client-side view of a local history. This class tracks all state related to a particular history and routes
+ * frontend requests towards the backend.
+ *
+ * This interface is used by the world outside of the actor system and in the actor system it is manifested via
+ * its client actor. That requires some state transfer with {@link DistributedDataStoreClientBehavior}. In order to
+ * reduce request latency, all messages are carbon-copied (and enqueued first) to the client actor.
+ *
+ * @author Robert Varga
+ */
+@Beta
+@NotThreadSafe
+public final class ClientLocalHistory implements AutoCloseable {
+ private static final AtomicIntegerFieldUpdater<ClientLocalHistory> CLOSED_UPDATER =
+ AtomicIntegerFieldUpdater.newUpdater(ClientLocalHistory.class, "state");
+ private static final int IDLE_STATE = 0;
+ private static final int CLOSED_STATE = 1;
+
+ private final LocalHistoryIdentifier historyId;
+ private final ActorRef backendActor;
+ private final ActorRef clientActor;
+
+ private volatile int state = IDLE_STATE;
+
+ ClientLocalHistory(final DistributedDataStoreClientBehavior client, final long historyId,
+ final ActorRef backendActor) {
+ this.clientActor = client.self();
+ this.backendActor = Preconditions.checkNotNull(backendActor);
+ this.historyId = new LocalHistoryIdentifier(client.getIdentifier(), historyId);
+ }
+
+ private void checkNotClosed() {
+ Preconditions.checkState(state != CLOSED_STATE, "Local history %s has been closed", historyId);
+ }
+
+ @Override
+ public void close() {
+ if (CLOSED_UPDATER.compareAndSet(this, IDLE_STATE, CLOSED_STATE)) {
+ // FIXME: signal close to both client actor and backend actor
+ } else if (state != CLOSED_STATE) {
+ throw new IllegalStateException("Cannot close history with an open transaction");
+ }
+ }
+
+ // FIXME: add client requests related to a particular local history
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Command sent from the Java world to the client actor to create a new local history.
+ *
+ * @author Robert Varga
+ */
+final class CreateLocalHistoryCommand {
+ private final CompletableFuture<ClientLocalHistory> future = new CompletableFuture<>();
+
+ CompletableFuture<ClientLocalHistory> future() {
+ return future;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import com.google.common.annotations.Beta;
+import java.util.concurrent.CompletionStage;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.yangtools.concepts.Identifiable;
+
+/**
+ * Client interface for interacting with the frontend actor. This interface is the primary access point through
+ * which the DistributedDataStore frontend interacts with backend Shards.
+ *
+ * Keep this interface as clean as possible, as it needs to be implemented in thread-safe and highly-efficient manner.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public interface DistributedDataStoreClient extends Identifiable<ClientIdentifier>, AutoCloseable {
+ @Override
+ ClientIdentifier getIdentifier();
+
+ @Override
+ void close();
+
+ /**
+ * Create a new local history. This method initiates an asynchronous instantiation of a local history on the back
+ * end. ClientLocalHistory represents the interface exposed to the client.
+ *
+ * @return Future client history handle
+ */
+ CompletionStage<ClientLocalHistory> createLocalHistory();
+
+ // TODO: add methods required by DistributedDataStore
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.pattern.ExplicitAskSupport;
+import akka.util.Timeout;
+import com.google.common.base.Throwables;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.FrontendType;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.datastore.actors.client.AbstractClientActor;
+import org.opendaylight.controller.cluster.datastore.actors.client.ClientActorBehavior;
+import org.opendaylight.controller.cluster.datastore.actors.client.ClientActorContext;
+import scala.Function1;
+import scala.concurrent.Await;
+import scala.concurrent.duration.Duration;
+import scala.runtime.AbstractFunction1;
+
+/**
+ * A {@link AbstractClientActor} which acts as the point of contact for DistributedDataStore.
+ *
+ * @author Robert Varga
+ */
+public final class DistributedDataStoreClientActor extends AbstractClientActor {
+ // Unfortunately Akka's explicit ask pattern does not work with its Java API, as it fails to invoke passed message.
+ // In order to make this work for now, we tap directly into ExplicitAskSupport and use a Scala function instead
+ // of akka.japi.Function.
+ private static final ExplicitAskSupport ASK_SUPPORT = akka.pattern.extended.package$.MODULE$;
+ private static final Function1<ActorRef, Object> GET_CLIENT_FACTORY = new AbstractFunction1<ActorRef, Object>() {
+ @Override
+ public Object apply(final ActorRef askSender) {
+ return new GetClientRequest(askSender);
+ }
+ };
+
+ private DistributedDataStoreClientActor(final FrontendIdentifier frontendId) {
+ super(frontendId);
+ }
+
+ @Override
+ protected ClientActorBehavior initialBehavior(final ClientActorContext context) {
+ return new DistributedDataStoreClientBehavior(context);
+ }
+
+ public static Props props(final @Nonnull MemberName memberName, @Nonnull final String storeName) {
+ final String name = "DistributedDataStore:storeName='" + storeName + "'";
+ final FrontendIdentifier frontendId = FrontendIdentifier.create(memberName, FrontendType.forName(name));
+ return Props.create(DistributedDataStoreClientActor.class, () -> new DistributedDataStoreClientActor(frontendId));
+ }
+
+ public static DistributedDataStoreClient getDistributedDataStoreClient(final @Nonnull ActorRef actor,
+ final long timeout, final TimeUnit unit) {
+ try {
+ return (DistributedDataStoreClient) Await.result(ASK_SUPPORT.ask(actor, GET_CLIENT_FACTORY,
+ Timeout.apply(timeout, unit)), Duration.Inf());
+ } catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import akka.actor.ActorRef;
+import akka.actor.Status;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.datastore.actors.client.ClientActorBehavior;
+import org.opendaylight.controller.cluster.datastore.actors.client.ClientActorContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link ClientActorBehavior} acting as an intermediary between the backend actors and the DistributedDataStore
+ * frontend.
+ *
+ * This class is not visible outside of this package because it breaks the actor containment. Services provided to
+ * Java world outside of actor containment are captured in {@link DistributedDataStoreClient}.
+ *
+ * IMPORTANT: this class breaks actor containment via methods implementing {@link DistributedDataStoreClient} contract.
+ * When touching internal state, be mindful of the execution context from which execution context, Actor
+ * or POJO, is the state being accessed or modified.
+ *
+ * THREAD SAFETY: this class must always be kept thread-safe, so that both the Actor System thread and the application
+ * threads can run concurrently. All state transitions must be made in a thread-safe manner. When in
+ * doubt, feel free to synchronize on this object.
+ *
+ * PERFORMANCE: this class lies in a performance-critical fast path. All code needs to be concise and efficient, but
+ * performance must not come at the price of correctness. Any optimizations need to be carefully analyzed
+ * for correctness and performance impact.
+ *
+ * TRADE-OFFS: part of the functionality runs in application threads without switching contexts, which makes it ideal
+ * for performing work and charging applications for it. That has two positive effects:
+ * - CPU usage is distributed across applications, minimizing work done in the actor thread
+ * - CPU usage provides back-pressure towards the application.
+ *
+ * @author Robert Varga
+ */
+final class DistributedDataStoreClientBehavior extends ClientActorBehavior implements DistributedDataStoreClient {
+ private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStoreClientBehavior.class);
+ private static final Object SHUTDOWN = new Object() {
+ @Override
+ public String toString() {
+ return "SHUTDOWN";
+ }
+ };
+
+ private long nextHistoryId;
+
+ DistributedDataStoreClientBehavior(final ClientActorContext context) {
+ super(context);
+ }
+
+ //
+ //
+ // Methods below are invoked from the client actor thread
+ //
+ //
+
+ private void createLocalHistory(final CreateLocalHistoryCommand command) {
+ final CompletableFuture<ClientLocalHistory> future = command.future();
+ final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(getIdentifier(), nextHistoryId++);
+ LOG.debug("{}: creating a new local history {} for {}", persistenceId(), historyId, future);
+
+ // FIXME: initiate backend instantiation
+ future.completeExceptionally(new UnsupportedOperationException("Not implemented yet"));
+ }
+
+ @Override
+ protected ClientActorBehavior onCommand(final Object command) {
+ if (command instanceof CreateLocalHistoryCommand) {
+ createLocalHistory((CreateLocalHistoryCommand) command);
+ } else if (command instanceof GetClientRequest) {
+ ((GetClientRequest) command).getReplyTo().tell(new Status.Success(this), ActorRef.noSender());
+ } else if (SHUTDOWN.equals(command)) {
+ // Add shutdown procedures here
+ return null;
+ } else {
+ LOG.warn("{}: ignoring unhandled command {}", persistenceId(), command);
+ }
+
+ return this;
+ }
+
+ //
+ //
+ // Methods below are invoked from application threads
+ //
+ //
+
+ @Override
+ public CompletionStage<ClientLocalHistory> createLocalHistory() {
+ final CreateLocalHistoryCommand command = new CreateLocalHistoryCommand();
+ self().tell(command, ActorRef.noSender());
+ return command.future();
+ }
+
+ @Override
+ public void close() {
+ self().tell(SHUTDOWN, ActorRef.noSender());
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import akka.actor.ActorRef;
+import com.google.common.base.Preconditions;
+
+/**
+ * Request the ClientIdentifier from a particular actor. Response is an instance of {@link DistributedDataStoreClient}.
+ *
+ * @author Robert Varga
+ */
+final class GetClientRequest {
+ private final ActorRef replyTo;
+
+ public GetClientRequest(final ActorRef replyTo) {
+ this.replyTo = Preconditions.checkNotNull(replyTo);
+ }
+
+ ActorRef getReplyTo() {
+ return replyTo;
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+/**
+ * This package contains implementation required by the DistributedDataStore frontend.
+ *
+ * @author Robert Varga
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
\ No newline at end of file
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.databroker.actors.dds.DistributedDataStoreClient;
+import org.opendaylight.controller.cluster.databroker.actors.dds.DistributedDataStoreClientActor;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.DatastoreConfigurationMXBeanImpl;
DatastoreContextConfigAdminOverlay.Listener, DOMStoreTreeChangePublisher, DOMDataTreeCommitCohortRegistry, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class);
- private static final String UNKNOWN_TYPE = "unknown";
private static final long READY_WAIT_FACTOR = 3;
private final CountDownLatch waitTillReadyCountDownLatch = new CountDownLatch(1);
- private final String type;
+ private final ClientIdentifier identifier;
+ private final DistributedDataStoreClient client;
private final TransactionContextFactory txContextFactory;
Preconditions.checkNotNull(configuration, "configuration should not be null");
Preconditions.checkNotNull(datastoreContextFactory, "datastoreContextFactory should not be null");
- this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
+ final Props clientProps = DistributedDataStoreClientActor.props(cluster.getCurrentMemberName(),
+ datastoreContextFactory.getBaseDatastoreContext().getDataStoreName());
+ final ActorRef clientActor = actorSystem.actorOf(clientProps);
+ try {
+ client = DistributedDataStoreClientActor.getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ LOG.error("Failed to get actor for {}", clientProps, e);
+ clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ throw Throwables.propagate(e);
+ }
+
+ identifier = client.getIdentifier();
+ LOG.debug("Distributed data store client {} started", identifier);
- String shardManagerId = ShardManagerIdentifier.builder().type(type).build().toString();
+ String shardManagerId = ShardManagerIdentifier.builder()
+ .type(datastoreContextFactory.getBaseDatastoreContext().getDataStoreName()).build().toString();
LOG.info("Creating ShardManager : {}", shardManagerId);
}
@VisibleForTesting
- DistributedDataStore(ActorContext actorContext) {
+ DistributedDataStore(ActorContext actorContext, ClientIdentifier identifier) {
this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
+ this.client = null;
+ this.identifier = Preconditions.checkNotNull(identifier);
this.txContextFactory = TransactionContextFactory.create(actorContext);
- this.type = UNKNOWN_TYPE;
this.waitTillReadyTimeInMillis =
actorContext.getDatastoreContext().getShardLeaderElectionTimeout().duration().toMillis() * READY_WAIT_FACTOR;
}
@Override
public void close() {
- LOG.info("Closing data store {}", type);
+ LOG.info("Closing data store {}", identifier);
if (datastoreConfigMXBean != null) {
datastoreConfigMXBean.unregisterMBean();
txContextFactory.close();
actorContext.shutdown();
+
+ if (client != null) {
+ client.close();
+ }
}
@Override
}
public void waitTillReady(){
- LOG.info("Beginning to wait for data store to become ready : {}", type);
+ LOG.info("Beginning to wait for data store to become ready : {}", identifier);
try {
if (waitTillReadyCountDownLatch.await(waitTillReadyTimeInMillis, TimeUnit.MILLISECONDS)) {
- LOG.debug("Data store {} is now ready", type);
+ LOG.debug("Data store {} is now ready", identifier);
} else {
LOG.error("Shared leaders failed to settle in {} seconds, giving up", TimeUnit.MILLISECONDS.toSeconds(waitTillReadyTimeInMillis));
}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.actors.client;
+
+import akka.actor.ActorRef;
+import akka.actor.PoisonPill;
+import akka.persistence.UntypedPersistentActor;
+import com.google.common.annotations.Beta;
+import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Frontend actor which takes care of persisting generations and creates an appropriate ClientIdentifier.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public abstract class AbstractClientActor extends UntypedPersistentActor {
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractClientActor.class);
+ private AbstractClientActorBehavior<?> currentBehavior;
+
+ protected AbstractClientActor(final FrontendIdentifier frontendId) {
+ currentBehavior = new RecoveringClientActorBehavior(
+ new InitialClientActorContext(this, frontendId.toString()), frontendId);
+ }
+
+ @Override
+ public final String persistenceId() {
+ return currentBehavior.persistenceId();
+ }
+
+ private void switchBehavior(final AbstractClientActorBehavior<?> nextBehavior) {
+ if (!currentBehavior.equals(nextBehavior)) {
+ if (nextBehavior == null) {
+ LOG.debug("{}: shutting down", persistenceId());
+ self().tell(PoisonPill.getInstance(), ActorRef.noSender());
+ } else {
+ LOG.debug("{}: switched from {} to {}", persistenceId(), currentBehavior, nextBehavior);
+ }
+
+ currentBehavior = nextBehavior;
+ }
+ }
+
+ @Override
+ public final void onReceiveCommand(final Object command) {
+ if (command == null) {
+ LOG.debug("{}: ignoring null command", persistenceId());
+ return;
+ }
+
+ if (currentBehavior != null) {
+ switchBehavior(currentBehavior.onReceiveCommand(command));
+ } else {
+ LOG.debug("{}: shutting down, ignoring command {}", persistenceId(), command);
+ }
+ }
+
+ @Override
+ public final void onReceiveRecover(final Object recover) {
+ switchBehavior(currentBehavior.onReceiveRecover(recover));
+ }
+
+ protected abstract ClientActorBehavior initialBehavior(ClientActorContext context);
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.actors.client;
+
+import akka.actor.ActorRef;
+import com.google.common.annotations.Beta;
+import com.google.common.base.Preconditions;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+/**
+ * Base behavior attached to {@link AbstractClientActor}. Exposes
+ * @author user
+ *
+ * @param <C> Type of associated context
+ *
+ * @author Robert Varga
+ */
+@Beta
+public abstract class AbstractClientActorBehavior<C extends AbstractClientActorContext> {
+ private final C context;
+
+ AbstractClientActorBehavior(final @Nonnull C context) {
+ // Hidden to prevent outside subclasses. Users instantiated this via ClientActorBehavior
+ this.context = Preconditions.checkNotNull(context);
+ }
+
+ /**
+ * Return an {@link AbstractClientActorContext} associated with this {@link AbstractClientActor}.
+ *
+ * @return A client actor context instance.
+ */
+ protected final @Nonnull C context() {
+ return context;
+ }
+
+ /**
+ * Return the persistence identifier associated with this {@link AbstractClientActor}. This identifier should be
+ * used in logging to identify this actor.
+ *
+ * @return Persistence identifier
+ */
+ protected final @Nonnull String persistenceId() {
+ return context.persistenceId();
+ }
+
+ /**
+ * Return an {@link ActorRef} of this ClientActor.
+ *
+ * @return
+ */
+ public final @Nonnull ActorRef self() {
+ return context.self();
+ }
+
+ /**
+ * Implementation-internal method for handling an incoming command message.
+ *
+ * @param command Command message
+ * @return Behavior which should be used with the next message. Return null if this actor should shut down.
+ */
+ abstract @Nullable AbstractClientActorBehavior<?> onReceiveCommand(@Nonnull Object command);
+
+ /**
+ * Implementation-internal method for handling an incoming recovery message coming from persistence.
+ *
+ * @param recover Recover message
+ * @return Behavior which should be used with the next message. Return null if this actor should shut down.
+ */
+ abstract @Nullable AbstractClientActorBehavior<?> onReceiveRecover(@Nonnull Object recover);
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.actors.client;
+
+import akka.actor.ActorRef;
+import com.google.common.base.Preconditions;
+import javax.annotation.Nonnull;
+import org.opendaylight.yangtools.concepts.Mutable;
+
+/**
+ * Common, externally-invisible superclass of contexts associated with a {@link AbstractClientActor}. End users pass this
+ * object via opaque {@link ClientActorContext}.
+ *
+ * @author Robert Varga
+ */
+abstract class AbstractClientActorContext implements Mutable {
+ private final String persistenceId;
+ private final ActorRef self;
+
+ AbstractClientActorContext(final @Nonnull ActorRef self, final @Nonnull String persistenceId) {
+ this.persistenceId = Preconditions.checkNotNull(persistenceId);
+ this.self = Preconditions.checkNotNull(self);
+ }
+
+ final @Nonnull String persistenceId() {
+ return persistenceId;
+ }
+
+ final @Nonnull ActorRef self() {
+ return self;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.actors.client;
+
+import com.google.common.annotations.Beta;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.yangtools.concepts.Identifiable;
+
+/**
+ * A behavior, which handles messages sent to a {@link AbstractClientActor}.
+ *
+ * @param <T> Frontend type
+ *
+ * @author Robert Varga
+ */
+@Beta
+public abstract class ClientActorBehavior extends RecoveredClientActorBehavior<ClientActorContext>
+ implements Identifiable<ClientIdentifier> {
+ protected ClientActorBehavior(final @Nonnull ClientActorContext context) {
+ super(context);
+ }
+
+ @Override
+ final ClientActorBehavior onReceiveCommand(final Object command) {
+ // TODO: any client-common logic (such as validation and common dispatch) needs to go here
+ return onCommand(command);
+ }
+
+ @Override
+ public final @Nonnull ClientIdentifier getIdentifier() {
+ return context().getIdentifier();
+ }
+
+ /**
+ * Override this method to handle any command which is not handled by the base behavior.
+ *
+ * @param command
+ * @return Next behavior to use, null if this actor should shut down.
+ */
+ protected abstract @Nullable ClientActorBehavior onCommand(@Nonnull Object command);
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.actors.client;
+
+import akka.actor.ActorRef;
+import com.google.common.annotations.Beta;
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.yangtools.concepts.Identifiable;
+
+/**
+ * An actor context associated with this {@link AbstractClientActor}
+ *
+ * @author Robert Varga
+ */
+@Beta
+public final class ClientActorContext extends AbstractClientActorContext implements Identifiable<ClientIdentifier> {
+ private final ClientIdentifier identifier;
+
+ ClientActorContext(final ActorRef self, final String persistenceId, final ClientIdentifier identifier) {
+ super(self, persistenceId);
+ this.identifier = Preconditions.checkNotNull(identifier);
+ }
+
+ @Override
+ public ClientIdentifier getIdentifier() {
+ return identifier;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.actors.client;
+
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+
+/**
+ *
+ * @author Robert Varga
+ */
+final class InitialClientActorContext extends AbstractClientActorContext {
+ private final AbstractClientActor actor;
+
+ InitialClientActorContext(final AbstractClientActor actor, final String persistenceId) {
+ super(actor.self(), persistenceId);
+ this.actor = Preconditions.checkNotNull(actor);
+ }
+
+ void saveSnapshot(final ClientIdentifier snapshot) {
+ actor.saveSnapshot(snapshot);
+ }
+
+ ClientActorBehavior createBehavior(final ClientActorContext context) {
+ return actor.initialBehavior(context);
+ }
+
+ void stash() {
+ actor.stash();
+ }
+
+ void unstash() {
+ actor.unstashAll();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.actors.client;
+
+/**
+ * @param <C> Concrete context type
+ *
+ * @author Robert Varga
+ */
+abstract class RecoveredClientActorBehavior<C extends AbstractClientActorContext> extends AbstractClientActorBehavior<C> {
+
+ RecoveredClientActorBehavior(final C context) {
+ super(context);
+ }
+
+ @Override
+ final AbstractClientActorBehavior<?> onReceiveRecover(Object recover) {
+ throw new IllegalStateException("Frontend has been recovered");
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.actors.client;
+
+import akka.persistence.RecoveryCompleted;
+import akka.persistence.SnapshotOffer;
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @param <T> Frontend type
+ *
+ * @author Robert Varga
+ */
+final class RecoveringClientActorBehavior extends AbstractClientActorBehavior<InitialClientActorContext> {
+ private static final Logger LOG = LoggerFactory.getLogger(RecoveringClientActorBehavior.class);
+ private final FrontendIdentifier currentFrontend;
+ private ClientIdentifier lastId = null;
+
+ RecoveringClientActorBehavior(final InitialClientActorContext context, final FrontendIdentifier frontendId) {
+ super(context);
+ currentFrontend = Preconditions.checkNotNull(frontendId);
+ }
+
+ @Override
+ AbstractClientActorBehavior<?> onReceiveCommand(final Object command) {
+ throw new IllegalStateException("Frontend is recovering");
+ }
+
+ @Override
+ AbstractClientActorBehavior<?> onReceiveRecover(final Object recover) {
+ if (recover instanceof RecoveryCompleted) {
+ final ClientIdentifier nextId;
+ if (lastId != null) {
+ if (!currentFrontend.equals(lastId.getFrontendId())) {
+ LOG.error("Mismatched frontend identifier, shutting down. Current: {} Saved: {}", currentFrontend,
+ lastId.getFrontendId());
+ return null;
+ }
+
+ nextId = ClientIdentifier.create(currentFrontend, lastId.getGeneration() + 1);
+ } else {
+ nextId = ClientIdentifier.create(currentFrontend, 0);
+ }
+
+ LOG.debug("{}: persisting new identifier {}", persistenceId(), nextId);
+ context().saveSnapshot(nextId);
+ return new SavingClientActorBehavior(context(), nextId);
+ } else if (recover instanceof SnapshotOffer) {
+ lastId = (ClientIdentifier) ((SnapshotOffer)recover).snapshot();
+ LOG.debug("{}: recovered identifier {}", lastId);
+ } else {
+ LOG.warn("{}: ignoring recovery message {}", recover);
+ }
+
+ return this;
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.actors.client;
+
+import akka.persistence.SaveSnapshotFailure;
+import akka.persistence.SaveSnapshotSuccess;
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author Robert Varga
+ */
+final class SavingClientActorBehavior extends RecoveredClientActorBehavior<InitialClientActorContext> {
+ private static final Logger LOG = LoggerFactory.getLogger(SavingClientActorBehavior.class);
+ private final ClientIdentifier myId;
+
+ SavingClientActorBehavior(final InitialClientActorContext context, final ClientIdentifier nextId) {
+ super(context);
+ this.myId = Preconditions.checkNotNull(nextId);
+ }
+
+ @Override
+ AbstractClientActorBehavior<?> onReceiveCommand(final Object command) {
+ if (command instanceof SaveSnapshotFailure) {
+ LOG.error("{}: failed to persist state", persistenceId(), ((SaveSnapshotFailure) command).cause());
+ return null;
+ } else if (command instanceof SaveSnapshotSuccess) {
+ context().unstash();
+ return context().createBehavior(new ClientActorContext(self(), persistenceId(), myId));
+ } else {
+ LOG.debug("{}: stashing command {}", persistenceId(), command);
+ context().stash();
+ return this;
+ }
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+/**
+ * This package contains the baseline client infrastructure required to implement clients accessing the data store.
+ *
+ * @author Robert Varga
+ */
+package org.opendaylight.controller.cluster.datastore.actors.client;
\ No newline at end of file
@After
public void tearDown() {
+ if (followerDistributedDataStore != null) {
+ leaderDistributedDataStore.close();
+ }
+ if (leaderDistributedDataStore != null) {
+ leaderDistributedDataStore.close();
+ }
+
JavaTestKit.shutdownActorSystem(leaderSystem);
JavaTestKit.shutdownActorSystem(followerSystem);
JavaTestKit.shutdownActorSystem(follower2System);
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.FrontendType;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.duration.FiniteDuration;
public class DistributedDataStoreTest extends AbstractActorTest {
+ private static final ClientIdentifier UNKNOWN_ID = ClientIdentifier.create(
+ FrontendIdentifier.create(MemberName.forName("local"), FrontendType.forName("unknown")), 0);
private SchemaContext schemaContext;
@Test
public void testRateLimitingUsedInReadWriteTxCreation(){
- try (DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext)) {
+ try (DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext, UNKNOWN_ID)) {
distributedDataStore.newReadWriteTransaction();
@Test
public void testRateLimitingUsedInWriteOnlyTxCreation(){
- try (DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext)) {
+ try (DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext, UNKNOWN_ID)) {
distributedDataStore.newWriteOnlyTransaction();
@Test
public void testRateLimitingNotUsedInReadOnlyTxCreation(){
- try (DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext)) {
+ try (DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext, UNKNOWN_ID)) {
distributedDataStore.newReadOnlyTransaction();
distributedDataStore.newReadOnlyTransaction();
doReturn(datastoreContext).when(actorContext).getDatastoreContext();
doReturn(shardElectionTimeout).when(datastoreContext).getShardLeaderElectionTimeout();
doReturn(FiniteDuration.apply(50, TimeUnit.MILLISECONDS)).when(shardElectionTimeout).duration();
- try (DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext)) {
+ try (DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext, UNKNOWN_ID)) {
long start = System.currentTimeMillis();
@Test
public void testWaitTillReadyCountDown(){
- try (final DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext)) {
+ try (final DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext, UNKNOWN_ID)) {
doReturn(datastoreContext).when(actorContext).getDatastoreContext();
doReturn(shardElectionTimeout).when(datastoreContext).getShardLeaderElectionTimeout();
doReturn(FiniteDuration.apply(5000, TimeUnit.MILLISECONDS)).when(shardElectionTimeout).duration();
- Executors.newSingleThreadExecutor().submit(new Runnable() {
- @Override
- public void run() {
- Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
- distributedDataStore.getWaitTillReadyCountDownLatch().countDown();
- }
+ Executors.newSingleThreadExecutor().submit(() -> {
+ Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+ distributedDataStore.getWaitTillReadyCountDownLatch().countDown();
});
long start = System.currentTimeMillis();
import com.google.common.base.Optional;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.ArrayList;
import java.util.Arrays;
@After
public void tearDown() {
- for(MemberNode m: memberNodes) {
+ for (MemberNode m : Lists.reverse(memberNodes)) {
m.cleanup();
}
+ memberNodes.clear();
}
private static DistributedEntityOwnershipService newOwnershipService(final DistributedDataStore datastore) {