+ return this.state;
+ }
+
+ @Override
+ public ListenableFuture<Void> stopClusterServices() {
+ final ListenableFuture<Void> deactivateTxManagerFuture = initialized
+ ? transactionChainManager.deactivateTransactionManager()
+ : Futures.immediateFuture(null);
+
+ final boolean connectionInterrupted =
+ this.getPrimaryConnectionContext()
+ .getConnectionState()
+ .equals(ConnectionContext.CONNECTION_STATE.RIP);
+ if (!connectionInterrupted) {
+ LOG.info("This controller instance is now acting as a non-owner for node {}", deviceInfo.getLOGValue());
+ }
+
+ return deactivateTxManagerFuture;
+ }
+
+ @Override
+ public void cleanupDeviceData() {
+ myManager.removeDeviceFromOperationalDS(deviceInfo);
+ }
+
+ @Override
+ public ServiceGroupIdentifier getServiceIdentifier() {
+ return this.deviceInfo.getServiceIdentifier();
+ }
+
+ @Override
+ public DeviceInfo getDeviceInfo() {
+ return this.deviceInfo;
+ }
+
+ @Override
+ public void close() {
+ if (CONTEXT_STATE.TERMINATION.equals(getState())){
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("DeviceContext for node {} is already in TERMINATION state.", getDeviceInfo().getLOGValue());
+ }
+ } else {
+ this.state = CONTEXT_STATE.TERMINATION;
+ }
+ sendNodeRemovedNotification();
+ }
+
+ @Override
+ public void putLifecycleServiceIntoTxChainManager(final LifecycleService lifecycleService){
+ if (initialized) {
+ this.transactionChainManager.setLifecycleService(lifecycleService);
+ }
+ }
+
+ @Override
+ public void replaceConnectionContext(final ConnectionContext connectionContext){
+ // Act like we are initializing the context
+ this.state = CONTEXT_STATE.INITIALIZATION;
+ this.primaryConnectionContext = connectionContext;
+ this.onPublished();
+ }
+
+ @Override
+ public boolean isSkipTableFeatures() {
+ return this.skipTableFeatures;
+ }
+
+ @Override
+ public void setSalRoleService(@Nonnull SalRoleService salRoleService) {
+ this.salRoleService = salRoleService;
+ }
+
+ @Override
+ public void setLifecycleInitializationPhaseHandler(final ClusterInitializationPhaseHandler handler) {
+ this.clusterInitializationPhaseHandler = handler;
+ }
+
+ @Override
+ public boolean onContextInstantiateService(final ConnectionContext connectionContext) {
+
+ if (getPrimaryConnectionContext().getConnectionState().equals(ConnectionContext.CONNECTION_STATE.RIP)) {
+ LOG.warn("Connection on device {} was interrupted, will stop starting master services.", deviceInfo.getLOGValue());
+ return false;
+ }
+
+ LOG.info("Starting device context cluster services for node {}", deviceInfo.getLOGValue());
+
+ lazyTransactionManagerInitialization();
+
+ this.transactionChainManager.activateTransactionManager();
+
+ try {
+ DeviceInitializationUtils.initializeNodeInformation(this, switchFeaturesMandatory, this.convertorExecutor);
+ } catch (ExecutionException | InterruptedException e) {
+ LOG.warn("Device {} cannot be initialized: ", deviceInfo.getLOGValue(), e);
+ return false;
+ }
+
+ Futures.addCallback(sendRoleChangeToDevice(OfpRole.BECOMEMASTER), new RpcResultFutureCallback());
+
+ return this.clusterInitializationPhaseHandler.onContextInstantiateService(getPrimaryConnectionContext());
+ }
+
+ @VisibleForTesting
+ void lazyTransactionManagerInitialization() {
+ if (!this.initialized) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Transaction chain manager for node {} created", deviceInfo.getLOGValue());
+ }
+ this.transactionChainManager = new TransactionChainManager(dataBroker, deviceInfo);
+ this.deviceFlowRegistry = new DeviceFlowRegistryImpl(deviceInfo.getVersion(), dataBroker, deviceInfo.getNodeInstanceIdentifier());
+ this.deviceGroupRegistry = new DeviceGroupRegistryImpl();
+ this.deviceMeterRegistry = new DeviceMeterRegistryImpl();
+ this.initialized = true;
+ }
+ }
+
+ @Nullable
+ @Override
+ public <T> RequestContext<T> createRequestContext() {
+ return new AbstractRequestContext<T>(deviceInfo.reserveXidForDeviceMessage()) {
+ @Override
+ public void close() {
+ }
+ };
+
+ }
+
+ ListenableFuture<RpcResult<SetRoleOutput>> sendRoleChangeToDevice(final OfpRole newRole) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Sending new role {} to device {}", newRole, deviceInfo.getNodeId());
+ }
+
+ final Future<RpcResult<SetRoleOutput>> setRoleOutputFuture;
+
+ if (deviceInfo.getVersion() >= OFConstants.OFP_VERSION_1_3) {
+ final SetRoleInput setRoleInput = (new SetRoleInputBuilder()).setControllerRole(newRole)
+ .setNode(new NodeRef(deviceInfo.getNodeInstanceIdentifier())).build();
+
+ setRoleOutputFuture = this.salRoleService.setRole(setRoleInput);
+
+ final TimerTask timerTask = timeout -> {
+ if (!setRoleOutputFuture.isDone()) {
+ LOG.warn("New role {} was not propagated to device {} during {} sec", newRole, deviceInfo.getLOGValue(), SET_ROLE_TIMEOUT);
+ setRoleOutputFuture.cancel(true);
+ }
+ };
+
+ hashedWheelTimer.newTimeout(timerTask, SET_ROLE_TIMEOUT, TimeUnit.SECONDS);
+ } else {
+ LOG.info("Device: {} with version: {} does not support role", deviceInfo.getLOGValue(), deviceInfo.getVersion());
+ return Futures.immediateFuture(null);
+ }
+
+ return JdkFutureAdapters.listenInPoolThread(setRoleOutputFuture);
+ }
+
+ @Override
+ public ListenableFuture<RpcResult<SetRoleOutput>> makeDeviceSlave() {
+ return sendRoleChangeToDevice(OfpRole.BECOMESLAVE);
+ }
+
+ private class RpcResultFutureCallback implements FutureCallback<RpcResult<SetRoleOutput>> {
+ @Override
+ public void onSuccess(@Nullable RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Role MASTER was successfully set on device, node {}", deviceInfo.getLOGValue());
+ }
+ sendNodeAddedNotification();
+ }
+
+ @Override
+ public void onFailure(final Throwable throwable) {
+ LOG.warn("Was not able to set MASTER role on device, node {}", deviceInfo.getLOGValue());
+ shutdownConnection();
+ }