+ if (ContextChainState.CLOSED.equals(contextChainState.get())) {
+ LOG.debug("ContextChain for node {} is already in TERMINATION state.", deviceInfo);
+ return;
+ }
+
+ contextChainState.set(ContextChainState.CLOSED);
+ unMasterMe();
+
+ // Close all connections to devices
+ auxiliaryConnections.forEach(connectionContext -> connectionContext.closeConnection(false));
+ auxiliaryConnections.clear();
+
+ // If we are still registered and we are not already closing, then close the registration
+ if (Objects.nonNull(registration)) {
+ try {
+ registration.close();
+ registration = null;
+ LOG.info("Closed clustering services registration for node {}", deviceInfo);
+ } catch (final Exception e) {
+ LOG.warn("Failed to close clustering services registration for node {} with exception: ",
+ deviceInfo, e);
+ }
+ }
+
+
+ // Close all contexts (device, statistics, rpc)
+ contexts.forEach(OFPContext::close);
+ contexts.clear();
+
+ // We are closing, so cleanup all managers now
+ deviceRemovedHandlers.forEach(h -> h.onDeviceRemoved(deviceInfo));
+ deviceRemovedHandlers.clear();
+
+ primaryConnection.closeConnection(false);
+
+ }
+
+ @Override
+ public void makeContextChainStateSlave() {
+ unMasterMe();
+ changeMastershipState(ContextChainState.WORKING_SLAVE);
+ }
+
+ @Override
+ public void registerServices(final ClusterSingletonServiceProvider clusterSingletonServiceProvider) {
+ registration = Objects.requireNonNull(clusterSingletonServiceProvider
+ .registerClusterSingletonService(this));
+ LOG.debug("Registered clustering services for node {}", deviceInfo);
+ }
+
+ @Override
+ public boolean isMastered(@Nonnull ContextChainMastershipState mastershipState,
+ boolean inReconciliationFrameworkStep) {
+ switch (mastershipState) {
+ case INITIAL_SUBMIT:
+ LOG.debug("Device {}, initial submit OK.", deviceInfo);
+ this.initialSubmitting.set(true);
+ break;
+ case MASTER_ON_DEVICE:
+ LOG.debug("Device {}, master state OK.", deviceInfo);
+ this.masterStateOnDevice.set(true);
+ break;
+ case INITIAL_GATHERING:
+ LOG.debug("Device {}, initial gathering OK.", deviceInfo);
+ this.initialGathering.set(true);
+ break;
+ case RPC_REGISTRATION:
+ LOG.debug("Device {}, RPC registration OK.", deviceInfo);
+ this.rpcRegistration.set(true);
+ break;
+ case INITIAL_FLOW_REGISTRY_FILL:
+ // Flow registry fill is not mandatory to work as a master
+ LOG.debug("Device {}, initial registry filling OK.", deviceInfo);
+ this.registryFilling.set(true);
+ break;
+ case CHECK:
+ // no operation
+ break;
+ default:
+ // no operation
+ break;
+ }
+
+ final boolean result = initialGathering.get() &&
+ masterStateOnDevice.get() &&
+ rpcRegistration.get() &&
+ inReconciliationFrameworkStep || initialSubmitting.get();
+
+ if (!inReconciliationFrameworkStep &&
+ result &&
+ mastershipState != ContextChainMastershipState.CHECK) {
+ LOG.info("Device {} is able to work as master{}",
+ deviceInfo,
+ registryFilling.get() ? "." : " WITHOUT flow registry !!!");
+ changeMastershipState(ContextChainState.WORKING_MASTER);
+ }
+
+ return result;
+ }
+
+ @Override
+ public boolean isClosing() {
+ return ContextChainState.CLOSED.equals(contextChainState.get());
+ }
+
+ @Override
+ public void continueInitializationAfterReconciliation() {
+ contexts.forEach(context -> {
+ if (context.map(ReconciliationFrameworkStep.class::isInstance)) {
+ context.map(ReconciliationFrameworkStep.class::cast).continueInitializationAfterReconciliation();
+ }
+ });
+ }
+
+ @Override
+ public boolean addAuxiliaryConnection(@Nonnull ConnectionContext connectionContext) {
+ return (connectionContext.getFeatures().getAuxiliaryId() != 0)
+ && (!ConnectionContext.CONNECTION_STATE.RIP.equals(primaryConnection.getConnectionState()))
+ && auxiliaryConnections.add(connectionContext);
+ }
+
+ @Override
+ public boolean auxiliaryConnectionDropped(@Nonnull ConnectionContext connectionContext) {
+ return auxiliaryConnections.remove(connectionContext);
+ }
+
+ @Override
+ public void registerDeviceRemovedHandler(@Nonnull final DeviceRemovedHandler deviceRemovedHandler) {
+ deviceRemovedHandlers.add(deviceRemovedHandler);
+ }
+
+ private void changeMastershipState(final ContextChainState contextChainState) {
+ if (ContextChainState.CLOSED.equals(this.contextChainState.get())) {
+ return;
+ }
+
+ boolean propagate = ContextChainState.UNDEFINED.equals(this.contextChainState.get());
+ this.contextChainState.set(contextChainState);
+
+ if (propagate) {
+ contexts.forEach(context -> {
+ if (context.map(ContextChainStateListener.class::isInstance)) {
+ context.map(ContextChainStateListener.class::cast).onStateAcquired(contextChainState);
+ }
+ });
+ }
+ }