CDS: Retry remote front-end transactions on AskTimeoutException 61/24261/2
authorTom Pantelis <tpanteli@brocade.com>
Wed, 1 Jul 2015 20:28:43 +0000 (16:28 -0400)
committerGerrit Code Review <gerrit@opendaylight.org>
Mon, 20 Jul 2015 21:30:37 +0000 (21:30 +0000)
With the front-end PrimaryShardInfo cache, if the cached primary/leader
shard is remote and unavailable, the RemoteTransactionContextSupport
will fail with an AskTimeoutException when it tries to send the
CreateTransaction message. Since it can take at least 1 election timeout
period to re-elect a new leader, I changed RemoteTransactionContextSupport
to also retry on AskTimeoutException (it already retries on
NoShardLeaderException). However instead of re-sending the
CreateTransaction message, as it did before, it now re-sends the
FindPrimary message to get a new primary shard actor.

I also modified how RemoteTransactionContextSupport retries. It will now
retry for a total period of 2 times the shard election timeout which
should be ample time for a re-election to occur. If no leader is found then
the txn will fail.

I also added a ShardLeaderNotRespondingException which the
RemoteTransactionContextSupport will throw if it ends up with an
AskTimeoutException after the tx creation timeout period. This shouldn't
occur normally as, with the retries, it should get a NoShardLeaderException
even if the initial error was AskTimeoutException. But it's possible to
end up with an AskTimeoutException, eg if the system is overloaded and
the election timeout is delayed.

During testing, I noticed that if you take down the 2 followers and try
a transaction, it fails with an AskTimeoutEx instead of
NoShardLeaderException as one would expect. This is b/c the leader
changes to an isolated leader. So I changed the Shardanager to return
NoShardLeaderException if the state is IsolatedLeader.

Change-Id: I3efd3f841cf41b7738aedb694fa18b44851b3074
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
(cherry picked from commit 845609758d1739ee07d5ca92f5448e18a8933861)

14 files changed:
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBroker.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/NoShardLeaderException.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/ShardLeaderNotRespondingException.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf
opendaylight/md-sal/sal-distributed-datastore/src/test/resources/module-shards-member1-and-2-and-3.conf [new file with mode: 0644]

index c16a84933572d436a14c0506858822039fdc8a77..582f25859ba9e01c8b9629a1e988a1142c05b621 100644 (file)
@@ -21,6 +21,7 @@ import java.util.Map;
 import java.util.concurrent.Executor;
 import org.opendaylight.controller.cluster.databroker.AbstractDOMBroker;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
+import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException;
 import org.opendaylight.controller.md.sal.common.api.data.DataStoreUnavailableException;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
