Disconnection improvements.
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceManagerImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.device;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.collect.Iterators;
13 import com.google.common.util.concurrent.CheckedFuture;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import io.netty.util.HashedWheelTimer;
18 import io.netty.util.TimerTask;
19 import java.util.Collections;
20 import java.util.Iterator;
21 import java.util.Objects;
22 import java.util.Optional;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.ConcurrentMap;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.ScheduledThreadPoolExecutor;
27 import java.util.concurrent.TimeUnit;
28 import javax.annotation.CheckForNull;
29 import javax.annotation.Nonnull;
30 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
31 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
32 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
33 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
34 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
35 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
36 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
37 import org.opendaylight.openflowplugin.api.openflow.OFPContext;
38 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
39 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionStatus;
40 import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
41 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
42 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
43 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
44 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
45 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
46 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
47 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
48 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
49 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
50 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
51 import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl;
52 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
53 import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
54 import org.opendaylight.openflowplugin.impl.services.sal.SalRoleServiceImpl;
55 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
59 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
62
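/**
 * {@link DeviceManager} implementation that keeps one {@link DeviceContext} per connected
 * {@link DeviceInfo}: it creates the context for a new connection, tears it down when the primary
 * connection is lost, and re-divides the global packet-in notification quota between the known
 * devices whenever a context is created or a device goes down.
 */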
66 public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProviderKeeper {
67
68     private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class);
69
70     private final long globalNotificationQuota;
71     private final boolean switchFeaturesMandatory;
72     private boolean isFlowRemovedNotificationOn;
73     private boolean skipTableFeatures;
74     private static final int SPY_RATE = 10;
75
76     private final DataBroker dataBroker;
77     private final DeviceInitializerProvider deviceInitializerProvider;
78     private final ConvertorExecutor convertorExecutor;
79     private TranslatorLibrary translatorLibrary;
80     private DeviceTerminationPhaseHandler deviceTerminPhaseHandler;
81
82     private final ConcurrentMap<DeviceInfo, DeviceContext> deviceContexts = new ConcurrentHashMap<>();
83     private final ConcurrentMap<DeviceInfo, LifecycleService> lifecycleServices = new ConcurrentHashMap<>();
84
85     private long barrierIntervalNanos;
86     private int barrierCountLimit;
87
88     private ExtensionConverterProvider extensionConverterProvider;
89     private ScheduledThreadPoolExecutor spyPool;
90     private final NotificationPublishService notificationPublishService;
91     private final MessageSpy messageSpy;
92     private final HashedWheelTimer hashedWheelTimer;
93     private boolean useSingleLayerSerialization;
94
95     public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
96                              final long globalNotificationQuota,
97                              final boolean switchFeaturesMandatory,
98                              final long barrierInterval,
99                              final int barrierCountLimit,
100                              final MessageSpy messageSpy,
101                              final boolean isFlowRemovedNotificationOn,
102                              final ClusterSingletonServiceProvider singletonServiceProvider,
103                              final NotificationPublishService notificationPublishService,
104                              final HashedWheelTimer hashedWheelTimer,
105                              final ConvertorExecutor convertorExecutor,
106                              final boolean skipTableFeatures,
107                              final boolean useSingleLayerSerialization,
108                              final DeviceInitializerProvider deviceInitializerProvider) {
109
110         this.dataBroker = dataBroker;
111         this.deviceInitializerProvider = deviceInitializerProvider;
112
113         /* merge empty nodes to oper DS to predict any problems with missing parent for Node */
114         final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
115         final NodesBuilder nodesBuilder = new NodesBuilder();
116         nodesBuilder.setNode(Collections.<Node>emptyList());
117         tx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class), nodesBuilder.build());
118         try {
119             tx.submit().get();
120         } catch (ExecutionException | InterruptedException e) {
121             LOG.error("Creation of node failed.", e);
122             throw new IllegalStateException(e);
123         }
124
125         this.switchFeaturesMandatory = switchFeaturesMandatory;
126         this.globalNotificationQuota = globalNotificationQuota;
127         this.isFlowRemovedNotificationOn = isFlowRemovedNotificationOn;
128         this.skipTableFeatures = skipTableFeatures;
129         this.convertorExecutor = convertorExecutor;
130         this.hashedWheelTimer = hashedWheelTimer;
131         this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierInterval);
132         this.barrierCountLimit = barrierCountLimit;
133         this.spyPool = new ScheduledThreadPoolExecutor(1);
134         this.notificationPublishService = notificationPublishService;
135         this.messageSpy = messageSpy;
136         this.useSingleLayerSerialization = useSingleLayerSerialization;
137     }
138
139
140     @Override
141     public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
142     }
143
144     @Override
145     public void onDeviceContextLevelUp(@CheckForNull DeviceInfo deviceInfo, final LifecycleService lifecycleService) throws Exception {
146         // final phase - we have to add new Device to MD-SAL DataStore
147         LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceInfo.getNodeId());
148         DeviceContext deviceContext = Preconditions.checkNotNull(deviceContexts.get(deviceInfo));
149         deviceContext.onPublished();
150         lifecycleService.registerDeviceRemovedHandler(this);
151     }
152
153     @Override
154     public TranslatorLibrary oook() {
155         return translatorLibrary;
156     }
157
158     @Override
159     public void setTranslatorLibrary(final TranslatorLibrary translatorLibrary) {
160         this.translatorLibrary = translatorLibrary;
161     }
162
163     @Override
164     public void close() {
165         for (final Iterator<DeviceContext> iterator = Iterators.consumingIterator(deviceContexts.values().iterator());
166                 iterator.hasNext();) {
167             final DeviceContext deviceCtx = iterator.next();
168             deviceCtx.shutdownConnection();
169             deviceCtx.shuttingDownDataStoreTransactions();
170         }
171
172         Optional.ofNullable(spyPool).ifPresent(ScheduledThreadPoolExecutor::shutdownNow);
173         spyPool = null;
174
175     }
176
177     @Override
178     public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
179         updatePacketInRateLimiters();
180         Optional.ofNullable(lifecycleServices.get(deviceInfo)).ifPresent(LifecycleService::close);
181     }
182
183     @Override
184     public void initialize() {
185         spyPool.scheduleAtFixedRate(messageSpy, SPY_RATE, SPY_RATE, TimeUnit.SECONDS);
186     }
187
188     @Override
189     public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
190         this.extensionConverterProvider = extensionConverterProvider;
191     }
192
193     @Override
194     public ExtensionConverterProvider getExtensionConverterProvider() {
195         return extensionConverterProvider;
196     }
197
198     @Override
199     public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
200         this.deviceTerminPhaseHandler = handler;
201     }
202
203     @Override
204     public void onDeviceDisconnected(final ConnectionContext connectionContext) {
205         LOG.trace("onDeviceDisconnected method call for Node: {}", connectionContext.getNodeId());
206         final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
207         final DeviceContext deviceCtx = this.deviceContexts.get(deviceInfo);
208
209         if (Objects.isNull(deviceCtx)) {
210             LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.", deviceInfo.getLOGValue());
211             return;
212         }
213
214         if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) {
215             LOG.debug("Node {} disconnected, but not primary connection.", connectionContext.getDeviceInfo().getLOGValue());
216             // Connection is not PrimaryConnection so try to remove from Auxiliary Connections
217             deviceCtx.removeAuxiliaryConnectionContext(connectionContext);
218             // If this is not primary connection, we should not continue disabling everything
219             return;
220         }
221
222         if (deviceCtx.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
223             LOG.info("Device context for node {} is already is termination state, waiting for close all context", deviceInfo.getLOGValue());
224             return;
225         }
226
227         deviceCtx.close();
228
229         // TODO: Auxiliary connections supported ?
230         // Device is disconnected and so we need to close TxManager
231         final ListenableFuture<Void> future = deviceCtx.shuttingDownDataStoreTransactions();
232         Futures.addCallback(future, new FutureCallback<Void>() {
233             @Override
234             public void onSuccess(final Void result) {
235                 LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getLOGValue());
236                 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
237             }
238
239             @Override
240             public void onFailure(final Throwable t) {
241                 LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getLOGValue());
242                 LOG.trace("TxChainManager failed by closing. ", t);
243                 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
244             }
245         });
246
247         // Add timer for Close TxManager because it could fail in cluster without notification
248         final TimerTask timerTask = timeout -> {
249             if (!future.isDone()) {
250                 LOG.warn("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getLOGValue());
251                 future.cancel(false);
252             }
253         };
254
255         hashedWheelTimer.newTimeout(timerTask, 10, TimeUnit.SECONDS);
256     }
257
258     @VisibleForTesting
259     void addDeviceContextToMap(final DeviceInfo deviceInfo, final DeviceContext deviceContext){
260         deviceContexts.put(deviceInfo, deviceContext);
261     }
262
263     @Override
264     public void setFlowRemovedNotificationOn(boolean isNotificationFlowRemovedOff) {
265         this.isFlowRemovedNotificationOn = isNotificationFlowRemovedOff;
266     }
267
268     @Override
269     public boolean isFlowRemovedNotificationOn() {
270         return this.isFlowRemovedNotificationOn;
271     }
272
273
274     @Override
275     public void setSkipTableFeatures(boolean skipTableFeaturesValue) {
276         skipTableFeatures = skipTableFeaturesValue;
277     }
278
279     @Override
280     public void setBarrierCountLimit(final int barrierCountLimit) {
281         this.barrierCountLimit = barrierCountLimit;
282     }
283
284     @Override
285     public void setBarrierInterval(final long barrierTimeoutLimit) {
286         this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierTimeoutLimit);
287     }
288
289     @Override
290     public CheckedFuture<Void, TransactionCommitFailedException> removeDeviceFromOperationalDS(final DeviceInfo deviceInfo) {
291         final WriteTransaction delWtx = dataBroker.newWriteOnlyTransaction();
292         delWtx.delete(LogicalDatastoreType.OPERATIONAL, deviceInfo.getNodeInstanceIdentifier());
293         final CheckedFuture<Void, TransactionCommitFailedException> delFuture = delWtx.submit();
294
295         Futures.addCallback(delFuture, new FutureCallback<Void>() {
296             @Override
297             public void onSuccess(final Void result) {
298                 if (LOG.isDebugEnabled()) {
299                     LOG.debug("Delete Node {} was successful", deviceInfo.getLOGValue());
300                 }
301             }
302
303             @Override
304             public void onFailure(@Nonnull final Throwable t) {
305                 LOG.warn("Delete node {} failed with exception {}", deviceInfo.getLOGValue(), t);
306             }
307         });
308
309         return delFuture;
310     }
311
312     @Override
313     public void setUseSingleLayerSerialization(final Boolean useSingleLayerSerialization) {
314         this.useSingleLayerSerialization = useSingleLayerSerialization;
315     }
316
317     public DeviceContext createContext(@CheckForNull final ConnectionContext connectionContext) {
318
319         LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}",
320                 connectionContext.getConnectionAdapter().getRemoteAddress(),
321                 connectionContext.getDeviceInfo().getNodeId());
322
323         connectionContext.getConnectionAdapter().setPacketInFiltering(true);
324
325         final OutboundQueueProvider outboundQueueProvider
326                 = new OutboundQueueProviderImpl(connectionContext.getDeviceInfo().getVersion());
327
328         connectionContext.setOutboundQueueProvider(outboundQueueProvider);
329         final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
330                 connectionContext.getConnectionAdapter().registerOutboundQueueHandler(
331                         outboundQueueProvider,
332                         barrierCountLimit,
333                         barrierIntervalNanos);
334         connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
335
336
337         final DeviceContext deviceContext = new DeviceContextImpl(
338                 connectionContext,
339                 dataBroker,
340                 messageSpy,
341                 translatorLibrary,
342                 this,
343                 convertorExecutor,
344                 skipTableFeatures,
345                 hashedWheelTimer,
346                 useSingleLayerSerialization,
347                 deviceInitializerProvider);
348
349         deviceContext.setSalRoleService(new SalRoleServiceImpl(deviceContext, deviceContext));
350         deviceContext.setSwitchFeaturesMandatory(switchFeaturesMandatory);
351         ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider);
352         deviceContext.setNotificationPublishService(notificationPublishService);
353
354         deviceContexts.put(connectionContext.getDeviceInfo(), deviceContext);
355         updatePacketInRateLimiters();
356
357         final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
358                 connectionContext.getConnectionAdapter(), deviceContext);
359
360         connectionContext.getConnectionAdapter().setMessageListener(messageListener);
361
362         return deviceContext;
363     }
364
365     private void updatePacketInRateLimiters() {
366         synchronized (deviceContexts) {
367             final int deviceContextsSize = deviceContexts.size();
368             if (deviceContextsSize > 0) {
369                 long freshNotificationLimit = globalNotificationQuota / deviceContextsSize;
370                 if (freshNotificationLimit < 100) {
371                     freshNotificationLimit = 100;
372                 }
373                 if (LOG.isDebugEnabled()) {
374                     LOG.debug("fresh notification limit = {}", freshNotificationLimit);
375                 }
376                 for (final DeviceContext deviceContext : deviceContexts.values()) {
377                     deviceContext.updatePacketInRateLimit(freshNotificationLimit);
378                 }
379             }
380         }
381     }
382
383     public void onDeviceRemoved(DeviceInfo deviceInfo) {
384         deviceContexts.remove(deviceInfo);
385         LOG.debug("Device context removed for node {}", deviceInfo.getLOGValue());
386
387         lifecycleServices.remove(deviceInfo);
388         LOG.debug("Lifecycle service removed for node {}", deviceInfo.getLOGValue());
389     }
390
391     @Override
392     public long getBarrierIntervalNanos() {
393         return barrierIntervalNanos;
394     }
395
396     @Override
397     public int getBarrierCountLimit() {
398         return barrierCountLimit;
399     }
400 }