Bug 5596 Changes when closing device
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceManagerImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.device;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Verify;
13 import com.google.common.collect.Iterators;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import io.netty.util.HashedWheelTimer;
18 import io.netty.util.TimerTask;
19 import java.util.Collections;
20 import java.util.Iterator;
21 import java.util.Objects;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.ConcurrentMap;
24 import java.util.concurrent.ExecutionException;
25 import java.util.concurrent.ScheduledThreadPoolExecutor;
26 import java.util.concurrent.TimeUnit;
27 import javax.annotation.CheckForNull;
28 import javax.annotation.Nonnull;
29 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
30 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
31 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
32 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
33 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
34 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
35 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
36 import org.opendaylight.openflowplugin.api.openflow.OFPContext;
37 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
38 import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
39 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
40 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
41 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
42 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
43 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
44 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
45 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
46 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
47 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
48 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
49 import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl;
50 import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
51 import org.opendaylight.openflowplugin.impl.lifecycle.LifecycleServiceImpl;
52 import org.opendaylight.openflowplugin.impl.util.DeviceInitializationUtils;
53 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
57 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
60
61 /**
62  *
63  */
64 public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProviderKeeper {
65
66     private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class);
67
68     private final long globalNotificationQuota;
69     private final boolean switchFeaturesMandatory;
70     private boolean isNotificationFlowRemovedOff;
71
72     private static final int SPY_RATE = 10;
73
74     private final DataBroker dataBroker;
75     private final ConvertorExecutor convertorExecutor;
76     private TranslatorLibrary translatorLibrary;
77     private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
78     private DeviceTerminationPhaseHandler deviceTerminPhaseHandler;
79
80     private final ConcurrentMap<DeviceInfo, DeviceContext> deviceContexts = new ConcurrentHashMap<>();
81     private final ConcurrentMap<DeviceInfo, LifecycleService> lifecycleServices = new ConcurrentHashMap<>();
82
83     private final long barrierIntervalNanos;
84     private final int barrierCountLimit;
85     private ExtensionConverterProvider extensionConverterProvider;
86     private ScheduledThreadPoolExecutor spyPool;
87     private final ClusterSingletonServiceProvider singletonServiceProvider;
88     private final NotificationPublishService notificationPublishService;
89     private final MessageSpy messageSpy;
90     private final HashedWheelTimer hashedWheelTimer;
91
92     public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
93                              final long globalNotificationQuota,
94                              final boolean switchFeaturesMandatory,
95                              final long barrierInterval,
96                              final int barrierCountLimit,
97                              
98                              final MessageSpy messageSpy,
99                              final boolean isNotificationFlowRemovedOff,
100                              final ClusterSingletonServiceProvider singletonServiceProvider,
101                              final NotificationPublishService notificationPublishService,
102                              final HashedWheelTimer hashedWheelTimer,
103                              final ConvertorExecutor convertorExecutor) {
104         this.switchFeaturesMandatory = switchFeaturesMandatory;
105         this.globalNotificationQuota = globalNotificationQuota;
106         this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
107         this.dataBroker = Preconditions.checkNotNull(dataBroker);
108         this.convertorExecutor = convertorExecutor;
109         this.hashedWheelTimer = hashedWheelTimer;
110         /* merge empty nodes to oper DS to predict any problems with missing parent for Node */
111         final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
112
113         final NodesBuilder nodesBuilder = new NodesBuilder();
114         nodesBuilder.setNode(Collections.<Node>emptyList());
115         tx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class), nodesBuilder.build());
116         try {
117             tx.submit().get();
118         } catch (ExecutionException | InterruptedException e) {
119             LOG.error("Creation of node failed.", e);
120             throw new IllegalStateException(e);
121         }
122
123         this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierInterval);
124         this.barrierCountLimit = barrierCountLimit;
125
126         spyPool = new ScheduledThreadPoolExecutor(1);
127         this.singletonServiceProvider = singletonServiceProvider;
128         this.notificationPublishService = notificationPublishService;
129         this.messageSpy = messageSpy;
130     }
131
132
133     @Override
134     public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
135         this.deviceInitPhaseHandler = handler;
136     }
137
138     @Override
139     public void onDeviceContextLevelUp(@CheckForNull DeviceInfo deviceInfo, final LifecycleService lifecycleService) throws Exception {
140         // final phase - we have to add new Device to MD-SAL DataStore
141         LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceInfo.getNodeId());
142         DeviceContext deviceContext = Preconditions.checkNotNull(deviceContexts.get(deviceInfo));
143         deviceContext.onPublished();
144         lifecycleService.registerService(this.singletonServiceProvider);
145     }
146
147     @Override
148     public boolean deviceConnected(@CheckForNull final ConnectionContext connectionContext) throws Exception {
149         Preconditions.checkArgument(connectionContext != null);
150
151         DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
152         /*
153          * This part prevent destroy another device context. Throwing here an exception result to propagate close connection
154          * in {@link org.opendaylight.openflowplugin.impl.connection.org.opendaylight.openflowplugin.impl.connection.HandshakeContextImpl}
155          * If context already exist we are in state closing process (connection flapping) and we should not propagate connection close
156          */
157          if (deviceContexts.containsKey(deviceInfo)) {
158              DeviceContext deviceContext = deviceContexts.get(deviceInfo);
159              if (!deviceContext.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
160                  LOG.warn("Context state for node {} is not in TERMINATION state, trying to reconnect", connectionContext.getNodeId().getValue());
161              } else {
162                  LOG.warn("Rejecting connection from node which is already connected and there exist deviceContext for it: {}", connectionContext.getNodeId().getValue());
163              }
164              return false;
165          }
166
167         LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}",
168                 connectionContext.getConnectionAdapter().getRemoteAddress(), deviceInfo.getNodeId());
169
170         // Add Disconnect handler
171         connectionContext.setDeviceDisconnectedHandler(DeviceManagerImpl.this);
172         // Cache this for clarity
173         final ConnectionAdapter connectionAdapter = connectionContext.getConnectionAdapter();
174
175         //FIXME: as soon as auxiliary connection are fully supported then this is needed only before device context published
176         connectionAdapter.setPacketInFiltering(true);
177
178         final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(deviceInfo.getVersion());
179
180         connectionContext.setOutboundQueueProvider(outboundQueueProvider);
181         final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
182                 connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, barrierCountLimit, barrierIntervalNanos);
183         connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
184
185         final DeviceContext deviceContext = new DeviceContextImpl(
186                 connectionContext,
187                 dataBroker,
188                 messageSpy,
189                 translatorLibrary,
190                 this,
191                 convertorExecutor);
192
193         deviceContexts.putIfAbsent(deviceInfo, deviceContext);
194
195         final LifecycleService lifecycleService = new LifecycleServiceImpl();
196         lifecycleService.setDeviceContext(deviceContext);
197
198         lifecycleServices.putIfAbsent(deviceInfo, lifecycleService);
199
200         deviceContext.setSwitchFeaturesMandatory(switchFeaturesMandatory);
201
202         ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider);
203         deviceContext.setNotificationPublishService(notificationPublishService);
204
205         updatePacketInRateLimiters();
206
207         final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
208                 connectionAdapter, deviceContext);
209
210         connectionAdapter.setMessageListener(messageListener);
211         deviceInitPhaseHandler.onDeviceContextLevelUp(connectionContext.getDeviceInfo(), lifecycleService);
212         return true;
213     }
214
215     private void updatePacketInRateLimiters() {
216         synchronized (deviceContexts) {
217             final int deviceContextsSize = deviceContexts.size();
218             if (deviceContextsSize > 0) {
219                 long freshNotificationLimit = globalNotificationQuota / deviceContextsSize;
220                 if (freshNotificationLimit < 100) {
221                     freshNotificationLimit = 100;
222                 }
223                 LOG.debug("fresh notification limit = {}", freshNotificationLimit);
224                 for (final DeviceContext deviceContext : deviceContexts.values()) {
225                     deviceContext.updatePacketInRateLimit(freshNotificationLimit);
226                 }
227             }
228         }
229     }
230
231     @Override
232     public TranslatorLibrary oook() {
233         return translatorLibrary;
234     }
235
236     @Override
237     public void setTranslatorLibrary(final TranslatorLibrary translatorLibrary) {
238         this.translatorLibrary = translatorLibrary;
239     }
240
241     @Override
242     public void close() {
243         for (final Iterator<DeviceContext> iterator = Iterators.consumingIterator(deviceContexts.values().iterator());
244                 iterator.hasNext();) {
245             final DeviceContext deviceCtx = iterator.next();
246             deviceCtx.shutdownConnection();
247             deviceCtx.shuttingDownDataStoreTransactions();
248         }
249
250         if (spyPool != null) {
251             spyPool.shutdownNow();
252             spyPool = null;
253         }
254     }
255
256     @Override
257     public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
258         deviceContexts.remove(deviceInfo);
259         LifecycleService lifecycleService = lifecycleServices.remove(deviceInfo);
260         updatePacketInRateLimiters();
261         if (Objects.nonNull(lifecycleService)) {
262             try {
263                 lifecycleService.close();
264             } catch (Exception e) {
265                 LOG.warn("Closing service for node {} was unsuccessful ", deviceInfo.getNodeId().getValue(), e);
266             }
267         }
268     }
269
270     @Override
271     public void initialize() {
272         spyPool.scheduleAtFixedRate(messageSpy, SPY_RATE, SPY_RATE, TimeUnit.SECONDS);
273     }
274
275     @Override
276     public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
277         this.extensionConverterProvider = extensionConverterProvider;
278     }
279
280     @Override
281     public ExtensionConverterProvider getExtensionConverterProvider() {
282         return extensionConverterProvider;
283     }
284
285     @Override
286     public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
287         this.deviceTerminPhaseHandler = handler;
288     }
289
290     @Override
291     public void onDeviceDisconnected(final ConnectionContext connectionContext) {
292         LOG.trace("onDeviceDisconnected method call for Node: {}", connectionContext.getNodeId());
293         final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
294         final DeviceContext deviceCtx = this.deviceContexts.get(deviceInfo);
295
296         if (null == deviceCtx) {
297             LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.", deviceInfo.getNodeId());
298             return;
299         }
300
301         if (deviceCtx.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
302             LOG.debug("Device context for node {} is already is termination state, waiting for close all context");
303             return;
304         }
305
306         deviceCtx.setState(OFPContext.CONTEXT_STATE.TERMINATION);
307
308         if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) {
309             LOG.debug("Node {} disconnected, but not primary connection.", connectionContext.getDeviceInfo().getNodeId().getValue());
310             /* Connection is not PrimaryConnection so try to remove from Auxiliary Connections */
311             deviceCtx.removeAuxiliaryConnectionContext(connectionContext);
312         }
313         //TODO: Auxiliary connections supported ?
314         {
315             /* Device is disconnected and so we need to close TxManager */
316             final ListenableFuture<Void> future = deviceCtx.shuttingDownDataStoreTransactions();
317             Futures.addCallback(future, new FutureCallback<Void>() {
318
319                 @Override
320                 public void onSuccess(final Void result) {
321                     LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getNodeId().getValue());
322                     deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
323                 }
324
325                 @Override
326                 public void onFailure(final Throwable t) {
327                     LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getNodeId().getValue());
328                     LOG.trace("TxChainManager failed by closing. ", t);
329                     deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
330                 }
331             });
332             /* Add timer for Close TxManager because it could fain ind cluster without notification */
333             final TimerTask timerTask = timeout -> {
334                 if (!future.isDone()) {
335                     LOG.warn("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getNodeId().getValue());
336                     future.cancel(false);
337                 }
338             };
339             hashedWheelTimer.newTimeout(timerTask, 10, TimeUnit.SECONDS);
340         }
341     }
342
343     @VisibleForTesting
344     void addDeviceContextToMap(final DeviceInfo deviceInfo, final DeviceContext deviceContext){
345         deviceContexts.put(deviceInfo, deviceContext);
346     }
347
348     @Override
349     public <T extends OFPContext> T gainContext(final DeviceInfo deviceInfo) {
350         return (T) deviceContexts.get(deviceInfo);
351     }
352
353     @Override
354     public void setIsNotificationFlowRemovedOff(boolean isNotificationFlowRemovedOff) {
355         this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
356     }
357
358     @Override
359     public boolean getIsNotificationFlowRemovedOff() {
360         return this.isNotificationFlowRemovedOff;
361     }
362
363 }