BUG-2091: notification NodeRemoved and processing queue 91/11591/1
authorMichal Rehak <mirehak@cisco.com>
Thu, 25 Sep 2014 16:32:18 +0000 (18:32 +0200)
committerMichal Rehak <mirehak@cisco.com>
Thu, 25 Sep 2014 17:02:49 +0000 (19:02 +0200)
- NodeRemoved notification is now enqueued in the same way
  as other notifications so that original order will be preserved
  and "homeless" messages eliminated
- NodeUpdated must use the same way of delivery to MD-SAL in order
  to preclude state where during reconnect nodeAdded would arrive
  before nodeRemoved

Change-Id: Iede472870fac9ce12e7e5817c9b21967a2a43173
Signed-off-by: Michal Rehak <mirehak@cisco.com>
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductorImpl.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/MDController.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/NotificationEnqueuer.java [new file with mode: 0644]
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/NotificationQueueWrapper.java [new file with mode: 0644]
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/SalRegistrationManager.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/OFSessionUtil.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/SessionContext.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/SessionContextOFImpl.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/translator/NotificationPlainTranslator.java [new file with mode: 0644]
openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/sal/SalRegistrationManagerTest.java
openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/session/MessageDispatchServiceImplTest.java

index fe9dc4b4f836419d9a429b9d0ab8c27c480b7f63..285d5a80e6c6859cea1b80b1c8eccec3d25a4a8e 100644 (file)
@@ -65,7 +65,7 @@ import com.google.common.util.concurrent.Futures;
  * @author mirehak
  */
 public class ConnectionConductorImpl implements OpenflowProtocolListener,
