import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import java.lang.management.ManagementFactory;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
private final Collection<SwitchConnectionProvider> switchConnectionProviders;
private final DeviceInitializerProvider deviceInitializerProvider;
private final ConvertorManager convertorManager;
- private final ContextChainHolder contextChainHolder;
private final RpcProviderRegistry rpcProviderRegistry;
private final ClusterSingletonServiceProvider singletonServicesProvider;
+ private final EntityOwnershipService entityOwnershipService;
+ private ContextChainHolder contextChainHolder;
private int rpcRequestsQuota;
private long globalNotificationQuota;
private long barrierInterval;
this.rpcProviderRegistry = rpcProviderRegistry;
this.notificationPublishService = notificationPublishService;
this.singletonServicesProvider = singletonServiceProvider;
+ this.entityOwnershipService = entityOwnershipService;
convertorManager = ConvertorManagerFactory.createDefaultManager();
- contextChainHolder = new ContextChainHolderImpl(hashedWheelTimer);
- contextChainHolder.changeEntityOwnershipService(entityOwnershipService);
extensionConverterManager = new ExtensionConverterManagerImpl();
deviceInitializerProvider = DeviceInitializerProviderFactory.createDefaultProvider();
}
});
}
- private void shutdownSwitchConnections() {
- Futures.addCallback(Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> {
+ private ListenableFuture<List<Boolean>> shutdownSwitchConnections() {
+ final ListenableFuture<List<Boolean>> listListenableFuture = Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> {
// Revert deserializers to their original state
if (useSingleLayerSerialization) {
DeserializerInjector.revertDeserializers(switchConnectionProvider);
// Shutdown switch connection provider
return switchConnectionProvider.shutdown();
- }).collect(Collectors.toSet())), new FutureCallback<List<Boolean>>() {
+ }).collect(Collectors.toSet()));
+
+ Futures.addCallback(listListenableFuture, new FutureCallback<List<Boolean>>() {
@Override
public void onSuccess(final List<Boolean> result) {
LOG.info("All switchConnectionProviders were successfully shut down ({}).", result.size());
LOG.warn("Some switchConnectionProviders failed to shutdown.", throwable);
}
});
+
+ return listListenableFuture;
}
@Override
Preconditions.checkNotNull(threadPoolTimeout),
TimeUnit.SECONDS, new SynchronousQueue<>(), POOL_NAME);
+
+ contextChainHolder = new ContextChainHolderImpl(hashedWheelTimer, threadPool);
+ contextChainHolder.changeEntityOwnershipService(entityOwnershipService);
+
connectionManager = new ConnectionManagerImpl(threadPool);
connectionManager.setEchoReplyTimeout(echoReplyTimeout);
}
@Override
- public void close() throws Exception {
+ public void close() {
initialized = false;
+
+ try {
+ shutdownSwitchConnections().get(10, TimeUnit.SECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ LOG.warn("Failed to shut down switch connections in time {}s, error: {}", 10, e);
+ }
+
gracefulShutdown(contextChainHolder);
gracefulShutdown(deviceManager);
gracefulShutdown(rpcManager);
gracefulShutdown(statisticsManager);
gracefulShutdown(threadPool);
gracefulShutdown(hashedWheelTimer);
- shutdownSwitchConnections();
unregisterMXBean(MESSAGE_INTELLIGENCE_AGENCY_MX_BEAN_NAME);
}
}
try {
- threadPoolExecutor.shutdown();
+ threadPoolExecutor.shutdownNow();
} catch (Exception e) {
LOG.warn("Failed to shutdown {} gracefully.", threadPoolExecutor);
}