Merge "Replace odl-dlux-core with odl-dluxapps-topology"
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceManagerImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.device;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.collect.Iterators;
13 import com.google.common.util.concurrent.CheckedFuture;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import io.netty.util.HashedWheelTimer;
18 import io.netty.util.TimerTask;
19 import java.util.Collections;
20 import java.util.Iterator;
21 import java.util.Objects;
22 import java.util.Optional;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.ConcurrentMap;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.ScheduledThreadPoolExecutor;
27 import java.util.concurrent.TimeUnit;
28 import javax.annotation.CheckForNull;
29 import javax.annotation.Nonnull;
30 import javax.annotation.Nullable;
31 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
32 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
33 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
34 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
35 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
36 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
37 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
38 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
39 import org.opendaylight.openflowplugin.api.openflow.OFPContext;
40 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
41 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionStatus;
42 import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
43 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
44 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
45 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
46 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
47 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
48 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
49 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
50 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
51 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
52 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
53 import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl;
54 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
55 import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
56 import org.opendaylight.openflowplugin.impl.lifecycle.LifecycleServiceImpl;
57 import org.opendaylight.openflowplugin.impl.services.sal.SalRoleServiceImpl;
58 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput;
63 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
64 import org.opendaylight.yangtools.yang.common.RpcResult;
65 import org.slf4j.Logger;
66 import org.slf4j.LoggerFactory;
67
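68 /**
69  * {@link DeviceManager} implementation that creates, tracks and tears down a {@link DeviceContext} and a {@link LifecycleService} for every connected device.
70  */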
71 public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProviderKeeper {
72
73     private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class);
74
75     private final long globalNotificationQuota;
76     private final boolean switchFeaturesMandatory;
77     private boolean isFlowRemovedNotificationOn;
78     private boolean skipTableFeatures;
79     private static final int SPY_RATE = 10;
80
81     private final DataBroker dataBroker;
82     private final DeviceInitializerProvider deviceInitializerProvider;
83     private final ConvertorExecutor convertorExecutor;
84     private TranslatorLibrary translatorLibrary;
85     private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
86     private DeviceTerminationPhaseHandler deviceTerminPhaseHandler;
87
88     private final ConcurrentMap<DeviceInfo, DeviceContext> deviceContexts = new ConcurrentHashMap<>();
89     private final ConcurrentMap<DeviceInfo, LifecycleService> lifecycleServices = new ConcurrentHashMap<>();
90
91     private long barrierIntervalNanos;
92     private int barrierCountLimit;
93
94     private ExtensionConverterProvider extensionConverterProvider;
95     private ScheduledThreadPoolExecutor spyPool;
96     private final ClusterSingletonServiceProvider singletonServiceProvider;
97     private final NotificationPublishService notificationPublishService;
98     private final MessageSpy messageSpy;
99     private final HashedWheelTimer hashedWheelTimer;
100     private final boolean useSingleLayerSerialization;
101
102     public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
103                              final long globalNotificationQuota,
104                              final boolean switchFeaturesMandatory,
105                              final long barrierInterval,
106                              final int barrierCountLimit,
107                              final MessageSpy messageSpy,
108                              final boolean isFlowRemovedNotificationOn,
109                              final ClusterSingletonServiceProvider singletonServiceProvider,
110                              final NotificationPublishService notificationPublishService,
111                              final HashedWheelTimer hashedWheelTimer,
112                              final ConvertorExecutor convertorExecutor,
113                              final boolean skipTableFeatures,
114                              final boolean useSingleLayerSerialization,
115                              final DeviceInitializerProvider deviceInitializerProvider) {
116
117         this.dataBroker = dataBroker;
118         this.deviceInitializerProvider = deviceInitializerProvider;
119
120         /* merge empty nodes to oper DS to predict any problems with missing parent for Node */
121         final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
122         final NodesBuilder nodesBuilder = new NodesBuilder();
123         nodesBuilder.setNode(Collections.<Node>emptyList());
124         tx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class), nodesBuilder.build());
125         try {
126             tx.submit().get();
127         } catch (ExecutionException | InterruptedException e) {
128             LOG.error("Creation of node failed.", e);
129             throw new IllegalStateException(e);
130         }
131
132         this.switchFeaturesMandatory = switchFeaturesMandatory;
133         this.globalNotificationQuota = globalNotificationQuota;
134         this.isFlowRemovedNotificationOn = isFlowRemovedNotificationOn;
135         this.skipTableFeatures = skipTableFeatures;
136         this.convertorExecutor = convertorExecutor;
137         this.hashedWheelTimer = hashedWheelTimer;
138         this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierInterval);
139         this.barrierCountLimit = barrierCountLimit;
140         this.spyPool = new ScheduledThreadPoolExecutor(1);
141         this.singletonServiceProvider = singletonServiceProvider;
142         this.notificationPublishService = notificationPublishService;
143         this.messageSpy = messageSpy;
144         this.useSingleLayerSerialization = useSingleLayerSerialization;
145     }
146
147
148     @Override
149     public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
150         this.deviceInitPhaseHandler = handler;
151     }
152
153     @Override
154     public void onDeviceContextLevelUp(@CheckForNull DeviceInfo deviceInfo, final LifecycleService lifecycleService) throws Exception {
155         // final phase - we have to add new Device to MD-SAL DataStore
156         LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceInfo.getNodeId());
157         DeviceContext deviceContext = Preconditions.checkNotNull(deviceContexts.get(deviceInfo));
158         deviceContext.onPublished();
159         lifecycleService.registerDeviceRemovedHandler(this);
160         lifecycleService.registerService(this.singletonServiceProvider);
161     }
162
163     @Override
164     public ConnectionStatus deviceConnected(@CheckForNull final ConnectionContext connectionContext) throws Exception {
165         Preconditions.checkArgument(connectionContext != null);
166         final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
167
168         /*
169          * This part prevent destroy another device context. Throwing here an exception result to propagate close connection
170          * in {@link org.opendaylight.openflowplugin.impl.connection.org.opendaylight.openflowplugin.impl.connection.HandshakeContextImpl}
171          * If context already exist we are in state closing process (connection flapping) and we should not propagate connection close
172          */
173          if (deviceContexts.containsKey(deviceInfo)) {
174              DeviceContext deviceContext = deviceContexts.get(deviceInfo);
175              LOG.warn("Node {} already connected disconnecting device. Rejecting connection", deviceInfo.getLOGValue());
176              if (!deviceContext.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
177                  LOG.warn("Node {} context state not in TERMINATION state.",
178                          connectionContext.getDeviceInfo().getLOGValue());
179                  return ConnectionStatus.ALREADY_CONNECTED;
180              } else {
181                  return ConnectionStatus.CLOSING;
182              }
183          }
184
185         LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}",
186                 connectionContext.getConnectionAdapter().getRemoteAddress(), deviceInfo.getNodeId());
187
188         // Add Disconnect handler
189         connectionContext.setDeviceDisconnectedHandler(this);
190
191         // Cache this for clarity
192         final ConnectionAdapter connectionAdapter = connectionContext.getConnectionAdapter();
193
194         // FIXME: as soon as auxiliary connection are fully supported then this is needed only before device context published
195         connectionAdapter.setPacketInFiltering(true);
196
197         final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(deviceInfo.getVersion());
198
199         connectionContext.setOutboundQueueProvider(outboundQueueProvider);
200         final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
201                 connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, barrierCountLimit, barrierIntervalNanos);
202         connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
203
204         final LifecycleService lifecycleService = new LifecycleServiceImpl();
205         final DeviceContext deviceContext = new DeviceContextImpl(
206                 connectionContext,
207                 dataBroker,
208                 messageSpy,
209                 translatorLibrary,
210                 this,
211                 convertorExecutor,
212                 skipTableFeatures,
213                 hashedWheelTimer,
214                 this,
215                 useSingleLayerSerialization,
216                 deviceInitializerProvider);
217
218         deviceContext.setSalRoleService(new SalRoleServiceImpl(deviceContext, deviceContext));
219         deviceContexts.put(deviceInfo, deviceContext);
220
221         lifecycleService.setDeviceContext(deviceContext);
222         deviceContext.putLifecycleServiceIntoTxChainManager(lifecycleService);
223
224         lifecycleServices.put(deviceInfo, lifecycleService);
225
226         addCallbackToDeviceInitializeToSlave(deviceInfo, deviceContext, lifecycleService);
227
228         deviceContext.setSwitchFeaturesMandatory(switchFeaturesMandatory);
229
230         ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider);
231         deviceContext.setNotificationPublishService(notificationPublishService);
232
233         updatePacketInRateLimiters();
234
235         final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
236                 connectionAdapter, deviceContext);
237
238         connectionAdapter.setMessageListener(messageListener);
239         deviceInitPhaseHandler.onDeviceContextLevelUp(connectionContext.getDeviceInfo(), lifecycleService);
240         return ConnectionStatus.MAY_CONTINUE;
241     }
242
243     @Override
244     public TranslatorLibrary oook() {
245         return translatorLibrary;
246     }
247
248     @Override
249     public void setTranslatorLibrary(final TranslatorLibrary translatorLibrary) {
250         this.translatorLibrary = translatorLibrary;
251     }
252
253     @Override
254     public void close() {
255         for (final Iterator<DeviceContext> iterator = Iterators.consumingIterator(deviceContexts.values().iterator());
256                 iterator.hasNext();) {
257             final DeviceContext deviceCtx = iterator.next();
258             deviceCtx.shutdownConnection();
259             deviceCtx.shuttingDownDataStoreTransactions();
260         }
261
262         Optional.ofNullable(spyPool).ifPresent(ScheduledThreadPoolExecutor::shutdownNow);
263         spyPool = null;
264
265     }
266
267     @Override
268     public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
269         updatePacketInRateLimiters();
270         Optional.ofNullable(lifecycleServices.get(deviceInfo)).ifPresent(OFPContext::close);
271     }
272
273     @Override
274     public void initialize() {
275         spyPool.scheduleAtFixedRate(messageSpy, SPY_RATE, SPY_RATE, TimeUnit.SECONDS);
276     }
277
278     @Override
279     public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
280         this.extensionConverterProvider = extensionConverterProvider;
281     }
282
283     @Override
284     public ExtensionConverterProvider getExtensionConverterProvider() {
285         return extensionConverterProvider;
286     }
287
288     @Override
289     public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
290         this.deviceTerminPhaseHandler = handler;
291     }
292
293     @Override
294     public void onDeviceDisconnected(final ConnectionContext connectionContext) {
295         LOG.trace("onDeviceDisconnected method call for Node: {}", connectionContext.getNodeId());
296         final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
297         final DeviceContext deviceCtx = this.deviceContexts.get(deviceInfo);
298
299         if (Objects.isNull(deviceCtx)) {
300             LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.", deviceInfo.getLOGValue());
301             return;
302         }
303
304         if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) {
305             LOG.debug("Node {} disconnected, but not primary connection.", connectionContext.getDeviceInfo().getLOGValue());
306             // Connection is not PrimaryConnection so try to remove from Auxiliary Connections
307             deviceCtx.removeAuxiliaryConnectionContext(connectionContext);
308             // If this is not primary connection, we should not continue disabling everything
309             return;
310         }
311
312         if (deviceCtx.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
313             LOG.info("Device context for node {} is already is termination state, waiting for close all context", deviceInfo.getLOGValue());
314             return;
315         }
316
317         deviceCtx.close();
318
319         // TODO: Auxiliary connections supported ?
320         // Device is disconnected and so we need to close TxManager
321         final ListenableFuture<Void> future = deviceCtx.shuttingDownDataStoreTransactions();
322         Futures.addCallback(future, new FutureCallback<Void>() {
323             @Override
324             public void onSuccess(final Void result) {
325                 LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getLOGValue());
326                 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
327             }
328
329             @Override
330             public void onFailure(final Throwable t) {
331                 LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getLOGValue());
332                 LOG.trace("TxChainManager failed by closing. ", t);
333                 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
334             }
335         });
336
337         // Add timer for Close TxManager because it could fail in cluster without notification
338         final TimerTask timerTask = timeout -> {
339             if (!future.isDone()) {
340                 LOG.warn("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getLOGValue());
341                 future.cancel(false);
342             }
343         };
344
345         hashedWheelTimer.newTimeout(timerTask, 10, TimeUnit.SECONDS);
346     }
347
348     @VisibleForTesting
349     void addDeviceContextToMap(final DeviceInfo deviceInfo, final DeviceContext deviceContext){
350         deviceContexts.put(deviceInfo, deviceContext);
351     }
352
353     @Override
354     public void setFlowRemovedNotificationOn(boolean isNotificationFlowRemovedOff) {
355         this.isFlowRemovedNotificationOn = isNotificationFlowRemovedOff;
356     }
357
358     @Override
359     public boolean isFlowRemovedNotificationOn() {
360         return this.isFlowRemovedNotificationOn;
361     }
362
363
364     @Override
365     public void setSkipTableFeatures(boolean skipTableFeaturesValue) {
366         skipTableFeatures = skipTableFeaturesValue;
367     }
368
369     @Override
370     public void setBarrierCountLimit(final int barrierCountLimit) {
371         this.barrierCountLimit = barrierCountLimit;
372     }
373
374     @Override
375     public void setBarrierInterval(final long barrierTimeoutLimit) {
376         this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierTimeoutLimit);
377     }
378
379     @Override
380     public CheckedFuture<Void, TransactionCommitFailedException> removeDeviceFromOperationalDS(final DeviceInfo deviceInfo) {
381         final WriteTransaction delWtx = dataBroker.newWriteOnlyTransaction();
382         delWtx.delete(LogicalDatastoreType.OPERATIONAL, deviceInfo.getNodeInstanceIdentifier());
383         final CheckedFuture<Void, TransactionCommitFailedException> delFuture = delWtx.submit();
384
385         Futures.addCallback(delFuture, new FutureCallback<Void>() {
386             @Override
387             public void onSuccess(final Void result) {
388                 if (LOG.isDebugEnabled()) {
389                     LOG.debug("Delete Node {} was successful", deviceInfo.getLOGValue());
390                 }
391             }
392
393             @Override
394             public void onFailure(@Nonnull final Throwable t) {
395                 LOG.warn("Delete node {} failed with exception {}", deviceInfo.getLOGValue(), t);
396             }
397         });
398
399         return delFuture;
400     }
401
402
403     private void addCallbackToDeviceInitializeToSlave(final DeviceInfo deviceInfo, final DeviceContext deviceContext, final LifecycleService lifecycleService) {
404         Futures.addCallback(deviceContext.makeDeviceSlave(), new FutureCallback<RpcResult<SetRoleOutput>>() {
405             @Override
406             public void onSuccess(@Nullable RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
407                 if (LOG.isDebugEnabled()) {
408                     LOG.debug("Role SLAVE was successfully propagated on device, node {}", deviceInfo.getLOGValue());
409                 }
410                 deviceContext.sendNodeAddedNotification();
411             }
412
413             @Override
414             public void onFailure(Throwable throwable) {
415                 LOG.warn("Was not able to set role SLAVE to device on node {} ",deviceInfo.getLOGValue());
416                 lifecycleService.closeConnection();
417             }
418         });
419     }
420
421     private void updatePacketInRateLimiters() {
422         synchronized (deviceContexts) {
423             final int deviceContextsSize = deviceContexts.size();
424             if (deviceContextsSize > 0) {
425                 long freshNotificationLimit = globalNotificationQuota / deviceContextsSize;
426                 if (freshNotificationLimit < 100) {
427                     freshNotificationLimit = 100;
428                 }
429                 if (LOG.isDebugEnabled()) {
430                     LOG.debug("fresh notification limit = {}", freshNotificationLimit);
431                 }
432                 for (final DeviceContext deviceContext : deviceContexts.values()) {
433                     deviceContext.updatePacketInRateLimit(freshNotificationLimit);
434                 }
435             }
436         }
437     }
438
439     public void onDeviceRemoved(DeviceInfo deviceInfo) {
440         deviceContexts.remove(deviceInfo);
441         LOG.debug("Device context removed for node {}", deviceInfo.getLOGValue());
442
443         lifecycleServices.remove(deviceInfo);
444         LOG.debug("Lifecycle service removed for node {}", deviceInfo.getLOGValue());
445     }
446 }