2 * Copyright (c) 2016 Pantheon Technologies s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.impl.lifecycle;
10 import com.google.common.annotations.VisibleForTesting;
11 import io.netty.util.HashedWheelTimer;
12 import java.util.Collections;
13 import java.util.HashMap;
15 import java.util.Objects;
16 import java.util.Optional;
17 import java.util.concurrent.ExecutorService;
18 import java.util.concurrent.TimeUnit;
19 import java.util.concurrent.TimeoutException;
20 import javax.annotation.Nonnull;
21 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
22 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListenerRegistration;
23 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
24 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
25 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
26 import org.opendaylight.openflowplugin.api.openflow.OFPManager;
27 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
28 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionStatus;
29 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
30 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
31 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
32 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChain;
33 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainHolder;
34 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
35 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
36 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager;
37 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
38 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
39 import org.opendaylight.openflowplugin.impl.util.DeviceStateUtil;
40 import org.opendaylight.openflowplugin.impl.util.ItemScheduler;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
44 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
45 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
/**
 * Keeps the {@link ContextChain} of every connected device in a map keyed by {@link DeviceInfo}
 * and handles the connection, mastership and entity-ownership callbacks for those chains.
 */
49 public class ContextChainHolderImpl implements ContextChainHolder {
50 private static final Logger LOG = LoggerFactory.getLogger(ContextChainHolderImpl.class);
// Common tail of the "<kind> context created for connection" debug messages.
52 private static final String CONTEXT_CREATED_FOR_CONNECTION = " context created for connection: {}";
// Milliseconds: the value is divided by 1000 when it is logged as seconds, and it is
// passed to the ItemScheduler constructor.
53 private static final long CHECK_ROLE_MASTER_TIMEOUT = 20000L;
// Half of the timeout, also passed to the ItemScheduler constructor.
// NOTE(review): the exact meaning of "tolerance" is defined by ItemScheduler - confirm there.
54 private static final long CHECK_ROLE_MASTER_TOLERANCE = CHECK_ROLE_MASTER_TIMEOUT / 2;
// Milliseconds: upper bound for the blocking wait on the operational-DS removal.
55 private static final long REMOVE_DEVICE_FROM_DS_TIMEOUT = 5000L;
// Entity type for which this holder registers itself as an entity-ownership listener.
56 private static final String ASYNC_SERVICE_ENTITY_TYPE = "org.opendaylight.mdsal.AsyncServiceCloseEntityType";
// Synchronized wrapper around a HashMap: individual calls are thread-safe, but iterating any
// of its views requires synchronizing on the map itself (see Collections.synchronizedMap).
58 private final Map<DeviceInfo, ContextChain> contextChainMap = Collections.synchronizedMap(new HashMap<>());
// Registration handle returned by the entity-ownership service; closed in close().
59 private final EntityOwnershipListenerRegistration eosListenerRegistration;
60 private final HashedWheelTimer timer;
// Handed to every new context chain when its services are registered.
61 private final ClusterSingletonServiceProvider singletonServiceProvider;
// Tracks chains that have not received a role yet; its action is ContextChain::makeDeviceSlave.
62 private final ItemScheduler<DeviceInfo, ContextChain> scheduler;
63 private final ExecutorService executorService;
// The three managers are null until they are supplied through addManager().
64 private DeviceManager deviceManager;
65 private RpcManager rpcManager;
66 private StatisticsManager statisticsManager;
/**
 * Creates the holder, registers it as an entity-ownership listener for
 * ASYNC_SERVICE_ENTITY_TYPE and builds the scheduler used for the role check.
 *
 * NOTE(review): no use of {@code timer} is visible in this listing - confirm that it is
 * stored in the field and/or handed to the scheduler.
 *
 * @param timer                    wheel timer supplied by the caller
 * @param executorService          executor stored in the field of the same name
 * @param singletonServiceProvider provider later passed to each chain's registerServices call
 * @param entityOwnershipService   service on which this holder registers itself as listener
 * @throws NullPointerException if the ownership service returns a null registration
 */
68 public ContextChainHolderImpl(final HashedWheelTimer timer,
69 final ExecutorService executorService,
70 final ClusterSingletonServiceProvider singletonServiceProvider,
71 final EntityOwnershipService entityOwnershipService) {
73 this.singletonServiceProvider = singletonServiceProvider;
74 this.executorService = executorService;
// NOTE(review): `this` is published to the ownership service before the scheduler field is
// assigned and before any manager is set; a callback arriving early would see a partially
// initialised holder - confirm this cannot happen in practice.
75 this.eosListenerRegistration = Objects.requireNonNull(entityOwnershipService
76 .registerListener(ASYNC_SERVICE_ENTITY_TYPE, this));
// The scheduler's action is ContextChain::makeDeviceSlave; the timeout and tolerance
// constants are passed along with it.
78 this.scheduler = new ItemScheduler<>(
80 CHECK_ROLE_MASTER_TIMEOUT,
81 CHECK_ROLE_MASTER_TOLERANCE,
82 ContextChain::makeDeviceSlave);
86 public <T extends OFPManager> void addManager(final T manager) {
87 if (Objects.isNull(deviceManager) && manager instanceof DeviceManager) {
88 LOG.trace("Context chain holder: Device manager OK.");
89 deviceManager = (DeviceManager) manager;
90 } else if (Objects.isNull(rpcManager) && manager instanceof RpcManager) {
91 LOG.trace("Context chain holder: RPC manager OK.");
92 rpcManager = (RpcManager) manager;
93 } else if (Objects.isNull(statisticsManager) && manager instanceof StatisticsManager) {
94 LOG.trace("Context chain holder: Statistics manager OK.");
95 statisticsManager = (StatisticsManager) manager;
/**
 * Builds the device, RPC and statistics contexts for a connection, bundles them into a new
 * context chain, stores the chain in contextChainMap under the connection's DeviceInfo,
 * adds it to the role-check scheduler and registers its services.
 *
 * All three managers are dereferenced here, so they must have been supplied through
 * addManager() beforehand; otherwise a NullPointerException is thrown.
 *
 * NOTE(review): some lines of this method (remaining constructor arguments, one log
 * argument, the return statement) are not visible in this listing.
 *
 * @param connectionContext primary connection of the device
 * @return presumably the newly created chain - the return statement is not visible here
 */
100 ContextChain createContextChain(final ConnectionContext connectionContext) {
101 final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
// Each context gets this holder registered as its mastership change listener.
103 final DeviceContext deviceContext = deviceManager.createContext(connectionContext);
104 deviceContext.registerMastershipChangeListener(this);
105 LOG.debug("Device" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfo);
// The RPC and statistics contexts are both derived from the device context.
107 final RpcContext rpcContext = rpcManager.createContext(deviceContext);
108 rpcContext.registerMastershipChangeListener(this);
109 LOG.debug("RPC" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfo);
111 final StatisticsContext statisticsContext = statisticsManager.createContext(deviceContext);
112 statisticsContext.registerMastershipChangeListener(this);
113 LOG.debug("Statistics" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfo);
115 final ContextChain contextChain = new ContextChainImpl(this, connectionContext,
// The three managers and this holder are registered as device-removed handlers; the
// holder's own handler (onDeviceRemoved) drops the chain from contextChainMap.
117 contextChain.registerDeviceRemovedHandler(deviceManager);
118 contextChain.registerDeviceRemovedHandler(rpcManager);
119 contextChain.registerDeviceRemovedHandler(statisticsManager);
120 contextChain.registerDeviceRemovedHandler(this);
121 contextChain.addContext(deviceContext);
122 contextChain.addContext(rpcContext);
123 contextChain.addContext(statisticsContext);
124 contextChainMap.put(deviceInfo, contextChain);
125 LOG.debug("Context chain" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfo);
127 deviceContext.onPublished();
// From here on the scheduler holds the chain until one of the role callbacks (or a
// destroy/removal) takes it out again via scheduler.remove().
128 scheduler.add(deviceInfo, contextChain);
129 scheduler.startIfNotRunning();
130 LOG.info("Started timer for setting SLAVE role on node {} if no role will be set in {}s.",
132 CHECK_ROLE_MASTER_TIMEOUT / 1000L);
134 contextChain.registerServices(singletonServiceProvider);
139 public ConnectionStatus deviceConnected(final ConnectionContext connectionContext) throws Exception {
140 final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
141 final ContextChain contextChain = contextChainMap.get(deviceInfo);
142 LOG.info("Device {} connected.", deviceInfo);
144 if (Objects.nonNull(contextChain)) {
145 if (contextChain.isClosing()) {
146 LOG.warn("Device {} is already in termination state, closing all incoming connections.", deviceInfo);
147 return ConnectionStatus.CLOSING;
150 if (contextChain.addAuxiliaryConnection(connectionContext)) {
151 LOG.info("An auxiliary connection was added to device: {}", deviceInfo);
152 return ConnectionStatus.MAY_CONTINUE;
155 LOG.warn("Device {} already connected. Closing all connection to the device.", deviceInfo);
156 destroyContextChain(deviceInfo);
157 return ConnectionStatus.ALREADY_CONNECTED;
160 LOG.debug("No context chain found for device: {}, creating new.", deviceInfo);
161 final ContextChain newContextChain = createContextChain(connectionContext);
162 LOG.debug("Successfully created context chain with identifier: {}", newContextChain.getIdentifier());
163 return ConnectionStatus.MAY_CONTINUE;
/**
 * Callback for a failed attempt to become master of a device: logs the reason and, for the
 * chain stored under the device, destroys it.
 *
 * NOTE(review): several lines between the first log statement and the map lookup are not
 * visible in this listing, and {@code mandatory} is not referenced in the visible lines.
 * Judging by the "This mastering is mandatory" message, there is presumably an early return
 * for the non-mandatory case - confirm against the full file.
 *
 * @param deviceInfo device whose mastership could not be started
 * @param reason     human-readable cause, written to the log
 * @param mandatory  see the review note above
 */
167 public void onNotAbleToStartMastership(final DeviceInfo deviceInfo, @Nonnull final String reason, final boolean mandatory) {
168 LOG.warn("Not able to set MASTER role on device {}, reason: {}", deviceInfo, reason);
// Only acts when a chain is stored for the device; the lambda parameter itself is unused.
174 Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> {
175 LOG.warn("This mastering is mandatory, destroying context chain and closing connection for device {}.", deviceInfo);
176 destroyContextChain(deviceInfo);
181 public void onMasterRoleAcquired(final DeviceInfo deviceInfo, @Nonnull final ContextChainMastershipState mastershipState) {
182 scheduler.remove(deviceInfo);
184 Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> {
185 if (contextChain.isMastered(mastershipState)) {
186 LOG.info("Role MASTER was granted to device {}", deviceInfo);
187 deviceManager.sendNodeAddedNotification(deviceInfo.getNodeInstanceIdentifier());
193 public void onSlaveRoleAcquired(final DeviceInfo deviceInfo) {
194 scheduler.remove(deviceInfo);
195 Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(ContextChain::makeContextChainStateSlave);
199 public void onSlaveRoleNotAcquired(final DeviceInfo deviceInfo) {
200 Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> destroyContextChain(deviceInfo));
/**
 * Callback for a dropped connection. Nothing happens unless a chain is stored for the
 * connection's device; if one is, the chain is first asked whether the dropped connection
 * was one of its auxiliary connections.
 *
 * NOTE(review): the line between the two log statements is missing from this listing.
 * Presumably it is an {@code else}, so that the chain is destroyed only when the dropped
 * connection was NOT an auxiliary one - confirm against the full file.
 *
 * @param connectionContext the connection that went away
 */
204 public void onDeviceDisconnected(final ConnectionContext connectionContext) {
205 final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
207 Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> {
208 if (contextChain.auxiliaryConnectionDropped(connectionContext)) {
209 LOG.info("Auxiliary connection from device {} disconnected.", deviceInfo);
211 LOG.info("Device {} disconnected.", deviceInfo);
212 destroyContextChain(deviceInfo);
218 boolean checkAllManagers() {
219 return Objects.nonNull(deviceManager) && Objects.nonNull(rpcManager) && Objects.nonNull(statisticsManager);
/**
 * Shuts the holder down: destroys the chain of every device currently in the map, empties
 * the map and closes the entity-ownership listener registration.
 *
 * NOTE(review): one line at the start of the body is missing from this listing.
 *
 * @throws Exception declared by the signature
 */
223 public void close() throws Exception {
// NOTE(review): keySet() of a Collections.synchronizedMap is iterated here without
// synchronizing on the map. This holder is registered as a device-removed handler on every
// chain and onDeviceRemoved() removes the entry from this same map, so if contextChain.close()
// invokes that handler while the loop is running, the iteration can fail with a
// ConcurrentModificationException. Iterating over a snapshot of the keys would avoid that -
// confirm how ContextChain.close() dispatches its handlers.
225 contextChainMap.keySet().forEach(this::destroyContextChain);
226 contextChainMap.clear();
227 eosListenerRegistration.close();
/**
 * Callback of the entity-ownership listener registered in the constructor for
 * ASYNC_SERVICE_ENTITY_TYPE. In the visible lines it derives a node id from the entity name,
 * sends a node-removed notification and removes the node from the operational datastore,
 * waiting at most REMOVE_DEVICE_FROM_DS_TIMEOUT milliseconds for that removal.
 *
 * NOTE(review): several lines of this method are missing from this listing (the body of the
 * hasOwner() check, the start of the try block, the remaining log arguments).
 * NOTE(review): deviceManager is dereferenced here but is only assigned in addManager();
 * a callback arriving before the device manager was added would fail - confirm ordering.
 *
 * @param entityOwnershipChange description of the ownership change
 */
231 public void ownershipChanged(EntityOwnershipChange entityOwnershipChange) {
// NOTE(review): body not visible; given the "has no owner" message below, changes that still
// have an owner are presumably ignored here - confirm.
232 if (entityOwnershipChange.hasOwner()) {
236 final String entityName = getEntityNameFromOwnershipChange(entityOwnershipChange);
// Proceed only when an entity name could be extracted.
238 if (Objects.nonNull(entityName)) {
239 LOG.debug("Entity {} has no owner", entityName);
240 final NodeId nodeId = new NodeId(entityName);
243 final KeyedInstanceIdentifier<Node, NodeKey> nodeInstanceIdentifier =
244 DeviceStateUtil.createNodeInstanceIdentifier(nodeId);
246 deviceManager.sendNodeRemovedNotification(nodeInstanceIdentifier);
248 LOG.info("Removing device {} from operational DS", nodeId);
// Blocking wait, bounded by the timeout constant.
250 .removeDeviceFromOperationalDS(nodeInstanceIdentifier)
251 .checkedGet(REMOVE_DEVICE_FROM_DS_TIMEOUT, TimeUnit.MILLISECONDS);
// A timeout or failed commit is treated as non-fatal and only reported at info level.
252 } catch (TimeoutException | TransactionCommitFailedException e) {
253 LOG.info("Not able to remove device {} from operational DS. Probably removed by another cluster node.",
259 private synchronized void destroyContextChain(final DeviceInfo deviceInfo) {
260 scheduler.remove(deviceInfo);
262 Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> {
263 deviceManager.sendNodeRemovedNotification(deviceInfo.getNodeInstanceIdentifier());
264 contextChain.close();
/**
 * Derives an entity name from an ownership change by taking the last path argument of an
 * identifier reached from the change and casting it to NodeIdentifierWithPredicates.
 *
 * NOTE(review): parts of both call chains (how the identifier is obtained, and how the
 * string is read from the last argument) are missing from this listing.
 * NOTE(review): the cast is unconditional in the visible lines, so a last path argument of
 * another type would raise ClassCastException - confirm whether that can occur.
 *
 * @param entityOwnershipChange change to read the entity from
 * @return the extracted name; the caller checks the result for null, so null is presumably
 *         a possible outcome
 */
268 private String getEntityNameFromOwnershipChange(final EntityOwnershipChange entityOwnershipChange) {
269 final YangInstanceIdentifier.NodeIdentifierWithPredicates lastIdArgument =
270 (YangInstanceIdentifier.NodeIdentifierWithPredicates) entityOwnershipChange
273 .getLastPathArgument();
275 return lastIdArgument
/**
 * Device-removed handler of this holder (the holder registers itself on every chain it
 * creates): removes the device from the role-check scheduler and drops its entry from
 * contextChainMap.
 *
 * NOTE(review): the listing ends inside this method; any further statements are not visible.
 *
 * @param deviceInfo device that has been removed
 */
284 public void onDeviceRemoved(final DeviceInfo deviceInfo) {
285 scheduler.remove(deviceInfo);
286 contextChainMap.remove(deviceInfo);
287 LOG.debug("Context chain removed for node {}", deviceInfo);