BUG-5280: introduce DistributedDataStoreClientActor 83/38683/52
authorRobert Varga <rovarga@cisco.com>
Wed, 11 May 2016 21:20:45 +0000 (23:20 +0200)
committerRobert Varga <rovarga@cisco.com>
Mon, 6 Jun 2016 13:42:56 +0000 (15:42 +0200)
This patch introduces a common ClientActor, which keeps track of frontend
generations. Also introduce bind for DistributedDataStore, which uses this
common infrastructure.

Interface between the DistributedDataStore and the actor world is captured
as DistributedDataStoreClient.

Change-Id: I42c3281ca790fb5615a593740424ac494469e6a7
Signed-off-by: Robert Varga <rovarga@cisco.com>
22 files changed:
opendaylight/md-sal/sal-cluster-admin/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalHistory.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/CreateLocalHistoryCommand.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClient.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientActor.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/GetClientRequest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/package-info.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/AbstractClientActor.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/AbstractClientActorBehavior.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/AbstractClientActorContext.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/ClientActorBehavior.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/ClientActorContext.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/InitialClientActorContext.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/RecoveredClientActorBehavior.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/RecoveringClientActorBehavior.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/SavingClientActorBehavior.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/package-info.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipIntegrationTest.java

index 8ec173d..af2b4a9 100644 (file)
@@ -26,6 +26,7 @@ import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import java.io.File;
 import java.io.FileInputStream;
