From: Shigeru Yasuda Date: Fri, 24 Jun 2016 13:10:07 +0000 (+0900) Subject: Bug 6110: Fixed bugs in statistics manager due to race condition. X-Git-Tag: release/boron-sr3~6^2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=commitdiff_plain;h=5ea63885f11c0b0a9af7f83ffa03f62ebc9c1b4f;p=openflowplugin.git Bug 6110: Fixed bugs in statistics manager due to race condition. * Stats notification listener needs to wait for the XID to be cached. * Enqueue DS operation after all notifications are received. Change-Id: I42ac315a65be1a1f02152fbd9ea9510bee586eb3 Signed-off-by: Shigeru Yasuda --- diff --git a/applications/statistics-manager/src/main/java/org/opendaylight/openflowplugin/applications/statistics/manager/StatRpcMsgManager.java b/applications/statistics-manager/src/main/java/org/opendaylight/openflowplugin/applications/statistics/manager/StatRpcMsgManager.java index d5532a0e33..3ec705086e 100644 --- a/applications/statistics-manager/src/main/java/org/opendaylight/openflowplugin/applications/statistics/manager/StatRpcMsgManager.java +++ b/applications/statistics-manager/src/main/java/org/opendaylight/openflowplugin/applications/statistics/manager/StatRpcMsgManager.java @@ -9,7 +9,6 @@ package org.opendaylight.openflowplugin.applications.statistics.manager; import java.util.List; -import java.util.concurrent.Callable; import java.util.concurrent.Future; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.TransactionAware; @@ -38,10 +37,7 @@ import com.google.common.util.concurrent.SettableFuture; * * Created: Aug 29, 2014 */ -public interface StatRpcMsgManager extends Runnable, AutoCloseable { - - interface RpcJobsQueue extends Callable {} - +public interface StatRpcMsgManager { /** * Transaction container is definition for Multipart transaction * join container for all Multipart msg with same TransactionId diff --git a/applications/statistics-manager/src/main/java/org/opendaylight/openflowplugin/applications/statistics/manager/impl/StatRpcMsgManagerImpl.java b/applications/statistics-manager/src/main/java/org/opendaylight/openflowplugin/applications/statistics/manager/impl/StatRpcMsgManagerImpl.java index 4c3f573ccc..0515ca1d1d 100644 --- a/applications/statistics-manager/src/main/java/org/opendaylight/openflowplugin/applications/statistics/manager/impl/StatRpcMsgManagerImpl.java +++ b/applications/statistics-manager/src/main/java/org/opendaylight/openflowplugin/applications/statistics/manager/impl/StatRpcMsgManagerImpl.java @@ -11,10 +11,8 @@ package org.opendaylight.openflowplugin.applications.statistics.manager.impl; import java.math.BigInteger; import java.util.Arrays; import java.util.List; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationNotAvailableException; @@ -88,8 +86,19 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { private final Cache> txCache; + /** + * Cache for futures to be returned by + * {@link #isExpectedStatistics(TransactionId, NodeId)}. + */ + private final Cache> txFutureCache; + + /** + * The number of seconds to wait for transaction container to be put into + * {@link #txCache}. + */ + private static final long TXCACHE_WAIT_TIMEOUT = 10L; + private static final int MAX_CACHE_SIZE = 10000; - private static final int QUEUE_CAPACITY = 5000; private static final String MSG_TRANS_ID_NOT_NULL = "TransactionId can not be null!"; private static final String MSG_NODE_ID_NOT_NULL = "NodeId can not be null!"; @@ -107,10 +116,6 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { private final OpendaylightFlowTableStatisticsService flowTableStatsService; private final OpendaylightQueueStatisticsService queueStatsService; - private BlockingQueue statsRpcJobQueue; - - private volatile boolean finishing = false; - public StatRpcMsgManagerImpl (final StatisticsManager manager, final RpcConsumerRegistry rpcRegistry, final long maxNodeForCollector) { Preconditions.checkArgument(manager != null, "StatisticManager can not be null!"); @@ -134,46 +139,11 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { rpcRegistry.getRpcService(OpendaylightQueueStatisticsService.class), "OpendaylightQueueStatisticsService can not be null!"); - statsRpcJobQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY); txCache = CacheBuilder.newBuilder().expireAfterWrite((maxNodeForCollector * POSSIBLE_STAT_WAIT_FOR_NOTIFICATION), TimeUnit.SECONDS) .maximumSize(MAX_CACHE_SIZE).build(); - } - - @Override - public void close() { - finishing = true; - statsRpcJobQueue = null; - } - - @Override - public void run() { - /* Neverending cyle - wait for finishing */ - while ( ! finishing) { - try { - statsRpcJobQueue.take().call(); - } - catch (final Exception e) { - LOG.warn("Stat Element RPC executor fail!", e); - } - } - // Drain all rpcCall, making sure any blocked threads are unblocked - while ( ! statsRpcJobQueue.isEmpty()) { - statsRpcJobQueue.poll(); - } - } - - private void addGetAllStatJob(final RpcJobsQueue getAllStatJob) { - final boolean success = statsRpcJobQueue.offer(getAllStatJob); - if ( ! success) { - LOG.warn("Put RPC request getAllStat fail! Queue is full."); - } - } - - private void addStatJob(final RpcJobsQueue getStatJob) { - final boolean success = statsRpcJobQueue.offer(getStatJob); - if ( ! success) { - LOG.debug("Put RPC request for getStat fail! Queue is full."); - } + txFutureCache = CacheBuilder.newBuilder(). + expireAfterWrite(TXCACHE_WAIT_TIMEOUT, TimeUnit.SECONDS). + maximumSize(MAX_CACHE_SIZE).build(); } @Override @@ -201,7 +171,7 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { final String cacheKey = buildCacheKey(id, nodeKey.getId()); final TransactionCacheContainer container = new TransactionCacheContainerImpl<>(id, inputObj, nodeKey.getId()); - txCache.put(cacheKey, container); + putTransaction(cacheKey, container); } } @@ -227,30 +197,61 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { return String.valueOf(id.getValue()) + "-" + nodeId.getValue(); } + /** + * Put the given statistics transaction container into the cache. + * + * @param key Key that specifies the given transaction container. + * @param container Transaction container. + */ + private synchronized void putTransaction( + String key, TransactionCacheContainer container) { + txCache.put(key, container); + + SettableFuture future = txFutureCache.asMap().remove(key); + if (future != null) { + // Wake up a thread waiting for this transaction container. + future.set(true); + } + } + + /** + * Check to see if the specified transaction container is cached in + * {@link #txCache}. + * + * @param key Key that specifies the transaction container. + * @return A future that will contain the result. + */ + private synchronized Future isExpectedStatistics(String key) { + Future future; + TransactionCacheContainer container = txCache.getIfPresent(key); + if (container == null) { + // Wait for the transaction container to be put into the cache. + SettableFuture f = SettableFuture.create(); + SettableFuture current = + txFutureCache.asMap().putIfAbsent(key, f); + future = (current == null) ? f : current; + } else { + future = Futures.immediateFuture(Boolean.TRUE); + } + + return future; + } + @Override public Future>> getTransactionCacheContainer( final TransactionId id, final NodeId nodeId) { Preconditions.checkArgument(id != null, MSG_TRANS_ID_NOT_NULL); Preconditions.checkArgument(nodeId != null, MSG_NODE_ID_NOT_NULL); - final String key = buildCacheKey(id, nodeId); - final SettableFuture>> result = SettableFuture.create(); - - final RpcJobsQueue getTransactionCacheContainer = new RpcJobsQueue() { + String key = buildCacheKey(id, nodeId); + Optional> resultContainer = + Optional.> fromNullable( + txCache.asMap().remove(key)); + if (!resultContainer.isPresent()) { + LOG.warn("Transaction cache not found: {}", key); + } - @Override - public Void call() throws Exception { - final Optional> resultContainer = - Optional.> fromNullable(txCache.getIfPresent(key)); - if (resultContainer.isPresent()) { - txCache.invalidate(key); - } - result.set(resultContainer); - return null; - } - }; - addStatJob(getTransactionCacheContainer); - return result; + return Futures.immediateFuture(resultContainer); } @Override @@ -258,21 +259,8 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { Preconditions.checkArgument(id != null, MSG_TRANS_ID_NOT_NULL); Preconditions.checkArgument(nodeId != null, MSG_NODE_ID_NOT_NULL); - final String key = buildCacheKey(id, nodeId); - final SettableFuture checkStatId = SettableFuture.create(); - - final RpcJobsQueue isExpecedStatistics = new RpcJobsQueue() { - - @Override - public Void call() throws Exception { - final Optional> result = - Optional.> fromNullable(txCache.getIfPresent(key)); - checkStatId.set(Boolean.valueOf(result.isPresent())); - return null; - } - }; - addStatJob(isExpecedStatistics); - return checkStatId; + String key = buildCacheKey(id, nodeId); + return isExpectedStatistics(key); } @Override @@ -280,79 +268,54 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { Preconditions.checkArgument(notification != null, "TransactionAware can not be null!"); Preconditions.checkArgument(nodeId != null, MSG_NODE_ID_NOT_NULL); - final RpcJobsQueue addNotification = new RpcJobsQueue() { - - @Override - public Void call() throws Exception { - final TransactionId txId = notification.getTransactionId(); - final String key = buildCacheKey(txId, nodeId); - final TransactionCacheContainer container = (txCache.getIfPresent(key)); - if (container != null) { - container.addNotif(notification); - } - return null; - } - }; - addStatJob(addNotification); + TransactionId txId = notification.getTransactionId(); + String key = buildCacheKey(txId, nodeId); + TransactionCacheContainer container = + txCache.getIfPresent(key); + if (container != null) { + container.addNotif(notification); + } else { + LOG.warn("Unable to add notification: {}, {}", key, + notification.getImplementedInterface()); + } } @Override public Future getAllGroupsStat(final NodeRef nodeRef) { Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL); - final SettableFuture result = SettableFuture.create(); - final RpcJobsQueue getAllGroupStat = new RpcJobsQueue() { - - @Override - public Void call() throws Exception { - final GetAllGroupStatisticsInputBuilder builder = - new GetAllGroupStatisticsInputBuilder(); - builder.setNode(nodeRef); - registrationRpcFutureCallBack(groupStatsService - .getAllGroupStatistics(builder.build()), null, nodeRef, result); - return null; - } - }; - addGetAllStatJob(getAllGroupStat); + SettableFuture result = SettableFuture.create(); + GetAllGroupStatisticsInputBuilder builder = + new GetAllGroupStatisticsInputBuilder(); + builder.setNode(nodeRef); + registrationRpcFutureCallBack( + groupStatsService.getAllGroupStatistics(builder.build()), null, + nodeRef, result); return result; } @Override public Future getAllMetersStat(final NodeRef nodeRef) { Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL); - final SettableFuture result = SettableFuture.create(); - final RpcJobsQueue getAllMeterStat = new RpcJobsQueue() { - - @Override - public Void call() throws Exception { - final GetAllMeterStatisticsInputBuilder builder = - new GetAllMeterStatisticsInputBuilder(); - builder.setNode(nodeRef); - registrationRpcFutureCallBack(meterStatsService - .getAllMeterStatistics(builder.build()), null, nodeRef, result); - return null; - } - }; - addGetAllStatJob(getAllMeterStat); + SettableFuture result = SettableFuture.create(); + GetAllMeterStatisticsInputBuilder builder = + new GetAllMeterStatisticsInputBuilder(); + builder.setNode(nodeRef); + registrationRpcFutureCallBack( + meterStatsService.getAllMeterStatistics(builder.build()), null, + nodeRef, result); return result; } @Override public Future getAllFlowsStat(final NodeRef nodeRef) { Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL); - final SettableFuture result = SettableFuture.create(); - final RpcJobsQueue getAllFlowStat = new RpcJobsQueue() { - - @Override - public Void call() throws Exception { - final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder builder = - new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder(); - builder.setNode(nodeRef); - registrationRpcFutureCallBack(flowStatsService - .getAllFlowsStatisticsFromAllFlowTables(builder.build()), null, nodeRef, result); - return null; - } - }; - addGetAllStatJob(getAllFlowStat); + SettableFuture result = SettableFuture.create(); + GetAllFlowsStatisticsFromAllFlowTablesInputBuilder builder = + new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder(); + builder.setNode(nodeRef); + registrationRpcFutureCallBack( + flowStatsService.getAllFlowsStatisticsFromAllFlowTables(builder.build()), + null, nodeRef, result); return result; } @@ -360,159 +323,100 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { public void getAggregateFlowStat(final NodeRef nodeRef, final TableId tableId) { Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL); Preconditions.checkArgument(tableId != null, "TableId can not be null!"); - final RpcJobsQueue getAggregateFlowStat = new RpcJobsQueue() { - - @Override - public Void call() throws Exception { - final GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder builder = - new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder(); - builder.setNode(nodeRef); - builder.setTableId(tableId); - - final TableBuilder tbuilder = new TableBuilder(); - tbuilder.setId(tableId.getValue()); - tbuilder.setKey(new TableKey(tableId.getValue())); - registrationRpcFutureCallBack(flowStatsService - .getAggregateFlowStatisticsFromFlowTableForAllFlows(builder.build()), tbuilder.build(), nodeRef, null); - return null; - } - }; - addGetAllStatJob(getAggregateFlowStat); + GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder builder = + new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder(); + builder.setNode(nodeRef).setTableId(tableId); + + TableBuilder tbuilder = new TableBuilder(). + setId(tableId.getValue()). + setKey(new TableKey(tableId.getValue())); + registrationRpcFutureCallBack( + flowStatsService.getAggregateFlowStatisticsFromFlowTableForAllFlows(builder.build()), + tbuilder.build(), nodeRef, null); } @Override public Future getAllPortsStat(final NodeRef nodeRef) { Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL); - final SettableFuture result = SettableFuture.create(); - final RpcJobsQueue getAllPortsStat = new RpcJobsQueue() { - - @Override - public Void call() throws Exception { - final GetAllNodeConnectorsStatisticsInputBuilder builder = - new GetAllNodeConnectorsStatisticsInputBuilder(); - builder.setNode(nodeRef); - final Future> rpc = - portStatsService.getAllNodeConnectorsStatistics(builder.build()); - registrationRpcFutureCallBack(rpc, null, nodeRef, result); - return null; - } - }; - addGetAllStatJob(getAllPortsStat); + SettableFuture result = SettableFuture.create(); + GetAllNodeConnectorsStatisticsInputBuilder builder = + new GetAllNodeConnectorsStatisticsInputBuilder(); + builder.setNode(nodeRef); + Future> rpc = + portStatsService.getAllNodeConnectorsStatistics(builder.build()); + registrationRpcFutureCallBack(rpc, null, nodeRef, result); return result; } @Override public Future getAllTablesStat(final NodeRef nodeRef) { Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL); - final SettableFuture result = SettableFuture.create(); - final RpcJobsQueue getAllTableStat = new RpcJobsQueue() { - - @Override - public Void call() throws Exception { - final GetFlowTablesStatisticsInputBuilder builder = - new GetFlowTablesStatisticsInputBuilder(); - builder.setNode(nodeRef); - registrationRpcFutureCallBack(flowTableStatsService - .getFlowTablesStatistics(builder.build()), null, nodeRef, result); - return null; - } - }; - addGetAllStatJob(getAllTableStat); + SettableFuture result = SettableFuture.create(); + GetFlowTablesStatisticsInputBuilder builder = + new GetFlowTablesStatisticsInputBuilder(); + builder.setNode(nodeRef); + registrationRpcFutureCallBack( + flowTableStatsService.getFlowTablesStatistics(builder.build()), + null, nodeRef, result); return result; } @Override public Future getAllQueueStat(final NodeRef nodeRef) { Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL); - final SettableFuture result = SettableFuture.create(); - final RpcJobsQueue getAllQueueStat = new RpcJobsQueue() { - - @Override - public Void call() throws Exception { - final GetAllQueuesStatisticsFromAllPortsInputBuilder builder = - new GetAllQueuesStatisticsFromAllPortsInputBuilder(); - builder.setNode(nodeRef); - registrationRpcFutureCallBack(queueStatsService - .getAllQueuesStatisticsFromAllPorts(builder.build()), null, nodeRef, result); - return null; - } - }; - addGetAllStatJob(getAllQueueStat); + SettableFuture result = SettableFuture.create(); + GetAllQueuesStatisticsFromAllPortsInputBuilder builder = + new GetAllQueuesStatisticsFromAllPortsInputBuilder(); + builder.setNode(nodeRef); + registrationRpcFutureCallBack( + queueStatsService.getAllQueuesStatisticsFromAllPorts(builder.build()), + null, nodeRef, result); return result; } @Override public Future getAllMeterConfigStat(final NodeRef nodeRef) { Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL); - final SettableFuture result = SettableFuture.create(); - final RpcJobsQueue qetAllMeterConfStat = new RpcJobsQueue() { - - @Override - public Void call() throws Exception { - final GetAllMeterConfigStatisticsInputBuilder builder = - new GetAllMeterConfigStatisticsInputBuilder(); - builder.setNode(nodeRef); - registrationRpcFutureCallBack(meterStatsService - .getAllMeterConfigStatistics(builder.build()), null, nodeRef, result); - return null; - } - }; - addGetAllStatJob(qetAllMeterConfStat); + SettableFuture result = SettableFuture.create(); + GetAllMeterConfigStatisticsInputBuilder builder = + new GetAllMeterConfigStatisticsInputBuilder(); + builder.setNode(nodeRef); + registrationRpcFutureCallBack( + meterStatsService.getAllMeterConfigStatistics(builder.build()), + null, nodeRef, result); return result; } @Override public void getGroupFeaturesStat(final NodeRef nodeRef) { Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL); - final RpcJobsQueue getGroupFeaturesStat = new RpcJobsQueue() { - - @Override - public Void call() throws Exception { - /* RPC input */ - final GetGroupFeaturesInputBuilder input = new GetGroupFeaturesInputBuilder(); - input.setNode(nodeRef); - registrationRpcFutureCallBack(groupStatsService.getGroupFeatures(input.build()), null, nodeRef, null); - return null; - } - }; - addStatJob(getGroupFeaturesStat); + GetGroupFeaturesInputBuilder input = new GetGroupFeaturesInputBuilder(). + setNode(nodeRef); + registrationRpcFutureCallBack( + groupStatsService.getGroupFeatures(input.build()), null, nodeRef, + null); } @Override public void getMeterFeaturesStat(final NodeRef nodeRef) { Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL); - final RpcJobsQueue getMeterFeaturesStat = new RpcJobsQueue() { - - @Override - public Void call() throws Exception { - /* RPC input */ - final GetMeterFeaturesInputBuilder input = new GetMeterFeaturesInputBuilder(); - input.setNode(nodeRef); - registrationRpcFutureCallBack(meterStatsService.getMeterFeatures(input.build()), null, nodeRef, null); - return null; - } - }; - addStatJob(getMeterFeaturesStat); + GetMeterFeaturesInputBuilder input = new GetMeterFeaturesInputBuilder(). + setNode(nodeRef); + registrationRpcFutureCallBack( + meterStatsService.getMeterFeatures(input.build()), null, nodeRef, + null); } @Override public Future getAllGroupsConfStats(final NodeRef nodeRef) { Preconditions.checkArgument(nodeRef != null, MSG_NODE_REF_NOT_NULL); - final SettableFuture result = SettableFuture.create(); - final RpcJobsQueue getAllGropConfStat = new RpcJobsQueue() { - - @Override - public Void call() throws Exception { - final GetGroupDescriptionInputBuilder builder = - new GetGroupDescriptionInputBuilder(); - builder.setNode(nodeRef); - registrationRpcFutureCallBack(groupStatsService - .getGroupDescription(builder.build()), null, nodeRef, result); - - return null; - } - }; - addGetAllStatJob(getAllGropConfStat); + SettableFuture result = SettableFuture.create(); + GetGroupDescriptionInputBuilder builder = + new GetGroupDescriptionInputBuilder(); + builder.setNode(nodeRef); + registrationRpcFutureCallBack( + groupStatsService.getGroupDescription(builder.build()), null, + nodeRef, result); return result; } @@ -556,4 +460,3 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager { } } } - diff --git a/applications/statistics-manager/src/main/java/org/opendaylight/openflowplugin/applications/statistics/manager/impl/StatisticsManagerImpl.java b/applications/statistics-manager/src/main/java/org/opendaylight/openflowplugin/applications/statistics/manager/impl/StatisticsManagerImpl.java index afca1d2ed5..3bd0cc2a65 100644 --- a/applications/statistics-manager/src/main/java/org/opendaylight/openflowplugin/applications/statistics/manager/impl/StatisticsManagerImpl.java +++ b/applications/statistics-manager/src/main/java/org/opendaylight/openflowplugin/applications/statistics/manager/impl/StatisticsManagerImpl.java @@ -80,7 +80,6 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable { private final DataBroker dataBroker; - private final ExecutorService statRpcMsgManagerExecutor; private final ExecutorService statDataStoreOperationServ; private EntityOwnershipService ownershipService; private StatRpcMsgManager rpcMsgManager; @@ -104,7 +103,6 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable { this.dataBroker = Preconditions.checkNotNull(dataBroker, "DataBroker can not be null!"); ThreadFactory threadFact; threadFact = new ThreadFactoryBuilder().setNameFormat("odl-stat-rpc-oper-thread-%d").build(); - statRpcMsgManagerExecutor = Executors.newSingleThreadExecutor(threadFact); threadFact = new ThreadFactoryBuilder().setNameFormat("odl-stat-ds-oper-thread-%d").build(); statDataStoreOperationServ = Executors.newSingleThreadExecutor(threadFact); txChain = dataBroker.createTransactionChain(this); @@ -124,7 +122,6 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable { portNotifyCommiter = new StatNotifyCommitPort(this, notifService, nodeRegistrator); queueNotifyCommiter = new StatListenCommitQueue(this, dataBroker, notifService, nodeRegistrator); - statRpcMsgManagerExecutor.execute(rpcMsgManager); statDataStoreOperationServ.execute(this); LOG.info("Statistics Manager started successfully!"); } @@ -153,8 +150,7 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable { } statCollectors = null; } - rpcMsgManager = close(rpcMsgManager); - statRpcMsgManagerExecutor.shutdown(); + rpcMsgManager = null; statDataStoreOperationServ.shutdown(); txChain = close(txChain); }