* @author mirehak
*/
public class ConnectionConductorImpl implements OpenflowProtocolListener,
- SystemNotificationsListener, ConnectionConductor, ConnectionReadyListener, HandshakeListener {
+ SystemNotificationsListener, ConnectionConductor, ConnectionReadyListener, HandshakeListener, NotificationEnqueuer {
/** ingress queue limit */
private static final int INGRESS_QUEUE_MAX_SIZE = 200;
private void enqueueMessage(OfHeader message) {
enqueueMessage(message, QueueType.DEFAULT);
}
+
+ @Override
+ public void enqueueNotification(NotificationQueueWrapper notification) {
+ enqueueMessage(notification);
+ }
/**
* @param message
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import org.opendaylight.openflowjava.protocol.api.connection.ConnectionConfiguration;
import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider;
import org.opendaylight.openflowplugin.api.OFConstants;
import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
import org.opendaylight.openflowplugin.openflow.md.core.translator.MultiPartReplyPortToNodeConnectorUpdatedTranslator;
import org.opendaylight.openflowplugin.openflow.md.core.translator.MultipartReplyTableFeaturesToTableUpdatedTranslator;
import org.opendaylight.openflowplugin.openflow.md.core.translator.MultipartReplyTranslator;
+import org.opendaylight.openflowplugin.openflow.md.core.translator.NotificationPlainTranslator;
import org.opendaylight.openflowplugin.openflow.md.core.translator.PacketInTranslator;
import org.opendaylight.openflowplugin.openflow.md.core.translator.PacketInV10Translator;
import org.opendaylight.openflowplugin.openflow.md.core.translator.PortStatusMessageToNodeConnectorUpdatedTranslator;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GroupFeaturesUpdated;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GroupStatisticsUpdated;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorUpdated;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRemoved;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeUpdated;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.MeterConfigStatsUpdated;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.MeterFeaturesUpdated;
addMessageTranslator(MultipartReplyMessage.class,OF13, new MultipartReplyTranslator());
addMessageTranslator(MultipartReplyMessage.class,OF13,new MultipartReplyTableFeaturesToTableUpdatedTranslator());
addMessageTranslator(GetFeaturesOutput.class,OF10, new FeaturesV10ToNodeConnectorUpdatedTranslator());
+ addMessageTranslator(NotificationQueueWrapper.class, OF10, new NotificationPlainTranslator());
+ addMessageTranslator(NotificationQueueWrapper.class, OF13, new NotificationPlainTranslator());
NotificationPopListener<DataObject> notificationPopListener = new NotificationPopListener<DataObject>();
notificationPopListener.setNotificationProviderService(
addMessagePopListener(PacketReceived.class,notificationPopListener);
addMessagePopListener(TransmitPacketInput.class, notificationPopListener);
addMessagePopListener(NodeUpdated.class, notificationPopListener);
+ addMessagePopListener(NodeRemoved.class, notificationPopListener);
addMessagePopListener(SwitchFlowRemoved.class, notificationPopListener);
addMessagePopListener(TableUpdated.class, notificationPopListener);
--- /dev/null
+/**
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.core;
+
+/**
+ * provider of wrapped notification enqueue
+ */
+public interface NotificationEnqueuer {
+
+ /**
+ * enqueue given notification into standard message processing queue
+ *
+ * @param notification
+ */
+ void enqueueNotification(NotificationQueueWrapper notification);
+
+}
--- /dev/null
+/**
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.core;
+
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.opendaylight.yangtools.yang.binding.DataContainer;
+import org.opendaylight.yangtools.yang.binding.Notification;
+
+import com.google.common.base.Preconditions;
+
+/**
+ *
+ */
+public class NotificationQueueWrapper implements OfHeader {
+
+ private final Notification notification;
+ private final Short version;
+ private Long xid = -1L;
+
+
+ /**
+ * @param notification
+ * @param version
+ */
+ public NotificationQueueWrapper(final Notification notification, final Short version) {
+ Preconditions.checkArgument(notification != null, "wrapped notification must not be null");
+ Preconditions.checkArgument(version != null, "message version of wrapped notification must not be null");
+ this.notification = notification;
+ this.version = version;
+ }
+
+ @Override
+ public Class<? extends DataContainer> getImplementedInterface() {
+ return NotificationQueueWrapper.class;
+ }
+
+ @Override
+ public Short getVersion() {
+ return version;
+ }
+
+ @Override
+ public Long getXid() {
+ return xid;
+ }
+
+ /**
+ * @return the notification
+ */
+ public Notification getNotification() {
+ return notification;
+ }
+
+ /**
+ * @param xid the xid to set
+ */
+ public void setXid(Long xid) {
+ this.xid = xid;
+ }
+}
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.openflowplugin.openflow.md.ModelDrivenSwitch;
+import org.opendaylight.openflowplugin.openflow.md.core.NotificationQueueWrapper;
import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
import org.opendaylight.openflowplugin.openflow.md.core.session.SessionListener;
LOG.debug("ModelDrivenSwitch for {} registered to MD-SAL.", datapathId.toString());
- publishService.publish(nodeAdded(ofSwitch, features, nodeRef));
+ NotificationQueueWrapper wrappedNotification = new NotificationQueueWrapper(
+ nodeAdded(ofSwitch, features, nodeRef),
+ context.getFeatures().getVersion());
+ context.getNotificationEnqueuer().enqueueNotification(wrappedNotification);
}
@Override
}
LOG.debug("ModelDrivenSwitch for {} unregistered from MD-SAL.", datapathId.toString());
- publishService.publish(nodeRemoved);
+
+ NotificationQueueWrapper wrappedNotification = new NotificationQueueWrapper(
+ nodeRemoved, context.getFeatures().getVersion());
+ context.getNotificationEnqueuer().enqueueNotification(wrappedNotification);
}
private NodeUpdated nodeAdded(ModelDrivenSwitch sw, GetFeaturesOutput features, NodeRef nodeRef) {
import java.util.List;
import java.util.Map;
-import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
+import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductorImpl;
import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator;
import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
* @param features
* @param version
*/
- public static void registerSession(ConnectionConductor connectionConductor,
+ public static void registerSession(ConnectionConductorImpl connectionConductor,
GetFeaturesOutput features, short version) {
SwitchSessionKeyOF sessionKey = createSwitchSessionKey(features
.getDatapathId());
// register new session context (based primary conductor)
SessionContextOFImpl context = new SessionContextOFImpl();
context.setPrimaryConductor(connectionConductor);
+ context.setNotificationEnqueuer(connectionConductor);
context.setFeatures(features);
context.setSessionKey(sessionKey);
context.setSeed((int) System.currentTimeMillis());
import org.opendaylight.openflowplugin.openflow.md.ModelDrivenSwitch;
import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
+import org.opendaylight.openflowplugin.openflow.md.core.NotificationEnqueuer;
+import org.opendaylight.openflowplugin.openflow.md.core.NotificationQueueWrapper;
import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
* @return seed value for random operations
*/
int getSeed();
+
+ /**
+ * @return (wrapped) notification enqueue service - {@link NotificationQueueWrapper}
+ */
+ NotificationEnqueuer getNotificationEnqueuer();
}
import org.opendaylight.openflowplugin.openflow.md.ModelDrivenSwitch;
import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
+import org.opendaylight.openflowplugin.openflow.md.core.NotificationEnqueuer;
import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
private GetFeaturesOutput features;
private ConnectionConductor primaryConductor;
+ private NotificationEnqueuer notificationEnqueuer;
private ConcurrentHashMap<SwitchConnectionDistinguisher, ConnectionConductor> auxiliaryConductors;
private boolean valid;
private SwitchSessionKeyOF sessionKey;
public int getSeed() {
return seed;
}
+
+ /**
+ * @param notificationEnqueuer the notificationEnqueuer to set
+ */
+ public void setNotificationEnqueuer(
+ NotificationEnqueuer notificationEnqueuer) {
+ this.notificationEnqueuer = notificationEnqueuer;
+ }
+
+ @Override
+ public NotificationEnqueuer getNotificationEnqueuer() {
+ return notificationEnqueuer;
+ }
}
--- /dev/null
+/**
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.core.translator;
+
+import java.math.BigInteger;
+import java.util.Collections;
+import java.util.List;
+
+import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
+import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator;
+import org.opendaylight.openflowplugin.openflow.md.core.NotificationQueueWrapper;
+import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+/**
+ *
+ */
+public class NotificationPlainTranslator implements IMDMessageTranslator<OfHeader, List<DataObject>> {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(NotificationPlainTranslator.class);
+
+ @Override
+ public List<DataObject> translate(SwitchConnectionDistinguisher cookie,
+ SessionContext sc, OfHeader msg) {
+ List<DataObject> results = null;
+
+ if(msg instanceof NotificationQueueWrapper) {
+ NotificationQueueWrapper wrappedNotification = (NotificationQueueWrapper) msg;
+ BigInteger datapathId = sc.getFeatures().getDatapathId();
+ Short version = wrappedNotification.getVersion();
+ LOG.debug("NotificationQueueWrapper: version {} dataPathId {} notification {}", version, datapathId, wrappedNotification.getImplementedInterface());
+ results = Lists.newArrayList((DataObject) wrappedNotification.getNotification());
+ } else {
+ // TODO - Do something smarter than returning null if translation fails... what Exception should we throw here?
+ results = Collections.emptyList();
+ }
+ return results;
+ }
+
+}
import static org.junit.Assert.assertNotNull;
import com.google.common.util.concurrent.Futures;
+
import java.math.BigInteger;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ExecutorService;
+
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.opendaylight.openflowplugin.openflow.md.ModelDrivenSwitch;
import org.opendaylight.openflowplugin.api.OFConstants;
import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
+import org.opendaylight.openflowplugin.openflow.md.core.NotificationEnqueuer;
import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
import org.opendaylight.openflowplugin.openflow.md.core.session.IMessageDispatchService;
import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
private GetFeaturesOutput features;
@Mock
private BindingAwareBroker.ProviderContext providerContext;
+ @Mock
+ private NotificationEnqueuer notificationEnqueuer;
private ModelDrivenSwitch mdSwitchOF13;
@Before
public void setUp() {
-
Mockito.when(context.getPrimaryConductor()).thenReturn(conductor);
Mockito.when(context.getMessageDispatchService()).thenReturn(messageDispatchService);
Mockito.when(conductor.getVersion()).thenReturn(OFConstants.OFP_VERSION_1_0)
registration = new CompositeObjectRegistration<>(mdSwitchOF13, Collections.EMPTY_LIST);
Mockito.when(context.getProviderRegistration()).thenReturn(registration);
+ Mockito.when(context.getNotificationEnqueuer()).thenReturn(notificationEnqueuer);
Mockito.when(features.getDatapathId()).thenReturn(BigInteger.valueOf(1));
Mockito.when(features.getVersion()).thenReturn((short) 1);
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.openflowplugin.api.OFConstants;
import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
import org.opendaylight.openflowplugin.openflow.md.core.ErrorHandler;
+import org.opendaylight.openflowplugin.openflow.md.core.NotificationEnqueuer;
+import org.opendaylight.openflowplugin.openflow.md.core.NotificationQueueWrapper;
import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
import org.opendaylight.openflowplugin.openflow.md.queue.QueueProcessor;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
public void setSeed(int seed) {
this.seed = seed;
}
+
+ @Override
+ public NotificationEnqueuer getNotificationEnqueuer() {
+ return conductor;
+ }
}
-class MockConnectionConductor implements ConnectionConductor {
+class MockConnectionConductor implements ConnectionConductor, NotificationEnqueuer {
private int conductorNum;
private MockConnectionAdapter adapter;
public void setId(int conductorId) {
// NOOP
}
+
+ @Override
+ public void enqueueNotification(NotificationQueueWrapper notification) {
+ // NOOP
+ }
}
enum MessageType {