Merge "Bug 6380 lldp-speaker - DTCL instead of DTL"
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceManagerImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.device;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.collect.Iterators;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import io.netty.util.HashedWheelTimer;
17 import io.netty.util.TimerTask;
18 import java.util.Collections;
19 import java.util.Iterator;
20 import java.util.Objects;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ConcurrentMap;
23 import java.util.concurrent.ExecutionException;
24 import java.util.concurrent.ScheduledThreadPoolExecutor;
25 import java.util.concurrent.TimeUnit;
26 import javax.annotation.CheckForNull;
27 import javax.annotation.Nonnull;
28 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
29 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
30 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
31 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
32 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
33 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
34 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
35 import org.opendaylight.openflowplugin.api.openflow.OFPContext;
36 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
37 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionStatus;
38 import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
39 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
40 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
41 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
42 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
43 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
44 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
45 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
46 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
47 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
48 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
49 import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl;
50 import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
51 import org.opendaylight.openflowplugin.impl.lifecycle.LifecycleServiceImpl;
52 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
56 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
59
60 /**
61  *
62  */
63 public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProviderKeeper {
64
65     private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class);
66
67     private final long globalNotificationQuota;
68     private final boolean switchFeaturesMandatory;
69     private boolean isNotificationFlowRemovedOff;
70
71     private static final int SPY_RATE = 10;
72
73     private final DataBroker dataBroker;
74     private final ConvertorExecutor convertorExecutor;
75     private TranslatorLibrary translatorLibrary;
76     private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
77     private DeviceTerminationPhaseHandler deviceTerminPhaseHandler;
78
79     private final ConcurrentMap<DeviceInfo, DeviceContext> deviceContexts = new ConcurrentHashMap<>();
80     private final ConcurrentMap<DeviceInfo, LifecycleService> lifecycleServices = new ConcurrentHashMap<>();
81
82     private final long barrierIntervalNanos;
83     private final int barrierCountLimit;
84     private ExtensionConverterProvider extensionConverterProvider;
85     private ScheduledThreadPoolExecutor spyPool;
86     private final ClusterSingletonServiceProvider singletonServiceProvider;
87     private final NotificationPublishService notificationPublishService;
88     private final MessageSpy messageSpy;
89     private final HashedWheelTimer hashedWheelTimer;
90
91     public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
92                              final long globalNotificationQuota,
93                              final boolean switchFeaturesMandatory,
94                              final long barrierInterval,
95                              final int barrierCountLimit,
96                              
97                              final MessageSpy messageSpy,
98                              final boolean isNotificationFlowRemovedOff,
99                              final ClusterSingletonServiceProvider singletonServiceProvider,
100                              final NotificationPublishService notificationPublishService,
101                              final HashedWheelTimer hashedWheelTimer,
102                              final ConvertorExecutor convertorExecutor) {
103         this.switchFeaturesMandatory = switchFeaturesMandatory;
104         this.globalNotificationQuota = globalNotificationQuota;
105         this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
106         this.dataBroker = Preconditions.checkNotNull(dataBroker);
107         this.convertorExecutor = convertorExecutor;
108         this.hashedWheelTimer = hashedWheelTimer;
109         /* merge empty nodes to oper DS to predict any problems with missing parent for Node */
110         final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
111
112         final NodesBuilder nodesBuilder = new NodesBuilder();
113         nodesBuilder.setNode(Collections.<Node>emptyList());
114         tx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class), nodesBuilder.build());
115         try {
116             tx.submit().get();
117         } catch (ExecutionException | InterruptedException e) {
118             LOG.error("Creation of node failed.", e);
119             throw new IllegalStateException(e);
120         }
121
122         this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierInterval);
123         this.barrierCountLimit = barrierCountLimit;
124
125         spyPool = new ScheduledThreadPoolExecutor(1);
126         this.singletonServiceProvider = singletonServiceProvider;
127         this.notificationPublishService = notificationPublishService;
128         this.messageSpy = messageSpy;
129     }
130
131
132     @Override
133     public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
134         this.deviceInitPhaseHandler = handler;
135     }
136
137     @Override
138     public void onDeviceContextLevelUp(@CheckForNull DeviceInfo deviceInfo, final LifecycleService lifecycleService) throws Exception {
139         // final phase - we have to add new Device to MD-SAL DataStore
140         LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceInfo.getNodeId());
141         DeviceContext deviceContext = Preconditions.checkNotNull(deviceContexts.get(deviceInfo));
142         deviceContext.onPublished();
143         lifecycleService.registerService(this.singletonServiceProvider);
144     }
145
146     @Override
147     public ConnectionStatus deviceConnected(@CheckForNull final ConnectionContext connectionContext) throws Exception {
148         Preconditions.checkArgument(connectionContext != null);
149
150         DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
151         /*
152          * This part prevent destroy another device context. Throwing here an exception result to propagate close connection
153          * in {@link org.opendaylight.openflowplugin.impl.connection.org.opendaylight.openflowplugin.impl.connection.HandshakeContextImpl}
154          * If context already exist we are in state closing process (connection flapping) and we should not propagate connection close
155          */
156          if (deviceContexts.containsKey(deviceInfo)) {
157              DeviceContext deviceContext = deviceContexts.get(deviceInfo);
158              if (!deviceContext.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
159                  LOG.info("Node {} already connected but context state not in TERMINATION state, replacing connection context",
160                          connectionContext.getDeviceInfo().getLOGValue());
161                  deviceContext.replaceConnectionContext(connectionContext);
162                  return ConnectionStatus.ALREADY_CONNECTED;
163              } else {
164                  LOG.warn("Rejecting connection from node which is already connected and there exist deviceContext for it: {}",
165                          connectionContext.getDeviceInfo().getLOGValue());
166                  return ConnectionStatus.CLOSING;
167              }
168          }
169
170         LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}",
171                 connectionContext.getConnectionAdapter().getRemoteAddress(), deviceInfo.getNodeId());
172
173         // Add Disconnect handler
174         connectionContext.setDeviceDisconnectedHandler(DeviceManagerImpl.this);
175         // Cache this for clarity
176         final ConnectionAdapter connectionAdapter = connectionContext.getConnectionAdapter();
177
178         //FIXME: as soon as auxiliary connection are fully supported then this is needed only before device context published
179         connectionAdapter.setPacketInFiltering(true);
180
181         final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(deviceInfo.getVersion());
182
183         connectionContext.setOutboundQueueProvider(outboundQueueProvider);
184         final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
185                 connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, barrierCountLimit, barrierIntervalNanos);
186         connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
187
188         final DeviceContext deviceContext = new DeviceContextImpl(
189                 connectionContext,
190                 dataBroker,
191                 messageSpy,
192                 translatorLibrary,
193                 this,
194                 convertorExecutor);
195
196         deviceContexts.putIfAbsent(deviceInfo, deviceContext);
197
198         final LifecycleService lifecycleService = new LifecycleServiceImpl();
199         lifecycleService.setDeviceContext(deviceContext);
200         deviceContext.putLifecycleServiceIntoTxChainManager(lifecycleService);
201
202         lifecycleServices.putIfAbsent(deviceInfo, lifecycleService);
203
204         deviceContext.setSwitchFeaturesMandatory(switchFeaturesMandatory);
205
206         ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider);
207         deviceContext.setNotificationPublishService(notificationPublishService);
208
209         updatePacketInRateLimiters();
210
211         final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
212                 connectionAdapter, deviceContext);
213
214         connectionAdapter.setMessageListener(messageListener);
215         deviceInitPhaseHandler.onDeviceContextLevelUp(connectionContext.getDeviceInfo(), lifecycleService);
216         return ConnectionStatus.MAY_CONTINUE;
217     }
218
219     private void updatePacketInRateLimiters() {
220         synchronized (deviceContexts) {
221             final int deviceContextsSize = deviceContexts.size();
222             if (deviceContextsSize > 0) {
223                 long freshNotificationLimit = globalNotificationQuota / deviceContextsSize;
224                 if (freshNotificationLimit < 100) {
225                     freshNotificationLimit = 100;
226                 }
227                 LOG.debug("fresh notification limit = {}", freshNotificationLimit);
228                 for (final DeviceContext deviceContext : deviceContexts.values()) {
229                     deviceContext.updatePacketInRateLimit(freshNotificationLimit);
230                 }
231             }
232         }
233     }
234
235     @Override
236     public TranslatorLibrary oook() {
237         return translatorLibrary;
238     }
239
240     @Override
241     public void setTranslatorLibrary(final TranslatorLibrary translatorLibrary) {
242         this.translatorLibrary = translatorLibrary;
243     }
244
245     @Override
246     public void close() {
247         for (final Iterator<DeviceContext> iterator = Iterators.consumingIterator(deviceContexts.values().iterator());
248                 iterator.hasNext();) {
249             final DeviceContext deviceCtx = iterator.next();
250             deviceCtx.shutdownConnection();
251             deviceCtx.shuttingDownDataStoreTransactions();
252         }
253
254         if (spyPool != null) {
255             spyPool.shutdownNow();
256             spyPool = null;
257         }
258     }
259
260     @Override
261     public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
262
263         LifecycleService lifecycleService = lifecycleServices.remove(deviceInfo);
264         if (LOG.isDebugEnabled()) {
265             LOG.debug("Lifecycle service removed for node {}", deviceInfo.getLOGValue());
266         }
267
268         updatePacketInRateLimiters();
269         if (Objects.nonNull(lifecycleService)) {
270             try {
271                 lifecycleService.close();
272                 LOG.debug("Lifecycle service successfully closed for node {}", deviceInfo.getLOGValue());
273             } catch (Exception e) {
274                 LOG.warn("Closing lifecycle service for node {} was unsuccessful ", deviceInfo.getLOGValue(), e);
275             }
276         }
277
278         deviceContexts.remove(deviceInfo);
279         if (LOG.isDebugEnabled()) {
280             LOG.debug("Device context removed for node {}", deviceInfo.getLOGValue());
281         }
282
283     }
284
285     @Override
286     public void initialize() {
287         spyPool.scheduleAtFixedRate(messageSpy, SPY_RATE, SPY_RATE, TimeUnit.SECONDS);
288     }
289
290     @Override
291     public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
292         this.extensionConverterProvider = extensionConverterProvider;
293     }
294
295     @Override
296     public ExtensionConverterProvider getExtensionConverterProvider() {
297         return extensionConverterProvider;
298     }
299
300     @Override
301     public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
302         this.deviceTerminPhaseHandler = handler;
303     }
304
305     @Override
306     public void onDeviceDisconnected(final ConnectionContext connectionContext) {
307         LOG.trace("onDeviceDisconnected method call for Node: {}", connectionContext.getNodeId());
308         final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
309         final DeviceContext deviceCtx = this.deviceContexts.get(deviceInfo);
310
311         if (null == deviceCtx) {
312             LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.", deviceInfo.getLOGValue());
313             return;
314         }
315
316         if (deviceCtx.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
317             LOG.debug("Device context for node {} is already is termination state, waiting for close all context");
318             return;
319         }
320
321         deviceCtx.setState(OFPContext.CONTEXT_STATE.TERMINATION);
322
323         if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) {
324             LOG.debug("Node {} disconnected, but not primary connection.", connectionContext.getDeviceInfo().getLOGValue());
325             /* Connection is not PrimaryConnection so try to remove from Auxiliary Connections */
326             deviceCtx.removeAuxiliaryConnectionContext(connectionContext);
327         }
328         //TODO: Auxiliary connections supported ?
329         {
330             /* Device is disconnected and so we need to close TxManager */
331             final ListenableFuture<Void> future = deviceCtx.shuttingDownDataStoreTransactions();
332             Futures.addCallback(future, new FutureCallback<Void>() {
333
334                 @Override
335                 public void onSuccess(final Void result) {
336                     LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getLOGValue());
337                     deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
338                 }
339
340                 @Override
341                 public void onFailure(final Throwable t) {
342                     LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getLOGValue());
343                     LOG.trace("TxChainManager failed by closing. ", t);
344                     deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
345                 }
346             });
347             /* Add timer for Close TxManager because it could fain ind cluster without notification */
348             final TimerTask timerTask = timeout -> {
349                 if (!future.isDone()) {
350                     LOG.warn("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getLOGValue());
351                     future.cancel(false);
352                 }
353             };
354             hashedWheelTimer.newTimeout(timerTask, 10, TimeUnit.SECONDS);
355         }
356     }
357
358     @VisibleForTesting
359     void addDeviceContextToMap(final DeviceInfo deviceInfo, final DeviceContext deviceContext){
360         deviceContexts.put(deviceInfo, deviceContext);
361     }
362
363     @Override
364     public <T extends OFPContext> T gainContext(final DeviceInfo deviceInfo) {
365         return (T) deviceContexts.get(deviceInfo);
366     }
367
368     @Override
369     public void setIsNotificationFlowRemovedOff(boolean isNotificationFlowRemovedOff) {
370         this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
371     }
372
373     @Override
374     public boolean getIsNotificationFlowRemovedOff() {
375         return this.isNotificationFlowRemovedOff;
376     }
377
378 }