From d0621d28e507d9f6c0b9445d197f90253d34725d Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Wed, 11 May 2016 23:20:45 +0200 Subject: [PATCH] BUG-5280: introduce DistributedDataStoreClientActor This patch introduces a common ClientActor, which keeps track of frontend generations. Also introduce bind for DistributedDataStore, which uses this common infrastructure. Interface between the DistributedDataStore and the actor world is captured as DistributedDataStoreClient. Change-Id: I42c3281ca790fb5615a593740424ac494469e6a7 Signed-off-by: Robert Varga --- .../admin/ClusterAdminRpcServiceTest.java | 4 +- .../actors/dds/ClientLocalHistory.java | 62 ++++++++++ .../actors/dds/CreateLocalHistoryCommand.java | 23 ++++ .../dds/DistributedDataStoreClient.java | 41 +++++++ .../dds/DistributedDataStoreClientActor.java | 69 +++++++++++ .../DistributedDataStoreClientBehavior.java | 109 ++++++++++++++++++ .../actors/dds/GetClientRequest.java | 28 +++++ .../databroker/actors/dds/package-info.java | 13 +++ .../datastore/DistributedDataStore.java | 42 +++++-- .../actors/client/AbstractClientActor.java | 71 ++++++++++++ .../client/AbstractClientActorBehavior.java | 76 ++++++++++++ .../client/AbstractClientActorContext.java | 37 ++++++ .../actors/client/ClientActorBehavior.java | 48 ++++++++ .../actors/client/ClientActorContext.java | 34 ++++++ .../client/InitialClientActorContext.java | 40 +++++++ .../client/RecoveredClientActorBehavior.java | 25 ++++ .../client/RecoveringClientActorBehavior.java | 66 +++++++++++ .../client/SavingClientActorBehavior.java | 43 +++++++ .../datastore/actors/client/package-info.java | 13 +++ ...butedDataStoreRemotingIntegrationTest.java | 7 ++ .../datastore/DistributedDataStoreTest.java | 25 ++-- ...ributedEntityOwnershipIntegrationTest.java | 4 +- 22 files changed, 858 insertions(+), 22 deletions(-) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalHistory.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/CreateLocalHistoryCommand.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClient.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientActor.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/GetClientRequest.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/package-info.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/AbstractClientActor.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/AbstractClientActorBehavior.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/AbstractClientActorContext.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/ClientActorBehavior.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/ClientActorContext.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/InitialClientActorContext.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/RecoveredClientActorBehavior.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/RecoveringClientActorBehavior.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/SavingClientActorBehavior.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/package-info.java diff --git a/opendaylight/md-sal/sal-cluster-admin/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java b/opendaylight/md-sal/sal-cluster-admin/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java index 8ec173d284..af2b4a9df6 100644 --- a/opendaylight/md-sal/sal-cluster-admin/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java +++ b/opendaylight/md-sal/sal-cluster-admin/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java @@ -26,6 +26,7 @@ import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import java.io.File; import java.io.FileInputStream; @@ -102,9 +103,10 @@ public class ClusterAdminRpcServiceTest { @After public void tearDown() { - for(MemberNode m: memberNodes) { + for (MemberNode m : Lists.reverse(memberNodes)) { m.cleanup(); } + memberNodes.clear(); } @Test diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalHistory.java new file mode 100644 index 0000000000..b22f2bdc7b --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalHistory.java @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.databroker.actors.dds; + +import akka.actor.ActorRef; +import com.google.common.annotations.Beta; +import com.google.common.base.Preconditions; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import javax.annotation.concurrent.NotThreadSafe; +import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; + +/** + * Client-side view of a local history. This class tracks all state related to a particular history and routes + * frontend requests towards the backend. + * + * This interface is used by the world outside of the actor system and in the actor system it is manifested via + * its client actor. That requires some state transfer with {@link DistributedDataStoreClientBehavior}. In order to + * reduce request latency, all messages are carbon-copied (and enqueued first) to the client actor. + * + * @author Robert Varga + */ +@Beta +@NotThreadSafe +public final class ClientLocalHistory implements AutoCloseable { + private static final AtomicIntegerFieldUpdater CLOSED_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(ClientLocalHistory.class, "state"); + private static final int IDLE_STATE = 0; + private static final int CLOSED_STATE = 1; + + private final LocalHistoryIdentifier historyId; + private final ActorRef backendActor; + private final ActorRef clientActor; + + private volatile int state = IDLE_STATE; + + ClientLocalHistory(final DistributedDataStoreClientBehavior client, final long historyId, + final ActorRef backendActor) { + this.clientActor = client.self(); + this.backendActor = Preconditions.checkNotNull(backendActor); + this.historyId = new LocalHistoryIdentifier(client.getIdentifier(), historyId); + } + + private void checkNotClosed() { + Preconditions.checkState(state != CLOSED_STATE, "Local history %s has been closed", historyId); + } + + @Override + public void close() { + if (CLOSED_UPDATER.compareAndSet(this, IDLE_STATE, CLOSED_STATE)) { + // FIXME: signal close to both client actor and backend actor + } else if (state != CLOSED_STATE) { + throw new IllegalStateException("Cannot close history with an open transaction"); + } + } + + // FIXME: add client requests related to a particular local history +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/CreateLocalHistoryCommand.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/CreateLocalHistoryCommand.java new file mode 100644 index 0000000000..197b2f6222 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/CreateLocalHistoryCommand.java @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.databroker.actors.dds; + +import java.util.concurrent.CompletableFuture; + +/** + * Command sent from the Java world to the client actor to create a new local history. + * + * @author Robert Varga + */ +final class CreateLocalHistoryCommand { + private final CompletableFuture future = new CompletableFuture<>(); + + CompletableFuture future() { + return future; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClient.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClient.java new file mode 100644 index 0000000000..82c839e438 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClient.java @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.databroker.actors.dds; + +import com.google.common.annotations.Beta; +import java.util.concurrent.CompletionStage; +import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; +import org.opendaylight.yangtools.concepts.Identifiable; + +/** + * Client interface for interacting with the frontend actor. This interface is the primary access point through + * which the DistributedDataStore frontend interacts with backend Shards. + * + * Keep this interface as clean as possible, as it needs to be implemented in thread-safe and highly-efficient manner. + * + * @author Robert Varga + */ +@Beta +public interface DistributedDataStoreClient extends Identifiable, AutoCloseable { + @Override + ClientIdentifier getIdentifier(); + + @Override + void close(); + + /** + * Create a new local history. This method initiates an asynchronous instantiation of a local history on the back + * end. ClientLocalHistory represents the interface exposed to the client. + * + * @return Future client history handle + */ + CompletionStage createLocalHistory(); + + // TODO: add methods required by DistributedDataStore + +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientActor.java new file mode 100644 index 0000000000..1e15fefd6f --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientActor.java @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.databroker.actors.dds; + +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.pattern.ExplicitAskSupport; +import akka.util.Timeout; +import com.google.common.base.Throwables; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nonnull; +import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier; +import org.opendaylight.controller.cluster.access.concepts.FrontendType; +import org.opendaylight.controller.cluster.access.concepts.MemberName; +import org.opendaylight.controller.cluster.datastore.actors.client.AbstractClientActor; +import org.opendaylight.controller.cluster.datastore.actors.client.ClientActorBehavior; +import org.opendaylight.controller.cluster.datastore.actors.client.ClientActorContext; +import scala.Function1; +import scala.concurrent.Await; +import scala.concurrent.duration.Duration; +import scala.runtime.AbstractFunction1; + +/** + * A {@link AbstractClientActor} which acts as the point of contact for DistributedDataStore. + * + * @author Robert Varga + */ +public final class DistributedDataStoreClientActor extends AbstractClientActor { + // Unfortunately Akka's explicit ask pattern does not work with its Java API, as it fails to invoke passed message. + // In order to make this work for now, we tap directly into ExplicitAskSupport and use a Scala function instead + // of akka.japi.Function. + private static final ExplicitAskSupport ASK_SUPPORT = akka.pattern.extended.package$.MODULE$; + private static final Function1 GET_CLIENT_FACTORY = new AbstractFunction1() { + @Override + public Object apply(final ActorRef askSender) { + return new GetClientRequest(askSender); + } + }; + + private DistributedDataStoreClientActor(final FrontendIdentifier frontendId) { + super(frontendId); + } + + @Override + protected ClientActorBehavior initialBehavior(final ClientActorContext context) { + return new DistributedDataStoreClientBehavior(context); + } + + public static Props props(final @Nonnull MemberName memberName, @Nonnull final String storeName) { + final String name = "DistributedDataStore:storeName='" + storeName + "'"; + final FrontendIdentifier frontendId = FrontendIdentifier.create(memberName, FrontendType.forName(name)); + return Props.create(DistributedDataStoreClientActor.class, () -> new DistributedDataStoreClientActor(frontendId)); + } + + public static DistributedDataStoreClient getDistributedDataStoreClient(final @Nonnull ActorRef actor, + final long timeout, final TimeUnit unit) { + try { + return (DistributedDataStoreClient) Await.result(ASK_SUPPORT.ask(actor, GET_CLIENT_FACTORY, + Timeout.apply(timeout, unit)), Duration.Inf()); + } catch (Exception e) { + throw Throwables.propagate(e); + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java new file mode 100644 index 0000000000..2b5e6753be --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java @@ -0,0 +1,109 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.databroker.actors.dds; + +import akka.actor.ActorRef; +import akka.actor.Status; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; +import org.opendaylight.controller.cluster.datastore.actors.client.ClientActorBehavior; +import org.opendaylight.controller.cluster.datastore.actors.client.ClientActorContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link ClientActorBehavior} acting as an intermediary between the backend actors and the DistributedDataStore + * frontend. + * + * This class is not visible outside of this package because it breaks the actor containment. Services provided to + * Java world outside of actor containment are captured in {@link DistributedDataStoreClient}. + * + * IMPORTANT: this class breaks actor containment via methods implementing {@link DistributedDataStoreClient} contract. + * When touching internal state, be mindful of the execution context from which execution context, Actor + * or POJO, is the state being accessed or modified. + * + * THREAD SAFETY: this class must always be kept thread-safe, so that both the Actor System thread and the application + * threads can run concurrently. All state transitions must be made in a thread-safe manner. When in + * doubt, feel free to synchronize on this object. + * + * PERFORMANCE: this class lies in a performance-critical fast path. All code needs to be concise and efficient, but + * performance must not come at the price of correctness. Any optimizations need to be carefully analyzed + * for correctness and performance impact. + * + * TRADE-OFFS: part of the functionality runs in application threads without switching contexts, which makes it ideal + * for performing work and charging applications for it. That has two positive effects: + * - CPU usage is distributed across applications, minimizing work done in the actor thread + * - CPU usage provides back-pressure towards the application. + * + * @author Robert Varga + */ +final class DistributedDataStoreClientBehavior extends ClientActorBehavior implements DistributedDataStoreClient { + private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStoreClientBehavior.class); + private static final Object SHUTDOWN = new Object() { + @Override + public String toString() { + return "SHUTDOWN"; + } + }; + + private long nextHistoryId; + + DistributedDataStoreClientBehavior(final ClientActorContext context) { + super(context); + } + + // + // + // Methods below are invoked from the client actor thread + // + // + + private void createLocalHistory(final CreateLocalHistoryCommand command) { + final CompletableFuture future = command.future(); + final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(getIdentifier(), nextHistoryId++); + LOG.debug("{}: creating a new local history {} for {}", persistenceId(), historyId, future); + + // FIXME: initiate backend instantiation + future.completeExceptionally(new UnsupportedOperationException("Not implemented yet")); + } + + @Override + protected ClientActorBehavior onCommand(final Object command) { + if (command instanceof CreateLocalHistoryCommand) { + createLocalHistory((CreateLocalHistoryCommand) command); + } else if (command instanceof GetClientRequest) { + ((GetClientRequest) command).getReplyTo().tell(new Status.Success(this), ActorRef.noSender()); + } else if (SHUTDOWN.equals(command)) { + // Add shutdown procedures here + return null; + } else { + LOG.warn("{}: ignoring unhandled command {}", persistenceId(), command); + } + + return this; + } + + // + // + // Methods below are invoked from application threads + // + // + + @Override + public CompletionStage createLocalHistory() { + final CreateLocalHistoryCommand command = new CreateLocalHistoryCommand(); + self().tell(command, ActorRef.noSender()); + return command.future(); + } + + @Override + public void close() { + self().tell(SHUTDOWN, ActorRef.noSender()); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/GetClientRequest.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/GetClientRequest.java new file mode 100644 index 0000000000..4bfd1c4a60 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/GetClientRequest.java @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.databroker.actors.dds; + +import akka.actor.ActorRef; +import com.google.common.base.Preconditions; + +/** + * Request the ClientIdentifier from a particular actor. Response is an instance of {@link DistributedDataStoreClient}. + * + * @author Robert Varga + */ +final class GetClientRequest { + private final ActorRef replyTo; + + public GetClientRequest(final ActorRef replyTo) { + this.replyTo = Preconditions.checkNotNull(replyTo); + } + + ActorRef getReplyTo() { + return replyTo; + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/package-info.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/package-info.java new file mode 100644 index 0000000000..4ff8544f8d --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/package-info.java @@ -0,0 +1,13 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +/** + * This package contains implementation required by the DistributedDataStore frontend. + * + * @author Robert Varga + */ +package org.opendaylight.controller.cluster.databroker.actors.dds; \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java index 0244eb3740..fe321d4cd2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java @@ -10,11 +10,17 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; import akka.actor.ActorSystem; +import akka.actor.PoisonPill; +import akka.actor.Props; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; import com.google.common.util.concurrent.Uninterruptibles; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; +import org.opendaylight.controller.cluster.databroker.actors.dds.DistributedDataStoreClient; +import org.opendaylight.controller.cluster.databroker.actors.dds.DistributedDataStoreClientActor; import org.opendaylight.controller.cluster.datastore.config.Configuration; import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.DatastoreConfigurationMXBeanImpl; @@ -51,7 +57,6 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche DatastoreContextConfigAdminOverlay.Listener, DOMStoreTreeChangePublisher, DOMDataTreeCommitCohortRegistry, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class); - private static final String UNKNOWN_TYPE = "unknown"; private static final long READY_WAIT_FACTOR = 3; @@ -66,7 +71,8 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche private final CountDownLatch waitTillReadyCountDownLatch = new CountDownLatch(1); - private final String type; + private final ClientIdentifier identifier; + private final DistributedDataStoreClient client; private final TransactionContextFactory txContextFactory; @@ -78,9 +84,22 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche Preconditions.checkNotNull(configuration, "configuration should not be null"); Preconditions.checkNotNull(datastoreContextFactory, "datastoreContextFactory should not be null"); - this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName(); + final Props clientProps = DistributedDataStoreClientActor.props(cluster.getCurrentMemberName(), + datastoreContextFactory.getBaseDatastoreContext().getDataStoreName()); + final ActorRef clientActor = actorSystem.actorOf(clientProps); + try { + client = DistributedDataStoreClientActor.getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS); + } catch (Exception e) { + LOG.error("Failed to get actor for {}", clientProps, e); + clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); + throw Throwables.propagate(e); + } + + identifier = client.getIdentifier(); + LOG.debug("Distributed data store client {} started", identifier); - String shardManagerId = ShardManagerIdentifier.builder().type(type).build().toString(); + String shardManagerId = ShardManagerIdentifier.builder() + .type(datastoreContextFactory.getBaseDatastoreContext().getDataStoreName()).build().toString(); LOG.info("Creating ShardManager : {}", shardManagerId); @@ -112,10 +131,11 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche } @VisibleForTesting - DistributedDataStore(ActorContext actorContext) { + DistributedDataStore(ActorContext actorContext, ClientIdentifier identifier) { this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null"); + this.client = null; + this.identifier = Preconditions.checkNotNull(identifier); this.txContextFactory = TransactionContextFactory.create(actorContext); - this.type = UNKNOWN_TYPE; this.waitTillReadyTimeInMillis = actorContext.getDatastoreContext().getShardLeaderElectionTimeout().duration().toMillis() * READY_WAIT_FACTOR; } @@ -214,7 +234,7 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche @Override public void close() { - LOG.info("Closing data store {}", type); + LOG.info("Closing data store {}", identifier); if (datastoreConfigMXBean != null) { datastoreConfigMXBean.unregisterMBean(); @@ -233,6 +253,10 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche txContextFactory.close(); actorContext.shutdown(); + + if (client != null) { + client.close(); + } } @Override @@ -241,11 +265,11 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche } public void waitTillReady(){ - LOG.info("Beginning to wait for data store to become ready : {}", type); + LOG.info("Beginning to wait for data store to become ready : {}", identifier); try { if (waitTillReadyCountDownLatch.await(waitTillReadyTimeInMillis, TimeUnit.MILLISECONDS)) { - LOG.debug("Data store {} is now ready", type); + LOG.debug("Data store {} is now ready", identifier); } else { LOG.error("Shared leaders failed to settle in {} seconds, giving up", TimeUnit.MILLISECONDS.toSeconds(waitTillReadyTimeInMillis)); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/AbstractClientActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/AbstractClientActor.java new file mode 100644 index 0000000000..5f940842fa --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/AbstractClientActor.java @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.actors.client; + +import akka.actor.ActorRef; +import akka.actor.PoisonPill; +import akka.persistence.UntypedPersistentActor; +import com.google.common.annotations.Beta; +import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Frontend actor which takes care of persisting generations and creates an appropriate ClientIdentifier. + * + * @author Robert Varga + */ +@Beta +public abstract class AbstractClientActor extends UntypedPersistentActor { + private static final Logger LOG = LoggerFactory.getLogger(AbstractClientActor.class); + private AbstractClientActorBehavior currentBehavior; + + protected AbstractClientActor(final FrontendIdentifier frontendId) { + currentBehavior = new RecoveringClientActorBehavior( + new InitialClientActorContext(this, frontendId.toString()), frontendId); + } + + @Override + public final String persistenceId() { + return currentBehavior.persistenceId(); + } + + private void switchBehavior(final AbstractClientActorBehavior nextBehavior) { + if (!currentBehavior.equals(nextBehavior)) { + if (nextBehavior == null) { + LOG.debug("{}: shutting down", persistenceId()); + self().tell(PoisonPill.getInstance(), ActorRef.noSender()); + } else { + LOG.debug("{}: switched from {} to {}", persistenceId(), currentBehavior, nextBehavior); + } + + currentBehavior = nextBehavior; + } + } + + @Override + public final void onReceiveCommand(final Object command) { + if (command == null) { + LOG.debug("{}: ignoring null command", persistenceId()); + return; + } + + if (currentBehavior != null) { + switchBehavior(currentBehavior.onReceiveCommand(command)); + } else { + LOG.debug("{}: shutting down, ignoring command {}", persistenceId(), command); + } + } + + @Override + public final void onReceiveRecover(final Object recover) { + switchBehavior(currentBehavior.onReceiveRecover(recover)); + } + + protected abstract ClientActorBehavior initialBehavior(ClientActorContext context); +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/AbstractClientActorBehavior.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/AbstractClientActorBehavior.java new file mode 100644 index 0000000000..9fd5b99581 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/AbstractClientActorBehavior.java @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.actors.client; + +import akka.actor.ActorRef; +import com.google.common.annotations.Beta; +import com.google.common.base.Preconditions; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +/** + * Base behavior attached to {@link AbstractClientActor}. Exposes + * @author user + * + * @param Type of associated context + * + * @author Robert Varga + */ +@Beta +public abstract class AbstractClientActorBehavior { + private final C context; + + AbstractClientActorBehavior(final @Nonnull C context) { + // Hidden to prevent outside subclasses. Users instantiated this via ClientActorBehavior + this.context = Preconditions.checkNotNull(context); + } + + /** + * Return an {@link AbstractClientActorContext} associated with this {@link AbstractClientActor}. + * + * @return A client actor context instance. + */ + protected final @Nonnull C context() { + return context; + } + + /** + * Return the persistence identifier associated with this {@link AbstractClientActor}. This identifier should be + * used in logging to identify this actor. + * + * @return Persistence identifier + */ + protected final @Nonnull String persistenceId() { + return context.persistenceId(); + } + + /** + * Return an {@link ActorRef} of this ClientActor. + * + * @return + */ + public final @Nonnull ActorRef self() { + return context.self(); + } + + /** + * Implementation-internal method for handling an incoming command message. + * + * @param command Command message + * @return Behavior which should be used with the next message. Return null if this actor should shut down. + */ + abstract @Nullable AbstractClientActorBehavior onReceiveCommand(@Nonnull Object command); + + /** + * Implementation-internal method for handling an incoming recovery message coming from persistence. + * + * @param recover Recover message + * @return Behavior which should be used with the next message. Return null if this actor should shut down. + */ + abstract @Nullable AbstractClientActorBehavior onReceiveRecover(@Nonnull Object recover); +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/AbstractClientActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/AbstractClientActorContext.java new file mode 100644 index 0000000000..3cbca269c4 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/AbstractClientActorContext.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.actors.client; + +import akka.actor.ActorRef; +import com.google.common.base.Preconditions; +import javax.annotation.Nonnull; +import org.opendaylight.yangtools.concepts.Mutable; + +/** + * Common, externally-invisible superclass of contexts associated with a {@link AbstractClientActor}. End users pass this + * object via opaque {@link ClientActorContext}. + * + * @author Robert Varga + */ +abstract class AbstractClientActorContext implements Mutable { + private final String persistenceId; + private final ActorRef self; + + AbstractClientActorContext(final @Nonnull ActorRef self, final @Nonnull String persistenceId) { + this.persistenceId = Preconditions.checkNotNull(persistenceId); + this.self = Preconditions.checkNotNull(self); + } + + final @Nonnull String persistenceId() { + return persistenceId; + } + + final @Nonnull ActorRef self() { + return self; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/ClientActorBehavior.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/ClientActorBehavior.java new file mode 100644 index 0000000000..b770b99871 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/ClientActorBehavior.java @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.actors.client; + +import com.google.common.annotations.Beta; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; +import org.opendaylight.yangtools.concepts.Identifiable; + +/** + * A behavior, which handles messages sent to a {@link AbstractClientActor}. + * + * @param Frontend type + * + * @author Robert Varga + */ +@Beta +public abstract class ClientActorBehavior extends RecoveredClientActorBehavior + implements Identifiable { + protected ClientActorBehavior(final @Nonnull ClientActorContext context) { + super(context); + } + + @Override + final ClientActorBehavior onReceiveCommand(final Object command) { + // TODO: any client-common logic (such as validation and common dispatch) needs to go here + return onCommand(command); + } + + @Override + public final @Nonnull ClientIdentifier getIdentifier() { + return context().getIdentifier(); + } + + /** + * Override this method to handle any command which is not handled by the base behavior. + * + * @param command + * @return Next behavior to use, null if this actor should shut down. + */ + protected abstract @Nullable ClientActorBehavior onCommand(@Nonnull Object command); +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/ClientActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/ClientActorContext.java new file mode 100644 index 0000000000..ca393608b7 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/ClientActorContext.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.actors.client; + +import akka.actor.ActorRef; +import com.google.common.annotations.Beta; +import com.google.common.base.Preconditions; +import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; +import org.opendaylight.yangtools.concepts.Identifiable; + +/** + * An actor context associated with this {@link AbstractClientActor} + * + * @author Robert Varga + */ +@Beta +public final class ClientActorContext extends AbstractClientActorContext implements Identifiable { + private final ClientIdentifier identifier; + + ClientActorContext(final ActorRef self, final String persistenceId, final ClientIdentifier identifier) { + super(self, persistenceId); + this.identifier = Preconditions.checkNotNull(identifier); + } + + @Override + public ClientIdentifier getIdentifier() { + return identifier; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/InitialClientActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/InitialClientActorContext.java new file mode 100644 index 0000000000..636dd1e34f --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/InitialClientActorContext.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.actors.client; + +import com.google.common.base.Preconditions; +import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; + +/** + * + * @author Robert Varga + */ +final class InitialClientActorContext extends AbstractClientActorContext { + private final AbstractClientActor actor; + + InitialClientActorContext(final AbstractClientActor actor, final String persistenceId) { + super(actor.self(), persistenceId); + this.actor = Preconditions.checkNotNull(actor); + } + + void saveSnapshot(final ClientIdentifier snapshot) { + actor.saveSnapshot(snapshot); + } + + ClientActorBehavior createBehavior(final ClientActorContext context) { + return actor.initialBehavior(context); + } + + void stash() { + actor.stash(); + } + + void unstash() { + actor.unstashAll(); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/RecoveredClientActorBehavior.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/RecoveredClientActorBehavior.java new file mode 100644 index 0000000000..a5b042cc23 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/RecoveredClientActorBehavior.java @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.actors.client; + +/** + * @param Concrete context type + * + * @author Robert Varga + */ +abstract class RecoveredClientActorBehavior extends AbstractClientActorBehavior { + + RecoveredClientActorBehavior(final C context) { + super(context); + } + + @Override + final AbstractClientActorBehavior onReceiveRecover(Object recover) { + throw new IllegalStateException("Frontend has been recovered"); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/RecoveringClientActorBehavior.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/RecoveringClientActorBehavior.java new file mode 100644 index 0000000000..7a8bcce0d5 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/RecoveringClientActorBehavior.java @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.actors.client; + +import akka.persistence.RecoveryCompleted; +import akka.persistence.SnapshotOffer; +import com.google.common.base.Preconditions; +import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; +import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @param Frontend type + * + * @author Robert Varga + */ +final class RecoveringClientActorBehavior extends AbstractClientActorBehavior { + private static final Logger LOG = LoggerFactory.getLogger(RecoveringClientActorBehavior.class); + private final FrontendIdentifier currentFrontend; + private ClientIdentifier lastId = null; + + RecoveringClientActorBehavior(final InitialClientActorContext context, final FrontendIdentifier frontendId) { + super(context); + currentFrontend = Preconditions.checkNotNull(frontendId); + } + + @Override + AbstractClientActorBehavior onReceiveCommand(final Object command) { + throw new IllegalStateException("Frontend is recovering"); + } + + @Override + AbstractClientActorBehavior onReceiveRecover(final Object recover) { + if (recover instanceof RecoveryCompleted) { + final ClientIdentifier nextId; + if (lastId != null) { + if (!currentFrontend.equals(lastId.getFrontendId())) { + LOG.error("Mismatched frontend identifier, shutting down. Current: {} Saved: {}", currentFrontend, + lastId.getFrontendId()); + return null; + } + + nextId = ClientIdentifier.create(currentFrontend, lastId.getGeneration() + 1); + } else { + nextId = ClientIdentifier.create(currentFrontend, 0); + } + + LOG.debug("{}: persisting new identifier {}", persistenceId(), nextId); + context().saveSnapshot(nextId); + return new SavingClientActorBehavior(context(), nextId); + } else if (recover instanceof SnapshotOffer) { + lastId = (ClientIdentifier) ((SnapshotOffer)recover).snapshot(); + LOG.debug("{}: recovered identifier {}", lastId); + } else { + LOG.warn("{}: ignoring recovery message {}", recover); + } + + return this; + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/SavingClientActorBehavior.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/SavingClientActorBehavior.java new file mode 100644 index 0000000000..ac5f12fb9d --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/SavingClientActorBehavior.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.actors.client; + +import akka.persistence.SaveSnapshotFailure; +import akka.persistence.SaveSnapshotSuccess; +import com.google.common.base.Preconditions; +import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author Robert Varga + */ +final class SavingClientActorBehavior extends RecoveredClientActorBehavior { + private static final Logger LOG = LoggerFactory.getLogger(SavingClientActorBehavior.class); + private final ClientIdentifier myId; + + SavingClientActorBehavior(final InitialClientActorContext context, final ClientIdentifier nextId) { + super(context); + this.myId = Preconditions.checkNotNull(nextId); + } + + @Override + AbstractClientActorBehavior onReceiveCommand(final Object command) { + if (command instanceof SaveSnapshotFailure) { + LOG.error("{}: failed to persist state", persistenceId(), ((SaveSnapshotFailure) command).cause()); + return null; + } else if (command instanceof SaveSnapshotSuccess) { + context().unstash(); + return context().createBehavior(new ClientActorContext(self(), persistenceId(), myId)); + } else { + LOG.debug("{}: stashing command {}", persistenceId(), command); + context().stash(); + return this; + } + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/package-info.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/package-info.java new file mode 100644 index 0000000000..1e9ed1aa0a --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/package-info.java @@ -0,0 +1,13 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +/** + * This package contains the baseline client infrastructure required to implement clients accessing the data store. + * + * @author Robert Varga + */ +package org.opendaylight.controller.cluster.datastore.actors.client; \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java index 0c7575a61d..d3932f50c6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java @@ -141,6 +141,13 @@ public class DistributedDataStoreRemotingIntegrationTest { @After public void tearDown() { + if (followerDistributedDataStore != null) { + leaderDistributedDataStore.close(); + } + if (leaderDistributedDataStore != null) { + leaderDistributedDataStore.close(); + } + JavaTestKit.shutdownActorSystem(leaderSystem); JavaTestKit.shutdownActorSystem(followerSystem); JavaTestKit.shutdownActorSystem(follower2System); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java index d3bdc6a6c0..c8c254879c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java @@ -20,12 +20,18 @@ import org.junit.Before; import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; +import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; +import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier; +import org.opendaylight.controller.cluster.access.concepts.FrontendType; +import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.duration.FiniteDuration; public class DistributedDataStoreTest extends AbstractActorTest { + private static final ClientIdentifier UNKNOWN_ID = ClientIdentifier.create( + FrontendIdentifier.create(MemberName.forName("local"), FrontendType.forName("unknown")), 0); private SchemaContext schemaContext; @@ -50,7 +56,7 @@ public class DistributedDataStoreTest extends AbstractActorTest { @Test public void testRateLimitingUsedInReadWriteTxCreation(){ - try (DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext)) { + try (DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext, UNKNOWN_ID)) { distributedDataStore.newReadWriteTransaction(); @@ -60,7 +66,7 @@ public class DistributedDataStoreTest extends AbstractActorTest { @Test public void testRateLimitingUsedInWriteOnlyTxCreation(){ - try (DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext)) { + try (DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext, UNKNOWN_ID)) { distributedDataStore.newWriteOnlyTransaction(); @@ -70,7 +76,7 @@ public class DistributedDataStoreTest extends AbstractActorTest { @Test public void testRateLimitingNotUsedInReadOnlyTxCreation(){ - try (DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext)) { + try (DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext, UNKNOWN_ID)) { distributedDataStore.newReadOnlyTransaction(); distributedDataStore.newReadOnlyTransaction(); @@ -85,7 +91,7 @@ public class DistributedDataStoreTest extends AbstractActorTest { doReturn(datastoreContext).when(actorContext).getDatastoreContext(); doReturn(shardElectionTimeout).when(datastoreContext).getShardLeaderElectionTimeout(); doReturn(FiniteDuration.apply(50, TimeUnit.MILLISECONDS)).when(shardElectionTimeout).duration(); - try (DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext)) { + try (DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext, UNKNOWN_ID)) { long start = System.currentTimeMillis(); @@ -99,17 +105,14 @@ public class DistributedDataStoreTest extends AbstractActorTest { @Test public void testWaitTillReadyCountDown(){ - try (final DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext)) { + try (final DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext, UNKNOWN_ID)) { doReturn(datastoreContext).when(actorContext).getDatastoreContext(); doReturn(shardElectionTimeout).when(datastoreContext).getShardLeaderElectionTimeout(); doReturn(FiniteDuration.apply(5000, TimeUnit.MILLISECONDS)).when(shardElectionTimeout).duration(); - Executors.newSingleThreadExecutor().submit(new Runnable() { - @Override - public void run() { - Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); - distributedDataStore.getWaitTillReadyCountDownLatch().countDown(); - } + Executors.newSingleThreadExecutor().submit(() -> { + Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); + distributedDataStore.getWaitTillReadyCountDownLatch().countDown(); }); long start = System.currentTimeMillis(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipIntegrationTest.java index 78e2c91384..48baef5b16 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipIntegrationTest.java @@ -30,6 +30,7 @@ import akka.testkit.JavaTestKit; import com.google.common.base.Optional; import com.google.common.base.Stopwatch; import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import com.google.common.util.concurrent.Uninterruptibles; import java.util.ArrayList; import java.util.Arrays; @@ -115,9 +116,10 @@ public class DistributedEntityOwnershipIntegrationTest { @After public void tearDown() { - for(MemberNode m: memberNodes) { + for (MemberNode m : Lists.reverse(memberNodes)) { m.cleanup(); } + memberNodes.clear(); } private static DistributedEntityOwnershipService newOwnershipService(final DistributedDataStore datastore) { -- 2.36.6