import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
import io.netty.util.HashedWheelTimer;
import java.util.Collections;
import java.util.HashMap;
private final Map<DeviceInfo, ContextChain> contextChainMap = Collections.synchronizedMap(new HashMap<>());
private final EntityOwnershipListenerRegistration eosListenerRegistration;
- private final HashedWheelTimer timer;
private final ClusterSingletonServiceProvider singletonServiceProvider;
private final ItemScheduler<DeviceInfo, ContextChain> scheduler;
private final ExecutorService executorService;
final ClusterSingletonServiceProvider singletonServiceProvider,
final EntityOwnershipService entityOwnershipService,
final OwnershipChangeListener ownershipChangeListener) {
- this.timer = timer;
this.singletonServiceProvider = singletonServiceProvider;
this.executorService = executorService;
this.ownershipChangeListener = ownershipChangeListener;
}
@VisibleForTesting
- ContextChain createContextChain(final ConnectionContext connectionContext) {
+ void createContextChain(final ConnectionContext connectionContext) {
final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
final DeviceContext deviceContext = deviceManager.createContext(connectionContext);
CHECK_ROLE_MASTER_TIMEOUT / 1000L);
contextChain.registerServices(singletonServiceProvider);
- return contextChain;
}
@Override
}
LOG.debug("No context chain found for device: {}, creating new.", deviceInfo);
- final ContextChain newContextChain = createContextChain(connectionContext);
- LOG.debug("Successfully created context chain with identifier: {}", newContextChain.getIdentifier());
+ createContextChain(connectionContext);
return ConnectionStatus.MAY_CONTINUE;
}
} else {
contextChain.isMastered(mastershipState);
if (contextChain.isPrepared()) {
- ownershipChangeListener.becomeMasterBeforeSubmittedDS(
- deviceInfo,
- reconciliationFrameworkCallback(deviceInfo, contextChain));
+ Futures.addCallback(
+ ownershipChangeListener.becomeMasterBeforeSubmittedDS(deviceInfo),
+ reconciliationFrameworkCallback(deviceInfo, contextChain),
+ MoreExecutors.directExecutor());
}
}
} else if (contextChain.isMastered(mastershipState)) {
@Override
public void close() throws Exception {
scheduler.close();
- contextChainMap.keySet().forEach(this::destroyContextChain);
+ Map<DeviceInfo, ContextChain> copyOfChains = new HashMap<>(contextChainMap);
+ copyOfChains.keySet().forEach(this::destroyContextChain);
+ copyOfChains.clear();
contextChainMap.clear();
eosListenerRegistration.close();
}
deviceManager
.removeDeviceFromOperationalDS(nodeInstanceIdentifier)
.checkedGet(REMOVE_DEVICE_FROM_DS_TIMEOUT, TimeUnit.MILLISECONDS);
- } catch (TimeoutException | TransactionCommitFailedException e) {
+ } catch (TimeoutException | TransactionCommitFailedException | NullPointerException e) {
LOG.info("Not able to remove device {} from operational DS. Probably removed by another cluster node.",
nodeId);
}
private synchronized void destroyContextChain(final DeviceInfo deviceInfo) {
scheduler.remove(deviceInfo);
- ownershipChangeListener.becomeSlaveOrDisconnect(deviceInfo);
Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> {
deviceManager.sendNodeRemovedNotification(deviceInfo.getNodeInstanceIdentifier());
if (ResultState.DONOTHING == result) {
LOG.info("Device {} connection is enabled by reconciliation framework.", deviceInfo);
if (!contextChain.continueInitializationAfterReconciliation()) {
+ LOG.warn("Initialization submit after reconciliation failed for device {}", deviceInfo);
destroyContextChain(deviceInfo);
} else {
ownershipChangeListener.becomeMaster(deviceInfo);
+ deviceManager.sendNodeAddedNotification(deviceInfo.getNodeInstanceIdentifier());
}
} else {
- LOG.warn("Reconciliation framework failure.");
+ LOG.warn("Reconciliation framework failure for device {}", deviceInfo);
destroyContextChain(deviceInfo);
}
}