Merge "DeviceManager map key change"
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceManagerImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.device;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Verify;
13 import com.google.common.collect.Iterators;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import io.netty.util.TimerTask;
18 import java.util.Collections;
19 import java.util.Iterator;
20 import java.util.concurrent.ConcurrentHashMap;
21 import java.util.concurrent.ConcurrentMap;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.ScheduledThreadPoolExecutor;
24 import java.util.concurrent.TimeUnit;
25 import javax.annotation.CheckForNull;
26 import javax.annotation.Nonnull;
27 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
28 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
29 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
30 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
31 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
32 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
33 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
34 import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
35 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
36 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
37 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
38 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
39 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
40 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
41 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
42 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor;
43 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
44 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
45 import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl;
46 import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
51 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54
55 /**
56  *
57  */
58 public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProviderKeeper {
59
60     private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class);
61
62     private final long globalNotificationQuota;
63     private final boolean switchFeaturesMandatory;
64
65     private final int spyRate = 10;
66
67     private final DataBroker dataBroker;
68     private TranslatorLibrary translatorLibrary;
69     private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
70     private DeviceTerminationPhaseHandler deviceTerminPhaseHandler;
71     private NotificationPublishService notificationPublishService;
72
73     private final ConcurrentMap<DeviceInfo, DeviceContext> deviceContexts = new ConcurrentHashMap<>();
74
75     private final long barrierIntervalNanos;
76     private final int barrierCountLimit;
77     private ExtensionConverterProvider extensionConverterProvider;
78     private ScheduledThreadPoolExecutor spyPool;
79
80     private final LifecycleConductor conductor;
81     private boolean isStatisticsRpcEnabled;
82
83     public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
84                              final long globalNotificationQuota, final boolean switchFeaturesMandatory,
85                              final long barrierInterval, final int barrierCountLimit,
86                              final LifecycleConductor lifecycleConductor) {
87         this.switchFeaturesMandatory = switchFeaturesMandatory;
88         this.globalNotificationQuota = globalNotificationQuota;
89         this.dataBroker = Preconditions.checkNotNull(dataBroker);
90         /* merge empty nodes to oper DS to predict any problems with missing parent for Node */
91         final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
92
93         final NodesBuilder nodesBuilder = new NodesBuilder();
94         nodesBuilder.setNode(Collections.<Node>emptyList());
95         tx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class), nodesBuilder.build());
96         try {
97             tx.submit().get();
98         } catch (ExecutionException | InterruptedException e) {
99             LOG.error("Creation of node failed.", e);
100             throw new IllegalStateException(e);
101         }
102
103         this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierInterval);
104         this.barrierCountLimit = barrierCountLimit;
105
106         this.conductor = lifecycleConductor;
107         spyPool = new ScheduledThreadPoolExecutor(1);
108     }
109
110
111     @Override
112     public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
113         this.deviceInitPhaseHandler = handler;
114     }
115
116     @Override
117     public void onDeviceContextLevelUp(@CheckForNull DeviceInfo deviceInfo) throws Exception {
118         // final phase - we have to add new Device to MD-SAL DataStore
119         LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceInfo.getNodeId());
120         DeviceContext deviceContext = Preconditions.checkNotNull(deviceContexts.get(deviceInfo));
121         ((DeviceContextImpl) deviceContext).initialSubmitTransaction();
122         deviceContext.onPublished();
123     }
124
125     @Override
126     public boolean deviceConnected(@CheckForNull final ConnectionContext connectionContext) throws Exception {
127         Preconditions.checkArgument(connectionContext != null);
128
129         DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
130         /**
131          * This part prevent destroy another device context. Throwing here an exception result to propagate close connection
132          * in {@link org.opendaylight.openflowplugin.impl.connection.org.opendaylight.openflowplugin.impl.connection.HandshakeContextImpl}
133          * If context already exist we are in state closing process (connection flapping) and we should not propagate connection close
134          */
135          if (deviceContexts.containsKey(deviceInfo.getNodeId())) {
136             LOG.warn("Rejecting connection from node which is already connected and there exist deviceContext for it: {}", connectionContext.getNodeId());
137              return false;
138          }
139
140         LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}",
141                 connectionContext.getConnectionAdapter().getRemoteAddress(), deviceInfo.getNodeId());
142
143         // Add Disconnect handler
144         connectionContext.setDeviceDisconnectedHandler(DeviceManagerImpl.this);
145         // Cache this for clarity
146         final ConnectionAdapter connectionAdapter = connectionContext.getConnectionAdapter();
147
148         //FIXME: as soon as auxiliary connection are fully supported then this is needed only before device context published
149         connectionAdapter.setPacketInFiltering(true);
150
151         final Short version = connectionContext.getFeatures().getVersion();
152         final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(version);
153
154         connectionContext.setOutboundQueueProvider(outboundQueueProvider);
155         final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
156                 connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, barrierCountLimit, barrierIntervalNanos);
157         connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
158
159         final DeviceState deviceState = createDeviceState(connectionContext);
160         final DeviceContext deviceContext = new DeviceContextImpl(connectionContext,
161                 deviceState,
162                 dataBroker,
163                 conductor,
164                 outboundQueueProvider,
165                 translatorLibrary,
166                 switchFeaturesMandatory);
167
168         Verify.verify(deviceContexts.putIfAbsent(deviceInfo, deviceContext) == null, "DeviceCtx still not closed.");
169
170         ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider);
171         deviceContext.setStatisticsRpcEnabled(isStatisticsRpcEnabled);
172         deviceContext.setNotificationPublishService(notificationPublishService);
173
174         updatePacketInRateLimiters();
175
176         final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
177                 connectionAdapter, deviceContext);
178         connectionAdapter.setMessageListener(messageListener);
179         deviceState.setValid(true);
180
181         deviceInitPhaseHandler.onDeviceContextLevelUp(connectionContext.getDeviceInfo());
182
183         return true;
184     }
185
186     private static DeviceStateImpl createDeviceState(final @Nonnull ConnectionContext connectionContext) {
187         return new DeviceStateImpl(connectionContext.getFeatures(), connectionContext.getNodeId());
188     }
189
190     private void updatePacketInRateLimiters() {
191         synchronized (deviceContexts) {
192             final int deviceContextsSize = deviceContexts.size();
193             if (deviceContextsSize > 0) {
194                 long freshNotificationLimit = globalNotificationQuota / deviceContextsSize;
195                 if (freshNotificationLimit < 100) {
196                     freshNotificationLimit = 100;
197                 }
198                 LOG.debug("fresh notification limit = {}", freshNotificationLimit);
199                 for (final DeviceContext deviceContext : deviceContexts.values()) {
200                     deviceContext.updatePacketInRateLimit(freshNotificationLimit);
201                 }
202             }
203         }
204     }
205
206     @Override
207     public TranslatorLibrary oook() {
208         return translatorLibrary;
209     }
210
211     @Override
212     public void setTranslatorLibrary(final TranslatorLibrary translatorLibrary) {
213         this.translatorLibrary = translatorLibrary;
214     }
215
216     @Override
217     public void setNotificationPublishService(final NotificationPublishService notificationService) {
218         notificationPublishService = notificationService;
219     }
220
221     @Override
222     public void close() {
223         for (final Iterator<DeviceContext> iterator = Iterators.consumingIterator(deviceContexts.values().iterator());
224                 iterator.hasNext();) {
225             final DeviceContext deviceCtx = iterator.next();
226             deviceCtx.shutdownConnection();
227             deviceCtx.shuttingDownDataStoreTransactions();
228         }
229
230         if (spyPool != null) {
231             spyPool.shutdownNow();
232             spyPool = null;
233         }
234     }
235
236     @Override
237     public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
238         LOG.debug("onDeviceContextClosed for Node {}", deviceInfo.getNodeId());
239         deviceContexts.remove(deviceInfo.getNodeId());
240         updatePacketInRateLimiters();
241     }
242
243     @Override
244     public void initialize() {
245         spyPool.scheduleAtFixedRate(conductor.getMessageIntelligenceAgency(), spyRate, spyRate, TimeUnit.SECONDS);
246     }
247
248     @Override
249     public DeviceContext getDeviceContextFromNodeId(final NodeId nodeId) {
250         return deviceContexts.get(nodeId);
251     }
252
253     @Override
254     public void setStatisticsRpcEnabled(boolean isStatisticsRpcEnabled) {
255         this.isStatisticsRpcEnabled = isStatisticsRpcEnabled;
256     }
257
258     @Override
259     public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
260         this.extensionConverterProvider = extensionConverterProvider;
261     }
262
263     @Override
264     public ExtensionConverterProvider getExtensionConverterProvider() {
265         return extensionConverterProvider;
266     }
267
268     @Override
269     public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
270         this.deviceTerminPhaseHandler = handler;
271     }
272
273     @Override
274     public void onDeviceDisconnected(final ConnectionContext connectionContext) {
275         LOG.trace("onDeviceDisconnected method call for Node: {}", connectionContext.getNodeId());
276         final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
277         final DeviceContext deviceCtx = this.deviceContexts.get(deviceInfo.getNodeId());
278
279         if (null == deviceCtx) {
280             LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.", deviceInfo.getNodeId());
281             return;
282         }
283
284         if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) {
285             /* Connection is not PrimaryConnection so try to remove from Auxiliary Connections */
286             deviceCtx.removeAuxiliaryConnectionContext(connectionContext);
287         } else {
288             /* Device is disconnected and so we need to close TxManager */
289             final ListenableFuture<Void> future = deviceCtx.shuttingDownDataStoreTransactions();
290             Futures.addCallback(future, new FutureCallback<Void>() {
291
292                 @Override
293                 public void onSuccess(final Void result) {
294                     LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getNodeId());
295                     deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
296                 }
297
298                 @Override
299                 public void onFailure(final Throwable t) {
300                     LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getNodeId(), t);
301                     deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
302                 }
303             });
304             /* Add timer for Close TxManager because it could fain ind cluster without notification */
305             final TimerTask timerTask = timeout -> {
306                 if (!future.isDone()) {
307                     LOG.info("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getNodeId());
308                     future.cancel(false);
309                 }
310             };
311             conductor.newTimeout(timerTask, 10, TimeUnit.SECONDS);
312         }
313     }
314
315     @VisibleForTesting
316     void addDeviceContextToMap(final DeviceInfo deviceInfo, final DeviceContext deviceContext){
317         deviceContexts.put(deviceInfo, deviceContext);
318     }
319 }