Bug 5596 Initial commit failed
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceManagerImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.device;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.collect.Iterators;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import io.netty.util.HashedWheelTimer;
17 import io.netty.util.TimerTask;
18 import java.util.Collections;
19 import java.util.Iterator;
20 import java.util.Objects;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ConcurrentMap;
23 import java.util.concurrent.ExecutionException;
24 import java.util.concurrent.ScheduledThreadPoolExecutor;
25 import java.util.concurrent.TimeUnit;
26 import javax.annotation.CheckForNull;
27 import javax.annotation.Nonnull;
28 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
29 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
30 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
31 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
32 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
33 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
34 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
35 import org.opendaylight.openflowplugin.api.openflow.OFPContext;
36 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
37 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionStatus;
38 import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
39 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
40 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
41 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
42 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
43 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
44 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
45 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
46 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
47 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
48 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
49 import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl;
50 import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
51 import org.opendaylight.openflowplugin.impl.lifecycle.LifecycleServiceImpl;
52 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
56 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
59
60 /**
61  *
62  */
63 public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProviderKeeper {
64
65     private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class);
66
67     private final long globalNotificationQuota;
68     private final boolean switchFeaturesMandatory;
69     private boolean isNotificationFlowRemovedOff;
70
71     private static final int SPY_RATE = 10;
72
73     private final DataBroker dataBroker;
74     private final ConvertorExecutor convertorExecutor;
75     private TranslatorLibrary translatorLibrary;
76     private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
77     private DeviceTerminationPhaseHandler deviceTerminPhaseHandler;
78
79     private final ConcurrentMap<DeviceInfo, DeviceContext> deviceContexts = new ConcurrentHashMap<>();
80     private final ConcurrentMap<DeviceInfo, LifecycleService> lifecycleServices = new ConcurrentHashMap<>();
81
82     private final long barrierIntervalNanos;
83     private final int barrierCountLimit;
84     private ExtensionConverterProvider extensionConverterProvider;
85     private ScheduledThreadPoolExecutor spyPool;
86     private final ClusterSingletonServiceProvider singletonServiceProvider;
87     private final NotificationPublishService notificationPublishService;
88     private final MessageSpy messageSpy;
89     private final HashedWheelTimer hashedWheelTimer;
90
91     public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
92                              final long globalNotificationQuota,
93                              final boolean switchFeaturesMandatory,
94                              final long barrierInterval,
95                              final int barrierCountLimit,
96                              
97                              final MessageSpy messageSpy,
98                              final boolean isNotificationFlowRemovedOff,
99                              final ClusterSingletonServiceProvider singletonServiceProvider,
100                              final NotificationPublishService notificationPublishService,
101                              final HashedWheelTimer hashedWheelTimer,
102                              final ConvertorExecutor convertorExecutor) {
103         this.switchFeaturesMandatory = switchFeaturesMandatory;
104         this.globalNotificationQuota = globalNotificationQuota;
105         this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
106         this.dataBroker = Preconditions.checkNotNull(dataBroker);
107         this.convertorExecutor = convertorExecutor;
108         this.hashedWheelTimer = hashedWheelTimer;
109         /* merge empty nodes to oper DS to predict any problems with missing parent for Node */
110         final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
111
112         final NodesBuilder nodesBuilder = new NodesBuilder();
113         nodesBuilder.setNode(Collections.<Node>emptyList());
114         tx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class), nodesBuilder.build());
115         try {
116             tx.submit().get();
117         } catch (ExecutionException | InterruptedException e) {
118             LOG.error("Creation of node failed.", e);
119             throw new IllegalStateException(e);
120         }
121
122         this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierInterval);
123         this.barrierCountLimit = barrierCountLimit;
124
125         spyPool = new ScheduledThreadPoolExecutor(1);
126         this.singletonServiceProvider = singletonServiceProvider;
127         this.notificationPublishService = notificationPublishService;
128         this.messageSpy = messageSpy;
129     }
130
131
132     @Override
133     public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
134         this.deviceInitPhaseHandler = handler;
135     }
136
137     @Override
138     public void onDeviceContextLevelUp(@CheckForNull DeviceInfo deviceInfo, final LifecycleService lifecycleService) throws Exception {
139         // final phase - we have to add new Device to MD-SAL DataStore
140         LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceInfo.getNodeId());
141         DeviceContext deviceContext = Preconditions.checkNotNull(deviceContexts.get(deviceInfo));
142         deviceContext.onPublished();
143         lifecycleService.registerService(this.singletonServiceProvider);
144     }
145
146     @Override
147     public ConnectionStatus deviceConnected(@CheckForNull final ConnectionContext connectionContext) throws Exception {
148         Preconditions.checkArgument(connectionContext != null);
149
150         DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
151         /*
152          * This part prevent destroy another device context. Throwing here an exception result to propagate close connection
153          * in {@link org.opendaylight.openflowplugin.impl.connection.org.opendaylight.openflowplugin.impl.connection.HandshakeContextImpl}
154          * If context already exist we are in state closing process (connection flapping) and we should not propagate connection close
155          */
156          if (deviceContexts.containsKey(deviceInfo)) {
157              DeviceContext deviceContext = deviceContexts.get(deviceInfo);
158              if (!deviceContext.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
159                  LOG.info("Node {} already connected but context state not in TERMINATION state, replacing connection context",
160                          connectionContext.getDeviceInfo().getLOGValue());
161                  deviceContext.replaceConnectionContext(connectionContext);
162                  return ConnectionStatus.ALREADY_CONNECTED;
163              } else {
164                  LOG.warn("Rejecting connection from node which is already connected and there exist deviceContext for it: {}",
165                          connectionContext.getDeviceInfo().getLOGValue());
166                  return ConnectionStatus.CLOSING;
167              }
168          }
169
170         LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}",
171                 connectionContext.getConnectionAdapter().getRemoteAddress(), deviceInfo.getNodeId());
172
173         // Add Disconnect handler
174         connectionContext.setDeviceDisconnectedHandler(DeviceManagerImpl.this);
175         // Cache this for clarity
176         final ConnectionAdapter connectionAdapter = connectionContext.getConnectionAdapter();
177
178         //FIXME: as soon as auxiliary connection are fully supported then this is needed only before device context published
179         connectionAdapter.setPacketInFiltering(true);
180
181         final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(deviceInfo.getVersion());
182
183         connectionContext.setOutboundQueueProvider(outboundQueueProvider);
184         final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
185                 connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, barrierCountLimit, barrierIntervalNanos);
186         connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
187
188         final DeviceContext deviceContext = new DeviceContextImpl(
189                 connectionContext,
190                 dataBroker,
191                 messageSpy,
192                 translatorLibrary,
193                 this,
194                 convertorExecutor);
195
196         deviceContexts.putIfAbsent(deviceInfo, deviceContext);
197
198         final LifecycleService lifecycleService = new LifecycleServiceImpl();
199         lifecycleService.setDeviceContext(deviceContext);
200         deviceContext.putLifecycleServiceIntoTxChainManager(lifecycleService);
201
202         lifecycleServices.putIfAbsent(deviceInfo, lifecycleService);
203
204         deviceContext.setSwitchFeaturesMandatory(switchFeaturesMandatory);
205
206         ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider);
207         deviceContext.setNotificationPublishService(notificationPublishService);
208
209         updatePacketInRateLimiters();
210
211         final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
212                 connectionAdapter, deviceContext);
213
214         connectionAdapter.setMessageListener(messageListener);
215         deviceInitPhaseHandler.onDeviceContextLevelUp(connectionContext.getDeviceInfo(), lifecycleService);
216         return ConnectionStatus.MAY_CONTINUE;
217     }
218
219     private void updatePacketInRateLimiters() {
220         synchronized (deviceContexts) {
221             final int deviceContextsSize = deviceContexts.size();
222             if (deviceContextsSize > 0) {
223                 long freshNotificationLimit = globalNotificationQuota / deviceContextsSize;
224                 if (freshNotificationLimit < 100) {
225                     freshNotificationLimit = 100;
226                 }
227                 LOG.debug("fresh notification limit = {}", freshNotificationLimit);
228                 for (final DeviceContext deviceContext : deviceContexts.values()) {
229                     deviceContext.updatePacketInRateLimit(freshNotificationLimit);
230                 }
231             }
232         }
233     }
234
235     @Override
236     public TranslatorLibrary oook() {
237         return translatorLibrary;
238     }
239
240     @Override
241     public void setTranslatorLibrary(final TranslatorLibrary translatorLibrary) {
242         this.translatorLibrary = translatorLibrary;
243     }
244
245     @Override
246     public void close() {
247         for (final Iterator<DeviceContext> iterator = Iterators.consumingIterator(deviceContexts.values().iterator());
248                 iterator.hasNext();) {
249             final DeviceContext deviceCtx = iterator.next();
250             deviceCtx.shutdownConnection();
251             deviceCtx.shuttingDownDataStoreTransactions();
252         }
253
254         if (spyPool != null) {
255             spyPool.shutdownNow();
256             spyPool = null;
257         }
258     }
259
260     @Override
261     public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
262
263         deviceContexts.remove(deviceInfo);
264         if (LOG.isDebugEnabled()) {
265             LOG.debug("Device context removed for node {}", deviceInfo.getLOGValue());
266         }
267
268         LifecycleService lifecycleService = lifecycleServices.remove(deviceInfo);
269         if (LOG.isDebugEnabled()) {
270             LOG.debug("Lifecycle service removed for node {}", deviceInfo.getLOGValue());
271         }
272
273         updatePacketInRateLimiters();
274         if (Objects.nonNull(lifecycleService)) {
275             try {
276                 lifecycleService.close();
277                 LOG.debug("Lifecycle service successfully closed for node {}", deviceInfo.getLOGValue());
278             } catch (Exception e) {
279                 LOG.warn("Closing lifecycle service for node {} was unsuccessful ", deviceInfo.getLOGValue(), e);
280             }
281         }
282     }
283
284     @Override
285     public void initialize() {
286         spyPool.scheduleAtFixedRate(messageSpy, SPY_RATE, SPY_RATE, TimeUnit.SECONDS);
287     }
288
289     @Override
290     public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
291         this.extensionConverterProvider = extensionConverterProvider;
292     }
293
294     @Override
295     public ExtensionConverterProvider getExtensionConverterProvider() {
296         return extensionConverterProvider;
297     }
298
299     @Override
300     public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
301         this.deviceTerminPhaseHandler = handler;
302     }
303
304     @Override
305     public void onDeviceDisconnected(final ConnectionContext connectionContext) {
306         LOG.trace("onDeviceDisconnected method call for Node: {}", connectionContext.getNodeId());
307         final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
308         final DeviceContext deviceCtx = this.deviceContexts.get(deviceInfo);
309
310         if (null == deviceCtx) {
311             LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.", deviceInfo.getLOGValue());
312             return;
313         }
314
315         if (deviceCtx.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
316             LOG.debug("Device context for node {} is already is termination state, waiting for close all context");
317             return;
318         }
319
320         deviceCtx.setState(OFPContext.CONTEXT_STATE.TERMINATION);
321
322         if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) {
323             LOG.debug("Node {} disconnected, but not primary connection.", connectionContext.getDeviceInfo().getLOGValue());
324             /* Connection is not PrimaryConnection so try to remove from Auxiliary Connections */
325             deviceCtx.removeAuxiliaryConnectionContext(connectionContext);
326         }
327         //TODO: Auxiliary connections supported ?
328         {
329             /* Device is disconnected and so we need to close TxManager */
330             final ListenableFuture<Void> future = deviceCtx.shuttingDownDataStoreTransactions();
331             Futures.addCallback(future, new FutureCallback<Void>() {
332
333                 @Override
334                 public void onSuccess(final Void result) {
335                     LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getLOGValue());
336                     deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
337                 }
338
339                 @Override
340                 public void onFailure(final Throwable t) {
341                     LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getLOGValue());
342                     LOG.trace("TxChainManager failed by closing. ", t);
343                     deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
344                 }
345             });
346             /* Add timer for Close TxManager because it could fain ind cluster without notification */
347             final TimerTask timerTask = timeout -> {
348                 if (!future.isDone()) {
349                     LOG.warn("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getLOGValue());
350                     future.cancel(false);
351                 }
352             };
353             hashedWheelTimer.newTimeout(timerTask, 10, TimeUnit.SECONDS);
354         }
355     }
356
357     @VisibleForTesting
358     void addDeviceContextToMap(final DeviceInfo deviceInfo, final DeviceContext deviceContext){
359         deviceContexts.put(deviceInfo, deviceContext);
360     }
361
362     @Override
363     public <T extends OFPContext> T gainContext(final DeviceInfo deviceInfo) {
364         return (T) deviceContexts.get(deviceInfo);
365     }
366
367     @Override
368     public void setIsNotificationFlowRemovedOff(boolean isNotificationFlowRemovedOff) {
369         this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
370     }
371
372     @Override
373     public boolean getIsNotificationFlowRemovedOff() {
374         return this.isNotificationFlowRemovedOff;
375     }
376
377 }