@@ -197,7 +198,7 @@ public class ConcurrentDOMDataBroker extends AbstractDOMBroker {
 
         LOG.warn("Tx: {} Error during phase {}, starting Abort", transaction.getIdentifier(), phase, t);
         final Exception e;
-        if(t instanceof NoShardLeaderException) {
+        if(t instanceof NoShardLeaderException || t instanceof ShardLeaderNotRespondingException) {
             e = new DataStoreUnavailableException(t.getMessage(), t);
         } else if (t instanceof Exception) {
             e = (Exception)t;
index 59205692d119ffc9183a7c505f39c43008ea28db..88c797a99074c366b63a703cc583f64c1608d310 100644 (file)
@@ -10,13 +10,17 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorSelection;
 import akka.dispatch.OnComplete;
+import akka.pattern.AskTimeoutException;
+import akka.util.Timeout;
 import com.google.common.base.Preconditions;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
+import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,10 +40,8 @@ import scala.concurrent.duration.FiniteDuration;
 final class RemoteTransactionContextSupport {
     private static final Logger LOG = LoggerFactory.getLogger(RemoteTransactionContextSupport.class);
 
-    /**
-     * Time interval in between transaction create retries.
-     */
-    private static final FiniteDuration CREATE_TX_TRY_INTERVAL = FiniteDuration.create(1, TimeUnit.SECONDS);
+    private static final long CREATE_TX_TRY_INTERVAL_IN_MS = 1000;
+    private static final long MAX_CREATE_TX_MSG_TIMEOUT_IN_MS = 5000;
 
     private final TransactionProxy parent;
     private final String shardName;
@@ -48,7 +50,13 @@ final class RemoteTransactionContextSupport {
      * The target primary shard.
      */
     private volatile ActorSelection primaryShard;
-    private volatile int createTxTries;
+
+    /**
+     * The total timeout for creating a tx on the primary shard.
+     */
+    private volatile long totalCreateTxTimeout;
+
+    private final Timeout createTxMessageTimeout;
 
     private final TransactionContextWrapper transactionContextWrapper;
 
@@ -57,9 +65,18 @@ final class RemoteTransactionContextSupport {
         this.parent = Preconditions.checkNotNull(parent);
         this.shardName = shardName;
         this.transactionContextWrapper = transactionContextWrapper;
-        createTxTries = (int) (parent.getActorContext().getDatastoreContext().
-                getShardLeaderElectionTimeout().duration().toMillis() /
-                CREATE_TX_TRY_INTERVAL.toMillis());
+
+        // For the total create tx timeout, use 2 times the election timeout. This should be enough time for
+        // a leader re-election to occur if we happen to hit it in transition.
+        totalCreateTxTimeout = parent.getActorContext().getDatastoreContext().getShardRaftConfig()
+                .getElectionTimeOutInterval().toMillis() * 2;
+
+        // We'll use the operationTimeout for the the create Tx message timeout so it can be set appropriately
+        // for unit tests but cap it at MAX_CREATE_TX_MSG_TIMEOUT_IN_MS. The operationTimeout could be set
+        // larger than the totalCreateTxTimeout in production which we don't want.
+        long operationTimeout = parent.getActorContext().getOperationTimeout().duration().toMillis();
+        createTxMessageTimeout = new Timeout(Math.min(operationTimeout, MAX_CREATE_TX_MSG_TIMEOUT_IN_MS),
+                TimeUnit.MILLISECONDS);
     }
 
     String getShardName() {
@@ -74,10 +91,6 @@ final class RemoteTransactionContextSupport {
         return parent.getActorContext();
     }
 
-    private OperationLimiter getOperationLimiter() {
-        return transactionContextWrapper.getLimiter();
-    }
-
     private TransactionIdentifier getIdentifier() {
         return parent.getIdentifier();
     }
@@ -113,7 +126,8 @@ final class RemoteTransactionContextSupport {
         Object serializedCreateMessage = new CreateTransaction(getIdentifier().toString(),
             getTransactionType().ordinal(), getIdentifier().getChainId()).toSerializable();
 
-        Future<Object> createTxFuture = getActorContext().executeOperationAsync(primaryShard, serializedCreateMessage);
+        Future<Object> createTxFuture = getActorContext().executeOperationAsync(primaryShard,
+                serializedCreateMessage, createTxMessageTimeout);
 
         createTxFuture.onComplete(new OnComplete<Object>() {
             @Override
@@ -123,21 +137,60 @@ final class RemoteTransactionContextSupport {
         }, getActorContext().getClientDispatcher());
     }
 
+    private void tryFindPrimaryShard() {
+        LOG.debug("Tx {} Retrying findPrimaryShardAsync for shard {}", getIdentifier(), shardName);
+
+        this.primaryShard = null;
+        Future<PrimaryShardInfo> findPrimaryFuture = getActorContext().findPrimaryShardAsync(shardName);
+        findPrimaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
+            @Override
+            public void onComplete(final Throwable failure, final PrimaryShardInfo primaryShardInfo) {
+                onFindPrimaryShardComplete(failure, primaryShardInfo);
+            }
+        }, getActorContext().getClientDispatcher());
+    }
+
+    private void onFindPrimaryShardComplete(final Throwable failure, final PrimaryShardInfo primaryShardInfo) {
+        if (failure == null) {
+            this.primaryShard = primaryShardInfo.getPrimaryShardActor();
+            tryCreateTransaction();
+        } else {
+            LOG.debug("Tx {}: Find primary for shard {} failed", getIdentifier(), shardName, failure);
+
+            onCreateTransactionComplete(failure, null);
+        }
+    }
+
     private void onCreateTransactionComplete(Throwable failure, Object response) {
-        if(failure instanceof NoShardLeaderException) {
-            // There's no leader for the shard yet - schedule and try again, unless we're out
-            // of retries. Note: createTxTries is volatile as it may be written by different
-            // threads however not concurrently, therefore decrementing it non-atomically here
-            // is ok.
-            if(--createTxTries > 0) {
-                LOG.debug("Tx {} Shard {} has no leader yet - scheduling create Tx retry",
-                    getIdentifier(), shardName);
-
-                getActorContext().getActorSystem().scheduler().scheduleOnce(CREATE_TX_TRY_INTERVAL,
+        // An AskTimeoutException will occur if the local shard forwards to an unavailable remote leader or
+        // the cached remote leader actor is no longer available.
+        boolean retryCreateTransaction = this.primaryShard != null &&
+                (failure instanceof NoShardLeaderException || failure instanceof AskTimeoutException);
+        if(retryCreateTransaction) {
+            // Schedule a retry unless we're out of retries. Note: totalCreateTxTimeout is volatile as it may
+            // be written by different threads however not concurrently, therefore decrementing it
+            // non-atomically here is ok.
+            if(totalCreateTxTimeout > 0) {
+                long scheduleInterval = CREATE_TX_TRY_INTERVAL_IN_MS;
+                if(failure instanceof AskTimeoutException) {
+                    // Since we use the createTxMessageTimeout for the CreateTransaction request and it timed
+                    // out, subtract it from the total timeout. Also since the createTxMessageTimeout period
+                    // has already elapsed, we can immediately schedule the retry (10 ms is virtually immediate).
+                    totalCreateTxTimeout -= createTxMessageTimeout.duration().toMillis();
+                    scheduleInterval = 10;
+                }
+
+                totalCreateTxTimeout -= scheduleInterval;
+
+                LOG.debug("Tx {}: create tx on shard {} failed with exception \"{}\" - scheduling retry in {} ms",
+                        getIdentifier(), shardName, failure, scheduleInterval);
+
+                getActorContext().getActorSystem().scheduler().scheduleOnce(
+                        FiniteDuration.create(scheduleInterval, TimeUnit.MILLISECONDS),
                         new Runnable() {
                             @Override
                             public void run() {
-                                tryCreateTransaction();
+                                tryFindPrimaryShard();
                             }
                         }, getActorContext().getClientDispatcher());
                 return;
@@ -160,7 +213,17 @@ final class RemoteTransactionContextSupport {
         if(failure != null) {
             LOG.debug("Tx {} Creating NoOpTransaction because of error", getIdentifier(), failure);
 
-            localTransactionContext = new NoOpTransactionContext(failure, getIdentifier());
+            Throwable resultingEx = failure;
+            if(failure instanceof AskTimeoutException) {
+                resultingEx = new ShardLeaderNotRespondingException(String.format(
+                        "Could not create a %s transaction on shard %s. The shard leader isn't responding.",
+                        parent.getType(), shardName), failure);
+            } else if(!(failure instanceof NoShardLeaderException)) {
+                resultingEx = new Exception(String.format(
+                    "Error creating %s transaction on shard %s", parent.getType(), shardName), failure);
+            }
+
+            localTransactionContext = new NoOpTransactionContext(resultingEx, getIdentifier());
         } else if (CreateTransactionReply.SERIALIZABLE_CLASS.equals(response.getClass())) {
             localTransactionContext = createValidTransactionContext(
                     CreateTransactionReply.fromSerializable(response));
index 65be23dfd0dd0de4b037710ab50604d31d6b0c81..ce3cae481e766f4ba4b7f93593e8d12a282ee294 100644 (file)
@@ -62,6 +62,7 @@ import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
 import org.opendaylight.controller.cluster.raft.RaftActor;
 import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
+import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
@@ -410,13 +411,10 @@ public class Shard extends RaftActor {
         commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
     }
 
-    private void noLeaderError(Object message) {
+    private void noLeaderError(String errMessage, Object message) {
         // TODO: rather than throwing an immediate exception, we could schedule a timer to try again to make
         // it more resilient in case we're in the process of electing a new leader.
-        getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
-            "Could not find the leader for shard %s. This typically happens" +
-            " when the system is coming up or recovering and a leader is being elected. Try again" +
-            " later.", persistenceId()))), getSelf());
+        getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(errMessage, persistenceId())), getSelf());
     }
 
     private void handleBatchedModifications(BatchedModifications batched) {
@@ -433,6 +431,8 @@ public class Shard extends RaftActor {
         // window where it could have a stale leader during leadership transitions.
         //
         if(isLeader()) {
+            failIfIsolatedLeader(getSender());
+
             try {
                 commitCoordinator.handleBatchedModifications(batched, getSender(), this);
             } catch (Exception e) {
@@ -449,13 +449,27 @@ public class Shard extends RaftActor {
                 LOG.debug("{}: Forwarding BatchedModifications to leader {}", persistenceId(), leader);
                 leader.forward(batched, getContext());
             } else {
-                noLeaderError(batched);
+                noLeaderError("Could not commit transaction " + batched.getTransactionID(), batched);
             }
         }
     }
 
+    private boolean failIfIsolatedLeader(ActorRef sender) {
+        if(getRaftState() == RaftState.IsolatedLeader) {
+            sender.tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
+                    "Shard %s was the leader but has lost contact with all of its followers. Either all" +
+                    " other follower nodes are down or this node is isolated by a network partition.",
+                    persistenceId()))), getSelf());
+            return true;
+        }
+
+        return false;
+    }
+
     private void handleReadyLocalTransaction(final ReadyLocalTransaction message) {
         if (isLeader()) {
+            failIfIsolatedLeader(getSender());
+
             try {
                 commitCoordinator.handleReadyLocalTransaction(message, getSender(), this);
             } catch (Exception e) {
@@ -470,7 +484,7 @@ public class Shard extends RaftActor {
                 message.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
                 leader.forward(message, getContext());
             } else {
-                noLeaderError(message);
+                noLeaderError("Could not commit transaction " + message.getTransactionID(), message);
             }
         }
     }
@@ -520,10 +534,8 @@ public class Shard extends RaftActor {
         } else if (getLeader() != null) {
             getLeader().forward(message, getContext());
         } else {
-            getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
-                "Could not find leader for shard %s so transaction cannot be created. This typically happens" +
-                " when the system is coming up or recovering and a leader is being elected. Try again" +
-                " later.", persistenceId()))), getSelf());
+            getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(
+                    "Could not create a shard transaction", persistenceId())), getSelf());
         }
     }
 
