2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.impl.device;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import io.netty.util.HashedWheelTimer;
16 import java.util.Collection;
17 import java.util.Collections;
18 import java.util.HashSet;
19 import java.util.List;
20 import java.util.Objects;
21 import java.util.Optional;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.Future;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.TimeoutException;
26 import java.util.concurrent.atomic.AtomicBoolean;
27 import javax.annotation.Nonnull;
28 import javax.annotation.Nullable;
29 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
30 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
31 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
32 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
33 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
34 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
35 import org.opendaylight.openflowjava.protocol.api.keys.MessageTypeKey;
36 import org.opendaylight.openflowplugin.api.OFConstants;
37 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
38 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
39 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
40 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
41 import org.opendaylight.openflowplugin.api.openflow.device.MessageTranslator;
42 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
43 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
44 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
45 import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector;
46 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
47 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher;
48 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainState;
49 import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
50 import org.opendaylight.openflowplugin.api.openflow.md.util.OpenflowVersion;
51 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
52 import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
53 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
54 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
55 import org.opendaylight.openflowplugin.common.txchain.TransactionChainManager;
56 import org.opendaylight.openflowplugin.extension.api.ConvertorMessageFromOFJava;
57 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
58 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
59 import org.opendaylight.openflowplugin.extension.api.exception.ConversionException;
60 import org.opendaylight.openflowplugin.extension.api.path.MessagePath;
61 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
62 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProviderFactory;
63 import org.opendaylight.openflowplugin.impl.device.initialization.AbstractDeviceInitializer;
64 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
65 import org.opendaylight.openflowplugin.impl.device.listener.MultiMsgCollectorImpl;
66 import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl;
67 import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl;
68 import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl;
69 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
70 import org.opendaylight.openflowplugin.impl.services.util.RequestContextUtil;
71 import org.opendaylight.openflowplugin.impl.util.MatchUtil;
72 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
73 import org.opendaylight.openflowplugin.openflow.md.util.InventoryDataServiceUtil;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.experimenter.message.service.rev151020.ExperimenterMessageFromDevBuilder;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.model.match.types.rev131026.Match;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortReason;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemoved;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatus;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.experimenter.core.ExperimenterDataOfChoice;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.experimenter.types.rev151020.experimenter.core.message.ExperimenterMessageOfChoice;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketIn;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
96 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceivedBuilder;
97 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
98 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
99 import org.opendaylight.yangtools.yang.binding.DataContainer;
100 import org.opendaylight.yangtools.yang.binding.DataObject;
101 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
102 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
103 import org.slf4j.Logger;
104 import org.slf4j.LoggerFactory;
106 public class DeviceContextImpl implements DeviceContext, ExtensionConverterProviderKeeper {
108 private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class);
110 // TODO: drain factor should be parametrized
111 private static final float REJECTED_DRAIN_FACTOR = 0.25f;
112 // TODO: low water mark factor should be parametrized
113 private static final float LOW_WATERMARK_FACTOR = 0.75f;
114 // TODO: high water mark factor should be parametrized
115 private static final float HIGH_WATERMARK_FACTOR = 0.95f;
117 // Timeout in milliseconds after what we will give up on initializing device
118 private static final int DEVICE_INIT_TIMEOUT = 9000;
120 // Timeout in milliseconds after what we will give up on closing transaction chain
121 private static final int TX_CHAIN_CLOSE_TIMEOUT = 10000;
123 private static final int LOW_WATERMARK = 1000;
124 private static final int HIGH_WATERMARK = 2000;
126 private final MultipartWriterProvider writerProvider;
127 private final HashedWheelTimer hashedWheelTimer;
128 private final DeviceState deviceState;
129 private final DataBroker dataBroker;
130 private final Collection<RequestContext<?>> requestContexts = new HashSet<>();
131 private final MessageSpy messageSpy;
132 private final MessageTranslator<PortGrouping, FlowCapableNodeConnector> portStatusTranslator;
133 private final MessageTranslator<PacketInMessage, PacketReceived> packetInTranslator;
134 private final MessageTranslator<FlowRemoved, org.opendaylight.yang.gen.v1.urn.opendaylight
135 .flow.service.rev130819.FlowRemoved> flowRemovedTranslator;
136 private final TranslatorLibrary translatorLibrary;
137 private final ConvertorExecutor convertorExecutor;
138 private final DeviceInitializerProvider deviceInitializerProvider;
139 private final PacketInRateLimiter packetInLimiter;
140 private final DeviceInfo deviceInfo;
141 private final ConnectionContext primaryConnectionContext;
142 private final boolean skipTableFeatures;
143 private final boolean switchFeaturesMandatory;
144 private final boolean isFlowRemovedNotificationOn;
145 private final boolean useSingleLayerSerialization;
146 private final AtomicBoolean initialized = new AtomicBoolean(false);
147 private final AtomicBoolean hasState = new AtomicBoolean(false);
148 private final AtomicBoolean isInitialTransactionSubmitted = new AtomicBoolean(false);
149 private NotificationPublishService notificationPublishService;
150 private TransactionChainManager transactionChainManager;
151 private DeviceFlowRegistry deviceFlowRegistry;
152 private DeviceGroupRegistry deviceGroupRegistry;
153 private DeviceMeterRegistry deviceMeterRegistry;
154 private ExtensionConverterProvider extensionConverterProvider;
155 private ContextChainMastershipWatcher contextChainMastershipWatcher;
157 DeviceContextImpl(@Nonnull final ConnectionContext primaryConnectionContext,
158 @Nonnull final DataBroker dataBroker,
159 @Nonnull final MessageSpy messageSpy,
160 @Nonnull final TranslatorLibrary translatorLibrary,
161 final ConvertorExecutor convertorExecutor,
162 final boolean skipTableFeatures,
163 final HashedWheelTimer hashedWheelTimer,
164 final boolean useSingleLayerSerialization,
165 final DeviceInitializerProvider deviceInitializerProvider,
166 final boolean isFlowRemovedNotificationOn,
167 final boolean switchFeaturesMandatory) {
169 this.primaryConnectionContext = primaryConnectionContext;
170 this.deviceInfo = primaryConnectionContext.getDeviceInfo();
171 this.hashedWheelTimer = hashedWheelTimer;
172 this.deviceInitializerProvider = deviceInitializerProvider;
173 this.isFlowRemovedNotificationOn = isFlowRemovedNotificationOn;
174 this.switchFeaturesMandatory = switchFeaturesMandatory;
175 this.deviceState = new DeviceStateImpl();
176 this.dataBroker = dataBroker;
177 this.messageSpy = messageSpy;
179 this.packetInLimiter = new PacketInRateLimiter(primaryConnectionContext.getConnectionAdapter(),
180 /*initial*/ LOW_WATERMARK, /*initial*/HIGH_WATERMARK, this.messageSpy, REJECTED_DRAIN_FACTOR);
182 this.translatorLibrary = translatorLibrary;
183 this.portStatusTranslator = translatorLibrary.lookupTranslator(
184 new TranslatorKey(deviceInfo.getVersion(), PortGrouping.class.getName()));
185 this.packetInTranslator = translatorLibrary.lookupTranslator(
186 new TranslatorKey(deviceInfo.getVersion(), org.opendaylight.yang.gen.v1.urn.opendaylight.openflow
188 .PacketIn.class.getName()));
189 this.flowRemovedTranslator = translatorLibrary.lookupTranslator(
190 new TranslatorKey(deviceInfo.getVersion(), FlowRemoved.class.getName()));
192 this.convertorExecutor = convertorExecutor;
193 this.skipTableFeatures = skipTableFeatures;
194 this.useSingleLayerSerialization = useSingleLayerSerialization;
195 writerProvider = MultipartWriterProviderFactory.createDefaultProvider(this);
199 public boolean initialSubmitTransaction() {
200 if (!initialized.get()) {
204 final boolean initialSubmit = transactionChainManager.initialSubmitWriteTransaction();
205 isInitialTransactionSubmitted.set(initialSubmit);
206 return initialSubmit;
210 public DeviceState getDeviceState() {
215 public ReadOnlyTransaction getReadTransaction() {
216 return dataBroker.newReadOnlyTransaction();
220 public boolean isTransactionsEnabled() {
221 return isInitialTransactionSubmitted.get();
225 public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
226 final InstanceIdentifier<T> path,
228 if (initialized.get()) {
229 transactionChainManager.writeToTransaction(store, path, data, false);
234 public <T extends DataObject> void writeToTransactionWithParentsSlow(final LogicalDatastoreType store,
235 final InstanceIdentifier<T> path,
237 if (initialized.get()) {
238 transactionChainManager.writeToTransaction(store, path, data, true);
243 public <T extends DataObject> void addDeleteToTxChain(final LogicalDatastoreType store,
244 final InstanceIdentifier<T> path) {
245 if (initialized.get()) {
246 transactionChainManager.addDeleteOperationToTxChain(store, path);
251 public boolean submitTransaction() {
252 return initialized.get() && transactionChainManager.submitTransaction();
256 public ConnectionContext getPrimaryConnectionContext() {
257 return primaryConnectionContext;
261 public DeviceFlowRegistry getDeviceFlowRegistry() {
262 return deviceFlowRegistry;
266 public DeviceGroupRegistry getDeviceGroupRegistry() {
267 return deviceGroupRegistry;
271 public DeviceMeterRegistry getDeviceMeterRegistry() {
272 return deviceMeterRegistry;
276 public void processReply(final OfHeader ofHeader) {
277 messageSpy.spyMessage(
278 ofHeader.getImplementedInterface(),
279 ofHeader instanceof Error
280 ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
281 : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
285 public void processReply(final Xid xid, final List<? extends OfHeader> ofHeaderList) {
286 ofHeaderList.forEach(header -> messageSpy.spyMessage(
287 header.getImplementedInterface(),
288 header instanceof Error
289 ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
290 : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS));
294 public void processFlowRemovedMessage(final FlowRemoved flowRemoved) {
295 //1. translate to general flow (table, priority, match, cookie)
296 final org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved flowRemovedNotification =
297 flowRemovedTranslator.translate(flowRemoved, deviceInfo, null);
299 if (isFlowRemovedNotificationOn) {
300 // Trigger off a notification
301 notificationPublishService.offerNotification(flowRemovedNotification);
306 @SuppressWarnings("checkstyle:IllegalCatch")
307 public void processPortStatusMessage(final PortStatusMessage portStatus) {
308 messageSpy.spyMessage(portStatus.getImplementedInterface(), MessageSpy.StatisticsGroup
309 .FROM_SWITCH_PUBLISHED_SUCCESS);
311 if (initialized.get()) {
313 writePortStatusMessage(portStatus);
315 } catch (final Exception e) {
316 LOG.warn("Error processing port status message for port {} on device {}",
317 portStatus.getPortNo(), getDeviceInfo(), e);
319 } else if (!hasState.get()) {
320 primaryConnectionContext.handlePortStatusMessage(portStatus);
324 private void writePortStatusMessage(final PortStatus portStatusMessage) {
325 final FlowCapableNodeConnector flowCapableNodeConnector = portStatusTranslator
326 .translate(portStatusMessage, getDeviceInfo(), null);
328 final KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> iiToNodeConnector = getDeviceInfo()
329 .getNodeInstanceIdentifier()
330 .child(NodeConnector.class, new NodeConnectorKey(InventoryDataServiceUtil
331 .nodeConnectorIdfromDatapathPortNo(
332 deviceInfo.getDatapathId(),
333 portStatusMessage.getPortNo(),
334 OpenflowVersion.get(deviceInfo.getVersion()))));
336 writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector, new NodeConnectorBuilder()
337 .setKey(iiToNodeConnector.getKey())
338 .addAugmentation(FlowCapableNodeConnectorStatisticsData.class, new
339 FlowCapableNodeConnectorStatisticsDataBuilder().build())
340 .addAugmentation(FlowCapableNodeConnector.class, flowCapableNodeConnector)
343 if (PortReason.OFPPRDELETE.equals(portStatusMessage.getReason())) {
344 addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector);
350 public void processPacketInMessage(final PacketInMessage packetInMessage) {
351 final PacketReceived packetReceived = packetInTranslator.translate(packetInMessage, getDeviceInfo(), null);
352 handlePacketInMessage(packetReceived, packetInMessage.getImplementedInterface(), packetReceived.getMatch());
355 private void handlePacketInMessage(final PacketIn packetIn,
356 final Class<?> implementedInterface,
358 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH);
359 final ConnectionAdapter connectionAdapter = getPrimaryConnectionContext().getConnectionAdapter();
361 if (packetIn == null) {
362 LOG.debug("Received a null packet from switch {}", connectionAdapter.getRemoteAddress());
363 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
367 final OpenflowVersion openflowVersion = OpenflowVersion.get(deviceInfo.getVersion());
369 // Try to get ingress from match
370 final NodeConnectorRef nodeConnectorRef = Objects.nonNull(packetIn.getIngress())
371 ? packetIn.getIngress() : Optional.ofNullable(match)
372 .map(Match::getInPort)
373 .map(nodeConnectorId -> InventoryDataServiceUtil
374 .portNumberfromNodeConnectorId(
377 .map(portNumber -> InventoryDataServiceUtil
378 .nodeConnectorRefFromDatapathIdPortno(
379 deviceInfo.getDatapathId(),
384 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
386 if (!packetInLimiter.acquirePermit()) {
387 LOG.debug("Packet limited");
388 // TODO: save packet into emergency slot if possible
389 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup
390 .FROM_SWITCH_PACKET_IN_LIMIT_REACHED_AND_DROPPED);
394 final ListenableFuture<?> offerNotification = notificationPublishService
395 .offerNotification(new PacketReceivedBuilder(packetIn)
396 .setIngress(nodeConnectorRef)
397 .setMatch(MatchUtil.transformMatch(match,
398 org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.packet.received
402 if (NotificationPublishService.REJECTED.equals(offerNotification)) {
403 LOG.debug("notification offer rejected");
404 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_NOTIFICATION_REJECTED);
405 packetInLimiter.drainLowWaterMark();
406 packetInLimiter.releasePermit();
410 Futures.addCallback(offerNotification, new FutureCallback<Object>() {
412 public void onSuccess(final Object result) {
413 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
414 packetInLimiter.releasePermit();
418 public void onFailure(final Throwable throwable) {
419 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup
420 .FROM_SWITCH_NOTIFICATION_REJECTED);
421 LOG.debug("notification offer failed: {}", throwable.getMessage());
422 LOG.trace("notification offer failed..", throwable);
423 packetInLimiter.releasePermit();
425 }, MoreExecutors.directExecutor());
429 public void processExperimenterMessage(final ExperimenterMessage notification) {
431 final ExperimenterDataOfChoice vendorData = notification.getExperimenterDataOfChoice();
432 final MessageTypeKey<? extends ExperimenterDataOfChoice> key = new MessageTypeKey<>(
433 getDeviceInfo().getVersion(),
434 (Class<? extends ExperimenterDataOfChoice>) vendorData.getImplementedInterface());
435 final ConvertorMessageFromOFJava<ExperimenterDataOfChoice, MessagePath> messageConverter =
436 extensionConverterProvider.getMessageConverter(key);
437 if (messageConverter == null) {
438 LOG.warn("custom converter for {}[OF:{}] not found",
439 notification.getExperimenterDataOfChoice().getImplementedInterface(),
440 getDeviceInfo().getVersion());
443 // build notification
444 final ExperimenterMessageOfChoice messageOfChoice;
446 messageOfChoice = messageConverter.convert(vendorData, MessagePath.MESSAGE_NOTIFICATION);
447 final ExperimenterMessageFromDevBuilder experimenterMessageFromDevBld = new
448 ExperimenterMessageFromDevBuilder()
449 .setNode(new NodeRef(getDeviceInfo().getNodeInstanceIdentifier()))
450 .setExperimenterMessageOfChoice(messageOfChoice);
452 notificationPublishService.offerNotification(experimenterMessageFromDevBld.build());
453 } catch (final ConversionException e) {
454 LOG.error("Conversion of experimenter notification failed", e);
459 public boolean processAlienMessage(final OfHeader message) {
460 final Class<? extends DataContainer> implementedInterface = message.getImplementedInterface();
462 if (Objects.nonNull(implementedInterface) && org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service
463 .rev130709.PacketInMessage.class.equals(implementedInterface)) {
464 final org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709
465 .PacketInMessage packetInMessage = org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service
466 .rev130709.PacketInMessage.class.cast(message);
468 handlePacketInMessage(packetInMessage, implementedInterface, packetInMessage.getMatch());
476 public TranslatorLibrary oook() {
477 return translatorLibrary;
481 public void setNotificationPublishService(final NotificationPublishService notificationPublishService) {
482 this.notificationPublishService = notificationPublishService;
486 public MessageSpy getMessageSpy() {
491 public void onPublished() {
492 primaryConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
496 public <T extends OfHeader> MultiMsgCollector<T> getMultiMsgCollector(final RequestContext<List<T>>
498 return new MultiMsgCollectorImpl<>(this, requestContext);
502 public void updatePacketInRateLimit(final long upperBound) {
503 packetInLimiter.changeWaterMarks((int) (LOW_WATERMARK_FACTOR * upperBound),
504 (int) (HIGH_WATERMARK_FACTOR * upperBound));
508 public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
509 this.extensionConverterProvider = extensionConverterProvider;
513 public ExtensionConverterProvider getExtensionConverterProvider() {
514 return extensionConverterProvider;
518 TransactionChainManager getTransactionChainManager() {
519 return this.transactionChainManager;
523 public ListenableFuture<Void> closeServiceInstance() {
524 final ListenableFuture<Void> listenableFuture = initialized.get()
525 ? transactionChainManager.deactivateTransactionManager()
526 : Futures.immediateFuture(null);
528 hashedWheelTimer.newTimeout((timerTask) -> {
529 if (!listenableFuture.isDone() && !listenableFuture.isCancelled()) {
530 listenableFuture.cancel(true);
532 }, TX_CHAIN_CLOSE_TIMEOUT, TimeUnit.MILLISECONDS);
534 return listenableFuture;
538 public DeviceInfo getDeviceInfo() {
539 return this.deviceInfo;
543 public void registerMastershipWatcher(@Nonnull final ContextChainMastershipWatcher newWatcher) {
544 this.contextChainMastershipWatcher = newWatcher;
549 public ServiceGroupIdentifier getIdentifier() {
550 return deviceInfo.getServiceIdentifier();
554 public void close() {
555 // Close all datastore registries and transactions
556 if (initialized.getAndSet(false)) {
557 deviceGroupRegistry.close();
558 deviceFlowRegistry.close();
559 deviceMeterRegistry.close();
561 final ListenableFuture<Void> txChainShuttingDown = transactionChainManager.shuttingDown();
563 Futures.addCallback(txChainShuttingDown, new FutureCallback<Void>() {
565 public void onSuccess(@Nullable final Void result) {
566 transactionChainManager.close();
567 transactionChainManager = null;
571 public void onFailure(final Throwable throwable) {
572 transactionChainManager.close();
573 transactionChainManager = null;
575 }, MoreExecutors.directExecutor());
578 requestContexts.forEach(requestContext -> RequestContextUtil
579 .closeRequestContextWithRpcError(requestContext, "Connection closed."));
580 requestContexts.clear();
584 public boolean canUseSingleLayerSerialization() {
585 return useSingleLayerSerialization && getDeviceInfo().getVersion() >= OFConstants.OFP_VERSION_1_3;
588 // TODO: exception handling should be fixed by using custom checked exception, never RuntimeExceptions
590 @SuppressWarnings({"checkstyle:IllegalCatch"})
591 public void instantiateServiceInstance() {
592 lazyTransactionManagerInitialization();
595 final List<PortStatusMessage> portStatusMessages = primaryConnectionContext
596 .retrieveAndClearPortStatusMessages();
598 portStatusMessages.forEach(this::writePortStatusMessage);
600 } catch (final Exception ex) {
601 throw new RuntimeException(String.format("Error processing port status messages from device %s: %s",
602 deviceInfo.toString(), ex.toString()), ex);
605 final Optional<AbstractDeviceInitializer> initializer = deviceInitializerProvider
606 .lookup(deviceInfo.getVersion());
608 if (initializer.isPresent()) {
609 final Future<Void> initialize = initializer
611 .initialize(this, switchFeaturesMandatory, skipTableFeatures, writerProvider, convertorExecutor);
614 initialize.get(DEVICE_INIT_TIMEOUT, TimeUnit.MILLISECONDS);
615 } catch (TimeoutException ex) {
616 initialize.cancel(true);
617 throw new RuntimeException(String.format("Failed to initialize device %s in %ss: %s",
618 deviceInfo.toString(), String.valueOf(DEVICE_INIT_TIMEOUT / 1000), ex.toString()), ex);
619 } catch (ExecutionException | InterruptedException ex) {
620 throw new RuntimeException(
621 String.format("Device %s cannot be initialized: %s", deviceInfo.toString(), ex.toString()), ex);
624 throw new RuntimeException(String.format("Unsupported version %s for device %s",
625 deviceInfo.getVersion(),
626 deviceInfo.toString()));
629 final ListenableFuture<List<com.google.common.base.Optional<FlowCapableNode>>> deviceFlowRegistryFill =
630 getDeviceFlowRegistry().fill();
631 Futures.addCallback(deviceFlowRegistryFill,
632 new DeviceFlowRegistryCallback(deviceFlowRegistryFill, contextChainMastershipWatcher),
633 MoreExecutors.directExecutor());
637 void lazyTransactionManagerInitialization() {
638 if (!this.initialized.get()) {
639 if (LOG.isDebugEnabled()) {
640 LOG.debug("Transaction chain manager for node {} created", deviceInfo);
642 this.transactionChainManager = new TransactionChainManager(dataBroker, deviceInfo.getNodeId().getValue());
643 this.deviceFlowRegistry = new DeviceFlowRegistryImpl(deviceInfo.getVersion(), dataBroker,
644 deviceInfo.getNodeInstanceIdentifier());
645 this.deviceGroupRegistry = new DeviceGroupRegistryImpl();
646 this.deviceMeterRegistry = new DeviceMeterRegistryImpl();
649 transactionChainManager.activateTransactionManager();
650 initialized.set(true);
655 public <T> RequestContext<T> createRequestContext() {
656 final Long xid = deviceInfo.reserveXidForDeviceMessage();
658 final AbstractRequestContext<T> abstractRequestContext = new AbstractRequestContext<T>(xid) {
660 public void close() {
661 requestContexts.remove(this);
665 requestContexts.add(abstractRequestContext);
666 return abstractRequestContext;
670 public void onStateAcquired(final ContextChainState state) {
674 private class DeviceFlowRegistryCallback implements FutureCallback<List<com.google.common.base
675 .Optional<FlowCapableNode>>> {
676 private final ListenableFuture<List<com.google.common.base.Optional<FlowCapableNode>>> deviceFlowRegistryFill;
677 private final ContextChainMastershipWatcher contextChainMastershipWatcher;
679 DeviceFlowRegistryCallback(
680 ListenableFuture<List<com.google.common.base.Optional<FlowCapableNode>>> deviceFlowRegistryFill,
681 ContextChainMastershipWatcher contextChainMastershipWatcher) {
682 this.deviceFlowRegistryFill = deviceFlowRegistryFill;
683 this.contextChainMastershipWatcher = contextChainMastershipWatcher;
687 public void onSuccess(@Nullable List<com.google.common.base.Optional<FlowCapableNode>> result) {
688 if (LOG.isDebugEnabled()) {
689 // Count all flows we read from datastore for debugging purposes.
690 // This number do not always represent how many flows were actually added
691 // to DeviceFlowRegistry, because of possible duplicates.
692 long flowCount = Optional.ofNullable(result)
693 .map(Collections::singleton)
694 .orElse(Collections.emptySet())
696 .flatMap(Collection::stream)
697 .filter(Objects::nonNull)
698 .flatMap(flowCapableNodeOptional -> flowCapableNodeOptional.asSet().stream())
699 .filter(Objects::nonNull)
700 .filter(flowCapableNode -> Objects.nonNull(flowCapableNode.getTable()))
701 .flatMap(flowCapableNode -> flowCapableNode.getTable().stream())
702 .filter(Objects::nonNull)
703 .filter(table -> Objects.nonNull(table.getFlow()))
704 .flatMap(table -> table.getFlow().stream())
705 .filter(Objects::nonNull)
708 LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, deviceInfo);
710 this.contextChainMastershipWatcher.onMasterRoleAcquired(deviceInfo, ContextChainMastershipState
711 .INITIAL_FLOW_REGISTRY_FILL);
715 public void onFailure(Throwable throwable) {
716 if (deviceFlowRegistryFill.isCancelled()) {
717 if (LOG.isDebugEnabled()) {
718 LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceInfo);
721 LOG.warn("Failed filling flow registry with flows for node: {} with exception: {}", deviceInfo,
724 contextChainMastershipWatcher.onNotAbleToStartMastership(
726 "Was not able to fill flow registry on device",