964ad0ba6b9fb41f3064361ee2f3d13ae05a34a3
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceManagerImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.device;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.collect.Iterators;
13 import com.google.common.util.concurrent.CheckedFuture;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import io.netty.util.HashedWheelTimer;
18 import io.netty.util.TimerTask;
19 import java.util.Collections;
20 import java.util.Iterator;
21 import java.util.Objects;
22 import java.util.Optional;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.ConcurrentMap;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.ScheduledThreadPoolExecutor;
27 import java.util.concurrent.TimeUnit;
28 import javax.annotation.CheckForNull;
29 import javax.annotation.Nonnull;
30 import javax.annotation.Nullable;
31 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
32 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
33 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
34 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
35 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
36 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
37 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
38 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
39 import org.opendaylight.openflowplugin.api.openflow.OFPContext;
40 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
41 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionStatus;
42 import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
43 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
44 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
45 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
46 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
47 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
48 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
49 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
50 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
51 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
52 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
53 import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl;
54 import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
55 import org.opendaylight.openflowplugin.impl.lifecycle.LifecycleServiceImpl;
56 import org.opendaylight.openflowplugin.impl.services.SalRoleServiceImpl;
57 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput;
62 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
63 import org.opendaylight.yangtools.yang.common.RpcResult;
64 import org.slf4j.Logger;
65 import org.slf4j.LoggerFactory;
66
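/**
 * Device manager that keeps one {@link DeviceContext} and one {@link LifecycleService} per
 * connected device, keyed by {@link DeviceInfo}. It creates both when a connection is accepted,
 * tears them down when the device disconnects, and redistributes the global notification quota
 * among the currently known device contexts.
 */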
70 public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProviderKeeper {
71
72     private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class);
73
74     private final long globalNotificationQuota;
75     private final boolean switchFeaturesMandatory;
76     private boolean isNotificationFlowRemovedOff;
77     private boolean skipTableFeatures;
78     private static final int SPY_RATE = 10;
79
80     private final DataBroker dataBroker;
81     private final ConvertorExecutor convertorExecutor;
82     private TranslatorLibrary translatorLibrary;
83     private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
84     private DeviceTerminationPhaseHandler deviceTerminPhaseHandler;
85
86     private final ConcurrentMap<DeviceInfo, DeviceContext> deviceContexts = new ConcurrentHashMap<>();
87     private final ConcurrentMap<DeviceInfo, LifecycleService> lifecycleServices = new ConcurrentHashMap<>();
88
89     private long barrierIntervalNanos;
90     private int barrierCountLimit;
91
92     private ExtensionConverterProvider extensionConverterProvider;
93     private ScheduledThreadPoolExecutor spyPool;
94     private final ClusterSingletonServiceProvider singletonServiceProvider;
95     private final NotificationPublishService notificationPublishService;
96     private final MessageSpy messageSpy;
97     private final HashedWheelTimer hashedWheelTimer;
98
99     public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
100                              final long globalNotificationQuota,
101                              final boolean switchFeaturesMandatory,
102                              final long barrierInterval,
103                              final int barrierCountLimit,
104                              final MessageSpy messageSpy,
105                              final boolean isNotificationFlowRemovedOff,
106                              final ClusterSingletonServiceProvider singletonServiceProvider,
107                              final NotificationPublishService notificationPublishService,
108                              final HashedWheelTimer hashedWheelTimer,
109                              final ConvertorExecutor convertorExecutor,
110                              final boolean skipTableFeatures) {
111
112         this.dataBroker = dataBroker;
113
114         /* merge empty nodes to oper DS to predict any problems with missing parent for Node */
115         final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
116         final NodesBuilder nodesBuilder = new NodesBuilder();
117         nodesBuilder.setNode(Collections.<Node>emptyList());
118         tx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class), nodesBuilder.build());
119         try {
120             tx.submit().get();
121         } catch (ExecutionException | InterruptedException e) {
122             LOG.error("Creation of node failed.", e);
123             throw new IllegalStateException(e);
124         }
125
126         this.switchFeaturesMandatory = switchFeaturesMandatory;
127         this.globalNotificationQuota = globalNotificationQuota;
128         this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
129         this.skipTableFeatures = skipTableFeatures;
130         this.convertorExecutor = convertorExecutor;
131         this.hashedWheelTimer = hashedWheelTimer;
132         this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierInterval);
133         this.barrierCountLimit = barrierCountLimit;
134         this.spyPool = new ScheduledThreadPoolExecutor(1);
135         this.singletonServiceProvider = singletonServiceProvider;
136         this.notificationPublishService = notificationPublishService;
137         this.messageSpy = messageSpy;
138     }
139
140
141     @Override
142     public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
143         this.deviceInitPhaseHandler = handler;
144     }
145
146     @Override
147     public void onDeviceContextLevelUp(@CheckForNull DeviceInfo deviceInfo, final LifecycleService lifecycleService) throws Exception {
148         // final phase - we have to add new Device to MD-SAL DataStore
149         LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceInfo.getNodeId());
150         DeviceContext deviceContext = Preconditions.checkNotNull(deviceContexts.get(deviceInfo));
151         deviceContext.onPublished();
152         lifecycleService.registerService(this.singletonServiceProvider);
153     }
154
155     @Override
156     public ConnectionStatus deviceConnected(@CheckForNull final ConnectionContext connectionContext) throws Exception {
157         Preconditions.checkArgument(connectionContext != null);
158
159         DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
160         /*
161          * This part prevent destroy another device context. Throwing here an exception result to propagate close connection
162          * in {@link org.opendaylight.openflowplugin.impl.connection.org.opendaylight.openflowplugin.impl.connection.HandshakeContextImpl}
163          * If context already exist we are in state closing process (connection flapping) and we should not propagate connection close
164          */
165          if (deviceContexts.containsKey(deviceInfo)) {
166              DeviceContext deviceContext = deviceContexts.get(deviceInfo);
167              LOG.warn("Node {} already connected disconnecting device. Rejecting connection", deviceInfo.getLOGValue());
168              if (!deviceContext.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
169                  LOG.warn("Node {} context state not in TERMINATION state.",
170                          connectionContext.getDeviceInfo().getLOGValue());
171                  return ConnectionStatus.ALREADY_CONNECTED;
172              } else {
173                  return ConnectionStatus.CLOSING;
174              }
175          }
176
177         LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}",
178                 connectionContext.getConnectionAdapter().getRemoteAddress(), deviceInfo.getNodeId());
179
180         // Add Disconnect handler
181         connectionContext.setDeviceDisconnectedHandler(DeviceManagerImpl.this);
182         // Cache this for clarity
183         final ConnectionAdapter connectionAdapter = connectionContext.getConnectionAdapter();
184
185         //FIXME: as soon as auxiliary connection are fully supported then this is needed only before device context published
186         connectionAdapter.setPacketInFiltering(true);
187
188         final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(deviceInfo.getVersion());
189
190         connectionContext.setOutboundQueueProvider(outboundQueueProvider);
191         final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
192                 connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, barrierCountLimit, barrierIntervalNanos);
193         connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
194
195         final LifecycleService lifecycleService = new LifecycleServiceImpl();
196
197         final DeviceContext deviceContext = new DeviceContextImpl(
198                 connectionContext,
199                 dataBroker,
200                 messageSpy,
201                 translatorLibrary,
202                 this,
203                 convertorExecutor,
204                 skipTableFeatures,
205                 hashedWheelTimer,
206                 this);
207
208         deviceContext.setSalRoleService(new SalRoleServiceImpl(deviceContext, deviceContext));
209         deviceContexts.put(deviceInfo, deviceContext);
210
211         lifecycleService.setDeviceContext(deviceContext);
212         deviceContext.putLifecycleServiceIntoTxChainManager(lifecycleService);
213
214         lifecycleServices.put(deviceInfo, lifecycleService);
215
216         addCallbackToDeviceInitializeToSlave(deviceInfo, deviceContext, lifecycleService);
217
218         deviceContext.setSwitchFeaturesMandatory(switchFeaturesMandatory);
219
220         ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider);
221         deviceContext.setNotificationPublishService(notificationPublishService);
222
223         updatePacketInRateLimiters();
224
225         final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
226                 connectionAdapter, deviceContext);
227
228         connectionAdapter.setMessageListener(messageListener);
229         deviceInitPhaseHandler.onDeviceContextLevelUp(connectionContext.getDeviceInfo(), lifecycleService);
230         return ConnectionStatus.MAY_CONTINUE;
231     }
232
233     @Override
234     public TranslatorLibrary oook() {
235         return translatorLibrary;
236     }
237
238     @Override
239     public void setTranslatorLibrary(final TranslatorLibrary translatorLibrary) {
240         this.translatorLibrary = translatorLibrary;
241     }
242
243     @Override
244     public void close() {
245         for (final Iterator<DeviceContext> iterator = Iterators.consumingIterator(deviceContexts.values().iterator());
246                 iterator.hasNext();) {
247             final DeviceContext deviceCtx = iterator.next();
248             deviceCtx.shutdownConnection();
249             deviceCtx.shuttingDownDataStoreTransactions();
250         }
251
252         Optional.ofNullable(spyPool).ifPresent(ScheduledThreadPoolExecutor::shutdownNow);
253         spyPool = null;
254
255     }
256
257     @Override
258     public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
259
260         LifecycleService lifecycleService = lifecycleServices.remove(deviceInfo);
261         if (LOG.isDebugEnabled()) {
262             LOG.debug("Lifecycle service removed for node {}", deviceInfo.getLOGValue());
263         }
264
265         updatePacketInRateLimiters();
266         if (Objects.nonNull(lifecycleService)) {
267             try {
268                 lifecycleService.close();
269                 LOG.debug("Lifecycle service successfully closed for node {}", deviceInfo.getLOGValue());
270             } catch (Exception e) {
271                 LOG.warn("Closing lifecycle service for node {} was unsuccessful ", deviceInfo.getLOGValue(), e);
272             }
273         }
274
275         deviceContexts.remove(deviceInfo);
276         if (LOG.isDebugEnabled()) {
277             LOG.debug("Device context removed for node {}", deviceInfo.getLOGValue());
278         }
279
280     }
281
282     @Override
283     public void initialize() {
284         spyPool.scheduleAtFixedRate(messageSpy, SPY_RATE, SPY_RATE, TimeUnit.SECONDS);
285     }
286
287     @Override
288     public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
289         this.extensionConverterProvider = extensionConverterProvider;
290     }
291
292     @Override
293     public ExtensionConverterProvider getExtensionConverterProvider() {
294         return extensionConverterProvider;
295     }
296
297     @Override
298     public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
299         this.deviceTerminPhaseHandler = handler;
300     }
301
302     @Override
303     public void onDeviceDisconnected(final ConnectionContext connectionContext) {
304         LOG.trace("onDeviceDisconnected method call for Node: {}", connectionContext.getNodeId());
305         final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
306         final DeviceContext deviceCtx = this.deviceContexts.get(deviceInfo);
307
308         if (null == deviceCtx) {
309             LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.", deviceInfo.getLOGValue());
310             return;
311         }
312
313         if (deviceCtx.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
314             LOG.info("Device context for node {} is already is termination state, waiting for close all context", deviceInfo.getLOGValue());
315             return;
316         }
317
318         deviceCtx.setState(OFPContext.CONTEXT_STATE.TERMINATION);
319
320         if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) {
321             LOG.debug("Node {} disconnected, but not primary connection.", connectionContext.getDeviceInfo().getLOGValue());
322             /* Connection is not PrimaryConnection so try to remove from Auxiliary Connections */
323             deviceCtx.removeAuxiliaryConnectionContext(connectionContext);
324         }
325         //TODO: Auxiliary connections supported ?
326             /* Device is disconnected and so we need to close TxManager */
327         final ListenableFuture<Void> future = deviceCtx.shuttingDownDataStoreTransactions();
328         Futures.addCallback(future, new FutureCallback<Void>() {
329
330             @Override
331             public void onSuccess(final Void result) {
332                 LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getLOGValue());
333                 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
334             }
335
336             @Override
337             public void onFailure(final Throwable t) {
338                 LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getLOGValue());
339                 LOG.trace("TxChainManager failed by closing. ", t);
340                 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
341             }
342         });
343         /* Add timer for Close TxManager because it could fain ind cluster without notification */
344         final TimerTask timerTask = timeout -> {
345             if (!future.isDone()) {
346                 LOG.warn("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getLOGValue());
347                 future.cancel(false);
348             }
349         };
350         hashedWheelTimer.newTimeout(timerTask, 10, TimeUnit.SECONDS);
351     }
352
353     @VisibleForTesting
354     void addDeviceContextToMap(final DeviceInfo deviceInfo, final DeviceContext deviceContext){
355         deviceContexts.put(deviceInfo, deviceContext);
356     }
357
358     @Override
359     public void setIsNotificationFlowRemovedOff(boolean isNotificationFlowRemovedOff) {
360         this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
361     }
362
363     @Override
364     public boolean getIsNotificationFlowRemovedOff() {
365         return this.isNotificationFlowRemovedOff;
366     }
367
368
369     @Override
370     public void setSkipTableFeatures(boolean skipTableFeaturesValue) {
371         skipTableFeatures = skipTableFeaturesValue;
372     }
373
374     @Override
375     public void setBarrierCountLimit(final int barrierCountLimit) {
376         this.barrierCountLimit = barrierCountLimit;
377     }
378
379     @Override
380     public void setBarrierInterval(final long barrierTimeoutLimit) {
381         this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierTimeoutLimit);
382     }
383
384     @Override
385     public CheckedFuture<Void, TransactionCommitFailedException> removeDeviceFromOperationalDS(final DeviceInfo deviceInfo) {
386         final WriteTransaction delWtx = dataBroker.newWriteOnlyTransaction();
387         delWtx.delete(LogicalDatastoreType.OPERATIONAL, deviceInfo.getNodeInstanceIdentifier());
388         final CheckedFuture<Void, TransactionCommitFailedException> delFuture = delWtx.submit();
389
390         Futures.addCallback(delFuture, new FutureCallback<Void>() {
391             @Override
392             public void onSuccess(final Void result) {
393                 if (LOG.isDebugEnabled()) {
394                     LOG.debug("Delete Node {} was successful", deviceInfo.getLOGValue());
395                 }
396             }
397
398             @Override
399             public void onFailure(@Nonnull final Throwable t) {
400                 LOG.warn("Delete node {} failed with exception {}", deviceInfo.getLOGValue(), t);
401             }
402         });
403
404         return delFuture;
405     }
406
407
408     private void addCallbackToDeviceInitializeToSlave(final DeviceInfo deviceInfo, final DeviceContext deviceContext, final LifecycleService lifecycleService) {
409         Futures.addCallback(deviceContext.makeDeviceSlave(), new FutureCallback<RpcResult<SetRoleOutput>>() {
410             @Override
411             public void onSuccess(@Nullable RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
412                 if (LOG.isDebugEnabled()) {
413                     LOG.debug("Role SLAVE was successfully propagated on device, node {}", deviceInfo.getLOGValue());
414                 }
415             }
416
417             @Override
418             public void onFailure(Throwable throwable) {
419                 LOG.warn("Was not able to set role SLAVE to device on node {} ",deviceInfo.getLOGValue());
420                 lifecycleService.closeConnection();
421             }
422         });
423     }
424
425     private void updatePacketInRateLimiters() {
426         synchronized (deviceContexts) {
427             final int deviceContextsSize = deviceContexts.size();
428             if (deviceContextsSize > 0) {
429                 long freshNotificationLimit = globalNotificationQuota / deviceContextsSize;
430                 if (freshNotificationLimit < 100) {
431                     freshNotificationLimit = 100;
432                 }
433                 if (LOG.isDebugEnabled()) {
434                     LOG.debug("fresh notification limit = {}", freshNotificationLimit);
435                 }
436                 for (final DeviceContext deviceContext : deviceContexts.values()) {
437                     deviceContext.updatePacketInRateLimit(freshNotificationLimit);
438                 }
439             }
440         }
441     }
442
443     @VisibleForTesting
444     void setDeviceContext(final DeviceInfo deviceInfo, final DeviceContext deviceContext) {
445         this.deviceContexts.putIfAbsent(deviceInfo, deviceContext);
446     }
447
448     @VisibleForTesting
449     int getDeviceContextCount() {
450         return this.deviceContexts.size();
451     }
452
453
454 }