From 24b4558eb542c58a57071542b20dd058c65ee7e6 Mon Sep 17 00:00:00 2001 From: Tomas Cere Date: Wed, 14 Jun 2017 15:42:07 +0200 Subject: [PATCH] BUG 8629: Try to allow notification processing to finish in unsubscribe of listeners. Change-Id: I8638c6066b86b101484d3d80cd0fed146a478778 Signed-off-by: Tomas Cere (cherry picked from commit bc5486e6d9fab8f550be8b72874ce96a9eb52651) --- .../provider/MdsalLowLevelTestProvider.java | 22 ++++++++++ .../impl/IdIntsDOMDataTreeLIstener.java | 41 +++++++++++++++++++ .../it/provider/impl/IdIntsListener.java | 40 ++++++++++++++++++ 3 files changed, 103 insertions(+) diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java index 688e7c0a51..a93d99fa45 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java @@ -26,8 +26,10 @@ import java.io.StringWriter; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.opendaylight.controller.cluster.ActorSystemProvider; import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory; import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction; @@ -463,6 +465,16 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService return Futures.immediateFuture(RpcResultBuilder.failed().withRpcError(error).build()); } + try { + idIntsListener.tryFinishProcessing().get(120, TimeUnit.SECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + final RpcError error = RpcResultBuilder.newError( + ErrorType.RPC, "resource-denied-transport", "Unable to finish notification processing in 120 seconds.", + "clustering-it", "clustering-it", e); + return Futures.immediateFuture(RpcResultBuilder.failed() + .withRpcError(error).build()); + } + dtclReg.close(); dtclReg = null; @@ -676,6 +688,16 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService return Futures.immediateFuture(RpcResultBuilder.failed().withRpcError(error).build()); } + try { + idIntsDdtl.tryFinishProcessing().get(120, TimeUnit.SECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + final RpcError error = RpcResultBuilder.newError( + ErrorType.RPC, "resource-denied-transport", "Unable to finish notification processing in 120 seconds.", + "clustering-it", "clustering-it", e); + return Futures.immediateFuture(RpcResultBuilder.failed() + .withRpcError(error).build()); + } + ddtlReg.close(); ddtlReg = null; diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/IdIntsDOMDataTreeLIstener.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/IdIntsDOMDataTreeLIstener.java index 1fa3f1b61b..f5b55fd077 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/IdIntsDOMDataTreeLIstener.java +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/IdIntsDOMDataTreeLIstener.java @@ -9,8 +9,15 @@ package org.opendaylight.controller.clustering.it.provider.impl; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.SettableFuture; import java.util.Collection; import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nonnull; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.mdsal.dom.api.DOMDataTreeListener; @@ -23,8 +30,12 @@ import org.slf4j.LoggerFactory; public class IdIntsDOMDataTreeLIstener implements DOMDataTreeListener { private static final Logger LOG = LoggerFactory.getLogger(IdIntsDOMDataTreeLIstener.class); + private static final long SECOND_AS_NANO = 1000000000; private NormalizedNode localCopy = null; + private AtomicLong lastNotifTimestamp = new AtomicLong(0); + private ScheduledFuture scheduledFuture; + private ScheduledExecutorService executorService; @Override public void onDataTreeChanged(@Nonnull final Collection changes, @@ -33,6 +44,8 @@ public class IdIntsDOMDataTreeLIstener implements DOMDataTreeListener { // There should only be one candidate reported Preconditions.checkState(changes.size() == 1); + lastNotifTimestamp.set(System.nanoTime()); + // do not log the change into debug, only use trace since it will lead to OOM on default heap settings LOG.debug("Received data tree changed"); @@ -64,7 +77,35 @@ public class IdIntsDOMDataTreeLIstener implements DOMDataTreeListener { return localCopy != null; } + public Future tryFinishProcessing() { + executorService = Executors.newSingleThreadScheduledExecutor(); + final SettableFuture settableFuture = SettableFuture.create(); + + scheduledFuture = executorService.scheduleAtFixedRate(new CheckFinishedTask(settableFuture), 0, 1, TimeUnit.SECONDS); + return settableFuture; + } + public boolean checkEqual(final NormalizedNode expected) { return localCopy.equals(expected); } + + private class CheckFinishedTask implements Runnable { + + private final SettableFuture future; + + CheckFinishedTask(final SettableFuture future) { + this.future = future; + } + + @Override + public void run() { + if (System.nanoTime() - lastNotifTimestamp.get() > (SECOND_AS_NANO * 4)) { + scheduledFuture.cancel(false); + future.set(null); + + executorService.shutdown(); + } + } + } + } diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/IdIntsListener.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/IdIntsListener.java index 445b22c7e7..f98822b6f7 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/IdIntsListener.java +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/IdIntsListener.java @@ -9,7 +9,14 @@ package org.opendaylight.controller.clustering.it.provider.impl; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.SettableFuture; import java.util.Collection; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nonnull; import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataTreeChangeListener; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -20,8 +27,12 @@ import org.slf4j.LoggerFactory; public class IdIntsListener implements ClusteredDOMDataTreeChangeListener { private static final Logger LOG = LoggerFactory.getLogger(IdIntsListener.class); + private static final long SECOND_AS_NANO = 1000000000; private NormalizedNode localCopy = null; + private AtomicLong lastNotifTimestamp = new AtomicLong(0); + private ScheduledExecutorService executorService; + private ScheduledFuture scheduledFuture; @Override public void onDataTreeChanged(@Nonnull final Collection changes) { @@ -29,6 +40,8 @@ public class IdIntsListener implements ClusteredDOMDataTreeChangeListener { // There should only be one candidate reported Preconditions.checkState(changes.size() == 1); + lastNotifTimestamp.set(System.nanoTime()); + // do not log the change into debug, only use trace since it will lead to OOM on default heap settings LOG.debug("Received data tree changed"); @@ -58,4 +71,31 @@ public class IdIntsListener implements ClusteredDOMDataTreeChangeListener { public boolean checkEqual(final NormalizedNode expected) { return localCopy.equals(expected); } + + public Future tryFinishProcessing() { + executorService = Executors.newSingleThreadScheduledExecutor(); + final SettableFuture settableFuture = SettableFuture.create(); + + scheduledFuture = executorService.scheduleAtFixedRate(new CheckFinishedTask(settableFuture), 0, 1, TimeUnit.SECONDS); + return settableFuture; + } + + private class CheckFinishedTask implements Runnable { + + private final SettableFuture future; + + public CheckFinishedTask(final SettableFuture future) { + this.future = future; + } + + @Override + public void run() { + if (System.nanoTime() - lastNotifTimestamp.get() > SECOND_AS_NANO * 4) { + scheduledFuture.cancel(false); + future.set(null); + + executorService.shutdown(); + } + } + } } -- 2.36.6