Add single layer deserialization support
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceManagerImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.device;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.collect.Iterators;
13 import com.google.common.util.concurrent.CheckedFuture;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import io.netty.util.HashedWheelTimer;
18 import io.netty.util.TimerTask;
19 import java.util.Collections;
20 import java.util.Iterator;
21 import java.util.Objects;
22 import java.util.Optional;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.ConcurrentMap;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.ScheduledThreadPoolExecutor;
27 import java.util.concurrent.TimeUnit;
28 import javax.annotation.CheckForNull;
29 import javax.annotation.Nonnull;
30 import javax.annotation.Nullable;
31 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
32 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
33 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
34 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
35 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
36 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
37 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
38 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
39 import org.opendaylight.openflowplugin.api.openflow.OFPContext;
40 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
41 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionStatus;
42 import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
43 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
44 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
45 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
46 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
47 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
48 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
49 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
50 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
51 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
52 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
53 import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl;
54 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
55 import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
56 import org.opendaylight.openflowplugin.impl.lifecycle.LifecycleServiceImpl;
57 import org.opendaylight.openflowplugin.impl.services.sal.SalRoleServiceImpl;
58 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput;
63 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
64 import org.opendaylight.yangtools.yang.common.RpcResult;
65 import org.slf4j.Logger;
66 import org.slf4j.LoggerFactory;
67
68 /**
69  *
70  */
71 public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProviderKeeper {
72
73     private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class);
74
75     private final long globalNotificationQuota;
76     private final boolean switchFeaturesMandatory;
77     private boolean isFlowRemovedNotificationOn;
78     private boolean skipTableFeatures;
79     private static final int SPY_RATE = 10;
80
81     private final DataBroker dataBroker;
82     private final DeviceInitializerProvider deviceInitializerProvider;
83     private final ConvertorExecutor convertorExecutor;
84     private TranslatorLibrary translatorLibrary;
85     private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
86     private DeviceTerminationPhaseHandler deviceTerminPhaseHandler;
87
88     private final ConcurrentMap<DeviceInfo, DeviceContext> deviceContexts = new ConcurrentHashMap<>();
89     private final ConcurrentMap<DeviceInfo, LifecycleService> lifecycleServices = new ConcurrentHashMap<>();
90
91     private long barrierIntervalNanos;
92     private int barrierCountLimit;
93
94     private ExtensionConverterProvider extensionConverterProvider;
95     private ScheduledThreadPoolExecutor spyPool;
96     private final ClusterSingletonServiceProvider singletonServiceProvider;
97     private final NotificationPublishService notificationPublishService;
98     private final MessageSpy messageSpy;
99     private final HashedWheelTimer hashedWheelTimer;
100     private final boolean useSingleLayerSerialization;
101
102     public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
103                              final long globalNotificationQuota,
104                              final boolean switchFeaturesMandatory,
105                              final long barrierInterval,
106                              final int barrierCountLimit,
107                              final MessageSpy messageSpy,
108                              final boolean isFlowRemovedNotificationOn,
109                              final ClusterSingletonServiceProvider singletonServiceProvider,
110                              final NotificationPublishService notificationPublishService,
111                              final HashedWheelTimer hashedWheelTimer,
112                              final ConvertorExecutor convertorExecutor,
113                              final boolean skipTableFeatures,
114                              final boolean useSingleLayerSerialization,
115                              final DeviceInitializerProvider deviceInitializerProvider) {
116
117         this.dataBroker = dataBroker;
118         this.deviceInitializerProvider = deviceInitializerProvider;
119
120         /* merge empty nodes to oper DS to predict any problems with missing parent for Node */
121         final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
122         final NodesBuilder nodesBuilder = new NodesBuilder();
123         nodesBuilder.setNode(Collections.<Node>emptyList());
124         tx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class), nodesBuilder.build());
125         try {
126             tx.submit().get();
127         } catch (ExecutionException | InterruptedException e) {
128             LOG.error("Creation of node failed.", e);
129             throw new IllegalStateException(e);
130         }
131
132         this.switchFeaturesMandatory = switchFeaturesMandatory;
133         this.globalNotificationQuota = globalNotificationQuota;
134         this.isFlowRemovedNotificationOn = isFlowRemovedNotificationOn;
135         this.skipTableFeatures = skipTableFeatures;
136         this.convertorExecutor = convertorExecutor;
137         this.hashedWheelTimer = hashedWheelTimer;
138         this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierInterval);
139         this.barrierCountLimit = barrierCountLimit;
140         this.spyPool = new ScheduledThreadPoolExecutor(1);
141         this.singletonServiceProvider = singletonServiceProvider;
142         this.notificationPublishService = notificationPublishService;
143         this.messageSpy = messageSpy;
144         this.useSingleLayerSerialization = useSingleLayerSerialization;
145     }
146
147
148     @Override
149     public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
150         this.deviceInitPhaseHandler = handler;
151     }
152
153     @Override
154     public void onDeviceContextLevelUp(@CheckForNull DeviceInfo deviceInfo, final LifecycleService lifecycleService) throws Exception {
155         // final phase - we have to add new Device to MD-SAL DataStore
156         LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceInfo.getNodeId());
157         DeviceContext deviceContext = Preconditions.checkNotNull(deviceContexts.get(deviceInfo));
158         deviceContext.onPublished();
159         lifecycleService.registerDeviceRemovedHandler(this);
160         lifecycleService.registerService(this.singletonServiceProvider);
161     }
162
163     @Override
164     public ConnectionStatus deviceConnected(@CheckForNull final ConnectionContext connectionContext) throws Exception {
165         Preconditions.checkArgument(connectionContext != null);
166         final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
167
168         /*
169          * This part prevent destroy another device context. Throwing here an exception result to propagate close connection
170          * in {@link org.opendaylight.openflowplugin.impl.connection.org.opendaylight.openflowplugin.impl.connection.HandshakeContextImpl}
171          * If context already exist we are in state closing process (connection flapping) and we should not propagate connection close
172          */
173          if (deviceContexts.containsKey(deviceInfo)) {
174              DeviceContext deviceContext = deviceContexts.get(deviceInfo);
175              LOG.warn("Node {} already connected disconnecting device. Rejecting connection", deviceInfo.getLOGValue());
176              if (!deviceContext.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
177                  LOG.warn("Node {} context state not in TERMINATION state.",
178                          connectionContext.getDeviceInfo().getLOGValue());
179                  return ConnectionStatus.ALREADY_CONNECTED;
180              } else {
181                  return ConnectionStatus.CLOSING;
182              }
183          }
184
185         LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}",
186                 connectionContext.getConnectionAdapter().getRemoteAddress(), deviceInfo.getNodeId());
187
188         // Add Disconnect handler
189         connectionContext.setDeviceDisconnectedHandler(this);
190
191         // Cache this for clarity
192         final ConnectionAdapter connectionAdapter = connectionContext.getConnectionAdapter();
193
194         // FIXME: as soon as auxiliary connection are fully supported then this is needed only before device context published
195         connectionAdapter.setPacketInFiltering(true);
196
197         final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(deviceInfo.getVersion());
198
199         connectionContext.setOutboundQueueProvider(outboundQueueProvider);
200         final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
201                 connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, barrierCountLimit, barrierIntervalNanos);
202         connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
203
204         final LifecycleService lifecycleService = new LifecycleServiceImpl();
205         final DeviceContext deviceContext = new DeviceContextImpl(
206                 connectionContext,
207                 dataBroker,
208                 messageSpy,
209                 translatorLibrary,
210                 this,
211                 convertorExecutor,
212                 skipTableFeatures,
213                 hashedWheelTimer,
214                 this,
215                 useSingleLayerSerialization,
216                 deviceInitializerProvider);
217
218         deviceContext.setSalRoleService(new SalRoleServiceImpl(deviceContext, deviceContext));
219         deviceContexts.put(deviceInfo, deviceContext);
220
221         lifecycleService.setDeviceContext(deviceContext);
222         deviceContext.putLifecycleServiceIntoTxChainManager(lifecycleService);
223
224         lifecycleServices.put(deviceInfo, lifecycleService);
225
226         addCallbackToDeviceInitializeToSlave(deviceInfo, deviceContext, lifecycleService);
227
228         deviceContext.setSwitchFeaturesMandatory(switchFeaturesMandatory);
229
230         ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider);
231         deviceContext.setNotificationPublishService(notificationPublishService);
232
233         updatePacketInRateLimiters();
234
235         final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
236                 connectionAdapter, deviceContext);
237
238         connectionAdapter.setMessageListener(messageListener);
239         deviceInitPhaseHandler.onDeviceContextLevelUp(connectionContext.getDeviceInfo(), lifecycleService);
240         return ConnectionStatus.MAY_CONTINUE;
241     }
242
243     @Override
244     public TranslatorLibrary oook() {
245         return translatorLibrary;
246     }
247
248     @Override
249     public void setTranslatorLibrary(final TranslatorLibrary translatorLibrary) {
250         this.translatorLibrary = translatorLibrary;
251     }
252
253     @Override
254     public void close() {
255         for (final Iterator<DeviceContext> iterator = Iterators.consumingIterator(deviceContexts.values().iterator());
256                 iterator.hasNext();) {
257             final DeviceContext deviceCtx = iterator.next();
258             deviceCtx.shutdownConnection();
259             deviceCtx.shuttingDownDataStoreTransactions();
260         }
261
262         Optional.ofNullable(spyPool).ifPresent(ScheduledThreadPoolExecutor::shutdownNow);
263         spyPool = null;
264
265     }
266
267     @Override
268     public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
269         updatePacketInRateLimiters();
270         Optional.ofNullable(lifecycleServices.get(deviceInfo)).ifPresent(OFPContext::close);
271     }
272
273     @Override
274     public void initialize() {
275         spyPool.scheduleAtFixedRate(messageSpy, SPY_RATE, SPY_RATE, TimeUnit.SECONDS);
276     }
277
278     @Override
279     public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
280         this.extensionConverterProvider = extensionConverterProvider;
281     }
282
283     @Override
284     public ExtensionConverterProvider getExtensionConverterProvider() {
285         return extensionConverterProvider;
286     }
287
288     @Override
289     public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
290         this.deviceTerminPhaseHandler = handler;
291     }
292
293     @Override
294     public void onDeviceDisconnected(final ConnectionContext connectionContext) {
295         LOG.trace("onDeviceDisconnected method call for Node: {}", connectionContext.getNodeId());
296         final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
297         final DeviceContext deviceCtx = this.deviceContexts.get(deviceInfo);
298
299         if (Objects.isNull(deviceCtx)) {
300             LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.", deviceInfo.getLOGValue());
301             return;
302         }
303
304         if (deviceCtx.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
305             LOG.info("Device context for node {} is already is termination state, waiting for close all context", deviceInfo.getLOGValue());
306             return;
307         }
308
309         deviceCtx.close();
310
311         if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) {
312             LOG.debug("Node {} disconnected, but not primary connection.", connectionContext.getDeviceInfo().getLOGValue());
313             // Connection is not PrimaryConnection so try to remove from Auxiliary Connections
314             deviceCtx.removeAuxiliaryConnectionContext(connectionContext);
315         }
316
317         // TODO: Auxiliary connections supported ?
318         // Device is disconnected and so we need to close TxManager
319         final ListenableFuture<Void> future = deviceCtx.shuttingDownDataStoreTransactions();
320         Futures.addCallback(future, new FutureCallback<Void>() {
321             @Override
322             public void onSuccess(final Void result) {
323                 LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getLOGValue());
324                 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
325             }
326
327             @Override
328             public void onFailure(final Throwable t) {
329                 LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getLOGValue());
330                 LOG.trace("TxChainManager failed by closing. ", t);
331                 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
332             }
333         });
334
335         // Add timer for Close TxManager because it could fail in cluster without notification
336         final TimerTask timerTask = timeout -> {
337             if (!future.isDone()) {
338                 LOG.warn("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getLOGValue());
339                 future.cancel(false);
340             }
341         };
342
343         hashedWheelTimer.newTimeout(timerTask, 10, TimeUnit.SECONDS);
344     }
345
346     @VisibleForTesting
347     void addDeviceContextToMap(final DeviceInfo deviceInfo, final DeviceContext deviceContext){
348         deviceContexts.put(deviceInfo, deviceContext);
349     }
350
351     @Override
352     public void setFlowRemovedNotificationOn(boolean isNotificationFlowRemovedOff) {
353         this.isFlowRemovedNotificationOn = isNotificationFlowRemovedOff;
354     }
355
356     @Override
357     public boolean isFlowRemovedNotificationOn() {
358         return this.isFlowRemovedNotificationOn;
359     }
360
361
362     @Override
363     public void setSkipTableFeatures(boolean skipTableFeaturesValue) {
364         skipTableFeatures = skipTableFeaturesValue;
365     }
366
367     @Override
368     public void setBarrierCountLimit(final int barrierCountLimit) {
369         this.barrierCountLimit = barrierCountLimit;
370     }
371
372     @Override
373     public void setBarrierInterval(final long barrierTimeoutLimit) {
374         this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierTimeoutLimit);
375     }
376
377     @Override
378     public CheckedFuture<Void, TransactionCommitFailedException> removeDeviceFromOperationalDS(final DeviceInfo deviceInfo) {
379         final WriteTransaction delWtx = dataBroker.newWriteOnlyTransaction();
380         delWtx.delete(LogicalDatastoreType.OPERATIONAL, deviceInfo.getNodeInstanceIdentifier());
381         final CheckedFuture<Void, TransactionCommitFailedException> delFuture = delWtx.submit();
382
383         Futures.addCallback(delFuture, new FutureCallback<Void>() {
384             @Override
385             public void onSuccess(final Void result) {
386                 if (LOG.isDebugEnabled()) {
387                     LOG.debug("Delete Node {} was successful", deviceInfo.getLOGValue());
388                 }
389             }
390
391             @Override
392             public void onFailure(@Nonnull final Throwable t) {
393                 LOG.warn("Delete node {} failed with exception {}", deviceInfo.getLOGValue(), t);
394             }
395         });
396
397         return delFuture;
398     }
399
400
401     private void addCallbackToDeviceInitializeToSlave(final DeviceInfo deviceInfo, final DeviceContext deviceContext, final LifecycleService lifecycleService) {
402         Futures.addCallback(deviceContext.makeDeviceSlave(), new FutureCallback<RpcResult<SetRoleOutput>>() {
403             @Override
404             public void onSuccess(@Nullable RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
405                 if (LOG.isDebugEnabled()) {
406                     LOG.debug("Role SLAVE was successfully propagated on device, node {}", deviceInfo.getLOGValue());
407                 }
408             }
409
410             @Override
411             public void onFailure(Throwable throwable) {
412                 LOG.warn("Was not able to set role SLAVE to device on node {} ",deviceInfo.getLOGValue());
413                 lifecycleService.closeConnection();
414             }
415         });
416     }
417
418     private void updatePacketInRateLimiters() {
419         synchronized (deviceContexts) {
420             final int deviceContextsSize = deviceContexts.size();
421             if (deviceContextsSize > 0) {
422                 long freshNotificationLimit = globalNotificationQuota / deviceContextsSize;
423                 if (freshNotificationLimit < 100) {
424                     freshNotificationLimit = 100;
425                 }
426                 if (LOG.isDebugEnabled()) {
427                     LOG.debug("fresh notification limit = {}", freshNotificationLimit);
428                 }
429                 for (final DeviceContext deviceContext : deviceContexts.values()) {
430                     deviceContext.updatePacketInRateLimit(freshNotificationLimit);
431                 }
432             }
433         }
434     }
435
436     public void onDeviceRemoved(DeviceInfo deviceInfo) {
437         deviceContexts.remove(deviceInfo);
438         LOG.debug("Device context removed for node {}", deviceInfo.getLOGValue());
439
440         lifecycleServices.remove(deviceInfo);
441         LOG.debug("Lifecycle service removed for node {}", deviceInfo.getLOGValue());
442     }
443 }