Merge "Bug 6554 Fix rejecting connections"
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceManagerImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.device;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.collect.Iterators;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import io.netty.util.HashedWheelTimer;
17 import io.netty.util.TimerTask;
18 import java.util.Collections;
19 import java.util.Iterator;
20 import java.util.Objects;
21 import java.util.Optional;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.ConcurrentMap;
24 import java.util.concurrent.ExecutionException;
25 import java.util.concurrent.ScheduledThreadPoolExecutor;
26 import java.util.concurrent.TimeUnit;
27 import javax.annotation.CheckForNull;
28 import javax.annotation.Nonnull;
29 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
30 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
31 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
32 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
33 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
34 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
35 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
36 import org.opendaylight.openflowplugin.api.openflow.OFPContext;
37 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
38 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionStatus;
39 import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
40 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
41 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
42 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
43 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
44 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
45 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
46 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
47 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
48 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
49 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
50 import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl;
51 import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
52 import org.opendaylight.openflowplugin.impl.lifecycle.LifecycleServiceImpl;
53 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
57 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
60
61 /**
62  *
63  */
64 public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProviderKeeper {
65
66     private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class);
67
68     private final long globalNotificationQuota;
69     private final boolean switchFeaturesMandatory;
70     private boolean isNotificationFlowRemovedOff;
71     private boolean skipTableFeatures;
72     private static final int SPY_RATE = 10;
73
74     private final DataBroker dataBroker;
75     private final ConvertorExecutor convertorExecutor;
76     private TranslatorLibrary translatorLibrary;
77     private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
78     private DeviceTerminationPhaseHandler deviceTerminPhaseHandler;
79
80     private final ConcurrentMap<DeviceInfo, DeviceContext> deviceContexts = new ConcurrentHashMap<>();
81     private final ConcurrentMap<DeviceInfo, LifecycleService> lifecycleServices = new ConcurrentHashMap<>();
82
83     private final long barrierIntervalNanos;
84     private final int barrierCountLimit;
85
86     private ExtensionConverterProvider extensionConverterProvider;
87     private ScheduledThreadPoolExecutor spyPool;
88     private final ClusterSingletonServiceProvider singletonServiceProvider;
89     private final NotificationPublishService notificationPublishService;
90     private final MessageSpy messageSpy;
91     private final HashedWheelTimer hashedWheelTimer;
92
93     public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
94                              final long globalNotificationQuota,
95                              final boolean switchFeaturesMandatory,
96                              final long barrierInterval,
97                              final int barrierCountLimit,
98                              final MessageSpy messageSpy,
99                              final boolean isNotificationFlowRemovedOff,
100                              final ClusterSingletonServiceProvider singletonServiceProvider,
101                              final NotificationPublishService notificationPublishService,
102                              final HashedWheelTimer hashedWheelTimer,
103                              final ConvertorExecutor convertorExecutor,
104                              final boolean skipTableFeatures) {
105
106         this.dataBroker = dataBroker;
107
108         /* merge empty nodes to oper DS to predict any problems with missing parent for Node */
109         final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
110         final NodesBuilder nodesBuilder = new NodesBuilder();
111         nodesBuilder.setNode(Collections.<Node>emptyList());
112         tx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class), nodesBuilder.build());
113         try {
114             tx.submit().get();
115         } catch (ExecutionException | InterruptedException e) {
116             LOG.error("Creation of node failed.", e);
117             throw new IllegalStateException(e);
118         }
119
120         this.switchFeaturesMandatory = switchFeaturesMandatory;
121         this.globalNotificationQuota = globalNotificationQuota;
122         this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
123         this.skipTableFeatures = skipTableFeatures;
124         this.convertorExecutor = convertorExecutor;
125         this.hashedWheelTimer = hashedWheelTimer;
126         this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierInterval);
127         this.barrierCountLimit = barrierCountLimit;
128         this.spyPool = new ScheduledThreadPoolExecutor(1);
129         this.singletonServiceProvider = singletonServiceProvider;
130         this.notificationPublishService = notificationPublishService;
131         this.messageSpy = messageSpy;
132     }
133
134
135     @Override
136     public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
137         this.deviceInitPhaseHandler = handler;
138     }
139
140     @Override
141     public void onDeviceContextLevelUp(@CheckForNull DeviceInfo deviceInfo, final LifecycleService lifecycleService) throws Exception {
142         // final phase - we have to add new Device to MD-SAL DataStore
143         LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceInfo.getNodeId());
144         DeviceContext deviceContext = Preconditions.checkNotNull(deviceContexts.get(deviceInfo));
145         deviceContext.onPublished();
146         lifecycleService.registerService(this.singletonServiceProvider);
147     }
148
149     @Override
150     public ConnectionStatus deviceConnected(@CheckForNull final ConnectionContext connectionContext) throws Exception {
151         Preconditions.checkArgument(connectionContext != null);
152
153         DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
154         /*
155          * This part prevent destroy another device context. Throwing here an exception result to propagate close connection
156          * in {@link org.opendaylight.openflowplugin.impl.connection.org.opendaylight.openflowplugin.impl.connection.HandshakeContextImpl}
157          * If context already exist we are in state closing process (connection flapping) and we should not propagate connection close
158          */
159          if (deviceContexts.containsKey(deviceInfo)) {
160              DeviceContext deviceContext = deviceContexts.get(deviceInfo);
161              LOG.warn("Node {} already connected disconnecting device. Rejecting connection", deviceInfo.getLOGValue());
162              if (!deviceContext.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
163                  LOG.warn("Node {} context state not in TERMINATION state.",
164                          connectionContext.getDeviceInfo().getLOGValue());
165                  return ConnectionStatus.ALREADY_CONNECTED;
166              } else {
167                  return ConnectionStatus.CLOSING;
168              }
169          }
170
171         LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}",
172                 connectionContext.getConnectionAdapter().getRemoteAddress(), deviceInfo.getNodeId());
173
174         // Add Disconnect handler
175         connectionContext.setDeviceDisconnectedHandler(DeviceManagerImpl.this);
176         // Cache this for clarity
177         final ConnectionAdapter connectionAdapter = connectionContext.getConnectionAdapter();
178
179         //FIXME: as soon as auxiliary connection are fully supported then this is needed only before device context published
180         connectionAdapter.setPacketInFiltering(true);
181
182         final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(deviceInfo.getVersion());
183
184         connectionContext.setOutboundQueueProvider(outboundQueueProvider);
185         final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
186                 connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, barrierCountLimit, barrierIntervalNanos);
187         connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
188
189         final DeviceContext deviceContext = new DeviceContextImpl(
190                 connectionContext,
191                 dataBroker,
192                 messageSpy,
193                 translatorLibrary,
194                 this,
195                 convertorExecutor,
196                 skipTableFeatures);
197
198         deviceContexts.put(deviceInfo, deviceContext);
199
200         final LifecycleService lifecycleService = new LifecycleServiceImpl();
201         lifecycleService.setDeviceContext(deviceContext);
202         deviceContext.putLifecycleServiceIntoTxChainManager(lifecycleService);
203
204         lifecycleServices.put(deviceInfo, lifecycleService);
205
206         deviceContext.setSwitchFeaturesMandatory(switchFeaturesMandatory);
207
208         ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider);
209         deviceContext.setNotificationPublishService(notificationPublishService);
210
211         updatePacketInRateLimiters();
212
213         final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
214                 connectionAdapter, deviceContext);
215
216         connectionAdapter.setMessageListener(messageListener);
217         deviceInitPhaseHandler.onDeviceContextLevelUp(connectionContext.getDeviceInfo(), lifecycleService);
218         return ConnectionStatus.MAY_CONTINUE;
219     }
220
221     private void updatePacketInRateLimiters() {
222         synchronized (deviceContexts) {
223             final int deviceContextsSize = deviceContexts.size();
224             if (deviceContextsSize > 0) {
225                 long freshNotificationLimit = globalNotificationQuota / deviceContextsSize;
226                 if (freshNotificationLimit < 100) {
227                     freshNotificationLimit = 100;
228                 }
229                 if (LOG.isDebugEnabled()) {
230                     LOG.debug("fresh notification limit = {}", freshNotificationLimit);
231                 }
232                 for (final DeviceContext deviceContext : deviceContexts.values()) {
233                     deviceContext.updatePacketInRateLimit(freshNotificationLimit);
234                 }
235             }
236         }
237     }
238
239     @Override
240     public TranslatorLibrary oook() {
241         return translatorLibrary;
242     }
243
244     @Override
245     public void setTranslatorLibrary(final TranslatorLibrary translatorLibrary) {
246         this.translatorLibrary = translatorLibrary;
247     }
248
249     @Override
250     public void close() {
251         for (final Iterator<DeviceContext> iterator = Iterators.consumingIterator(deviceContexts.values().iterator());
252                 iterator.hasNext();) {
253             final DeviceContext deviceCtx = iterator.next();
254             deviceCtx.shutdownConnection();
255             deviceCtx.shuttingDownDataStoreTransactions();
256         }
257
258         Optional.ofNullable(spyPool).ifPresent(ScheduledThreadPoolExecutor::shutdownNow);
259         spyPool = null;
260
261     }
262
263     @Override
264     public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
265
266         LifecycleService lifecycleService = lifecycleServices.remove(deviceInfo);
267         if (LOG.isDebugEnabled()) {
268             LOG.debug("Lifecycle service removed for node {}", deviceInfo.getLOGValue());
269         }
270
271         updatePacketInRateLimiters();
272         if (Objects.nonNull(lifecycleService)) {
273             try {
274                 lifecycleService.close();
275                 LOG.debug("Lifecycle service successfully closed for node {}", deviceInfo.getLOGValue());
276             } catch (Exception e) {
277                 LOG.warn("Closing lifecycle service for node {} was unsuccessful ", deviceInfo.getLOGValue(), e);
278             }
279         }
280
281         deviceContexts.remove(deviceInfo);
282         if (LOG.isDebugEnabled()) {
283             LOG.debug("Device context removed for node {}", deviceInfo.getLOGValue());
284         }
285
286     }
287
288     @Override
289     public void initialize() {
290         spyPool.scheduleAtFixedRate(messageSpy, SPY_RATE, SPY_RATE, TimeUnit.SECONDS);
291     }
292
293     @Override
294     public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
295         this.extensionConverterProvider = extensionConverterProvider;
296     }
297
298     @Override
299     public ExtensionConverterProvider getExtensionConverterProvider() {
300         return extensionConverterProvider;
301     }
302
303     @Override
304     public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
305         this.deviceTerminPhaseHandler = handler;
306     }
307
308     @Override
309     public void onDeviceDisconnected(final ConnectionContext connectionContext) {
310         LOG.trace("onDeviceDisconnected method call for Node: {}", connectionContext.getNodeId());
311         final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
312         final DeviceContext deviceCtx = this.deviceContexts.get(deviceInfo);
313
314         if (null == deviceCtx) {
315             LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.", deviceInfo.getLOGValue());
316             return;
317         }
318
319         if (deviceCtx.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
320             LOG.info("Device context for node {} is already is termination state, waiting for close all context", deviceInfo.getLOGValue());
321             return;
322         }
323
324         deviceCtx.setState(OFPContext.CONTEXT_STATE.TERMINATION);
325
326         if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) {
327             LOG.debug("Node {} disconnected, but not primary connection.", connectionContext.getDeviceInfo().getLOGValue());
328             /* Connection is not PrimaryConnection so try to remove from Auxiliary Connections */
329             deviceCtx.removeAuxiliaryConnectionContext(connectionContext);
330         }
331         //TODO: Auxiliary connections supported ?
332             /* Device is disconnected and so we need to close TxManager */
333         final ListenableFuture<Void> future = deviceCtx.shuttingDownDataStoreTransactions();
334         Futures.addCallback(future, new FutureCallback<Void>() {
335
336             @Override
337             public void onSuccess(final Void result) {
338                 LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getLOGValue());
339                 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
340             }
341
342             @Override
343             public void onFailure(final Throwable t) {
344                 LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getLOGValue());
345                 LOG.trace("TxChainManager failed by closing. ", t);
346                 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
347             }
348         });
349         /* Add timer for Close TxManager because it could fain ind cluster without notification */
350         final TimerTask timerTask = timeout -> {
351             if (!future.isDone()) {
352                 LOG.warn("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getLOGValue());
353                 future.cancel(false);
354             }
355         };
356         hashedWheelTimer.newTimeout(timerTask, 10, TimeUnit.SECONDS);
357     }
358
359     @VisibleForTesting
360     void addDeviceContextToMap(final DeviceInfo deviceInfo, final DeviceContext deviceContext){
361         deviceContexts.put(deviceInfo, deviceContext);
362     }
363
364     @Override
365     public void setIsNotificationFlowRemovedOff(boolean isNotificationFlowRemovedOff) {
366         this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
367     }
368
369     @Override
370     public boolean getIsNotificationFlowRemovedOff() {
371         return this.isNotificationFlowRemovedOff;
372     }
373
374
375     @Override
376     public void setSkipTableFeatures(boolean skipTableFeaturesValue) {
377         skipTableFeatures = skipTableFeaturesValue;
378     }
379
380 }