Merge "Generify Managers"
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceManagerImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.device;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Verify;
13 import com.google.common.collect.Iterators;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import io.netty.util.TimerTask;
18 import java.util.Collections;
19 import java.util.Iterator;
20 import java.util.concurrent.ConcurrentHashMap;
21 import java.util.concurrent.ConcurrentMap;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.ScheduledThreadPoolExecutor;
24 import java.util.concurrent.TimeUnit;
25 import javax.annotation.CheckForNull;
26 import javax.annotation.Nonnull;
27 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
28 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
29 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
30 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
31 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
32 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
33 import org.opendaylight.openflowplugin.api.openflow.OFPContext;
34 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
35 import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
36 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
37 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
38 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
39 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
40 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
41 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
42 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
43 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor;
44 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
45 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
46 import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl;
47 import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
51 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54
55 /**
56  *
57  */
58 public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProviderKeeper {
59
60     private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class);
61
62     private final long globalNotificationQuota;
63     private final boolean switchFeaturesMandatory;
64
65     private final int spyRate = 10;
66
67     private final DataBroker dataBroker;
68     private TranslatorLibrary translatorLibrary;
69     private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
70     private DeviceTerminationPhaseHandler deviceTerminPhaseHandler;
71     private NotificationPublishService notificationPublishService;
72
73     private final ConcurrentMap<DeviceInfo, DeviceContext> deviceContexts = new ConcurrentHashMap<>();
74
75     private final long barrierIntervalNanos;
76     private final int barrierCountLimit;
77     private ExtensionConverterProvider extensionConverterProvider;
78     private ScheduledThreadPoolExecutor spyPool;
79
80     private final LifecycleConductor conductor;
81     private boolean isStatisticsRpcEnabled;
82
83     public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
84                              final long globalNotificationQuota, final boolean switchFeaturesMandatory,
85                              final long barrierInterval, final int barrierCountLimit,
86                              final LifecycleConductor lifecycleConductor) {
87         this.switchFeaturesMandatory = switchFeaturesMandatory;
88         this.globalNotificationQuota = globalNotificationQuota;
89         this.dataBroker = Preconditions.checkNotNull(dataBroker);
90         /* merge empty nodes to oper DS to predict any problems with missing parent for Node */
91         final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
92
93         final NodesBuilder nodesBuilder = new NodesBuilder();
94         nodesBuilder.setNode(Collections.<Node>emptyList());
95         tx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class), nodesBuilder.build());
96         try {
97             tx.submit().get();
98         } catch (ExecutionException | InterruptedException e) {
99             LOG.error("Creation of node failed.", e);
100             throw new IllegalStateException(e);
101         }
102
103         this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierInterval);
104         this.barrierCountLimit = barrierCountLimit;
105
106         this.conductor = lifecycleConductor;
107         spyPool = new ScheduledThreadPoolExecutor(1);
108     }
109
110
111     @Override
112     public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
113         this.deviceInitPhaseHandler = handler;
114     }
115
116     @Override
117     public void onDeviceContextLevelUp(@CheckForNull DeviceInfo deviceInfo) throws Exception {
118         // final phase - we have to add new Device to MD-SAL DataStore
119         LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceInfo.getNodeId());
120         DeviceContext deviceContext = Preconditions.checkNotNull(deviceContexts.get(deviceInfo));
121         ((DeviceContextImpl) deviceContext).initialSubmitTransaction();
122         deviceContext.onPublished();
123     }
124
125     @Override
126     public boolean deviceConnected(@CheckForNull final ConnectionContext connectionContext) throws Exception {
127         Preconditions.checkArgument(connectionContext != null);
128
129         DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
130         /**
131          * This part prevent destroy another device context. Throwing here an exception result to propagate close connection
132          * in {@link org.opendaylight.openflowplugin.impl.connection.org.opendaylight.openflowplugin.impl.connection.HandshakeContextImpl}
133          * If context already exist we are in state closing process (connection flapping) and we should not propagate connection close
134          */
135          if (deviceContexts.containsKey(deviceInfo)) {
136             LOG.warn("Rejecting connection from node which is already connected and there exist deviceContext for it: {}", connectionContext.getNodeId());
137              return false;
138          }
139
140         LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}",
141                 connectionContext.getConnectionAdapter().getRemoteAddress(), deviceInfo.getNodeId());
142
143         // Add Disconnect handler
144         connectionContext.setDeviceDisconnectedHandler(DeviceManagerImpl.this);
145         // Cache this for clarity
146         final ConnectionAdapter connectionAdapter = connectionContext.getConnectionAdapter();
147
148         //FIXME: as soon as auxiliary connection are fully supported then this is needed only before device context published
149         connectionAdapter.setPacketInFiltering(true);
150
151         final Short version = connectionContext.getFeatures().getVersion();
152         final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(version);
153
154         connectionContext.setOutboundQueueProvider(outboundQueueProvider);
155         final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
156                 connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, barrierCountLimit, barrierIntervalNanos);
157         connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
158
159         final DeviceState deviceState = new DeviceStateImpl();
160         final DeviceContext deviceContext = new DeviceContextImpl(connectionContext,
161                 deviceState,
162                 dataBroker,
163                 conductor,
164                 outboundQueueProvider,
165                 translatorLibrary,
166                 switchFeaturesMandatory);
167
168         Verify.verify(deviceContexts.putIfAbsent(deviceInfo, deviceContext) == null, "DeviceCtx still not closed.");
169
170         ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider);
171         deviceContext.setStatisticsRpcEnabled(isStatisticsRpcEnabled);
172         deviceContext.setNotificationPublishService(notificationPublishService);
173
174         updatePacketInRateLimiters();
175
176         final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
177                 connectionAdapter, deviceContext);
178         connectionAdapter.setMessageListener(messageListener);
179         deviceState.setValid(true);
180
181         deviceInitPhaseHandler.onDeviceContextLevelUp(connectionContext.getDeviceInfo());
182
183         return true;
184     }
185
186     private void updatePacketInRateLimiters() {
187         synchronized (deviceContexts) {
188             final int deviceContextsSize = deviceContexts.size();
189             if (deviceContextsSize > 0) {
190                 long freshNotificationLimit = globalNotificationQuota / deviceContextsSize;
191                 if (freshNotificationLimit < 100) {
192                     freshNotificationLimit = 100;
193                 }
194                 LOG.debug("fresh notification limit = {}", freshNotificationLimit);
195                 for (final DeviceContext deviceContext : deviceContexts.values()) {
196                     deviceContext.updatePacketInRateLimit(freshNotificationLimit);
197                 }
198             }
199         }
200     }
201
202     @Override
203     public TranslatorLibrary oook() {
204         return translatorLibrary;
205     }
206
207     @Override
208     public void setTranslatorLibrary(final TranslatorLibrary translatorLibrary) {
209         this.translatorLibrary = translatorLibrary;
210     }
211
212     @Override
213     public void setNotificationPublishService(final NotificationPublishService notificationService) {
214         notificationPublishService = notificationService;
215     }
216
217     @Override
218     public void close() {
219         for (final Iterator<DeviceContext> iterator = Iterators.consumingIterator(deviceContexts.values().iterator());
220                 iterator.hasNext();) {
221             final DeviceContext deviceCtx = iterator.next();
222             deviceCtx.shutdownConnection();
223             deviceCtx.shuttingDownDataStoreTransactions();
224         }
225
226         if (spyPool != null) {
227             spyPool.shutdownNow();
228             spyPool = null;
229         }
230     }
231
232     @Override
233     public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
234         LOG.debug("onDeviceContextClosed for Node {}", deviceInfo.getNodeId());
235         deviceContexts.remove(deviceInfo);
236         updatePacketInRateLimiters();
237     }
238
239     @Override
240     public void initialize() {
241         spyPool.scheduleAtFixedRate(conductor.getMessageIntelligenceAgency(), spyRate, spyRate, TimeUnit.SECONDS);
242     }
243
244     @Override
245     public DeviceContext getDeviceContextFromNodeId(DeviceInfo deviceInfo) {
246         return deviceContexts.get(deviceInfo);
247     }
248
249     @Override
250     public void setStatisticsRpcEnabled(boolean isStatisticsRpcEnabled) {
251         this.isStatisticsRpcEnabled = isStatisticsRpcEnabled;
252     }
253
254     @Override
255     public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
256         this.extensionConverterProvider = extensionConverterProvider;
257     }
258
259     @Override
260     public ExtensionConverterProvider getExtensionConverterProvider() {
261         return extensionConverterProvider;
262     }
263
264     @Override
265     public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
266         this.deviceTerminPhaseHandler = handler;
267     }
268
269     @Override
270     public void onDeviceDisconnected(final ConnectionContext connectionContext) {
271         LOG.trace("onDeviceDisconnected method call for Node: {}", connectionContext.getNodeId());
272         final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
273         final DeviceContext deviceCtx = this.deviceContexts.get(deviceInfo.getNodeId());
274
275         if (null == deviceCtx) {
276             LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.", deviceInfo.getNodeId());
277             return;
278         }
279
280         if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) {
281             /* Connection is not PrimaryConnection so try to remove from Auxiliary Connections */
282             deviceCtx.removeAuxiliaryConnectionContext(connectionContext);
283         } else {
284             /* Device is disconnected and so we need to close TxManager */
285             final ListenableFuture<Void> future = deviceCtx.shuttingDownDataStoreTransactions();
286             Futures.addCallback(future, new FutureCallback<Void>() {
287
288                 @Override
289                 public void onSuccess(final Void result) {
290                     LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getNodeId());
291                     deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
292                 }
293
294                 @Override
295                 public void onFailure(final Throwable t) {
296                     LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getNodeId(), t);
297                     deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
298                 }
299             });
300             /* Add timer for Close TxManager because it could fain ind cluster without notification */
301             final TimerTask timerTask = timeout -> {
302                 if (!future.isDone()) {
303                     LOG.info("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getNodeId());
304                     future.cancel(false);
305                 }
306             };
307             conductor.newTimeout(timerTask, 10, TimeUnit.SECONDS);
308         }
309     }
310
311     @VisibleForTesting
312     void addDeviceContextToMap(final DeviceInfo deviceInfo, final DeviceContext deviceContext){
313         deviceContexts.put(deviceInfo, deviceContext);
314     }
315
316     @Override
317     public <T extends OFPContext> T gainContext(final DeviceInfo deviceInfo) {
318         return (T) deviceContexts.get(deviceInfo);
319     }
320 }