2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.impl.device;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import io.netty.util.HashedWheelTimer;
15 import java.util.Collection;
16 import java.util.Collections;
17 import java.util.HashSet;
18 import java.util.List;
19 import java.util.Objects;
20 import java.util.Optional;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.Future;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.TimeoutException;
25 import java.util.concurrent.atomic.AtomicBoolean;
26 import javax.annotation.Nonnull;
27 import javax.annotation.Nullable;
28 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
29 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
30 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
31 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
32 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
33 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
34 import org.opendaylight.openflowjava.protocol.api.keys.MessageTypeKey;
35 import org.opendaylight.openflowplugin.api.OFConstants;
36 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
37 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
38 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
39 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
40 import org.opendaylight.openflowplugin.api.openflow.device.MessageTranslator;
41 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
42 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
43 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
44 import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector;
45 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
46 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher;
47 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainState;
48 import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
49 import org.opendaylight.openflowplugin.api.openflow.md.util.OpenflowVersion;
50 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
51 import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
52 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
53 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
54 import org.opendaylight.openflowplugin.common.txchain.TransactionChainManager;
55 import org.opendaylight.openflowplugin.extension.api.ConvertorMessageFromOFJava;
56 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
57 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
58 import org.opendaylight.openflowplugin.extension.api.exception.ConversionException;
59 import org.opendaylight.openflowplugin.extension.api.path.MessagePath;
60 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
61 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProviderFactory;
62 import org.opendaylight.openflowplugin.impl.device.initialization.AbstractDeviceInitializer;
63 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
64 import org.opendaylight.openflowplugin.impl.device.listener.MultiMsgCollectorImpl;
65 import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl;
66 import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl;
67 import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl;
68 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
69 import org.opendaylight.openflowplugin.impl.services.util.RequestContextUtil;
70 import org.opendaylight.openflowplugin.impl.util.MatchUtil;
71 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
72 import org.opendaylight.openflowplugin.openflow.md.util.InventoryDataServiceUtil;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.experimenter.message.service.rev151020.ExperimenterMessageFromDevBuilder;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.model.match.types.rev131026.Match;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortReason;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemoved;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatus;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.experimenter.core.ExperimenterDataOfChoice;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.experimenter.types.rev151020.experimenter.core.message.ExperimenterMessageOfChoice;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketIn;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceivedBuilder;
96 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
97 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
98 import org.opendaylight.yangtools.yang.binding.DataContainer;
99 import org.opendaylight.yangtools.yang.binding.DataObject;
100 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
101 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
102 import org.slf4j.Logger;
103 import org.slf4j.LoggerFactory;
105 public class DeviceContextImpl implements DeviceContext, ExtensionConverterProviderKeeper {
107 private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class);
109 // TODO: drain factor should be parametrized
110 private static final float REJECTED_DRAIN_FACTOR = 0.25f;
111 // TODO: low water mark factor should be parametrized
112 private static final float LOW_WATERMARK_FACTOR = 0.75f;
113 // TODO: high water mark factor should be parametrized
114 private static final float HIGH_WATERMARK_FACTOR = 0.95f;
116 // Timeout in milliseconds after what we will give up on initializing device
117 private static final int DEVICE_INIT_TIMEOUT = 9000;
119 // Timeout in milliseconds after what we will give up on closing transaction chain
120 private static final int TX_CHAIN_CLOSE_TIMEOUT = 10000;
122 private static final int LOW_WATERMARK = 1000;
123 private static final int HIGH_WATERMARK = 2000;
125 private final MultipartWriterProvider writerProvider;
126 private final HashedWheelTimer hashedWheelTimer;
127 private final DeviceState deviceState;
128 private final DataBroker dataBroker;
129 private final Collection<RequestContext<?>> requestContexts = new HashSet<>();
130 private final MessageSpy messageSpy;
131 private final MessageTranslator<PortGrouping, FlowCapableNodeConnector> portStatusTranslator;
132 private final MessageTranslator<PacketInMessage, PacketReceived> packetInTranslator;
133 private final MessageTranslator<FlowRemoved, org.opendaylight.yang.gen.v1.urn.opendaylight
134 .flow.service.rev130819.FlowRemoved> flowRemovedTranslator;
135 private final TranslatorLibrary translatorLibrary;
136 private final ConvertorExecutor convertorExecutor;
137 private final DeviceInitializerProvider deviceInitializerProvider;
138 private final PacketInRateLimiter packetInLimiter;
139 private final DeviceInfo deviceInfo;
140 private final ConnectionContext primaryConnectionContext;
141 private final boolean skipTableFeatures;
142 private final boolean switchFeaturesMandatory;
143 private final boolean isFlowRemovedNotificationOn;
144 private final boolean useSingleLayerSerialization;
145 private final AtomicBoolean initialized = new AtomicBoolean(false);
146 private final AtomicBoolean hasState = new AtomicBoolean(false);
147 private final AtomicBoolean isInitialTransactionSubmitted = new AtomicBoolean(false);
148 private NotificationPublishService notificationPublishService;
149 private TransactionChainManager transactionChainManager;
150 private DeviceFlowRegistry deviceFlowRegistry;
151 private DeviceGroupRegistry deviceGroupRegistry;
152 private DeviceMeterRegistry deviceMeterRegistry;
153 private ExtensionConverterProvider extensionConverterProvider;
154 private ContextChainMastershipWatcher contextChainMastershipWatcher;
156 DeviceContextImpl(@Nonnull final ConnectionContext primaryConnectionContext,
157 @Nonnull final DataBroker dataBroker,
158 @Nonnull final MessageSpy messageSpy,
159 @Nonnull final TranslatorLibrary translatorLibrary,
160 final ConvertorExecutor convertorExecutor,
161 final boolean skipTableFeatures,
162 final HashedWheelTimer hashedWheelTimer,
163 final boolean useSingleLayerSerialization,
164 final DeviceInitializerProvider deviceInitializerProvider,
165 final boolean isFlowRemovedNotificationOn,
166 final boolean switchFeaturesMandatory) {
168 this.primaryConnectionContext = primaryConnectionContext;
169 this.deviceInfo = primaryConnectionContext.getDeviceInfo();
170 this.hashedWheelTimer = hashedWheelTimer;
171 this.deviceInitializerProvider = deviceInitializerProvider;
172 this.isFlowRemovedNotificationOn = isFlowRemovedNotificationOn;
173 this.switchFeaturesMandatory = switchFeaturesMandatory;
174 this.deviceState = new DeviceStateImpl();
175 this.dataBroker = dataBroker;
176 this.messageSpy = messageSpy;
178 this.packetInLimiter = new PacketInRateLimiter(primaryConnectionContext.getConnectionAdapter(),
179 /*initial*/ LOW_WATERMARK, /*initial*/HIGH_WATERMARK, this.messageSpy, REJECTED_DRAIN_FACTOR);
181 this.translatorLibrary = translatorLibrary;
182 this.portStatusTranslator = translatorLibrary.lookupTranslator(
183 new TranslatorKey(deviceInfo.getVersion(), PortGrouping.class.getName()));
184 this.packetInTranslator = translatorLibrary.lookupTranslator(
185 new TranslatorKey(deviceInfo.getVersion(), org.opendaylight.yang.gen.v1.urn.opendaylight.openflow
187 .PacketIn.class.getName()));
188 this.flowRemovedTranslator = translatorLibrary.lookupTranslator(
189 new TranslatorKey(deviceInfo.getVersion(), FlowRemoved.class.getName()));
191 this.convertorExecutor = convertorExecutor;
192 this.skipTableFeatures = skipTableFeatures;
193 this.useSingleLayerSerialization = useSingleLayerSerialization;
194 writerProvider = MultipartWriterProviderFactory.createDefaultProvider(this);
198 public boolean initialSubmitTransaction() {
199 if (!initialized.get()) {
203 final boolean initialSubmit = transactionChainManager.initialSubmitWriteTransaction();
204 isInitialTransactionSubmitted.set(initialSubmit);
205 return initialSubmit;
209 public DeviceState getDeviceState() {
214 public ReadOnlyTransaction getReadTransaction() {
215 return dataBroker.newReadOnlyTransaction();
219 public boolean isTransactionsEnabled() {
220 return isInitialTransactionSubmitted.get();
224 public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
225 final InstanceIdentifier<T> path,
227 if (initialized.get()) {
228 transactionChainManager.writeToTransaction(store, path, data, false);
233 public <T extends DataObject> void writeToTransactionWithParentsSlow(final LogicalDatastoreType store,
234 final InstanceIdentifier<T> path,
236 if (initialized.get()) {
237 transactionChainManager.writeToTransaction(store, path, data, true);
242 public <T extends DataObject> void addDeleteToTxChain(final LogicalDatastoreType store,
243 final InstanceIdentifier<T> path) {
244 if (initialized.get()) {
245 transactionChainManager.addDeleteOperationToTxChain(store, path);
250 public boolean submitTransaction() {
251 return initialized.get() && transactionChainManager.submitTransaction();
255 public ConnectionContext getPrimaryConnectionContext() {
256 return primaryConnectionContext;
260 public DeviceFlowRegistry getDeviceFlowRegistry() {
261 return deviceFlowRegistry;
265 public DeviceGroupRegistry getDeviceGroupRegistry() {
266 return deviceGroupRegistry;
270 public DeviceMeterRegistry getDeviceMeterRegistry() {
271 return deviceMeterRegistry;
275 public void processReply(final OfHeader ofHeader) {
276 messageSpy.spyMessage(
277 ofHeader.getImplementedInterface(),
278 (ofHeader instanceof Error)
279 ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
280 : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
284 public void processReply(final Xid xid, final List<? extends OfHeader> ofHeaderList) {
285 ofHeaderList.forEach(header -> messageSpy.spyMessage(
286 header.getImplementedInterface(),
287 (header instanceof Error)
288 ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
289 : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS));
293 public void processFlowRemovedMessage(final FlowRemoved flowRemoved) {
294 //1. translate to general flow (table, priority, match, cookie)
295 final org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved flowRemovedNotification =
296 flowRemovedTranslator.translate(flowRemoved, deviceInfo, null);
298 if (isFlowRemovedNotificationOn) {
299 // Trigger off a notification
300 notificationPublishService.offerNotification(flowRemovedNotification);
305 @SuppressWarnings("checkstyle:IllegalCatch")
306 public void processPortStatusMessage(final PortStatusMessage portStatus) {
307 messageSpy.spyMessage(portStatus.getImplementedInterface(), MessageSpy.StatisticsGroup
308 .FROM_SWITCH_PUBLISHED_SUCCESS);
310 if (initialized.get()) {
312 writePortStatusMessage(portStatus);
314 } catch (final Exception e) {
315 LOG.warn("Error processing port status message for port {} on device {}",
316 portStatus.getPortNo(), getDeviceInfo(), e);
318 } else if (!hasState.get()) {
319 primaryConnectionContext.handlePortStatusMessage(portStatus);
323 private void writePortStatusMessage(final PortStatus portStatusMessage) {
324 final FlowCapableNodeConnector flowCapableNodeConnector = portStatusTranslator
325 .translate(portStatusMessage, getDeviceInfo(), null);
327 final KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> iiToNodeConnector = getDeviceInfo()
328 .getNodeInstanceIdentifier()
329 .child(NodeConnector.class, new NodeConnectorKey(InventoryDataServiceUtil
330 .nodeConnectorIdfromDatapathPortNo(
331 deviceInfo.getDatapathId(),
332 portStatusMessage.getPortNo(),
333 OpenflowVersion.get(deviceInfo.getVersion()))));
335 if (PortReason.OFPPRADD.equals(portStatusMessage.getReason())
336 || PortReason.OFPPRMODIFY.equals(portStatusMessage.getReason())) {
337 // because of ADD status node connector has to be created
338 writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector, new NodeConnectorBuilder()
339 .setKey(iiToNodeConnector.getKey())
340 .addAugmentation(FlowCapableNodeConnectorStatisticsData.class, new
341 FlowCapableNodeConnectorStatisticsDataBuilder().build())
342 .addAugmentation(FlowCapableNodeConnector.class, flowCapableNodeConnector)
344 } else if (PortReason.OFPPRDELETE.equals(portStatusMessage.getReason())) {
345 addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector);
350 public void processPacketInMessage(final PacketInMessage packetInMessage) {
351 final PacketReceived packetReceived = packetInTranslator.translate(packetInMessage, getDeviceInfo(), null);
352 handlePacketInMessage(packetReceived, packetInMessage.getImplementedInterface(), packetReceived.getMatch());
355 private void handlePacketInMessage(final PacketIn packetIn,
356 final Class<?> implementedInterface,
358 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH);
359 final ConnectionAdapter connectionAdapter = getPrimaryConnectionContext().getConnectionAdapter();
361 if (packetIn == null) {
362 LOG.debug("Received a null packet from switch {}", connectionAdapter.getRemoteAddress());
363 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
367 final OpenflowVersion openflowVersion = OpenflowVersion.get(deviceInfo.getVersion());
369 // Try to get ingress from match
370 final NodeConnectorRef nodeConnectorRef = Objects.nonNull(packetIn.getIngress())
371 ? packetIn.getIngress() : Optional.ofNullable(match)
372 .map(Match::getInPort)
373 .map(nodeConnectorId -> InventoryDataServiceUtil
374 .portNumberfromNodeConnectorId(
377 .map(portNumber -> InventoryDataServiceUtil
378 .nodeConnectorRefFromDatapathIdPortno(
379 deviceInfo.getDatapathId(),
384 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
386 if (!packetInLimiter.acquirePermit()) {
387 LOG.debug("Packet limited");
388 // TODO: save packet into emergency slot if possible
389 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup
390 .FROM_SWITCH_PACKET_IN_LIMIT_REACHED_AND_DROPPED);
394 final ListenableFuture<?> offerNotification = notificationPublishService
395 .offerNotification(new PacketReceivedBuilder(packetIn)
396 .setIngress(nodeConnectorRef)
397 .setMatch(MatchUtil.transformMatch(match,
398 org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.packet.received
402 if (NotificationPublishService.REJECTED.equals(offerNotification)) {
403 LOG.debug("notification offer rejected");
404 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_NOTIFICATION_REJECTED);
405 packetInLimiter.drainLowWaterMark();
406 packetInLimiter.releasePermit();
410 Futures.addCallback(offerNotification, new FutureCallback<Object>() {
412 public void onSuccess(final Object result) {
413 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
414 packetInLimiter.releasePermit();
418 public void onFailure(final Throwable throwable) {
419 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup
420 .FROM_SWITCH_NOTIFICATION_REJECTED);
421 LOG.debug("notification offer failed: {}", throwable.getMessage());
422 LOG.trace("notification offer failed..", throwable);
423 packetInLimiter.releasePermit();
429 public void processExperimenterMessage(final ExperimenterMessage notification) {
431 final ExperimenterDataOfChoice vendorData = notification.getExperimenterDataOfChoice();
432 final MessageTypeKey<? extends ExperimenterDataOfChoice> key = new MessageTypeKey<>(
433 getDeviceInfo().getVersion(),
434 (Class<? extends ExperimenterDataOfChoice>) vendorData.getImplementedInterface());
435 final ConvertorMessageFromOFJava<ExperimenterDataOfChoice, MessagePath> messageConverter =
436 extensionConverterProvider.getMessageConverter(key);
437 if (messageConverter == null) {
438 LOG.warn("custom converter for {}[OF:{}] not found",
439 notification.getExperimenterDataOfChoice().getImplementedInterface(),
440 getDeviceInfo().getVersion());
443 // build notification
444 final ExperimenterMessageOfChoice messageOfChoice;
446 messageOfChoice = messageConverter.convert(vendorData, MessagePath.MESSAGE_NOTIFICATION);
447 final ExperimenterMessageFromDevBuilder experimenterMessageFromDevBld = new
448 ExperimenterMessageFromDevBuilder()
449 .setNode(new NodeRef(getDeviceInfo().getNodeInstanceIdentifier()))
450 .setExperimenterMessageOfChoice(messageOfChoice);
452 notificationPublishService.offerNotification(experimenterMessageFromDevBld.build());
453 } catch (final ConversionException e) {
454 LOG.error("Conversion of experimenter notification failed", e);
459 public boolean processAlienMessage(final OfHeader message) {
460 final Class<? extends DataContainer> implementedInterface = message.getImplementedInterface();
462 if (Objects.nonNull(implementedInterface) && org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service
463 .rev130709.PacketInMessage.class.equals(implementedInterface)) {
464 final org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709
465 .PacketInMessage packetInMessage = org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service
466 .rev130709.PacketInMessage.class.cast(message);
468 handlePacketInMessage(packetInMessage, implementedInterface, packetInMessage.getMatch());
476 public TranslatorLibrary oook() {
477 return translatorLibrary;
481 public void setNotificationPublishService(final NotificationPublishService notificationPublishService) {
482 this.notificationPublishService = notificationPublishService;
486 public MessageSpy getMessageSpy() {
491 public void onPublished() {
492 primaryConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
496 public <T extends OfHeader> MultiMsgCollector<T> getMultiMsgCollector(final RequestContext<List<T>>
498 return new MultiMsgCollectorImpl<>(this, requestContext);
502 public void updatePacketInRateLimit(final long upperBound) {
503 packetInLimiter.changeWaterMarks((int) (LOW_WATERMARK_FACTOR * upperBound),
504 (int) (HIGH_WATERMARK_FACTOR * upperBound));
508 public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
509 this.extensionConverterProvider = extensionConverterProvider;
513 public ExtensionConverterProvider getExtensionConverterProvider() {
514 return extensionConverterProvider;
518 TransactionChainManager getTransactionChainManager() {
519 return this.transactionChainManager;
523 public ListenableFuture<Void> closeServiceInstance() {
524 final ListenableFuture<Void> listenableFuture = initialized.get()
525 ? transactionChainManager.deactivateTransactionManager()
526 : Futures.immediateFuture(null);
528 hashedWheelTimer.newTimeout((timerTask) -> {
529 if (!listenableFuture.isDone() && !listenableFuture.isCancelled()) {
530 listenableFuture.cancel(true);
532 }, TX_CHAIN_CLOSE_TIMEOUT, TimeUnit.MILLISECONDS);
534 return listenableFuture;
538 public DeviceInfo getDeviceInfo() {
539 return this.deviceInfo;
543 public void registerMastershipWatcher(@Nonnull final ContextChainMastershipWatcher contextChainMastershipWatcher) {
544 this.contextChainMastershipWatcher = contextChainMastershipWatcher;
549 public ServiceGroupIdentifier getIdentifier() {
550 return deviceInfo.getServiceIdentifier();
554 public void close() {
555 // Close all datastore registries and transactions
556 if (initialized.getAndSet(false)) {
557 deviceGroupRegistry.close();
558 deviceFlowRegistry.close();
559 deviceMeterRegistry.close();
561 final ListenableFuture<Void> txChainShuttingDown = transactionChainManager.shuttingDown();
563 Futures.addCallback(txChainShuttingDown, new FutureCallback<Void>() {
565 public void onSuccess(@Nullable final Void result) {
566 transactionChainManager.close();
567 transactionChainManager = null;
571 public void onFailure(final Throwable throwable) {
572 transactionChainManager.close();
573 transactionChainManager = null;
578 requestContexts.forEach(requestContext -> RequestContextUtil
579 .closeRequestContextWithRpcError(requestContext, "Connection closed."));
580 requestContexts.clear();
584 public boolean canUseSingleLayerSerialization() {
585 return useSingleLayerSerialization && getDeviceInfo().getVersion() >= OFConstants.OFP_VERSION_1_3;
588 // TODO: exception handling should be fixed by using custom checked exception, never RuntimeExceptions
590 @SuppressWarnings({"checkstyle:IllegalCatch", "checkstyle:AvoidHidingCauseExceptionCheck"})
591 public void instantiateServiceInstance() {
592 lazyTransactionManagerInitialization();
595 final List<PortStatusMessage> portStatusMessages = primaryConnectionContext
596 .retrieveAndClearPortStatusMessages();
598 portStatusMessages.forEach(this::writePortStatusMessage);
600 } catch (final Exception ex) {
601 throw new RuntimeException(String.format("Error processing port status messages from device %s: %s",
602 deviceInfo.toString(),
606 final Optional<AbstractDeviceInitializer> initializer = deviceInitializerProvider
607 .lookup(deviceInfo.getVersion());
609 if (initializer.isPresent()) {
610 final Future<Void> initialize = initializer
612 .initialize(this, switchFeaturesMandatory, skipTableFeatures, writerProvider, convertorExecutor);
615 initialize.get(DEVICE_INIT_TIMEOUT, TimeUnit.MILLISECONDS);
616 } catch (TimeoutException ex) {
617 initialize.cancel(true);
618 throw new RuntimeException(String.format("Failed to initialize device %s in %ss: %s",
619 deviceInfo.toString(),
620 String.valueOf(DEVICE_INIT_TIMEOUT / 1000),
622 } catch (ExecutionException | InterruptedException ex) {
623 throw new RuntimeException(String.format("Device %s cannot be initialized: %s",
624 deviceInfo.toString(),
628 throw new RuntimeException(String.format("Unsupported version %s for device %s",
629 deviceInfo.getVersion(),
630 deviceInfo.toString()));
633 final ListenableFuture<List<com.google.common.base.Optional<FlowCapableNode>>> deviceFlowRegistryFill =
634 getDeviceFlowRegistry().fill();
635 Futures.addCallback(deviceFlowRegistryFill,
636 new DeviceFlowRegistryCallback(deviceFlowRegistryFill, contextChainMastershipWatcher));
640 void lazyTransactionManagerInitialization() {
641 if (!this.initialized.get()) {
642 if (LOG.isDebugEnabled()) {
643 LOG.debug("Transaction chain manager for node {} created", deviceInfo);
645 this.transactionChainManager = new TransactionChainManager(dataBroker, deviceInfo.getNodeId().getValue());
646 this.deviceFlowRegistry = new DeviceFlowRegistryImpl(deviceInfo.getVersion(), dataBroker,
647 deviceInfo.getNodeInstanceIdentifier());
648 this.deviceGroupRegistry = new DeviceGroupRegistryImpl();
649 this.deviceMeterRegistry = new DeviceMeterRegistryImpl();
652 transactionChainManager.activateTransactionManager();
653 initialized.set(true);
658 public <T> RequestContext<T> createRequestContext() {
659 final Long xid = deviceInfo.reserveXidForDeviceMessage();
661 final AbstractRequestContext<T> abstractRequestContext = new AbstractRequestContext<T>(xid) {
663 public void close() {
664 requestContexts.remove(this);
668 requestContexts.add(abstractRequestContext);
669 return abstractRequestContext;
673 public void onStateAcquired(final ContextChainState state) {
677 private class DeviceFlowRegistryCallback implements FutureCallback<List<com.google.common.base
678 .Optional<FlowCapableNode>>> {
679 private final ListenableFuture<List<com.google.common.base.Optional<FlowCapableNode>>> deviceFlowRegistryFill;
680 private final ContextChainMastershipWatcher contextChainMastershipWatcher;
682 DeviceFlowRegistryCallback(
683 ListenableFuture<List<com.google.common.base.Optional<FlowCapableNode>>> deviceFlowRegistryFill,
684 ContextChainMastershipWatcher contextChainMastershipWatcher) {
685 this.deviceFlowRegistryFill = deviceFlowRegistryFill;
686 this.contextChainMastershipWatcher = contextChainMastershipWatcher;
690 public void onSuccess(@Nullable List<com.google.common.base.Optional<FlowCapableNode>> result) {
691 if (LOG.isDebugEnabled()) {
692 // Count all flows we read from datastore for debugging purposes.
693 // This number do not always represent how many flows were actually added
694 // to DeviceFlowRegistry, because of possible duplicates.
695 long flowCount = Optional.ofNullable(result)
696 .map(Collections::singleton)
697 .orElse(Collections.emptySet())
699 .flatMap(Collection::stream)
700 .filter(Objects::nonNull)
701 .flatMap(flowCapableNodeOptional -> flowCapableNodeOptional.asSet().stream())
702 .filter(Objects::nonNull)
703 .filter(flowCapableNode -> Objects.nonNull(flowCapableNode.getTable()))
704 .flatMap(flowCapableNode -> flowCapableNode.getTable().stream())
705 .filter(Objects::nonNull)
706 .filter(table -> Objects.nonNull(table.getFlow()))
707 .flatMap(table -> table.getFlow().stream())
708 .filter(Objects::nonNull)
711 LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, deviceInfo);
713 this.contextChainMastershipWatcher.onMasterRoleAcquired(deviceInfo, ContextChainMastershipState
714 .INITIAL_FLOW_REGISTRY_FILL);
718 public void onFailure(Throwable throwable) {
719 if (deviceFlowRegistryFill.isCancelled()) {
720 if (LOG.isDebugEnabled()) {
721 LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceInfo);
724 LOG.warn("Failed filling flow registry with flows for node: {} with exception: {}", deviceInfo,
727 contextChainMastershipWatcher.onNotAbleToStartMastership(
729 "Was not able to fill flow registry on device",