From ae698f4ec9346ba8d2a26750f289882d483cbfaf Mon Sep 17 00:00:00 2001 From: Martin Bobak Date: Mon, 22 Sep 2014 14:42:13 +0200 Subject: [PATCH] Bug 1985 - NPE when running groupbasedpolicy POC demo - OFSessionUtild doesn't set session as valid anymore, session management moved to SessionManagerOFImpl - session validation/invalidation is synchronized - when registration is about to be closed SalRegistrationManager checks whether session from which registration is originated is valid Change-Id: I2c523586676d3533aaf84afd2cecc38bd53181f4 Signed-off-by: Martin Bobak --- .../md/core/sal/SalRegistrationManager.java | 7 +- .../md/core/session/OFSessionUtil.java | 1 - .../md/core/session/SessionManagerOFImpl.java | 42 +-- .../ConcurrentSalRegistrationManagerTest.java | 239 ++++++++++++++++++ 4 files changed, 267 insertions(+), 22 deletions(-) create mode 100644 openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/sal/ConcurrentSalRegistrationManagerTest.java diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/SalRegistrationManager.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/SalRegistrationManager.java index c3c18595a1..7a2d0c7121 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/SalRegistrationManager.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/SalRegistrationManager.java @@ -12,7 +12,6 @@ import java.net.Inet4Address; import java.net.Inet6Address; import java.net.InetAddress; import java.net.InetSocketAddress; - import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext; import org.opendaylight.controller.sal.binding.api.NotificationProviderService; @@ -112,8 +111,10 @@ public class SalRegistrationManager implements SessionListener, AutoCloseable { NodeRef nodeRef = new NodeRef(identifier); NodeRemoved nodeRemoved = nodeRemoved(nodeRef); LLDPSpeaker.getInstance().removeModelDrivenSwitch(identifier); - CompositeObjectRegistration registration = context.getProviderRegistration(); - registration.close(); + if (context.isValid()) { + CompositeObjectRegistration registration = context.getProviderRegistration(); + registration.close(); + } LOG.debug("ModelDrivenSwitch for {} unregistered from MD-SAL.", datapathId.toString()); publishService.publish(nodeRemoved); diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/OFSessionUtil.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/OFSessionUtil.java index 0a8b12b64f..697bd76903 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/OFSessionUtil.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/OFSessionUtil.java @@ -62,7 +62,6 @@ public abstract class OFSessionUtil { context.setSessionKey(sessionKey); context.setSeed((int) System.currentTimeMillis()); connectionConductor.setSessionContext(context); - context.setValid(true); getSessionManager().addSessionContext(sessionKey, context); } else { // handle auxiliary diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/SessionManagerOFImpl.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/SessionManagerOFImpl.java index e4f6875b3e..cf6e4e8356 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/SessionManagerOFImpl.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/SessionManagerOFImpl.java @@ -17,13 +17,13 @@ import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.sal.binding.api.NotificationProviderService; -import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor; -import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator; import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher; import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey; +import org.opendaylight.openflowplugin.api.statistics.MessageSpy; +import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor; +import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator; import org.opendaylight.openflowplugin.openflow.md.core.extension.ExtensionConverterProvider; import org.opendaylight.openflowplugin.openflow.md.queue.PopListener; -import org.opendaylight.openflowplugin.api.statistics.MessageSpy; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.concepts.util.ListenerRegistry; @@ -95,13 +95,15 @@ public class SessionManagerOFImpl implements SessionManager { if (context == null) { LOG.warn("context for invalidation not found"); } else { - for (Entry auxEntry : context.getAuxiliaryConductors()) { - invalidateAuxiliary(sessionKey, auxEntry.getKey()); + synchronized (context) { + for (Entry auxEntry : context.getAuxiliaryConductors()) { + invalidateAuxiliary(sessionKey, auxEntry.getKey()); + } + context.getPrimaryConductor().disconnect(); + context.setValid(false); + removeSessionContext(context); + // TODO:: notify listeners } - context.getPrimaryConductor().disconnect(); - context.setValid(false); - removeSessionContext(context); - // TODO:: notify listeners } } @@ -109,13 +111,15 @@ public class SessionManagerOFImpl implements SessionManager { if (sessionContext == null) { LOG.warn("context for invalidation not found"); } else { - for (Entry auxEntry : sessionContext - .getAuxiliaryConductors()) { - invalidateAuxiliary(sessionContext, auxEntry.getKey(), true); + synchronized (sessionContext) { + for (Entry auxEntry : sessionContext + .getAuxiliaryConductors()) { + invalidateAuxiliary(sessionContext, auxEntry.getKey(), true); + } + sessionContext.setValid(false); + removeSessionContext(sessionContext); + // TODO:: notify listeners } - sessionContext.setValid(false); - removeSessionContext(sessionContext); - // TODO:: notify listeners } } @@ -129,10 +133,12 @@ public class SessionManagerOFImpl implements SessionManager { @Override public void addSessionContext(SwitchSessionKeyOF sessionKey, SessionContext context) { - sessionLot.put(sessionKey, context); - - sessionNotifier.onSessionAdded(sessionKey, context); + synchronized (context) { + sessionLot.put(sessionKey, context); + sessionNotifier.onSessionAdded(sessionKey, context); + context.setValid(true); + } } @Override diff --git a/openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/sal/ConcurrentSalRegistrationManagerTest.java b/openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/sal/ConcurrentSalRegistrationManagerTest.java new file mode 100644 index 0000000000..825203e04f --- /dev/null +++ b/openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/sal/ConcurrentSalRegistrationManagerTest.java @@ -0,0 +1,239 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.openflowplugin.openflow.md.core.sal; + +import static java.lang.Thread.sleep; +import static org.mockito.Mockito.when; + +import java.math.BigInteger; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.MockitoAnnotations.Mock; +import org.mockito.runners.MockitoJUnitRunner; +import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener; +import org.opendaylight.controller.sal.binding.api.BindingAwareBroker; +import org.opendaylight.controller.sal.binding.api.BindingAwareProvider; +import org.opendaylight.controller.sal.binding.api.BindingAwareService; +import org.opendaylight.controller.sal.binding.api.NotificationListener; +import org.opendaylight.controller.sal.binding.api.NotificationProviderService; +import org.opendaylight.controller.sal.binding.api.rpc.RpcContextIdentifier; +import org.opendaylight.openflowplugin.api.OFConstants; +import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor; +import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext; +import org.opendaylight.openflowplugin.openflow.md.core.session.SwitchSessionKeyOF; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.concepts.Path; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.binding.Notification; +import org.opendaylight.yangtools.yang.binding.RpcService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Created by Martin Bobak mbobak@cisco.com on 9/22/14. + */ +@RunWith(MockitoJUnitRunner.class) +public class ConcurrentSalRegistrationManagerTest { + + + private static final ExecutorService taskExecutor = Executors.newFixedThreadPool(3); + private static final SalRegistrationManager registrationManager = new SalRegistrationManager(); + + private static final SwitchSessionKeyOF SWITCH_SESSION_KEY_OF = new SwitchSessionKeyOF(); + private static final MockProviderContext MOCK_PROVIDER_CONTEXT = new MockProviderContext(); + private final MockNotificationProviderService MOCK_NOTIFICATION_PROVIDER_SERVICE = new MockNotificationProviderService(); + + private static final Logger LOG = LoggerFactory.getLogger(ConcurrentSalRegistrationManagerTest.class); + private static final long THREAD_SLEEP_MILLIS = 1000; + private static final String DELAYED_THREAD = "DELAYED_THREAD"; + private static final String NO_DELAY_THREAD = "NO_DELAY_THREAD"; + + @Mock + private SessionContext context; + + @Mock + private GetFeaturesOutput features; + + @Mock + private ConnectionConductor connectionConductor; + + @Before + public void setupRegistrationManager() { + registrationManager.onSessionInitiated(MOCK_PROVIDER_CONTEXT); + SWITCH_SESSION_KEY_OF.setDatapathId(BigInteger.ONE); + + when(context.getFeatures()).thenReturn(features); + when(features.getDatapathId()).thenReturn(BigInteger.valueOf(42)); + when(context.getPrimaryConductor()).thenReturn(connectionConductor); + when(connectionConductor.getVersion()).thenReturn(OFConstants.OFP_VERSION_1_3); + } + + @Test + /** + * Test method which verifies that session could not be invalidated while in creation. + */ + public void testConcurrentRemoveSessionContext() throws InterruptedException, ExecutionException { + + + Thread delayedThread = new Thread(new Runnable() { + @Override + public void run() { + LOG.info("Delayed session adding thread started."); + Thread.currentThread().setName(DELAYED_THREAD); + registrationManager.setPublishService(MOCK_NOTIFICATION_PROVIDER_SERVICE); + registrationManager.onSessionAdded(SWITCH_SESSION_KEY_OF, context); + LOG.info("Delayed session adding thread finished."); + } + } + ); + taskExecutor.execute(delayedThread); + + Thread noDelayThread = new Thread(new Runnable() { + @Override + public void run() { + LOG.info("Session removing thread started."); + Thread.currentThread().setName(NO_DELAY_THREAD); + registrationManager.setPublishService(MOCK_NOTIFICATION_PROVIDER_SERVICE); + registrationManager.onSessionRemoved(context); + LOG.info("Session removing thread finished."); + } + } + ); + taskExecutor.execute(noDelayThread); + taskExecutor.shutdown(); + while (!taskExecutor.isTerminated()) { + } + LOG.info("All tasks have finished."); + } + + private final class MockNotificationProviderService implements NotificationProviderService { + + @Override + public void publish(Notification notification) { + if (Thread.currentThread().getName().equals(DELAYED_THREAD)) { + try { + LOG.info(String.format("Will wait for %d millis", THREAD_SLEEP_MILLIS)); + sleep(THREAD_SLEEP_MILLIS); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + @Override + public void publish(Notification notification, ExecutorService executorService) { + + } + + @Override + public ListenerRegistration registerInterestListener(NotificationInterestListener notificationInterestListener) { + return null; + } + + @Override + public ListenerRegistration> registerNotificationListener(Class tClass, NotificationListener tNotificationListener) { + return null; + } + + @Override + public ListenerRegistration registerNotificationListener(org.opendaylight.yangtools.yang.binding.NotificationListener notificationListener) { + return null; + } + } + + private static final class MockProviderContext implements BindingAwareBroker.ProviderContext { + + + @Override + public void registerFunctionality(BindingAwareProvider.ProviderFunctionality functionality) { + + } + + @Override + public void unregisterFunctionality(BindingAwareProvider.ProviderFunctionality functionality) { + + } + + @Override + public T getSALService(Class service) { + return null; + } + + @Override + public BindingAwareBroker.RpcRegistration addRpcImplementation(Class serviceInterface, T implementation) throws IllegalStateException { + return null; + } + + @Override + public BindingAwareBroker.RoutedRpcRegistration addRoutedRpcImplementation(Class serviceInterface, T implementation) throws IllegalStateException { + return new MockRpcRegistration(implementation); + } + + @Override + public >> ListenerRegistration registerRouteChangeListener(L listener) { + return null; + } + + @Override + public T getRpcService(Class serviceInterface) { + return null; + } + } + + private static final class MockRpcRegistration implements BindingAwareBroker.RoutedRpcRegistration { + + private Object implementation; + + public MockRpcRegistration(Object instance) { + this.implementation = instance; + + } + + @Override + public void registerInstance(Class context, InstanceIdentifier instance) { + + } + + @Override + public void unregisterInstance(Class context, InstanceIdentifier instance) { + + } + + @Override + public Object getInstance() { + return this.implementation; + } + + @Override + public void registerPath(Object context, Path path) { + + } + + @Override + public void unregisterPath(Object context, Path path) { + + } + + @Override + public Class getServiceType() { + return null; + } + + @Override + public void close() { + + } + } + +} -- 2.36.6