1c4428aca6dc51dfced1faaf71d703cad7323d12
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceManagerImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.device;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.collect.Iterators;
13 import com.google.common.util.concurrent.CheckedFuture;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import io.netty.util.HashedWheelTimer;
18 import io.netty.util.TimerTask;
19 import java.util.Collections;
20 import java.util.Iterator;
21 import java.util.Objects;
22 import java.util.Optional;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.ConcurrentMap;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.ScheduledThreadPoolExecutor;
27 import java.util.concurrent.TimeUnit;
28 import javax.annotation.CheckForNull;
29 import javax.annotation.Nonnull;
30 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
31 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
32 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
33 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
34 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
35 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
36 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
37 import org.opendaylight.openflowplugin.api.openflow.OFPContext;
38 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
39 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionStatus;
40 import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
41 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
42 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
43 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
44 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
45 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
46 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
47 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
48 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
49 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
50 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
51 import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl;
52 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
53 import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
54 import org.opendaylight.openflowplugin.impl.services.sal.SalRoleServiceImpl;
55 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
59 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
62
63 /**
64  *
65  */
66 public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProviderKeeper {
67
68     private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class);
69
70     private final long globalNotificationQuota;
71     private final boolean switchFeaturesMandatory;
72     private boolean isFlowRemovedNotificationOn;
73     private boolean skipTableFeatures;
74     private static final int SPY_RATE = 10;
75
76     private final DataBroker dataBroker;
77     private final DeviceInitializerProvider deviceInitializerProvider;
78     private final ConvertorExecutor convertorExecutor;
79     private TranslatorLibrary translatorLibrary;
80     private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
81     private DeviceTerminationPhaseHandler deviceTerminPhaseHandler;
82
83     private final ConcurrentMap<DeviceInfo, DeviceContext> deviceContexts = new ConcurrentHashMap<>();
84     private final ConcurrentMap<DeviceInfo, LifecycleService> lifecycleServices = new ConcurrentHashMap<>();
85
86     private long barrierIntervalNanos;
87     private int barrierCountLimit;
88
89     private ExtensionConverterProvider extensionConverterProvider;
90     private ScheduledThreadPoolExecutor spyPool;
91     private final NotificationPublishService notificationPublishService;
92     private final MessageSpy messageSpy;
93     private final HashedWheelTimer hashedWheelTimer;
94     private boolean useSingleLayerSerialization;
95
96     public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
97                              final long globalNotificationQuota,
98                              final boolean switchFeaturesMandatory,
99                              final long barrierInterval,
100                              final int barrierCountLimit,
101                              final MessageSpy messageSpy,
102                              final boolean isFlowRemovedNotificationOn,
103                              final ClusterSingletonServiceProvider singletonServiceProvider,
104                              final NotificationPublishService notificationPublishService,
105                              final HashedWheelTimer hashedWheelTimer,
106                              final ConvertorExecutor convertorExecutor,
107                              final boolean skipTableFeatures,
108                              final boolean useSingleLayerSerialization,
109                              final DeviceInitializerProvider deviceInitializerProvider) {
110
111         this.dataBroker = dataBroker;
112         this.deviceInitializerProvider = deviceInitializerProvider;
113
114         /* merge empty nodes to oper DS to predict any problems with missing parent for Node */
115         final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
116         final NodesBuilder nodesBuilder = new NodesBuilder();
117         nodesBuilder.setNode(Collections.<Node>emptyList());
118         tx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class), nodesBuilder.build());
119         try {
120             tx.submit().get();
121         } catch (ExecutionException | InterruptedException e) {
122             LOG.error("Creation of node failed.", e);
123             throw new IllegalStateException(e);
124         }
125
126         this.switchFeaturesMandatory = switchFeaturesMandatory;
127         this.globalNotificationQuota = globalNotificationQuota;
128         this.isFlowRemovedNotificationOn = isFlowRemovedNotificationOn;
129         this.skipTableFeatures = skipTableFeatures;
130         this.convertorExecutor = convertorExecutor;
131         this.hashedWheelTimer = hashedWheelTimer;
132         this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierInterval);
133         this.barrierCountLimit = barrierCountLimit;
134         this.spyPool = new ScheduledThreadPoolExecutor(1);
135         this.notificationPublishService = notificationPublishService;
136         this.messageSpy = messageSpy;
137         this.useSingleLayerSerialization = useSingleLayerSerialization;
138     }
139
140
141     @Override
142     public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
143         this.deviceInitPhaseHandler = handler;
144     }
145
146     @Override
147     public void onDeviceContextLevelUp(@CheckForNull DeviceInfo deviceInfo, final LifecycleService lifecycleService) throws Exception {
148         // final phase - we have to add new Device to MD-SAL DataStore
149         LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceInfo.getNodeId());
150         DeviceContext deviceContext = Preconditions.checkNotNull(deviceContexts.get(deviceInfo));
151         deviceContext.onPublished();
152         lifecycleService.registerDeviceRemovedHandler(this);
153     }
154
155     @Override
156     public ConnectionStatus deviceConnected(@CheckForNull final ConnectionContext connectionContext) throws Exception {
157         return ConnectionStatus.MAY_CONTINUE;
158     }
159
160     @Override
161     public TranslatorLibrary oook() {
162         return translatorLibrary;
163     }
164
165     @Override
166     public void setTranslatorLibrary(final TranslatorLibrary translatorLibrary) {
167         this.translatorLibrary = translatorLibrary;
168     }
169
170     @Override
171     public void close() {
172         for (final Iterator<DeviceContext> iterator = Iterators.consumingIterator(deviceContexts.values().iterator());
173                 iterator.hasNext();) {
174             final DeviceContext deviceCtx = iterator.next();
175             deviceCtx.shutdownConnection();
176             deviceCtx.shuttingDownDataStoreTransactions();
177         }
178
179         Optional.ofNullable(spyPool).ifPresent(ScheduledThreadPoolExecutor::shutdownNow);
180         spyPool = null;
181
182     }
183
184     @Override
185     public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
186         updatePacketInRateLimiters();
187         Optional.ofNullable(lifecycleServices.get(deviceInfo)).ifPresent(LifecycleService::close);
188     }
189
190     @Override
191     public void initialize() {
192         spyPool.scheduleAtFixedRate(messageSpy, SPY_RATE, SPY_RATE, TimeUnit.SECONDS);
193     }
194
195     @Override
196     public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
197         this.extensionConverterProvider = extensionConverterProvider;
198     }
199
200     @Override
201     public ExtensionConverterProvider getExtensionConverterProvider() {
202         return extensionConverterProvider;
203     }
204
205     @Override
206     public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
207         this.deviceTerminPhaseHandler = handler;
208     }
209
210     @Override
211     public void onDeviceDisconnected(final ConnectionContext connectionContext) {
212         LOG.trace("onDeviceDisconnected method call for Node: {}", connectionContext.getNodeId());
213         final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
214         final DeviceContext deviceCtx = this.deviceContexts.get(deviceInfo);
215
216         if (Objects.isNull(deviceCtx)) {
217             LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.", deviceInfo.getLOGValue());
218             return;
219         }
220
221         if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) {
222             LOG.debug("Node {} disconnected, but not primary connection.", connectionContext.getDeviceInfo().getLOGValue());
223             // Connection is not PrimaryConnection so try to remove from Auxiliary Connections
224             deviceCtx.removeAuxiliaryConnectionContext(connectionContext);
225             // If this is not primary connection, we should not continue disabling everything
226             return;
227         }
228
229         if (deviceCtx.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
230             LOG.info("Device context for node {} is already is termination state, waiting for close all context", deviceInfo.getLOGValue());
231             return;
232         }
233
234         deviceCtx.close();
235
236         // TODO: Auxiliary connections supported ?
237         // Device is disconnected and so we need to close TxManager
238         final ListenableFuture<Void> future = deviceCtx.shuttingDownDataStoreTransactions();
239         Futures.addCallback(future, new FutureCallback<Void>() {
240             @Override
241             public void onSuccess(final Void result) {
242                 LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getLOGValue());
243                 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
244             }
245
246             @Override
247             public void onFailure(final Throwable t) {
248                 LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getLOGValue());
249                 LOG.trace("TxChainManager failed by closing. ", t);
250                 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
251             }
252         });
253
254         // Add timer for Close TxManager because it could fail in cluster without notification
255         final TimerTask timerTask = timeout -> {
256             if (!future.isDone()) {
257                 LOG.warn("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getLOGValue());
258                 future.cancel(false);
259             }
260         };
261
262         hashedWheelTimer.newTimeout(timerTask, 10, TimeUnit.SECONDS);
263     }
264
265     @VisibleForTesting
266     void addDeviceContextToMap(final DeviceInfo deviceInfo, final DeviceContext deviceContext){
267         deviceContexts.put(deviceInfo, deviceContext);
268     }
269
270     @Override
271     public void setFlowRemovedNotificationOn(boolean isNotificationFlowRemovedOff) {
272         this.isFlowRemovedNotificationOn = isNotificationFlowRemovedOff;
273     }
274
275     @Override
276     public boolean isFlowRemovedNotificationOn() {
277         return this.isFlowRemovedNotificationOn;
278     }
279
280
281     @Override
282     public void setSkipTableFeatures(boolean skipTableFeaturesValue) {
283         skipTableFeatures = skipTableFeaturesValue;
284     }
285
286     @Override
287     public void setBarrierCountLimit(final int barrierCountLimit) {
288         this.barrierCountLimit = barrierCountLimit;
289     }
290
291     @Override
292     public void setBarrierInterval(final long barrierTimeoutLimit) {
293         this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierTimeoutLimit);
294     }
295
296     @Override
297     public CheckedFuture<Void, TransactionCommitFailedException> removeDeviceFromOperationalDS(final DeviceInfo deviceInfo) {
298         final WriteTransaction delWtx = dataBroker.newWriteOnlyTransaction();
299         delWtx.delete(LogicalDatastoreType.OPERATIONAL, deviceInfo.getNodeInstanceIdentifier());
300         final CheckedFuture<Void, TransactionCommitFailedException> delFuture = delWtx.submit();
301
302         Futures.addCallback(delFuture, new FutureCallback<Void>() {
303             @Override
304             public void onSuccess(final Void result) {
305                 if (LOG.isDebugEnabled()) {
306                     LOG.debug("Delete Node {} was successful", deviceInfo.getLOGValue());
307                 }
308             }
309
310             @Override
311             public void onFailure(@Nonnull final Throwable t) {
312                 LOG.warn("Delete node {} failed with exception {}", deviceInfo.getLOGValue(), t);
313             }
314         });
315
316         return delFuture;
317     }
318
319     @Override
320     public void setUseSingleLayerSerialization(final Boolean useSingleLayerSerialization) {
321         this.useSingleLayerSerialization = useSingleLayerSerialization;
322     }
323
324     public DeviceContext createContext(@CheckForNull final ConnectionContext connectionContext) {
325
326         LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}",
327                 connectionContext.getConnectionAdapter().getRemoteAddress(),
328                 connectionContext.getDeviceInfo().getNodeId());
329
330         connectionContext.getConnectionAdapter().setPacketInFiltering(true);
331
332         final OutboundQueueProvider outboundQueueProvider
333                 = new OutboundQueueProviderImpl(connectionContext.getDeviceInfo().getVersion());
334
335         connectionContext.setOutboundQueueProvider(outboundQueueProvider);
336         final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
337                 connectionContext.getConnectionAdapter().registerOutboundQueueHandler(
338                         outboundQueueProvider,
339                         barrierCountLimit,
340                         barrierIntervalNanos);
341         connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
342
343
344         final DeviceContext deviceContext = new DeviceContextImpl(
345                 connectionContext,
346                 dataBroker,
347                 messageSpy,
348                 translatorLibrary,
349                 this,
350                 convertorExecutor,
351                 skipTableFeatures,
352                 hashedWheelTimer,
353                 useSingleLayerSerialization,
354                 deviceInitializerProvider);
355
356         deviceContext.setSalRoleService(new SalRoleServiceImpl(deviceContext, deviceContext));
357         deviceContext.setSwitchFeaturesMandatory(switchFeaturesMandatory);
358         ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider);
359         deviceContext.setNotificationPublishService(notificationPublishService);
360
361         deviceContexts.put(connectionContext.getDeviceInfo(), deviceContext);
362         updatePacketInRateLimiters();
363
364         final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
365                 connectionContext.getConnectionAdapter(), deviceContext);
366
367         connectionContext.getConnectionAdapter().setMessageListener(messageListener);
368
369         return deviceContext;
370     }
371
372     private void updatePacketInRateLimiters() {
373         synchronized (deviceContexts) {
374             final int deviceContextsSize = deviceContexts.size();
375             if (deviceContextsSize > 0) {
376                 long freshNotificationLimit = globalNotificationQuota / deviceContextsSize;
377                 if (freshNotificationLimit < 100) {
378                     freshNotificationLimit = 100;
379                 }
380                 if (LOG.isDebugEnabled()) {
381                     LOG.debug("fresh notification limit = {}", freshNotificationLimit);
382                 }
383                 for (final DeviceContext deviceContext : deviceContexts.values()) {
384                     deviceContext.updatePacketInRateLimit(freshNotificationLimit);
385                 }
386             }
387         }
388     }
389
390     public void onDeviceRemoved(DeviceInfo deviceInfo) {
391         deviceContexts.remove(deviceInfo);
392         LOG.debug("Device context removed for node {}", deviceInfo.getLOGValue());
393
394         lifecycleServices.remove(deviceInfo);
395         LOG.debug("Lifecycle service removed for node {}", deviceInfo.getLOGValue());
396     }
397
398     @Override
399     public long getBarrierIntervalNanos() {
400         return barrierIntervalNanos;
401     }
402
403     @Override
404     public int getBarrierCountLimit() {
405         return barrierCountLimit;
406     }
407 }