2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.impl.device;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import io.netty.util.HashedWheelTimer;
16 import java.util.Collection;
17 import java.util.Collections;
18 import java.util.HashSet;
19 import java.util.List;
20 import java.util.Objects;
21 import java.util.Optional;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.Future;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.TimeoutException;
26 import java.util.concurrent.atomic.AtomicBoolean;
27 import javax.annotation.Nonnull;
28 import javax.annotation.Nullable;
29 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
30 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
31 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
32 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
33 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
34 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
35 import org.opendaylight.openflowjava.protocol.api.keys.MessageTypeKey;
36 import org.opendaylight.openflowplugin.api.OFConstants;
37 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
38 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
39 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
40 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
41 import org.opendaylight.openflowplugin.api.openflow.device.MessageTranslator;
42 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
43 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
44 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
45 import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector;
46 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
47 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher;
48 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainState;
49 import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
50 import org.opendaylight.openflowplugin.api.openflow.md.util.OpenflowVersion;
51 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
52 import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
53 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
54 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
55 import org.opendaylight.openflowplugin.common.txchain.TransactionChainManager;
56 import org.opendaylight.openflowplugin.extension.api.ConvertorMessageFromOFJava;
57 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
58 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
59 import org.opendaylight.openflowplugin.extension.api.exception.ConversionException;
60 import org.opendaylight.openflowplugin.extension.api.path.MessagePath;
61 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
62 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProviderFactory;
63 import org.opendaylight.openflowplugin.impl.device.initialization.AbstractDeviceInitializer;
64 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
65 import org.opendaylight.openflowplugin.impl.device.listener.MultiMsgCollectorImpl;
66 import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl;
67 import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl;
68 import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl;
69 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
70 import org.opendaylight.openflowplugin.impl.services.util.RequestContextUtil;
71 import org.opendaylight.openflowplugin.impl.util.MatchUtil;
72 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
73 import org.opendaylight.openflowplugin.openflow.md.util.InventoryDataServiceUtil;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.experimenter.message.service.rev151020.ExperimenterMessageFromDevBuilder;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.model.match.types.rev131026.Match;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortReason;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemoved;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatus;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.experimenter.core.ExperimenterDataOfChoice;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.experimenter.types.rev151020.experimenter.core.message.ExperimenterMessageOfChoice;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketIn;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
96 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceivedBuilder;
97 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
98 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
99 import org.opendaylight.yangtools.yang.binding.DataContainer;
100 import org.opendaylight.yangtools.yang.binding.DataObject;
101 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
102 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
103 import org.slf4j.Logger;
104 import org.slf4j.LoggerFactory;
106 public class DeviceContextImpl implements DeviceContext, ExtensionConverterProviderKeeper {
// Logger shared by every instance of this class.
108 private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class);
// Passed to the PacketInRateLimiter constructor as its last argument.
110 // TODO: drain factor should be parametrized
111 private static final float REJECTED_DRAIN_FACTOR = 0.25f;
// Multiplied by the requested upper bound in updatePacketInRateLimit() to get the new low water mark.
112 // TODO: low water mark factor should be parametrized
113 private static final float LOW_WATERMARK_FACTOR = 0.75f;
// Multiplied by the requested upper bound in updatePacketInRateLimit() to get the new high water mark.
114 // TODO: high water mark factor should be parametrized
115 private static final float HIGH_WATERMARK_FACTOR = 0.95f;
117 // Timeout in milliseconds after which we give up on initializing the device
118 private static final int DEVICE_INIT_TIMEOUT = 9000;
120 // Timeout in milliseconds after which we give up on closing the transaction chain
121 private static final int TX_CHAIN_CLOSE_TIMEOUT = 10000;
// Initial low/high water marks handed to the PacketInRateLimiter in the constructor.
123 private static final int LOW_WATERMARK = 1000;
124 private static final int HIGH_WATERMARK = 2000;
// --- Collaborators and configuration fixed at construction time ---
126 private final MultipartWriterProvider writerProvider;
// Timer used by closeServiceInstance() to cancel a transaction-manager deactivation that takes too long.
127 private final HashedWheelTimer hashedWheelTimer;
128 private final DeviceState deviceState;
129 private final DataBroker dataBroker;
// Request contexts created by createRequestContext(); each one removes itself from this set when closed.
130 private final Collection<RequestContext<?>> requestContexts = new HashSet<>();
131 private final MessageSpy messageSpy;
// Translators looked up once in the constructor for this device's OpenFlow version.
132 private final MessageTranslator<PortGrouping, FlowCapableNodeConnector> portStatusTranslator;
133 private final MessageTranslator<PacketInMessage, PacketReceived> packetInTranslator;
134 private final MessageTranslator<FlowRemoved, org.opendaylight.yang.gen.v1.urn.opendaylight
135 .flow.service.rev130819.FlowRemoved> flowRemovedTranslator;
136 private final TranslatorLibrary translatorLibrary;
137 private final ConvertorExecutor convertorExecutor;
138 private final DeviceInitializerProvider deviceInitializerProvider;
// Limits how many packet-in notifications may be in flight at once (see handlePacketInMessage).
139 private final PacketInRateLimiter packetInLimiter;
140 private final DeviceInfo deviceInfo;
141 private final ConnectionContext primaryConnectionContext;
142 private final boolean skipTableFeatures;
143 private final boolean switchFeaturesMandatory;
144 private final boolean isFlowRemovedNotificationOn;
145 private final boolean useSingleLayerSerialization;
// Set by lazyTransactionManagerInitialization(), cleared by close(); guards every use of transactionChainManager.
146 private final AtomicBoolean initialized = new AtomicBoolean(false);
// Read by processPortStatusMessage() to decide whether early port-status messages go to the connection context.
147 private final AtomicBoolean hasState = new AtomicBoolean(false);
// Holds the result of the last initialSubmitTransaction() call; exposed through isTransactionsEnabled().
148 private final AtomicBoolean isInitialTransactionSubmitted = new AtomicBoolean(false);
// --- State assigned after construction (setters or lazyTransactionManagerInitialization()) ---
149 private NotificationPublishService notificationPublishService;
150 private TransactionChainManager transactionChainManager;
151 private DeviceFlowRegistry deviceFlowRegistry;
152 private DeviceGroupRegistry deviceGroupRegistry;
153 private DeviceMeterRegistry deviceMeterRegistry;
154 private ExtensionConverterProvider extensionConverterProvider;
155 private ContextChainMastershipWatcher contextChainMastershipWatcher;
/**
 * Builds a device context around the primary connection.
 *
 * <p>Stores the supplied collaborators and flags, takes the {@link DeviceInfo} from the connection context,
 * creates a fresh {@code DeviceStateImpl} and a {@code PacketInRateLimiter}, looks up the port-status,
 * packet-in and flow-removed translators for the device's version, and creates the default multipart
 * writer provider. The transaction chain manager and the registries are NOT created here; see
 * {@code lazyTransactionManagerInitialization()}.
 */
