Add configuration parameter for single layer
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceManagerImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.device;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.collect.Iterators;
13 import com.google.common.util.concurrent.CheckedFuture;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import io.netty.util.HashedWheelTimer;
18 import io.netty.util.TimerTask;
19 import java.util.Collections;
20 import java.util.Iterator;
21 import java.util.Objects;
22 import java.util.Optional;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.ConcurrentMap;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.ScheduledThreadPoolExecutor;
27 import java.util.concurrent.TimeUnit;
28 import javax.annotation.CheckForNull;
29 import javax.annotation.Nonnull;
30 import javax.annotation.Nullable;
31 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
32 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
33 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
34 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
35 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
36 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
37 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
38 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
39 import org.opendaylight.openflowplugin.api.openflow.OFPContext;
40 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
41 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionStatus;
42 import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
43 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
44 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
45 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
46 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
47 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
48 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
49 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
50 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
51 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
52 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
53 import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl;
54 import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
55 import org.opendaylight.openflowplugin.impl.lifecycle.LifecycleServiceImpl;
56 import org.opendaylight.openflowplugin.impl.services.SalRoleServiceImpl;
57 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput;
62 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
63 import org.opendaylight.yangtools.yang.common.RpcResult;
64 import org.slf4j.Logger;
65 import org.slf4j.LoggerFactory;
66
67 /**
68  *
69  */
70 public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProviderKeeper {
71
72     private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class);
73
74     private final long globalNotificationQuota;
75     private final boolean switchFeaturesMandatory;
76     private boolean isFlowRemovedNotificationOn;
77     private boolean skipTableFeatures;
78     private static final int SPY_RATE = 10;
79
80     private final DataBroker dataBroker;
81     private final ConvertorExecutor convertorExecutor;
82     private TranslatorLibrary translatorLibrary;
83     private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
84     private DeviceTerminationPhaseHandler deviceTerminPhaseHandler;
85
86     private final ConcurrentMap<DeviceInfo, DeviceContext> deviceContexts = new ConcurrentHashMap<>();
87     private final ConcurrentMap<DeviceInfo, LifecycleService> lifecycleServices = new ConcurrentHashMap<>();
88
89     private long barrierIntervalNanos;
90     private int barrierCountLimit;
91
92     private ExtensionConverterProvider extensionConverterProvider;
93     private ScheduledThreadPoolExecutor spyPool;
94     private final ClusterSingletonServiceProvider singletonServiceProvider;
95     private final NotificationPublishService notificationPublishService;
96     private final MessageSpy messageSpy;
97     private final HashedWheelTimer hashedWheelTimer;
98     private final boolean useSingleLayerSerialization;
99
100     public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
101                              final long globalNotificationQuota,
102                              final boolean switchFeaturesMandatory,
103                              final long barrierInterval,
104                              final int barrierCountLimit,
105                              final MessageSpy messageSpy,
106                              final boolean isFlowRemovedNotificationOn,
107                              final ClusterSingletonServiceProvider singletonServiceProvider,
108                              final NotificationPublishService notificationPublishService,
109                              final HashedWheelTimer hashedWheelTimer,
110                              final ConvertorExecutor convertorExecutor,
111                              final boolean skipTableFeatures,
112                              final boolean useSingleLayerSerialization) {
113
114         this.dataBroker = dataBroker;
115
116         /* merge empty nodes to oper DS to predict any problems with missing parent for Node */
117         final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
118         final NodesBuilder nodesBuilder = new NodesBuilder();
119         nodesBuilder.setNode(Collections.<Node>emptyList());
120         tx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class), nodesBuilder.build());
121         try {
122             tx.submit().get();
123         } catch (ExecutionException | InterruptedException e) {
124             LOG.error("Creation of node failed.", e);
125             throw new IllegalStateException(e);
126         }
127
128         this.switchFeaturesMandatory = switchFeaturesMandatory;
129         this.globalNotificationQuota = globalNotificationQuota;
130         this.isFlowRemovedNotificationOn = isFlowRemovedNotificationOn;
131         this.skipTableFeatures = skipTableFeatures;
132         this.convertorExecutor = convertorExecutor;
133         this.hashedWheelTimer = hashedWheelTimer;
134         this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierInterval);
135         this.barrierCountLimit = barrierCountLimit;
136         this.spyPool = new ScheduledThreadPoolExecutor(1);
137         this.singletonServiceProvider = singletonServiceProvider;
138         this.notificationPublishService = notificationPublishService;
139         this.messageSpy = messageSpy;
140         this.useSingleLayerSerialization = useSingleLayerSerialization;
141     }
142
143
144     @Override
145     public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
146         this.deviceInitPhaseHandler = handler;
147     }
148
149     @Override
150     public void onDeviceContextLevelUp(@CheckForNull DeviceInfo deviceInfo, final LifecycleService lifecycleService) throws Exception {
151         // final phase - we have to add new Device to MD-SAL DataStore
152         LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceInfo.getNodeId());
153         DeviceContext deviceContext = Preconditions.checkNotNull(deviceContexts.get(deviceInfo));
154         deviceContext.onPublished();
155         lifecycleService.registerDeviceRemovedHandler(this);
156         lifecycleService.registerService(this.singletonServiceProvider);
157     }
158
159     @Override
160     public ConnectionStatus deviceConnected(@CheckForNull final ConnectionContext connectionContext) throws Exception {
161         Preconditions.checkArgument(connectionContext != null);
162         final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
163
164         /*
165          * This part prevent destroy another device context. Throwing here an exception result to propagate close connection
166          * in {@link org.opendaylight.openflowplugin.impl.connection.org.opendaylight.openflowplugin.impl.connection.HandshakeContextImpl}
167          * If context already exist we are in state closing process (connection flapping) and we should not propagate connection close
168          */
169          if (deviceContexts.containsKey(deviceInfo)) {
170              DeviceContext deviceContext = deviceContexts.get(deviceInfo);
171              LOG.warn("Node {} already connected disconnecting device. Rejecting connection", deviceInfo.getLOGValue());
172              if (!deviceContext.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
173                  LOG.warn("Node {} context state not in TERMINATION state.",
174                          connectionContext.getDeviceInfo().getLOGValue());
175                  return ConnectionStatus.ALREADY_CONNECTED;
176              } else {
177                  return ConnectionStatus.CLOSING;
178              }
179          }
180
181         LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}",
182                 connectionContext.getConnectionAdapter().getRemoteAddress(), deviceInfo.getNodeId());
183
184         // Add Disconnect handler
185         connectionContext.setDeviceDisconnectedHandler(this);
186
187         // Cache this for clarity
188         final ConnectionAdapter connectionAdapter = connectionContext.getConnectionAdapter();
189
190         // FIXME: as soon as auxiliary connection are fully supported then this is needed only before device context published
191         connectionAdapter.setPacketInFiltering(true);
192
193         final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(deviceInfo.getVersion());
194
195         connectionContext.setOutboundQueueProvider(outboundQueueProvider);
196         final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
197                 connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, barrierCountLimit, barrierIntervalNanos);
198         connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
199
200         final LifecycleService lifecycleService = new LifecycleServiceImpl();
201
202         final DeviceContext deviceContext = new DeviceContextImpl(
203                 connectionContext,
204                 dataBroker,
205                 messageSpy,
206                 translatorLibrary,
207                 this,
208                 convertorExecutor,
209                 skipTableFeatures,
210                 hashedWheelTimer,
211                 this,
212                 useSingleLayerSerialization);
213
214         deviceContext.setSalRoleService(new SalRoleServiceImpl(deviceContext, deviceContext));
215         deviceContexts.put(deviceInfo, deviceContext);
216
217         lifecycleService.setDeviceContext(deviceContext);
218         deviceContext.putLifecycleServiceIntoTxChainManager(lifecycleService);
219
220         lifecycleServices.put(deviceInfo, lifecycleService);
221
222         addCallbackToDeviceInitializeToSlave(deviceInfo, deviceContext, lifecycleService);
223
224         deviceContext.setSwitchFeaturesMandatory(switchFeaturesMandatory);
225
226         ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider);
227         deviceContext.setNotificationPublishService(notificationPublishService);
228
229         updatePacketInRateLimiters();
230
231         final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
232                 connectionAdapter, deviceContext);
233
234         connectionAdapter.setMessageListener(messageListener);
235         deviceInitPhaseHandler.onDeviceContextLevelUp(connectionContext.getDeviceInfo(), lifecycleService);
236         return ConnectionStatus.MAY_CONTINUE;
237     }
238
239     @Override
240     public TranslatorLibrary oook() {
241         return translatorLibrary;
242     }
243
244     @Override
245     public void setTranslatorLibrary(final TranslatorLibrary translatorLibrary) {
246         this.translatorLibrary = translatorLibrary;
247     }
248
249     @Override
250     public void close() {
251         for (final Iterator<DeviceContext> iterator = Iterators.consumingIterator(deviceContexts.values().iterator());
252                 iterator.hasNext();) {
253             final DeviceContext deviceCtx = iterator.next();
254             deviceCtx.shutdownConnection();
255             deviceCtx.shuttingDownDataStoreTransactions();
256         }
257
258         Optional.ofNullable(spyPool).ifPresent(ScheduledThreadPoolExecutor::shutdownNow);
259         spyPool = null;
260
261     }
262
263     @Override
264     public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
265         updatePacketInRateLimiters();
266         Optional.ofNullable(lifecycleServices.get(deviceInfo)).ifPresent(OFPContext::close);
267     }
268
269     @Override
270     public void initialize() {
271         spyPool.scheduleAtFixedRate(messageSpy, SPY_RATE, SPY_RATE, TimeUnit.SECONDS);
272     }
273
274     @Override
275     public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
276         this.extensionConverterProvider = extensionConverterProvider;
277     }
278
279     @Override
280     public ExtensionConverterProvider getExtensionConverterProvider() {
281         return extensionConverterProvider;
282     }
283
284     @Override
285     public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
286         this.deviceTerminPhaseHandler = handler;
287     }
288
289     @Override
290     public void onDeviceDisconnected(final ConnectionContext connectionContext) {
291         LOG.trace("onDeviceDisconnected method call for Node: {}", connectionContext.getNodeId());
292         final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
293         final DeviceContext deviceCtx = this.deviceContexts.get(deviceInfo);
294
295         if (Objects.isNull(deviceCtx)) {
296             LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.", deviceInfo.getLOGValue());
297             return;
298         }
299
300         if (deviceCtx.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
301             LOG.info("Device context for node {} is already is termination state, waiting for close all context", deviceInfo.getLOGValue());
302             return;
303         }
304
305         deviceCtx.close();
306
307         if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) {
308             LOG.debug("Node {} disconnected, but not primary connection.", connectionContext.getDeviceInfo().getLOGValue());
309             // Connection is not PrimaryConnection so try to remove from Auxiliary Connections
310             deviceCtx.removeAuxiliaryConnectionContext(connectionContext);
311         }
312
313         // TODO: Auxiliary connections supported ?
314         // Device is disconnected and so we need to close TxManager
315         final ListenableFuture<Void> future = deviceCtx.shuttingDownDataStoreTransactions();
316         Futures.addCallback(future, new FutureCallback<Void>() {
317             @Override
318             public void onSuccess(final Void result) {
319                 LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getLOGValue());
320                 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
321             }
322
323             @Override
324             public void onFailure(final Throwable t) {
325                 LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getLOGValue());
326                 LOG.trace("TxChainManager failed by closing. ", t);
327                 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
328             }
329         });
330
331         // Add timer for Close TxManager because it could fail in cluster without notification
332         final TimerTask timerTask = timeout -> {
333             if (!future.isDone()) {
334                 LOG.warn("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getLOGValue());
335                 future.cancel(false);
336             }
337         };
338
339         hashedWheelTimer.newTimeout(timerTask, 10, TimeUnit.SECONDS);
340     }
341
342     @VisibleForTesting
343     void addDeviceContextToMap(final DeviceInfo deviceInfo, final DeviceContext deviceContext){
344         deviceContexts.put(deviceInfo, deviceContext);
345     }
346
347     @Override
348     public void setFlowRemovedNotificationOn(boolean isNotificationFlowRemovedOff) {
349         this.isFlowRemovedNotificationOn = isNotificationFlowRemovedOff;
350     }
351
352     @Override
353     public boolean isFlowRemovedNotificationOn() {
354         return this.isFlowRemovedNotificationOn;
355     }
356
357
358     @Override
359     public void setSkipTableFeatures(boolean skipTableFeaturesValue) {
360         skipTableFeatures = skipTableFeaturesValue;
361     }
362
363     @Override
364     public void setBarrierCountLimit(final int barrierCountLimit) {
365         this.barrierCountLimit = barrierCountLimit;
366     }
367
368     @Override
369     public void setBarrierInterval(final long barrierTimeoutLimit) {
370         this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierTimeoutLimit);
371     }
372
373     @Override
374     public CheckedFuture<Void, TransactionCommitFailedException> removeDeviceFromOperationalDS(final DeviceInfo deviceInfo) {
375         final WriteTransaction delWtx = dataBroker.newWriteOnlyTransaction();
376         delWtx.delete(LogicalDatastoreType.OPERATIONAL, deviceInfo.getNodeInstanceIdentifier());
377         final CheckedFuture<Void, TransactionCommitFailedException> delFuture = delWtx.submit();
378
379         Futures.addCallback(delFuture, new FutureCallback<Void>() {
380             @Override
381             public void onSuccess(final Void result) {
382                 if (LOG.isDebugEnabled()) {
383                     LOG.debug("Delete Node {} was successful", deviceInfo.getLOGValue());
384                 }
385             }
386
387             @Override
388             public void onFailure(@Nonnull final Throwable t) {
389                 LOG.warn("Delete node {} failed with exception {}", deviceInfo.getLOGValue(), t);
390             }
391         });
392
393         return delFuture;
394     }
395
396
397     private void addCallbackToDeviceInitializeToSlave(final DeviceInfo deviceInfo, final DeviceContext deviceContext, final LifecycleService lifecycleService) {
398         Futures.addCallback(deviceContext.makeDeviceSlave(), new FutureCallback<RpcResult<SetRoleOutput>>() {
399             @Override
400             public void onSuccess(@Nullable RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
401                 if (LOG.isDebugEnabled()) {
402                     LOG.debug("Role SLAVE was successfully propagated on device, node {}", deviceInfo.getLOGValue());
403                 }
404             }
405
406             @Override
407             public void onFailure(Throwable throwable) {
408                 LOG.warn("Was not able to set role SLAVE to device on node {} ",deviceInfo.getLOGValue());
409                 lifecycleService.closeConnection();
410             }
411         });
412     }
413
414     private void updatePacketInRateLimiters() {
415         synchronized (deviceContexts) {
416             final int deviceContextsSize = deviceContexts.size();
417             if (deviceContextsSize > 0) {
418                 long freshNotificationLimit = globalNotificationQuota / deviceContextsSize;
419                 if (freshNotificationLimit < 100) {
420                     freshNotificationLimit = 100;
421                 }
422                 if (LOG.isDebugEnabled()) {
423                     LOG.debug("fresh notification limit = {}", freshNotificationLimit);
424                 }
425                 for (final DeviceContext deviceContext : deviceContexts.values()) {
426                     deviceContext.updatePacketInRateLimit(freshNotificationLimit);
427                 }
428             }
429         }
430     }
431
432     public void onDeviceRemoved(DeviceInfo deviceInfo) {
433         deviceContexts.remove(deviceInfo);
434         LOG.debug("Device context removed for node {}", deviceInfo.getLOGValue());
435
436         lifecycleServices.remove(deviceInfo);
437         LOG.debug("Lifecycle service removed for node {}", deviceInfo.getLOGValue());
438     }
439 }