@@ -541,6 +553,11 @@ public class Shard extends RaftActor {
 
     private void createTransaction(CreateTransaction createTransaction) {
         try {
+            if(TransactionType.fromInt(createTransaction.getTransactionType()) != TransactionType.READ_ONLY &&
+                    failIfIsolatedLeader(getSender())) {
+                return;
+            }
+
             ActorRef transactionActor = createTransaction(createTransaction.getTransactionType(),
                 createTransaction.getTransactionId(), createTransaction.getTransactionChainId(),
                 createTransaction.getVersion());
index 4f3c521f442ea10765d06dd4f0372d72968e712a..a46bb3dae8e5887202d66ad0ddfda51e5774537b 100644 (file)
@@ -35,6 +35,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
@@ -65,6 +66,7 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * The ShardManager has the following jobs,
@@ -365,8 +367,16 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
                 LOG.debug("{}: Scheduling timer to wait for shard {}", persistenceId(), shardInformation.getShardName());
 
+                FiniteDuration timeout = datastoreContext.getShardInitializationTimeout().duration();
+                if(shardInformation.isShardInitialized()) {
+                    // If the shard is already initialized then we'll wait enough time for the shard to
+                    // elect a leader, ie 2 times the election timeout.
+                    timeout = FiniteDuration.create(datastoreContext.getShardRaftConfig()
+                            .getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS);
+                }
+
                 Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
-                        datastoreContext.getShardInitializationTimeout().duration(), getSelf(),
+                        timeout, getSelf(),
                         new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
                         getContext().dispatcher(), getSelf());
 
@@ -389,9 +399,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     private NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
-        return new NoShardLeaderException(String.format(
-                "Could not find a leader for shard %s. This typically happens when the system is coming up or " +
-                "recovering and a leader is being elected. Try again later.", shardId));
+        return new NoShardLeaderException(null, shardId.toString());
     }
 
     private NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
@@ -733,7 +741,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
 
         boolean isShardReadyWithLeaderId() {
-            return leaderAvailable && isShardReady() && (isLeader() || peerAddresses.get(leaderId) != null);
+            return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role) &&
+                    (isLeader() || peerAddresses.get(leaderId) != null);
         }
 
         boolean isShardInitialized() {
index b205b0fe7853c176be3f92ef2a2f3d3fd952dbab..8365177d1b9307ae9c6cd7681d8cfdac0c50d2d9 100644 (file)
@@ -7,6 +7,8 @@
  */
 package org.opendaylight.controller.cluster.datastore.exceptions;
 
+import com.google.common.base.Strings;
+
 /**
  * Exception indicating a shard has no current leader.
  *
@@ -15,7 +17,12 @@ package org.opendaylight.controller.cluster.datastore.exceptions;
 public class NoShardLeaderException extends RuntimeException {
     private static final long serialVersionUID = 1L;
 
-    public NoShardLeaderException(String message){
+    public NoShardLeaderException(String message) {
         super(message);
     }
+
+    public NoShardLeaderException(String message, String shardName) {
+        super(String.format("%sShard %s currently has no leader. Try again later.",
+                (Strings.isNullOrEmpty(message) ? "" : message + ". "), shardName));
+    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/ShardLeaderNotRespondingException.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/ShardLeaderNotRespondingException.java
new file mode 100644 (file)
index 0000000..5dfac1e
--- /dev/null
@@ -0,0 +1,21 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.exceptions;
+
+/**
+ * Exception indicating a shard leader is not responding to messages.
+ *
+ * @author Thomas Pantelis
+ */
+public class ShardLeaderNotRespondingException extends RuntimeException {
+    private static final long serialVersionUID = 1L;
+
+    public ShardLeaderNotRespondingException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
index fcc18ed70251de9280fed8a0f32518a18f21ccc2..37ca9533b6ab3467d7e7150e7f2f36669584e864 100644 (file)
@@ -23,6 +23,7 @@ import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.dispatch.Futures;
 import akka.testkit.JavaTestKit;
+import akka.util.Timeout;
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Timer;
 import com.google.common.base.Objects;
@@ -131,7 +132,9 @@ public abstract class AbstractTransactionProxyTest {
 
     protected final String memberName = "mock-member";
 
-    protected final Builder dataStoreContextBuilder = DatastoreContext.newBuilder().operationTimeoutInSeconds(2);
+    private final int operationTimeoutInSeconds = 2;
+    protected final Builder dataStoreContextBuilder = DatastoreContext.newBuilder()
+            .operationTimeoutInSeconds(operationTimeoutInSeconds);
 
     @BeforeClass
     public static void setUpClass() throws IOException {
@@ -159,6 +162,7 @@ public abstract class AbstractTransactionProxyTest {
         doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
         doReturn(memberName).when(mockActorContext).getCurrentMemberName();
         doReturn(schemaContext).when(mockActorContext).getSchemaContext();
+        doReturn(new Timeout(operationTimeoutInSeconds, TimeUnit.SECONDS)).when(mockActorContext).getOperationTimeout();
         doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
         doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
         doReturn(dataStoreContextBuilder.build()).when(mockActorContext).getDatastoreContext();
@@ -380,7 +384,7 @@ public abstract class AbstractTransactionProxyTest {
 
             doReturn(Futures.successful(createTransactionReply(txActorRef, transactionVersion))).when(mockActorContext).
                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
-                        eqCreateTransaction(prefix, type));
+                        eqCreateTransaction(prefix, type), any(Timeout.class));
         }
 
         return txActorRef;
@@ -404,7 +408,12 @@ public abstract class AbstractTransactionProxyTest {
             future.checkedGet(5, TimeUnit.SECONDS);
             fail("Expected ReadFailedException");
         } catch(ReadFailedException e) {
-            throw e.getCause();
+            assertNotNull("Expected a cause", e.getCause());
+            if(e.getCause().getCause() != null) {
+                throw e.getCause().getCause();
+            } else {
+                throw e.getCause();
+            }
         }
     }
 
index 2319c5be384326a61ef8fb0d5c7519d2e812c498..9dd882f2de29d16e76d18ce6889f49e6fadc9902 100644 (file)
@@ -2,6 +2,7 @@ package org.opendaylight.controller.cluster.datastore;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
@@ -34,6 +35,8 @@ import org.junit.Test;
 import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
+import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
@@ -614,13 +617,17 @@ public class DistributedDataStoreIntegrationTest {
             // by setting the election timeout, which is based on the heartbeat interval, really high.
 
             datastoreContextBuilder.shardHeartbeatIntervalInMillis(30000);
-            datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
 
-            // Set the leader election timeout low for the test.
+            DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
+
+            Object result = dataStore.getActorContext().executeOperation(dataStore.getActorContext().getShardManager(),
+                    new FindLocalShard(shardName, true));
+            assertTrue("Expected LocalShardFound. Actual: " + result, result instanceof LocalShardFound);
 
-            datastoreContextBuilder.shardLeaderElectionTimeout(1, TimeUnit.MILLISECONDS);
+            // The ShardManager uses the election timeout for FindPrimary so reset it low so it will timeout quickly.
 
-            DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
+            datastoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1);
+            dataStore.onDatastoreContextUpdated(datastoreContextBuilder.build());
 
             // Create the write Tx.
 
index 672d8f3b9997cc866d8320c11da3a7d55aa61e49..90600f952926de70e615377d7a97c61b9a9ec2eb 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.datastore;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
@@ -19,17 +20,22 @@ import akka.actor.ActorSystem;
 import akka.actor.Address;
 import akka.actor.AddressFromURIString;
 import akka.cluster.Cluster;
+import akka.pattern.AskTimeoutException;
 import akka.testkit.JavaTestKit;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.Uninterruptibles;
 import com.typesafe.config.ConfigFactory;
 import java.math.BigInteger;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
+import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
@@ -75,10 +81,12 @@ public class DistributedDataStoreRemotingIntegrationTest {
     private static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
     private static final Address MEMBER_2_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2559");
 
-    private static final String MODULE_SHARDS_CONFIG = "module-shards-member1-and-2.conf";
+    private static final String MODULE_SHARDS_CONFIG_2 = "module-shards-member1-and-2.conf";
+    private static final String MODULE_SHARDS_CONFIG_3 = "module-shards-member1-and-2-and-3.conf";
 
     private ActorSystem leaderSystem;
     private ActorSystem followerSystem;
+    private ActorSystem follower2System;
 
     private final DatastoreContext.Builder leaderDatastoreContextBuilder =
             DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1);
@@ -92,27 +100,35 @@ public class DistributedDataStoreRemotingIntegrationTest {
     private IntegrationTestKit leaderTestKit;
 
     @Before
-    public void setUpClass() {
+    public void setUp() {
         leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
         Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);
 
         followerSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
         Cluster.get(followerSystem).join(MEMBER_1_ADDRESS);
+
+        follower2System = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member3"));
+        Cluster.get(follower2System).join(MEMBER_1_ADDRESS);
     }
 
     @After
-    public void tearDownClass() {
+    public void tearDown() {
         JavaTestKit.shutdownActorSystem(leaderSystem);
         JavaTestKit.shutdownActorSystem(followerSystem);
+        JavaTestKit.shutdownActorSystem(follower2System);
     }
 
     private void initDatastores(String type) {
+        initDatastores(type, MODULE_SHARDS_CONFIG_2);
+    }
+
+    private void initDatastores(String type, String moduleShardsConfig) {
         leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
 
-        followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder);
-        followerDistributedDataStore = followerTestKit.setupDistributedDataStore(type, MODULE_SHARDS_CONFIG, false, SHARD_NAMES);
+        leaderDistributedDataStore = leaderTestKit.setupDistributedDataStore(type, moduleShardsConfig, false, SHARD_NAMES);
 
-        leaderDistributedDataStore = leaderTestKit.setupDistributedDataStore(type, MODULE_SHARDS_CONFIG, false, SHARD_NAMES);
+        followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder);
+        followerDistributedDataStore = followerTestKit.setupDistributedDataStore(type, moduleShardsConfig, false, SHARD_NAMES);
 
         leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), SHARD_NAMES);
     }
