Bug 6110: Fixed bugs in statistics manager due to race condition.
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceManagerImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.device;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.collect.Iterators;
13 import com.google.common.util.concurrent.CheckedFuture;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import io.netty.util.HashedWheelTimer;
18 import io.netty.util.TimerTask;
19 import java.util.Collections;
20 import java.util.Iterator;
21 import java.util.Objects;
22 import java.util.Optional;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.ConcurrentMap;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.ScheduledThreadPoolExecutor;
27 import java.util.concurrent.TimeUnit;
28 import javax.annotation.CheckForNull;
29 import javax.annotation.Nonnull;
30 import javax.annotation.Nullable;
31 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
32 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
33 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
34 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
35 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
36 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
37 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
38 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
39 import org.opendaylight.openflowplugin.api.openflow.OFPContext;
40 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
41 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionStatus;
42 import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
43 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
44 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
45 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
46 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
47 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
48 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
49 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
50 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
51 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
52 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
53 import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl;
54 import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
55 import org.opendaylight.openflowplugin.impl.lifecycle.LifecycleServiceImpl;
56 import org.opendaylight.openflowplugin.impl.services.SalRoleServiceImpl;
57 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput;
62 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
63 import org.opendaylight.yangtools.yang.common.RpcResult;
64 import org.slf4j.Logger;
65 import org.slf4j.LoggerFactory;
66
67 /**
68  *
69  */
70 public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProviderKeeper {
71
72     private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class);
73
74     private final long globalNotificationQuota;
75     private final boolean switchFeaturesMandatory;
76     private boolean isFlowRemovedNotificationOn;
77     private boolean skipTableFeatures;
78     private static final int SPY_RATE = 10;
79
80     private final DataBroker dataBroker;
81     private final ConvertorExecutor convertorExecutor;
82     private TranslatorLibrary translatorLibrary;
83     private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
84     private DeviceTerminationPhaseHandler deviceTerminPhaseHandler;
85
86     private final ConcurrentMap<DeviceInfo, DeviceContext> deviceContexts = new ConcurrentHashMap<>();
87     private final ConcurrentMap<DeviceInfo, LifecycleService> lifecycleServices = new ConcurrentHashMap<>();
88
89     private long barrierIntervalNanos;
90     private int barrierCountLimit;
91
92     private ExtensionConverterProvider extensionConverterProvider;
93     private ScheduledThreadPoolExecutor spyPool;
94     private final ClusterSingletonServiceProvider singletonServiceProvider;
95     private final NotificationPublishService notificationPublishService;
96     private final MessageSpy messageSpy;
97     private final HashedWheelTimer hashedWheelTimer;
98
99     public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
100                              final long globalNotificationQuota,
101                              final boolean switchFeaturesMandatory,
102                              final long barrierInterval,
103                              final int barrierCountLimit,
104                              final MessageSpy messageSpy,
105                              final boolean isFlowRemovedNotificationOn,
106                              final ClusterSingletonServiceProvider singletonServiceProvider,
107                              final NotificationPublishService notificationPublishService,
108                              final HashedWheelTimer hashedWheelTimer,
109                              final ConvertorExecutor convertorExecutor,
110                              final boolean skipTableFeatures) {
111
112         this.dataBroker = dataBroker;
113
114         /* merge empty nodes to oper DS to predict any problems with missing parent for Node */
115         final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
116         final NodesBuilder nodesBuilder = new NodesBuilder();
117         nodesBuilder.setNode(Collections.<Node>emptyList());
118         tx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class), nodesBuilder.build());
119         try {
120             tx.submit().get();
121         } catch (ExecutionException | InterruptedException e) {
122             LOG.error("Creation of node failed.", e);
123             throw new IllegalStateException(e);
124         }
125
126         this.switchFeaturesMandatory = switchFeaturesMandatory;
127         this.globalNotificationQuota = globalNotificationQuota;
128         this.isFlowRemovedNotificationOn = isFlowRemovedNotificationOn;
129         this.skipTableFeatures = skipTableFeatures;
130         this.convertorExecutor = convertorExecutor;
131         this.hashedWheelTimer = hashedWheelTimer;
132         this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierInterval);
133         this.barrierCountLimit = barrierCountLimit;
134         this.spyPool = new ScheduledThreadPoolExecutor(1);
135         this.singletonServiceProvider = singletonServiceProvider;
136         this.notificationPublishService = notificationPublishService;
137         this.messageSpy = messageSpy;
138     }
139
140
141     @Override
142     public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
143         this.deviceInitPhaseHandler = handler;
144     }
145
146     @Override
147     public void onDeviceContextLevelUp(@CheckForNull DeviceInfo deviceInfo, final LifecycleService lifecycleService) throws Exception {
148         // final phase - we have to add new Device to MD-SAL DataStore
149         LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceInfo.getNodeId());
150         DeviceContext deviceContext = Preconditions.checkNotNull(deviceContexts.get(deviceInfo));
151         deviceContext.onPublished();
152         lifecycleService.registerDeviceRemovedHandler(this);
153         lifecycleService.registerService(this.singletonServiceProvider);
154     }
155
156     @Override
157     public ConnectionStatus deviceConnected(@CheckForNull final ConnectionContext connectionContext) throws Exception {
158         Preconditions.checkArgument(connectionContext != null);
159         final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
160
161         /*
162          * This part prevent destroy another device context. Throwing here an exception result to propagate close connection
163          * in {@link org.opendaylight.openflowplugin.impl.connection.org.opendaylight.openflowplugin.impl.connection.HandshakeContextImpl}
164          * If context already exist we are in state closing process (connection flapping) and we should not propagate connection close
165          */
166          if (deviceContexts.containsKey(deviceInfo)) {
167              DeviceContext deviceContext = deviceContexts.get(deviceInfo);
168              LOG.warn("Node {} already connected disconnecting device. Rejecting connection", deviceInfo.getLOGValue());
169              if (!deviceContext.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
170                  LOG.warn("Node {} context state not in TERMINATION state.",
171                          connectionContext.getDeviceInfo().getLOGValue());
172                  return ConnectionStatus.ALREADY_CONNECTED;
173              } else {
174                  return ConnectionStatus.CLOSING;
175              }
176          }
177
178         LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}",
179                 connectionContext.getConnectionAdapter().getRemoteAddress(), deviceInfo.getNodeId());
180
181         // Add Disconnect handler
182         connectionContext.setDeviceDisconnectedHandler(this);
183
184         // Cache this for clarity
185         final ConnectionAdapter connectionAdapter = connectionContext.getConnectionAdapter();
186
187         // FIXME: as soon as auxiliary connection are fully supported then this is needed only before device context published
188         connectionAdapter.setPacketInFiltering(true);
189
190         final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(deviceInfo.getVersion());
191
192         connectionContext.setOutboundQueueProvider(outboundQueueProvider);
193         final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
194                 connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, barrierCountLimit, barrierIntervalNanos);
195         connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
196
197         final LifecycleService lifecycleService = new LifecycleServiceImpl();
198
199         final DeviceContext deviceContext = new DeviceContextImpl(
200                 connectionContext,
201                 dataBroker,
202                 messageSpy,
203                 translatorLibrary,
204                 this,
205                 convertorExecutor,
206                 skipTableFeatures,
207                 hashedWheelTimer,
208                 this);
209
210         deviceContext.setSalRoleService(new SalRoleServiceImpl(deviceContext, deviceContext));
211         deviceContexts.put(deviceInfo, deviceContext);
212
213         lifecycleService.setDeviceContext(deviceContext);
214         deviceContext.putLifecycleServiceIntoTxChainManager(lifecycleService);
215
216         lifecycleServices.put(deviceInfo, lifecycleService);
217
218         addCallbackToDeviceInitializeToSlave(deviceInfo, deviceContext, lifecycleService);
219
220         deviceContext.setSwitchFeaturesMandatory(switchFeaturesMandatory);
221
222         ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider);
223         deviceContext.setNotificationPublishService(notificationPublishService);
224
225         updatePacketInRateLimiters();
226
227         final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
228                 connectionAdapter, deviceContext);
229
230         connectionAdapter.setMessageListener(messageListener);
231         deviceInitPhaseHandler.onDeviceContextLevelUp(connectionContext.getDeviceInfo(), lifecycleService);
232         return ConnectionStatus.MAY_CONTINUE;
233     }
234
235     @Override
236     public TranslatorLibrary oook() {
237         return translatorLibrary;
238     }
239
240     @Override
241     public void setTranslatorLibrary(final TranslatorLibrary translatorLibrary) {
242         this.translatorLibrary = translatorLibrary;
243     }
244
245     @Override
246     public void close() {
247         for (final Iterator<DeviceContext> iterator = Iterators.consumingIterator(deviceContexts.values().iterator());
248                 iterator.hasNext();) {
249             final DeviceContext deviceCtx = iterator.next();
250             deviceCtx.shutdownConnection();
251             deviceCtx.shuttingDownDataStoreTransactions();
252         }
253
254         Optional.ofNullable(spyPool).ifPresent(ScheduledThreadPoolExecutor::shutdownNow);
255         spyPool = null;
256
257     }
258
259     @Override
260     public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
261         updatePacketInRateLimiters();
262         Optional.ofNullable(lifecycleServices.get(deviceInfo)).ifPresent(OFPContext::close);
263     }
264
265     @Override
266     public void initialize() {
267         spyPool.scheduleAtFixedRate(messageSpy, SPY_RATE, SPY_RATE, TimeUnit.SECONDS);
268     }
269
270     @Override
271     public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
272         this.extensionConverterProvider = extensionConverterProvider;
273     }
274
275     @Override
276     public ExtensionConverterProvider getExtensionConverterProvider() {
277         return extensionConverterProvider;
278     }
279
280     @Override
281     public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
282         this.deviceTerminPhaseHandler = handler;
283     }
284
285     @Override
286     public void onDeviceDisconnected(final ConnectionContext connectionContext) {
287         LOG.trace("onDeviceDisconnected method call for Node: {}", connectionContext.getNodeId());
288         final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
289         final DeviceContext deviceCtx = this.deviceContexts.get(deviceInfo);
290
291         if (Objects.isNull(deviceCtx)) {
292             LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.", deviceInfo.getLOGValue());
293             return;
294         }
295
296         if (deviceCtx.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
297             LOG.info("Device context for node {} is already is termination state, waiting for close all context", deviceInfo.getLOGValue());
298             return;
299         }
300
301         deviceCtx.close();
302
303         if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) {
304             LOG.debug("Node {} disconnected, but not primary connection.", connectionContext.getDeviceInfo().getLOGValue());
305             // Connection is not PrimaryConnection so try to remove from Auxiliary Connections
306             deviceCtx.removeAuxiliaryConnectionContext(connectionContext);
307         }
308
309         // TODO: Auxiliary connections supported ?
310         // Device is disconnected and so we need to close TxManager
311         final ListenableFuture<Void> future = deviceCtx.shuttingDownDataStoreTransactions();
312         Futures.addCallback(future, new FutureCallback<Void>() {
313             @Override
314             public void onSuccess(final Void result) {
315                 LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getLOGValue());
316                 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
317             }
318
319             @Override
320             public void onFailure(final Throwable t) {
321                 LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getLOGValue());
322                 LOG.trace("TxChainManager failed by closing. ", t);
323                 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
324             }
325         });
326
327         // Add timer for Close TxManager because it could fail in cluster without notification
328         final TimerTask timerTask = timeout -> {
329             if (!future.isDone()) {
330                 LOG.warn("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getLOGValue());
331                 future.cancel(false);
332             }
333         };
334
335         hashedWheelTimer.newTimeout(timerTask, 10, TimeUnit.SECONDS);
336     }
337
338     @VisibleForTesting
339     void addDeviceContextToMap(final DeviceInfo deviceInfo, final DeviceContext deviceContext){
340         deviceContexts.put(deviceInfo, deviceContext);
341     }
342
343     @Override
344     public void setFlowRemovedNotificationOn(boolean isNotificationFlowRemovedOff) {
345         this.isFlowRemovedNotificationOn = isNotificationFlowRemovedOff;
346     }
347
348     @Override
349     public boolean isFlowRemovedNotificationOn() {
350         return this.isFlowRemovedNotificationOn;
351     }
352
353
354     @Override
355     public void setSkipTableFeatures(boolean skipTableFeaturesValue) {
356         skipTableFeatures = skipTableFeaturesValue;
357     }
358
359     @Override
360     public void setBarrierCountLimit(final int barrierCountLimit) {
361         this.barrierCountLimit = barrierCountLimit;
362     }
363
364     @Override
365     public void setBarrierInterval(final long barrierTimeoutLimit) {
366         this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierTimeoutLimit);
367     }
368
369     @Override
370     public CheckedFuture<Void, TransactionCommitFailedException> removeDeviceFromOperationalDS(final DeviceInfo deviceInfo) {
371         final WriteTransaction delWtx = dataBroker.newWriteOnlyTransaction();
372         delWtx.delete(LogicalDatastoreType.OPERATIONAL, deviceInfo.getNodeInstanceIdentifier());
373         final CheckedFuture<Void, TransactionCommitFailedException> delFuture = delWtx.submit();
374
375         Futures.addCallback(delFuture, new FutureCallback<Void>() {
376             @Override
377             public void onSuccess(final Void result) {
378                 if (LOG.isDebugEnabled()) {
379                     LOG.debug("Delete Node {} was successful", deviceInfo.getLOGValue());
380                 }
381             }
382
383             @Override
384             public void onFailure(@Nonnull final Throwable t) {
385                 LOG.warn("Delete node {} failed with exception {}", deviceInfo.getLOGValue(), t);
386             }
387         });
388
389         return delFuture;
390     }
391
392
393     private void addCallbackToDeviceInitializeToSlave(final DeviceInfo deviceInfo, final DeviceContext deviceContext, final LifecycleService lifecycleService) {
394         Futures.addCallback(deviceContext.makeDeviceSlave(), new FutureCallback<RpcResult<SetRoleOutput>>() {
395             @Override
396             public void onSuccess(@Nullable RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
397                 if (LOG.isDebugEnabled()) {
398                     LOG.debug("Role SLAVE was successfully propagated on device, node {}", deviceInfo.getLOGValue());
399                 }
400             }
401
402             @Override
403             public void onFailure(Throwable throwable) {
404                 LOG.warn("Was not able to set role SLAVE to device on node {} ",deviceInfo.getLOGValue());
405                 lifecycleService.closeConnection();
406             }
407         });
408     }
409
410     private void updatePacketInRateLimiters() {
411         synchronized (deviceContexts) {
412             final int deviceContextsSize = deviceContexts.size();
413             if (deviceContextsSize > 0) {
414                 long freshNotificationLimit = globalNotificationQuota / deviceContextsSize;
415                 if (freshNotificationLimit < 100) {
416                     freshNotificationLimit = 100;
417                 }
418                 if (LOG.isDebugEnabled()) {
419                     LOG.debug("fresh notification limit = {}", freshNotificationLimit);
420                 }
421                 for (final DeviceContext deviceContext : deviceContexts.values()) {
422                     deviceContext.updatePacketInRateLimit(freshNotificationLimit);
423                 }
424             }
425         }
426     }
427
428     public void onDeviceRemoved(DeviceInfo deviceInfo) {
429         deviceContexts.remove(deviceInfo);
430         LOG.debug("Device context removed for node {}", deviceInfo.getLOGValue());
431
432         lifecycleServices.remove(deviceInfo);
433         LOG.debug("Lifecycle service removed for node {}", deviceInfo.getLOGValue());
434     }
435 }