Merge "BUG 6179 - Turning off flow removed notification"
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceManagerImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.device;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Function;
12 import com.google.common.base.Preconditions;
13 import com.google.common.base.Verify;
14 import com.google.common.collect.Iterators;
15 import com.google.common.util.concurrent.AsyncFunction;
16 import com.google.common.util.concurrent.FutureCallback;
17 import com.google.common.util.concurrent.Futures;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import io.netty.util.TimerTask;
20 import java.util.Collections;
21 import java.util.HashSet;
22 import java.util.Iterator;
23 import java.util.Set;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.concurrent.ConcurrentMap;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.ScheduledThreadPoolExecutor;
28 import java.util.concurrent.TimeUnit;
29 import javax.annotation.CheckForNull;
30 import javax.annotation.Nonnull;
31 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
32 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
33 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
34 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
35 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
36 import org.opendaylight.openflowplugin.api.openflow.OFPContext;
37 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
38 import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
39 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
40 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
41 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
42 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
43 import org.opendaylight.openflowplugin.api.openflow.device.DeviceSynchronizeListener;
44 import org.opendaylight.openflowplugin.api.openflow.device.DeviceValidListener;
45 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
46 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
47 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
48 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor;
49 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
50 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
51 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
52 import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl;
53 import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
54 import org.opendaylight.openflowplugin.impl.util.DeviceInitializationUtils;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
59 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
62
63 /**
64  *
65  */
66 public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProviderKeeper {
67
68     private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class);
69
70     private final long globalNotificationQuota;
71     private final boolean switchFeaturesMandatory;
72     private boolean isNotificationFlowRemovedOff;
73
74     private final int spyRate = 10;
75
76     private final DataBroker dataBroker;
77     private TranslatorLibrary translatorLibrary;
78     private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
79     private DeviceTerminationPhaseHandler deviceTerminPhaseHandler;
80
81     private final ConcurrentMap<DeviceInfo, DeviceContext> deviceContexts = new ConcurrentHashMap<>();
82
83     private final long barrierIntervalNanos;
84     private final int barrierCountLimit;
85     private ExtensionConverterProvider extensionConverterProvider;
86     private ScheduledThreadPoolExecutor spyPool;
87     private Set<DeviceSynchronizeListener> deviceSynchronizedListeners;
88     private Set<DeviceValidListener> deviceValidListeners;
89
90     private final LifecycleConductor conductor;
91
92     public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
93                              final long globalNotificationQuota, final boolean switchFeaturesMandatory,
94                              final long barrierInterval, final int barrierCountLimit,
95                              final LifecycleConductor lifecycleConductor, boolean isNotificationFlowRemovedOff) {
96         this.switchFeaturesMandatory = switchFeaturesMandatory;
97         this.globalNotificationQuota = globalNotificationQuota;
98         this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
99         this.dataBroker = Preconditions.checkNotNull(dataBroker);
100         /* merge empty nodes to oper DS to predict any problems with missing parent for Node */
101         final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
102
103         final NodesBuilder nodesBuilder = new NodesBuilder();
104         nodesBuilder.setNode(Collections.<Node>emptyList());
105         tx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class), nodesBuilder.build());
106         try {
107             tx.submit().get();
108         } catch (ExecutionException | InterruptedException e) {
109             LOG.error("Creation of node failed.", e);
110             throw new IllegalStateException(e);
111         }
112
113         this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierInterval);
114         this.barrierCountLimit = barrierCountLimit;
115
116         this.conductor = lifecycleConductor;
117         spyPool = new ScheduledThreadPoolExecutor(1);
118         this.deviceSynchronizedListeners = new HashSet<>();
119         this.deviceValidListeners = new HashSet<>();
120     }
121
122
123     @Override
124     public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
125         this.deviceInitPhaseHandler = handler;
126     }
127
128     @Override
129     public void onDeviceContextLevelUp(@CheckForNull DeviceInfo deviceInfo) throws Exception {
130         // final phase - we have to add new Device to MD-SAL DataStore
131         LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceInfo.getNodeId());
132         DeviceContext deviceContext = Preconditions.checkNotNull(deviceContexts.get(deviceInfo));
133         ((DeviceContextImpl) deviceContext).initialSubmitTransaction();
134         deviceContext.onPublished();
135     }
136
137     @Override
138     public boolean deviceConnected(@CheckForNull final ConnectionContext connectionContext) throws Exception {
139         Preconditions.checkArgument(connectionContext != null);
140
141         DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
142         /**
143          * This part prevent destroy another device context. Throwing here an exception result to propagate close connection
144          * in {@link org.opendaylight.openflowplugin.impl.connection.org.opendaylight.openflowplugin.impl.connection.HandshakeContextImpl}
145          * If context already exist we are in state closing process (connection flapping) and we should not propagate connection close
146          */
147          if (deviceContexts.containsKey(deviceInfo)) {
148             LOG.warn("Rejecting connection from node which is already connected and there exist deviceContext for it: {}", connectionContext.getNodeId());
149              return false;
150          }
151
152         LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}",
153                 connectionContext.getConnectionAdapter().getRemoteAddress(), deviceInfo.getNodeId());
154
155         // Add Disconnect handler
156         connectionContext.setDeviceDisconnectedHandler(DeviceManagerImpl.this);
157         // Cache this for clarity
158         final ConnectionAdapter connectionAdapter = connectionContext.getConnectionAdapter();
159
160         //FIXME: as soon as auxiliary connection are fully supported then this is needed only before device context published
161         connectionAdapter.setPacketInFiltering(true);
162
163         final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(deviceInfo.getVersion());
164
165         connectionContext.setOutboundQueueProvider(outboundQueueProvider);
166         final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
167                 connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, barrierCountLimit, barrierIntervalNanos);
168         connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
169
170         final DeviceState deviceState = new DeviceStateImpl(deviceInfo);
171         this.addDeviceSynchronizeListener(deviceState);
172         this.addDeviceValidListener(deviceState);
173
174         final DeviceContext deviceContext = new DeviceContextImpl(connectionContext,
175                 deviceState,
176                 dataBroker,
177                 conductor,
178                 outboundQueueProvider,
179                 translatorLibrary,
180                 this);
181
182         Verify.verify(deviceContexts.putIfAbsent(deviceInfo, deviceContext) == null, "DeviceCtx still not closed.");
183
184         ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider);
185         deviceContext.setNotificationPublishService(conductor.getNotificationPublishService());
186
187         updatePacketInRateLimiters();
188
189         final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
190                 connectionAdapter, deviceContext);
191         connectionAdapter.setMessageListener(messageListener);
192         notifyDeviceValidListeners(deviceInfo, true);
193
194         deviceInitPhaseHandler.onDeviceContextLevelUp(connectionContext.getDeviceInfo());
195
196         notifyDeviceSynchronizeListeners(deviceInfo, true);
197
198         return true;
199     }
200
201     private void updatePacketInRateLimiters() {
202         synchronized (deviceContexts) {
203             final int deviceContextsSize = deviceContexts.size();
204             if (deviceContextsSize > 0) {
205                 long freshNotificationLimit = globalNotificationQuota / deviceContextsSize;
206                 if (freshNotificationLimit < 100) {
207                     freshNotificationLimit = 100;
208                 }
209                 LOG.debug("fresh notification limit = {}", freshNotificationLimit);
210                 for (final DeviceContext deviceContext : deviceContexts.values()) {
211                     deviceContext.updatePacketInRateLimit(freshNotificationLimit);
212                 }
213             }
214         }
215     }
216
217     @Override
218     public TranslatorLibrary oook() {
219         return translatorLibrary;
220     }
221
222     @Override
223     public void setTranslatorLibrary(final TranslatorLibrary translatorLibrary) {
224         this.translatorLibrary = translatorLibrary;
225     }
226
227     @Override
228     public void close() {
229         for (final Iterator<DeviceContext> iterator = Iterators.consumingIterator(deviceContexts.values().iterator());
230                 iterator.hasNext();) {
231             final DeviceContext deviceCtx = iterator.next();
232             notifyDeviceValidListeners(deviceCtx.getDeviceInfo(), false);
233             deviceCtx.shutdownConnection();
234             deviceCtx.shuttingDownDataStoreTransactions();
235         }
236
237         if (spyPool != null) {
238             spyPool.shutdownNow();
239             spyPool = null;
240         }
241     }
242
243     @Override
244     public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
245         LOG.debug("onDeviceContextClosed for Node {}", deviceInfo.getNodeId());
246         deviceContexts.remove(deviceInfo);
247         updatePacketInRateLimiters();
248     }
249
250     @Override
251     public void initialize() {
252         spyPool.scheduleAtFixedRate(conductor.getMessageIntelligenceAgency(), spyRate, spyRate, TimeUnit.SECONDS);
253     }
254
255     @Override
256     public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
257         this.extensionConverterProvider = extensionConverterProvider;
258     }
259
260     @Override
261     public ExtensionConverterProvider getExtensionConverterProvider() {
262         return extensionConverterProvider;
263     }
264
265     @Override
266     public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
267         this.deviceTerminPhaseHandler = handler;
268     }
269
270     @Override
271     public void onDeviceDisconnected(final ConnectionContext connectionContext) {
272         LOG.trace("onDeviceDisconnected method call for Node: {}", connectionContext.getNodeId());
273         final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
274         final DeviceContext deviceCtx = this.deviceContexts.get(deviceInfo);
275
276         if (null == deviceCtx) {
277             LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.", deviceInfo.getNodeId());
278             return;
279         }
280
281         if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) {
282             /* Connection is not PrimaryConnection so try to remove from Auxiliary Connections */
283             deviceCtx.removeAuxiliaryConnectionContext(connectionContext);
284         } else {
285             notifyDeviceValidListeners(deviceInfo, false);
286             /* Device is disconnected and so we need to close TxManager */
287             final ListenableFuture<Void> future = deviceCtx.shuttingDownDataStoreTransactions();
288             Futures.addCallback(future, new FutureCallback<Void>() {
289
290                 @Override
291                 public void onSuccess(final Void result) {
292                     LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getNodeId());
293                     deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
294                 }
295
296                 @Override
297                 public void onFailure(final Throwable t) {
298                     LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getNodeId(), t);
299                     deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
300                 }
301             });
302             /* Add timer for Close TxManager because it could fain ind cluster without notification */
303             final TimerTask timerTask = timeout -> {
304                 if (!future.isDone()) {
305                     LOG.info("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getNodeId());
306                     future.cancel(false);
307                 }
308             };
309             conductor.newTimeout(timerTask, 10, TimeUnit.SECONDS);
310         }
311     }
312
313     @VisibleForTesting
314     void addDeviceContextToMap(final DeviceInfo deviceInfo, final DeviceContext deviceContext){
315         deviceContexts.put(deviceInfo, deviceContext);
316     }
317
318     @Override
319     public <T extends OFPContext> T gainContext(final DeviceInfo deviceInfo) {
320         return (T) deviceContexts.get(deviceInfo);
321     }
322
323     @Override
324     public ListenableFuture<Void> onClusterRoleChange(final DeviceInfo deviceInfo, final OfpRole role) {
325         DeviceContext deviceContext = conductor.getDeviceContext(deviceInfo);
326         LOG.trace("onClusterRoleChange {} for node:", role, deviceInfo.getNodeId());
327         if (OfpRole.BECOMEMASTER.equals(role)) {
328             return onDeviceTakeClusterLeadership(deviceInfo);
329         }
330         return ((DeviceContextImpl)deviceContext).getTransactionChainManager().deactivateTransactionManager();
331     }
332
333     @Override
334     public void addDeviceSynchronizeListener(final DeviceSynchronizeListener deviceSynchronizeListener) {
335         this.deviceSynchronizedListeners.add(deviceSynchronizeListener);
336     }
337
338     @Override
339     public void notifyDeviceSynchronizeListeners(final DeviceInfo deviceInfo, final boolean deviceSynchronized) {
340         for (DeviceSynchronizeListener listener : deviceSynchronizedListeners) {
341             listener.deviceIsSynchronized(deviceInfo, deviceSynchronized);
342         }
343     }
344
345     @Override
346     public void addDeviceValidListener(final DeviceValidListener deviceValidListener) {
347         this.deviceValidListeners.add(deviceValidListener);
348     }
349
350     @Override
351     public void notifyDeviceValidListeners(final DeviceInfo deviceInfo, final boolean deviceValid) {
352         for (DeviceValidListener listener : deviceValidListeners) {
353             listener.deviceIsValid(deviceInfo, deviceValid);
354         }
355     }
356
357     @Override
358     public void setIsNotificationFlowRemovedOff(boolean isNotificationFlowRemovedOff) {
359         this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
360     }
361
362     @Override
363     public boolean getIsNotificationFlowRemovedOff() {
364         return this.isNotificationFlowRemovedOff;
365     }
366
367     private ListenableFuture<Void> onDeviceTakeClusterLeadership(final DeviceInfo deviceInfo) {
368         LOG.trace("onDeviceTakeClusterLeadership for node: {}", deviceInfo.getNodeId());
369         /* validation */
370         StatisticsContext statisticsContext = conductor.getStatisticsContext(deviceInfo);
371         if (statisticsContext == null) {
372             final String errMsg = String.format("DeviceCtx %s is up but we are missing StatisticsContext", deviceInfo.getDatapathId());
373             LOG.warn(errMsg);
374             return Futures.immediateFailedFuture(new IllegalStateException(errMsg));
375         }
376         DeviceContext deviceContext = conductor.getDeviceContext(deviceInfo);
377         /* Prepare init info collecting */
378         notifyDeviceSynchronizeListeners(deviceInfo, false);
379         ((DeviceContextImpl)deviceContext).getTransactionChainManager().activateTransactionManager();
380         /* Init Collecting NodeInfo */
381         final ListenableFuture<Void> initCollectingDeviceInfo = DeviceInitializationUtils.initializeNodeInformation(
382                 deviceContext, switchFeaturesMandatory);
383         /* Init Collecting StatInfo */
384         final ListenableFuture<Boolean> statPollFuture = Futures.transform(initCollectingDeviceInfo,
385                 new AsyncFunction<Void, Boolean>() {
386
387                     @Override
388                     public ListenableFuture<Boolean> apply(@Nonnull final Void input) throws Exception {
389                         statisticsContext.statListForCollectingInitialization();
390                         return statisticsContext.initialGatherDynamicData();
391                     }
392                 });
393
394         return Futures.transform(statPollFuture, new Function<Boolean, Void>() {
395
396             @Override
397             public Void apply(final Boolean input) {
398                 if (ConnectionContext.CONNECTION_STATE.RIP.equals(conductor.gainConnectionStateSafely(deviceInfo))) {
399                     final String errMsg = String.format("We lost connection for Device %s, context has to be closed.",
400                             deviceInfo.getNodeId());
401                     LOG.warn(errMsg);
402                     throw new IllegalStateException(errMsg);
403                 }
404                 if (!input) {
405                     final String errMsg = String.format("Get Initial Device %s information fails",
406                             deviceInfo.getNodeId());
407                     LOG.warn(errMsg);
408                     throw new IllegalStateException(errMsg);
409                 }
410                 LOG.debug("Get Initial Device {} information is successful", deviceInfo.getNodeId());
411                 notifyDeviceSynchronizeListeners(deviceInfo, true);
412                 ((DeviceContextImpl)deviceContext).getTransactionChainManager().initialSubmitWriteTransaction();
413                 deviceContext.getDeviceState().setStatisticsPollingEnabledProp(true);
414                 return null;
415             }
416         });
417     }
418
419 }