@@ -478,8 +494,7 @@ public class DistributedDataStoreRemotingIntegrationTest {
         DatastoreContext.Builder newMember1Builder = DatastoreContext.newBuilder().
                 shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
         IntegrationTestKit newMember1TestKit = new IntegrationTestKit(leaderSystem, newMember1Builder);
-        DistributedDataStore newMember1Datastore = newMember1TestKit.
-                    setupDistributedDataStore(testName, MODULE_SHARDS_CONFIG, false, SHARD_NAMES);
+        newMember1TestKit.setupDistributedDataStore(testName, MODULE_SHARDS_CONFIG_2, false, SHARD_NAMES);
 
         followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), SHARD_NAMES);
 
@@ -521,4 +536,115 @@ public class DistributedDataStoreRemotingIntegrationTest {
 
         verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car);
     }
+
+    @Test(expected=NoShardLeaderException.class)
+    public void testTransactionWithIsolatedLeader() throws Throwable {
+        leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(300);
+        String testName = "testTransactionWithIsolatedLeader";
+        initDatastores(testName);
+
+        JavaTestKit.shutdownActorSystem(followerSystem, null, true);
+
+        Uninterruptibles.sleepUninterruptibly(leaderDistributedDataStore.getActorContext().getDatastoreContext()
+                .getShardRaftConfig().getElectionTimeOutInterval().toMillis() * 3, TimeUnit.MILLISECONDS);
+
+        DOMStoreWriteTransaction writeTx = leaderDistributedDataStore.newWriteOnlyTransaction();
+        writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+
+        try {
+            followerTestKit.doCommit(writeTx.ready());
+        } catch (ExecutionException e) {
+            throw e.getCause();
+        }
+    }
+
+    @Test(expected=AskTimeoutException.class)
+    public void testTransactionWithShardLeaderNotResponding() throws Throwable {
+        followerDatastoreContextBuilder.shardElectionTimeoutFactor(30);
+        initDatastores("testTransactionWithShardLeaderNotResponding");
+
+        // Do an initial read to get the primary shard info cached.
+
+        DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
+        readTx.read(CarsModel.BASE_PATH).checkedGet(5, TimeUnit.SECONDS);
+
+        // Shutdown the leader and try to create a new tx.
+
+        JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
+
+        followerDatastoreContextBuilder.operationTimeoutInMillis(50).shardElectionTimeoutFactor(1);
+        followerDistributedDataStore.onDatastoreContextUpdated(followerDatastoreContextBuilder.build());
+
+        DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
+
+        rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+
+        try {
+            followerTestKit.doCommit(rwTx.ready());
+        } catch (ExecutionException e) {
+            assertTrue("Expected ShardLeaderNotRespondingException cause. Actual: " + e.getCause(),
+                    e.getCause() instanceof ShardLeaderNotRespondingException);
+            assertNotNull("Expected a nested cause", e.getCause().getCause());
+            throw e.getCause().getCause();
+        }
+    }
+
+    @Test(expected=NoShardLeaderException.class)
+    public void testTransactionWithCreateTxFailureDueToNoLeader() throws Throwable {
+        initDatastores("testTransactionWithCreateTxFailureDueToNoLeader");
+
+        // Do an initial read to get the primary shard info cached.
+
+        DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
+        readTx.read(CarsModel.BASE_PATH).checkedGet(5, TimeUnit.SECONDS);
+
+        // Shutdown the leader and try to create a new tx.
+
+        JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
+
+        Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+
+        followerDatastoreContextBuilder.operationTimeoutInMillis(10).shardElectionTimeoutFactor(1);
+        followerDistributedDataStore.onDatastoreContextUpdated(followerDatastoreContextBuilder.build());
+
+        DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
+
+        rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+
+        try {
+            followerTestKit.doCommit(rwTx.ready());
+        } catch (ExecutionException e) {
+            throw e.getCause();
+        }
+    }
+
+    @Test
+    public void testTransactionRetryWithInitialAskTimeoutExOnCreateTx() throws Exception {
+        followerDatastoreContextBuilder.shardElectionTimeoutFactor(30);
+        String testName = "testTransactionRetryWithInitialAskTimeoutExOnCreateTx";
+        initDatastores(testName, MODULE_SHARDS_CONFIG_3);
+
+        DatastoreContext.Builder follower2DatastoreContextBuilder = DatastoreContext.newBuilder().
+                shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
+        IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System, follower2DatastoreContextBuilder);
+        follower2TestKit.setupDistributedDataStore(testName, MODULE_SHARDS_CONFIG_3, false, SHARD_NAMES);
+
+        // Do an initial read to get the primary shard info cached.
+
+        DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
+        readTx.read(CarsModel.BASE_PATH).checkedGet(5, TimeUnit.SECONDS);
+
+        // Shutdown the leader and try to create a new tx.
+
+        JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
+
+        followerDatastoreContextBuilder.operationTimeoutInMillis(500);
+        followerDistributedDataStore.onDatastoreContextUpdated(followerDatastoreContextBuilder.build());
+
+        DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
+
+        rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+
+        followerTestKit.doCommit(rwTx.ready());
+    }
 }
