Remove deprecated CheckedFuture.
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / lifecycle / ContextChainHolderImpl.java
1 /*
2  * Copyright (c) 2016 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.lifecycle;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.MoreExecutors;
14 import io.netty.util.HashedWheelTimer;
15 import java.util.Collections;
16 import java.util.HashMap;
17 import java.util.List;
18 import java.util.Map;
19 import java.util.Objects;
20 import java.util.Optional;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.ExecutorService;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.TimeoutException;
25 import java.util.stream.Collectors;
26 import javax.annotation.Nonnull;
27 import javax.annotation.Nullable;
28 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
29 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListenerRegistration;
30 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
31 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
32 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
33 import org.opendaylight.openflowplugin.api.openflow.OFPManager;
34 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
35 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionStatus;
36 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
37 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
38 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
39 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChain;
40 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainHolder;
41 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
42 import org.opendaylight.openflowplugin.api.openflow.lifecycle.MasterChecker;
43 import org.opendaylight.openflowplugin.api.openflow.lifecycle.OwnershipChangeListener;
44 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
45 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager;
46 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
47 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
48 import org.opendaylight.openflowplugin.impl.util.DeviceStateUtil;
49 import org.opendaylight.openflowplugin.impl.util.ItemScheduler;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.rf.state.rev170713.ResultState;
54 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
55 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
58
59 public class ContextChainHolderImpl implements ContextChainHolder, MasterChecker {
60     private static final Logger LOG = LoggerFactory.getLogger(ContextChainHolderImpl.class);
61
62     private static final String CONTEXT_CREATED_FOR_CONNECTION = " context created for connection: {}";
63     private static final long CHECK_ROLE_MASTER_TIMEOUT = 20000L;
64     private static final long CHECK_ROLE_MASTER_TOLERANCE = CHECK_ROLE_MASTER_TIMEOUT / 2;
65     private static final long REMOVE_DEVICE_FROM_DS_TIMEOUT = 5000L;
66     private static final String ASYNC_SERVICE_ENTITY_TYPE = "org.opendaylight.mdsal.AsyncServiceCloseEntityType";
67
68     private final Map<DeviceInfo, ContextChain> contextChainMap = Collections.synchronizedMap(new HashMap<>());
69     private final EntityOwnershipListenerRegistration eosListenerRegistration;
70     private final ClusterSingletonServiceProvider singletonServiceProvider;
71     private final ItemScheduler<DeviceInfo, ContextChain> scheduler;
72     private final ExecutorService executorService;
73     private final OwnershipChangeListener ownershipChangeListener;
74     private DeviceManager deviceManager;
75     private RpcManager rpcManager;
76     private StatisticsManager statisticsManager;
77
78     public ContextChainHolderImpl(final HashedWheelTimer timer,
79                                   final ExecutorService executorService,
80                                   final ClusterSingletonServiceProvider singletonServiceProvider,
81                                   final EntityOwnershipService entityOwnershipService,
82                                   final OwnershipChangeListener ownershipChangeListener) {
83         this.singletonServiceProvider = singletonServiceProvider;
84         this.executorService = executorService;
85         this.ownershipChangeListener = ownershipChangeListener;
86         this.ownershipChangeListener.setMasterChecker(this);
87         this.eosListenerRegistration = Objects.requireNonNull(entityOwnershipService
88                 .registerListener(ASYNC_SERVICE_ENTITY_TYPE, this));
89
90         this.scheduler = new ItemScheduler<>(
91                 timer,
92                 CHECK_ROLE_MASTER_TIMEOUT,
93                 CHECK_ROLE_MASTER_TOLERANCE,
94                 ContextChain::makeDeviceSlave);
95     }
96
97     @Override
98     public <T extends OFPManager> void addManager(final T manager) {
99         if (Objects.isNull(deviceManager) && manager instanceof DeviceManager) {
100             LOG.trace("Context chain holder: Device manager OK.");
101             deviceManager = (DeviceManager) manager;
102         } else if (Objects.isNull(rpcManager) && manager instanceof RpcManager) {
103             LOG.trace("Context chain holder: RPC manager OK.");
104             rpcManager = (RpcManager) manager;
105         } else if (Objects.isNull(statisticsManager) && manager instanceof StatisticsManager) {
106             LOG.trace("Context chain holder: Statistics manager OK.");
107             statisticsManager = (StatisticsManager) manager;
108         }
109     }
110
111     @VisibleForTesting
112     void createContextChain(final ConnectionContext connectionContext) {
113         final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
114
115         final DeviceContext deviceContext = deviceManager.createContext(connectionContext);
116         deviceContext.registerMastershipWatcher(this);
117         LOG.debug("Device" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfo);
118
119         final RpcContext rpcContext = rpcManager.createContext(deviceContext);
120         rpcContext.registerMastershipWatcher(this);
121         LOG.debug("RPC" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfo);
122
123         final StatisticsContext statisticsContext = statisticsManager.createContext(deviceContext);
124         statisticsContext.registerMastershipWatcher(this);
125         LOG.debug("Statistics" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfo);
126
127         final ContextChain contextChain = new ContextChainImpl(this, connectionContext,
128                 executorService);
129         contextChain.registerDeviceRemovedHandler(deviceManager);
130         contextChain.registerDeviceRemovedHandler(rpcManager);
131         contextChain.registerDeviceRemovedHandler(statisticsManager);
132         contextChain.registerDeviceRemovedHandler(this);
133         contextChain.addContext(deviceContext);
134         contextChain.addContext(rpcContext);
135         contextChain.addContext(statisticsContext);
136         contextChainMap.put(deviceInfo, contextChain);
137         LOG.debug("Context chain" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfo);
138
139         deviceContext.onPublished();
140         scheduler.add(deviceInfo, contextChain);
141         scheduler.startIfNotRunning();
142         LOG.info("Started timer for setting SLAVE role on node {} if no role will be set in {}s.",
143                 deviceInfo,
144                 CHECK_ROLE_MASTER_TIMEOUT / 1000L);
145
146         contextChain.registerServices(singletonServiceProvider);
147     }
148
149     @Override
150     public ConnectionStatus deviceConnected(final ConnectionContext connectionContext) throws Exception {
151         final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
152         final ContextChain contextChain = contextChainMap.get(deviceInfo);
153         LOG.info("Device {} connected.", deviceInfo);
154
155         if (Objects.nonNull(contextChain)) {
156             if (contextChain.isClosing()) {
157                 LOG.warn("Device {} is already in termination state, closing all incoming connections.", deviceInfo);
158                 return ConnectionStatus.CLOSING;
159             }
160
161             if (contextChain.addAuxiliaryConnection(connectionContext)) {
162                 LOG.info("An auxiliary connection was added to device: {}", deviceInfo);
163                 return ConnectionStatus.MAY_CONTINUE;
164             }
165
166             LOG.warn("Device {} already connected. Closing all connection to the device.", deviceInfo);
167             destroyContextChain(deviceInfo);
168             return ConnectionStatus.ALREADY_CONNECTED;
169         }
170
171         LOG.debug("No context chain found for device: {}, creating new.", deviceInfo);
172         createContextChain(connectionContext);
173         return ConnectionStatus.MAY_CONTINUE;
174     }
175
176     @Override
177     public void onNotAbleToStartMastership(@Nonnull final DeviceInfo deviceInfo,
178                                            @Nonnull final String reason,
179                                            final boolean mandatory) {
180         LOG.warn("Not able to set MASTER role on device {}, reason: {}", deviceInfo, reason);
181
182         if (!mandatory) {
183             return;
184         }
185
186         Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> {
187             LOG.warn("This mastering is mandatory, destroying context chain and closing connection for device {}.", deviceInfo);
188             destroyContextChain(deviceInfo);
189         });
190     }
191
192     @Override
193     public void onMasterRoleAcquired(@Nonnull final DeviceInfo deviceInfo,
194                                      @Nonnull final ContextChainMastershipState mastershipState) {
195         scheduler.remove(deviceInfo);
196         Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> {
197             if (ownershipChangeListener.isReconciliationFrameworkRegistered()) {
198                 if (mastershipState == ContextChainMastershipState.INITIAL_SUBMIT) {
199                     LOG.error("Initial submit is not allowed here if using reconciliation framework.");
200                 } else {
201                     contextChain.isMastered(mastershipState);
202                     if (contextChain.isPrepared()) {
203                         Futures.addCallback(
204                                 ownershipChangeListener.becomeMasterBeforeSubmittedDS(deviceInfo),
205                                 reconciliationFrameworkCallback(deviceInfo, contextChain),
206                                 MoreExecutors.directExecutor());
207                     }
208                 }
209             } else if (contextChain.isMastered(mastershipState)) {
210                 LOG.info("Role MASTER was granted to device {}", deviceInfo);
211                 ownershipChangeListener.becomeMaster(deviceInfo);
212                 deviceManager.sendNodeAddedNotification(deviceInfo.getNodeInstanceIdentifier());
213             }
214         });
215     }
216
217     @Override
218     public void onSlaveRoleAcquired(final DeviceInfo deviceInfo) {
219         scheduler.remove(deviceInfo);
220         ownershipChangeListener.becomeSlaveOrDisconnect(deviceInfo);
221         Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(ContextChain::makeContextChainStateSlave);
222     }
223
224     @Override
225     public void onSlaveRoleNotAcquired(final DeviceInfo deviceInfo) {
226         Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> destroyContextChain(deviceInfo));
227     }
228
229     @Override
230     public void onDeviceDisconnected(final ConnectionContext connectionContext) {
231         final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
232
233         Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> {
234             if (contextChain.auxiliaryConnectionDropped(connectionContext)) {
235                 LOG.info("Auxiliary connection from device {} disconnected.", deviceInfo);
236             } else {
237                 LOG.info("Device {} disconnected.", deviceInfo);
238                 destroyContextChain(deviceInfo);
239             }
240         });
241     }
242
243     @VisibleForTesting
244     boolean checkAllManagers() {
245         return Objects.nonNull(deviceManager) && Objects.nonNull(rpcManager) && Objects.nonNull(statisticsManager);
246     }
247
248     @Override
249     public void close() throws Exception {
250         scheduler.close();
251         Map<DeviceInfo, ContextChain> copyOfChains = new HashMap<>(contextChainMap);
252         copyOfChains.keySet().forEach(this::destroyContextChain);
253         copyOfChains.clear();
254         contextChainMap.clear();
255         eosListenerRegistration.close();
256     }
257
258     @Override
259     public void ownershipChanged(EntityOwnershipChange entityOwnershipChange) {
260         if (entityOwnershipChange.hasOwner()) {
261             return;
262         }
263
264         final String entityName = getEntityNameFromOwnershipChange(entityOwnershipChange);
265
266         if (Objects.nonNull(entityName)) {
267             LOG.debug("Entity {} has no owner", entityName);
268             final NodeId nodeId = new NodeId(entityName);
269
270             try {
271                 final KeyedInstanceIdentifier<Node, NodeKey> nodeInstanceIdentifier =
272                         DeviceStateUtil.createNodeInstanceIdentifier(nodeId);
273
274                 deviceManager.sendNodeRemovedNotification(nodeInstanceIdentifier);
275
276                 LOG.info("Try to remove device {} from operational DS", nodeId);
277                 deviceManager
278                         .removeDeviceFromOperationalDS(nodeInstanceIdentifier)
279                         .get(REMOVE_DEVICE_FROM_DS_TIMEOUT, TimeUnit.MILLISECONDS);
280                 LOG.info("Removing device from operational DS {} was successful", nodeId);
281             } catch (TimeoutException | ExecutionException | NullPointerException | InterruptedException e) {
282                 LOG.warn("Not able to remove device {} from operational DS. ",nodeId, e);
283             }
284         }
285     }
286
287     private synchronized void destroyContextChain(final DeviceInfo deviceInfo) {
288         scheduler.remove(deviceInfo);
289
290         Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> {
291             deviceManager.sendNodeRemovedNotification(deviceInfo.getNodeInstanceIdentifier());
292             contextChain.close();
293         });
294     }
295
296     @Override
297     public List<DeviceInfo> listOfMasteredDevices() {
298         return contextChainMap
299                 .entrySet()
300                 .stream()
301                 .filter(deviceInfoContextChainEntry -> deviceInfoContextChainEntry
302                         .getValue()
303                         .isMastered(ContextChainMastershipState.CHECK))
304                 .map(Map.Entry::getKey)
305                 .collect(Collectors.toList());
306     }
307
308     @Override
309     public boolean isAnyDeviceMastered() {
310         return contextChainMap
311                 .entrySet()
312                 .stream()
313                 .findAny()
314                 .filter(deviceInfoContextChainEntry -> deviceInfoContextChainEntry.getValue()
315                         .isMastered(ContextChainMastershipState.CHECK))
316                 .isPresent();
317     }
318
319     private String getEntityNameFromOwnershipChange(final EntityOwnershipChange entityOwnershipChange) {
320         final YangInstanceIdentifier.NodeIdentifierWithPredicates lastIdArgument =
321                 (YangInstanceIdentifier.NodeIdentifierWithPredicates) entityOwnershipChange
322                         .getEntity()
323                         .getId()
324                         .getLastPathArgument();
325
326         return lastIdArgument
327                 .getKeyValues()
328                 .values()
329                 .iterator()
330                 .next()
331                 .toString();
332     }
333
334     @Override
335     public void onDeviceRemoved(final DeviceInfo deviceInfo) {
336         scheduler.remove(deviceInfo);
337         contextChainMap.remove(deviceInfo);
338         LOG.debug("Context chain removed for node {}", deviceInfo);
339     }
340
341     private FutureCallback<ResultState> reconciliationFrameworkCallback(
342             @Nonnull DeviceInfo deviceInfo,
343             ContextChain contextChain) {
344         return new FutureCallback<ResultState>() {
345             @Override
346             public void onSuccess(@Nullable ResultState result) {
347                 if (ResultState.DONOTHING == result) {
348                     LOG.info("Device {} connection is enabled by reconciliation framework.", deviceInfo);
349                     if (!contextChain.continueInitializationAfterReconciliation()) {
350                         LOG.warn("Initialization submit after reconciliation failed for device {}", deviceInfo);
351                         destroyContextChain(deviceInfo);
352                     } else {
353                         ownershipChangeListener.becomeMaster(deviceInfo);
354                         deviceManager.sendNodeAddedNotification(deviceInfo.getNodeInstanceIdentifier());
355                     }
356                 } else {
357                     LOG.warn("Reconciliation framework failure for device {}", deviceInfo);
358                     destroyContextChain(deviceInfo);
359                 }
360             }
361
362             @Override
363             public void onFailure(@Nonnull Throwable t) {
364                 LOG.warn("Reconciliation framework failure.");
365                 destroyContextChain(deviceInfo);
366             }
367         };
368     }
369 }