Allow shard settle timeout to be tuned
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / AbstractDataStore.java
index 7f759d1b4296adb275f6cf7ee3296dde8c252623..f7608aeb57785dc1395550cef6a5279cd2274870 100644 (file)
@@ -5,19 +5,20 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
+import com.google.common.annotations.Beta;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
 import org.opendaylight.controller.cluster.common.actor.Dispatchers;
 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
@@ -44,6 +45,7 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
 
 /**
  * Base implementation of a distributed DOMStore.
@@ -54,22 +56,15 @@ public abstract class AbstractDataStore implements DistributedDataStoreInterface
 
     private static final Logger LOG = LoggerFactory.getLogger(AbstractDataStore.class);
 
-    private static final long READY_WAIT_FACTOR = 3;
-
+    private final CountDownLatch waitTillReadyCountDownLatch = new CountDownLatch(1);
+    private final ClientIdentifier identifier;
+    private final DataStoreClient client;
     private final ActorContext actorContext;
-    private final long waitTillReadyTimeInMillis;
 
     private AutoCloseable closeable;
-
     private DatastoreConfigurationMXBeanImpl datastoreConfigMXBean;
-
     private DatastoreInfoMXBeanImpl datastoreInfoMXBean;
 
-    private final CountDownLatch waitTillReadyCountDownLatch = new CountDownLatch(1);
-
-    private final ClientIdentifier identifier;
-    private final DataStoreClient client;
-
     @SuppressWarnings("checkstyle:IllegalCatch")
     protected AbstractDataStore(final ActorSystem actorSystem, final ClusterWrapper cluster,
             final Configuration configuration, final DatastoreContextFactory datastoreContextFactory,
@@ -115,9 +110,6 @@ public abstract class AbstractDataStore implements DistributedDataStoreInterface
         identifier = client.getIdentifier();
         LOG.debug("Distributed data store client {} started", identifier);
 
-        this.waitTillReadyTimeInMillis = actorContext.getDatastoreContext().getShardLeaderElectionTimeout()
-                .duration().toMillis() * READY_WAIT_FACTOR;
-
         datastoreConfigMXBean = new DatastoreConfigurationMXBeanImpl(
                 datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
         datastoreConfigMXBean.setContext(datastoreContextFactory.getBaseDatastoreContext());
@@ -133,8 +125,6 @@ public abstract class AbstractDataStore implements DistributedDataStoreInterface
         this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
         this.client = null;
         this.identifier = Preconditions.checkNotNull(identifier);
-        this.waitTillReadyTimeInMillis = actorContext.getDatastoreContext().getShardLeaderElectionTimeout()
-                .duration().toMillis() * READY_WAIT_FACTOR;
     }
 
     @VisibleForTesting
@@ -143,8 +133,6 @@ public abstract class AbstractDataStore implements DistributedDataStoreInterface
         this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
         this.client = clientActor;
         this.identifier = Preconditions.checkNotNull(identifier);
-        this.waitTillReadyTimeInMillis = actorContext.getDatastoreContext().getShardLeaderElectionTimeout()
-                .duration().toMillis() * READY_WAIT_FACTOR;
     }
 
     protected final DataStoreClient getClient() {
@@ -238,18 +226,32 @@ public abstract class AbstractDataStore implements DistributedDataStoreInterface
         return actorContext;
     }
 
+    // TODO: consider removing this in favor of awaitReadiness()
     public void waitTillReady() {
         LOG.info("Beginning to wait for data store to become ready : {}", identifier);
 
+        final Duration toWait = initialSettleTime();
         try {
-            if (waitTillReadyCountDownLatch.await(waitTillReadyTimeInMillis, TimeUnit.MILLISECONDS)) {
-                LOG.debug("Data store {} is now ready", identifier);
+            if (toWait.isFinite()) {
+                if (!waitTillReadyCountDownLatch.await(toWait.toNanos(), TimeUnit.NANOSECONDS)) {
+                    LOG.error("Shard leaders failed to settle in {}, giving up", toWait);
+                    return;
+                }
             } else {
-                LOG.error("Shard leaders failed to settle in {} seconds, giving up",
-                        TimeUnit.MILLISECONDS.toSeconds(waitTillReadyTimeInMillis));
+                waitTillReadyCountDownLatch.await();
             }
         } catch (InterruptedException e) {
             LOG.error("Interrupted while waiting for shards to settle", e);
+            return;
+        }
+
+        LOG.debug("Data store {} is now ready", identifier);
+    }
+
+    @Beta
+    public void awaitReadiness(final long timeout, final TimeUnit unit) throws InterruptedException, TimeoutException {
+        if (!waitTillReadyCountDownLatch.await(timeout, unit)) {
+            throw new TimeoutException("Shard leaders failed to settle");
         }
     }
 
@@ -316,4 +318,9 @@ public abstract class AbstractDataStore implements DistributedDataStoreInterface
         return (ListenerRegistration<L>) proxy;
     }
 
+    private Duration initialSettleTime() {
+        final DatastoreContext context = actorContext.getDatastoreContext();
+        final int multiplier = context.getInitialSettleTimeoutMultiplier();
+        return multiplier == 0 ? Duration.Inf() : context.getShardLeaderElectionTimeout().duration().$times(multiplier);
+    }
 }