/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ * Copyright (c) 2015, 2017 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
+import com.google.common.util.concurrent.ListenableFuture;
import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
/* TODO:
* Copied over as-is from southbound plugin. Good candidate to be common
- * when refactoring code.
+ * when refactoring code.
*/
-public class TransactionInvokerImpl implements TransactionInvoker,TransactionChainListener, Runnable, AutoCloseable {
+public class TransactionInvokerImpl implements TransactionInvoker,TransactionChainListener, Runnable, AutoCloseable,
+ Thread.UncaughtExceptionHandler {
private static final Logger LOG = LoggerFactory.getLogger(TransactionInvokerImpl.class);
private static final int QUEUE_SIZE = 10000;
private BindingTransactionChain chain;
private Map<ReadWriteTransaction,TransactionCommand> transactionToCommand
= new HashMap<>();
private List<ReadWriteTransaction> pendingTransactions = new ArrayList<>();
+ //This is made volatile as it is accessed from uncaught exception handler thread also
+ private volatile ReadWriteTransaction transactionInFlight = null;
+ private Iterator<TransactionCommand> commandIterator = null;
public TransactionInvokerImpl(DataBroker db) {
this.db = db;
this.chain = db.createTransactionChain(this);
- ThreadFactory threadFact = new ThreadFactoryBuilder().setNameFormat("transaction-invoker-impl-%d").build();
+ ThreadFactory threadFact = new ThreadFactoryBuilder().setNameFormat("transaction-invoker-impl-%d")
+ .setUncaughtExceptionHandler(this).build();
executor = Executors.newSingleThreadExecutor(threadFact);
- executor.submit(this);
+ //Using the execute method here so that un caught exception handler gets triggered upon exception.
+ //The other way to do it is using submit method and wait on the future to catch any exceptions
+ executor.execute(this);
}
@Override
LOG.warn("Extracting commands was interrupted.", e);
continue;
}
-
- ReadWriteTransaction transactionInFlight = null;
+ commandIterator = commands.iterator();
try {
- for (TransactionCommand command: commands) {
+ while (commandIterator.hasNext()) {
+ TransactionCommand command = commandIterator.next();
final ReadWriteTransaction transaction = chain.newReadWriteTransaction();
transactionInFlight = transaction;
recordPendingTransaction(command, transaction);
command.execute(transaction);
- Futures.addCallback(transaction.submit(), new FutureCallback<Void>() {
+ ListenableFuture<Void> ft = transaction.submit();
+ command.setTransactionResultFuture(ft);
+ Futures.addCallback(ft, new FutureCallback<Void>() {
@Override
public void onSuccess(final Void result) {
successfulTransactionQueue.offer(transaction);
}
});
}
+ transactionInFlight = null;
} catch (IllegalStateException e) {
if (transactionInFlight != null) {
// TODO: This method should distinguish exceptions on which the command should be
// this method will retry commands which will never be successful forever.
failedTransactionQueue.offer(transactionInFlight);
}
+ transactionInFlight = null;
LOG.warn("Failed to process an update notification from OVS.", e);
}
}
List<TransactionCommand> commands = new ArrayList<>();
if (transaction != null) {
int index = pendingTransactions.lastIndexOf(transaction);
+ //This logic needs to be revisited. Is it ok to resubmit these things again ?
+ //are these operations idempotent ?
+ //Does the transaction chain execute n+1th if nth one threw error ?
List<ReadWriteTransaction> transactions =
pendingTransactions.subList(index, pendingTransactions.size() - 1);
for (ReadWriteTransaction tx: transactions) {
}
resetTransactionQueue();
}
+ if (commandIterator != null) {
+ while (commandIterator.hasNext()) {
+ commands.add(commandIterator.next());
+ }
+ }
return commands;
}
private List<TransactionCommand> extractCommands() throws InterruptedException {
List<TransactionCommand> commands = extractResubmitCommands();
+ if (!commands.isEmpty() && inputQueue.isEmpty()) {
+ //we got some commands to be executed let us not sit and wait on empty queue
+ return commands;
+ }
+ //pull commands from queue if not empty , otherwise wait for commands to be placed in queue.
commands.addAll(extractCommandsFromQueue());
return commands;
}
public void close() throws Exception {
this.executor.shutdown();
}
+
+ @Override
+ public void uncaughtException(Thread thread, Throwable e) {
+ LOG.error("Failed to execute hwvtep transact command, re-submitting the transaction again", e);
+ if (transactionInFlight != null) {
+ failedTransactionQueue.offer(transactionInFlight);
+ }
+ transactionInFlight = null;
+ executor.execute(this);
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2017 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.ovsdb.hwvtepsouthbound;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.binding.test.AbstractConcurrentDataBrokerTest;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.ovsdb.hwvtepsouthbound.transactions.md.TransactionCommand;
+import org.opendaylight.ovsdb.hwvtepsouthbound.transactions.md.TransactionInvokerImpl;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Uri;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepGlobalAugmentation;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepGlobalAugmentationBuilder;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(PowerMockRunner.class)
+public class TransactionInvokerImplTest extends AbstractConcurrentDataBrokerTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DataChangeListenerTestBase.class);
+
+ private TransactionCommand SLEEPING_PILL = new TransactionCommand() {
+ @Override
+ public void execute(ReadWriteTransaction transaction) {
+ try {
+ LOG.debug("Running sleeping pill");
+ SLEEPING_PILL_STARTED_LATCH.countDown();
+ SLEEPING_PILL_END_LATCH.await(5, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ //ignore the error
+ }
+ }
+ };
+
+ private TransactionCommand NULL_POINTER_PILL = new TransactionCommand() {
+ @Override
+ public void execute(ReadWriteTransaction transaction) {
+ LOG.debug("Running npe TransactionCommand");
+ NULL_POINTER_PILL_START.countDown();
+ throw new NullPointerException("Failed to execute command");
+ }
+ };
+
+ private InstanceIdentifier<Node> nodeIid1;
+ private InstanceIdentifier<Node> nodeIid2;
+ private InstanceIdentifier<Node> nodeIid3;
+
+ private DataBroker dataBroker;
+ private TransactionInvokerImpl invoker;
+ private CountDownLatch SLEEPING_PILL_STARTED_LATCH;
+ private CountDownLatch SLEEPING_PILL_END_LATCH;
+ private CountDownLatch NULL_POINTER_PILL_START;
+
+ @Before
+ public void setupTest() throws Exception {
+ dataBroker = getDataBroker();
+ invoker = new TransactionInvokerImpl(dataBroker);
+ nodeIid1 = createInstanceIdentifier(java.util.UUID.randomUUID().toString());
+ nodeIid2 = createInstanceIdentifier(java.util.UUID.randomUUID().toString());
+ nodeIid3 = createInstanceIdentifier(java.util.UUID.randomUUID().toString());
+ SLEEPING_PILL_STARTED_LATCH = new CountDownLatch(1);
+ SLEEPING_PILL_END_LATCH = new CountDownLatch(1);
+ NULL_POINTER_PILL_START = new CountDownLatch(1);
+ }
+
+ @After
+ public void cleanup() throws Exception {
+ deleteNode(nodeIid1);
+ deleteNode(nodeIid2);
+ deleteNode(nodeIid3);
+ }
+
+ private void deleteNode(InstanceIdentifier<Node> iid) {
+ ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
+ tx.delete(LogicalDatastoreType.CONFIGURATION, iid);
+ tx.submit();
+ }
+
+ @Test
+ public void testMiddleCommandNullPointerFailure() throws Exception {
+ SettableFuture ft1 = SettableFuture.create();
+ SettableFuture ft2 = SettableFuture.create();
+ SettableFuture ft3 = SettableFuture.create();
+
+ //add a command which does a sleep of 500ms
+ invoker.invoke(SLEEPING_PILL);
+
+ //wait fot the above one to be scheduled
+ SLEEPING_PILL_STARTED_LATCH.await(5, TimeUnit.SECONDS);
+
+ //Now add the commands which will be picked up in one lot
+ invoker.invoke(new AddNodeCmd(nodeIid1, ft1));
+ invoker.invoke(NULL_POINTER_PILL);
+ invoker.invoke(new AddNodeCmd(nodeIid2, ft2));
+
+ SLEEPING_PILL_END_LATCH.countDown();
+
+ ft1.get(5, TimeUnit.SECONDS);
+ ft2.get(5, TimeUnit.SECONDS);
+
+ NULL_POINTER_PILL_START = new CountDownLatch(1);
+ invoker.invoke(NULL_POINTER_PILL);
+ NULL_POINTER_PILL_START.await(5, TimeUnit.SECONDS);
+
+ //make sure that any commands which are submitted after the previous failure run smoothly
+ invoker.invoke(new AddNodeCmd(nodeIid3, ft3));
+ ft3.get(5, TimeUnit.SECONDS);
+ }
+
+
+ private InstanceIdentifier<Node> createInstanceIdentifier(String nodeIdString) {
+ NodeId nodeId = new NodeId(new Uri(nodeIdString));
+ NodeKey nodeKey = new NodeKey(nodeId);
+ TopologyKey topoKey = new TopologyKey(HwvtepSouthboundConstants.HWVTEP_TOPOLOGY_ID);
+ return InstanceIdentifier.builder(NetworkTopology.class)
+ .child(Topology.class, topoKey)
+ .child(Node.class, nodeKey)
+ .build();
+ }
+
+ private static class AddNodeCmd extends DefaultTransactionComamndImpl {
+ InstanceIdentifier<Node> iid;
+
+ AddNodeCmd(InstanceIdentifier<Node> iid, SettableFuture ft) {
+ super(ft);
+ this.iid = iid;
+ }
+
+ @Override
+ public void execute(ReadWriteTransaction transaction) {
+ NodeBuilder nodeBuilder = new NodeBuilder();
+ nodeBuilder.setNodeId(iid.firstKeyOf(Node.class).getNodeId());
+ HwvtepGlobalAugmentationBuilder builder = new HwvtepGlobalAugmentationBuilder();
+ nodeBuilder.addAugmentation(HwvtepGlobalAugmentation.class, builder.build());
+ transaction.put(LogicalDatastoreType.CONFIGURATION, iid, nodeBuilder.build(), WriteTransaction.CREATE_MISSING_PARENTS);
+
+ }
+ }
+
+ private static class DeleteNodeCmd extends DefaultTransactionComamndImpl {
+ InstanceIdentifier<Node> iid;
+
+ DeleteNodeCmd(InstanceIdentifier<Node> iid, SettableFuture ft) {
+ super(ft);
+ this.iid = iid;
+ }
+
+ @Override
+ public void execute(ReadWriteTransaction transaction) {
+ transaction.delete(LogicalDatastoreType.CONFIGURATION, iid);
+ }
+ }
+
+ private static class DefaultTransactionComamndImpl implements TransactionCommand {
+ SettableFuture ft;
+ DefaultTransactionComamndImpl(SettableFuture ft) {
+ this.ft = ft;
+ }
+
+ @Override
+ public void execute(ReadWriteTransaction transaction) {
+
+ }
+
+ @Override
+ public void setTransactionResultFuture(ListenableFuture future) {
+ Futures.addCallback(future, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(Void aVoid) {
+ ft.set(null);
+ }
+ @Override
+ public void onFailure(Throwable throwable) {
+ ft.setException(throwable);
+ }
+ });
+ }
+ }
+}