6af910f106b60f530e238c0f95dc3635ea743f69
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / lifecycle / ContextChainHolderImpl.java
1 /*
2  * Copyright (c) 2016 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.lifecycle;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Verify;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import io.netty.util.HashedWheelTimer;
16 import java.util.Collections;
17 import java.util.HashMap;
18 import java.util.Map;
19 import java.util.Objects;
20 import java.util.Optional;
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.TimeUnit;
23 import java.util.concurrent.TimeoutException;
24 import javax.annotation.Nonnull;
25 import javax.annotation.Nullable;
26 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
27 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListenerRegistration;
28 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
29 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
30 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
31 import org.opendaylight.openflowplugin.api.openflow.OFPManager;
32 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
33 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionStatus;
34 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
35 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
36 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
37 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChain;
38 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainHolder;
39 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
40 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
41 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
42 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager;
43 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
44 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
45 import org.opendaylight.openflowplugin.impl.util.DeviceStateUtil;
46 import org.opendaylight.openflowplugin.impl.util.ItemScheduler;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
50 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
51 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54
55 public class ContextChainHolderImpl implements ContextChainHolder {
56     private static final Logger LOG = LoggerFactory.getLogger(ContextChainHolderImpl.class);
57
58     private static final String CONTEXT_CREATED_FOR_CONNECTION = " context created for connection: {}";
59     private static final long CHECK_ROLE_MASTER_TIMEOUT = 20000L;
60     private static final long CHECK_ROLE_MASTER_TOLERANCE = CHECK_ROLE_MASTER_TIMEOUT / 2;
61     private static final long REMOVE_DEVICE_FROM_DS_TIMEOUT = 5000L;
62     private static final String ASYNC_SERVICE_ENTITY_TYPE = "org.opendaylight.mdsal.AsyncServiceCloseEntityType";
63
64     private final Map<DeviceInfo, ContextChain> contextChainMap = Collections.synchronizedMap(new HashMap<>());
65     private DeviceManager deviceManager;
66     private RpcManager rpcManager;
67     private StatisticsManager statisticsManager;
68     private EntityOwnershipListenerRegistration eosListenerRegistration;
69     private ClusterSingletonServiceProvider singletonServicesProvider;
70     private final ItemScheduler<DeviceInfo, ContextChain> scheduler;
71     private final ExecutorService executorService;
72
73     public ContextChainHolderImpl(final HashedWheelTimer timer, final ExecutorService executorService) {
74         this.scheduler = new ItemScheduler<>(
75                 timer,
76                 CHECK_ROLE_MASTER_TIMEOUT,
77                 CHECK_ROLE_MASTER_TOLERANCE,
78                 ContextChain::makeDeviceSlave);
79
80         this.executorService = executorService;
81     }
82
83     @Override
84     public <T extends OFPManager> void addManager(final T manager) {
85         if (Objects.isNull(deviceManager) && manager instanceof DeviceManager) {
86             LOG.trace("Context chain holder: Device manager OK.");
87             deviceManager = (DeviceManager) manager;
88         } else if (Objects.isNull(rpcManager) && manager instanceof RpcManager) {
89             LOG.trace("Context chain holder: RPC manager OK.");
90             rpcManager = (RpcManager) manager;
91         } else if (Objects.isNull(statisticsManager) && manager instanceof StatisticsManager) {
92             LOG.trace("Context chain holder: Statistics manager OK.");
93             statisticsManager = (StatisticsManager) manager;
94         }
95     }
96
97     @Override
98     public ContextChain createContextChain(final ConnectionContext connectionContext) {
99         final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
100         final String deviceInfoLOGValue = deviceInfo.getLOGValue();
101         final ContextChain contextChain = new ContextChainImpl(connectionContext);
102
103         if (LOG.isDebugEnabled()) {
104             LOG.debug("Context chain" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfoLOGValue);
105         }
106
107         final LifecycleService lifecycleService = new LifecycleServiceImpl(this, executorService);
108         lifecycleService.registerDeviceRemovedHandler(deviceManager);
109         lifecycleService.registerDeviceRemovedHandler(rpcManager);
110         lifecycleService.registerDeviceRemovedHandler(statisticsManager);
111
112         if (LOG.isDebugEnabled()) {
113             LOG.debug("Lifecycle services" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfoLOGValue);
114         }
115
116         final DeviceContext deviceContext = deviceManager.createContext(connectionContext);
117
118         if (LOG.isDebugEnabled()) {
119             LOG.debug("Device" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfoLOGValue);
120         }
121
122         final RpcContext rpcContext = rpcManager.createContext(connectionContext.getDeviceInfo(), deviceContext);
123
124         if (LOG.isDebugEnabled()) {
125             LOG.debug("RPC" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfoLOGValue);
126         }
127
128         final StatisticsContext statisticsContext = statisticsManager.createContext(deviceContext);
129
130         if (LOG.isDebugEnabled()) {
131             LOG.debug("Statistics" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfoLOGValue);
132         }
133
134         deviceContext.setLifecycleInitializationPhaseHandler(statisticsContext);
135         statisticsContext.setLifecycleInitializationPhaseHandler(rpcContext);
136         statisticsContext.setInitialSubmitHandler(deviceContext);
137
138         contextChain.addLifecycleService(lifecycleService);
139         contextChain.addContext(deviceContext);
140         contextChain.addContext(rpcContext);
141         contextChain.addContext(statisticsContext);
142
143         LOG.info("Starting timer for setting SLAVE role on node {} if no role will be set in {}s.",
144                 deviceInfo.getLOGValue(), CHECK_ROLE_MASTER_TIMEOUT / 1000L);
145         scheduler.add(deviceInfo, contextChain);
146         scheduler.startIfNotRunning();
147         deviceContext.onPublished();
148         contextChain.registerServices(this.singletonServicesProvider);
149         return contextChain;
150     }
151
152     @Override
153     public synchronized void destroyContextChain(final DeviceInfo deviceInfo) {
154         Optional.ofNullable(contextChainMap.remove(deviceInfo)).ifPresent(contextChain -> {
155             deviceManager.sendNodeRemovedNotification(deviceInfo.getNodeInstanceIdentifier());
156             contextChain.close();
157         });
158     }
159
160     @Override
161     public ConnectionStatus deviceConnected(final ConnectionContext connectionContext) throws Exception {
162         final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
163         final ContextChain contextChain = contextChainMap.get(deviceInfo);
164         LOG.info("Device {} connected.", deviceInfo.getLOGValue());
165
166         if (contextChain != null) {
167             if (contextChain.addAuxiliaryConnection(connectionContext)) {
168                 LOG.info("An auxiliary connection was added to device: {}", deviceInfo.getLOGValue());
169                 return ConnectionStatus.MAY_CONTINUE;
170             } else {
171                 LOG.warn("Device {} already connected. Closing all connection to the device.", deviceInfo.getLOGValue());
172                 destroyContextChain(deviceInfo);
173                 return ConnectionStatus.ALREADY_CONNECTED;
174             }
175         } else {
176             if (LOG.isDebugEnabled()) {
177                 LOG.debug("No context chain found for device: {}, creating new.", deviceInfo.getLOGValue());
178             }
179             contextChainMap.put(deviceInfo, createContextChain(connectionContext));
180         }
181
182         return ConnectionStatus.MAY_CONTINUE;
183     }
184
185     @Override
186     public void addSingletonServicesProvider(final ClusterSingletonServiceProvider singletonServicesProvider) {
187         this.singletonServicesProvider = singletonServicesProvider;
188     }
189
190     @Override
191     public void onNotAbleToStartMastership(final DeviceInfo deviceInfo, @Nonnull final String reason, final boolean mandatory) {
192         LOG.warn("Not able to set MASTER role on device {}, reason: {}", deviceInfo.getLOGValue(), reason);
193
194         if (!mandatory) {
195             return;
196         }
197
198         Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> {
199             LOG.warn("This mastering is mandatory, destroying context chain and closing connection for device {}.", deviceInfo.getLOGValue());
200             addDestroyChainCallback(contextChain.stopChain(), deviceInfo);
201         });
202     }
203
204     @Override
205     public void onMasterRoleAcquired(final DeviceInfo deviceInfo, @Nonnull final ContextChainMastershipState mastershipState) {
206         scheduler.remove(deviceInfo);
207
208         Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> {
209             if (contextChain.isMastered(mastershipState)) {
210                 LOG.info("Role MASTER was granted to device {}", deviceInfo.getLOGValue());
211                 deviceManager.sendNodeAddedNotification(deviceInfo.getNodeInstanceIdentifier());
212             }
213         });
214     }
215
216     @Override
217     public void onSlaveRoleAcquired(final DeviceInfo deviceInfo) {
218         Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(ContextChain::makeContextChainStateSlave);
219     }
220
221     @Override
222     public void onSlaveRoleNotAcquired(final DeviceInfo deviceInfo) {
223         Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> destroyContextChain(deviceInfo));
224     }
225
226     @Override
227     public void onDeviceDisconnected(final ConnectionContext connectionContext) {
228         final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
229
230         if (Objects.isNull(deviceInfo)) {
231             return;
232         }
233
234         Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> {
235             if (contextChain.auxiliaryConnectionDropped(connectionContext)) {
236                 LOG.info("Auxiliary connection from device {} disconnected.", deviceInfo.getLOGValue());
237             } else {
238                 LOG.info("Device {} disconnected.", deviceInfo.getLOGValue());
239                 addDestroyChainCallback(contextChain.connectionDropped(), deviceInfo);
240             }
241         });
242     }
243
244     @Override
245     public void changeEntityOwnershipService(@Nonnull final EntityOwnershipService entityOwnershipService) {
246         if (Objects.nonNull(this.eosListenerRegistration)) {
247             LOG.warn("Entity ownership service listener is already registered.");
248         } else {
249             this.eosListenerRegistration = Verify.verifyNotNull(entityOwnershipService.registerListener
250                     (ASYNC_SERVICE_ENTITY_TYPE, this));
251         }
252     }
253
254     @VisibleForTesting
255     boolean checkAllManagers() {
256         return Objects.nonNull(deviceManager) && Objects.nonNull(rpcManager) && Objects.nonNull(statisticsManager);
257     }
258
259     @Override
260     public void close() throws Exception {
261         scheduler.close();
262
263         contextChainMap.forEach((deviceInfo, contextChain) -> {
264             if (contextChain.isMastered(ContextChainMastershipState.CHECK)) {
265                 addDestroyChainCallback(contextChain.stopChain(), deviceInfo);
266             } else {
267                 destroyContextChain(deviceInfo);
268             }
269         });
270
271         contextChainMap.clear();
272
273
274         if (Objects.nonNull(eosListenerRegistration)) {
275             eosListenerRegistration.close();
276             eosListenerRegistration = null;
277         }
278     }
279
280     @Override
281     public void ownershipChanged(EntityOwnershipChange entityOwnershipChange) {
282         if (entityOwnershipChange.hasOwner()) {
283             return;
284         }
285
286         final String entityName = getEntityNameFromOwnershipChange(entityOwnershipChange);
287
288         if (Objects.nonNull(entityName)) {
289             if (LOG.isDebugEnabled()) {
290                 LOG.debug("Entity {} has no owner", entityName);
291             }
292
293             final NodeId nodeId = new NodeId(entityName);
294
295             try {
296                 final KeyedInstanceIdentifier<Node, NodeKey> nodeInstanceIdentifier =
297                         DeviceStateUtil.createNodeInstanceIdentifier(nodeId);
298
299                 deviceManager.sendNodeRemovedNotification(nodeInstanceIdentifier);
300
301                 LOG.info("Removing device {} from operational DS", nodeId);
302                 deviceManager
303                         .removeDeviceFromOperationalDS(nodeInstanceIdentifier)
304                         .checkedGet(REMOVE_DEVICE_FROM_DS_TIMEOUT, TimeUnit.MILLISECONDS);
305             } catch (TimeoutException | TransactionCommitFailedException e) {
306                 LOG.info("Not able to remove device {} from operational DS. Probably removed by another cluster node.",
307                         nodeId);
308             }
309         }
310     }
311
312     private String getEntityNameFromOwnershipChange(final EntityOwnershipChange entityOwnershipChange) {
313         final YangInstanceIdentifier.NodeIdentifierWithPredicates lastIdArgument =
314                 (YangInstanceIdentifier.NodeIdentifierWithPredicates) entityOwnershipChange
315                         .getEntity()
316                         .getId()
317                         .getLastPathArgument();
318
319         return lastIdArgument
320                 .getKeyValues()
321                 .values()
322                 .iterator()
323                 .next()
324                 .toString();
325     }
326
327     private void addDestroyChainCallback(final ListenableFuture<Void> future, final DeviceInfo deviceInfo) {
328         scheduler.remove(deviceInfo);
329
330         Futures.addCallback(future, new FutureCallback<Void>() {
331             @Override
332             public void onSuccess(@Nullable final Void aVoid) {
333                 destroyContextChain(deviceInfo);
334             }
335
336             @Override
337             public void onFailure(@Nonnull final Throwable throwable) {
338                 destroyContextChain(deviceInfo);
339             }
340         });
341     }
342 }