/**
* About to stop services in cluster not master anymore or going down
* @return Future most of services need time to be closed
+ * @param deviceDisconnected
*/
- default ListenableFuture<Void> stopClusterServices(){
+ default ListenableFuture<Void> stopClusterServices(final boolean deviceDisconnected){
return Futures.immediateFailedFuture(new RejectedExecutionException("Cannot stop abstract services, check implementation of cluster services"));
}
*/
void setDeviceDisconnectedHandler(DeviceDisconnectedHandler deviceDisconnectedHandler);
+ String getSafeNodeIdForLOG();
+
void setOutboundQueueHandleRegistration(OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration);
/**
/**
* Create and return basic device info
- * @return
+ * @return created device info
*/
DeviceInfo getDeviceInfo();
@Override
public void onConnectionClosed() {
+
+ connectionState = ConnectionContext.CONNECTION_STATE.RIP;
+
if (null == nodeId){
SessionStatistics.countEvent(connectionAdapter.getRemoteAddress().toString(), SessionStatistics.ConnectionStatus.CONNECTION_DISCONNECTED_BY_DEVICE);
} else {
SessionStatistics.countEvent(nodeId.toString(), SessionStatistics.ConnectionStatus.CONNECTION_DISCONNECTED_BY_DEVICE);
}
- connectionState = ConnectionContext.CONNECTION_STATE.RIP;
final InetSocketAddress remoteAddress = connectionAdapter.getRemoteAddress();
final Short auxiliaryId;
* This method returns safe nodeId for logging
* @return string value od nodeId or string "null"
*/
- private String getSafeNodeIdForLOG() {
+ @Override
+ public String getSafeNodeIdForLOG() {
return null == nodeId ? "null" : nodeId.getValue();
}
@Override
public void onDisconnectEvent(final DisconnectEvent notification) {
LOG.info("ConnectionEvent: Connection closed by device, Device:{}, NodeId:{}",
- connectionContext.getConnectionAdapter().getRemoteAddress(), connectionContext.getNodeId());
+ connectionContext.getConnectionAdapter().getRemoteAddress(), connectionContext.getSafeNodeIdForLOG());
connectionContext.onConnectionClosed();
}
if (shouldBeDisconnected) {
if (LOG.isInfoEnabled()) {
LOG.info("ConnectionEvent:Closing connection as device is idle. Echo sent at {}. Device:{}, NodeId:{}",
- new Date(System.currentTimeMillis() - echoReplyTimeout), remoteAddress, connectionContext.getNodeId());
+ new Date(System.currentTimeMillis() - echoReplyTimeout), remoteAddress, connectionContext.getSafeNodeIdForLOG());
}
connectionContext.closeConnection(true);
}
@Override
- public ListenableFuture<Void> stopClusterServices() {
+ public ListenableFuture<Void> stopClusterServices(boolean deviceDisconnected) {
return this.transactionChainManager.deactivateTransactionManager();
}
@Override
public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
- deviceContexts.remove(deviceInfo);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Device context removed for node {}", deviceInfo.getLOGValue());
- }
-
LifecycleService lifecycleService = lifecycleServices.remove(deviceInfo);
if (LOG.isDebugEnabled()) {
LOG.debug("Lifecycle service removed for node {}", deviceInfo.getLOGValue());
LOG.warn("Closing lifecycle service for node {} was unsuccessful ", deviceInfo.getLOGValue(), e);
}
}
+
+ deviceContexts.remove(deviceInfo);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Device context removed for node {}", deviceInfo.getLOGValue());
+ }
+
}
@Override
}
ListenableFuture<Void> shuttingDown() {
- LOG.debug("TxManager is going SHUTTING_DOWN for node {}", nodeII);
+ LOG.debug("TxManager is going SHUTTING_DOWN for node {}", nodeII.getKey().getId().getValue());
ListenableFuture<Void> future;
synchronized (txLock) {
this.transactionChainManagerStatus = TransactionChainManagerStatus.SHUTTING_DOWN;
try {
if (LOG.isDebugEnabled()) {
- LOG.debug("Starting clustering MASTER services for node {}", this.deviceContext.getDeviceInfo().getLOGValue());
- LOG.debug("===============================================");
+ LOG.debug("========== Starting clustering MASTER services for node {} ==========", this.deviceContext.getDeviceInfo().getLOGValue());
}
if (connectionInterrupted()) {
@Override
public ListenableFuture<Void> closeServiceInstance() {
if (LOG.isDebugEnabled()) {
- LOG.debug("Stopping clustering MASTER services for node {}", this.deviceContext.getDeviceInfo().getLOGValue());
- LOG.debug("===============================================");
+ LOG.debug("========== Stopping clustering MASTER services for node {} ==========", this.deviceContext.getDeviceInfo().getLOGValue());
}
+ final boolean connectionInterrupted =
+ this.deviceContext
+ .getPrimaryConnectionContext()
+ .getConnectionState()
+ .equals(ConnectionContext.CONNECTION_STATE.RIP);
+
LOG.info("Stopping role context cluster services for node {}", getIdentifier());
- roleContext.stopClusterServices();
+ roleContext.stopClusterServices(connectionInterrupted);
LOG.info("Stopping statistics context cluster services for node {}", getIdentifier());
- statContext.stopClusterServices();
+ statContext.stopClusterServices(connectionInterrupted);
LOG.info("Stopping rpc context cluster services for node {}", getIdentifier());
- rpcContext.stopClusterServices();
+ rpcContext.stopClusterServices(connectionInterrupted);
LOG.info("Stopping device context cluster services for node {}", getIdentifier());
- return deviceContext.stopClusterServices();
+ return deviceContext.stopClusterServices(connectionInterrupted);
}
@Override
@Override
public ListenableFuture<List<Optional<FlowCapableNode>>> fill() {
- LOG.debug("Filling flow registry with flows for node: {}", instanceIdentifier);
+ LOG.debug("Filling flow registry with flows for node: {}", instanceIdentifier.getKey().getId().getValue());
// Prepare path for read transaction
// TODO: Read only Tables, and not entire FlowCapableNode (fix Yang model)
import org.opendaylight.openflowplugin.api.OFConstants;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
-import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
import org.opendaylight.openflowplugin.api.openflow.role.RoleContext;
import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
private final DeviceInfo deviceInfo;
private CONTEXT_STATE state;
private final RoleManager myManager;
- private final LifecycleService lifecycleService;
RoleContextImpl(final DeviceInfo deviceInfo,
final HashedWheelTimer hashedWheelTimer,
- final RoleManager myManager,
- final LifecycleService lifecycleService) {
+ final RoleManager myManager) {
this.deviceInfo = deviceInfo;
- this.lifecycleService = lifecycleService;
state = CONTEXT_STATE.WORKING;
this.myManager = myManager;
this.hashedWheelTimer = hashedWheelTimer;
@Override
public void onSuccess(@Nullable RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Role MASTER was successfully set on device, node {}", deviceInfo.getNodeId().getValue());
+ LOG.debug("Role MASTER was successfully set on device, node {}", deviceInfo.getLOGValue());
}
}
@Override
public void onFailure(final Throwable throwable) {
- LOG.warn("Was not able to set MASTER role on device, node {}", deviceInfo.getNodeId().getValue());
- lifecycleService.closeConnection();
+ LOG.warn("Was not able to set MASTER role on device, node {}", deviceInfo.getLOGValue());
}
});
}
@Override
- public ListenableFuture<Void> stopClusterServices() {
- ListenableFuture<Void> future = Futures.transform(makeDeviceSlave(), new Function<RpcResult<SetRoleOutput>, Void>() {
- @Nullable
- @Override
- public Void apply(@Nullable RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
- return null;
- }
- });
-
- Futures.addCallback(future, new FutureCallback<Void>() {
- @Override
- public void onSuccess(@Nullable Void aVoid) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Role SLAVE was successfully propagated on device, node {}", deviceInfo.getNodeId().getValue());
- }
- myManager.removeDeviceFromOperationalDS(deviceInfo, MAX_CLEAN_DS_RETRIES);
+ public ListenableFuture<Void> stopClusterServices(final boolean deviceDisconnected) {
+
+ if (!deviceDisconnected) {
+ ListenableFuture<Void> future = Futures.transform(makeDeviceSlave(), new Function<RpcResult<SetRoleOutput>, Void>() {
+ @Nullable
+ @Override
+ public Void apply(@Nullable RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
+ return null;
+ }
+ });
+
+ Futures.addCallback(future, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(@Nullable Void aVoid) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Role SLAVE was successfully propagated on device, node {}", deviceInfo.getLOGValue());
}
+ }
- @Override
- public void onFailure(final Throwable throwable) {
- LOG.warn("Was not able to set role SLAVE to device on node {} ", deviceInfo.getNodeId().getValue());
- LOG.trace("Error occurred on device role setting, probably connection loss: ", throwable);
- myManager.removeDeviceFromOperationalDS(deviceInfo, MAX_CLEAN_DS_RETRIES);
+ @Override
+ public void onFailure(final Throwable throwable) {
+ LOG.warn("Was not able to set role SLAVE to device on node {} ", deviceInfo.getLOGValue());
+ LOG.trace("Error occurred on device role setting, probably connection loss: ", throwable);
+ myManager.removeDeviceFromOperationalDS(deviceInfo, MAX_CLEAN_DS_RETRIES);
- }
- });
- return future;
+ }
+ });
+ return future;
+ } else {
+ return myManager.removeDeviceFromOperationalDS(deviceInfo, MAX_CLEAN_DS_RETRIES);
+ }
}
@Override
setRoleOutputFuture = getSalRoleService().setRole(setRoleInput);
final TimerTask timerTask = timeout -> {
if (!setRoleOutputFuture.isDone()) {
- LOG.warn("New role {} was not propagated to device {} during 10 sec", newRole, deviceInfo.getNodeId());
+ LOG.warn("New role {} was not propagated to device {} during 10 sec", newRole, deviceInfo.getLOGValue());
setRoleOutputFuture.cancel(true);
}
};
@Override
public void onDeviceContextLevelUp(@CheckForNull final DeviceInfo deviceInfo, final LifecycleService lifecycleService) throws Exception {
final DeviceContext deviceContext = Preconditions.checkNotNull(lifecycleService.getDeviceContext());
- final RoleContext roleContext = new RoleContextImpl(deviceInfo, hashedWheelTimer, this, lifecycleService);
+ final RoleContext roleContext = new RoleContextImpl(deviceInfo, hashedWheelTimer, this);
roleContext.setSalRoleService(new SalRoleServiceImpl(roleContext, deviceContext));
Verify.verify(contexts.putIfAbsent(deviceInfo, roleContext) == null, "Role context for master Node %s is still not closed.", deviceInfo.getLOGValue());
Futures.addCallback(roleContext.makeDeviceSlave(), new FutureCallback<RpcResult<SetRoleOutput>>() {
@Override
public void onFailure(Throwable throwable) {
LOG.warn("Was not able to set role SLAVE to device on node {} ",deviceInfo.getLOGValue());
- lifecycleService.closeConnection();
}
});
lifecycleService.setRoleContext(roleContext);
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
-import org.opendaylight.openflowplugin.api.openflow.device.XidSequencer;
import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
final RoutedRpcRegistration<S> routedRpcReg = rpcProviderRegistry.addRoutedRpcImplementation(serviceClass, serviceInstance);
routedRpcReg.registerPath(NodeContext.class, nodeInstanceIdentifier);
rpcRegistrations.put(serviceClass, routedRpcReg);
- LOG.debug("Registration of service {} for device {}.", serviceClass, nodeInstanceIdentifier);
+ LOG.debug("Registration of service {} for device {}.", serviceClass.getSimpleName(), nodeInstanceIdentifier.getKey().getId().getValue());
}
}
rpcRegistration.unregisterPath(NodeContext.class, nodeInstanceIdentifier);
rpcRegistration.close();
if (LOG.isDebugEnabled()) {
- LOG.debug("Closing RPC Registration of service {} for device {}.", rpcRegistration.getServiceType(),
- nodeInstanceIdentifier);
+ LOG.debug("Closing RPC Registration of service {} for device {}.", rpcRegistration.getServiceType().getSimpleName(),
+ nodeInstanceIdentifier.getKey().getId().getValue());
}
}
}
LOG.trace("Device queue {} at capacity", this);
return null;
} else {
- LOG.trace("Acquired semaphore for {}, available permits:{} ", nodeInstanceIdentifier.getKey().getId(), tracker.availablePermits());
+ LOG.trace("Acquired semaphore for {}, available permits:{} ", nodeInstanceIdentifier.getKey().getId().getValue(), tracker.availablePermits());
}
final Long xid = deviceInfo.reserveXidForDeviceMessage();
if (xid == null) {
- LOG.warn("Xid cannot be reserved for new RequestContext, node:{}", nodeInstanceIdentifier.getKey().getId());
+ LOG.warn("Xid cannot be reserved for new RequestContext, node:{}", nodeInstanceIdentifier.getKey().getId().getValue());
tracker.release();
return null;
}
if (rpcRegistration != null) {
rpcRegistration.unregisterPath(NodeContext.class, nodeInstanceIdentifier);
rpcRegistration.close();
- LOG.debug("Unregistration serviceClass {} for Node {}", serviceClass, nodeInstanceIdentifier.getKey().getId());
+ LOG.debug("Un-registration serviceClass {} for Node {}", serviceClass.getSimpleName(), nodeInstanceIdentifier.getKey().getId().getValue());
}
}
}
@Override
- public ListenableFuture<Void> stopClusterServices() {
+ public ListenableFuture<Void> stopClusterServices(boolean deviceDisconnected) {
MdSalRegistrationUtils.unregisterServices(this);
return Futures.immediateFuture(null);
}
currentRoleGuard.acquire();
LOG.trace("currentRole lock queue length: {} " + currentRoleGuard.getQueueLength());
} catch (final InterruptedException e) {
- LOG.error("Unexpected exception {} for acquire semaphor for input {}", e, input);
+ LOG.error("Unexpected exception {} for acquire semaphore for input {}", e, input);
return RpcResultBuilder.<SetRoleOutput> failed().buildFuture();
}
// compare with last known role and set if different. If they are same, then return.
}
@Override
- public ListenableFuture<Void> stopClusterServices() {
+ public ListenableFuture<Void> stopClusterServices(boolean deviceDisconnected) {
myManager.stopScheduling(deviceInfo);
return Futures.immediateFuture(null);
}
verifyCommonInvocationsSubSet();
Mockito.verify(connectionContext).onConnectionClosed();
Mockito.verify(connectionContext).getConnectionAdapter();
- Mockito.verify(connectionContext).getNodeId();
+ Mockito.verify(connectionContext, Mockito.atLeastOnce()).getSafeNodeIdForLOG();
}
/**
verifyCommonInvocationsSubSet();
Mockito.verify(connectionContext).onConnectionClosed();
Mockito.verify(connectionContext).getConnectionAdapter();
- Mockito.verify(connectionContext).getNodeId();
+ Mockito.verify(connectionContext, Mockito.atLeastOnce()).getSafeNodeIdForLOG();
}
/**
verifyCommonInvocationsSubSet();
Mockito.verify(connectionContext).onConnectionClosed();
Mockito.verify(connectionContext).getConnectionAdapter();
- Mockito.verify(connectionContext).getNodeId();
+ Mockito.verify(connectionContext, Mockito.atLeastOnce()).getSafeNodeIdForLOG();
}
/**
verifyCommonInvocationsSubSet();
Mockito.verify(connectionContext).onConnectionClosed();
Mockito.verify(connectionContext).getConnectionAdapter();
- Mockito.verify(connectionContext).getNodeId();
+ Mockito.verify(connectionContext, Mockito.atLeastOnce()).getSafeNodeIdForLOG();
}
/**
Mockito.verify(connectionAdapter).disconnect();
Mockito.verify(connectionContext).changeStateToTimeouting();
Mockito.verify(connectionContext).closeConnection(true);
- Mockito.verify(connectionContext).getNodeId();
+ Mockito.verify(connectionContext, Mockito.atLeastOnce()).getSafeNodeIdForLOG();
}
@Before
public void setup() throws CandidateAlreadyRegisteredException {
- roleContext = new RoleContextImpl(deviceInfo, hashedWheelTimer, roleManager, lifecycleService);
+ roleContext = new RoleContextImpl(deviceInfo, hashedWheelTimer, roleManager);
Mockito.when(deviceInfo.getNodeId()).thenReturn(nodeId);
}
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
private DeviceContext deviceContext;
@Mock
private BindingAwareBroker.RoutedRpcRegistration<TestRpcService> routedRpcReg;
+
+ private Class<TestRpcService> serviceClass;
@Mock
private NotificationPublishService notificationPublishService;
@Mock
@Test
public void testClose() {
+ serviceClass = TestRpcService.class;
+ when(routedRpcReg.getServiceType()).thenReturn(serviceClass);
rpcContext.registerRpcServiceImplementation(TestRpcService.class, serviceInstance);
rpcContext.close();
assertEquals(rpcContext.isEmptyRpcRegistrations(), true);