bug 7985 Using UncaughtExceptionHandler in TransactionInvokerImpl 17/51517/15
authorK.V Suneelu Verma <k.v.suneelu.verma@ericsson.com>
Tue, 7 Feb 2017 14:13:26 +0000 (19:43 +0530)
committerAnil Vishnoi <vishnoianil@gmail.com>
Wed, 10 May 2017 18:19:52 +0000 (18:19 +0000)
when a particular transaction command fails , it is killing the only
worker processing the transaction commands queue.

when the exection is caught log the exception and resubmit the worker

if 10 commands are to be exectuded
while executing 5th command some exception happens
then the execution resumes from 6th command ignoring the 5th command.

Again if 6th command also fails because of some exception,
then execution resumes from 7th command onwards that way it wont result in
infinite loops.

This patch does not try to address transaction failures, only addressing any
exceptions in the commands.

for transaction chain failures, let us say all 10 commands are executed
successfully and submitted to transaction chain, now we wait for
success/failure of each such transaction asynchronously.
Now our pending tx list reads as 1,2,3,4,5,6,7,8,9,10

we may get success response for 1,2,3,6,7 and failure for 5th.
Then pending tx list reads as 4,5,8,9,10.

then the transactions will resume from 8th onwards. Note 4th is not resumed.
This behavior may not be desired as we are trying to resubmit 8th one which
already got executed.
If the transactions are idempotent then it should not be an issue at all to
reexecute the succeded ones.
Now if the 8th transaction is about deleting an entry ,
in its previous run it already deleted it, so this run will fail.

Now for the 4th one we may get a success callback or failure callback.
If we get failure callback , then again 8,9,10 will be reexecuted.

This patch is not addressing this scenario , can be taken up in later
patches with the right design.

Note the difference between command execution and transaction result.
This patch is addressing exceptions in command execution.

Change-Id: I93bba958784637a4752b860d23354506f0bd98bb
Signed-off-by: K.V Suneelu Verma <k.v.suneelu.verma@ericsson.com>
hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/transactions/md/TransactionCommand.java
hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/transactions/md/TransactionInvokerImpl.java
hwvtepsouthbound/hwvtepsouthbound-impl/src/test/java/org/opendaylight/ovsdb/hwvtepsouthbound/TransactionInvokerImplTest.java [new file with mode: 0644]

index 89dda90f4b6bd7450b7b82076288e2401da3581d..fc130212c91fc4ae9c1cd42f65deed44691ccdab 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2015 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
+ * Copyright (c) 2015, 2017 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
@@ -8,10 +8,21 @@
 
 package org.opendaylight.ovsdb.hwvtepsouthbound.transactions.md;
 
+import com.google.common.util.concurrent.ListenableFuture;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 
 public interface TransactionCommand {
 
     void execute(ReadWriteTransaction transaction);
 
+    /**
+     * Sets the result future of the executed/submitted transaction.
+     * @param future
+     */
+    default void setTransactionResultFuture(ListenableFuture future) {
+    }
+
+    default ListenableFuture getTransactionResultFuture() {
+        return null;
+    }
 }
\ No newline at end of file
index 6be19c89e01da10df5b24995f011482ab53a34eb..62ed95d20835c8d379e34d472904bd1e7dda92f3 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ * Copyright (c) 2015, 2017 Cisco Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
@@ -10,6 +10,7 @@ package org.opendaylight.ovsdb.hwvtepsouthbound.transactions.md;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
@@ -18,6 +19,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 
+import com.google.common.util.concurrent.ListenableFuture;
 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
@@ -33,9 +35,10 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /*  TODO:
  * Copied over as-is from southbound plugin. Good candidate to be common
- * when refactoring code. 
+ * when refactoring code.
  */