index b5cceea8510350faf6e357b81cfb8fb24afdf03b..4362047a32def9e5380585e68c6e9b543693f6eb 100644 (file)
@@ -80,7 +80,8 @@ public class ShardManagerTest extends AbstractActorTest {
     private static TestActorRef<MessageCollectorActor> mockShardActor;
 
     private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
-            dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS);
+            dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS)
+                   .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6);
 
     private static ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
         String name = new ShardIdentifier(shardName, memberName,"config").toString();
@@ -342,6 +343,22 @@ public class ShardManagerTest extends AbstractActorTest {
         }};
     }
 
+    @Test
+    public void testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+
+            shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+            shardManager.tell(new ActorInitialized(), mockShardActor);
+            shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
+                    null, RaftState.IsolatedLeader.name()), mockShardActor);
+
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
+
+            expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
+        }};
+    }
+
     @Test
     public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() throws Exception {
         new JavaTestKit(getSystem()) {{
index 4ea81f44e3c6f5b2e6cc5c97ba1b4010ede956a9..03e4a16e679dd50de0fe87aa2a177d80cf75cb9e 100644 (file)
@@ -12,6 +12,7 @@ package org.opendaylight.controller.cluster.datastore;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isA;
@@ -23,6 +24,7 @@ import static org.mockito.Mockito.verify;
 import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_WRITE;
 import static org.opendaylight.controller.cluster.datastore.TransactionType.WRITE_ONLY;
 import akka.actor.ActorRef;
+import akka.util.Timeout;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
@@ -245,7 +247,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
         readyReplyPromise1.success(readyTxReply(txActorRef1.path().toString()).value().get().get());
 
         verify(mockActorContext, timeout(5000)).executeOperationAsync(eq(getSystem().actorSelection(shardActorRef2.path())),
-                eqCreateTransaction(tx2MemberName, READ_WRITE));
+                eqCreateTransaction(tx2MemberName, READ_WRITE), any(Timeout.class));
     }
 
     @Test(expected=IllegalStateException.class)
index b4cbbd5d3b112efcdcd8316e41696edd4bb61f48..4a9f4246e553012ad9992e3f4d866a99f83350b4 100644 (file)
@@ -19,6 +19,7 @@ import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.dispatch.Futures;
+import akka.util.Timeout;
 import com.google.common.base.Optional;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.CheckedFuture;
@@ -138,7 +139,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         }
 
         doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync(
-                any(ActorSelection.class), any());
+                any(ActorSelection.class), any(), any(Timeout.class));
 
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
 
