Merge "BUG-6458:"
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceManagerImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.device;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.collect.Iterators;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import io.netty.util.HashedWheelTimer;
17 import io.netty.util.TimerTask;
18 import java.util.Collections;
19 import java.util.Iterator;
20 import java.util.Objects;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ConcurrentMap;
23 import java.util.concurrent.ExecutionException;
24 import java.util.concurrent.ScheduledThreadPoolExecutor;
25 import java.util.concurrent.TimeUnit;
26 import javax.annotation.CheckForNull;
27 import javax.annotation.Nonnull;
28 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
29 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
30 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
31 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
32 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
33 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
34 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
35 import org.opendaylight.openflowplugin.api.openflow.OFPContext;
36 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
37 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionStatus;
38 import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
39 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
40 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
41 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
42 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
43 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
44 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
45 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
46 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
47 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
48 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
49 import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl;
50 import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
51 import org.opendaylight.openflowplugin.impl.lifecycle.LifecycleServiceImpl;
52 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
56 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
59
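/**
 * {@link DeviceManager} implementation that keeps one {@link DeviceContext} and one
 * {@link LifecycleService} per connected device: both are created when a device connects
 * and removed again when the device context is levelled down.
 */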
63 public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProviderKeeper {
64
65     private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class);
66
67     private final long globalNotificationQuota;
68     private final boolean switchFeaturesMandatory;
69     private boolean isNotificationFlowRemovedOff;
70     private boolean skipTableFeatures;
71     private static final int SPY_RATE = 10;
72
73     private final DataBroker dataBroker;
74     private final ConvertorExecutor convertorExecutor;
75     private TranslatorLibrary translatorLibrary;
76     private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
77     private DeviceTerminationPhaseHandler deviceTerminPhaseHandler;
78
79     private final ConcurrentMap<DeviceInfo, DeviceContext> deviceContexts = new ConcurrentHashMap<>();
80     private final ConcurrentMap<DeviceInfo, LifecycleService> lifecycleServices = new ConcurrentHashMap<>();
81
82     private final long barrierIntervalNanos;
83     private final int barrierCountLimit;
84
85     private ExtensionConverterProvider extensionConverterProvider;
86     private ScheduledThreadPoolExecutor spyPool;
87     private final ClusterSingletonServiceProvider singletonServiceProvider;
88     private final NotificationPublishService notificationPublishService;
89     private final MessageSpy messageSpy;
90     private final HashedWheelTimer hashedWheelTimer;
91
92     public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
93                              final long globalNotificationQuota,
94                              final boolean switchFeaturesMandatory,
95                              final long barrierInterval,
96                              final int barrierCountLimit,
97                              final MessageSpy messageSpy,
98                              final boolean isNotificationFlowRemovedOff,
99                              final ClusterSingletonServiceProvider singletonServiceProvider,
100                              final NotificationPublishService notificationPublishService,
101                              final HashedWheelTimer hashedWheelTimer,
102                              final ConvertorExecutor convertorExecutor,
103                              final boolean skipTableFeatures) {
104
105         this.switchFeaturesMandatory = switchFeaturesMandatory;
106         this.globalNotificationQuota = globalNotificationQuota;
107         this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
108         this.skipTableFeatures = skipTableFeatures;
109         this.dataBroker = Preconditions.checkNotNull(dataBroker);
110         this.convertorExecutor = convertorExecutor;
111         this.hashedWheelTimer = hashedWheelTimer;
112         /* merge empty nodes to oper DS to predict any problems with missing parent for Node */
113         final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
114
115         final NodesBuilder nodesBuilder = new NodesBuilder();
116         nodesBuilder.setNode(Collections.<Node>emptyList());
117         tx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class), nodesBuilder.build());
118         try {
119             tx.submit().get();
120         } catch (ExecutionException | InterruptedException e) {
121             LOG.error("Creation of node failed.", e);
122             throw new IllegalStateException(e);
123         }
124
125         this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierInterval);
126         this.barrierCountLimit = barrierCountLimit;
127
128         spyPool = new ScheduledThreadPoolExecutor(1);
129         this.singletonServiceProvider = singletonServiceProvider;
130         this.notificationPublishService = notificationPublishService;
131         this.messageSpy = messageSpy;
132     }
133
134
135     @Override
136     public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
137         this.deviceInitPhaseHandler = handler;
138     }
139
140     @Override
141     public void onDeviceContextLevelUp(@CheckForNull DeviceInfo deviceInfo, final LifecycleService lifecycleService) throws Exception {
142         // final phase - we have to add new Device to MD-SAL DataStore
143         LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceInfo.getNodeId());
144         DeviceContext deviceContext = Preconditions.checkNotNull(deviceContexts.get(deviceInfo));
145         deviceContext.onPublished();
146         lifecycleService.registerService(this.singletonServiceProvider);
147     }
148
149     @Override
150     public ConnectionStatus deviceConnected(@CheckForNull final ConnectionContext connectionContext) throws Exception {
151         Preconditions.checkArgument(connectionContext != null);
152
153         DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
154         /*
155          * This part prevent destroy another device context. Throwing here an exception result to propagate close connection
156          * in {@link org.opendaylight.openflowplugin.impl.connection.org.opendaylight.openflowplugin.impl.connection.HandshakeContextImpl}
157          * If context already exist we are in state closing process (connection flapping) and we should not propagate connection close
158          */
159          if (deviceContexts.containsKey(deviceInfo)) {
160              DeviceContext deviceContext = deviceContexts.get(deviceInfo);
161              if (!deviceContext.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
162                  LOG.info("Node {} already connected but context state not in TERMINATION state, replacing connection context",
163                          connectionContext.getDeviceInfo().getLOGValue());
164                  deviceContext.replaceConnectionContext(connectionContext);
165                  return ConnectionStatus.ALREADY_CONNECTED;
166              } else {
167                  LOG.warn("Rejecting connection from node which is already connected and there exist deviceContext for it: {}",
168                          connectionContext.getDeviceInfo().getLOGValue());
169                  return ConnectionStatus.CLOSING;
170              }
171          }
172
173         LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}",
174                 connectionContext.getConnectionAdapter().getRemoteAddress(), deviceInfo.getNodeId());
175
176         // Add Disconnect handler
177         connectionContext.setDeviceDisconnectedHandler(DeviceManagerImpl.this);
178         // Cache this for clarity
179         final ConnectionAdapter connectionAdapter = connectionContext.getConnectionAdapter();
180
181         //FIXME: as soon as auxiliary connection are fully supported then this is needed only before device context published
182         connectionAdapter.setPacketInFiltering(true);
183
184         final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(deviceInfo.getVersion());
185
186         connectionContext.setOutboundQueueProvider(outboundQueueProvider);
187         final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
188                 connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, barrierCountLimit, barrierIntervalNanos);
189         connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
190
191         final DeviceContext deviceContext = new DeviceContextImpl(
192                 connectionContext,
193                 dataBroker,
194                 messageSpy,
195                 translatorLibrary,
196                 this,
197                 convertorExecutor,
198                 skipTableFeatures);
199
200         deviceContexts.putIfAbsent(deviceInfo, deviceContext);
201
202         final LifecycleService lifecycleService = new LifecycleServiceImpl();
203         lifecycleService.setDeviceContext(deviceContext);
204         deviceContext.putLifecycleServiceIntoTxChainManager(lifecycleService);
205
206         lifecycleServices.putIfAbsent(deviceInfo, lifecycleService);
207
208         deviceContext.setSwitchFeaturesMandatory(switchFeaturesMandatory);
209
210         ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider);
211         deviceContext.setNotificationPublishService(notificationPublishService);
212
213         updatePacketInRateLimiters();
214
215         final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
216                 connectionAdapter, deviceContext);
217
218         connectionAdapter.setMessageListener(messageListener);
219         deviceInitPhaseHandler.onDeviceContextLevelUp(connectionContext.getDeviceInfo(), lifecycleService);
220         return ConnectionStatus.MAY_CONTINUE;
221     }
222
223     private void updatePacketInRateLimiters() {
224         synchronized (deviceContexts) {
225             final int deviceContextsSize = deviceContexts.size();
226             if (deviceContextsSize > 0) {
227                 long freshNotificationLimit = globalNotificationQuota / deviceContextsSize;
228                 if (freshNotificationLimit < 100) {
229                     freshNotificationLimit = 100;
230                 }
231                 LOG.debug("fresh notification limit = {}", freshNotificationLimit);
232                 for (final DeviceContext deviceContext : deviceContexts.values()) {
233                     deviceContext.updatePacketInRateLimit(freshNotificationLimit);
234                 }
235             }
236         }
237     }
238
239     @Override
240     public TranslatorLibrary oook() {
241         return translatorLibrary;
242     }
243
244     @Override
245     public void setTranslatorLibrary(final TranslatorLibrary translatorLibrary) {
246         this.translatorLibrary = translatorLibrary;
247     }
248
249     @Override
250     public void close() {
251         for (final Iterator<DeviceContext> iterator = Iterators.consumingIterator(deviceContexts.values().iterator());
252                 iterator.hasNext();) {
253             final DeviceContext deviceCtx = iterator.next();
254             deviceCtx.shutdownConnection();
255             deviceCtx.shuttingDownDataStoreTransactions();
256         }
257
258         if (spyPool != null) {
259             spyPool.shutdownNow();
260             spyPool = null;
261         }
262     }
263
264     @Override
265     public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
266
267         LifecycleService lifecycleService = lifecycleServices.remove(deviceInfo);
268         if (LOG.isDebugEnabled()) {
269             LOG.debug("Lifecycle service removed for node {}", deviceInfo.getLOGValue());
270         }
271
272         updatePacketInRateLimiters();
273         if (Objects.nonNull(lifecycleService)) {
274             try {
275                 lifecycleService.close();
276                 LOG.debug("Lifecycle service successfully closed for node {}", deviceInfo.getLOGValue());
277             } catch (Exception e) {
278                 LOG.warn("Closing lifecycle service for node {} was unsuccessful ", deviceInfo.getLOGValue(), e);
279             }
280         }
281
282         deviceContexts.remove(deviceInfo);
283         if (LOG.isDebugEnabled()) {
284             LOG.debug("Device context removed for node {}", deviceInfo.getLOGValue());
285         }
286
287     }
288
289     @Override
290     public void initialize() {
291         spyPool.scheduleAtFixedRate(messageSpy, SPY_RATE, SPY_RATE, TimeUnit.SECONDS);
292     }
293
294     @Override
295     public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
296         this.extensionConverterProvider = extensionConverterProvider;
297     }
298
299     @Override
300     public ExtensionConverterProvider getExtensionConverterProvider() {
301         return extensionConverterProvider;
302     }
303
304     @Override
305     public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
306         this.deviceTerminPhaseHandler = handler;
307     }
308
309     @Override
310     public void onDeviceDisconnected(final ConnectionContext connectionContext) {
311         LOG.trace("onDeviceDisconnected method call for Node: {}", connectionContext.getNodeId());
312         final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
313         final DeviceContext deviceCtx = this.deviceContexts.get(deviceInfo);
314
315         if (null == deviceCtx) {
316             LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.", deviceInfo.getLOGValue());
317             return;
318         }
319
320         if (deviceCtx.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
321             LOG.debug("Device context for node {} is already is termination state, waiting for close all context");
322             return;
323         }
324
325         deviceCtx.setState(OFPContext.CONTEXT_STATE.TERMINATION);
326
327         if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) {
328             LOG.debug("Node {} disconnected, but not primary connection.", connectionContext.getDeviceInfo().getLOGValue());
329             /* Connection is not PrimaryConnection so try to remove from Auxiliary Connections */
330             deviceCtx.removeAuxiliaryConnectionContext(connectionContext);
331         }
332         //TODO: Auxiliary connections supported ?
333         {
334             /* Device is disconnected and so we need to close TxManager */
335             final ListenableFuture<Void> future = deviceCtx.shuttingDownDataStoreTransactions();
336             Futures.addCallback(future, new FutureCallback<Void>() {
337
338                 @Override
339                 public void onSuccess(final Void result) {
340                     LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getLOGValue());
341                     deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
342                 }
343
344                 @Override
345                 public void onFailure(final Throwable t) {
346                     LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getLOGValue());
347                     LOG.trace("TxChainManager failed by closing. ", t);
348                     deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
349                 }
350             });
351             /* Add timer for Close TxManager because it could fain ind cluster without notification */
352             final TimerTask timerTask = timeout -> {
353                 if (!future.isDone()) {
354                     LOG.warn("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getLOGValue());
355                     future.cancel(false);
356                 }
357             };
358             hashedWheelTimer.newTimeout(timerTask, 10, TimeUnit.SECONDS);
359         }
360     }
361
362     @VisibleForTesting
363     void addDeviceContextToMap(final DeviceInfo deviceInfo, final DeviceContext deviceContext){
364         deviceContexts.put(deviceInfo, deviceContext);
365     }
366
367     @Override
368     public <T extends OFPContext> T gainContext(final DeviceInfo deviceInfo) {
369         return (T) deviceContexts.get(deviceInfo);
370     }
371
372     @Override
373     public void setIsNotificationFlowRemovedOff(boolean isNotificationFlowRemovedOff) {
374         this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
375     }
376
377     @Override
378     public boolean getIsNotificationFlowRemovedOff() {
379         return this.isNotificationFlowRemovedOff;
380     }
381
382
383     @Override
384     public void setSkipTableFeatures(boolean skipTableFeaturesValue) {
385         skipTableFeatures = skipTableFeaturesValue;
386     }
387
388 }