Bug 6465 Controller goes into slave mode
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceManagerImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.device;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.collect.Iterators;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import io.netty.util.HashedWheelTimer;
17 import io.netty.util.TimerTask;
18 import java.util.Collections;
19 import java.util.Iterator;
20 import java.util.Objects;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ConcurrentMap;
23 import java.util.concurrent.ExecutionException;
24 import java.util.concurrent.ScheduledThreadPoolExecutor;
25 import java.util.concurrent.TimeUnit;
26 import javax.annotation.CheckForNull;
27 import javax.annotation.Nonnull;
28 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
29 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
30 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
31 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
32 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
33 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
34 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
35 import org.opendaylight.openflowplugin.api.openflow.OFPContext;
36 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
37 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionStatus;
38 import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
39 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
40 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
41 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
42 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
43 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
44 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
45 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
46 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
47 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
48 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
49 import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl;
50 import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
51 import org.opendaylight.openflowplugin.impl.lifecycle.LifecycleServiceImpl;
52 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
56 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
59
60 /**
61  *
62  */
63 public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProviderKeeper {
64
65     private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class);
66
67     private final long globalNotificationQuota;
68     private final boolean switchFeaturesMandatory;
69     private boolean isNotificationFlowRemovedOff;
70     private boolean skipTableFeatures;
71     private static final int SPY_RATE = 10;
72
73     private final DataBroker dataBroker;
74     private final ConvertorExecutor convertorExecutor;
75     private TranslatorLibrary translatorLibrary;
76     private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
77     private DeviceTerminationPhaseHandler deviceTerminPhaseHandler;
78
79     private final ConcurrentMap<DeviceInfo, DeviceContext> deviceContexts = new ConcurrentHashMap<>();
80     private final ConcurrentMap<DeviceInfo, LifecycleService> lifecycleServices = new ConcurrentHashMap<>();
81
82     private final long barrierIntervalNanos;
83     private final int barrierCountLimit;
84
85     private ExtensionConverterProvider extensionConverterProvider;
86     private ScheduledThreadPoolExecutor spyPool;
87     private final ClusterSingletonServiceProvider singletonServiceProvider;
88     private final NotificationPublishService notificationPublishService;
89     private final MessageSpy messageSpy;
90     private final HashedWheelTimer hashedWheelTimer;
91
92     public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
93                              final long globalNotificationQuota,
94                              final boolean switchFeaturesMandatory,
95                              final long barrierInterval,
96                              final int barrierCountLimit,
97                              final MessageSpy messageSpy,
98                              final boolean isNotificationFlowRemovedOff,
99                              final ClusterSingletonServiceProvider singletonServiceProvider,
100                              final NotificationPublishService notificationPublishService,
101                              final HashedWheelTimer hashedWheelTimer,
102                              final ConvertorExecutor convertorExecutor,
103                              final boolean skipTableFeatures) {
104
105         this.switchFeaturesMandatory = switchFeaturesMandatory;
106         this.globalNotificationQuota = globalNotificationQuota;
107         this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
108         this.skipTableFeatures = skipTableFeatures;
109         this.dataBroker = Preconditions.checkNotNull(dataBroker);
110         this.convertorExecutor = convertorExecutor;
111         this.hashedWheelTimer = hashedWheelTimer;
112         /* merge empty nodes to oper DS to predict any problems with missing parent for Node */
113         final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
114
115         final NodesBuilder nodesBuilder = new NodesBuilder();
116         nodesBuilder.setNode(Collections.<Node>emptyList());
117         tx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class), nodesBuilder.build());
118         try {
119             tx.submit().get();
120         } catch (ExecutionException | InterruptedException e) {
121             LOG.error("Creation of node failed.", e);
122             throw new IllegalStateException(e);
123         }
124
125         this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierInterval);
126         this.barrierCountLimit = barrierCountLimit;
127
128         spyPool = new ScheduledThreadPoolExecutor(1);
129         this.singletonServiceProvider = singletonServiceProvider;
130         this.notificationPublishService = notificationPublishService;
131         this.messageSpy = messageSpy;
132     }
133
134
135     @Override
136     public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
137         this.deviceInitPhaseHandler = handler;
138     }
139
140     @Override
141     public void onDeviceContextLevelUp(@CheckForNull DeviceInfo deviceInfo, final LifecycleService lifecycleService) throws Exception {
142         // final phase - we have to add new Device to MD-SAL DataStore
143         LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceInfo.getNodeId());
144         DeviceContext deviceContext = Preconditions.checkNotNull(deviceContexts.get(deviceInfo));
145         deviceContext.onPublished();
146         lifecycleService.registerService(this.singletonServiceProvider);
147     }
148
149     @Override
150     public ConnectionStatus deviceConnected(@CheckForNull final ConnectionContext connectionContext) throws Exception {
151         Preconditions.checkArgument(connectionContext != null);
152
153         DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
154         /*
155          * This part prevent destroy another device context. Throwing here an exception result to propagate close connection
156          * in {@link org.opendaylight.openflowplugin.impl.connection.org.opendaylight.openflowplugin.impl.connection.HandshakeContextImpl}
157          * If context already exist we are in state closing process (connection flapping) and we should not propagate connection close
158          */
159          if (deviceContexts.containsKey(deviceInfo)) {
160              DeviceContext deviceContext = deviceContexts.get(deviceInfo);
161              LOG.warn("Node {} already connected disconnecting device. Rejecting connection", deviceInfo);
162              if (!deviceContext.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
163                  LOG.warn("Node {} context state not in TERMINATION state.",
164                          connectionContext.getDeviceInfo().getLOGValue());
165                  return ConnectionStatus.ALREADY_CONNECTED;
166              } else {
167                  return ConnectionStatus.CLOSING;
168              }
169          }
170
171         LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}",
172                 connectionContext.getConnectionAdapter().getRemoteAddress(), deviceInfo.getNodeId());
173
174         // Add Disconnect handler
175         connectionContext.setDeviceDisconnectedHandler(DeviceManagerImpl.this);
176         // Cache this for clarity
177         final ConnectionAdapter connectionAdapter = connectionContext.getConnectionAdapter();
178
179         //FIXME: as soon as auxiliary connection are fully supported then this is needed only before device context published
180         connectionAdapter.setPacketInFiltering(true);
181
182         final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(deviceInfo.getVersion());
183
184         connectionContext.setOutboundQueueProvider(outboundQueueProvider);
185         final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
186                 connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, barrierCountLimit, barrierIntervalNanos);
187         connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
188
189         final DeviceContext deviceContext = new DeviceContextImpl(
190                 connectionContext,
191                 dataBroker,
192                 messageSpy,
193                 translatorLibrary,
194                 this,
195                 convertorExecutor,
196                 skipTableFeatures);
197
198         deviceContexts.putIfAbsent(deviceInfo, deviceContext);
199
200         final LifecycleService lifecycleService = new LifecycleServiceImpl();
201         lifecycleService.setDeviceContext(deviceContext);
202         deviceContext.putLifecycleServiceIntoTxChainManager(lifecycleService);
203
204         lifecycleServices.putIfAbsent(deviceInfo, lifecycleService);
205
206         deviceContext.setSwitchFeaturesMandatory(switchFeaturesMandatory);
207
208         ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider);
209         deviceContext.setNotificationPublishService(notificationPublishService);
210
211         updatePacketInRateLimiters();
212
213         final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
214                 connectionAdapter, deviceContext);
215
216         connectionAdapter.setMessageListener(messageListener);
217         deviceInitPhaseHandler.onDeviceContextLevelUp(connectionContext.getDeviceInfo(), lifecycleService);
218         return ConnectionStatus.MAY_CONTINUE;
219     }
220
221     private void updatePacketInRateLimiters() {
222         synchronized (deviceContexts) {
223             final int deviceContextsSize = deviceContexts.size();
224             if (deviceContextsSize > 0) {
225                 long freshNotificationLimit = globalNotificationQuota / deviceContextsSize;
226                 if (freshNotificationLimit < 100) {
227                     freshNotificationLimit = 100;
228                 }
229                 LOG.debug("fresh notification limit = {}", freshNotificationLimit);
230                 for (final DeviceContext deviceContext : deviceContexts.values()) {
231                     deviceContext.updatePacketInRateLimit(freshNotificationLimit);
232                 }
233             }
234         }
235     }
236
237     @Override
238     public TranslatorLibrary oook() {
239         return translatorLibrary;
240     }
241
242     @Override
243     public void setTranslatorLibrary(final TranslatorLibrary translatorLibrary) {
244         this.translatorLibrary = translatorLibrary;
245     }
246
247     @Override
248     public void close() {
249         for (final Iterator<DeviceContext> iterator = Iterators.consumingIterator(deviceContexts.values().iterator());
250                 iterator.hasNext();) {
251             final DeviceContext deviceCtx = iterator.next();
252             deviceCtx.shutdownConnection();
253             deviceCtx.shuttingDownDataStoreTransactions();
254         }
255
256         if (spyPool != null) {
257             spyPool.shutdownNow();
258             spyPool = null;
259         }
260     }
261
262     @Override
263     public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
264
265         LifecycleService lifecycleService = lifecycleServices.remove(deviceInfo);
266         if (LOG.isDebugEnabled()) {
267             LOG.debug("Lifecycle service removed for node {}", deviceInfo.getLOGValue());
268         }
269
270         updatePacketInRateLimiters();
271         if (Objects.nonNull(lifecycleService)) {
272             try {
273                 lifecycleService.close();
274                 LOG.debug("Lifecycle service successfully closed for node {}", deviceInfo.getLOGValue());
275             } catch (Exception e) {
276                 LOG.warn("Closing lifecycle service for node {} was unsuccessful ", deviceInfo.getLOGValue(), e);
277             }
278         }
279
280         deviceContexts.remove(deviceInfo);
281         if (LOG.isDebugEnabled()) {
282             LOG.debug("Device context removed for node {}", deviceInfo.getLOGValue());
283         }
284
285     }
286
287     @Override
288     public void initialize() {
289         spyPool.scheduleAtFixedRate(messageSpy, SPY_RATE, SPY_RATE, TimeUnit.SECONDS);
290     }
291
292     @Override
293     public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
294         this.extensionConverterProvider = extensionConverterProvider;
295     }
296
297     @Override
298     public ExtensionConverterProvider getExtensionConverterProvider() {
299         return extensionConverterProvider;
300     }
301
302     @Override
303     public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
304         this.deviceTerminPhaseHandler = handler;
305     }
306
307     @Override
308     public void onDeviceDisconnected(final ConnectionContext connectionContext) {
309         LOG.trace("onDeviceDisconnected method call for Node: {}", connectionContext.getNodeId());
310         final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
311         final DeviceContext deviceCtx = this.deviceContexts.get(deviceInfo);
312
313         if (null == deviceCtx) {
314             LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.", deviceInfo.getLOGValue());
315             return;
316         }
317
318         if (deviceCtx.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
319             LOG.debug("Device context for node {} is already is termination state, waiting for close all context");
320             return;
321         }
322
323         deviceCtx.setState(OFPContext.CONTEXT_STATE.TERMINATION);
324
325         if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) {
326             LOG.debug("Node {} disconnected, but not primary connection.", connectionContext.getDeviceInfo().getLOGValue());
327             /* Connection is not PrimaryConnection so try to remove from Auxiliary Connections */
328             deviceCtx.removeAuxiliaryConnectionContext(connectionContext);
329         }
330         //TODO: Auxiliary connections supported ?
331         {
332             /* Device is disconnected and so we need to close TxManager */
333             final ListenableFuture<Void> future = deviceCtx.shuttingDownDataStoreTransactions();
334             Futures.addCallback(future, new FutureCallback<Void>() {
335
336                 @Override
337                 public void onSuccess(final Void result) {
338                     LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getLOGValue());
339                     deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
340                 }
341
342                 @Override
343                 public void onFailure(final Throwable t) {
344                     LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getLOGValue());
345                     LOG.trace("TxChainManager failed by closing. ", t);
346                     deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
347                 }
348             });
349             /* Add timer for Close TxManager because it could fain ind cluster without notification */
350             final TimerTask timerTask = timeout -> {
351                 if (!future.isDone()) {
352                     LOG.warn("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getLOGValue());
353                     future.cancel(false);
354                 }
355             };
356             hashedWheelTimer.newTimeout(timerTask, 10, TimeUnit.SECONDS);
357         }
358     }
359
360     @VisibleForTesting
361     void addDeviceContextToMap(final DeviceInfo deviceInfo, final DeviceContext deviceContext){
362         deviceContexts.put(deviceInfo, deviceContext);
363     }
364
365     @Override
366     public void setIsNotificationFlowRemovedOff(boolean isNotificationFlowRemovedOff) {
367         this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
368     }
369
370     @Override
371     public boolean getIsNotificationFlowRemovedOff() {
372         return this.isNotificationFlowRemovedOff;
373     }
374
375
376     @Override
377     public void setSkipTableFeatures(boolean skipTableFeaturesValue) {
378         skipTableFeatures = skipTableFeaturesValue;
379     }
380
381 }