@@ -216,7 +217,8 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
             when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
 
         doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
-            eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY));
+            eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY),
+            any(Timeout.class));
 
         TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
 
@@ -334,7 +336,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
         doReturn(createTxPromise).when(mockActorContext).executeOperationAsync(
                 eq(getSystem().actorSelection(actorRef.path())),
-                eqCreateTransaction(memberName, READ_WRITE));
+                eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
 
         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqSerializedReadData());
@@ -838,13 +840,10 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         }
 
         String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
-        CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder().
-                setTransactionId("txn-1").setTransactionActorPath(actorPath).
-                setMessageVersion(DataStoreVersions.CURRENT_VERSION).build();
 
-        doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
-                executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
-                        eqCreateTransaction(memberName, READ_WRITE));
+        doReturn(incompleteFuture()).when(mockActorContext).
+        executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
+                 eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
 
         doReturn(true).when(mockActorContext).isPathLocal(actorPath);
 
@@ -891,7 +890,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
-                        eqCreateTransaction(memberName, READ_WRITE));
+                        eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
 
         doReturn(true).when(mockActorContext).isPathLocal(anyString());
 
@@ -1311,9 +1310,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
                 expectBatchedModifications(1);
 
-                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
-                        any(ActorSelection.class), any(ReadyTransaction.class));
-
                 transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
                 transactionProxy.ready();
