From 2e7de34840ef72e7b57209fc9a57d334b0a6402a Mon Sep 17 00:00:00 2001 From: Tomas Slusny Date: Fri, 28 Jul 2017 15:35:33 +0200 Subject: [PATCH] Guard lifecycle of contexts - Add GuardedContext wrapper for OFPContext that will guard each phase of context lifecycle (new instance, instantiateServiceInstance, closeServiceInstance, close) Resolves: bug 8913 Change-Id: I6c054575c9f93f03e45dcc23feefa0e7dae6fa88 Signed-off-by: Tomas Slusny --- .../api/openflow/OFPContext.java | 13 -- .../api/openflow/device/DeviceContext.java | 27 ++- .../openflow/lifecycle/ContextChainState.java | 11 +- .../openflow/lifecycle/GuardedContext.java | 33 ++++ .../impl/device/DeviceContextImpl.java | 43 ++--- .../lifecycle/ContextChainHolderImpl.java | 24 +-- .../impl/lifecycle/ContextChainImpl.java | 105 +++++------ .../impl/lifecycle/GuardedContextImpl.java | 163 ++++++++++++++++++ .../impl/rpc/RpcContextImpl.java | 26 +-- ...tractMultipartRequestOnTheFlyCallback.java | 14 +- .../statistics/StatisticsContextImpl.java | 21 +-- .../impl/util/DeviceInitializationUtil.java | 1 - .../StatisticsContextImplParamTest.java | 2 +- 13 files changed, 313 insertions(+), 170 deletions(-) create mode 100644 openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/lifecycle/GuardedContext.java create mode 100644 openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/lifecycle/GuardedContextImpl.java diff --git a/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/OFPContext.java b/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/OFPContext.java index 94424bc132..b073d4ceb1 100644 --- a/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/OFPContext.java +++ b/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/OFPContext.java @@ -16,19 +16,6 @@ import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMaster * General API for all OFP Context. */ public interface OFPContext extends AutoCloseable, ClusterSingletonService { - - /** - * Context state. - */ - enum ContextState { - /* Initialization phase, context not yet fully initialized */ - INITIALIZATION, - /* Standard working phase everything is fine */ - WORKING, - /* Termination phase context is being shutting down */ - TERMINATION - } - /** * Get device info. * @return device info diff --git a/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/device/DeviceContext.java b/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/device/DeviceContext.java index 24bf326927..bbb4497ce4 100644 --- a/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/device/DeviceContext.java +++ b/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/device/DeviceContext.java @@ -55,24 +55,43 @@ public interface DeviceContext extends /** * Getter. + * * @return current devices connection context */ ConnectionContext getPrimaryConnectionContext(); /** * Getter. + * * @return translator library */ TranslatorLibrary oook(); + /** + * Sets notification publish service. + * + * @param notificationPublishService the notification publish service + */ void setNotificationPublishService(NotificationPublishService notificationPublishService); + /** + * Gets message spy. + * + * @return the message spy + */ MessageSpy getMessageSpy(); + /** + * Gets multi msg collector. + * + * @param the type parameter + * @param requestContext the request context + * @return the multi msg collector + */ MultiMsgCollector getMultiMsgCollector(RequestContext> requestContext); /** - * indicates that device context is fully published (e.g.: packetIn messages should be passed). + * Indicates that device context is fully published (e.g.: packetIn messages should be passed). */ void onPublished(); @@ -90,7 +109,7 @@ public interface DeviceContext extends /** * Setter for sal role service. - * @param salRoleService Role Service + * @param salRoleService role service */ void setSalRoleService(@Nonnull SalRoleService salRoleService); @@ -100,6 +119,10 @@ public interface DeviceContext extends */ ListenableFuture> makeDeviceSlave(); + /** + * Checks if device and controller supports single layer serialization. + * @return true if single layer serialization is supported + */ boolean canUseSingleLayerSerialization(); /** diff --git a/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/lifecycle/ContextChainState.java b/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/lifecycle/ContextChainState.java index 8cfc3eecec..887358b35e 100644 --- a/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/lifecycle/ContextChainState.java +++ b/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/lifecycle/ContextChainState.java @@ -8,6 +8,10 @@ package org.opendaylight.openflowplugin.api.openflow.lifecycle; public enum ContextChainState { + /** + * Context chain is undefined. + */ + UNDEFINED, /** * Context chain is working as MASTER. */ @@ -17,8 +21,7 @@ public enum ContextChainState { */ WORKING_SLAVE, /** - * Context chain is undefined. + * Context chain is closed. */ - UNDEFINED - -} + CLOSED +} \ No newline at end of file diff --git a/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/lifecycle/GuardedContext.java b/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/lifecycle/GuardedContext.java new file mode 100644 index 0000000000..77f7d7c75e --- /dev/null +++ b/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/lifecycle/GuardedContext.java @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.openflowplugin.api.openflow.lifecycle; + +import com.google.common.util.concurrent.Service; +import java.util.function.Function; +import org.opendaylight.openflowplugin.api.openflow.OFPContext; + +/** + * Stateful OpenFlow context wrapper. + */ +public interface GuardedContext extends OFPContext { + /** + * Returns the lifecycle state of the service. + * + * @return the service state + */ + Service.State state(); + + /** + * Maps delegate inside guarded context to T. + * + * @param the type parameter + * @param transformer the transformer + * @return the t + */ + T map(Function transformer); +} \ No newline at end of file diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java index 45f5dc231f..7b4ba521f0 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java @@ -8,7 +8,6 @@ package org.opendaylight.openflowplugin.impl.device; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Verify; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.JdkFutureAdapters; @@ -26,7 +25,6 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.opendaylight.controller.md.sal.binding.api.DataBroker; @@ -47,8 +45,8 @@ import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary; import org.opendaylight.openflowplugin.api.openflow.device.Xid; import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector; import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState; -import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainState; import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher; +import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainState; import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey; import org.opendaylight.openflowplugin.api.openflow.md.util.OpenflowVersion; import org.opendaylight.openflowplugin.api.openflow.registry.ItemLifeCycleRegistry; @@ -168,7 +166,6 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi private final AtomicBoolean initialized = new AtomicBoolean(false); private final AtomicBoolean hasState = new AtomicBoolean(false); private final AtomicBoolean isInitialTransactionSubmitted = new AtomicBoolean(false); - private final AtomicReference state = new AtomicReference<>(ContextState.INITIALIZATION); private NotificationPublishService notificationPublishService; private TransactionChainManager transactionChainManager; private DeviceFlowRegistry deviceFlowRegistry; @@ -529,8 +526,6 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi @Override public void onPublished() { - Verify.verify(ContextState.INITIALIZATION.equals(state.get())); - state.set(ContextState.WORKING); primaryConnectionContext.getConnectionAdapter().setPacketInFiltering(false); } @@ -566,8 +561,6 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi @Override public ListenableFuture closeServiceInstance() { - LOG.info("Stopping device context cluster services for node {}", deviceInfo.getLOGValue()); - final ListenableFuture listenableFuture = initialized.get() ? transactionChainManager.deactivateTransactionManager() : Futures.immediateFuture(null); @@ -599,34 +592,27 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi @Override public void close() { - if (ContextState.TERMINATION.equals(state)) { - if (LOG.isDebugEnabled()) { - LOG.debug("DeviceContext for node {} is already in TERMINATION state.", getDeviceInfo()); - } - - return; - } - - state.set(ContextState.TERMINATION); - // Close all datastore registries and transactions - if (initialized.get()) { - initialized.set(false); + if (initialized.getAndSet(false)) { deviceGroupRegistry.close(); deviceFlowRegistry.close(); deviceMeterRegistry.close(); final ListenableFuture txChainShuttingDown = transactionChainManager.shuttingDown(); - try { - txChainShuttingDown.get(TX_CHAIN_CLOSE_TIMEOUT, TimeUnit.MILLISECONDS); - } catch (InterruptedException | ExecutionException | TimeoutException e) { - txChainShuttingDown.cancel(true); - LOG.warn("Failed to shut down transaction chain for device {}: {}", deviceInfo, e); - } + Futures.addCallback(txChainShuttingDown, new FutureCallback() { + @Override + public void onSuccess(@Nullable final Void result) { + transactionChainManager.close(); + transactionChainManager = null; + } - transactionChainManager.close(); - transactionChainManager = null; + @Override + public void onFailure(final Throwable t) { + transactionChainManager.close(); + transactionChainManager = null; + } + }); } requestContexts.forEach(requestContext -> RequestContextUtil @@ -646,7 +632,6 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi @Override public void instantiateServiceInstance() { - LOG.info("Starting device context cluster services for node {}", deviceInfo); lazyTransactionManagerInitialization(); try { diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/lifecycle/ContextChainHolderImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/lifecycle/ContextChainHolderImpl.java index 9ad10e494e..49ac750480 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/lifecycle/ContextChainHolderImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/lifecycle/ContextChainHolderImpl.java @@ -12,13 +12,13 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.MoreExecutors; import io.netty.util.HashedWheelTimer; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -65,7 +65,7 @@ public class ContextChainHolderImpl implements ContextChainHolder, MasterChecker private static final long REMOVE_DEVICE_FROM_DS_TIMEOUT = 5000L; private static final String ASYNC_SERVICE_ENTITY_TYPE = "org.opendaylight.mdsal.AsyncServiceCloseEntityType"; - private final Map contextChainMap = Collections.synchronizedMap(new HashMap<>()); + private final Map contextChainMap = new ConcurrentHashMap<>(); private final EntityOwnershipListenerRegistration eosListenerRegistration; private final ClusterSingletonServiceProvider singletonServiceProvider; private final ItemScheduler scheduler; @@ -230,14 +230,16 @@ public class ContextChainHolderImpl implements ContextChainHolder, MasterChecker public void onDeviceDisconnected(final ConnectionContext connectionContext) { final DeviceInfo deviceInfo = connectionContext.getDeviceInfo(); - Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> { - if (contextChain.auxiliaryConnectionDropped(connectionContext)) { - LOG.info("Auxiliary connection from device {} disconnected.", deviceInfo); - } else { - LOG.info("Device {} disconnected.", deviceInfo); - destroyContextChain(deviceInfo); - } - }); + Optional.ofNullable(connectionContext.getDeviceInfo()) + .map(contextChainMap::get) + .ifPresent(contextChain -> { + if (contextChain.auxiliaryConnectionDropped(connectionContext)) { + LOG.info("Auxiliary connection from device {} disconnected.", deviceInfo); + } else { + LOG.info("Device {} disconnected.", deviceInfo); + destroyContextChain(deviceInfo); + } + }); } @VisibleForTesting @@ -284,7 +286,7 @@ public class ContextChainHolderImpl implements ContextChainHolder, MasterChecker } } - private synchronized void destroyContextChain(final DeviceInfo deviceInfo) { + private void destroyContextChain(final DeviceInfo deviceInfo) { scheduler.remove(deviceInfo); Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> { diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/lifecycle/ContextChainImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/lifecycle/ContextChainImpl.java index 153945c307..094338dd56 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/lifecycle/ContextChainImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/lifecycle/ContextChainImpl.java @@ -7,8 +7,6 @@ */ package org.opendaylight.openflowplugin.impl.lifecycle; -import static org.opendaylight.openflowplugin.api.openflow.OFPContext.ContextState; - import com.google.common.collect.Lists; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; @@ -18,12 +16,12 @@ import java.util.Objects; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider; import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier; -import org.opendaylight.openflowplugin.api.ConnectionException; import org.opendaylight.openflowplugin.api.openflow.OFPContext; import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext; import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext; @@ -34,6 +32,7 @@ import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMaster import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher; import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainState; import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainStateListener; +import org.opendaylight.openflowplugin.api.openflow.lifecycle.GuardedContext; import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext; import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput; import org.opendaylight.yangtools.yang.common.RpcResult; @@ -42,22 +41,22 @@ import org.slf4j.LoggerFactory; public class ContextChainImpl implements ContextChain { private static final Logger LOG = LoggerFactory.getLogger(ContextChainImpl.class); + private final AtomicBoolean masterStateOnDevice = new AtomicBoolean(false); private final AtomicBoolean initialGathering = new AtomicBoolean(false); private final AtomicBoolean initialSubmitting = new AtomicBoolean(false); private final AtomicBoolean registryFilling = new AtomicBoolean(false); private final AtomicBoolean rpcRegistration = new AtomicBoolean(false); private final List deviceRemovedHandlers = new CopyOnWriteArrayList<>(); - private final List contexts = new CopyOnWriteArrayList<>(); + private final List contexts = new CopyOnWriteArrayList<>(); private final List auxiliaryConnections = new CopyOnWriteArrayList<>(); private final ExecutorService executorService; private final ContextChainMastershipWatcher contextChainMastershipWatcher; private final DeviceInfo deviceInfo; private final ConnectionContext primaryConnection; + private final AtomicReference contextChainState = + new AtomicReference<>(ContextChainState.UNDEFINED); private AutoCloseable registration; - private ContextState state = ContextState.INITIALIZATION; - - private volatile ContextChainState contextChainState = ContextChainState.UNDEFINED; ContextChainImpl(@Nonnull final ContextChainMastershipWatcher contextChainMastershipWatcher, @Nonnull final ConnectionContext connectionContext, @@ -70,7 +69,7 @@ public class ContextChainImpl implements ContextChain { @Override public void addContext(@Nonnull final T context) { - contexts.add(context); + contexts.add(new GuardedContextImpl(context)); } @Override @@ -78,11 +77,11 @@ public class ContextChainImpl implements ContextChain { LOG.info("Starting clustering services for node {}", deviceInfo); try { - contexts.forEach(this::initializeContextService); + contexts.forEach(OFPContext::instantiateServiceInstance); LOG.info("Started clustering services for node {}", deviceInfo); } catch (final Exception ex) { executorService.submit(() -> contextChainMastershipWatcher - .onNotAbleToStartMastershipMandatory(deviceInfo, ex.getMessage())); + .onNotAbleToStartMastershipMandatory(deviceInfo, ex.toString())); } } @@ -92,9 +91,9 @@ public class ContextChainImpl implements ContextChain { contextChainMastershipWatcher.onSlaveRoleAcquired(deviceInfo); final ListenableFuture> servicesToBeClosed = Futures - .successfulAsList(Lists.reverse(contexts) + .allAsList(Lists.reverse(contexts) .stream() - .map(this::closeContextService) + .map(OFPContext::closeServiceInstance) .collect(Collectors.toList())); return Futures.transform(servicesToBeClosed, (input) -> { @@ -111,12 +110,12 @@ public class ContextChainImpl implements ContextChain { @Override public void close() { - if (ContextState.TERMINATION.equals(state)) { + if (ContextChainState.CLOSED.equals(contextChainState.get())) { LOG.debug("ContextChain for node {} is already in TERMINATION state.", deviceInfo); return; } - state = ContextState.TERMINATION; + contextChainState.set(ContextChainState.CLOSED); contextChainMastershipWatcher.onSlaveRoleAcquired(deviceInfo); // Close all connections to devices @@ -149,13 +148,12 @@ public class ContextChainImpl implements ContextChain { @Override public void makeContextChainStateSlave() { unMasterMe(); - changeState(ContextChainState.WORKING_SLAVE); + changeMastershipState(ContextChainState.WORKING_SLAVE); } @Override public void registerServices(final ClusterSingletonServiceProvider clusterSingletonServiceProvider) { LOG.info("Registering clustering services for node {}", deviceInfo); - state = ContextState.WORKING; registration = Objects.requireNonNull(clusterSingletonServiceProvider .registerClusterSingletonService(this)); LOG.info("Registered clustering services for node {}", deviceInfo); @@ -165,15 +163,12 @@ public class ContextChainImpl implements ContextChain { public void makeDeviceSlave() { unMasterMe(); - contexts.stream() - .filter(DeviceContext.class::isInstance) - .map(DeviceContext.class::cast) - .findAny() - .ifPresent(deviceContext -> Futures - .addCallback( - deviceContext.makeDeviceSlave(), - new DeviceSlaveCallback(), - executorService)); + contexts.forEach(context -> { + if (context.map(DeviceContext.class::isInstance)) { + Futures.addCallback(context.map(DeviceContext.class::cast).makeDeviceSlave(), + new DeviceSlaveCallback(), executorService); + } + }); } @Override @@ -212,7 +207,7 @@ public class ContextChainImpl implements ContextChain { LOG.info("Device {} is able to work as master{}", deviceInfo, registryFilling.get() ? "." : " WITHOUT flow registry !!!"); - changeState(ContextChainState.WORKING_MASTER); + changeMastershipState(ContextChainState.WORKING_MASTER); } return result; @@ -220,7 +215,7 @@ public class ContextChainImpl implements ContextChain { @Override public boolean isClosing() { - return ContextState.TERMINATION.equals(state); + return ContextChainState.CLOSED.equals(contextChainState.get()); } @Override @@ -232,13 +227,15 @@ public class ContextChainImpl implements ContextChain { @Override public boolean continueInitializationAfterReconciliation() { - return contexts.stream() - .filter(StatisticsContext.class::isInstance) - .map(StatisticsContext.class::cast) - .findAny() - .map(StatisticsContext::initialSubmitAfterReconciliation) - .orElse(false) && - isMastered(ContextChainMastershipState.INITIAL_SUBMIT); + final AtomicBoolean initialSubmit = new AtomicBoolean(false); + + contexts.forEach(context -> { + if (context.map(StatisticsContext.class::isInstance)) { + initialSubmit.set(context.map(StatisticsContext.class::cast).initialSubmitAfterReconciliation()); + } + }); + + return initialSubmit.get() && isMastered(ContextChainMastershipState.INITIAL_SUBMIT); } @Override @@ -258,39 +255,21 @@ public class ContextChainImpl implements ContextChain { deviceRemovedHandlers.add(deviceRemovedHandler); } - private void changeState(final ContextChainState contextChainState) { - boolean propagate = this.contextChainState == ContextChainState.UNDEFINED; - this.contextChainState = contextChainState; - - if (propagate) { - contexts.stream() - .filter(ContextChainStateListener.class::isInstance) - .map(ContextChainStateListener.class::cast) - .forEach(listener -> listener.onStateAcquired(contextChainState)); - } - } - - private void initializeContextService(final OFPContext context) { - if (ConnectionContext.CONNECTION_STATE.WORKING.equals(primaryConnection.getConnectionState())) { - context.instantiateServiceInstance(); - } else { - LOG.warn("Device connection for node {} doesn't exist anymore. Primary connection status: {}", - deviceInfo, - primaryConnection.getConnectionState()); + private void changeMastershipState(final ContextChainState contextChainState) { + if (ContextChainState.CLOSED.equals(this.contextChainState.get())) { + return; } - } - private ListenableFuture closeContextService(final OFPContext context) { - if (ConnectionContext.CONNECTION_STATE.RIP.equals(primaryConnection.getConnectionState())) { - final String errMsg = String - .format("Device connection for node %s doesn't exist anymore. Primary connection status: %s", - deviceInfo.toString(), - primaryConnection.getConnectionState()); + boolean propagate = ContextChainState.UNDEFINED.equals(this.contextChainState.get()); + this.contextChainState.set(contextChainState); - return Futures.immediateFailedFuture(new ConnectionException(errMsg)); + if (propagate) { + contexts.forEach(context -> { + if (context.map(ContextChainStateListener.class::isInstance)) { + context.map(ContextChainStateListener.class::cast).onStateAcquired(contextChainState); + } + }); } - - return context.closeServiceInstance(); } private void unMasterMe() { diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/lifecycle/GuardedContextImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/lifecycle/GuardedContextImpl.java new file mode 100644 index 0000000000..227da2f6ca --- /dev/null +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/lifecycle/GuardedContextImpl.java @@ -0,0 +1,163 @@ +/* + * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.openflowplugin.impl.lifecycle; + +import static com.google.common.util.concurrent.Service.State.FAILED; +import static com.google.common.util.concurrent.Service.State.NEW; +import static com.google.common.util.concurrent.Service.State.RUNNING; +import static com.google.common.util.concurrent.Service.State.STARTING; +import static com.google.common.util.concurrent.Service.State.STOPPING; +import static com.google.common.util.concurrent.Service.State.TERMINATED; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.Monitor; +import com.google.common.util.concurrent.Monitor.Guard; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.Service; +import java.util.function.Function; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier; +import org.opendaylight.openflowplugin.api.openflow.OFPContext; +import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo; +import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher; +import org.opendaylight.openflowplugin.api.openflow.lifecycle.GuardedContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class GuardedContextImpl implements GuardedContext { + private static final Logger LOG = LoggerFactory.getLogger(GuardedContextImpl.class); + + private final Monitor monitor = new Monitor(); + private final OFPContext delegate; + private Service.State state = NEW; + + private final Guard isStartable = new Guard(monitor) { + @Override + public boolean isSatisfied() { + return state == NEW || state == TERMINATED; + } + }; + + private final Guard isStoppable = new Guard(monitor) { + @Override + public boolean isSatisfied() { + return state.compareTo(RUNNING) <= 0; + } + }; + + private final Guard isCloseable = new Guard(monitor) { + @Override + public boolean isSatisfied() { + return state != FAILED; + } + }; + + GuardedContextImpl(final OFPContext delegate) { + this.delegate = delegate; + } + + @Override + public Service.State state() { + monitor.enter(); + final Service.State stateSnapshot = state; + monitor.leave(); + return stateSnapshot; + } + + @Override + public T map(final Function transformer) { + return transformer.apply(delegate); + } + + @Override + public void instantiateServiceInstance() { + if (monitor.enterIf(isStartable)) { + try { + LOG.info("Starting {} service for node {}", this, getDeviceInfo()); + state = STARTING; + delegate.instantiateServiceInstance(); + state = RUNNING; + } finally { + monitor.leave(); + } + } else { + throw new IllegalStateException("Service " + this + " has already been started"); + } + } + + @Override + public ListenableFuture closeServiceInstance() { + ListenableFuture result = Futures.immediateFuture(null); + + if (monitor.enterIf(isStoppable)) { + try { + LOG.info("Stopping {} service for node {}", this, getDeviceInfo()); + state = STOPPING; + final ListenableFuture resultFuture = delegate.closeServiceInstance(); + + Futures.addCallback(resultFuture, new FutureCallback() { + @Override + public void onSuccess(@Nullable final Void result) { + state = TERMINATED; + } + + @Override + public void onFailure(@Nonnull final Throwable t) { + state = TERMINATED; + } + }, MoreExecutors.directExecutor()); + + result = resultFuture; + } catch (final Exception e) { + result = Futures.immediateFailedFuture(e); + } finally { + monitor.leave(); + } + } + + state = TERMINATED; + return result; + } + + @Nonnull + @Override + public ServiceGroupIdentifier getIdentifier() { + return delegate.getIdentifier(); + } + + @Override + public String toString() { + return delegate.getClass().getSimpleName() + "[" + state + "]"; + } + + @Override + public DeviceInfo getDeviceInfo() { + return delegate.getDeviceInfo(); + } + + @Override + public void registerMastershipWatcher(@Nonnull final ContextChainMastershipWatcher contextChainMastershipWatcher) { + delegate.registerMastershipWatcher(contextChainMastershipWatcher); + } + + @Override + public void close() { + if (monitor.enterIf(isCloseable)) { + try { + LOG.info("Terminating {} service for node {}", this, getDeviceInfo()); + state = FAILED; + delegate.close(); + } finally { + monitor.leave(); + } + } + } +} \ No newline at end of file diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/rpc/RpcContextImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/rpc/RpcContextImpl.java index b4802747d8..2c7238d1b2 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/rpc/RpcContextImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/rpc/RpcContextImpl.java @@ -51,7 +51,6 @@ class RpcContextImpl implements RpcContext { // TODO: add private Sal salBroker private final ConcurrentMap, RoutedRpcRegistration> rpcRegistrations = new ConcurrentHashMap<>(); private final KeyedInstanceIdentifier nodeInstanceIdentifier; - private volatile ContextState state = ContextState.INITIALIZATION; private final DeviceInfo deviceInfo; private final DeviceContext deviceContext; private final ExtensionConverterProvider extensionConverterProvider; @@ -78,14 +77,10 @@ class RpcContextImpl implements RpcContext { this.tracker = new Semaphore(maxRequests, true); } - /** - * @see org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext#registerRpcServiceImplementation(java.lang.Class, - * org.opendaylight.yangtools.yang.binding.RpcService) - */ @Override public void registerRpcServiceImplementation(final Class serviceClass, final S serviceInstance) { - if (! rpcRegistrations.containsKey(serviceClass)) { + if (!rpcRegistrations.containsKey(serviceClass)) { final RoutedRpcRegistration routedRpcReg = rpcProviderRegistry.addRoutedRpcImplementation(serviceClass, serviceInstance); routedRpcReg.registerPath(NodeContext.class, nodeInstanceIdentifier); rpcRegistrations.put(serviceClass, routedRpcReg); @@ -101,24 +96,12 @@ class RpcContextImpl implements RpcContext { public S lookupRpcService(final Class serviceClass) { RoutedRpcRegistration registration = rpcRegistrations.get(serviceClass); final RpcService rpcService = registration.getInstance(); - return (S) rpcService; + return serviceClass.cast(rpcService); } - /** - * Unregisters all services. - * - * @see java.lang.AutoCloseable#close() - */ @Override public void close() { - if (ContextState.TERMINATION.equals(state)) { - if (LOG.isDebugEnabled()) { - LOG.debug("RpcContext for node {} is already in TERMINATION state.", getDeviceInfo().getLOGValue()); - } - } else { - this.state = ContextState.TERMINATION; - unregisterRPCs(); - } + unregisterRPCs(); } private void unregisterRPCs() { @@ -190,8 +173,6 @@ class RpcContextImpl implements RpcContext { @Override public ListenableFuture closeServiceInstance() { - LOG.info("Stopping rpc context cluster services for node {}", deviceInfo.getLOGValue()); - return Futures.transform(Futures.immediateFuture(null), new Function() { @Nullable @Override @@ -204,7 +185,6 @@ class RpcContextImpl implements RpcContext { @Override public void instantiateServiceInstance() { - LOG.info("Starting rpc context cluster services for node {}", deviceInfo.getLOGValue()); MdSalRegistrationUtils.registerServices(this, deviceContext, extensionConverterProvider, convertorExecutor); if (isStatisticsRpcEnabled && !deviceContext.canUseSingleLayerSerialization()) { diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/AbstractMultipartRequestOnTheFlyCallback.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/AbstractMultipartRequestOnTheFlyCallback.java index 1277c78f22..6150fa9d80 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/AbstractMultipartRequestOnTheFlyCallback.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/AbstractMultipartRequestOnTheFlyCallback.java @@ -7,10 +7,10 @@ */ package org.opendaylight.openflowplugin.impl.services; +import com.google.common.util.concurrent.Service; import java.util.Collections; import java.util.List; import java.util.Objects; -import org.opendaylight.openflowplugin.api.openflow.OFPContext.ContextState; import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext; import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo; import org.opendaylight.openflowplugin.api.openflow.device.DeviceRegistry; @@ -39,7 +39,7 @@ public abstract class AbstractMultipartRequestOnTheFlyCallback> context, Class requestType, @@ -62,12 +62,12 @@ public abstract class AbstractMultipartRequestOnTheFlyCallback instanceIdentifier = deviceInfo .getNodeInstanceIdentifier() @@ -157,7 +157,7 @@ public abstract class AbstractMultipartRequestOnTheFlyCallback implements StatisticsContext { private Timeout pollTimeout; private ContextChainMastershipWatcher contextChainMastershipWatcher; - private volatile ContextState state = ContextState.INITIALIZATION; private volatile boolean schedulingEnabled; private volatile ListenableFuture lastDataGathering; @@ -218,17 +217,10 @@ class StatisticsContextImpl implements StatisticsContext { @Override public void close() { - if (ContextState.TERMINATION.equals(state)) { - if (LOG.isDebugEnabled()) { - LOG.debug("StatisticsContext for node {} is already in TERMINATION state.", getDeviceInfo()); - } - } else { - this.state = ContextState.TERMINATION; - stopGatheringData(); - requestContexts.forEach(requestContext -> RequestContextUtil - .closeRequestContextWithRpcError(requestContext, CONNECTION_CLOSED)); - requestContexts.clear(); - } + stopGatheringData(); + requestContexts.forEach(requestContext -> RequestContextUtil + .closeRequestContextWithRpcError(requestContext, CONNECTION_CLOSED)); + requestContexts.clear(); } @Override @@ -313,8 +305,6 @@ class StatisticsContextImpl implements StatisticsContext { @Override public ListenableFuture closeServiceInstance() { - LOG.info("Stopping statistics context cluster services for node {}", deviceInfo); - return Futures.transform(Futures.immediateFuture(null), new Function() { @Nullable @Override @@ -360,7 +350,6 @@ class StatisticsContextImpl implements StatisticsContext { @Override public void instantiateServiceInstance() { - LOG.info("Starting statistics context cluster services for node {}", deviceInfo); this.statListForCollectingInitialization(); Futures.addCallback(this.gatherDynamicData(), new FutureCallback() { @@ -405,4 +394,4 @@ class StatisticsContextImpl implements StatisticsContext { public ServiceGroupIdentifier getIdentifier() { return deviceInfo.getServiceIdentifier(); } -} +} \ No newline at end of file diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/util/DeviceInitializationUtil.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/util/DeviceInitializationUtil.java index 397c4abce9..116cfe35b0 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/util/DeviceInitializationUtil.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/util/DeviceInitializationUtil.java @@ -38,7 +38,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class DeviceInitializationUtil { - private static final Logger LOG = LoggerFactory.getLogger(DeviceInitializationUtil.class); private DeviceInitializationUtil() { diff --git a/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsContextImplParamTest.java b/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsContextImplParamTest.java index be19bfa35a..234b5e4d2a 100644 --- a/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsContextImplParamTest.java +++ b/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsContextImplParamTest.java @@ -91,4 +91,4 @@ public class StatisticsContextImplParamTest extends StatisticsContextImpMockInit } -} +} \ No newline at end of file -- 2.36.6