Bug 5596 Change of start up services
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceManagerImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.device;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Function;
12 import com.google.common.base.Preconditions;
13 import com.google.common.base.Verify;
14 import com.google.common.collect.Iterators;
15 import com.google.common.util.concurrent.AsyncFunction;
16 import com.google.common.util.concurrent.FutureCallback;
17 import com.google.common.util.concurrent.Futures;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import io.netty.util.TimerTask;
20 import java.util.Collections;
21 import java.util.HashSet;
22 import java.util.Iterator;
23 import java.util.Set;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.concurrent.ConcurrentMap;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.ScheduledThreadPoolExecutor;
28 import java.util.concurrent.TimeUnit;
29 import javax.annotation.CheckForNull;
30 import javax.annotation.Nonnull;
31 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
32 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
33 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
34 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
35 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
36 import org.opendaylight.openflowplugin.api.openflow.OFPContext;
37 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
38 import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
39 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
40 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
41 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
42 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
43 import org.opendaylight.openflowplugin.api.openflow.device.DeviceSynchronizeListener;
44 import org.opendaylight.openflowplugin.api.openflow.device.DeviceValidListener;
45 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
46 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
47 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
48 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor;
49 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
50 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
51 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
52 import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl;
53 import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
54 import org.opendaylight.openflowplugin.impl.util.DeviceInitializationUtils;
55 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
60 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
61 import org.slf4j.Logger;
62 import org.slf4j.LoggerFactory;
63
64 /**
65  *
66  */
67 public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProviderKeeper {
68
69     private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class);
70
71     private final long globalNotificationQuota;
72     private final boolean switchFeaturesMandatory;
73     private boolean isNotificationFlowRemovedOff;
74
75     private static final int SPY_RATE = 10;
76
77     private final DataBroker dataBroker;
78     private final ConvertorExecutor convertorExecutor;
79     private TranslatorLibrary translatorLibrary;
80     private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
81     private DeviceTerminationPhaseHandler deviceTerminPhaseHandler;
82
83     private final ConcurrentMap<DeviceInfo, DeviceContext> deviceContexts = new ConcurrentHashMap<>();
84
85     private final long barrierIntervalNanos;
86     private final int barrierCountLimit;
87     private ExtensionConverterProvider extensionConverterProvider;
88     private ScheduledThreadPoolExecutor spyPool;
89     private Set<DeviceSynchronizeListener> deviceSynchronizedListeners;
90     private Set<DeviceValidListener> deviceValidListeners;
91
92     private final LifecycleConductor conductor;
93
94     public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
95                              final long globalNotificationQuota,
96                              final boolean switchFeaturesMandatory,
97                              final long barrierInterval,
98                              final int barrierCountLimit,
99                              final LifecycleConductor lifecycleConductor,
100                              boolean isNotificationFlowRemovedOff,
101                              final ConvertorExecutor convertorExecutor) {
102         this.switchFeaturesMandatory = switchFeaturesMandatory;
103         this.globalNotificationQuota = globalNotificationQuota;
104         this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
105         this.dataBroker = Preconditions.checkNotNull(dataBroker);
106         this.convertorExecutor = convertorExecutor;
107         /* merge empty nodes to oper DS to predict any problems with missing parent for Node */
108         final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
109
110         final NodesBuilder nodesBuilder = new NodesBuilder();
111         nodesBuilder.setNode(Collections.<Node>emptyList());
112         tx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class), nodesBuilder.build());
113         try {
114             tx.submit().get();
115         } catch (ExecutionException | InterruptedException e) {
116             LOG.error("Creation of node failed.", e);
117             throw new IllegalStateException(e);
118         }
119
120         this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierInterval);
121         this.barrierCountLimit = barrierCountLimit;
122
123         this.conductor = lifecycleConductor;
124         spyPool = new ScheduledThreadPoolExecutor(1);
125         this.deviceSynchronizedListeners = new HashSet<>();
126         this.deviceValidListeners = new HashSet<>();
127     }
128
129
130     @Override
131     public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
132         this.deviceInitPhaseHandler = handler;
133     }
134
135     @Override
136     public void onDeviceContextLevelUp(@CheckForNull DeviceInfo deviceInfo) throws Exception {
137         // final phase - we have to add new Device to MD-SAL DataStore
138         LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceInfo.getNodeId());
139         DeviceContext deviceContext = Preconditions.checkNotNull(deviceContexts.get(deviceInfo));
140         deviceContext.onPublished();
141     }
142
143     @Override
144     public boolean deviceConnected(@CheckForNull final ConnectionContext connectionContext) throws Exception {
145         Preconditions.checkArgument(connectionContext != null);
146
147         DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
148         /*
149          * This part prevent destroy another device context. Throwing here an exception result to propagate close connection
150          * in {@link org.opendaylight.openflowplugin.impl.connection.org.opendaylight.openflowplugin.impl.connection.HandshakeContextImpl}
151          * If context already exist we are in state closing process (connection flapping) and we should not propagate connection close
152          */
153          if (deviceContexts.containsKey(deviceInfo)) {
154             LOG.warn("Rejecting connection from node which is already connected and there exist deviceContext for it: {}", connectionContext.getNodeId());
155              return false;
156          }
157
158         LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}",
159                 connectionContext.getConnectionAdapter().getRemoteAddress(), deviceInfo.getNodeId());
160
161         // Add Disconnect handler
162         connectionContext.setDeviceDisconnectedHandler(DeviceManagerImpl.this);
163         // Cache this for clarity
164         final ConnectionAdapter connectionAdapter = connectionContext.getConnectionAdapter();
165
166         //FIXME: as soon as auxiliary connection are fully supported then this is needed only before device context published
167         connectionAdapter.setPacketInFiltering(true);
168
169         final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(deviceInfo.getVersion());
170
171         connectionContext.setOutboundQueueProvider(outboundQueueProvider);
172         final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
173                 connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, barrierCountLimit, barrierIntervalNanos);
174         connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
175
176         final DeviceState deviceState = new DeviceStateImpl(deviceInfo);
177         this.addDeviceSynchronizeListener(deviceState);
178         this.addDeviceValidListener(deviceState);
179
180         final DeviceContext deviceContext = new DeviceContextImpl(connectionContext,
181                 deviceState,
182                 dataBroker,
183                 conductor,
184                 outboundQueueProvider,
185                 translatorLibrary,
186                 this,
187                 connectionContext.getDeviceInfo(),
188                 convertorExecutor);
189
190         Verify.verify(deviceContexts.putIfAbsent(deviceInfo, deviceContext) == null, "DeviceCtx still not closed.");
191
192         deviceContext.setSwitchFeaturesMandatory(switchFeaturesMandatory);
193
194         ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider);
195         deviceContext.setNotificationPublishService(conductor.getNotificationPublishService());
196
197         updatePacketInRateLimiters();
198
199         final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
200                 connectionAdapter, deviceContext);
201         connectionAdapter.setMessageListener(messageListener);
202         notifyDeviceValidListeners(deviceInfo, true);
203
204         deviceInitPhaseHandler.onDeviceContextLevelUp(connectionContext.getDeviceInfo());
205
206         notifyDeviceSynchronizeListeners(deviceInfo, true);
207
208         return true;
209     }
210
211     private void updatePacketInRateLimiters() {
212         synchronized (deviceContexts) {
213             final int deviceContextsSize = deviceContexts.size();
214             if (deviceContextsSize > 0) {
215                 long freshNotificationLimit = globalNotificationQuota / deviceContextsSize;
216                 if (freshNotificationLimit < 100) {
217                     freshNotificationLimit = 100;
218                 }
219                 LOG.debug("fresh notification limit = {}", freshNotificationLimit);
220                 for (final DeviceContext deviceContext : deviceContexts.values()) {
221                     deviceContext.updatePacketInRateLimit(freshNotificationLimit);
222                 }
223             }
224         }
225     }
226
227     @Override
228     public TranslatorLibrary oook() {
229         return translatorLibrary;
230     }
231
232     @Override
233     public void setTranslatorLibrary(final TranslatorLibrary translatorLibrary) {
234         this.translatorLibrary = translatorLibrary;
235     }
236
237     @Override
238     public void close() {
239         for (final Iterator<DeviceContext> iterator = Iterators.consumingIterator(deviceContexts.values().iterator());
240                 iterator.hasNext();) {
241             final DeviceContext deviceCtx = iterator.next();
242             notifyDeviceValidListeners(deviceCtx.getDeviceInfo(), false);
243             deviceCtx.shutdownConnection();
244             deviceCtx.shuttingDownDataStoreTransactions();
245         }
246
247         if (spyPool != null) {
248             spyPool.shutdownNow();
249             spyPool = null;
250         }
251     }
252
253     @Override
254     public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
255         LOG.debug("onDeviceContextClosed for Node {}", deviceInfo.getNodeId());
256         deviceContexts.remove(deviceInfo);
257         updatePacketInRateLimiters();
258     }
259
260     @Override
261     public void initialize() {
262         spyPool.scheduleAtFixedRate(conductor.getMessageIntelligenceAgency(), SPY_RATE, SPY_RATE, TimeUnit.SECONDS);
263     }
264
265     @Override
266     public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
267         this.extensionConverterProvider = extensionConverterProvider;
268     }
269
270     @Override
271     public ExtensionConverterProvider getExtensionConverterProvider() {
272         return extensionConverterProvider;
273     }
274
275     @Override
276     public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
277         this.deviceTerminPhaseHandler = handler;
278     }
279
280     @Override
281     public void onDeviceDisconnected(final ConnectionContext connectionContext) {
282         LOG.trace("onDeviceDisconnected method call for Node: {}", connectionContext.getNodeId());
283         final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
284         final DeviceContext deviceCtx = this.deviceContexts.get(deviceInfo);
285
286         if (null == deviceCtx) {
287             LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.", deviceInfo.getNodeId());
288             return;
289         }
290
291         if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) {
292             /* Connection is not PrimaryConnection so try to remove from Auxiliary Connections */
293             deviceCtx.removeAuxiliaryConnectionContext(connectionContext);
294         } else {
295             notifyDeviceValidListeners(deviceInfo, false);
296             /* Device is disconnected and so we need to close TxManager */
297             final ListenableFuture<Void> future = deviceCtx.shuttingDownDataStoreTransactions();
298             Futures.addCallback(future, new FutureCallback<Void>() {
299
300                 @Override
301                 public void onSuccess(final Void result) {
302                     LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getNodeId());
303                     deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
304                 }
305
306                 @Override
307                 public void onFailure(final Throwable t) {
308                     LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getNodeId(), t);
309                     deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
310                 }
311             });
312             /* Add timer for Close TxManager because it could fain ind cluster without notification */
313             final TimerTask timerTask = timeout -> {
314                 if (!future.isDone()) {
315                     LOG.info("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getNodeId());
316                     future.cancel(false);
317                 }
318             };
319             conductor.newTimeout(timerTask, 10, TimeUnit.SECONDS);
320         }
321     }
322
323     @VisibleForTesting
324     void addDeviceContextToMap(final DeviceInfo deviceInfo, final DeviceContext deviceContext){
325         deviceContexts.put(deviceInfo, deviceContext);
326     }
327
328     @Override
329     public <T extends OFPContext> T gainContext(final DeviceInfo deviceInfo) {
330         return (T) deviceContexts.get(deviceInfo);
331     }
332
333     @Override
334     public ListenableFuture<Void> onClusterRoleChange(final DeviceInfo deviceInfo, final OfpRole role) {
335         DeviceContext deviceContext = deviceContexts.get(deviceInfo);
336         LOG.trace("onClusterRoleChange {} for node:", role, deviceInfo.getNodeId());
337         if (OfpRole.BECOMEMASTER.equals(role)) {
338             return onDeviceTakeClusterLeadership(deviceInfo);
339         }
340         return ((DeviceContextImpl)deviceContext).getTransactionChainManager().deactivateTransactionManager();
341     }
342
343     @Override
344     public void addDeviceSynchronizeListener(final DeviceSynchronizeListener deviceSynchronizeListener) {
345         this.deviceSynchronizedListeners.add(deviceSynchronizeListener);
346     }
347
348     @Override
349     public void notifyDeviceSynchronizeListeners(final DeviceInfo deviceInfo, final boolean deviceSynchronized) {
350         for (DeviceSynchronizeListener listener : deviceSynchronizedListeners) {
351             listener.deviceIsSynchronized(deviceInfo, deviceSynchronized);
352         }
353     }
354
355     @Override
356     public void addDeviceValidListener(final DeviceValidListener deviceValidListener) {
357         this.deviceValidListeners.add(deviceValidListener);
358     }
359
360     @Override
361     public void notifyDeviceValidListeners(final DeviceInfo deviceInfo, final boolean deviceValid) {
362         for (DeviceValidListener listener : deviceValidListeners) {
363             listener.deviceIsValid(deviceInfo, deviceValid);
364         }
365     }
366
367     @Override
368     public void setIsNotificationFlowRemovedOff(boolean isNotificationFlowRemovedOff) {
369         this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
370     }
371
372     @Override
373     public boolean getIsNotificationFlowRemovedOff() {
374         return this.isNotificationFlowRemovedOff;
375     }
376
377     private ListenableFuture<Void> onDeviceTakeClusterLeadership(final DeviceInfo deviceInfo) {
378         LOG.trace("onDeviceTakeClusterLeadership for node: {}", deviceInfo.getNodeId());
379         /* validation */
380         StatisticsContext statisticsContext = conductor.getStatisticsContext(deviceInfo);
381         if (statisticsContext == null) {
382             final String errMsg = String.format("DeviceCtx %s is up but we are missing StatisticsContext", deviceInfo.getDatapathId());
383             LOG.warn(errMsg);
384             return Futures.immediateFailedFuture(new IllegalStateException(errMsg));
385         }
386         DeviceContext deviceContext = deviceContexts.get(deviceInfo);
387         /* Prepare init info collecting */
388         notifyDeviceSynchronizeListeners(deviceInfo, false);
389         ((DeviceContextImpl)deviceContext).getTransactionChainManager().activateTransactionManager();
390         ((DeviceContextImpl)deviceContext).getTransactionChainManager().initialSubmitWriteTransaction();
391         /* Init Collecting NodeInfo */
392         final ListenableFuture<Void> initCollectingDeviceInfo = DeviceInitializationUtils.initializeNodeInformation(
393                 deviceContext, switchFeaturesMandatory, convertorExecutor);
394         /* Init Collecting StatInfo */
395         final ListenableFuture<Boolean> statPollFuture = Futures.transform(initCollectingDeviceInfo,
396                 new AsyncFunction<Void, Boolean>() {
397
398                     @Override
399                     public ListenableFuture<Boolean> apply(@Nonnull final Void input) throws Exception {
400                         statisticsContext.statListForCollectingInitialization();
401                         return statisticsContext.initialGatherDynamicData();
402                     }
403                 });
404
405         return Futures.transform(statPollFuture, getInitialDeviceInformation(deviceContext));
406     }
407
408     private Function<Boolean, Void> getInitialDeviceInformation(final DeviceContext deviceContext) {
409         return input -> {
410             if (ConnectionContext.CONNECTION_STATE.RIP.equals(
411                     conductor.gainConnectionStateSafely(deviceContext.getDeviceInfo())
412             )) {
413                 final String errMsg =
414                         String.format("We lost connection for Device %s, context has to be closed.",
415                         deviceContext.getDeviceInfo().getNodeId());
416                 LOG.warn(errMsg);
417                 throw new IllegalStateException(errMsg);
418             }
419
420             if (input == null || !input) {
421                 final String errMsg =
422                         String.format("Get Initial Device %s information fails",
423                         deviceContext.getDeviceInfo().getNodeId());
424                 LOG.warn(errMsg);
425                 throw new IllegalStateException(errMsg);
426             }
427             LOG.debug("Get Initial Device {} information is successful",
428                     deviceContext.getDeviceInfo().getNodeId());
429             notifyDeviceSynchronizeListeners(deviceContext.getDeviceInfo(), true);
430             deviceContext.getDeviceState().setStatisticsPollingEnabledProp(true);
431             return null;
432         };
433     }
434
435 }