f521e971527b72d14c1bb6ea11a5cb09cd84a5f9
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / lifecycle / ContextChainHolderImpl.java
1 /*
2  * Copyright (c) 2016 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.lifecycle;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.MoreExecutors;
14 import io.netty.util.HashedWheelTimer;
15 import java.util.Collections;
16 import java.util.HashMap;
17 import java.util.List;
18 import java.util.Map;
19 import java.util.Objects;
20 import java.util.Optional;
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.TimeUnit;
23 import java.util.concurrent.TimeoutException;
24 import java.util.stream.Collectors;
25 import javax.annotation.Nonnull;
26 import javax.annotation.Nullable;
27 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
28 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListenerRegistration;
29 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
30 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
31 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
32 import org.opendaylight.openflowplugin.api.openflow.OFPManager;
33 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
34 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionStatus;
35 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
36 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
37 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
38 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChain;
39 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainHolder;
40 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
41 import org.opendaylight.openflowplugin.api.openflow.lifecycle.MasterChecker;
42 import org.opendaylight.openflowplugin.api.openflow.lifecycle.OwnershipChangeListener;
43 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
44 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager;
45 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
46 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
47 import org.opendaylight.openflowplugin.impl.util.DeviceStateUtil;
48 import org.opendaylight.openflowplugin.impl.util.ItemScheduler;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.rf.state.rev170713.ResultState;
53 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
54 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
57
58 public class ContextChainHolderImpl implements ContextChainHolder, MasterChecker {
59     private static final Logger LOG = LoggerFactory.getLogger(ContextChainHolderImpl.class);
60
61     private static final String CONTEXT_CREATED_FOR_CONNECTION = " context created for connection: {}";
62     private static final long CHECK_ROLE_MASTER_TIMEOUT = 20000L;
63     private static final long CHECK_ROLE_MASTER_TOLERANCE = CHECK_ROLE_MASTER_TIMEOUT / 2;
64     private static final long REMOVE_DEVICE_FROM_DS_TIMEOUT = 5000L;
65     private static final String ASYNC_SERVICE_ENTITY_TYPE = "org.opendaylight.mdsal.AsyncServiceCloseEntityType";
66
67     private final Map<DeviceInfo, ContextChain> contextChainMap = Collections.synchronizedMap(new HashMap<>());
68     private final EntityOwnershipListenerRegistration eosListenerRegistration;
69     private final ClusterSingletonServiceProvider singletonServiceProvider;
70     private final ItemScheduler<DeviceInfo, ContextChain> scheduler;
71     private final ExecutorService executorService;
72     private final OwnershipChangeListener ownershipChangeListener;
73     private DeviceManager deviceManager;
74     private RpcManager rpcManager;
75     private StatisticsManager statisticsManager;
76
77     public ContextChainHolderImpl(final HashedWheelTimer timer,
78                                   final ExecutorService executorService,
79                                   final ClusterSingletonServiceProvider singletonServiceProvider,
80                                   final EntityOwnershipService entityOwnershipService,
81                                   final OwnershipChangeListener ownershipChangeListener) {
82         this.singletonServiceProvider = singletonServiceProvider;
83         this.executorService = executorService;
84         this.ownershipChangeListener = ownershipChangeListener;
85         this.ownershipChangeListener.setMasterChecker(this);
86         this.eosListenerRegistration = Objects.requireNonNull(entityOwnershipService
87                 .registerListener(ASYNC_SERVICE_ENTITY_TYPE, this));
88
89         this.scheduler = new ItemScheduler<>(
90                 timer,
91                 CHECK_ROLE_MASTER_TIMEOUT,
92                 CHECK_ROLE_MASTER_TOLERANCE,
93                 ContextChain::makeDeviceSlave);
94     }
95
96     @Override
97     public <T extends OFPManager> void addManager(final T manager) {
98         if (Objects.isNull(deviceManager) && manager instanceof DeviceManager) {
99             LOG.trace("Context chain holder: Device manager OK.");
100             deviceManager = (DeviceManager) manager;
101         } else if (Objects.isNull(rpcManager) && manager instanceof RpcManager) {
102             LOG.trace("Context chain holder: RPC manager OK.");
103             rpcManager = (RpcManager) manager;
104         } else if (Objects.isNull(statisticsManager) && manager instanceof StatisticsManager) {
105             LOG.trace("Context chain holder: Statistics manager OK.");
106             statisticsManager = (StatisticsManager) manager;
107         }
108     }
109
110     @VisibleForTesting
111     void createContextChain(final ConnectionContext connectionContext) {
112         final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
113
114         final DeviceContext deviceContext = deviceManager.createContext(connectionContext);
115         deviceContext.registerMastershipWatcher(this);
116         LOG.debug("Device" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfo);
117
118         final RpcContext rpcContext = rpcManager.createContext(deviceContext);
119         rpcContext.registerMastershipWatcher(this);
120         LOG.debug("RPC" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfo);
121
122         final StatisticsContext statisticsContext = statisticsManager.createContext(deviceContext);
123         statisticsContext.registerMastershipWatcher(this);
124         LOG.debug("Statistics" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfo);
125
126         final ContextChain contextChain = new ContextChainImpl(this, connectionContext,
127                 executorService);
128         contextChain.registerDeviceRemovedHandler(deviceManager);
129         contextChain.registerDeviceRemovedHandler(rpcManager);
130         contextChain.registerDeviceRemovedHandler(statisticsManager);
131         contextChain.registerDeviceRemovedHandler(this);
132         contextChain.addContext(deviceContext);
133         contextChain.addContext(rpcContext);
134         contextChain.addContext(statisticsContext);
135         contextChainMap.put(deviceInfo, contextChain);
136         LOG.debug("Context chain" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfo);
137
138         deviceContext.onPublished();
139         scheduler.add(deviceInfo, contextChain);
140         scheduler.startIfNotRunning();
141         LOG.info("Started timer for setting SLAVE role on node {} if no role will be set in {}s.",
142                 deviceInfo,
143                 CHECK_ROLE_MASTER_TIMEOUT / 1000L);
144
145         contextChain.registerServices(singletonServiceProvider);
146     }
147
148     @Override
149     public ConnectionStatus deviceConnected(final ConnectionContext connectionContext) throws Exception {
150         final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
151         final ContextChain contextChain = contextChainMap.get(deviceInfo);
152         LOG.info("Device {} connected.", deviceInfo);
153
154         if (Objects.nonNull(contextChain)) {
155             if (contextChain.isClosing()) {
156                 LOG.warn("Device {} is already in termination state, closing all incoming connections.", deviceInfo);
157                 return ConnectionStatus.CLOSING;
158             }
159
160             if (contextChain.addAuxiliaryConnection(connectionContext)) {
161                 LOG.info("An auxiliary connection was added to device: {}", deviceInfo);
162                 return ConnectionStatus.MAY_CONTINUE;
163             }
164
165             LOG.warn("Device {} already connected. Closing all connection to the device.", deviceInfo);
166             destroyContextChain(deviceInfo);
167             return ConnectionStatus.ALREADY_CONNECTED;
168         }
169
170         LOG.debug("No context chain found for device: {}, creating new.", deviceInfo);
171         createContextChain(connectionContext);
172         return ConnectionStatus.MAY_CONTINUE;
173     }
174
175     @Override
176     public void onNotAbleToStartMastership(@Nonnull final DeviceInfo deviceInfo,
177                                            @Nonnull final String reason,
178                                            final boolean mandatory) {
179         LOG.warn("Not able to set MASTER role on device {}, reason: {}", deviceInfo, reason);
180
181         if (!mandatory) {
182             return;
183         }
184
185         Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> {
186             LOG.warn("This mastering is mandatory, destroying context chain and closing connection for device {}.", deviceInfo);
187             destroyContextChain(deviceInfo);
188         });
189     }
190
191     @Override
192     public void onMasterRoleAcquired(@Nonnull final DeviceInfo deviceInfo,
193                                      @Nonnull final ContextChainMastershipState mastershipState) {
194         scheduler.remove(deviceInfo);
195         Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> {
196             if (ownershipChangeListener.isReconciliationFrameworkRegistered()) {
197                 if (mastershipState == ContextChainMastershipState.INITIAL_SUBMIT) {
198                     LOG.error("Initial submit is not allowed here if using reconciliation framework.");
199                 } else {
200                     contextChain.isMastered(mastershipState);
201                     if (contextChain.isPrepared()) {
202                         Futures.addCallback(
203                                 ownershipChangeListener.becomeMasterBeforeSubmittedDS(deviceInfo),
204                                 reconciliationFrameworkCallback(deviceInfo, contextChain),
205                                 MoreExecutors.directExecutor());
206                     }
207                 }
208             } else if (contextChain.isMastered(mastershipState)) {
209                 LOG.info("Role MASTER was granted to device {}", deviceInfo);
210                 ownershipChangeListener.becomeMaster(deviceInfo);
211                 deviceManager.sendNodeAddedNotification(deviceInfo.getNodeInstanceIdentifier());
212             }
213         });
214     }
215
216     @Override
217     public void onSlaveRoleAcquired(final DeviceInfo deviceInfo) {
218         scheduler.remove(deviceInfo);
219         ownershipChangeListener.becomeSlaveOrDisconnect(deviceInfo);
220         Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(ContextChain::makeContextChainStateSlave);
221     }
222
223     @Override
224     public void onSlaveRoleNotAcquired(final DeviceInfo deviceInfo) {
225         Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> destroyContextChain(deviceInfo));
226     }
227
228     @Override
229     public void onDeviceDisconnected(final ConnectionContext connectionContext) {
230         final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
231
232         Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> {
233             if (contextChain.auxiliaryConnectionDropped(connectionContext)) {
234                 LOG.info("Auxiliary connection from device {} disconnected.", deviceInfo);
235             } else {
236                 LOG.info("Device {} disconnected.", deviceInfo);
237                 destroyContextChain(deviceInfo);
238             }
239         });
240     }
241
242     @VisibleForTesting
243     boolean checkAllManagers() {
244         return Objects.nonNull(deviceManager) && Objects.nonNull(rpcManager) && Objects.nonNull(statisticsManager);
245     }
246
247     @Override
248     public void close() throws Exception {
249         scheduler.close();
250         Map<DeviceInfo, ContextChain> copyOfChains = new HashMap<>(contextChainMap);
251         copyOfChains.keySet().forEach(this::destroyContextChain);
252         copyOfChains.clear();
253         contextChainMap.clear();
254         eosListenerRegistration.close();
255     }
256
257     @Override
258     public void ownershipChanged(EntityOwnershipChange entityOwnershipChange) {
259         if (entityOwnershipChange.hasOwner()) {
260             return;
261         }
262
263         final String entityName = getEntityNameFromOwnershipChange(entityOwnershipChange);
264
265         if (Objects.nonNull(entityName)) {
266             LOG.debug("Entity {} has no owner", entityName);
267             final NodeId nodeId = new NodeId(entityName);
268
269             try {
270                 final KeyedInstanceIdentifier<Node, NodeKey> nodeInstanceIdentifier =
271                         DeviceStateUtil.createNodeInstanceIdentifier(nodeId);
272
273                 deviceManager.sendNodeRemovedNotification(nodeInstanceIdentifier);
274
275                 LOG.info("Removing device {} from operational DS", nodeId);
276                 deviceManager
277                         .removeDeviceFromOperationalDS(nodeInstanceIdentifier)
278                         .checkedGet(REMOVE_DEVICE_FROM_DS_TIMEOUT, TimeUnit.MILLISECONDS);
279             } catch (TimeoutException | TransactionCommitFailedException | NullPointerException e) {
280                 LOG.info("Not able to remove device {} from operational DS. Probably removed by another cluster node.",
281                         nodeId);
282             }
283         }
284     }
285
286     private synchronized void destroyContextChain(final DeviceInfo deviceInfo) {
287         scheduler.remove(deviceInfo);
288
289         Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> {
290             deviceManager.sendNodeRemovedNotification(deviceInfo.getNodeInstanceIdentifier());
291             contextChain.close();
292         });
293     }
294
295     @Override
296     public List<DeviceInfo> listOfMasteredDevices() {
297         return contextChainMap
298                 .entrySet()
299                 .stream()
300                 .filter(deviceInfoContextChainEntry -> deviceInfoContextChainEntry
301                         .getValue()
302                         .isMastered(ContextChainMastershipState.CHECK))
303                 .map(Map.Entry::getKey)
304                 .collect(Collectors.toList());
305     }
306
307     @Override
308     public boolean isAnyDeviceMastered() {
309         return contextChainMap
310                 .entrySet()
311                 .stream()
312                 .findAny()
313                 .filter(deviceInfoContextChainEntry -> deviceInfoContextChainEntry.getValue()
314                         .isMastered(ContextChainMastershipState.CHECK))
315                 .isPresent();
316     }
317
318     private String getEntityNameFromOwnershipChange(final EntityOwnershipChange entityOwnershipChange) {
319         final YangInstanceIdentifier.NodeIdentifierWithPredicates lastIdArgument =
320                 (YangInstanceIdentifier.NodeIdentifierWithPredicates) entityOwnershipChange
321                         .getEntity()
322                         .getId()
323                         .getLastPathArgument();
324
325         return lastIdArgument
326                 .getKeyValues()
327                 .values()
328                 .iterator()
329                 .next()
330                 .toString();
331     }
332
333     @Override
334     public void onDeviceRemoved(final DeviceInfo deviceInfo) {
335         scheduler.remove(deviceInfo);
336         contextChainMap.remove(deviceInfo);
337         LOG.debug("Context chain removed for node {}", deviceInfo);
338     }
339
340     private FutureCallback<ResultState> reconciliationFrameworkCallback(
341             @Nonnull DeviceInfo deviceInfo,
342             ContextChain contextChain) {
343         return new FutureCallback<ResultState>() {
344             @Override
345             public void onSuccess(@Nullable ResultState result) {
346                 if (ResultState.DONOTHING == result) {
347                     LOG.info("Device {} connection is enabled by reconciliation framework.", deviceInfo);
348                     if (!contextChain.continueInitializationAfterReconciliation()) {
349                         LOG.warn("Initialization submit after reconciliation failed for device {}", deviceInfo);
350                         destroyContextChain(deviceInfo);
351                     } else {
352                         ownershipChangeListener.becomeMaster(deviceInfo);
353                         deviceManager.sendNodeAddedNotification(deviceInfo.getNodeInstanceIdentifier());
354                     }
355                 } else {
356                     LOG.warn("Reconciliation framework failure for device {}", deviceInfo);
357                     destroyContextChain(deviceInfo);
358                 }
359             }
360
361             @Override
362             public void onFailure(@Nonnull Throwable t) {
363                 LOG.warn("Reconciliation framework failure.");
364                 destroyContextChain(deviceInfo);
365             }
366         };
367     }
368 }