Merge "Bug 5596 Cleaning lifecycle conductor"
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceManagerImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.device;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Verify;
13 import com.google.common.collect.Iterators;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import io.netty.util.HashedWheelTimer;
18 import io.netty.util.TimerTask;
19 import java.util.Collections;
20 import java.util.Iterator;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ConcurrentMap;
23 import java.util.concurrent.ExecutionException;
24 import java.util.concurrent.ScheduledThreadPoolExecutor;
25 import java.util.concurrent.TimeUnit;
26 import javax.annotation.CheckForNull;
27 import javax.annotation.Nonnull;
28 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
29 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
30 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
31 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
32 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
33 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
34 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
35 import org.opendaylight.openflowplugin.api.openflow.OFPContext;
36 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
37 import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
38 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
39 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
40 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
41 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
42 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
43 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
44 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
45 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
46 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
47 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
48 import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl;
49 import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
50 import org.opendaylight.openflowplugin.impl.lifecycle.LifecycleServiceImpl;
51 import org.opendaylight.openflowplugin.impl.util.DeviceInitializationUtils;
52 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
56 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
59
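/**
 * {@link DeviceManager} implementation that tracks the {@link DeviceContext} and
 * {@link LifecycleService} created for each connected device, keyed by {@link DeviceInfo}.
 */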
63 public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProviderKeeper {
64
    private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class);

    // Total packet-in notification quota; divided by the number of device contexts in updatePacketInRateLimiters().
    private final long globalNotificationQuota;
    // Forwarded to every newly created DeviceContext via setSwitchFeaturesMandatory().
    private final boolean switchFeaturesMandatory;
    // Mutable flag exposed through its getter/setter; not otherwise read inside this class.
    private boolean isNotificationFlowRemovedOff;

    // Initial delay and period, in seconds, of the message spy task scheduled in initialize().
    private static final int SPY_RATE = 10;

    private final DataBroker dataBroker;
    private final ConvertorExecutor convertorExecutor;
    // The following three collaborators are injected through setters after construction.
    private TranslatorLibrary translatorLibrary;
    private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
    private DeviceTerminationPhaseHandler deviceTerminPhaseHandler;

    // Per-device state, keyed by DeviceInfo. The deviceContexts map also serves as the monitor
    // for updatePacketInRateLimiters().
    private final ConcurrentMap<DeviceInfo, DeviceContext> deviceContexts = new ConcurrentHashMap<>();
    private final ConcurrentMap<DeviceInfo, LifecycleService> lifecycleServices = new ConcurrentHashMap<>();

    // Barrier settings handed to ConnectionAdapter.registerOutboundQueueHandler() for each new connection.
    private final long barrierIntervalNanos;
    private final int barrierCountLimit;
    private ExtensionConverterProvider extensionConverterProvider;
    // Single-threaded pool that runs the message spy; set to null once close() has shut it down.
    private ScheduledThreadPoolExecutor spyPool;
    private final ClusterSingletonServiceProvider singletonServiceProvider;
    private final NotificationPublishService notificationPublishService;
    private final MessageSpy messageSpy;
    // Used to schedule the transaction-chain shutdown timeout in onDeviceDisconnected().
    private final HashedWheelTimer hashedWheelTimer;
90
91     public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
92                              final long globalNotificationQuota,
93                              final boolean switchFeaturesMandatory,
94                              final long barrierInterval,
95                              final int barrierCountLimit,
96                              
97                              final MessageSpy messageSpy,
98                              final boolean isNotificationFlowRemovedOff,
99                              final ClusterSingletonServiceProvider singletonServiceProvider,
100                              final NotificationPublishService notificationPublishService,
101                              final HashedWheelTimer hashedWheelTimer,
102                              final ConvertorExecutor convertorExecutor) {
103         this.switchFeaturesMandatory = switchFeaturesMandatory;
104         this.globalNotificationQuota = globalNotificationQuota;
105         this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
106         this.dataBroker = Preconditions.checkNotNull(dataBroker);
107         this.convertorExecutor = convertorExecutor;
108         this.hashedWheelTimer = hashedWheelTimer;
109         /* merge empty nodes to oper DS to predict any problems with missing parent for Node */
110         final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
111
112         final NodesBuilder nodesBuilder = new NodesBuilder();
113         nodesBuilder.setNode(Collections.<Node>emptyList());
114         tx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class), nodesBuilder.build());
115         try {
116             tx.submit().get();
117         } catch (ExecutionException | InterruptedException e) {
118             LOG.error("Creation of node failed.", e);
119             throw new IllegalStateException(e);
120         }
121
122         this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierInterval);
123         this.barrierCountLimit = barrierCountLimit;
124
125         spyPool = new ScheduledThreadPoolExecutor(1);
126         this.singletonServiceProvider = singletonServiceProvider;
127         this.notificationPublishService = notificationPublishService;
128         this.messageSpy = messageSpy;
129     }
130
131
    /**
     * Stores the handler that {@link #deviceConnected(ConnectionContext)} invokes once a new device
     * context has been fully set up.
     *
     * @param handler next handler in the device initialization chain
     */
    @Override
    public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
        this.deviceInitPhaseHandler = handler;
    }
136
137     @Override
138     public void onDeviceContextLevelUp(@CheckForNull DeviceInfo deviceInfo, final LifecycleService lifecycleService) throws Exception {
139         // final phase - we have to add new Device to MD-SAL DataStore
140         LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceInfo.getNodeId());
141         DeviceContext deviceContext = Preconditions.checkNotNull(deviceContexts.get(deviceInfo));
142         deviceContext.onPublished();
143         lifecycleService.registerService(this.singletonServiceProvider);
144     }
145
    /**
     * Handles a newly connected device: wires up the connection (disconnect handler, packet-in
     * filtering, outbound queue), creates and stores a {@link DeviceContext} and a
     * {@link LifecycleService} for it, and passes control to the next initialization phase handler.
     *
     * @param connectionContext context of the new connection; must not be null
     * @return {@code false} if a device context already exists for this device and the connection is
     *         rejected, {@code true} once the new context has been set up and handed over
     * @throws IllegalArgumentException if {@code connectionContext} is null
     * @throws Exception                declared by the handler contract; presumably also whatever the next
     *                                  initialization phase handler throws - confirm against its contract
     */
    @Override
    public boolean deviceConnected(@CheckForNull final ConnectionContext connectionContext) throws Exception {
        Preconditions.checkArgument(connectionContext != null);

        DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
        /*
         * Guard against clobbering an existing device context. If a context already exists for this
         * device, the previous connection is presumably still being closed (connection flapping),
         * so the new connection is rejected by returning false rather than by throwing.
         * NOTE(review): how the rejection is acted upon is decided by the caller (the original note
         * pointed at org.opendaylight.openflowplugin.impl.connection.HandshakeContextImpl) - confirm there.
         */
         if (deviceContexts.containsKey(deviceInfo)) {
            LOG.warn("Rejecting connection from node which is already connected and there exist deviceContext for it: {}", connectionContext.getNodeId());
             return false;
         }

        LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}",
                connectionContext.getConnectionAdapter().getRemoteAddress(), deviceInfo.getNodeId());

        // Add Disconnect handler
        connectionContext.setDeviceDisconnectedHandler(DeviceManagerImpl.this);
        // Cache this for clarity
        final ConnectionAdapter connectionAdapter = connectionContext.getConnectionAdapter();

        //FIXME: as soon as auxiliary connection are fully supported then this is needed only before device context published
        connectionAdapter.setPacketInFiltering(true);

        // Create the outbound queue for the device's protocol version and register it with the adapter.
        final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(deviceInfo.getVersion());

        connectionContext.setOutboundQueueProvider(outboundQueueProvider);
        final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
                connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, barrierCountLimit, barrierIntervalNanos);
        connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);

        final DeviceContext deviceContext = new DeviceContextImpl(
                connectionContext,
                dataBroker,
                messageSpy,
                translatorLibrary,
                this,
                convertorExecutor);

        // Fails if another context for the same device was stored since the containsKey() check above.
        Verify.verify(deviceContexts.putIfAbsent(deviceInfo, deviceContext) == null, "DeviceCtx still not closed.");

        final LifecycleService lifecycleService = new LifecycleServiceImpl();
        lifecycleService.setDeviceContext(deviceContext);

        // putIfAbsent: an already stored lifecycle service for this device is left in place.
        lifecycleServices.putIfAbsent(deviceInfo, lifecycleService);

        deviceContext.setSwitchFeaturesMandatory(switchFeaturesMandatory);

        ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider);
        deviceContext.setNotificationPublishService(notificationPublishService);

        // The number of devices changed, so redistribute the global packet-in quota.
        updatePacketInRateLimiters();

        final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
                connectionAdapter, deviceContext);

        connectionAdapter.setMessageListener(messageListener);
        // Hand over to the next phase of device initialization.
        deviceInitPhaseHandler.onDeviceContextLevelUp(connectionContext.getDeviceInfo(), lifecycleService);
        return true;
    }
208
209     private void updatePacketInRateLimiters() {
210         synchronized (deviceContexts) {
211             final int deviceContextsSize = deviceContexts.size();
212             if (deviceContextsSize > 0) {
213                 long freshNotificationLimit = globalNotificationQuota / deviceContextsSize;
214                 if (freshNotificationLimit < 100) {
215                     freshNotificationLimit = 100;
216                 }
217                 LOG.debug("fresh notification limit = {}", freshNotificationLimit);
218                 for (final DeviceContext deviceContext : deviceContexts.values()) {
219                     deviceContext.updatePacketInRateLimit(freshNotificationLimit);
220                 }
221             }
222         }
223     }
224
225     @Override
226     public TranslatorLibrary oook() {
227         return translatorLibrary;
228     }
229
    /**
     * Stores the translator library that is passed to every device context created afterwards.
     *
     * @param translatorLibrary translator library to use
     */
    @Override
    public void setTranslatorLibrary(final TranslatorLibrary translatorLibrary) {
        this.translatorLibrary = translatorLibrary;
    }
234
235     @Override
236     public void close() {
237         for (final Iterator<DeviceContext> iterator = Iterators.consumingIterator(deviceContexts.values().iterator());
238                 iterator.hasNext();) {
239             final DeviceContext deviceCtx = iterator.next();
240             deviceCtx.shutdownConnection();
241             deviceCtx.shuttingDownDataStoreTransactions();
242         }
243
244         if (spyPool != null) {
245             spyPool.shutdownNow();
246             spyPool = null;
247         }
248     }
249
250     @Override
251     public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
252         LOG.debug("onDeviceContextClosed for Node {}", deviceInfo.getNodeId());
253         deviceContexts.remove(deviceInfo);
254         updatePacketInRateLimiters();
255         LifecycleService lifecycleService = lifecycleServices.remove(deviceInfo);
256         try {
257             lifecycleService.close();
258         } catch (Exception e) {
259             LOG.warn("Closing service for node {} was unsuccessful ", deviceInfo.getNodeId().getValue(), e);
260         }
261     }
262
263     @Override
264     public void initialize() {
265         spyPool.scheduleAtFixedRate(messageSpy, SPY_RATE, SPY_RATE, TimeUnit.SECONDS);
266     }
267
    /**
     * Stores the extension converter provider that is forwarded to every device context created
     * afterwards.
     *
     * @param extensionConverterProvider provider of extension converters
     */
    @Override
    public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
        this.extensionConverterProvider = extensionConverterProvider;
    }
272
273     @Override
274     public ExtensionConverterProvider getExtensionConverterProvider() {
275         return extensionConverterProvider;
276     }
277
    /**
     * Stores the handler that is notified from {@link #onDeviceDisconnected(ConnectionContext)} once
     * the transaction shutdown of a disconnected device has finished.
     *
     * @param handler next handler in the device termination chain
     */
    @Override
    public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
        this.deviceTerminPhaseHandler = handler;
    }
282
283     @Override
284     public void onDeviceDisconnected(final ConnectionContext connectionContext) {
285         LOG.trace("onDeviceDisconnected method call for Node: {}", connectionContext.getNodeId());
286         final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
287         final DeviceContext deviceCtx = this.deviceContexts.get(deviceInfo);
288
289         if (null == deviceCtx) {
290             LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.", deviceInfo.getNodeId());
291             return;
292         }
293
294         if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) {
295             /* Connection is not PrimaryConnection so try to remove from Auxiliary Connections */
296             deviceCtx.removeAuxiliaryConnectionContext(connectionContext);
297         } else {
298             /* Device is disconnected and so we need to close TxManager */
299             final ListenableFuture<Void> future = deviceCtx.shuttingDownDataStoreTransactions();
300             Futures.addCallback(future, new FutureCallback<Void>() {
301
302                 @Override
303                 public void onSuccess(final Void result) {
304                     LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getNodeId());
305                     deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
306                 }
307
308                 @Override
309                 public void onFailure(final Throwable t) {
310                     LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getNodeId(), t);
311                     deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
312                 }
313             });
314             /* Add timer for Close TxManager because it could fain ind cluster without notification */
315             final TimerTask timerTask = timeout -> {
316                 if (!future.isDone()) {
317                     LOG.info("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getNodeId());
318                     future.cancel(false);
319                 }
320             };
321             hashedWheelTimer.newTimeout(timerTask, 10, TimeUnit.SECONDS);
322         }
323     }
324
325     @VisibleForTesting
326     void addDeviceContextToMap(final DeviceInfo deviceInfo, final DeviceContext deviceContext){
327         deviceContexts.put(deviceInfo, deviceContext);
328     }
329
330     @VisibleForTesting
331     void removeDeviceContextFromMap(final DeviceInfo deviceInfo){
332         deviceContexts.remove(deviceInfo);
333     }
334
335     @Override
336     public <T extends OFPContext> T gainContext(final DeviceInfo deviceInfo) {
337         return (T) deviceContexts.get(deviceInfo);
338     }
339
    /**
     * Updates the flag reported by {@link #getIsNotificationFlowRemovedOff()}.
     *
     * @param isNotificationFlowRemovedOff new value of the flag
     */
    @Override
    public void setIsNotificationFlowRemovedOff(boolean isNotificationFlowRemovedOff) {
        this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
    }
344
345     @Override
346     public boolean getIsNotificationFlowRemovedOff() {
347         return this.isNotificationFlowRemovedOff;
348     }
349
350 }