import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
+import com.google.common.collect.Iterators;
import io.netty.util.HashedWheelTimer;
import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
private static final long TICK_DURATION = 10; // 0.5 sec.
private final long globalNotificationQuota;
private final boolean switchFeaturesMandatory;
+
private ScheduledThreadPoolExecutor spyPool;
private final int spyRate = 10;
private final ConcurrentMap<NodeId, DeviceContext> deviceContexts = new ConcurrentHashMap<>();
private final MessageIntelligenceAgency messageIntelligenceAgency;
- private final long barrierNanos = TimeUnit.MILLISECONDS.toNanos(500);
- private final int maxQueueDepth = 25600;
+ private final long barrierIntervalNanos;
+ private final int barrierCountLimit;
private ExtensionConverterProvider extensionConverterProvider;
public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
@Nonnull final MessageIntelligenceAgency messageIntelligenceAgency,
- final long globalNotificationQuota, final boolean switchFeaturesMandatory) {
+ final long globalNotificationQuota, final boolean switchFeaturesMandatory,
+ final long barrierInterval, final int barrierCountLimit) {
this.switchFeaturesMandatory = switchFeaturesMandatory;
this.globalNotificationQuota = globalNotificationQuota;
this.dataBroker = Preconditions.checkNotNull(dataBroker);
}
this.messageIntelligenceAgency = messageIntelligenceAgency;
+ this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierInterval);
+ this.barrierCountLimit = barrierCountLimit;
}
}
@Override
- public void onDeviceContextLevelUp(final DeviceContext deviceContext) {
+ public void onDeviceContextLevelUp(final DeviceContext deviceContext) throws Exception {
// final phase - we have to add new Device to MD-SAL DataStore
LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceContext.getDeviceState().getNodeId());
Preconditions.checkNotNull(deviceContext);
- try {
- ((DeviceContextImpl) deviceContext).initialSubmitTransaction();
- deviceContext.onPublished();
-
- } catch (final Exception e) {
- LOG.warn("Node {} can not be add to OPERATIONAL DataStore yet because {} ", deviceContext.getDeviceState().getNodeId(), e.getMessage());
- LOG.trace("Problem with add node {} to OPERATIONAL DataStore", deviceContext.getDeviceState().getNodeId(), e);
- try {
- deviceContext.close();
- } catch (final Exception e1) {
- LOG.warn("Exception on device context close. ", e);
- }
- }
-
+ ((DeviceContextImpl) deviceContext).initialSubmitTransaction();
+ deviceContext.onPublished();
}
@Override
connectionContext.setOutboundQueueProvider(outboundQueueProvider);
final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
- connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, maxQueueDepth, barrierNanos);
+ connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, barrierCountLimit, barrierIntervalNanos);
connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
final DeviceState deviceState = createDeviceState(connectionContext);
}
@Override
- public void close() throws Exception {
- for (final DeviceContext deviceContext : deviceContexts.values()) {
- deviceContext.close();
+ public void close() {
+ for (final Iterator<Entry<NodeId, DeviceContext>> iterator = Iterators
+ .consumingIterator(deviceContexts.entrySet().iterator()); iterator.hasNext();) {
+ iterator.next().getValue().close();
+ }
+
+ if (spyPool != null) {
+ spyPool.shutdownNow();
+ spyPool = null;
}
}