2 * Copyright (c) 2016 Pantheon Technologies s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.impl.lifecycle;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.MoreExecutors;
14 import io.netty.util.HashedWheelTimer;
15 import java.util.Collections;
16 import java.util.HashMap;
17 import java.util.List;
19 import java.util.Objects;
20 import java.util.Optional;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.ExecutorService;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.TimeoutException;
25 import java.util.stream.Collectors;
26 import javax.annotation.Nonnull;
27 import javax.annotation.Nullable;
28 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
29 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListenerRegistration;
30 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
31 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
32 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
33 import org.opendaylight.openflowplugin.api.openflow.OFPManager;
34 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
35 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionStatus;
36 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
37 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
38 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
39 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChain;
40 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainHolder;
41 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
42 import org.opendaylight.openflowplugin.api.openflow.lifecycle.MasterChecker;
43 import org.opendaylight.openflowplugin.api.openflow.lifecycle.OwnershipChangeListener;
44 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
45 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager;
46 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
47 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
48 import org.opendaylight.openflowplugin.impl.util.DeviceStateUtil;
49 import org.opendaylight.openflowplugin.impl.util.ItemScheduler;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.rf.state.rev170713.ResultState;
54 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
55 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
59 public class ContextChainHolderImpl implements ContextChainHolder, MasterChecker {
60 private static final Logger LOG = LoggerFactory.getLogger(ContextChainHolderImpl.class);
62 private static final String CONTEXT_CREATED_FOR_CONNECTION = " context created for connection: {}";
63 private static final long CHECK_ROLE_MASTER_TIMEOUT = 20000L;
64 private static final long CHECK_ROLE_MASTER_TOLERANCE = CHECK_ROLE_MASTER_TIMEOUT / 2;
65 private static final long REMOVE_DEVICE_FROM_DS_TIMEOUT = 5000L;
66 private static final String ASYNC_SERVICE_ENTITY_TYPE = "org.opendaylight.mdsal.AsyncServiceCloseEntityType";
68 private final Map<DeviceInfo, ContextChain> contextChainMap = Collections.synchronizedMap(new HashMap<>());
69 private final EntityOwnershipListenerRegistration eosListenerRegistration;
70 private final ClusterSingletonServiceProvider singletonServiceProvider;
71 private final ItemScheduler<DeviceInfo, ContextChain> scheduler;
72 private final ExecutorService executorService;
73 private final OwnershipChangeListener ownershipChangeListener;
74 private DeviceManager deviceManager;
75 private RpcManager rpcManager;
76 private StatisticsManager statisticsManager;
78 public ContextChainHolderImpl(final HashedWheelTimer timer,
79 final ExecutorService executorService,
80 final ClusterSingletonServiceProvider singletonServiceProvider,
81 final EntityOwnershipService entityOwnershipService,
82 final OwnershipChangeListener ownershipChangeListener) {
83 this.singletonServiceProvider = singletonServiceProvider;
84 this.executorService = executorService;
85 this.ownershipChangeListener = ownershipChangeListener;
86 this.ownershipChangeListener.setMasterChecker(this);
87 this.eosListenerRegistration = Objects.requireNonNull(entityOwnershipService
88 .registerListener(ASYNC_SERVICE_ENTITY_TYPE, this));
90 this.scheduler = new ItemScheduler<>(
92 CHECK_ROLE_MASTER_TIMEOUT,
93 CHECK_ROLE_MASTER_TOLERANCE,
94 ContextChain::makeDeviceSlave);
98 public <T extends OFPManager> void addManager(final T manager) {
99 if (Objects.isNull(deviceManager) && manager instanceof DeviceManager) {
100 LOG.trace("Context chain holder: Device manager OK.");
101 deviceManager = (DeviceManager) manager;
102 } else if (Objects.isNull(rpcManager) && manager instanceof RpcManager) {
103 LOG.trace("Context chain holder: RPC manager OK.");
104 rpcManager = (RpcManager) manager;
105 } else if (Objects.isNull(statisticsManager) && manager instanceof StatisticsManager) {
106 LOG.trace("Context chain holder: Statistics manager OK.");
107 statisticsManager = (StatisticsManager) manager;
112 void createContextChain(final ConnectionContext connectionContext) {
113 final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
115 final DeviceContext deviceContext = deviceManager.createContext(connectionContext);
116 deviceContext.registerMastershipWatcher(this);
117 LOG.debug("Device" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfo);
119 final RpcContext rpcContext = rpcManager.createContext(deviceContext);
120 rpcContext.registerMastershipWatcher(this);
121 LOG.debug("RPC" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfo);
123 final StatisticsContext statisticsContext = statisticsManager.createContext(deviceContext);
124 statisticsContext.registerMastershipWatcher(this);
125 LOG.debug("Statistics" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfo);
127 final ContextChain contextChain = new ContextChainImpl(this, connectionContext,
129 contextChain.registerDeviceRemovedHandler(deviceManager);
130 contextChain.registerDeviceRemovedHandler(rpcManager);
131 contextChain.registerDeviceRemovedHandler(statisticsManager);
132 contextChain.registerDeviceRemovedHandler(this);
133 contextChain.addContext(deviceContext);
134 contextChain.addContext(rpcContext);
135 contextChain.addContext(statisticsContext);
136 contextChainMap.put(deviceInfo, contextChain);
137 LOG.debug("Context chain" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfo);
139 deviceContext.onPublished();
140 scheduler.add(deviceInfo, contextChain);
141 scheduler.startIfNotRunning();
142 LOG.info("Started timer for setting SLAVE role on node {} if no role will be set in {}s.",
144 CHECK_ROLE_MASTER_TIMEOUT / 1000L);
146 contextChain.registerServices(singletonServiceProvider);
150 public ConnectionStatus deviceConnected(final ConnectionContext connectionContext) throws Exception {
151 final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
152 final ContextChain contextChain = contextChainMap.get(deviceInfo);
153 LOG.info("Device {} connected.", deviceInfo);
155 if (Objects.nonNull(contextChain)) {
156 if (contextChain.isClosing()) {
157 LOG.warn("Device {} is already in termination state, closing all incoming connections.", deviceInfo);
158 return ConnectionStatus.CLOSING;
161 if (contextChain.addAuxiliaryConnection(connectionContext)) {
162 LOG.info("An auxiliary connection was added to device: {}", deviceInfo);
163 return ConnectionStatus.MAY_CONTINUE;
166 LOG.warn("Device {} already connected. Closing all connection to the device.", deviceInfo);
167 destroyContextChain(deviceInfo);
168 return ConnectionStatus.ALREADY_CONNECTED;
171 LOG.debug("No context chain found for device: {}, creating new.", deviceInfo);
172 createContextChain(connectionContext);
173 return ConnectionStatus.MAY_CONTINUE;
177 public void onNotAbleToStartMastership(@Nonnull final DeviceInfo deviceInfo,
178 @Nonnull final String reason,
179 final boolean mandatory) {
180 LOG.warn("Not able to set MASTER role on device {}, reason: {}", deviceInfo, reason);
186 Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> {
187 LOG.warn("This mastering is mandatory, destroying context chain and closing connection for device {}.", deviceInfo);
188 destroyContextChain(deviceInfo);
193 public void onMasterRoleAcquired(@Nonnull final DeviceInfo deviceInfo,
194 @Nonnull final ContextChainMastershipState mastershipState) {
195 scheduler.remove(deviceInfo);
196 Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> {
197 if (ownershipChangeListener.isReconciliationFrameworkRegistered()) {
198 if (mastershipState == ContextChainMastershipState.INITIAL_SUBMIT) {
199 LOG.error("Initial submit is not allowed here if using reconciliation framework.");
201 contextChain.isMastered(mastershipState);
202 if (contextChain.isPrepared()) {
204 ownershipChangeListener.becomeMasterBeforeSubmittedDS(deviceInfo),
205 reconciliationFrameworkCallback(deviceInfo, contextChain),
206 MoreExecutors.directExecutor());
209 } else if (contextChain.isMastered(mastershipState)) {
210 LOG.info("Role MASTER was granted to device {}", deviceInfo);
211 ownershipChangeListener.becomeMaster(deviceInfo);
212 deviceManager.sendNodeAddedNotification(deviceInfo.getNodeInstanceIdentifier());
218 public void onSlaveRoleAcquired(final DeviceInfo deviceInfo) {
219 scheduler.remove(deviceInfo);
220 ownershipChangeListener.becomeSlaveOrDisconnect(deviceInfo);
221 Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(ContextChain::makeContextChainStateSlave);
225 public void onSlaveRoleNotAcquired(final DeviceInfo deviceInfo) {
226 Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> destroyContextChain(deviceInfo));
230 public void onDeviceDisconnected(final ConnectionContext connectionContext) {
231 final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
233 Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> {
234 if (contextChain.auxiliaryConnectionDropped(connectionContext)) {
235 LOG.info("Auxiliary connection from device {} disconnected.", deviceInfo);
237 LOG.info("Device {} disconnected.", deviceInfo);
238 destroyContextChain(deviceInfo);
244 boolean checkAllManagers() {
245 return Objects.nonNull(deviceManager) && Objects.nonNull(rpcManager) && Objects.nonNull(statisticsManager);
249 public void close() throws Exception {
251 Map<DeviceInfo, ContextChain> copyOfChains = new HashMap<>(contextChainMap);
252 copyOfChains.keySet().forEach(this::destroyContextChain);
253 copyOfChains.clear();
254 contextChainMap.clear();
255 eosListenerRegistration.close();
259 public void ownershipChanged(EntityOwnershipChange entityOwnershipChange) {
260 if (entityOwnershipChange.hasOwner()) {
264 final String entityName = getEntityNameFromOwnershipChange(entityOwnershipChange);
266 if (Objects.nonNull(entityName)) {
267 LOG.debug("Entity {} has no owner", entityName);
268 final NodeId nodeId = new NodeId(entityName);
271 final KeyedInstanceIdentifier<Node, NodeKey> nodeInstanceIdentifier =
272 DeviceStateUtil.createNodeInstanceIdentifier(nodeId);
274 deviceManager.sendNodeRemovedNotification(nodeInstanceIdentifier);
276 LOG.info("Try to remove device {} from operational DS", nodeId);
278 .removeDeviceFromOperationalDS(nodeInstanceIdentifier)
279 .get(REMOVE_DEVICE_FROM_DS_TIMEOUT, TimeUnit.MILLISECONDS);
280 LOG.info("Removing device from operational DS {} was successful", nodeId);
281 } catch (TimeoutException | ExecutionException | NullPointerException | InterruptedException e) {
282 LOG.warn("Not able to remove device {} from operational DS. ",nodeId, e);
287 private synchronized void destroyContextChain(final DeviceInfo deviceInfo) {
288 scheduler.remove(deviceInfo);
290 Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> {
291 deviceManager.sendNodeRemovedNotification(deviceInfo.getNodeInstanceIdentifier());
292 contextChain.close();
297 public List<DeviceInfo> listOfMasteredDevices() {
298 return contextChainMap
301 .filter(deviceInfoContextChainEntry -> deviceInfoContextChainEntry
303 .isMastered(ContextChainMastershipState.CHECK))
304 .map(Map.Entry::getKey)
305 .collect(Collectors.toList());
309 public boolean isAnyDeviceMastered() {
310 return contextChainMap
314 .filter(deviceInfoContextChainEntry -> deviceInfoContextChainEntry.getValue()
315 .isMastered(ContextChainMastershipState.CHECK))
319 private String getEntityNameFromOwnershipChange(final EntityOwnershipChange entityOwnershipChange) {
320 final YangInstanceIdentifier.NodeIdentifierWithPredicates lastIdArgument =
321 (YangInstanceIdentifier.NodeIdentifierWithPredicates) entityOwnershipChange
324 .getLastPathArgument();
326 return lastIdArgument
335 public void onDeviceRemoved(final DeviceInfo deviceInfo) {
336 scheduler.remove(deviceInfo);
337 contextChainMap.remove(deviceInfo);
338 LOG.debug("Context chain removed for node {}", deviceInfo);
341 private FutureCallback<ResultState> reconciliationFrameworkCallback(
342 @Nonnull DeviceInfo deviceInfo,
343 ContextChain contextChain) {
344 return new FutureCallback<ResultState>() {
346 public void onSuccess(@Nullable ResultState result) {
347 if (ResultState.DONOTHING == result) {
348 LOG.info("Device {} connection is enabled by reconciliation framework.", deviceInfo);
349 if (!contextChain.continueInitializationAfterReconciliation()) {
350 LOG.warn("Initialization submit after reconciliation failed for device {}", deviceInfo);
351 destroyContextChain(deviceInfo);
353 ownershipChangeListener.becomeMaster(deviceInfo);
354 deviceManager.sendNodeAddedNotification(deviceInfo.getNodeInstanceIdentifier());
357 LOG.warn("Reconciliation framework failure for device {}", deviceInfo);
358 destroyContextChain(deviceInfo);
363 public void onFailure(@Nonnull Throwable t) {
364 LOG.warn("Reconciliation framework failure.");
365 destroyContextChain(deviceInfo);