import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot;
import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
+import org.opendaylight.controller.cluster.datastore.utils.CompositeOnComplete;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
- private final Map<String, Future<Boolean>> shardActorStoppingFutures = new HashMap<>();
+ private final Map<String, CompositeOnComplete<Boolean>> shardActorsStopping = new HashMap<>();
private final String persistenceId;
private final AbstractDataStore dataStore;
final ActorRef shardActor = shardInformation.getActor();
if (shardActor != null) {
- LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shardActor);
- FiniteDuration duration = shardInformation.getDatastoreContext().getShardRaftConfig()
- .getElectionTimeOutInterval().$times(3);
- final Future<Boolean> stopFuture = Patterns.gracefulStop(shardActor, duration, Shutdown.INSTANCE);
- shardActorStoppingFutures.put(shardName, stopFuture);
- stopFuture.onComplete(new OnComplete<Boolean>() {
+ long timeoutInMS = Math.max(shardInformation.getDatastoreContext().getShardRaftConfig()
+ .getElectionTimeOutInterval().$times(3).toMillis(), 10000);
+
+ LOG.debug("{} : Sending Shutdown to Shard actor {} with {} ms timeout", persistenceId(), shardActor,
+ timeoutInMS);
+
+ final Future<Boolean> stopFuture = Patterns.gracefulStop(shardActor,
+ FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS), Shutdown.INSTANCE);
+
+ final CompositeOnComplete<Boolean> onComplete = new CompositeOnComplete<Boolean>() {
@Override
- public void onComplete(Throwable failure, Boolean result) {
+ public void onComplete(final Throwable failure, final Boolean result) {
if (failure == null) {
LOG.debug("{} : Successfully shut down Shard actor {}", persistenceId(), shardActor);
} else {
LOG.warn("{}: Failed to shut down Shard actor {}", persistenceId(), shardActor, failure);
}
- self().tell((RunnableMessage) () -> shardActorStoppingFutures.remove(shardName),
- ActorRef.noSender());
+ self().tell((RunnableMessage) () -> {
+ shardActorsStopping.remove(shardName);
+ notifyOnCompleteTasks(failure, result);
+ }, ActorRef.noSender());
}
- }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+ };
+
+ shardActorsStopping.put(shardName, onComplete);
+ stopFuture.onComplete(onComplete, new Dispatchers(context().system().dispatchers())
+ .getDispatcher(Dispatchers.DispatcherType.Client));
}
LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardName);
}
private boolean isPreviousShardActorStopInProgress(final String shardName, final Object messageToDefer) {
- final Future<Boolean> stopFuture = shardActorStoppingFutures.get(shardName);
- if (stopFuture == null) {
+ final CompositeOnComplete<Boolean> stopOnComplete = shardActorsStopping.get(shardName);
+ if (stopOnComplete == null) {
return false;
}
- LOG.debug("{} : Stop is in progress for shard {} - adding Future callback to defer {}", persistenceId(),
+ LOG.debug("{} : Stop is in progress for shard {} - adding OnComplete callback to defer {}", persistenceId(),
shardName, messageToDefer);
final ActorRef sender = getSender();
- stopFuture.onComplete(new OnComplete<Boolean>() {
+ stopOnComplete.addOnComplete(new OnComplete<Boolean>() {
@Override
public void onComplete(Throwable failure, Boolean result) {
LOG.debug("{} : Stop complete for shard {} - re-queing {}", persistenceId(), shardName, messageToDefer);
self().tell(messageToDefer, sender);
}
- }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+ });
return true;
}
--- /dev/null
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import akka.dispatch.OnComplete;
+import java.util.ArrayList;
+import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An OnComplete implementation that aggrgates other OnComplete tasks.
+ *
+ * @author Thomas Pantelis
+ *
+ * @param <T> the result type
+ */
+public abstract class CompositeOnComplete<T> extends OnComplete<T> {
+ private static final Logger LOG = LoggerFactory.getLogger(CompositeOnComplete.class);
+
+ private final List<OnComplete<T>> onCompleteTasks = new ArrayList<>();
+
+ public void addOnComplete(OnComplete<T> task) {
+ onCompleteTasks.add(task);
+ }
+
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ protected void notifyOnCompleteTasks(Throwable failure, T result) {
+ for (OnComplete<T> task: onCompleteTasks) {
+ try {
+ task.onComplete(failure, result);
+ } catch (Throwable e) {
+ LOG.error("Caught unexpected exception", e);
+ }
+ }
+
+ onCompleteTasks.clear();
+ }
+}