Change dropping mastership.
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceManagerImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.device;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.collect.Iterators;
13 import com.google.common.util.concurrent.CheckedFuture;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import io.netty.util.HashedWheelTimer;
18 import io.netty.util.TimerTask;
19 import java.util.Collections;
20 import java.util.Iterator;
21 import java.util.Objects;
22 import java.util.Optional;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.ConcurrentMap;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.ScheduledThreadPoolExecutor;
27 import java.util.concurrent.TimeUnit;
28 import javax.annotation.CheckForNull;
29 import javax.annotation.Nonnull;
30 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
31 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
32 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
33 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
34 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
35 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
36 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
37 import org.opendaylight.openflowplugin.api.openflow.OFPContext;
38 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
39 import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
40 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
41 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
42 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
43 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
44 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
45 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
46 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
47 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
48 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
49 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
50 import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl;
51 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
52 import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
53 import org.opendaylight.openflowplugin.impl.services.sal.SalRoleServiceImpl;
54 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
58 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
59 import org.slf4j.Logger;
60 import org.slf4j.LoggerFactory;
61
62 /**
63  *
64  */
65 public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProviderKeeper {
66
67     private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class);
68
69     private final long globalNotificationQuota;
70     private final boolean switchFeaturesMandatory;
71     private boolean isFlowRemovedNotificationOn;
72     private boolean skipTableFeatures;
73     private static final int SPY_RATE = 10;
74
75     private final DataBroker dataBroker;
76     private final DeviceInitializerProvider deviceInitializerProvider;
77     private final ConvertorExecutor convertorExecutor;
78     private TranslatorLibrary translatorLibrary;
79     private DeviceTerminationPhaseHandler deviceTerminPhaseHandler;
80
81     private final ConcurrentMap<DeviceInfo, DeviceContext> deviceContexts = new ConcurrentHashMap<>();
82     private final ConcurrentMap<DeviceInfo, LifecycleService> lifecycleServices = new ConcurrentHashMap<>();
83
84     private long barrierIntervalNanos;
85     private int barrierCountLimit;
86
87     private ExtensionConverterProvider extensionConverterProvider;
88     private ScheduledThreadPoolExecutor spyPool;
89     private final NotificationPublishService notificationPublishService;
90     private final MessageSpy messageSpy;
91     private final HashedWheelTimer hashedWheelTimer;
92     private boolean useSingleLayerSerialization;
93
94     public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
95                              final long globalNotificationQuota,
96                              final boolean switchFeaturesMandatory,
97                              final long barrierInterval,
98                              final int barrierCountLimit,
99                              final MessageSpy messageSpy,
100                              final boolean isFlowRemovedNotificationOn,
101                              final ClusterSingletonServiceProvider singletonServiceProvider,
102                              final NotificationPublishService notificationPublishService,
103                              final HashedWheelTimer hashedWheelTimer,
104                              final ConvertorExecutor convertorExecutor,
105                              final boolean skipTableFeatures,
106                              final boolean useSingleLayerSerialization,
107                              final DeviceInitializerProvider deviceInitializerProvider) {
108
109         this.dataBroker = dataBroker;
110         this.deviceInitializerProvider = deviceInitializerProvider;
111
112         /* merge empty nodes to oper DS to predict any problems with missing parent for Node */
113         final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
114         final NodesBuilder nodesBuilder = new NodesBuilder();
115         nodesBuilder.setNode(Collections.<Node>emptyList());
116         tx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class), nodesBuilder.build());
117         try {
118             tx.submit().get();
119         } catch (ExecutionException | InterruptedException e) {
120             LOG.error("Creation of node failed.", e);
121             throw new IllegalStateException(e);
122         }
123
124         this.switchFeaturesMandatory = switchFeaturesMandatory;
125         this.globalNotificationQuota = globalNotificationQuota;
126         this.isFlowRemovedNotificationOn = isFlowRemovedNotificationOn;
127         this.skipTableFeatures = skipTableFeatures;
128         this.convertorExecutor = convertorExecutor;
129         this.hashedWheelTimer = hashedWheelTimer;
130         this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierInterval);
131         this.barrierCountLimit = barrierCountLimit;
132         this.spyPool = new ScheduledThreadPoolExecutor(1);
133         this.notificationPublishService = notificationPublishService;
134         this.messageSpy = messageSpy;
135         this.useSingleLayerSerialization = useSingleLayerSerialization;
136     }
137
138
139     @Override
140     public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
141     }
142
143     @Override
144     public void onDeviceContextLevelUp(@CheckForNull DeviceInfo deviceInfo, final LifecycleService lifecycleService) throws Exception {
145         // final phase - we have to add new Device to MD-SAL DataStore
146         LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceInfo.getNodeId());
147         DeviceContext deviceContext = Preconditions.checkNotNull(deviceContexts.get(deviceInfo));
148         deviceContext.onPublished();
149         lifecycleService.registerDeviceRemovedHandler(this);
150     }
151
152     @Override
153     public TranslatorLibrary oook() {
154         return translatorLibrary;
155     }
156
157     @Override
158     public void setTranslatorLibrary(final TranslatorLibrary translatorLibrary) {
159         this.translatorLibrary = translatorLibrary;
160     }
161
162     @Override
163     public void close() {
164         for (final Iterator<DeviceContext> iterator = Iterators.consumingIterator(deviceContexts.values().iterator());
165                 iterator.hasNext();) {
166             final DeviceContext deviceCtx = iterator.next();
167             deviceCtx.shutdownConnection();
168             deviceCtx.shuttingDownDataStoreTransactions();
169         }
170
171         Optional.ofNullable(spyPool).ifPresent(ScheduledThreadPoolExecutor::shutdownNow);
172         spyPool = null;
173
174     }
175
176     @Override
177     public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
178         updatePacketInRateLimiters();
179         Optional.ofNullable(lifecycleServices.get(deviceInfo)).ifPresent(LifecycleService::close);
180     }
181
182     @Override
183     public void initialize() {
184         spyPool.scheduleAtFixedRate(messageSpy, SPY_RATE, SPY_RATE, TimeUnit.SECONDS);
185     }
186
187     @Override
188     public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
189         this.extensionConverterProvider = extensionConverterProvider;
190     }
191
192     @Override
193     public ExtensionConverterProvider getExtensionConverterProvider() {
194         return extensionConverterProvider;
195     }
196
197     @Override
198     public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
199         this.deviceTerminPhaseHandler = handler;
200     }
201
202     @Override
203     public void onDeviceDisconnected(final ConnectionContext connectionContext) {
204         LOG.trace("onDeviceDisconnected method call for Node: {}", connectionContext.getNodeId());
205         final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
206         final DeviceContext deviceCtx = this.deviceContexts.get(deviceInfo);
207
208         if (Objects.isNull(deviceCtx)) {
209             LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.", deviceInfo.getLOGValue());
210             return;
211         }
212
213         if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) {
214             LOG.debug("Node {} disconnected, but not primary connection.", connectionContext.getDeviceInfo().getLOGValue());
215             // Connection is not PrimaryConnection so try to remove from Auxiliary Connections
216             deviceCtx.removeAuxiliaryConnectionContext(connectionContext);
217             // If this is not primary connection, we should not continue disabling everything
218             return;
219         }
220
221         if (deviceCtx.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
222             LOG.info("Device context for node {} is already is termination state, waiting for close all context", deviceInfo.getLOGValue());
223             return;
224         }
225
226         deviceCtx.close();
227
228         // TODO: Auxiliary connections supported ?
229         // Device is disconnected and so we need to close TxManager
230         final ListenableFuture<Void> future = deviceCtx.shuttingDownDataStoreTransactions();
231         Futures.addCallback(future, new FutureCallback<Void>() {
232             @Override
233             public void onSuccess(final Void result) {
234                 LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getLOGValue());
235                 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
236             }
237
238             @Override
239             public void onFailure(final Throwable t) {
240                 LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getLOGValue());
241                 LOG.trace("TxChainManager failed by closing. ", t);
242                 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
243             }
244         });
245
246         // Add timer for Close TxManager because it could fail in cluster without notification
247         final TimerTask timerTask = timeout -> {
248             if (!future.isDone()) {
249                 LOG.warn("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getLOGValue());
250                 future.cancel(false);
251             }
252         };
253
254         hashedWheelTimer.newTimeout(timerTask, 10, TimeUnit.SECONDS);
255     }
256
257     @VisibleForTesting
258     void addDeviceContextToMap(final DeviceInfo deviceInfo, final DeviceContext deviceContext){
259         deviceContexts.put(deviceInfo, deviceContext);
260     }
261
262     @Override
263     public void setFlowRemovedNotificationOn(boolean isNotificationFlowRemovedOff) {
264         this.isFlowRemovedNotificationOn = isNotificationFlowRemovedOff;
265     }
266
267     @Override
268     public boolean isFlowRemovedNotificationOn() {
269         return this.isFlowRemovedNotificationOn;
270     }
271
272
273     @Override
274     public void setSkipTableFeatures(boolean skipTableFeaturesValue) {
275         skipTableFeatures = skipTableFeaturesValue;
276     }
277
278     @Override
279     public void setBarrierCountLimit(final int barrierCountLimit) {
280         this.barrierCountLimit = barrierCountLimit;
281     }
282
283     @Override
284     public void setBarrierInterval(final long barrierTimeoutLimit) {
285         this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierTimeoutLimit);
286     }
287
288     @Override
289     public CheckedFuture<Void, TransactionCommitFailedException> removeDeviceFromOperationalDS(final DeviceInfo deviceInfo) {
290         final WriteTransaction delWtx = dataBroker.newWriteOnlyTransaction();
291         delWtx.delete(LogicalDatastoreType.OPERATIONAL, deviceInfo.getNodeInstanceIdentifier());
292         final CheckedFuture<Void, TransactionCommitFailedException> delFuture = delWtx.submit();
293
294         Futures.addCallback(delFuture, new FutureCallback<Void>() {
295             @Override
296             public void onSuccess(final Void result) {
297                 if (LOG.isDebugEnabled()) {
298                     LOG.debug("Delete Node {} was successful", deviceInfo.getLOGValue());
299                 }
300             }
301
302             @Override
303             public void onFailure(@Nonnull final Throwable t) {
304                 LOG.warn("Delete node {} failed with exception {}", deviceInfo.getLOGValue(), t);
305             }
306         });
307
308         return delFuture;
309     }
310
311     @Override
312     public void setUseSingleLayerSerialization(final Boolean useSingleLayerSerialization) {
313         this.useSingleLayerSerialization = useSingleLayerSerialization;
314     }
315
316     public DeviceContext createContext(@CheckForNull final ConnectionContext connectionContext) {
317
318         LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}",
319                 connectionContext.getConnectionAdapter().getRemoteAddress(),
320                 connectionContext.getDeviceInfo().getNodeId());
321
322         connectionContext.getConnectionAdapter().setPacketInFiltering(true);
323
324         final OutboundQueueProvider outboundQueueProvider
325                 = new OutboundQueueProviderImpl(connectionContext.getDeviceInfo().getVersion());
326
327         connectionContext.setOutboundQueueProvider(outboundQueueProvider);
328         final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
329                 connectionContext.getConnectionAdapter().registerOutboundQueueHandler(
330                         outboundQueueProvider,
331                         barrierCountLimit,
332                         barrierIntervalNanos);
333         connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
334
335
336         final DeviceContext deviceContext = new DeviceContextImpl(
337                 connectionContext,
338                 dataBroker,
339                 messageSpy,
340                 translatorLibrary,
341                 this,
342                 convertorExecutor,
343                 skipTableFeatures,
344                 hashedWheelTimer,
345                 useSingleLayerSerialization,
346                 deviceInitializerProvider);
347
348         deviceContext.setSalRoleService(new SalRoleServiceImpl(deviceContext, deviceContext));
349         deviceContext.setSwitchFeaturesMandatory(switchFeaturesMandatory);
350         ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider);
351         deviceContext.setNotificationPublishService(notificationPublishService);
352
353         deviceContexts.put(connectionContext.getDeviceInfo(), deviceContext);
354         updatePacketInRateLimiters();
355
356         final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
357                 connectionContext.getConnectionAdapter(), deviceContext);
358
359         connectionContext.getConnectionAdapter().setMessageListener(messageListener);
360
361         return deviceContext;
362     }
363
364     private void updatePacketInRateLimiters() {
365         synchronized (deviceContexts) {
366             final int deviceContextsSize = deviceContexts.size();
367             if (deviceContextsSize > 0) {
368                 long freshNotificationLimit = globalNotificationQuota / deviceContextsSize;
369                 if (freshNotificationLimit < 100) {
370                     freshNotificationLimit = 100;
371                 }
372                 if (LOG.isDebugEnabled()) {
373                     LOG.debug("fresh notification limit = {}", freshNotificationLimit);
374                 }
375                 for (final DeviceContext deviceContext : deviceContexts.values()) {
376                     deviceContext.updatePacketInRateLimit(freshNotificationLimit);
377                 }
378             }
379         }
380     }
381
382     @Override
383     public void onDeviceRemoved(final DeviceInfo deviceInfo) {
384         deviceContexts.remove(deviceInfo);
385         if (LOG.isDebugEnabled()) {
386             LOG.debug("Device context removed for node {}", deviceInfo.getLOGValue());
387         }
388         this.updatePacketInRateLimiters();
389     }
390
391     @Override
392     public long getBarrierIntervalNanos() {
393         return barrierIntervalNanos;
394     }
395
396     @Override
397     public int getBarrierCountLimit() {
398         return barrierCountLimit;
399     }
400 }