-        SystemNotificationsListener, ConnectionConductor, ConnectionReadyListener, HandshakeListener {
+        SystemNotificationsListener, ConnectionConductor, ConnectionReadyListener, HandshakeListener, NotificationEnqueuer {
 
     /** ingress queue limit */
     private static final int INGRESS_QUEUE_MAX_SIZE = 200;
@@ -180,6 +180,11 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
     private void enqueueMessage(OfHeader message) {
         enqueueMessage(message, QueueType.DEFAULT);
     }
+    
+    @Override
+    public void enqueueNotification(NotificationQueueWrapper notification) {
+        enqueueMessage(notification);
+    }
 
     /**
      * @param message
index aef940b1763d5f9eab68750c03fb381a1714c5ad..1b1e36df06992aac16053708ccc815b45b1ef2d3 100644 (file)
@@ -25,7 +25,6 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import org.opendaylight.openflowjava.protocol.api.connection.ConnectionConfiguration;
 import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider;
 import org.opendaylight.openflowplugin.api.OFConstants;
 import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
@@ -40,6 +39,7 @@ import org.opendaylight.openflowplugin.openflow.md.core.translator.MultiPartMess
 import org.opendaylight.openflowplugin.openflow.md.core.translator.MultiPartReplyPortToNodeConnectorUpdatedTranslator;
 import org.opendaylight.openflowplugin.openflow.md.core.translator.MultipartReplyTableFeaturesToTableUpdatedTranslator;
 import org.opendaylight.openflowplugin.openflow.md.core.translator.MultipartReplyTranslator;
+import org.opendaylight.openflowplugin.openflow.md.core.translator.NotificationPlainTranslator;
 import org.opendaylight.openflowplugin.openflow.md.core.translator.PacketInTranslator;
 import org.opendaylight.openflowplugin.openflow.md.core.translator.PacketInV10Translator;
 import org.opendaylight.openflowplugin.openflow.md.core.translator.PortStatusMessageToNodeConnectorUpdatedTranslator;
@@ -56,6 +56,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GroupFeaturesUpdated;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GroupStatisticsUpdated;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorUpdated;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRemoved;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeUpdated;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.MeterConfigStatsUpdated;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.MeterFeaturesUpdated;
@@ -154,6 +155,8 @@ public class MDController implements IMDController, AutoCloseable {
         addMessageTranslator(MultipartReplyMessage.class,OF13, new MultipartReplyTranslator());
         addMessageTranslator(MultipartReplyMessage.class,OF13,new MultipartReplyTableFeaturesToTableUpdatedTranslator());
         addMessageTranslator(GetFeaturesOutput.class,OF10, new FeaturesV10ToNodeConnectorUpdatedTranslator());
+        addMessageTranslator(NotificationQueueWrapper.class, OF10, new NotificationPlainTranslator());
+        addMessageTranslator(NotificationQueueWrapper.class, OF13, new NotificationPlainTranslator());
 
         NotificationPopListener<DataObject> notificationPopListener = new NotificationPopListener<DataObject>();
         notificationPopListener.setNotificationProviderService(
@@ -181,6 +184,7 @@ public class MDController implements IMDController, AutoCloseable {
         addMessagePopListener(PacketReceived.class,notificationPopListener);
         addMessagePopListener(TransmitPacketInput.class, notificationPopListener);
         addMessagePopListener(NodeUpdated.class, notificationPopListener);
+        addMessagePopListener(NodeRemoved.class, notificationPopListener);
 
         addMessagePopListener(SwitchFlowRemoved.class, notificationPopListener);
         addMessagePopListener(TableUpdated.class, notificationPopListener);
diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/NotificationEnqueuer.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/NotificationEnqueuer.java
new file mode 100644 (file)
index 0000000..2729cef
--- /dev/null
@@ -0,0 +1,22 @@
+/**
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ * 
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.core;
+
+/**
+ * provider of wrapped notification enqueue
+ */
+public interface NotificationEnqueuer {
+
+    /**
+     * enqueue given notification into standard message processing queue
+     * 
+     * @param notification
+     */
+    void enqueueNotification(NotificationQueueWrapper notification);
+
+}
diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/NotificationQueueWrapper.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/NotificationQueueWrapper.java
new file mode 100644 (file)
index 0000000..508e28f
--- /dev/null
@@ -0,0 +1,65 @@
+/**
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ * 
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.core;
+
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.opendaylight.yangtools.yang.binding.DataContainer;
+import org.opendaylight.yangtools.yang.binding.Notification;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * 
+ */
+public class NotificationQueueWrapper implements OfHeader {
+    
+    private final Notification notification;
+    private final Short version;
+    private Long xid = -1L;
+
+    
+    /**
+     * @param notification
+     * @param version 
+     */
+    public NotificationQueueWrapper(final Notification notification, final Short version) {
+        Preconditions.checkArgument(notification != null, "wrapped notification must not be null");
+        Preconditions.checkArgument(version != null, "message version of wrapped notification must not be null");
+        this.notification = notification; 
+        this.version = version;
+    }
+
+    @Override
+    public Class<? extends DataContainer> getImplementedInterface() {
+        return NotificationQueueWrapper.class;
+    }
+
+    @Override
+    public Short getVersion() {
+        return version;
+    }
+
+    @Override
+    public Long getXid() {
+        return xid;
+    }
+
+    /**
+     * @return the notification
+     */
+    public Notification getNotification() {
+        return notification;
+    }
+
+    /**
+     * @param xid the xid to set
+     */
+    public void setXid(Long xid) {
+        this.xid = xid;
+    }
+}
index 7a2d0c7121748919bb12244ba652b37964595f8e..9f9940c099f5d6da185dda1733873d35c2abe0f7 100644 (file)
@@ -12,10 +12,12 @@ import java.net.Inet4Address;
 import java.net.Inet6Address;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
 import org.opendaylight.openflowplugin.openflow.md.ModelDrivenSwitch;
+import org.opendaylight.openflowplugin.openflow.md.core.NotificationQueueWrapper;
 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionListener;
@@ -100,7 +102,10 @@ public class SalRegistrationManager implements SessionListener, AutoCloseable {
 
         LOG.debug("ModelDrivenSwitch for {} registered to MD-SAL.", datapathId.toString());
 
-        publishService.publish(nodeAdded(ofSwitch, features, nodeRef));
+        NotificationQueueWrapper wrappedNotification = new NotificationQueueWrapper(
+                nodeAdded(ofSwitch, features, nodeRef), 
+                context.getFeatures().getVersion());
+        context.getNotificationEnqueuer().enqueueNotification(wrappedNotification);
     }
 
     @Override
@@ -117,7 +122,10 @@ public class SalRegistrationManager implements SessionListener, AutoCloseable {
         }
 
         LOG.debug("ModelDrivenSwitch for {} unregistered from MD-SAL.", datapathId.toString());
-        publishService.publish(nodeRemoved);
+        
+        NotificationQueueWrapper wrappedNotification = new NotificationQueueWrapper(
+                nodeRemoved, context.getFeatures().getVersion());
+        context.getNotificationEnqueuer().enqueueNotification(wrappedNotification);
     }
 
     private NodeUpdated nodeAdded(ModelDrivenSwitch sw, GetFeaturesOutput features, NodeRef nodeRef) {
index 697bd7690382875fb5b049d70a4238fb3f78520f..83f1df12dab12d87f3d55212c7456e1cc3ed1a40 100644 (file)
@@ -14,7 +14,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
-import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
+import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductorImpl;
 import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator;
 import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
 import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
@@ -39,7 +39,7 @@ public abstract class OFSessionUtil {
      * @param features
      * @param version
      */
-    public static void registerSession(ConnectionConductor connectionConductor,
+    public static void registerSession(ConnectionConductorImpl connectionConductor,
             GetFeaturesOutput features, short version) {
         SwitchSessionKeyOF sessionKey = createSwitchSessionKey(features
                 .getDatapathId());
@@ -58,6 +58,7 @@ public abstract class OFSessionUtil {
             // register new session context (based primary conductor)
             SessionContextOFImpl context = new SessionContextOFImpl();
             context.setPrimaryConductor(connectionConductor);
+            context.setNotificationEnqueuer(connectionConductor);
             context.setFeatures(features);
             context.setSessionKey(sessionKey);
             context.setSeed((int) System.currentTimeMillis());
index 6992a6a26e983279ce34778e66c1d1a9d9a9e71e..a6236248719326823d53f1d838478fee1d7dcd66 100644 (file)
@@ -15,6 +15,8 @@ import java.util.Set;
 
 import org.opendaylight.openflowplugin.openflow.md.ModelDrivenSwitch;
 import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
+import org.opendaylight.openflowplugin.openflow.md.core.NotificationEnqueuer;
+import org.opendaylight.openflowplugin.openflow.md.core.NotificationQueueWrapper;
 import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
@@ -159,4 +161,9 @@ public interface SessionContext {
      * @return seed value for random operations
      */
     int getSeed();
+    
+    /**
+     * @return (wrapped) notification enqueue service - {@link NotificationQueueWrapper} 
+     */
+    NotificationEnqueuer getNotificationEnqueuer();
 }
index 8a7b026ff041a6fdba645aa09a9e1248eab75734..678a0bb4e7a693247ca65772475556811a4855c5 100644 (file)
@@ -20,6 +20,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.opendaylight.openflowplugin.openflow.md.ModelDrivenSwitch;
 import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
+import org.opendaylight.openflowplugin.openflow.md.core.NotificationEnqueuer;
 import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
@@ -32,6 +33,7 @@ public class SessionContextOFImpl implements SessionContext {
 
     private GetFeaturesOutput features;
     private ConnectionConductor primaryConductor;
+    private NotificationEnqueuer notificationEnqueuer;
     private ConcurrentHashMap<SwitchConnectionDistinguisher, ConnectionConductor> auxiliaryConductors;
     private boolean valid;
     private SwitchSessionKeyOF sessionKey;
@@ -218,4 +220,17 @@ public class SessionContextOFImpl implements SessionContext {
     public int getSeed() {
         return seed;
     }
+    
+    /**
+     * @param notificationEnqueuer the notificationEnqueuer to set
+     */
+    public void setNotificationEnqueuer(
+            NotificationEnqueuer notificationEnqueuer) {
+        this.notificationEnqueuer = notificationEnqueuer;
+    }
+    
+    @Override
+    public NotificationEnqueuer getNotificationEnqueuer() {
+        return notificationEnqueuer;
+    }
 }
diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/translator/NotificationPlainTranslator.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/translator/NotificationPlainTranslator.java
new file mode 100644 (file)
index 0000000..0041828
--- /dev/null
@@ -0,0 +1,51 @@
+/**
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ * 
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.core.translator;
+
+import java.math.BigInteger;
+import java.util.Collections;
+import java.util.List;
+
+import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
+import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator;
+import org.opendaylight.openflowplugin.openflow.md.core.NotificationQueueWrapper;
+import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+/**
+ * 
+ */
+public class NotificationPlainTranslator implements IMDMessageTranslator<OfHeader, List<DataObject>> {
+    
+    private static final Logger LOG = LoggerFactory
+            .getLogger(NotificationPlainTranslator.class);
+    
+    @Override
+    public List<DataObject> translate(SwitchConnectionDistinguisher cookie,
+            SessionContext sc, OfHeader msg) {
+        List<DataObject> results = null;
+        
+        if(msg instanceof NotificationQueueWrapper) {
+            NotificationQueueWrapper wrappedNotification = (NotificationQueueWrapper) msg;
+            BigInteger datapathId = sc.getFeatures().getDatapathId();
+            Short version = wrappedNotification.getVersion();
+            LOG.debug("NotificationQueueWrapper: version {}  dataPathId {} notification {}", version, datapathId, wrappedNotification.getImplementedInterface());
+            results = Lists.newArrayList((DataObject) wrappedNotification.getNotification());
+        } else {
+            // TODO - Do something smarter than returning null if translation fails... what Exception should we throw here?
+            results = Collections.emptyList();
+        }
+        return results;
+    }
+
+}
index d74f303b32b7ec30dccb619b712c39ea011a221c..a0282a5fc99483608889daa85c290d188d88094d 100644 (file)
@@ -12,10 +12,12 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
 import com.google.common.util.concurrent.Futures;
+
 import java.math.BigInteger;
 import java.util.Collections;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
+
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -30,6 +32,7 @@ import org.opendaylight.controller.sal.common.util.Rpcs;
 import org.opendaylight.openflowplugin.openflow.md.ModelDrivenSwitch;
 import org.opendaylight.openflowplugin.api.OFConstants;
 import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
+import org.opendaylight.openflowplugin.openflow.md.core.NotificationEnqueuer;
 import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
 import org.opendaylight.openflowplugin.openflow.md.core.session.IMessageDispatchService;
 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
@@ -69,6 +72,8 @@ public class SalRegistrationManagerTest {
     private GetFeaturesOutput features;
     @Mock
     private BindingAwareBroker.ProviderContext providerContext;
+    @Mock
+    private NotificationEnqueuer notificationEnqueuer;
 
     private ModelDrivenSwitch mdSwitchOF13;
 
@@ -77,7 +82,6 @@ public class SalRegistrationManagerTest {
     @Before
     public void setUp() {
 
-
         Mockito.when(context.getPrimaryConductor()).thenReturn(conductor);
         Mockito.when(context.getMessageDispatchService()).thenReturn(messageDispatchService);
         Mockito.when(conductor.getVersion()).thenReturn(OFConstants.OFP_VERSION_1_0)
@@ -88,6 +92,7 @@ public class SalRegistrationManagerTest {
         registration = new CompositeObjectRegistration<>(mdSwitchOF13, Collections.EMPTY_LIST);
 
         Mockito.when(context.getProviderRegistration()).thenReturn(registration);
+        Mockito.when(context.getNotificationEnqueuer()).thenReturn(notificationEnqueuer);
         Mockito.when(features.getDatapathId()).thenReturn(BigInteger.valueOf(1));
         Mockito.when(features.getVersion()).thenReturn((short) 1);
 
index 113364755799d6dfdc55991056f2b7a93ba54dd0..4dfa9ae7d47a3c6e29ca21005bca4d020bbbf258 100644 (file)
@@ -16,6 +16,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -25,6 +26,8 @@ import org.opendaylight.openflowplugin.openflow.md.ModelDrivenSwitch;
 import org.opendaylight.openflowplugin.api.OFConstants;
 import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
 import org.opendaylight.openflowplugin.openflow.md.core.ErrorHandler;
+import org.opendaylight.openflowplugin.openflow.md.core.NotificationEnqueuer;
+import org.opendaylight.openflowplugin.openflow.md.core.NotificationQueueWrapper;
 import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
 import org.opendaylight.openflowplugin.openflow.md.queue.QueueProcessor;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
@@ -430,9 +433,14 @@ class MockSessionContext implements SessionContext {
     public void setSeed(int seed) {
         this.seed = seed;
     }
+    
+    @Override
+    public NotificationEnqueuer getNotificationEnqueuer() {
+        return conductor;
+    }
 }
 
-class MockConnectionConductor implements ConnectionConductor {
+class MockConnectionConductor implements ConnectionConductor, NotificationEnqueuer {
 
     private int conductorNum;
     private MockConnectionAdapter adapter;
@@ -525,6 +533,11 @@ class MockConnectionConductor implements ConnectionConductor {
     public void setId(int conductorId) {
         // NOOP
     }
+    
+    @Override
+    public void enqueueNotification(NotificationQueueWrapper notification) {
+        // NOOP
+    }
 }
 
 enum MessageType {