157 DeviceContextImpl(@Nonnull final ConnectionContext primaryConnectionContext,
158 @Nonnull final DataBroker dataBroker,
159 @Nonnull final MessageSpy messageSpy,
160 @Nonnull final TranslatorLibrary translatorLibrary,
161 final ConvertorExecutor convertorExecutor,
162 final boolean skipTableFeatures,
163 final HashedWheelTimer hashedWheelTimer,
164 final boolean useSingleLayerSerialization,
165 final DeviceInitializerProvider deviceInitializerProvider,
166 final boolean isFlowRemovedNotificationOn,
167 final boolean switchFeaturesMandatory) {
169 this.primaryConnectionContext = primaryConnectionContext;
170 this.deviceInfo = primaryConnectionContext.getDeviceInfo();
171 this.hashedWheelTimer = hashedWheelTimer;
172 this.deviceInitializerProvider = deviceInitializerProvider;
173 this.isFlowRemovedNotificationOn = isFlowRemovedNotificationOn;
174 this.switchFeaturesMandatory = switchFeaturesMandatory;
175 this.deviceState = new DeviceStateImpl();
176 this.dataBroker = dataBroker;
177 this.messageSpy = messageSpy;
// The limiter starts with the static LOW_WATERMARK/HIGH_WATERMARK; updatePacketInRateLimit() may change them later.
179 this.packetInLimiter = new PacketInRateLimiter(primaryConnectionContext.getConnectionAdapter(),
180 /*initial*/ LOW_WATERMARK, /*initial*/HIGH_WATERMARK, this.messageSpy, REJECTED_DRAIN_FACTOR);
// Translators are keyed by (OpenFlow version, source class name).
182 this.translatorLibrary = translatorLibrary;
183 this.portStatusTranslator = translatorLibrary.lookupTranslator(
184 new TranslatorKey(deviceInfo.getVersion(), PortGrouping.class.getName()));
185 this.packetInTranslator = translatorLibrary.lookupTranslator(
// NOTE(review): a fragment of the fully-qualified PacketIn class name appears to be missing between the
// next two lines in this view (embedded numbering jumps 186 -> 188) — confirm against the original file.
186 new TranslatorKey(deviceInfo.getVersion(), org.opendaylight.yang.gen.v1.urn.opendaylight.openflow
188 .PacketIn.class.getName()));
189 this.flowRemovedTranslator = translatorLibrary.lookupTranslator(
190 new TranslatorKey(deviceInfo.getVersion(), FlowRemoved.class.getName()));
192 this.convertorExecutor = convertorExecutor;
193 this.skipTableFeatures = skipTableFeatures;
194 this.useSingleLayerSerialization = useSingleLayerSerialization;
// Passes `this` to the factory from inside the constructor, i.e. before construction has completed.
195 writerProvider = MultipartWriterProviderFactory.createDefaultProvider(this);
/**
 * Asks the transaction chain manager to perform its initial write-transaction submit and records the
 * outcome in {@code isInitialTransactionSubmitted}.
 *
 * @return the value returned by {@code transactionChainManager.initialSubmitWriteTransaction()}
 */
199 public boolean initialSubmitTransaction() {
200 if (!initialized.get()) {
// NOTE(review): the body of this guard is not visible in this view (embedded numbering jumps 200 -> 204);
// presumably it returns early so transactionChainManager is never touched before initialization — confirm.
204 final boolean initialSubmit = transactionChainManager.initialSubmitWriteTransaction();
205 isInitialTransactionSubmitted.set(initialSubmit);
206 return initialSubmit;
// Accessor for the device state owned by this context.
// NOTE(review): the method body is not visible in this view; presumably it returns the deviceState field — confirm.
210 public DeviceState getDeviceState() {
215 public ReadOnlyTransaction getReadTransaction() {
216 return dataBroker.newReadOnlyTransaction();
220 public boolean isTransactionsEnabled() {
221 return isInitialTransactionSubmitted.get();
/**
 * Queues a write of {@code data} at {@code path} in the given datastore on the transaction chain.
 * Nothing happens when the context is not initialized. The trailing {@code false} is the only
 * difference from {@code writeToTransactionWithParentsSlow}, which passes {@code true}.
 */
225 public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
226 final InstanceIdentifier<T> path,
// NOTE(review): the declaration of the `data` parameter is not visible in this view
// (embedded numbering jumps 226 -> 228) — confirm against the original file.
228 if (initialized.get()) {
229 transactionChainManager.writeToTransaction(store, path, data, false);
/**
 * Same as {@code writeToTransaction}, but passes {@code true} as the last argument to the transaction
 * chain manager (judging by the method name, this requests creation of missing parent nodes — confirm
 * against TransactionChainManager). Nothing happens when the context is not initialized.
 */
234 public <T extends DataObject> void writeToTransactionWithParentsSlow(final LogicalDatastoreType store,
235 final InstanceIdentifier<T> path,
// NOTE(review): the declaration of the `data` parameter is not visible in this view
// (embedded numbering jumps 235 -> 237) — confirm against the original file.
237 if (initialized.get()) {
238 transactionChainManager.writeToTransaction(store, path, data, true);
243 public <T extends DataObject> void addDeleteToTxChain(final LogicalDatastoreType store,
244 final InstanceIdentifier<T> path) {
245 if (initialized.get()) {
246 transactionChainManager.addDeleteOperationToTxChain(store, path);
251 public boolean submitTransaction() {
252 return initialized.get() && transactionChainManager.submitTransaction();
256 public ConnectionContext getPrimaryConnectionContext() {
257 return primaryConnectionContext;
261 public DeviceFlowRegistry getDeviceFlowRegistry() {
262 return deviceFlowRegistry;
266 public DeviceGroupRegistry getDeviceGroupRegistry() {
267 return deviceGroupRegistry;
271 public DeviceMeterRegistry getDeviceMeterRegistry() {
272 return deviceMeterRegistry;
276 public void processReply(final OfHeader ofHeader) {
277 messageSpy.spyMessage(
278 ofHeader.getImplementedInterface(),
279 ofHeader instanceof Error
280 ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
281 : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
285 public void processReply(final Xid xid, final List<? extends OfHeader> ofHeaderList) {
286 ofHeaderList.forEach(header -> messageSpy.spyMessage(
287 header.getImplementedInterface(),
288 header instanceof Error
289 ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
290 : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS));
294 public void processFlowRemovedMessage(final FlowRemoved flowRemoved) {
295 //1. translate to general flow (table, priority, match, cookie)
296 final org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved flowRemovedNotification =
297 flowRemovedTranslator.translate(flowRemoved, deviceInfo, null);
299 if (isFlowRemovedNotificationOn) {
300 // Trigger off a notification
301 notificationPublishService.offerNotification(flowRemovedNotification);
/**
 * Handles a port-status message from the switch.
 *
 * <p>The message is always counted as FROM_SWITCH_PUBLISHED_SUCCESS first. When the context is
 * initialized it is written to the datastore through {@code writePortStatusMessage}; any exception is
 * logged as a warning and not rethrown. When the context is not initialized and {@code hasState} is
 * still false, the message is handed to the primary connection context instead.
 */
306 @SuppressWarnings("checkstyle:IllegalCatch")
307 public void processPortStatusMessage(final PortStatusMessage portStatus) {
308 messageSpy.spyMessage(portStatus.getImplementedInterface(), MessageSpy.StatisticsGroup
309 .FROM_SWITCH_PUBLISHED_SUCCESS);
311 if (initialized.get()) {
// NOTE(review): the opening of the try block (and possibly a short statement after the write below) is not
// visible in this view — embedded numbering skips 312 and 314. Confirm against the original file.
313 writePortStatusMessage(portStatus);
// Broad catch is deliberate (see the checkstyle suppression): a bad port update must not propagate.
315 } catch (final Exception e) {
316 LOG.warn("Error processing port status message for port {} on device {}",
317 portStatus.getPortNo(), getDeviceInfo(), e);
// Not initialized yet and no state acquired: let the connection context keep the message.
319 } else if (!hasState.get()) {
320 primaryConnectionContext.handlePortStatusMessage(portStatus);
/**
 * Applies a port-status change to the OPERATIONAL datastore.
 *
 * <p>The port is translated to a {@code FlowCapableNodeConnector} and addressed by a node-connector key
 * derived from the datapath id, port number and OpenFlow version. ADD and MODIFY reasons write the
 * node connector; DELETE queues its removal. Any other reason is ignored.
 */
324 private void writePortStatusMessage(final PortStatus portStatusMessage) {
325 final FlowCapableNodeConnector flowCapableNodeConnector = portStatusTranslator
326 .translate(portStatusMessage, getDeviceInfo(), null);
// Instance identifier of the node connector under this device's node.
328 final KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> iiToNodeConnector = getDeviceInfo()
329 .getNodeInstanceIdentifier()
330 .child(NodeConnector.class, new NodeConnectorKey(InventoryDataServiceUtil
331 .nodeConnectorIdfromDatapathPortNo(
332 deviceInfo.getDatapathId(),
333 portStatusMessage.getPortNo(),
334 OpenflowVersion.get(deviceInfo.getVersion()))));
336 if (PortReason.OFPPRADD.equals(portStatusMessage.getReason())
337 || PortReason.OFPPRMODIFY.equals(portStatusMessage.getReason())) {
338 // because of ADD status node connector has to be created
// The written node connector carries an empty statistics augmentation next to the translated port data.
339 writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector, new NodeConnectorBuilder()
340 .setKey(iiToNodeConnector.getKey())
341 .addAugmentation(FlowCapableNodeConnectorStatisticsData.class, new
342 FlowCapableNodeConnectorStatisticsDataBuilder().build())
343 .addAugmentation(FlowCapableNodeConnector.class, flowCapableNodeConnector)
// NOTE(review): the end of the builder chain is not visible in this view (embedded numbering skips 344).
345 } else if (PortReason.OFPPRDELETE.equals(portStatusMessage.getReason())) {
346 addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector);
351 public void processPacketInMessage(final PacketInMessage packetInMessage) {
352 final PacketReceived packetReceived = packetInTranslator.translate(packetInMessage, getDeviceInfo(), null);
353 handlePacketInMessage(packetReceived, packetInMessage.getImplementedInterface(), packetReceived.getMatch());
/**
 * Publishes a packet-in as a {@code PacketReceived} notification, subject to the packet-in rate limiter.
 *
 * <p>Every step is counted in {@code messageSpy} under {@code implementedInterface}. A permit is taken
 * from {@code packetInLimiter} before the notification is offered and released again on each path that
 * is visible here: rejected offer, successful publish, failed publish.
 *
 * NOTE(review): several short lines of this method are not visible in this view (third parameter
 * declaration, the early exits after the null-packet and rate-limit branches, lambda arguments and the
 * tail of the builder chain) — confirm against the original file before changing any logic.
 */
356 private void handlePacketInMessage(final PacketIn packetIn,
357 final Class<?> implementedInterface,
359 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH);
360 final ConnectionAdapter connectionAdapter = getPrimaryConnectionContext().getConnectionAdapter();
// A null packet is only logged and counted as a translate-source failure.
362 if (packetIn == null) {
363 LOG.debug("Received a null packet from switch {}", connectionAdapter.getRemoteAddress());
364 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
368 final OpenflowVersion openflowVersion = OpenflowVersion.get(deviceInfo.getVersion());
// Prefer the ingress already present on the packet; otherwise derive it from the match's in-port.
370 // Try to get ingress from match
371 final NodeConnectorRef nodeConnectorRef = Objects.nonNull(packetIn.getIngress())
372 ? packetIn.getIngress() : Optional.ofNullable(match)
373 .map(Match::getInPort)
374 .map(nodeConnectorId -> InventoryDataServiceUtil
375 .portNumberfromNodeConnectorId(
378 .map(portNumber -> InventoryDataServiceUtil
379 .nodeConnectorRefFromDatapathIdPortno(
380 deviceInfo.getDatapathId(),
385 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
// Rate limiting: without a permit the packet is counted as dropped.
387 if (!packetInLimiter.acquirePermit()) {
388 LOG.debug("Packet limited");
389 // TODO: save packet into emergency slot if possible
390 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup
391 .FROM_SWITCH_PACKET_IN_LIMIT_REACHED_AND_DROPPED);
// Offer a copy of the packet with the resolved ingress and a transformed match.
395 final ListenableFuture<?> offerNotification = notificationPublishService
396 .offerNotification(new PacketReceivedBuilder(packetIn)
397 .setIngress(nodeConnectorRef)
398 .setMatch(MatchUtil.transformMatch(match,
399 org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.packet.received
// Rejected offer: count it, drain the limiter to its low water mark and give the permit back.
403 if (NotificationPublishService.REJECTED.equals(offerNotification)) {
404 LOG.debug("notification offer rejected");
405 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_NOTIFICATION_REJECTED);
406 packetInLimiter.drainLowWaterMark();
407 packetInLimiter.releasePermit();
// Otherwise the permit is released when the offer completes; the callback runs on the completing thread
// because a direct executor is used.
411 Futures.addCallback(offerNotification, new FutureCallback<Object>() {
413 public void onSuccess(final Object result) {
414 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
415 packetInLimiter.releasePermit();
419 public void onFailure(final Throwable throwable) {
420 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup
421 .FROM_SWITCH_NOTIFICATION_REJECTED);
422 LOG.debug("notification offer failed: {}", throwable.getMessage());
423 LOG.trace("notification offer failed..", throwable);
424 packetInLimiter.releasePermit();
426 }, MoreExecutors.directExecutor());
/**
 * Converts an experimenter message from the switch with a registered extension converter and offers the
 * result as an {@code ExperimenterMessageFromDev} notification for this device's node.
 *
 * <p>The converter is looked up by (device version, implemented interface of the vendor data). A missing
 * converter is logged as a warning; a {@link ConversionException} is logged as an error and not rethrown.
 *
 * NOTE(review): the early exit after the "converter not found" warning and the opening of the try block
 * are not visible in this view (embedded numbering skips 442-443 and 446) — confirm against the original.
 */
430 public void processExperimenterMessage(final ExperimenterMessage notification) {
432 final ExperimenterDataOfChoice vendorData = notification.getExperimenterDataOfChoice();
// Unchecked narrowing of the implemented interface to the ExperimenterDataOfChoice hierarchy.
433 final MessageTypeKey<? extends ExperimenterDataOfChoice> key = new MessageTypeKey<>(
434 getDeviceInfo().getVersion(),
435 (Class<? extends ExperimenterDataOfChoice>) vendorData.getImplementedInterface());
// extensionConverterProvider is assigned via setExtensionConverterProvider(); it is not null-checked here.
436 final ConvertorMessageFromOFJava<ExperimenterDataOfChoice, MessagePath> messageConverter =
437 extensionConverterProvider.getMessageConverter(key);
438 if (messageConverter == null) {
439 LOG.warn("custom converter for {}[OF:{}] not found",
440 notification.getExperimenterDataOfChoice().getImplementedInterface(),
441 getDeviceInfo().getVersion());
444 // build notification
445 final ExperimenterMessageOfChoice messageOfChoice;
447 messageOfChoice = messageConverter.convert(vendorData, MessagePath.MESSAGE_NOTIFICATION);
448 final ExperimenterMessageFromDevBuilder experimenterMessageFromDevBld = new
449 ExperimenterMessageFromDevBuilder()
450 .setNode(new NodeRef(getDeviceInfo().getNodeInstanceIdentifier()))
451 .setExperimenterMessageOfChoice(messageOfChoice);
// The future returned by offerNotification is ignored here.
453 notificationPublishService.offerNotification(experimenterMessageFromDevBld.build());
454 } catch (final ConversionException e) {
455 LOG.error("Conversion of experimenter notification failed", e);
/**
 * Handles a message whose type is not one of the dedicated process* entry points.
 *
 * <p>Only messages whose implemented interface is exactly the packet-service {@code PacketInMessage}
 * (package {@code ...packet.service.rev130709}) are processed; they are cast and forwarded to
 * {@code handlePacketInMessage} together with their match.
 *
 * NOTE(review): the return statements of this method are not visible in this view (embedded numbering
 * skips 470-476); presumably it reports whether the message was handled — confirm.
 */
460 public boolean processAlienMessage(final OfHeader message) {
461 final Class<? extends DataContainer> implementedInterface = message.getImplementedInterface();
// The nonNull check is redundant with Class.equals(null) being false, but is kept as written.
463 if (Objects.nonNull(implementedInterface) && org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service
464 .rev130709.PacketInMessage.class.equals(implementedInterface)) {
465 final org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709
466 .PacketInMessage packetInMessage = org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service
467 .rev130709.PacketInMessage.class.cast(message);
469 handlePacketInMessage(packetInMessage, implementedInterface, packetInMessage.getMatch());
477 public TranslatorLibrary oook() {
478 return translatorLibrary;
482 public void setNotificationPublishService(final NotificationPublishService notificationPublishService) {
483 this.notificationPublishService = notificationPublishService;
// Accessor for the message statistics collector used by this context.
// NOTE(review): the method body is not visible in this view; presumably it returns the messageSpy field — confirm.
487 public MessageSpy getMessageSpy() {
492 public void onPublished() {
493 primaryConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
/**
 * Creates a new multi-message collector bound to this device context and the given request context.
 * A fresh {@code MultiMsgCollectorImpl} is created on every call.
 */
// NOTE(review): the parameter name and the end of the signature are not visible in this view
// (embedded numbering skips 498); the body refers to the parameter as `requestContext`.
497 public <T extends OfHeader> MultiMsgCollector<T> getMultiMsgCollector(final RequestContext<List<T>>
499 return new MultiMsgCollectorImpl<>(this, requestContext);
503 public void updatePacketInRateLimit(final long upperBound) {
504 packetInLimiter.changeWaterMarks((int) (LOW_WATERMARK_FACTOR * upperBound),
505 (int) (HIGH_WATERMARK_FACTOR * upperBound));
509 public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
510 this.extensionConverterProvider = extensionConverterProvider;
514 public ExtensionConverterProvider getExtensionConverterProvider() {
515 return extensionConverterProvider;
519 TransactionChainManager getTransactionChainManager() {
520 return this.transactionChainManager;
524 public ListenableFuture<Void> closeServiceInstance() {
525 final ListenableFuture<Void> listenableFuture = initialized.get()
526 ? transactionChainManager.deactivateTransactionManager()
527 : Futures.immediateFuture(null);
529 hashedWheelTimer.newTimeout((timerTask) -> {
530 if (!listenableFuture.isDone() && !listenableFuture.isCancelled()) {
531 listenableFuture.cancel(true);
533 }, TX_CHAIN_CLOSE_TIMEOUT, TimeUnit.MILLISECONDS);
535 return listenableFuture;
539 public DeviceInfo getDeviceInfo() {
540 return this.deviceInfo;
544 public void registerMastershipWatcher(@Nonnull final ContextChainMastershipWatcher newWatcher) {
545 this.contextChainMastershipWatcher = newWatcher;
550 public ServiceGroupIdentifier getIdentifier() {
551 return deviceInfo.getServiceIdentifier();
555 public void close() {
556 // Close all datastore registries and transactions
557 if (initialized.getAndSet(false)) {
558 deviceGroupRegistry.close();
559 deviceFlowRegistry.close();
560 deviceMeterRegistry.close();
562 final ListenableFuture<Void> txChainShuttingDown = transactionChainManager.shuttingDown();
564 Futures.addCallback(txChainShuttingDown, new FutureCallback<Void>() {
566 public void onSuccess(@Nullable final Void result) {
567 transactionChainManager.close();
568 transactionChainManager = null;
572 public void onFailure(final Throwable throwable) {
573 transactionChainManager.close();
574 transactionChainManager = null;
576 }, MoreExecutors.directExecutor());
579 requestContexts.forEach(requestContext -> RequestContextUtil
580 .closeRequestContextWithRpcError(requestContext, "Connection closed."));
581 requestContexts.clear();
585 public boolean canUseSingleLayerSerialization() {
586 return useSingleLayerSerialization && getDeviceInfo().getVersion() >= OFConstants.OFP_VERSION_1_3;
/**
 * Brings the device up once this node owns it.
 *
 * <p>Steps visible here: (1) lazily create and activate the transaction manager and registries,
 * (2) write the port-status messages collected by the primary connection context, (3) run the
 * version-specific device initializer and wait up to {@code DEVICE_INIT_TIMEOUT} ms for it,
 * (4) start filling the flow registry and attach a {@code DeviceFlowRegistryCallback} to the result.
 * Every failure in steps 2-3 is rethrown as a {@link RuntimeException} that keeps the original cause.
 *
 * NOTE(review): several short lines are not visible in this view (the openings of both try blocks, a
 * possible statement after the port-status loop, the unwrapping of the Optional before
 * {@code .initialize(...)}, and the else branch around the "Unsupported version" throw) — confirm
 * against the original file before changing any logic.
 */
589 // TODO: exception handling should be fixed by using custom checked exception, never RuntimeExceptions
591 @SuppressWarnings({"checkstyle:IllegalCatch"})
592 public void instantiateServiceInstance() {
593 lazyTransactionManagerInitialization();
// Replay port-status messages that arrived before this context was initialized.
596 final List<PortStatusMessage> portStatusMessages = primaryConnectionContext
597 .retrieveAndClearPortStatusMessages();
599 portStatusMessages.forEach(this::writePortStatusMessage);
601 } catch (final Exception ex) {
602 throw new RuntimeException(String.format("Error processing port status messages from device %s: %s",
603 deviceInfo.toString(), ex.toString()), ex);
// Pick the initializer matching the device's OpenFlow version.
606 final Optional<AbstractDeviceInitializer> initializer = deviceInitializerProvider
607 .lookup(deviceInfo.getVersion());
609 if (initializer.isPresent()) {
610 final Future<Void> initialize = initializer
612 .initialize(this, switchFeaturesMandatory, skipTableFeatures, writerProvider, convertorExecutor);
// Blocks the calling thread until initialization finishes or the timeout expires.
615 initialize.get(DEVICE_INIT_TIMEOUT, TimeUnit.MILLISECONDS);
616 } catch (TimeoutException ex) {
// On timeout the still-running initialization is cancelled before failing.
617 initialize.cancel(true);
618 throw new RuntimeException(String.format("Failed to initialize device %s in %ss: %s",
619 deviceInfo.toString(), String.valueOf(DEVICE_INIT_TIMEOUT / 1000), ex.toString()), ex);
// NOTE(review): InterruptedException is wrapped here without restoring the thread's interrupt status;
// consider calling Thread.currentThread().interrupt() before rethrowing.
620 } catch (ExecutionException | InterruptedException ex) {
621 throw new RuntimeException(
622 String.format("Device %s cannot be initialized: %s", deviceInfo.toString(), ex.toString()), ex);
625 throw new RuntimeException(String.format("Unsupported version %s for device %s",
626 deviceInfo.getVersion(),
627 deviceInfo.toString()));
// The callback runs on the thread completing the fill (direct executor) and reports to the mastership watcher.
630 final ListenableFuture<List<com.google.common.base.Optional<FlowCapableNode>>> deviceFlowRegistryFill =
631 getDeviceFlowRegistry().fill();
632 Futures.addCallback(deviceFlowRegistryFill,
633 new DeviceFlowRegistryCallback(deviceFlowRegistryFill, contextChainMastershipWatcher),
634 MoreExecutors.directExecutor());
/**
 * Creates the transaction chain manager and the flow, group and meter registries for this device,
 * activates the transaction manager and marks the context as initialized.
 *
 * NOTE(review): the closing braces are not visible in this view (embedded numbering skips 642, 648-649
 * and 652-653), so it cannot be told from here whether the activation and the {@code initialized} flag
 * update sit inside or after the {@code !initialized} guard — confirm against the original file.
 */
638 void lazyTransactionManagerInitialization() {
639 if (!this.initialized.get()) {
640 if (LOG.isDebugEnabled()) {
641 LOG.debug("Transaction chain manager for node {} created", deviceInfo);
// The transaction chain manager is created with the node id's string value.
643 this.transactionChainManager = new TransactionChainManager(dataBroker, deviceInfo.getNodeId().getValue());
644 this.deviceFlowRegistry = new DeviceFlowRegistryImpl(deviceInfo.getVersion(), dataBroker,
645 deviceInfo.getNodeInstanceIdentifier());
646 this.deviceGroupRegistry = new DeviceGroupRegistryImpl();
647 this.deviceMeterRegistry = new DeviceMeterRegistryImpl();
// From here on the initialized-guarded methods (writes, deletes, submits) reach the transaction chain.
650 transactionChainManager.activateTransactionManager();
651 initialized.set(true);
656 public <T> RequestContext<T> createRequestContext() {
657 final Long xid = deviceInfo.reserveXidForDeviceMessage();
659 final AbstractRequestContext<T> abstractRequestContext = new AbstractRequestContext<T>(xid) {
661 public void close() {
662 requestContexts.remove(this);
666 requestContexts.add(abstractRequestContext);
667 return abstractRequestContext;
// Callback invoked with the context chain state that has been acquired.
// NOTE(review): the method body is not visible in this view; presumably it sets the hasState flag that
// processPortStatusMessage() reads — confirm against the original file.
671 public void onStateAcquired(final ContextChainState state) {
675 private class DeviceFlowRegistryCallback implements FutureCallback<List<com.google.common.base
676 .Optional<FlowCapableNode>>> {
677 private final ListenableFuture<List<com.google.common.base.Optional<FlowCapableNode>>> deviceFlowRegistryFill;
678 private final ContextChainMastershipWatcher contextChainMastershipWatcher;
680 DeviceFlowRegistryCallback(
681 ListenableFuture<List<com.google.common.base.Optional<FlowCapableNode>>> deviceFlowRegistryFill,
682 ContextChainMastershipWatcher contextChainMastershipWatcher) {
683 this.deviceFlowRegistryFill = deviceFlowRegistryFill;
684 this.contextChainMastershipWatcher = contextChainMastershipWatcher;
688 public void onSuccess(@Nullable List<com.google.common.base.Optional<FlowCapableNode>> result) {
689 if (LOG.isDebugEnabled()) {
690 // Count all flows we read from datastore for debugging purposes.
691 // This number does not always represent how many flows were actually added
692 // to DeviceFlowRegistry, because of possible duplicates.
693 long flowCount = Optional.ofNullable(result)
694 .map(Collections::singleton)
695 .orElse(Collections.emptySet())
697 .flatMap(Collection::stream)
698 .filter(Objects::nonNull)
699 .flatMap(flowCapableNodeOptional -> flowCapableNodeOptional.asSet().stream())
700 .filter(Objects::nonNull)
701 .filter(flowCapableNode -> Objects.nonNull(flowCapableNode.getTable()))
702 .flatMap(flowCapableNode -> flowCapableNode.getTable().stream())
703 .filter(Objects::nonNull)
704 .filter(table -> Objects.nonNull(table.getFlow()))
705 .flatMap(table -> table.getFlow().stream())
706 .filter(Objects::nonNull)
709 LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, deviceInfo);
711 this.contextChainMastershipWatcher.onMasterRoleAcquired(deviceInfo, ContextChainMastershipState
712 .INITIAL_FLOW_REGISTRY_FILL);
716 public void onFailure(Throwable throwable) {
717 if (deviceFlowRegistryFill.isCancelled()) {
718 if (LOG.isDebugEnabled()) {
719 LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceInfo);
722 LOG.warn("Failed filling flow registry with flows for node: {} with exception: {}", deviceInfo,
725 contextChainMastershipWatcher.onNotAbleToStartMastership(
727 "Was not able to fill flow registry on device",