@@ -102,9 +103,10 @@ public class ClusterAdminRpcServiceTest {
 
     @After
     public void tearDown() {
-        for(MemberNode m: memberNodes) {
+        for (MemberNode m : Lists.reverse(memberNodes)) {
             m.cleanup();
         }
+        memberNodes.clear();
     }
 
     @Test
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalHistory.java
new file mode 100644 (file)
index 0000000..b22f2bd
--- /dev/null
@@ -0,0 +1,62 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import akka.actor.ActorRef;
+import com.google.common.annotations.Beta;
+import com.google.common.base.Preconditions;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+
+/**
+ * Client-side view of a local history. This class tracks all state related to a particular history and routes
+ * frontend requests towards the backend.
+ *
+ * This interface is used by the world outside of the actor system and in the actor system it is manifested via
+ * its client actor. That requires some state transfer with {@link DistributedDataStoreClientBehavior}. In order to
+ * reduce request latency, all messages are carbon-copied (and enqueued first) to the client actor.
+ *
+ * @author Robert Varga
+ */
+@Beta
+@NotThreadSafe
+public final class ClientLocalHistory implements AutoCloseable {
+    private static final AtomicIntegerFieldUpdater<ClientLocalHistory> CLOSED_UPDATER =
+            AtomicIntegerFieldUpdater.newUpdater(ClientLocalHistory.class, "state");
+    private static final int IDLE_STATE = 0;
+    private static final int CLOSED_STATE = 1;
+
+    private final LocalHistoryIdentifier historyId;
+    private final ActorRef backendActor;
+    private final ActorRef clientActor;
+
+    private volatile int state = IDLE_STATE;
+
+    ClientLocalHistory(final DistributedDataStoreClientBehavior client, final long historyId,
+            final ActorRef backendActor) {
+        this.clientActor = client.self();
+        this.backendActor = Preconditions.checkNotNull(backendActor);
+        this.historyId = new LocalHistoryIdentifier(client.getIdentifier(), historyId);
+    }
+
+    private void checkNotClosed() {
+        Preconditions.checkState(state != CLOSED_STATE, "Local history %s has been closed", historyId);
+    }
+
+    @Override
+    public void close() {
+        if (CLOSED_UPDATER.compareAndSet(this, IDLE_STATE, CLOSED_STATE)) {
+            // FIXME: signal close to both client actor and backend actor
+        } else if (state != CLOSED_STATE) {
+            throw new IllegalStateException("Cannot close history with an open transaction");
+        }
+    }
+
+    // FIXME: add client requests related to a particular local history
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/CreateLocalHistoryCommand.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/CreateLocalHistoryCommand.java
new file mode 100644 (file)
index 0000000..197b2f6
--- /dev/null
@@ -0,0 +1,23 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Command sent from the Java world to the client actor to create a new local history.
+ *
+ * @author Robert Varga
+ */
+final class CreateLocalHistoryCommand {
+    private final CompletableFuture<ClientLocalHistory> future = new CompletableFuture<>();
+
+    CompletableFuture<ClientLocalHistory> future() {
+        return future;
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClient.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClient.java
new file mode 100644 (file)
index 0000000..82c839e
--- /dev/null
@@ -0,0 +1,41 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import com.google.common.annotations.Beta;
+import java.util.concurrent.CompletionStage;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.yangtools.concepts.Identifiable;
+
+/**
+ * Client interface for interacting with the frontend actor. This interface is the primary access point through
+ * which the DistributedDataStore frontend interacts with backend Shards.
+ *
+ * Keep this interface as clean as possible, as it needs to be implemented in thread-safe and highly-efficient manner.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public interface DistributedDataStoreClient extends Identifiable<ClientIdentifier>, AutoCloseable {
+    @Override
+    ClientIdentifier getIdentifier();
+
+    @Override
+    void close();
+
+    /**
+     * Create a new local history. This method initiates an asynchronous instantiation of a local history on the back
+     * end. ClientLocalHistory represents the interface exposed to the client.
+     *
+     * @return Future client history handle
+     */
+    CompletionStage<ClientLocalHistory> createLocalHistory();
+
+    // TODO: add methods required by DistributedDataStore
+
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientActor.java
new file mode 100644 (file)
index 0000000..1e15fef
--- /dev/null
@@ -0,0 +1,69 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.pattern.ExplicitAskSupport;
+import akka.util.Timeout;
+import com.google.common.base.Throwables;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.FrontendType;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.datastore.actors.client.AbstractClientActor;
+import org.opendaylight.controller.cluster.datastore.actors.client.ClientActorBehavior;
+import org.opendaylight.controller.cluster.datastore.actors.client.ClientActorContext;
+import scala.Function1;
+import scala.concurrent.Await;
+import scala.concurrent.duration.Duration;
+import scala.runtime.AbstractFunction1;
+
+/**
+ * A {@link AbstractClientActor} which acts as the point of contact for DistributedDataStore.
+ *
+ * @author Robert Varga
+ */
+public final class DistributedDataStoreClientActor extends AbstractClientActor {
+    // Unfortunately Akka's explicit ask pattern does not work with its Java API, as it fails to invoke passed message.
+    // In order to make this work for now, we tap directly into ExplicitAskSupport and use a Scala function instead
+    // of akka.japi.Function.
+    private static final ExplicitAskSupport ASK_SUPPORT = akka.pattern.extended.package$.MODULE$;
+    private static final Function1<ActorRef, Object> GET_CLIENT_FACTORY = new AbstractFunction1<ActorRef, Object>() {
+        @Override
+        public Object apply(final ActorRef askSender) {
+            return new GetClientRequest(askSender);
+        }
+    };
+
+    private DistributedDataStoreClientActor(final FrontendIdentifier frontendId) {
+        super(frontendId);
+    }
+
+    @Override
+    protected ClientActorBehavior initialBehavior(final ClientActorContext context) {
+        return new DistributedDataStoreClientBehavior(context);
+    }
+
+    public static Props props(final @Nonnull MemberName memberName, @Nonnull final String storeName) {
+        final String name = "DistributedDataStore:storeName='" + storeName + "'";
+        final FrontendIdentifier frontendId = FrontendIdentifier.create(memberName, FrontendType.forName(name));
+        return Props.create(DistributedDataStoreClientActor.class, () -> new DistributedDataStoreClientActor(frontendId));
+    }
+
+    public static DistributedDataStoreClient getDistributedDataStoreClient(final @Nonnull ActorRef actor,
+            final long timeout, final TimeUnit unit) {
+        try {
+            return (DistributedDataStoreClient) Await.result(ASK_SUPPORT.ask(actor, GET_CLIENT_FACTORY,
+                Timeout.apply(timeout, unit)), Duration.Inf());
+        } catch (Exception e) {
+            throw Throwables.propagate(e);
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java
new file mode 100644 (file)
index 0000000..2b5e675
--- /dev/null
@@ -0,0 +1,109 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import akka.actor.ActorRef;
+import akka.actor.Status;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.datastore.actors.client.ClientActorBehavior;
+import org.opendaylight.controller.cluster.datastore.actors.client.ClientActorContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link ClientActorBehavior} acting as an intermediary between the backend actors and the DistributedDataStore
+ * frontend.
+ *
+ * This class is not visible outside of this package because it breaks the actor containment. Services provided to
+ * Java world outside of actor containment are captured in {@link DistributedDataStoreClient}.
+ *
+ * IMPORTANT: this class breaks actor containment via methods implementing {@link DistributedDataStoreClient} contract.
+ *            When touching internal state, be mindful of the execution context from which execution context, Actor
+ *            or POJO, is the state being accessed or modified.
+ *
+ * THREAD SAFETY: this class must always be kept thread-safe, so that both the Actor System thread and the application
+ *                threads can run concurrently. All state transitions must be made in a thread-safe manner. When in
+ *                doubt, feel free to synchronize on this object.
+ *
+ * PERFORMANCE: this class lies in a performance-critical fast path. All code needs to be concise and efficient, but
+ *              performance must not come at the price of correctness. Any optimizations need to be carefully analyzed
+ *              for correctness and performance impact.
+ *
+ * TRADE-OFFS: part of the functionality runs in application threads without switching contexts, which makes it ideal
+ *             for performing work and charging applications for it. That has two positive effects:
+ *             - CPU usage is distributed across applications, minimizing work done in the actor thread
+ *             - CPU usage provides back-pressure towards the application.
+ *
+ * @author Robert Varga
+ */
+final class DistributedDataStoreClientBehavior extends ClientActorBehavior implements DistributedDataStoreClient {
+    private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStoreClientBehavior.class);
+    private static final Object SHUTDOWN = new Object() {
+        @Override
+        public String toString() {
+            return "SHUTDOWN";
+        }
+    };
+
+    private long nextHistoryId;
+
+    DistributedDataStoreClientBehavior(final ClientActorContext context) {
+        super(context);
+    }
+
+    //
+    //
+    // Methods below are invoked from the client actor thread
+    //
+    //
+
+    private void createLocalHistory(final CreateLocalHistoryCommand command) {
+        final CompletableFuture<ClientLocalHistory> future = command.future();
+        final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(getIdentifier(), nextHistoryId++);
+        LOG.debug("{}: creating a new local history {} for {}", persistenceId(), historyId, future);
+
+        // FIXME: initiate backend instantiation
+        future.completeExceptionally(new UnsupportedOperationException("Not implemented yet"));
+    }
+
+    @Override
+    protected ClientActorBehavior onCommand(final Object command) {
+        if (command instanceof CreateLocalHistoryCommand) {
+            createLocalHistory((CreateLocalHistoryCommand) command);
+        } else if (command instanceof GetClientRequest) {
+            ((GetClientRequest) command).getReplyTo().tell(new Status.Success(this), ActorRef.noSender());
+        } else if (SHUTDOWN.equals(command)) {
+            // Add shutdown procedures here
+            return null;
+        } else {
+            LOG.warn("{}: ignoring unhandled command {}", persistenceId(), command);
+        }
+
+        return this;
+    }
+
+    //
+    //
+    // Methods below are invoked from application threads
+    //
+    //
+
+    @Override
+    public CompletionStage<ClientLocalHistory> createLocalHistory() {
+        final CreateLocalHistoryCommand command = new CreateLocalHistoryCommand();
+        self().tell(command, ActorRef.noSender());
+        return command.future();
+    }
+
+    @Override
+    public void close() {
+        self().tell(SHUTDOWN, ActorRef.noSender());
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/GetClientRequest.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/GetClientRequest.java
new file mode 100644 (file)
index 0000000..4bfd1c4
--- /dev/null
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import akka.actor.ActorRef;
+import com.google.common.base.Preconditions;
+
+/**
+ * Request the ClientIdentifier from a particular actor. Response is an instance of {@link DistributedDataStoreClient}.
+ *
+ * @author Robert Varga
+ */
+final class GetClientRequest {
+    private final ActorRef replyTo;
+
+    public GetClientRequest(final ActorRef replyTo) {
+        this.replyTo = Preconditions.checkNotNull(replyTo);
+    }
+
+    ActorRef getReplyTo() {
+        return replyTo;
+    }
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/package-info.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/package-info.java
new file mode 100644 (file)
index 0000000..4ff8544
--- /dev/null
@@ -0,0 +1,13 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+/**
+ * This package contains implementation required by the DistributedDataStore frontend.
+ *
+ * @author Robert Varga
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
\ No newline at end of file
index 0244eb3..fe321d4 100644 (file)
@@ -10,11 +10,17 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.databroker.actors.dds.DistributedDataStoreClient;
+import org.opendaylight.controller.cluster.databroker.actors.dds.DistributedDataStoreClientActor;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.DatastoreConfigurationMXBeanImpl;
@@ -51,7 +57,6 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche
         DatastoreContextConfigAdminOverlay.Listener, DOMStoreTreeChangePublisher, DOMDataTreeCommitCohortRegistry, AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class);
-    private static final String UNKNOWN_TYPE = "unknown";
 
     private static final long READY_WAIT_FACTOR = 3;
 
@@ -66,7 +71,8 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche
 
     private final CountDownLatch waitTillReadyCountDownLatch = new CountDownLatch(1);
 
-    private final String type;
+    private final ClientIdentifier identifier;
+    private final DistributedDataStoreClient client;
 
     private final TransactionContextFactory txContextFactory;
 
@@ -78,9 +84,22 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche
         Preconditions.checkNotNull(configuration, "configuration should not be null");
         Preconditions.checkNotNull(datastoreContextFactory, "datastoreContextFactory should not be null");
 
-        this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
+        final Props clientProps = DistributedDataStoreClientActor.props(cluster.getCurrentMemberName(),
+            datastoreContextFactory.getBaseDatastoreContext().getDataStoreName());
+        final ActorRef clientActor = actorSystem.actorOf(clientProps);
+        try {
+            client = DistributedDataStoreClientActor.getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            LOG.error("Failed to get actor for {}", clientProps, e);
+            clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+            throw Throwables.propagate(e);
+        }
+
+        identifier = client.getIdentifier();
+        LOG.debug("Distributed data store client {} started", identifier);
 
-        String shardManagerId = ShardManagerIdentifier.builder().type(type).build().toString();
+        String shardManagerId = ShardManagerIdentifier.builder()
+                .type(datastoreContextFactory.getBaseDatastoreContext().getDataStoreName()).build().toString();
 
         LOG.info("Creating ShardManager : {}", shardManagerId);
 
@@ -112,10 +131,11 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche
     }
 
     @VisibleForTesting
-    DistributedDataStore(ActorContext actorContext) {
+    DistributedDataStore(ActorContext actorContext, ClientIdentifier identifier) {
         this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
+        this.client = null;
+        this.identifier = Preconditions.checkNotNull(identifier);
         this.txContextFactory = TransactionContextFactory.create(actorContext);
-        this.type = UNKNOWN_TYPE;
         this.waitTillReadyTimeInMillis =
                 actorContext.getDatastoreContext().getShardLeaderElectionTimeout().duration().toMillis() * READY_WAIT_FACTOR;
     }
@@ -214,7 +234,7 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche
 
     @Override
     public void close() {
-        LOG.info("Closing data store {}", type);
+        LOG.info("Closing data store {}", identifier);
 
         if (datastoreConfigMXBean != null) {
             datastoreConfigMXBean.unregisterMBean();
@@ -233,6 +253,10 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche
 
         txContextFactory.close();
         actorContext.shutdown();
+
+        if (client != null) {
+            client.close();
+        }
     }
 
     @Override
@@ -241,11 +265,11 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche
     }
 
     public void waitTillReady(){
-        LOG.info("Beginning to wait for data store to become ready : {}", type);
+        LOG.info("Beginning to wait for data store to become ready : {}", identifier);
 
         try {
             if (waitTillReadyCountDownLatch.await(waitTillReadyTimeInMillis, TimeUnit.MILLISECONDS)) {
-                LOG.debug("Data store {} is now ready", type);
+                LOG.debug("Data store {} is now ready", identifier);
             } else {
                 LOG.error("Shared leaders failed to settle in {} seconds, giving up", TimeUnit.MILLISECONDS.toSeconds(waitTillReadyTimeInMillis));
             }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/AbstractClientActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/AbstractClientActor.java
new file mode 100644 (file)
index 0000000..5f94084
--- /dev/null
@@ -0,0 +1,71 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.actors.client;
+
+import akka.actor.ActorRef;
+import akka.actor.PoisonPill;
+import akka.persistence.UntypedPersistentActor;
+import com.google.common.annotations.Beta;
+import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Frontend actor which takes care of persisting generations and creates an appropriate ClientIdentifier.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public abstract class AbstractClientActor extends UntypedPersistentActor {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractClientActor.class);
+    private AbstractClientActorBehavior<?> currentBehavior;
+
+    protected AbstractClientActor(final FrontendIdentifier frontendId) {
+        currentBehavior = new RecoveringClientActorBehavior(
+                new InitialClientActorContext(this, frontendId.toString()), frontendId);
+    }
+
+    @Override
+    public final String persistenceId() {
+        return currentBehavior.persistenceId();
+    }
+
+    private void switchBehavior(final AbstractClientActorBehavior<?> nextBehavior) {
+        if (!currentBehavior.equals(nextBehavior)) {
+            if (nextBehavior == null) {
+                LOG.debug("{}: shutting down", persistenceId());
+                self().tell(PoisonPill.getInstance(), ActorRef.noSender());
+            } else {
+                LOG.debug("{}: switched from {} to {}", persistenceId(), currentBehavior, nextBehavior);
+            }
+
+            currentBehavior = nextBehavior;
+        }
+    }
+
+    @Override
+    public final void onReceiveCommand(final Object command) {
+        if (command == null) {
+            LOG.debug("{}: ignoring null command", persistenceId());
+            return;
+        }
+
+        if (currentBehavior != null) {
+            switchBehavior(currentBehavior.onReceiveCommand(command));
+        } else {
+            LOG.debug("{}: shutting down, ignoring command {}", persistenceId(), command);
+        }
+    }
+
+    @Override
+    public final void onReceiveRecover(final Object recover) {
+        switchBehavior(currentBehavior.onReceiveRecover(recover));
+    }
+
+    protected abstract ClientActorBehavior initialBehavior(ClientActorContext context);
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/AbstractClientActorBehavior.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/AbstractClientActorBehavior.java
new file mode 100644 (file)
index 0000000..9fd5b99
--- /dev/null
@@ -0,0 +1,76 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.actors.client;
+
+import akka.actor.ActorRef;
+import com.google.common.annotations.Beta;
+import com.google.common.base.Preconditions;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+/**
+ * Base behavior attached to {@link AbstractClientActor}. Exposes
+ * @author user
+ *
+ * @param <C> Type of associated context
+ *
+ * @author Robert Varga
+ */
+@Beta
+public abstract class AbstractClientActorBehavior<C extends AbstractClientActorContext> {
+    private final C context;
+
+    AbstractClientActorBehavior(final @Nonnull C context) {
+        // Hidden to prevent outside subclasses. Users instantiated this via ClientActorBehavior
+        this.context = Preconditions.checkNotNull(context);
+    }
+
+    /**
+     * Return an {@link AbstractClientActorContext} associated with this {@link AbstractClientActor}.
+     *
+     * @return A client actor context instance.
+     */
+    protected final @Nonnull C context() {
+        return context;
+    }
+
+    /**
+     * Return the persistence identifier associated with this {@link AbstractClientActor}. This identifier should be
+     * used in logging to identify this actor.
+     *
+     * @return Persistence identifier
+     */
+    protected final @Nonnull String persistenceId() {
+        return context.persistenceId();
+    }
+
+    /**
+     * Return an {@link ActorRef} of this ClientActor.
+     *
+     * @return
+     */
+    public final @Nonnull ActorRef self() {
+        return context.self();
+    }
+
+    /**
+     * Implementation-internal method for handling an incoming command message.
+     *
+     * @param command Command message
+     * @return Behavior which should be used with the next message. Return null if this actor should shut down.
+     */
+    abstract @Nullable AbstractClientActorBehavior<?> onReceiveCommand(@Nonnull Object command);
+
+    /**
+     * Implementation-internal method for handling an incoming recovery message coming from persistence.
+     *
+     * @param recover Recover message
+     * @return Behavior which should be used with the next message. Return null if this actor should shut down.
+     */
+    abstract @Nullable AbstractClientActorBehavior<?> onReceiveRecover(@Nonnull Object recover);
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/AbstractClientActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/AbstractClientActorContext.java
new file mode 100644 (file)
index 0000000..3cbca26
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.actors.client;
+
+import akka.actor.ActorRef;
+import com.google.common.base.Preconditions;
+import javax.annotation.Nonnull;
+import org.opendaylight.yangtools.concepts.Mutable;
+
+/**
+ * Common, externally-invisible superclass of contexts associated with a {@link AbstractClientActor}. End users pass this
+ * object via opaque {@link ClientActorContext}.
+ *
+ * @author Robert Varga
+ */
+abstract class AbstractClientActorContext implements Mutable {
+    private final String persistenceId;
+    private final ActorRef self;
+
+    AbstractClientActorContext(final @Nonnull ActorRef self, final @Nonnull String persistenceId) {
+        this.persistenceId = Preconditions.checkNotNull(persistenceId);
+        this.self = Preconditions.checkNotNull(self);
+    }
+
+    final @Nonnull String persistenceId() {
+        return persistenceId;
+    }
+
+    final @Nonnull ActorRef self() {
+        return self;
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/ClientActorBehavior.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/ClientActorBehavior.java
new file mode 100644 (file)
index 0000000..b770b99
--- /dev/null
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.actors.client;
+
+import com.google.common.annotations.Beta;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.yangtools.concepts.Identifiable;
+
+/**
+ * A behavior, which handles messages sent to a {@link AbstractClientActor}.
+ *
+ * @param <T> Frontend type
+ *
+ * @author Robert Varga
+ */
+@Beta
+public abstract class ClientActorBehavior extends RecoveredClientActorBehavior<ClientActorContext>
+        implements Identifiable<ClientIdentifier> {
+    protected ClientActorBehavior(final @Nonnull ClientActorContext context) {
+        super(context);
+    }
+
+    @Override
+    final ClientActorBehavior onReceiveCommand(final Object command) {
+        // TODO: any client-common logic (such as validation and common dispatch) needs to go here
+        return onCommand(command);
+    }
+
+    @Override
+    public final @Nonnull ClientIdentifier getIdentifier() {
+        return context().getIdentifier();
+    }
+
+    /**
+     * Override this method to handle any command which is not handled by the base behavior.
+     *
+     * @param command
+     * @return Next behavior to use, null if this actor should shut down.
+     */
+    protected abstract @Nullable ClientActorBehavior onCommand(@Nonnull Object command);
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/ClientActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/ClientActorContext.java
new file mode 100644 (file)
index 0000000..ca39360
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.actors.client;
+
+import akka.actor.ActorRef;
+import com.google.common.annotations.Beta;
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.yangtools.concepts.Identifiable;
+
+/**
+ * An actor context associated with this {@link AbstractClientActor}
+ *
+ * @author Robert Varga
+ */
+@Beta
+public final class ClientActorContext extends AbstractClientActorContext implements Identifiable<ClientIdentifier> {
+    private final ClientIdentifier identifier;
+
+    ClientActorContext(final ActorRef self, final String persistenceId, final ClientIdentifier identifier) {
+        super(self, persistenceId);
+        this.identifier = Preconditions.checkNotNull(identifier);
+    }
+
+    @Override
+    public ClientIdentifier getIdentifier() {
+        return identifier;
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/InitialClientActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/InitialClientActorContext.java
new file mode 100644 (file)
index 0000000..636dd1e
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.actors.client;
+
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+
+/**
+ *
+ * @author Robert Varga
+ */
+final class InitialClientActorContext extends AbstractClientActorContext {
+    private final AbstractClientActor actor;
+
+    InitialClientActorContext(final AbstractClientActor actor, final String persistenceId) {
+        super(actor.self(), persistenceId);
+        this.actor = Preconditions.checkNotNull(actor);
+    }
+
+    void saveSnapshot(final ClientIdentifier snapshot) {
+        actor.saveSnapshot(snapshot);
+    }
+
+    ClientActorBehavior createBehavior(final ClientActorContext context) {
+        return actor.initialBehavior(context);
+    }
+
+    void stash() {
+        actor.stash();
+    }
+
+    void unstash() {
+        actor.unstashAll();
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/RecoveredClientActorBehavior.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/RecoveredClientActorBehavior.java
new file mode 100644 (file)
index 0000000..a5b042c
--- /dev/null
@@ -0,0 +1,25 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.actors.client;
+
+/**
+ * @param <C> Concrete context type
+ *
+ * @author Robert Varga
+ */
+abstract class RecoveredClientActorBehavior<C extends AbstractClientActorContext> extends AbstractClientActorBehavior<C> {
+
+    RecoveredClientActorBehavior(final C context) {
+        super(context);
+    }
+
+    @Override
+    final AbstractClientActorBehavior<?> onReceiveRecover(Object recover) {
+        throw new IllegalStateException("Frontend has been recovered");
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/RecoveringClientActorBehavior.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/RecoveringClientActorBehavior.java
new file mode 100644 (file)
index 0000000..7a8bcce
--- /dev/null
@@ -0,0 +1,66 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.actors.client;
+
+import akka.persistence.RecoveryCompleted;
+import akka.persistence.SnapshotOffer;
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @param <T> Frontend type
+ *
+ * @author Robert Varga
+ */
+final class RecoveringClientActorBehavior extends AbstractClientActorBehavior<InitialClientActorContext> {
+    private static final Logger LOG = LoggerFactory.getLogger(RecoveringClientActorBehavior.class);
+    private final FrontendIdentifier currentFrontend;
+    private ClientIdentifier lastId = null;
+
+    RecoveringClientActorBehavior(final InitialClientActorContext context, final FrontendIdentifier frontendId) {
+        super(context);
+        currentFrontend = Preconditions.checkNotNull(frontendId);
+    }
+
+    @Override
+    AbstractClientActorBehavior<?> onReceiveCommand(final Object command) {
+        throw new IllegalStateException("Frontend is recovering");
+    }
+
+    @Override
+    AbstractClientActorBehavior<?> onReceiveRecover(final Object recover) {
+        if (recover instanceof RecoveryCompleted) {
+            final ClientIdentifier nextId;
+            if (lastId != null) {
+                if (!currentFrontend.equals(lastId.getFrontendId())) {
+                    LOG.error("Mismatched frontend identifier, shutting down. Current: {} Saved: {}", currentFrontend,
+                    lastId.getFrontendId());
+                    return null;
+                }
+
+                nextId = ClientIdentifier.create(currentFrontend, lastId.getGeneration() + 1);
+            } else {
+                nextId = ClientIdentifier.create(currentFrontend, 0);
+            }
+
+            LOG.debug("{}: persisting new identifier {}", persistenceId(), nextId);
+            context().saveSnapshot(nextId);
+            return new SavingClientActorBehavior(context(), nextId);
+        } else if (recover instanceof SnapshotOffer) {
+            lastId = (ClientIdentifier) ((SnapshotOffer)recover).snapshot();
+            LOG.debug("{}: recovered identifier {}", lastId);
+        } else {
+            LOG.warn("{}: ignoring recovery message {}", recover);
+        }
+
+        return this;
+    }
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/SavingClientActorBehavior.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/SavingClientActorBehavior.java
new file mode 100644 (file)
index 0000000..ac5f12f
--- /dev/null
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.actors.client;
+
+import akka.persistence.SaveSnapshotFailure;
+import akka.persistence.SaveSnapshotSuccess;
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author Robert Varga
+ */
+final class SavingClientActorBehavior extends RecoveredClientActorBehavior<InitialClientActorContext> {
+    private static final Logger LOG = LoggerFactory.getLogger(SavingClientActorBehavior.class);
+    private final ClientIdentifier myId;
+
+    SavingClientActorBehavior(final InitialClientActorContext context, final ClientIdentifier nextId) {
+        super(context);
+        this.myId = Preconditions.checkNotNull(nextId);
+    }
+
+    @Override
+    AbstractClientActorBehavior<?> onReceiveCommand(final Object command) {
+        if (command instanceof SaveSnapshotFailure) {
+            LOG.error("{}: failed to persist state", persistenceId(), ((SaveSnapshotFailure) command).cause());
+            return null;
+        } else if (command instanceof SaveSnapshotSuccess) {
+            context().unstash();
+            return context().createBehavior(new ClientActorContext(self(), persistenceId(), myId));
+        } else {
+            LOG.debug("{}: stashing command {}", persistenceId(), command);
+            context().stash();
+            return this;
+        }
+    }
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/package-info.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/package-info.java
new file mode 100644 (file)
index 0000000..1e9ed1a
--- /dev/null
@@ -0,0 +1,13 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+/**
+ * This package contains the baseline client infrastructure required to implement clients accessing the data store.
+ *
+ * @author Robert Varga
+ */
+package org.opendaylight.controller.cluster.datastore.actors.client;
\ No newline at end of file
index 0c7575a..d3932f5 100644 (file)
@@ -141,6 +141,13 @@ public class DistributedDataStoreRemotingIntegrationTest {
 
     @After
     public void tearDown() {
+        if (followerDistributedDataStore != null) {
+            leaderDistributedDataStore.close();
+        }
+        if (leaderDistributedDataStore != null) {
+            leaderDistributedDataStore.close();
+        }
+
         JavaTestKit.shutdownActorSystem(leaderSystem);
         JavaTestKit.shutdownActorSystem(followerSystem);
         JavaTestKit.shutdownActorSystem(follower2System);
index d3bdc6a..c8c2548 100644 (file)
@@ -20,12 +20,18 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.FrontendType;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.duration.FiniteDuration;
 
 public class DistributedDataStoreTest extends AbstractActorTest {
+    private static final ClientIdentifier UNKNOWN_ID = ClientIdentifier.create(
+            FrontendIdentifier.create(MemberName.forName("local"), FrontendType.forName("unknown")), 0);
 
     private SchemaContext schemaContext;
 
@@ -50,7 +56,7 @@ public class DistributedDataStoreTest extends AbstractActorTest {
 
     @Test
     public void testRateLimitingUsedInReadWriteTxCreation(){
-        try (DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext)) {
+        try (DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext, UNKNOWN_ID)) {
 
             distributedDataStore.newReadWriteTransaction();
 
@@ -60,7 +66,7 @@ public class DistributedDataStoreTest extends AbstractActorTest {
 
     @Test
     public void testRateLimitingUsedInWriteOnlyTxCreation(){
-        try (DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext)) {
+        try (DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext, UNKNOWN_ID)) {
 
             distributedDataStore.newWriteOnlyTransaction();
 
@@ -70,7 +76,7 @@ public class DistributedDataStoreTest extends AbstractActorTest {
 
     @Test
     public void testRateLimitingNotUsedInReadOnlyTxCreation(){
-        try (DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext)) {
+        try (DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext, UNKNOWN_ID)) {
 
             distributedDataStore.newReadOnlyTransaction();
             distributedDataStore.newReadOnlyTransaction();
@@ -85,7 +91,7 @@ public class DistributedDataStoreTest extends AbstractActorTest {
         doReturn(datastoreContext).when(actorContext).getDatastoreContext();
         doReturn(shardElectionTimeout).when(datastoreContext).getShardLeaderElectionTimeout();
         doReturn(FiniteDuration.apply(50, TimeUnit.MILLISECONDS)).when(shardElectionTimeout).duration();
-        try (DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext)) {
+        try (DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext, UNKNOWN_ID)) {
 
             long start = System.currentTimeMillis();
 
@@ -99,17 +105,14 @@ public class DistributedDataStoreTest extends AbstractActorTest {
 
     @Test
     public void testWaitTillReadyCountDown(){
-        try (final DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext)) {
+        try (final DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext, UNKNOWN_ID)) {
             doReturn(datastoreContext).when(actorContext).getDatastoreContext();
             doReturn(shardElectionTimeout).when(datastoreContext).getShardLeaderElectionTimeout();
             doReturn(FiniteDuration.apply(5000, TimeUnit.MILLISECONDS)).when(shardElectionTimeout).duration();
 
-            Executors.newSingleThreadExecutor().submit(new Runnable() {
-                @Override
-                public void run() {
-                    Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
-                    distributedDataStore.getWaitTillReadyCountDownLatch().countDown();
-                }
+            Executors.newSingleThreadExecutor().submit(() -> {
+                Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+                distributedDataStore.getWaitTillReadyCountDownLatch().countDown();
             });
 
             long start = System.currentTimeMillis();
index 78e2c91..48baef5 100644 (file)
@@ -30,6 +30,7 @@ import akka.testkit.JavaTestKit;
 import com.google.common.base.Optional;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -115,9 +116,10 @@ public class DistributedEntityOwnershipIntegrationTest {
 
     @After
     public void tearDown() {
-        for(MemberNode m: memberNodes) {
+        for (MemberNode m : Lists.reverse(memberNodes)) {
             m.cleanup();
         }
+        memberNodes.clear();
     }
 
     private static DistributedEntityOwnershipService newOwnershipService(final DistributedDataStore datastore) {

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.