2 * Copyright (c) 2016 Pantheon Technologies s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.impl.lifecycle;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Verify;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import io.netty.util.HashedWheelTimer;
16 import java.util.Collections;
17 import java.util.HashMap;
19 import java.util.Objects;
20 import java.util.Optional;
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.TimeUnit;
23 import java.util.concurrent.TimeoutException;
24 import javax.annotation.Nonnull;
25 import javax.annotation.Nullable;
26 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
27 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListenerRegistration;
28 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
29 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
30 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
31 import org.opendaylight.openflowplugin.api.openflow.OFPManager;
32 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
33 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionStatus;
34 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
35 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
36 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
37 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChain;
38 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainHolder;
39 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
40 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
41 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
42 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager;
43 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
44 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
45 import org.opendaylight.openflowplugin.impl.util.DeviceStateUtil;
46 import org.opendaylight.openflowplugin.impl.util.ItemScheduler;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
50 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
51 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
55 public class ContextChainHolderImpl implements ContextChainHolder {
56 private static final Logger LOG = LoggerFactory.getLogger(ContextChainHolderImpl.class);
58 private static final String CONTEXT_CREATED_FOR_CONNECTION = " context created for connection: {}";
59 private static final long CHECK_ROLE_MASTER_TIMEOUT = 20000L;
60 private static final long CHECK_ROLE_MASTER_TOLERANCE = CHECK_ROLE_MASTER_TIMEOUT / 2;
61 private static final long REMOVE_DEVICE_FROM_DS_TIMEOUT = 5000L;
62 private static final String ASYNC_SERVICE_ENTITY_TYPE = "org.opendaylight.mdsal.AsyncServiceCloseEntityType";
64 private final Map<DeviceInfo, ContextChain> contextChainMap = Collections.synchronizedMap(new HashMap<>());
65 private DeviceManager deviceManager;
66 private RpcManager rpcManager;
67 private StatisticsManager statisticsManager;
68 private EntityOwnershipListenerRegistration eosListenerRegistration;
69 private ClusterSingletonServiceProvider singletonServicesProvider;
70 private final ItemScheduler<DeviceInfo, ContextChain> scheduler;
71 private final ExecutorService executorService;
73 public ContextChainHolderImpl(final HashedWheelTimer timer, final ExecutorService executorService) {
74 this.scheduler = new ItemScheduler<>(
76 CHECK_ROLE_MASTER_TIMEOUT,
77 CHECK_ROLE_MASTER_TOLERANCE,
78 ContextChain::makeDeviceSlave);
80 this.executorService = executorService;
84 public <T extends OFPManager> void addManager(final T manager) {
85 if (Objects.isNull(deviceManager) && manager instanceof DeviceManager) {
86 LOG.trace("Context chain holder: Device manager OK.");
87 deviceManager = (DeviceManager) manager;
88 } else if (Objects.isNull(rpcManager) && manager instanceof RpcManager) {
89 LOG.trace("Context chain holder: RPC manager OK.");
90 rpcManager = (RpcManager) manager;
91 } else if (Objects.isNull(statisticsManager) && manager instanceof StatisticsManager) {
92 LOG.trace("Context chain holder: Statistics manager OK.");
93 statisticsManager = (StatisticsManager) manager;
98 public ContextChain createContextChain(final ConnectionContext connectionContext) {
99 final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
100 final String deviceInfoLOGValue = deviceInfo.getLOGValue();
101 final ContextChain contextChain = new ContextChainImpl(connectionContext);
103 if (LOG.isDebugEnabled()) {
104 LOG.debug("Context chain" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfoLOGValue);
107 final LifecycleService lifecycleService = new LifecycleServiceImpl(this, executorService);
108 lifecycleService.registerDeviceRemovedHandler(deviceManager);
109 lifecycleService.registerDeviceRemovedHandler(rpcManager);
110 lifecycleService.registerDeviceRemovedHandler(statisticsManager);
112 if (LOG.isDebugEnabled()) {
113 LOG.debug("Lifecycle services" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfoLOGValue);
116 final DeviceContext deviceContext = deviceManager.createContext(connectionContext);
118 if (LOG.isDebugEnabled()) {
119 LOG.debug("Device" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfoLOGValue);
122 final RpcContext rpcContext = rpcManager.createContext(connectionContext.getDeviceInfo(), deviceContext);
124 if (LOG.isDebugEnabled()) {
125 LOG.debug("RPC" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfoLOGValue);
128 final StatisticsContext statisticsContext = statisticsManager.createContext(deviceContext);
130 if (LOG.isDebugEnabled()) {
131 LOG.debug("Statistics" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfoLOGValue);
134 deviceContext.setLifecycleInitializationPhaseHandler(statisticsContext);
135 statisticsContext.setLifecycleInitializationPhaseHandler(rpcContext);
136 statisticsContext.setInitialSubmitHandler(deviceContext);
138 contextChain.addLifecycleService(lifecycleService);
139 contextChain.addContext(deviceContext);
140 contextChain.addContext(rpcContext);
141 contextChain.addContext(statisticsContext);
143 LOG.info("Starting timer for setting SLAVE role on node {} if no role will be set in {}s.",
144 deviceInfo.getLOGValue(), CHECK_ROLE_MASTER_TIMEOUT / 1000L);
145 scheduler.add(deviceInfo, contextChain);
146 scheduler.startIfNotRunning();
147 deviceContext.onPublished();
148 contextChain.registerServices(this.singletonServicesProvider);
153 public synchronized void destroyContextChain(final DeviceInfo deviceInfo) {
154 Optional.ofNullable(contextChainMap.remove(deviceInfo)).ifPresent(contextChain -> {
155 deviceManager.sendNodeRemovedNotification(deviceInfo.getNodeInstanceIdentifier());
156 contextChain.close();
161 public ConnectionStatus deviceConnected(final ConnectionContext connectionContext) throws Exception {
162 final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
163 final ContextChain contextChain = contextChainMap.get(deviceInfo);
164 LOG.info("Device {} connected.", deviceInfo.getLOGValue());
166 if (contextChain != null) {
167 if (contextChain.addAuxiliaryConnection(connectionContext)) {
168 LOG.info("An auxiliary connection was added to device: {}", deviceInfo.getLOGValue());
169 return ConnectionStatus.MAY_CONTINUE;
171 LOG.warn("Device {} already connected. Closing all connection to the device.", deviceInfo.getLOGValue());
172 destroyContextChain(deviceInfo);
173 return ConnectionStatus.ALREADY_CONNECTED;
176 if (LOG.isDebugEnabled()) {
177 LOG.debug("No context chain found for device: {}, creating new.", deviceInfo.getLOGValue());
179 contextChainMap.put(deviceInfo, createContextChain(connectionContext));
182 return ConnectionStatus.MAY_CONTINUE;
186 public void addSingletonServicesProvider(final ClusterSingletonServiceProvider singletonServicesProvider) {
187 this.singletonServicesProvider = singletonServicesProvider;
191 public void onNotAbleToStartMastership(final DeviceInfo deviceInfo, @Nonnull final String reason, final boolean mandatory) {
192 LOG.warn("Not able to set MASTER role on device {}, reason: {}", deviceInfo.getLOGValue(), reason);
198 Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> {
199 LOG.warn("This mastering is mandatory, destroying context chain and closing connection for device {}.", deviceInfo.getLOGValue());
200 addDestroyChainCallback(contextChain.stopChain(), deviceInfo);
205 public void onMasterRoleAcquired(final DeviceInfo deviceInfo, @Nonnull final ContextChainMastershipState mastershipState) {
206 scheduler.remove(deviceInfo);
208 Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> {
209 if (contextChain.isMastered(mastershipState)) {
210 LOG.info("Role MASTER was granted to device {}", deviceInfo.getLOGValue());
211 deviceManager.sendNodeAddedNotification(deviceInfo.getNodeInstanceIdentifier());
217 public void onSlaveRoleAcquired(final DeviceInfo deviceInfo) {
218 Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(ContextChain::makeContextChainStateSlave);
222 public void onSlaveRoleNotAcquired(final DeviceInfo deviceInfo) {
223 Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> destroyContextChain(deviceInfo));
227 public void onDeviceDisconnected(final ConnectionContext connectionContext) {
228 final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
230 if (Objects.isNull(deviceInfo)) {
234 Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> {
235 if (contextChain.auxiliaryConnectionDropped(connectionContext)) {
236 LOG.info("Auxiliary connection from device {} disconnected.", deviceInfo.getLOGValue());
238 LOG.info("Device {} disconnected.", deviceInfo.getLOGValue());
239 addDestroyChainCallback(contextChain.connectionDropped(), deviceInfo);
245 public void changeEntityOwnershipService(@Nonnull final EntityOwnershipService entityOwnershipService) {
246 if (Objects.nonNull(this.eosListenerRegistration)) {
247 LOG.warn("Entity ownership service listener is already registered.");
249 this.eosListenerRegistration = Verify.verifyNotNull(entityOwnershipService.registerListener
250 (ASYNC_SERVICE_ENTITY_TYPE, this));
255 boolean checkAllManagers() {
256 return Objects.nonNull(deviceManager) && Objects.nonNull(rpcManager) && Objects.nonNull(statisticsManager);
260 public void close() throws Exception {
263 contextChainMap.forEach((deviceInfo, contextChain) -> {
264 if (contextChain.isMastered(ContextChainMastershipState.CHECK)) {
265 addDestroyChainCallback(contextChain.stopChain(), deviceInfo);
267 destroyContextChain(deviceInfo);
271 contextChainMap.clear();
274 if (Objects.nonNull(eosListenerRegistration)) {
275 eosListenerRegistration.close();
276 eosListenerRegistration = null;
281 public void ownershipChanged(EntityOwnershipChange entityOwnershipChange) {
282 if (entityOwnershipChange.hasOwner()) {
286 final String entityName = getEntityNameFromOwnershipChange(entityOwnershipChange);
288 if (Objects.nonNull(entityName)) {
289 if (LOG.isDebugEnabled()) {
290 LOG.debug("Entity {} has no owner", entityName);
293 final NodeId nodeId = new NodeId(entityName);
296 final KeyedInstanceIdentifier<Node, NodeKey> nodeInstanceIdentifier =
297 DeviceStateUtil.createNodeInstanceIdentifier(nodeId);
299 deviceManager.sendNodeRemovedNotification(nodeInstanceIdentifier);
301 LOG.info("Removing device {} from operational DS", nodeId);
303 .removeDeviceFromOperationalDS(nodeInstanceIdentifier)
304 .checkedGet(REMOVE_DEVICE_FROM_DS_TIMEOUT, TimeUnit.MILLISECONDS);
305 } catch (TimeoutException | TransactionCommitFailedException e) {
306 LOG.info("Not able to remove device {} from operational DS. Probably removed by another cluster node.",
312 private String getEntityNameFromOwnershipChange(final EntityOwnershipChange entityOwnershipChange) {
313 final YangInstanceIdentifier.NodeIdentifierWithPredicates lastIdArgument =
314 (YangInstanceIdentifier.NodeIdentifierWithPredicates) entityOwnershipChange
317 .getLastPathArgument();
319 return lastIdArgument
327 private void addDestroyChainCallback(final ListenableFuture<Void> future, final DeviceInfo deviceInfo) {
328 scheduler.remove(deviceInfo);
330 Futures.addCallback(future, new FutureCallback<Void>() {
332 public void onSuccess(@Nullable final Void aVoid) {
333 destroyContextChain(deviceInfo);
337 public void onFailure(@Nonnull final Throwable throwable) {
338 destroyContextChain(deviceInfo);