@@ -1574,7 +1570,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         doReturn(Futures.successful(createTransactionReply(txActorRef, DataStoreVersions.CURRENT_VERSION))).when(mockActorContext).
                 executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
-                        eqCreateTransaction(memberName, TransactionType.READ_ONLY));
+                        eqCreateTransaction(memberName, TransactionType.READ_ONLY), any(Timeout.class));
 
         doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(txActorRef)), eqSerializedReadData(YangInstanceIdentifier.builder().build()));
index 8f5550fe00949675b7d3c219face0166c8dc42e6..5357e6900a152b36bf441bc7be78e97fc4eb9a9e 100644 (file)
@@ -140,3 +140,57 @@ Member2 {
     }
   }
 }
+
+Member3 {
+  bounded-mailbox {
+    mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
+    mailbox-capacity = 1000
+    mailbox-push-timeout-time = 100ms
+  }
+
+  in-memory-journal {
+    class = "org.opendaylight.controller.cluster.raft.utils.InMemoryJournal"
+  }
+
+  in-memory-snapshot-store {
+    class = "org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore"
+    plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
+  }
+
+  akka {
+    persistence.snapshot-store.plugin = "in-memory-snapshot-store"
+    persistence.journal.plugin = "in-memory-journal"
+    
+    loglevel = "DEBUG"
+    
+    actor {
+      provider = "akka.cluster.ClusterActorRefProvider"
+      
+      serializers {
+          java = "akka.serialization.JavaSerializer"
+          proto = "akka.remote.serialization.ProtobufSerializer"
+          readylocal = "org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransactionSerializer"
+      }
+
+      serialization-bindings {
+          "com.google.protobuf.Message" = proto
+          "org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction" = readylocal
+      }
+    }
+    remote {
+      log-remote-lifecycle-events = off
+      netty.tcp {
+        hostname = "127.0.0.1"
+        port = 2557
+      }
+    }
+
+    cluster {
+      auto-down-unreachable-after = 100s
+      
+      roles = [
+        "member-3"
+      ]
+    }
+  }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/module-shards-member1-and-2-and-3.conf b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/module-shards-member1-and-2-and-3.conf
new file mode 100644 (file)
index 0000000..481e27d
--- /dev/null
@@ -0,0 +1,28 @@
+module-shards = [
+    {
+        name = "people"
+        shards = [
+            {
+                name="people"
+                replicas = [
+                    "member-1",
+                    "member-2",
+                    "member-3"
+                ]
+            }
+        ]
+    },
+    {
+        name = "cars"
+        shards = [
+            {
+                name="cars"
+                replicas = [
+                    "member-1",
+                    "member-2",
+                    "member-3"
+                ]
+            }
+        ]
+    }
+]