-public class TransactionInvokerImpl implements TransactionInvoker,TransactionChainListener, Runnable, AutoCloseable {
+public class TransactionInvokerImpl implements TransactionInvoker,TransactionChainListener, Runnable, AutoCloseable,
+        Thread.UncaughtExceptionHandler {
     private static final Logger LOG = LoggerFactory.getLogger(TransactionInvokerImpl.class);
     private static final int QUEUE_SIZE = 10000;
     private BindingTransactionChain chain;
@@ -49,13 +52,19 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
     private Map<ReadWriteTransaction,TransactionCommand> transactionToCommand
         = new HashMap<>();
     private List<ReadWriteTransaction> pendingTransactions = new ArrayList<>();
+    //This is made volatile as it is accessed from uncaught exception handler thread also
+    private volatile ReadWriteTransaction transactionInFlight = null;
+    private Iterator<TransactionCommand> commandIterator = null;
 
     public TransactionInvokerImpl(DataBroker db) {
         this.db = db;
         this.chain = db.createTransactionChain(this);
-        ThreadFactory threadFact = new ThreadFactoryBuilder().setNameFormat("transaction-invoker-impl-%d").build();
+        ThreadFactory threadFact = new ThreadFactoryBuilder().setNameFormat("transaction-invoker-impl-%d")
+                .setUncaughtExceptionHandler(this).build();
         executor = Executors.newSingleThreadExecutor(threadFact);
-        executor.submit(this);
+        //Using the execute method here so that un caught exception handler gets triggered upon exception.
+        //The other way to do it is using submit method and wait on the future to catch any exceptions
+        executor.execute(this);
     }
 
     @Override
@@ -88,15 +97,17 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
                 LOG.warn("Extracting commands was interrupted.", e);
                 continue;
             }
-
-            ReadWriteTransaction transactionInFlight = null;
+            commandIterator = commands.iterator();
             try {
-                for (TransactionCommand command: commands) {
+                while (commandIterator.hasNext()) {
+                    TransactionCommand command = commandIterator.next();
                     final ReadWriteTransaction transaction = chain.newReadWriteTransaction();
                     transactionInFlight = transaction;
                     recordPendingTransaction(command, transaction);
                     command.execute(transaction);
-                    Futures.addCallback(transaction.submit(), new FutureCallback<Void>() {
+                    ListenableFuture<Void> ft = transaction.submit();
+                    command.setTransactionResultFuture(ft);
+                    Futures.addCallback(ft, new FutureCallback<Void>() {
                         @Override
                         public void onSuccess(final Void result) {
                             successfulTransactionQueue.offer(transaction);
@@ -108,6 +119,7 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
                         }
                     });
                 }
+                transactionInFlight = null;
             } catch (IllegalStateException e) {
                 if (transactionInFlight != null) {
                     // TODO: This method should distinguish exceptions on which the command should be
@@ -116,6 +128,7 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
                     // this method will retry commands which will never be successful forever.
                     failedTransactionQueue.offer(transactionInFlight);
                 }
+                transactionInFlight = null;
                 LOG.warn("Failed to process an update notification from OVS.", e);
             }
         }
@@ -126,6 +139,9 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
         List<TransactionCommand> commands = new ArrayList<>();
         if (transaction != null) {
             int index = pendingTransactions.lastIndexOf(transaction);
+            //This logic needs to be revisited. Is it ok to resubmit these things again ?
+            //are these operations idempotent ?
+            //Does the transaction chain execute n+1th if nth one threw error ?
             List<ReadWriteTransaction> transactions =
                     pendingTransactions.subList(index, pendingTransactions.size() - 1);
             for (ReadWriteTransaction tx: transactions) {
@@ -133,6 +149,11 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
             }
             resetTransactionQueue();
         }
+        if (commandIterator != null) {
+            while (commandIterator.hasNext()) {
+                commands.add(commandIterator.next());
+            }
+        }
         return commands;
     }
 
@@ -153,6 +174,11 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
 
     private List<TransactionCommand> extractCommands() throws InterruptedException {
         List<TransactionCommand> commands = extractResubmitCommands();
+        if (!commands.isEmpty() && inputQueue.isEmpty()) {
+            //we got some commands to be executed let us not sit and wait on empty queue
+            return commands;
+        }
+        //pull commands from queue if not empty , otherwise wait for commands to be placed in queue.
         commands.addAll(extractCommandsFromQueue());
         return commands;
     }
@@ -180,4 +206,14 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
     public void close() throws Exception {
         this.executor.shutdown();
     }
+
+    @Override
+    public void uncaughtException(Thread thread, Throwable e) {
+        LOG.error("Failed to execute hwvtep transact command, re-submitting the transaction again", e);
+        if (transactionInFlight != null) {
+            failedTransactionQueue.offer(transactionInFlight);
+        }
+        transactionInFlight = null;
+        executor.execute(this);
+    }
 }
diff --git a/hwvtepsouthbound/hwvtepsouthbound-impl/src/test/java/org/opendaylight/ovsdb/hwvtepsouthbound/TransactionInvokerImplTest.java b/hwvtepsouthbound/hwvtepsouthbound-impl/src/test/java/org/opendaylight/ovsdb/hwvtepsouthbound/TransactionInvokerImplTest.java
new file mode 100644 (file)
index 0000000..fe12a4f
--- /dev/null
@@ -0,0 +1,208 @@
+/*
+ * Copyright (c) 2017 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.ovsdb.hwvtepsouthbound;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.binding.test.AbstractConcurrentDataBrokerTest;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.ovsdb.hwvtepsouthbound.transactions.md.TransactionCommand;
+import org.opendaylight.ovsdb.hwvtepsouthbound.transactions.md.TransactionInvokerImpl;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Uri;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepGlobalAugmentation;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepGlobalAugmentationBuilder;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(PowerMockRunner.class)
+public class TransactionInvokerImplTest extends AbstractConcurrentDataBrokerTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DataChangeListenerTestBase.class);
+
+    private TransactionCommand SLEEPING_PILL = new TransactionCommand() {
+        @Override
+        public void execute(ReadWriteTransaction transaction) {
+            try {
+                LOG.debug("Running sleeping pill");
+                SLEEPING_PILL_STARTED_LATCH.countDown();
+                SLEEPING_PILL_END_LATCH.await(5, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                //ignore the error
+            }
+        }
+    };
+
+    private TransactionCommand NULL_POINTER_PILL = new TransactionCommand() {
+        @Override
+        public void execute(ReadWriteTransaction transaction) {
+            LOG.debug("Running npe TransactionCommand");
+            NULL_POINTER_PILL_START.countDown();
+            throw new NullPointerException("Failed to execute command");
+        }
+    };
+
+    private InstanceIdentifier<Node> nodeIid1;
+    private InstanceIdentifier<Node> nodeIid2;
+    private InstanceIdentifier<Node> nodeIid3;
+
+    private DataBroker dataBroker;
+    private TransactionInvokerImpl invoker;
+    private CountDownLatch SLEEPING_PILL_STARTED_LATCH;
+    private CountDownLatch SLEEPING_PILL_END_LATCH;
+    private CountDownLatch NULL_POINTER_PILL_START;
+
+    @Before
+    public void setupTest() throws Exception {
+        dataBroker = getDataBroker();
+        invoker = new TransactionInvokerImpl(dataBroker);
+        nodeIid1 = createInstanceIdentifier(java.util.UUID.randomUUID().toString());
+        nodeIid2 = createInstanceIdentifier(java.util.UUID.randomUUID().toString());
+        nodeIid3 = createInstanceIdentifier(java.util.UUID.randomUUID().toString());
+        SLEEPING_PILL_STARTED_LATCH = new CountDownLatch(1);
+        SLEEPING_PILL_END_LATCH = new CountDownLatch(1);
+        NULL_POINTER_PILL_START = new CountDownLatch(1);
+    }
+
+    @After
+    public void cleanup() throws Exception {
+        deleteNode(nodeIid1);
+        deleteNode(nodeIid2);
+        deleteNode(nodeIid3);
+    }
+
+    private void deleteNode(InstanceIdentifier<Node> iid) {
+        ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
+        tx.delete(LogicalDatastoreType.CONFIGURATION, iid);
+        tx.submit();
+    }
+
+    @Test
+    public void testMiddleCommandNullPointerFailure() throws Exception {
+        SettableFuture ft1 = SettableFuture.create();
+        SettableFuture ft2 = SettableFuture.create();
+        SettableFuture ft3 = SettableFuture.create();
+
+        //add a command which does a sleep of 500ms
+        invoker.invoke(SLEEPING_PILL);
+
+        //wait fot the above one to be scheduled
+        SLEEPING_PILL_STARTED_LATCH.await(5, TimeUnit.SECONDS);
+
+        //Now add the commands which will be picked up in one lot
+        invoker.invoke(new AddNodeCmd(nodeIid1, ft1));
+        invoker.invoke(NULL_POINTER_PILL);
+        invoker.invoke(new AddNodeCmd(nodeIid2, ft2));
+
+        SLEEPING_PILL_END_LATCH.countDown();
+
+        ft1.get(5, TimeUnit.SECONDS);
+        ft2.get(5, TimeUnit.SECONDS);
+
+        NULL_POINTER_PILL_START = new CountDownLatch(1);
+        invoker.invoke(NULL_POINTER_PILL);
+        NULL_POINTER_PILL_START.await(5, TimeUnit.SECONDS);
+
+        //make sure that any commands which are submitted after the previous failure run smoothly
+        invoker.invoke(new AddNodeCmd(nodeIid3, ft3));
+        ft3.get(5, TimeUnit.SECONDS);
+    }
+
+
+    private InstanceIdentifier<Node> createInstanceIdentifier(String nodeIdString) {
+        NodeId nodeId = new NodeId(new Uri(nodeIdString));
+        NodeKey nodeKey = new NodeKey(nodeId);
+        TopologyKey topoKey = new TopologyKey(HwvtepSouthboundConstants.HWVTEP_TOPOLOGY_ID);
+        return InstanceIdentifier.builder(NetworkTopology.class)
+                .child(Topology.class, topoKey)
+                .child(Node.class, nodeKey)
+                .build();
+    }
+
+    private static class AddNodeCmd extends DefaultTransactionComamndImpl {
+        InstanceIdentifier<Node> iid;
+
+        AddNodeCmd(InstanceIdentifier<Node> iid, SettableFuture ft) {
+            super(ft);
+            this.iid = iid;
+        }
+
+        @Override
+        public void execute(ReadWriteTransaction transaction) {
+            NodeBuilder nodeBuilder = new NodeBuilder();
+            nodeBuilder.setNodeId(iid.firstKeyOf(Node.class).getNodeId());
+            HwvtepGlobalAugmentationBuilder builder = new HwvtepGlobalAugmentationBuilder();
+            nodeBuilder.addAugmentation(HwvtepGlobalAugmentation.class, builder.build());
+            transaction.put(LogicalDatastoreType.CONFIGURATION, iid, nodeBuilder.build(), WriteTransaction.CREATE_MISSING_PARENTS);
+
+        }
+    }
+
+    private static class DeleteNodeCmd extends DefaultTransactionComamndImpl {
+        InstanceIdentifier<Node> iid;
+
+        DeleteNodeCmd(InstanceIdentifier<Node> iid, SettableFuture ft) {
+            super(ft);
+            this.iid = iid;
+        }
+
+        @Override
+        public void execute(ReadWriteTransaction transaction) {
+            transaction.delete(LogicalDatastoreType.CONFIGURATION, iid);
+        }
+    }
+
+    private static class DefaultTransactionComamndImpl implements TransactionCommand {
+        SettableFuture ft;
+        DefaultTransactionComamndImpl(SettableFuture ft) {
+            this.ft = ft;
+        }
+
+        @Override
+        public void execute(ReadWriteTransaction transaction) {
+
+        }
+
+        @Override
+        public void setTransactionResultFuture(ListenableFuture future) {
+            Futures.addCallback(future, new FutureCallback<Void>() {
+                @Override
+                public void onSuccess(Void aVoid) {
+                    ft.set(null);
+                }
+                @Override
+                public void onFailure(Throwable throwable) {
+                    ft.setException(throwable);
+                }
+            });
+        }
+    }
+}