Merge "Bug 5596 Added cluster provider"
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceManagerImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.device;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Function;
12 import com.google.common.base.Preconditions;
13 import com.google.common.base.Verify;
14 import com.google.common.collect.Iterators;
15 import com.google.common.util.concurrent.AsyncFunction;
16 import com.google.common.util.concurrent.FutureCallback;
17 import com.google.common.util.concurrent.Futures;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import io.netty.util.TimerTask;
20 import java.util.Collections;
21 import java.util.HashSet;
22 import java.util.Iterator;
23 import java.util.Set;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.concurrent.ConcurrentMap;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.ScheduledThreadPoolExecutor;
28 import java.util.concurrent.TimeUnit;
29 import javax.annotation.CheckForNull;
30 import javax.annotation.Nonnull;
31 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
32 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
33 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
34 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
35 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
36 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
37 import org.opendaylight.openflowplugin.api.openflow.OFPContext;
38 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
39 import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
40 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
41 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
42 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
43 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
44 import org.opendaylight.openflowplugin.api.openflow.device.DeviceSynchronizeListener;
45 import org.opendaylight.openflowplugin.api.openflow.device.DeviceValidListener;
46 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
47 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
48 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
49 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor;
50 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
51 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
52 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
53 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
54 import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl;
55 import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
56 import org.opendaylight.openflowplugin.impl.lifecycle.LifecycleServiceImpl;
57 import org.opendaylight.openflowplugin.impl.util.DeviceInitializationUtils;
58 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
63 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
64 import org.slf4j.Logger;
65 import org.slf4j.LoggerFactory;
66
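/**
 * {@link DeviceManager} implementation that keeps one {@link DeviceContext} and one
 * {@link LifecycleService} per connected device, keyed by {@link DeviceInfo}.
 * It creates the contexts when a device connects, tears them down when the primary
 * connection is lost and reacts to cluster role changes of the device.
 */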
70 public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProviderKeeper {
71
72     private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class);
73
74     private final long globalNotificationQuota;
75     private final boolean switchFeaturesMandatory;
76     private boolean isNotificationFlowRemovedOff;
77
78     private static final int SPY_RATE = 10;
79
80     private final DataBroker dataBroker;
81     private final ConvertorExecutor convertorExecutor;
82     private TranslatorLibrary translatorLibrary;
83     private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
84     private DeviceTerminationPhaseHandler deviceTerminPhaseHandler;
85
86     private final ConcurrentMap<DeviceInfo, DeviceContext> deviceContexts = new ConcurrentHashMap<>();
87     private final ConcurrentMap<DeviceInfo, LifecycleService> lifecycleServices = new ConcurrentHashMap<>();
88
89     private final long barrierIntervalNanos;
90     private final int barrierCountLimit;
91     private ExtensionConverterProvider extensionConverterProvider;
92     private ScheduledThreadPoolExecutor spyPool;
93     private Set<DeviceSynchronizeListener> deviceSynchronizedListeners;
94     private Set<DeviceValidListener> deviceValidListeners;
95     private final ClusterSingletonServiceProvider singletonServiceProvider;
96
97     private final LifecycleConductor conductor;
98
99     public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
100                              final long globalNotificationQuota,
101                              final boolean switchFeaturesMandatory,
102                              final long barrierInterval,
103                              final int barrierCountLimit,
104                              final LifecycleConductor lifecycleConductor,
105                              boolean isNotificationFlowRemovedOff,
106                              final ConvertorExecutor convertorExecutor,
107                              final ClusterSingletonServiceProvider singletonServiceProvider) {
108         this.switchFeaturesMandatory = switchFeaturesMandatory;
109         this.globalNotificationQuota = globalNotificationQuota;
110         this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
111         this.dataBroker = Preconditions.checkNotNull(dataBroker);
112         this.convertorExecutor = convertorExecutor;
113         /* merge empty nodes to oper DS to predict any problems with missing parent for Node */
114         final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
115
116         final NodesBuilder nodesBuilder = new NodesBuilder();
117         nodesBuilder.setNode(Collections.<Node>emptyList());
118         tx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class), nodesBuilder.build());
119         try {
120             tx.submit().get();
121         } catch (ExecutionException | InterruptedException e) {
122             LOG.error("Creation of node failed.", e);
123             throw new IllegalStateException(e);
124         }
125
126         this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierInterval);
127         this.barrierCountLimit = barrierCountLimit;
128
129         this.conductor = lifecycleConductor;
130         spyPool = new ScheduledThreadPoolExecutor(1);
131         this.deviceSynchronizedListeners = new HashSet<>();
132         this.deviceValidListeners = new HashSet<>();
133         this.singletonServiceProvider = singletonServiceProvider;
134     }
135
136
137     @Override
138     public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
139         this.deviceInitPhaseHandler = handler;
140     }
141
142     @Override
143     public void onDeviceContextLevelUp(@CheckForNull DeviceInfo deviceInfo, final LifecycleService lifecycleService) throws Exception {
144         // final phase - we have to add new Device to MD-SAL DataStore
145         LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceInfo.getNodeId());
146         DeviceContext deviceContext = Preconditions.checkNotNull(deviceContexts.get(deviceInfo));
147         deviceContext.onPublished();
148         lifecycleService.registerService(this.singletonServiceProvider);
149     }
150
151     @Override
152     public boolean deviceConnected(@CheckForNull final ConnectionContext connectionContext) throws Exception {
153         Preconditions.checkArgument(connectionContext != null);
154
155         DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
156         /*
157          * This part prevent destroy another device context. Throwing here an exception result to propagate close connection
158          * in {@link org.opendaylight.openflowplugin.impl.connection.org.opendaylight.openflowplugin.impl.connection.HandshakeContextImpl}
159          * If context already exist we are in state closing process (connection flapping) and we should not propagate connection close
160          */
161          if (deviceContexts.containsKey(deviceInfo)) {
162             LOG.warn("Rejecting connection from node which is already connected and there exist deviceContext for it: {}", connectionContext.getNodeId());
163              return false;
164          }
165
166         LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}",
167                 connectionContext.getConnectionAdapter().getRemoteAddress(), deviceInfo.getNodeId());
168
169         // Add Disconnect handler
170         connectionContext.setDeviceDisconnectedHandler(DeviceManagerImpl.this);
171         // Cache this for clarity
172         final ConnectionAdapter connectionAdapter = connectionContext.getConnectionAdapter();
173
174         //FIXME: as soon as auxiliary connection are fully supported then this is needed only before device context published
175         connectionAdapter.setPacketInFiltering(true);
176
177         final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(deviceInfo.getVersion());
178
179         connectionContext.setOutboundQueueProvider(outboundQueueProvider);
180         final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
181                 connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, barrierCountLimit, barrierIntervalNanos);
182         connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
183
184         final DeviceState deviceState = new DeviceStateImpl(deviceInfo);
185         this.addDeviceSynchronizeListener(deviceState);
186         this.addDeviceValidListener(deviceState);
187
188         final DeviceContext deviceContext = new DeviceContextImpl(connectionContext,
189                 deviceState,
190                 dataBroker,
191                 conductor,
192                 outboundQueueProvider,
193                 translatorLibrary,
194                 this,
195                 connectionContext.getDeviceInfo(),
196                 convertorExecutor);
197
198         final LifecycleService lifecycleService = new LifecycleServiceImpl();
199         lifecycleService.setDeviceContext(deviceContext);
200
201         Verify.verify(deviceContexts.putIfAbsent(deviceInfo, deviceContext) == null, "DeviceCtx still not closed.");
202         lifecycleServices.putIfAbsent(deviceInfo, lifecycleService);
203
204         deviceContext.setSwitchFeaturesMandatory(switchFeaturesMandatory);
205
206         ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider);
207         deviceContext.setNotificationPublishService(conductor.getNotificationPublishService());
208
209         updatePacketInRateLimiters();
210
211         final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
212                 connectionAdapter, deviceContext);
213         connectionAdapter.setMessageListener(messageListener);
214         notifyDeviceValidListeners(deviceInfo, true);
215
216         deviceInitPhaseHandler.onDeviceContextLevelUp(connectionContext.getDeviceInfo(), lifecycleServices.get(deviceInfo));
217
218         notifyDeviceSynchronizeListeners(deviceInfo, true);
219
220         return true;
221     }
222
223     private void updatePacketInRateLimiters() {
224         synchronized (deviceContexts) {
225             final int deviceContextsSize = deviceContexts.size();
226             if (deviceContextsSize > 0) {
227                 long freshNotificationLimit = globalNotificationQuota / deviceContextsSize;
228                 if (freshNotificationLimit < 100) {
229                     freshNotificationLimit = 100;
230                 }
231                 LOG.debug("fresh notification limit = {}", freshNotificationLimit);
232                 for (final DeviceContext deviceContext : deviceContexts.values()) {
233                     deviceContext.updatePacketInRateLimit(freshNotificationLimit);
234                 }
235             }
236         }
237     }
238
239     @Override
240     public TranslatorLibrary oook() {
241         return translatorLibrary;
242     }
243
244     @Override
245     public void setTranslatorLibrary(final TranslatorLibrary translatorLibrary) {
246         this.translatorLibrary = translatorLibrary;
247     }
248
249     @Override
250     public void close() {
251         for (final Iterator<DeviceContext> iterator = Iterators.consumingIterator(deviceContexts.values().iterator());
252                 iterator.hasNext();) {
253             final DeviceContext deviceCtx = iterator.next();
254             notifyDeviceValidListeners(deviceCtx.getDeviceInfo(), false);
255             deviceCtx.shutdownConnection();
256             deviceCtx.shuttingDownDataStoreTransactions();
257         }
258
259         if (spyPool != null) {
260             spyPool.shutdownNow();
261             spyPool = null;
262         }
263     }
264
265     @Override
266     public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
267         LOG.debug("onDeviceContextClosed for Node {}", deviceInfo.getNodeId());
268         deviceContexts.remove(deviceInfo);
269         updatePacketInRateLimiters();
270         LifecycleService lifecycleService = lifecycleServices.remove(deviceInfo);
271         try {
272             lifecycleService.close();
273         } catch (Exception e) {
274             LOG.warn("Closing service for node {} was unsuccessful ", deviceInfo.getNodeId().getValue(), e);
275         }
276     }
277
278     @Override
279     public void initialize() {
280         spyPool.scheduleAtFixedRate(conductor.getMessageIntelligenceAgency(), SPY_RATE, SPY_RATE, TimeUnit.SECONDS);
281     }
282
283     @Override
284     public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
285         this.extensionConverterProvider = extensionConverterProvider;
286     }
287
288     @Override
289     public ExtensionConverterProvider getExtensionConverterProvider() {
290         return extensionConverterProvider;
291     }
292
293     @Override
294     public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
295         this.deviceTerminPhaseHandler = handler;
296     }
297
298     @Override
299     public void onDeviceDisconnected(final ConnectionContext connectionContext) {
300         LOG.trace("onDeviceDisconnected method call for Node: {}", connectionContext.getNodeId());
301         final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
302         final DeviceContext deviceCtx = this.deviceContexts.get(deviceInfo);
303
304         if (null == deviceCtx) {
305             LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.", deviceInfo.getNodeId());
306             return;
307         }
308
309         if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) {
310             /* Connection is not PrimaryConnection so try to remove from Auxiliary Connections */
311             deviceCtx.removeAuxiliaryConnectionContext(connectionContext);
312         } else {
313             notifyDeviceValidListeners(deviceInfo, false);
314             /* Device is disconnected and so we need to close TxManager */
315             final ListenableFuture<Void> future = deviceCtx.shuttingDownDataStoreTransactions();
316             Futures.addCallback(future, new FutureCallback<Void>() {
317
318                 @Override
319                 public void onSuccess(final Void result) {
320                     LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getNodeId());
321                     deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
322                 }
323
324                 @Override
325                 public void onFailure(final Throwable t) {
326                     LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getNodeId(), t);
327                     deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
328                 }
329             });
330             /* Add timer for Close TxManager because it could fain ind cluster without notification */
331             final TimerTask timerTask = timeout -> {
332                 if (!future.isDone()) {
333                     LOG.info("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getNodeId());
334                     future.cancel(false);
335                 }
336             };
337             conductor.newTimeout(timerTask, 10, TimeUnit.SECONDS);
338         }
339     }
340
341     @VisibleForTesting
342     void addDeviceContextToMap(final DeviceInfo deviceInfo, final DeviceContext deviceContext){
343         deviceContexts.put(deviceInfo, deviceContext);
344     }
345
346     @Override
347     public <T extends OFPContext> T gainContext(final DeviceInfo deviceInfo) {
348         return (T) deviceContexts.get(deviceInfo);
349     }
350
351     @Override
352     public ListenableFuture<Void> onClusterRoleChange(final DeviceInfo deviceInfo, final OfpRole role) {
353         DeviceContext deviceContext = deviceContexts.get(deviceInfo);
354         LOG.trace("onClusterRoleChange {} for node:", role, deviceInfo.getNodeId());
355         if (OfpRole.BECOMEMASTER.equals(role)) {
356             return onDeviceTakeClusterLeadership(deviceInfo);
357         }
358         return ((DeviceContextImpl)deviceContext).getTransactionChainManager().deactivateTransactionManager();
359     }
360
361     @Override
362     public void addDeviceSynchronizeListener(final DeviceSynchronizeListener deviceSynchronizeListener) {
363         this.deviceSynchronizedListeners.add(deviceSynchronizeListener);
364     }
365
366     @Override
367     public void notifyDeviceSynchronizeListeners(final DeviceInfo deviceInfo, final boolean deviceSynchronized) {
368         for (DeviceSynchronizeListener listener : deviceSynchronizedListeners) {
369             listener.deviceIsSynchronized(deviceInfo, deviceSynchronized);
370         }
371     }
372
373     @Override
374     public void addDeviceValidListener(final DeviceValidListener deviceValidListener) {
375         this.deviceValidListeners.add(deviceValidListener);
376     }
377
378     @Override
379     public void notifyDeviceValidListeners(final DeviceInfo deviceInfo, final boolean deviceValid) {
380         for (DeviceValidListener listener : deviceValidListeners) {
381             listener.deviceIsValid(deviceInfo, deviceValid);
382         }
383     }
384
385     @Override
386     public void setIsNotificationFlowRemovedOff(boolean isNotificationFlowRemovedOff) {
387         this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
388     }
389
390     @Override
391     public boolean getIsNotificationFlowRemovedOff() {
392         return this.isNotificationFlowRemovedOff;
393     }
394
395     private ListenableFuture<Void> onDeviceTakeClusterLeadership(final DeviceInfo deviceInfo) {
396         LOG.trace("onDeviceTakeClusterLeadership for node: {}", deviceInfo.getNodeId());
397         /* validation */
398         StatisticsContext statisticsContext = conductor.getStatisticsContext(deviceInfo);
399         if (statisticsContext == null) {
400             final String errMsg = String.format("DeviceCtx %s is up but we are missing StatisticsContext", deviceInfo.getDatapathId());
401             LOG.warn(errMsg);
402             return Futures.immediateFailedFuture(new IllegalStateException(errMsg));
403         }
404         DeviceContext deviceContext = deviceContexts.get(deviceInfo);
405         /* Prepare init info collecting */
406         notifyDeviceSynchronizeListeners(deviceInfo, false);
407         ((DeviceContextImpl)deviceContext).getTransactionChainManager().activateTransactionManager();
408         ((DeviceContextImpl)deviceContext).getTransactionChainManager().initialSubmitWriteTransaction();
409         /* Init Collecting NodeInfo */
410         final ListenableFuture<Void> initCollectingDeviceInfo = DeviceInitializationUtils.initializeNodeInformation(
411                 deviceContext, switchFeaturesMandatory, convertorExecutor);
412         /* Init Collecting StatInfo */
413         final ListenableFuture<Boolean> statPollFuture = Futures.transform(initCollectingDeviceInfo,
414                 new AsyncFunction<Void, Boolean>() {
415
416                     @Override
417                     public ListenableFuture<Boolean> apply(@Nonnull final Void input) throws Exception {
418                         statisticsContext.statListForCollectingInitialization();
419                         return statisticsContext.initialGatherDynamicData();
420                     }
421                 });
422
423         return Futures.transform(statPollFuture, getInitialDeviceInformation(deviceContext));
424     }
425
426     private Function<Boolean, Void> getInitialDeviceInformation(final DeviceContext deviceContext) {
427         return input -> {
428             if (ConnectionContext.CONNECTION_STATE.RIP.equals(
429                     conductor.gainConnectionStateSafely(deviceContext.getDeviceInfo())
430             )) {
431                 final String errMsg =
432                         String.format("We lost connection for Device %s, context has to be closed.",
433                         deviceContext.getDeviceInfo().getNodeId());
434                 LOG.warn(errMsg);
435                 throw new IllegalStateException(errMsg);
436             }
437
438             if (input == null || !input) {
439                 final String errMsg =
440                         String.format("Get Initial Device %s information fails",
441                         deviceContext.getDeviceInfo().getNodeId());
442                 LOG.warn(errMsg);
443                 throw new IllegalStateException(errMsg);
444             }
445             LOG.debug("Get Initial Device {} information is successful",
446                     deviceContext.getDeviceInfo().getNodeId());
447             notifyDeviceSynchronizeListeners(deviceContext.getDeviceInfo(), true);
448             deviceContext.getDeviceState().setStatisticsPollingEnabledProp(true);
449             return null;
450         };
451     }
452
453 }