Merge "Bug 5596 Cleaning part 1"
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceManagerImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.device;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Verify;
13 import com.google.common.collect.Iterators;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import io.netty.util.TimerTask;
18 import java.util.Collections;
19 import java.util.Iterator;
20 import java.util.concurrent.ConcurrentHashMap;
21 import java.util.concurrent.ConcurrentMap;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.ScheduledThreadPoolExecutor;
24 import java.util.concurrent.TimeUnit;
25 import javax.annotation.CheckForNull;
26 import javax.annotation.Nonnull;
27 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
28 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
29 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
30 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
31 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
32 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
33 import org.opendaylight.openflowplugin.api.openflow.OFPContext;
34 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
35 import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
36 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
37 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
38 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
39 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
40 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
41 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
42 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor;
43 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
44 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
45 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
46 import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl;
47 import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
48 import org.opendaylight.openflowplugin.impl.lifecycle.LifecycleServiceImpl;
49 import org.opendaylight.openflowplugin.impl.util.DeviceInitializationUtils;
50 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
54 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
57
58 /**
59  *
60  */
61 public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProviderKeeper {
62
63     private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class);
64
65     private final long globalNotificationQuota;
66     private final boolean switchFeaturesMandatory;
67     private boolean isNotificationFlowRemovedOff;
68
69     private static final int SPY_RATE = 10;
70
71     private final DataBroker dataBroker;
72     private final ConvertorExecutor convertorExecutor;
73     private TranslatorLibrary translatorLibrary;
74     private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
75     private DeviceTerminationPhaseHandler deviceTerminPhaseHandler;
76
77     private final ConcurrentMap<DeviceInfo, DeviceContext> deviceContexts = new ConcurrentHashMap<>();
78     private final ConcurrentMap<DeviceInfo, LifecycleService> lifecycleServices = new ConcurrentHashMap<>();
79
80     private final long barrierIntervalNanos;
81     private final int barrierCountLimit;
82     private ExtensionConverterProvider extensionConverterProvider;
83     private ScheduledThreadPoolExecutor spyPool;
84     private final ClusterSingletonServiceProvider singletonServiceProvider;
85
86     private final LifecycleConductor conductor;
87
88     public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
89                              final long globalNotificationQuota,
90                              final boolean switchFeaturesMandatory,
91                              final long barrierInterval,
92                              final int barrierCountLimit,
93                              final LifecycleConductor lifecycleConductor,
94                              boolean isNotificationFlowRemovedOff,
95                              final ConvertorExecutor convertorExecutor,
96                              final ClusterSingletonServiceProvider singletonServiceProvider) {
97         this.switchFeaturesMandatory = switchFeaturesMandatory;
98         this.globalNotificationQuota = globalNotificationQuota;
99         this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
100         this.dataBroker = Preconditions.checkNotNull(dataBroker);
101         this.convertorExecutor = convertorExecutor;
102         /* merge empty nodes to oper DS to predict any problems with missing parent for Node */
103         final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
104
105         final NodesBuilder nodesBuilder = new NodesBuilder();
106         nodesBuilder.setNode(Collections.<Node>emptyList());
107         tx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class), nodesBuilder.build());
108         try {
109             tx.submit().get();
110         } catch (ExecutionException | InterruptedException e) {
111             LOG.error("Creation of node failed.", e);
112             throw new IllegalStateException(e);
113         }
114
115         this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierInterval);
116         this.barrierCountLimit = barrierCountLimit;
117
118         this.conductor = lifecycleConductor;
119         spyPool = new ScheduledThreadPoolExecutor(1);
120         this.singletonServiceProvider = singletonServiceProvider;
121     }
122
123
124     @Override
125     public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
126         this.deviceInitPhaseHandler = handler;
127     }
128
129     @Override
130     public void onDeviceContextLevelUp(@CheckForNull DeviceInfo deviceInfo, final LifecycleService lifecycleService) throws Exception {
131         // final phase - we have to add new Device to MD-SAL DataStore
132         LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceInfo.getNodeId());
133         DeviceContext deviceContext = Preconditions.checkNotNull(deviceContexts.get(deviceInfo));
134         deviceContext.onPublished();
135         lifecycleService.registerService(this.singletonServiceProvider);
136     }
137
138     @Override
139     public boolean deviceConnected(@CheckForNull final ConnectionContext connectionContext) throws Exception {
140         Preconditions.checkArgument(connectionContext != null);
141
142         DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
143         /*
144          * This part prevent destroy another device context. Throwing here an exception result to propagate close connection
145          * in {@link org.opendaylight.openflowplugin.impl.connection.org.opendaylight.openflowplugin.impl.connection.HandshakeContextImpl}
146          * If context already exist we are in state closing process (connection flapping) and we should not propagate connection close
147          */
148          if (deviceContexts.containsKey(deviceInfo)) {
149             LOG.warn("Rejecting connection from node which is already connected and there exist deviceContext for it: {}", connectionContext.getNodeId());
150              return false;
151          }
152
153         LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}",
154                 connectionContext.getConnectionAdapter().getRemoteAddress(), deviceInfo.getNodeId());
155
156         // Add Disconnect handler
157         connectionContext.setDeviceDisconnectedHandler(DeviceManagerImpl.this);
158         // Cache this for clarity
159         final ConnectionAdapter connectionAdapter = connectionContext.getConnectionAdapter();
160
161         //FIXME: as soon as auxiliary connection are fully supported then this is needed only before device context published
162         connectionAdapter.setPacketInFiltering(true);
163
164         final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(deviceInfo.getVersion());
165
166         connectionContext.setOutboundQueueProvider(outboundQueueProvider);
167         final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
168                 connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, barrierCountLimit, barrierIntervalNanos);
169         connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
170
171         final DeviceContext deviceContext = new DeviceContextImpl(connectionContext,
172                 dataBroker,
173                 conductor,
174                 outboundQueueProvider,
175                 translatorLibrary,
176                 this,
177                 convertorExecutor);
178
179         final LifecycleService lifecycleService = new LifecycleServiceImpl();
180         lifecycleService.setDeviceContext(deviceContext);
181
182         Verify.verify(deviceContexts.putIfAbsent(deviceInfo, deviceContext) == null, "DeviceCtx still not closed.");
183         lifecycleServices.putIfAbsent(deviceInfo, lifecycleService);
184
185         deviceContext.setSwitchFeaturesMandatory(switchFeaturesMandatory);
186
187         ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider);
188         deviceContext.setNotificationPublishService(conductor.getNotificationPublishService());
189
190         updatePacketInRateLimiters();
191
192         final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
193                 connectionAdapter, deviceContext);
194
195         connectionAdapter.setMessageListener(messageListener);
196         deviceInitPhaseHandler.onDeviceContextLevelUp(connectionContext.getDeviceInfo(), lifecycleService);
197         return true;
198     }
199
200     private void updatePacketInRateLimiters() {
201         synchronized (deviceContexts) {
202             final int deviceContextsSize = deviceContexts.size();
203             if (deviceContextsSize > 0) {
204                 long freshNotificationLimit = globalNotificationQuota / deviceContextsSize;
205                 if (freshNotificationLimit < 100) {
206                     freshNotificationLimit = 100;
207                 }
208                 LOG.debug("fresh notification limit = {}", freshNotificationLimit);
209                 for (final DeviceContext deviceContext : deviceContexts.values()) {
210                     deviceContext.updatePacketInRateLimit(freshNotificationLimit);
211                 }
212             }
213         }
214     }
215
216     @Override
217     public TranslatorLibrary oook() {
218         return translatorLibrary;
219     }
220
221     @Override
222     public void setTranslatorLibrary(final TranslatorLibrary translatorLibrary) {
223         this.translatorLibrary = translatorLibrary;
224     }
225
226     @Override
227     public void close() {
228         for (final Iterator<DeviceContext> iterator = Iterators.consumingIterator(deviceContexts.values().iterator());
229                 iterator.hasNext();) {
230             final DeviceContext deviceCtx = iterator.next();
231             deviceCtx.shutdownConnection();
232             deviceCtx.shuttingDownDataStoreTransactions();
233         }
234
235         if (spyPool != null) {
236             spyPool.shutdownNow();
237             spyPool = null;
238         }
239     }
240
241     @Override
242     public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
243         LOG.debug("onDeviceContextClosed for Node {}", deviceInfo.getNodeId());
244         deviceContexts.remove(deviceInfo);
245         updatePacketInRateLimiters();
246         LifecycleService lifecycleService = lifecycleServices.remove(deviceInfo);
247         try {
248             lifecycleService.close();
249         } catch (Exception e) {
250             LOG.warn("Closing service for node {} was unsuccessful ", deviceInfo.getNodeId().getValue(), e);
251         }
252     }
253
254     @Override
255     public void initialize() {
256         spyPool.scheduleAtFixedRate(conductor.getMessageIntelligenceAgency(), SPY_RATE, SPY_RATE, TimeUnit.SECONDS);
257     }
258
259     @Override
260     public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
261         this.extensionConverterProvider = extensionConverterProvider;
262     }
263
264     @Override
265     public ExtensionConverterProvider getExtensionConverterProvider() {
266         return extensionConverterProvider;
267     }
268
269     @Override
270     public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
271         this.deviceTerminPhaseHandler = handler;
272     }
273
274     @Override
275     public void onDeviceDisconnected(final ConnectionContext connectionContext) {
276         LOG.trace("onDeviceDisconnected method call for Node: {}", connectionContext.getNodeId());
277         final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
278         final DeviceContext deviceCtx = this.deviceContexts.get(deviceInfo);
279
280         if (null == deviceCtx) {
281             LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.", deviceInfo.getNodeId());
282             return;
283         }
284
285         if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) {
286             /* Connection is not PrimaryConnection so try to remove from Auxiliary Connections */
287             deviceCtx.removeAuxiliaryConnectionContext(connectionContext);
288         } else {
289             /* Device is disconnected and so we need to close TxManager */
290             final ListenableFuture<Void> future = deviceCtx.shuttingDownDataStoreTransactions();
291             Futures.addCallback(future, new FutureCallback<Void>() {
292
293                 @Override
294                 public void onSuccess(final Void result) {
295                     LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getNodeId());
296                     deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
297                 }
298
299                 @Override
300                 public void onFailure(final Throwable t) {
301                     LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getNodeId(), t);
302                     deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
303                 }
304             });
305             /* Add timer for Close TxManager because it could fain ind cluster without notification */
306             final TimerTask timerTask = timeout -> {
307                 if (!future.isDone()) {
308                     LOG.info("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getNodeId());
309                     future.cancel(false);
310                 }
311             };
312             conductor.newTimeout(timerTask, 10, TimeUnit.SECONDS);
313         }
314     }
315
316     @VisibleForTesting
317     void addDeviceContextToMap(final DeviceInfo deviceInfo, final DeviceContext deviceContext){
318         deviceContexts.put(deviceInfo, deviceContext);
319     }
320
321     @Override
322     public <T extends OFPContext> T gainContext(final DeviceInfo deviceInfo) {
323         return (T) deviceContexts.get(deviceInfo);
324     }
325
326     @Override
327     public void setIsNotificationFlowRemovedOff(boolean isNotificationFlowRemovedOff) {
328         this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
329     }
330
331     @Override
332     public boolean getIsNotificationFlowRemovedOff() {
333         return this.isNotificationFlowRemovedOff;
334     }
335
336 }