Merge "Fix "Bug 5589 - Deprecate PortNumberCache" - New class that can convert Device...
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceManagerImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.device;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.base.Verify;
12 import com.google.common.collect.Iterators;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import io.netty.util.HashedWheelTimer;
17 import io.netty.util.Timeout;
18 import io.netty.util.TimerTask;
19 import java.util.Collections;
20 import java.util.Iterator;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ConcurrentMap;
23 import java.util.concurrent.ExecutionException;
24 import java.util.concurrent.ScheduledThreadPoolExecutor;
25 import java.util.concurrent.TimeUnit;
26 import javax.annotation.CheckForNull;
27 import javax.annotation.Nonnull;
28 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
29 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
30 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
31 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
32 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
33 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
34 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
35 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
36 import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
37 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
38 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
39 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
40 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
41 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
42 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
43 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency;
44 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
45 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
46 import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl;
47 import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
52 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 /**
57  *
58  */
59 public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProviderKeeper {
60
61     private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class);
62
63     private static final long TICK_DURATION = 10; // 0.5 sec.
64     private final long globalNotificationQuota;
65     private final boolean switchFeaturesMandatory;
66
67     private ScheduledThreadPoolExecutor spyPool;
68     private final int spyRate = 10;
69
70     private final DataBroker dataBroker;
71     private final HashedWheelTimer hashedWheelTimer;
72     private TranslatorLibrary translatorLibrary;
73     private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
74     private DeviceTerminationPhaseHandler deviceTerminPhaseHandler;
75     private NotificationService notificationService;
76     private NotificationPublishService notificationPublishService;
77
78     private final ConcurrentMap<NodeId, DeviceContext> deviceContexts = new ConcurrentHashMap<>();
79     private final MessageIntelligenceAgency messageIntelligenceAgency;
80
81     private final long barrierIntervalNanos;
82     private final int barrierCountLimit;
83     private ExtensionConverterProvider extensionConverterProvider;
84
85     public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
86                              @Nonnull final MessageIntelligenceAgency messageIntelligenceAgency,
87                              final long globalNotificationQuota, final boolean switchFeaturesMandatory,
88                              final long barrierInterval, final int barrierCountLimit) {
89         this.switchFeaturesMandatory = switchFeaturesMandatory;
90         this.globalNotificationQuota = globalNotificationQuota;
91         this.dataBroker = Preconditions.checkNotNull(dataBroker);
92         hashedWheelTimer = new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, 500);
93         /* merge empty nodes to oper DS to predict any problems with missing parent for Node */
94         final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
95
96         final NodesBuilder nodesBuilder = new NodesBuilder();
97         nodesBuilder.setNode(Collections.<Node>emptyList());
98         tx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class), nodesBuilder.build());
99         try {
100             tx.submit().get();
101         } catch (ExecutionException | InterruptedException e) {
102             LOG.error("Creation of node failed.", e);
103             throw new IllegalStateException(e);
104         }
105
106         this.messageIntelligenceAgency = messageIntelligenceAgency;
107         this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierInterval);
108         this.barrierCountLimit = barrierCountLimit;
109     }
110
111
112     @Override
113     public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
114         this.deviceInitPhaseHandler = handler;
115     }
116
117     @Override
118     public void onDeviceContextLevelUp(final DeviceContext deviceContext) throws Exception {
119         // final phase - we have to add new Device to MD-SAL DataStore
120         LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceContext.getDeviceState().getNodeId());
121         Preconditions.checkNotNull(deviceContext);
122         ((DeviceContextImpl) deviceContext).initialSubmitTransaction();
123         deviceContext.onPublished();
124     }
125
126     @Override
127     public void deviceConnected(@CheckForNull final ConnectionContext connectionContext) throws Exception {
128         Preconditions.checkArgument(connectionContext != null);
129         Preconditions.checkState(!deviceContexts.containsKey(connectionContext.getNodeId()),
130                 "Rejecting connection from node which is already connected and there exist deviceContext for it: {}",
131                 connectionContext.getNodeId()
132         );
133         LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}",
134                 connectionContext.getConnectionAdapter().getRemoteAddress(), connectionContext.getNodeId());
135
136         // Add Disconnect handler
137         connectionContext.setDeviceDisconnectedHandler(DeviceManagerImpl.this);
138         // Cache this for clarity
139         final ConnectionAdapter connectionAdapter = connectionContext.getConnectionAdapter();
140
141         //FIXME: as soon as auxiliary connection are fully supported then this is needed only before device context published
142         connectionAdapter.setPacketInFiltering(true);
143
144         final Short version = connectionContext.getFeatures().getVersion();
145         final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(version);
146
147         connectionContext.setOutboundQueueProvider(outboundQueueProvider);
148         final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
149                 connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, barrierCountLimit, barrierIntervalNanos);
150         connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
151
152         final DeviceState deviceState = createDeviceState(connectionContext);
153         final DeviceContext deviceContext = new DeviceContextImpl(connectionContext, deviceState, dataBroker,
154                 hashedWheelTimer, messageIntelligenceAgency, outboundQueueProvider, translatorLibrary, switchFeaturesMandatory);
155
156         Verify.verify(deviceContexts.putIfAbsent(connectionContext.getNodeId(), deviceContext) == null, "DeviceCtx still not closed.");
157         deviceContext.addDeviceContextClosedHandler(this);
158
159         ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider);
160         deviceContext.setNotificationService(notificationService);
161         deviceContext.setNotificationPublishService(notificationPublishService);
162
163         updatePacketInRateLimiters();
164
165         final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
166                 connectionAdapter, deviceContext);
167         connectionAdapter.setMessageListener(messageListener);
168         deviceState.setValid(true);
169
170         deviceInitPhaseHandler.onDeviceContextLevelUp(deviceContext);
171     }
172
173     private static DeviceStateImpl createDeviceState(final @Nonnull ConnectionContext connectionContext) {
174         return new DeviceStateImpl(connectionContext.getFeatures(), connectionContext.getNodeId());
175     }
176
177     private void updatePacketInRateLimiters() {
178         synchronized (deviceContexts) {
179             final int deviceContextsSize = deviceContexts.size();
180             if (deviceContextsSize > 0) {
181                 long freshNotificationLimit = globalNotificationQuota / deviceContextsSize;
182                 if (freshNotificationLimit < 100) {
183                     freshNotificationLimit = 100;
184                 }
185                 LOG.debug("fresh notification limit = {}", freshNotificationLimit);
186                 for (final DeviceContext deviceContext : deviceContexts.values()) {
187                     deviceContext.updatePacketInRateLimit(freshNotificationLimit);
188                 }
189             }
190         }
191     }
192
193     @Override
194     public TranslatorLibrary oook() {
195         return translatorLibrary;
196     }
197
198     @Override
199     public void setTranslatorLibrary(final TranslatorLibrary translatorLibrary) {
200         this.translatorLibrary = translatorLibrary;
201     }
202
203     @Override
204     public void setNotificationService(final NotificationService notificationServiceParam) {
205         notificationService = notificationServiceParam;
206     }
207
208     @Override
209     public void setNotificationPublishService(final NotificationPublishService notificationService) {
210         notificationPublishService = notificationService;
211     }
212
213     @Override
214     public void close() {
215         for (final Iterator<DeviceContext> iterator = Iterators.consumingIterator(deviceContexts.values().iterator());
216                 iterator.hasNext();) {
217             final DeviceContext deviceCtx = iterator.next();
218             deviceCtx.shutdownConnection();
219             deviceCtx.shuttingDownDataStoreTransactions();
220         }
221
222         if (spyPool != null) {
223             spyPool.shutdownNow();
224             spyPool = null;
225         }
226     }
227
228     @Override
229     public void onDeviceContextLevelDown(final DeviceContext deviceContext) {
230         LOG.debug("onDeviceContextClosed for Node {}", deviceContext.getDeviceState().getNodeId());
231         deviceContexts.remove(deviceContext.getPrimaryConnectionContext().getNodeId(), deviceContext);
232         updatePacketInRateLimiters();
233     }
234
235     @Override
236     public void initialize() {
237         spyPool = new ScheduledThreadPoolExecutor(1);
238         spyPool.scheduleAtFixedRate(messageIntelligenceAgency, spyRate, spyRate, TimeUnit.SECONDS);
239     }
240
241     @Override
242     public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
243         this.extensionConverterProvider = extensionConverterProvider;
244     }
245
246     @Override
247     public ExtensionConverterProvider getExtensionConverterProvider() {
248         return extensionConverterProvider;
249     }
250
251     @Override
252     public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
253         this.deviceTerminPhaseHandler = handler;
254     }
255
256     @Override
257     public void onDeviceDisconnected(final ConnectionContext connectionContext) {
258         LOG.trace("onDeviceDisconnected method call for Node: {}", connectionContext.getNodeId());
259         Preconditions.checkArgument(connectionContext != null);
260         final NodeId nodeId = connectionContext.getNodeId();
261         final DeviceContext deviceCtx = this.deviceContexts.get(nodeId);
262
263         if (null == deviceCtx) {
264             LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.",
265                     connectionContext.getNodeId());
266             return;
267         }
268
269         if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) {
270             /* Connection is not PrimaryConnection so try to remove from Auxiliary Connections */
271             deviceCtx.removeAuxiliaryConnectionContext(connectionContext);
272         } else {
273             /* Device is disconnected and so we need to close TxManager */
274             final ListenableFuture<Void> future = deviceCtx.shuttingDownDataStoreTransactions();
275             Futures.addCallback(future, new FutureCallback<Void>() {
276
277                 @Override
278                 public void onSuccess(final Void result) {
279                     LOG.debug("TxChainManager for device {} is closed successful.", deviceCtx.getDeviceState().getNodeId());
280                     deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceCtx);
281                 }
282
283                 @Override
284                 public void onFailure(final Throwable t) {
285                     LOG.warn("TxChainManager for device {} failed by closing.", deviceCtx.getDeviceState().getNodeId(), t);
286                     deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceCtx);
287                 }
288             });
289             /* Add timer for Close TxManager because it could fain ind cluster without notification */
290             final TimerTask timerTask = new TimerTask() {
291
292                 @Override
293                 public void run(final Timeout timeout) throws Exception {
294                     if (!future.isDone()) {
295                         LOG.info("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.",
296                                 deviceCtx.getDeviceState().getNodeId());
297                         future.cancel(false);
298                     }
299                 }
300             };
301             deviceCtx.getTimer().newTimeout(timerTask, 10, TimeUnit.SECONDS);
302         }
303     }
304 }