From: Tony Tkacik Date: Sat, 8 Nov 2014 16:06:15 +0000 (+0000) Subject: Merge changes I52c2aaab,I2509ed5e,Ic1d8a4e5,I73730f79,Ica1959e5,I0ce61b07 X-Git-Tag: release/lithium~911 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=00dba4141e5132372e68190638e083cd54e327ba;hp=f22b555b8c9bf662a950fd8077b0a6f57490586b Merge changes I52c2aaab,I2509ed5e,Ic1d8a4e5,I73730f79,Ica1959e5,I0ce61b07 * changes: Fix a raw Dictionary reference Fix a raw Enumeration reference Fix a non-generic CheckedFuture Access static methods via class Remove unused local variable Fix non-generic references to Optional --- diff --git a/opendaylight/commons/filter-valve/src/test/java/org/opendaylight/controller/filtervalve/cors/model/UrlMatcherTest.java b/opendaylight/commons/filter-valve/src/test/java/org/opendaylight/controller/filtervalve/cors/model/UrlMatcherTest.java index 07f6354b19..6276deab78 100644 --- a/opendaylight/commons/filter-valve/src/test/java/org/opendaylight/controller/filtervalve/cors/model/UrlMatcherTest.java +++ b/opendaylight/commons/filter-valve/src/test/java/org/opendaylight/controller/filtervalve/cors/model/UrlMatcherTest.java @@ -24,14 +24,11 @@ public class UrlMatcherTest { final String jspFilter = "jspFilter"; final String exactMatch = "/somePath"; final String prefixFilter = "prefixFilter"; - LinkedHashMap patternMap = new LinkedHashMap() { - { - put(exactMatch, exactMatchFilter); - put("/*", defaultFilter); - put("*.jsp", jspFilter); - put("/foo/*", prefixFilter); - } - }; + LinkedHashMap patternMap = new LinkedHashMap<>(); + patternMap.put(exactMatch, exactMatchFilter); + patternMap.put("/*", defaultFilter); + patternMap.put("*.jsp", jspFilter); + patternMap.put("/foo/*", prefixFilter); urlMatcher = new UrlMatcher<>(patternMap); assertMatches("/abc", defaultFilter); assertMatches(exactMatch, exactMatchFilter, defaultFilter); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java index 2e671e3ce2..87959efe8a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java @@ -11,11 +11,9 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorSelection; import akka.dispatch.OnComplete; import com.google.common.base.Preconditions; -import java.util.AbstractMap.SimpleEntry; import java.util.Collections; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException; @@ -38,39 +36,31 @@ public class TransactionChainProxy implements DOMStoreTransactionChain { private interface State { boolean isReady(); - SimpleEntry>> getReadyFutures(); - - void setReadyFutures(Object txIdentifier, List> readyFutures); + List> getPreviousReadyFutures(); } private static class Allocated implements State { - private volatile SimpleEntry>> readyFutures; + private final ChainedTransactionProxy transaction; - @Override - public boolean isReady() { - return readyFutures != null; + Allocated(ChainedTransactionProxy transaction) { + this.transaction = transaction; } @Override - public SimpleEntry>> getReadyFutures() { - return readyFutures != null ? readyFutures : EMPTY_READY_FUTURES; + public boolean isReady() { + return transaction.isReady(); } @Override - public void setReadyFutures(Object txIdentifier, List> readyFutures) { - this.readyFutures = new SimpleEntry<>(txIdentifier, readyFutures); + public List> getPreviousReadyFutures() { + return transaction.getReadyFutures(); } } private static abstract class AbstractDefaultState implements State { @Override - public SimpleEntry>> getReadyFutures() { - return EMPTY_READY_FUTURES; - } - - @Override - public void setReadyFutures(Object txIdentifier, List> readyFutures) { - throw new IllegalStateException("No transaction is allocated"); + public List> getPreviousReadyFutures() { + return Collections.emptyList(); } } @@ -88,21 +78,15 @@ public class TransactionChainProxy implements DOMStoreTransactionChain { } }; - private static final SimpleEntry>> EMPTY_READY_FUTURES = - new SimpleEntry>>("", - Collections.>emptyList()); - - private static final AtomicReferenceFieldUpdater STATE_UPDATER = - AtomicReferenceFieldUpdater.newUpdater(TransactionChainProxy.class, State.class, "state"); + private static final AtomicInteger counter = new AtomicInteger(0); private final ActorContext actorContext; private final String transactionChainId; - private volatile State state = IDLE_STATE; - private static final AtomicInteger counter = new AtomicInteger(0); + private volatile State currentState = IDLE_STATE; public TransactionChainProxy(ActorContext actorContext) { this.actorContext = actorContext; - transactionChainId = actorContext.getCurrentMemberName() + "-transaction-chain-" + counter.incrementAndGet(); + transactionChainId = actorContext.getCurrentMemberName() + "-txn-chain-" + counter.incrementAndGet(); } public String getTransactionChainId() { @@ -111,8 +95,11 @@ public class TransactionChainProxy implements DOMStoreTransactionChain { @Override public DOMStoreReadTransaction newReadOnlyTransaction() { - checkReadyState(); - return new ChainedTransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY); + State localState = currentState; + checkReadyState(localState); + + return new ChainedTransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY, + transactionChainId, localState.getPreviousReadyFutures()); } @Override @@ -127,36 +114,61 @@ public class TransactionChainProxy implements DOMStoreTransactionChain { @Override public void close() { - state = CLOSED_STATE; + currentState = CLOSED_STATE; // Send a close transaction chain request to each and every shard actorContext.broadcast(new CloseTransactionChain(transactionChainId)); } private ChainedTransactionProxy allocateWriteTransaction(TransactionProxy.TransactionType type) { - checkReadyState(); + State localState = currentState; + + checkReadyState(localState); - ChainedTransactionProxy txProxy = new ChainedTransactionProxy(actorContext, type); - STATE_UPDATER.compareAndSet(this, IDLE_STATE, new Allocated()); + // Pass the ready Futures from the previous Tx. + ChainedTransactionProxy txProxy = new ChainedTransactionProxy(actorContext, type, + transactionChainId, localState.getPreviousReadyFutures()); + + currentState = new Allocated(txProxy); return txProxy; } - private void checkReadyState() { - Preconditions.checkState(state.isReady(), "Previous transaction %s is not ready yet", - state.getReadyFutures().getKey()); + private void checkReadyState(State state) { + Preconditions.checkState(state.isReady(), "Previous transaction is not ready yet"); } - private class ChainedTransactionProxy extends TransactionProxy { + private static class ChainedTransactionProxy extends TransactionProxy { + + /** + * Stores the ready Futures from the previous Tx in the chain. + */ + private final List> previousReadyFutures; + + /** + * Stores the ready Futures from this transaction when it is readied. + */ + private volatile List> readyFutures; - ChainedTransactionProxy(ActorContext actorContext, TransactionType transactionType) { + private ChainedTransactionProxy(ActorContext actorContext, TransactionType transactionType, + String transactionChainId, List> previousReadyFutures) { super(actorContext, transactionType, transactionChainId); + this.previousReadyFutures = previousReadyFutures; + } + + List> getReadyFutures() { + return readyFutures; + } + + boolean isReady() { + return readyFutures != null; } @Override protected void onTransactionReady(List> readyFutures) { - LOG.debug("onTransactionReady {} pending readyFutures size {} chain {}", getIdentifier(), readyFutures.size(), TransactionChainProxy.this.transactionChainId); - state.setReadyFutures(getIdentifier(), readyFutures); + LOG.debug("onTransactionReady {} pending readyFutures size {} chain {}", getIdentifier(), + readyFutures.size(), getTransactionChainId()); + this.readyFutures = readyFutures; } /** @@ -169,32 +181,13 @@ public class TransactionChainProxy implements DOMStoreTransactionChain { final Object serializedCreateMessage) { // Check if there are any previous ready Futures, otherwise let the super class handle it. - // The second check is done to ensure the the previous ready Futures aren't for this - // Tx instance as deadlock would occur if we tried to wait on our own Futures. This can - // occur in this scenario: - // - // - the TransactionProxy is created and the client does a write. - // - // - the TransactionProxy then attempts to create the shard Tx. However it first - // sends a FindPrimaryShard message to the shard manager to find the local shard - // This call is done async. - // - // - the client submits the Tx and the TransactionProxy is readied and we cache - // the ready Futures here. - // - // - then the FindPrimaryShard call completes and this method is called to create - // the shard Tx. However the cached Futures were from the ready on this Tx. If we - // tried to wait on them, it would cause a form of deadlock as the ready Future - // would be waiting on the Tx create Future and vice versa. - SimpleEntry>> readyFuturesEntry = state.getReadyFutures(); - List> readyFutures = readyFuturesEntry.getValue(); - if(readyFutures.isEmpty() || getIdentifier().equals(readyFuturesEntry.getKey())) { + if(previousReadyFutures.isEmpty()) { return super.sendCreateTransaction(shard, serializedCreateMessage); } // Combine the ready Futures into 1. Future> combinedFutures = akka.dispatch.Futures.sequence( - readyFutures, actorContext.getActorSystem().dispatcher()); + previousReadyFutures, getActorContext().getActorSystem().dispatcher()); // Add a callback for completion of the combined Futures. final Promise createTxPromise = akka.dispatch.Futures.promise(); @@ -205,15 +198,18 @@ public class TransactionChainProxy implements DOMStoreTransactionChain { // A Ready Future failed so fail the returned Promise. createTxPromise.failure(failure); } else { + LOG.debug("Previous Tx readied - sending CreateTransaction for {} on chain {}", + getIdentifier(), getTransactionChainId()); + // Send the CreateTx message and use the resulting Future to complete the // returned Promise. - createTxPromise.completeWith(actorContext.executeOperationAsync(shard, + createTxPromise.completeWith(getActorContext().executeOperationAsync(shard, serializedCreateMessage)); } } }; - combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher()); + combinedFutures.onComplete(onComplete, getActorContext().getActorSystem().dispatcher()); return createTxPromise.future(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index 226ac75467..443e0af9e0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -534,6 +534,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { return transactionChainId; } + protected ActorContext getActorContext() { + return actorContext; + } + /** * Interface for a transaction operation to be invoked later. */ diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java index 4f1a02e435..9f5aded352 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java @@ -9,6 +9,8 @@ import akka.actor.PoisonPill; import com.google.common.base.Optional; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.Uninterruptibles; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -633,6 +635,60 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { }}; } + @Test + public void testCreateChainedTransactionsInQuickSuccession() throws Exception{ + new IntegrationTestKit(getSystem()) {{ + DistributedDataStore dataStore = setupDistributedDataStore( + "testCreateChainedTransactionsInQuickSuccession", "test-1"); + + DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); + + NormalizedNode testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + int nTxs = 20; + List cohorts = new ArrayList<>(nTxs); + for(int i = 0; i < nTxs; i++) { + DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction(); + + rwTx.merge(TestModel.TEST_PATH, testNode); + + cohorts.add(rwTx.ready()); + + } + + for(DOMStoreThreePhaseCommitCohort cohort: cohorts) { + doCommit(cohort); + } + + txChain.close(); + + cleanup(dataStore); + }}; + } + + @Test + public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception{ + new IntegrationTestKit(getSystem()) {{ + DistributedDataStore dataStore = setupDistributedDataStore( + "testCreateChainedTransactionAfterEmptyTxReadied", "test-1"); + + DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); + + DOMStoreReadWriteTransaction rwTx1 = txChain.newReadWriteTransaction(); + + rwTx1.ready(); + + DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction(); + + Optional> optional = rwTx2.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", false, optional.isPresent()); + + txChain.close(); + + cleanup(dataStore); + }}; + } + @Test public void testCreateChainedTransactionWhenPreviousNotReady() throws Throwable { new IntegrationTestKit(getSystem()) {{ diff --git a/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/sal/dom/broker/BackwardsCompatibleMountPointTest.java b/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/sal/dom/broker/BackwardsCompatibleMountPointTest.java index cb1a99b4c0..f1b7261bcb 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/sal/dom/broker/BackwardsCompatibleMountPointTest.java +++ b/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/sal/dom/broker/BackwardsCompatibleMountPointTest.java @@ -139,7 +139,7 @@ public class BackwardsCompatibleMountPointTest { private DataNormalizer mockNormalizer() throws DataNormalizationException { final DataNormalizer mock = mock(DataNormalizer.class); - doReturn(new AbstractMap.SimpleEntry>(id, normalizedNode) {}) + doReturn(new AbstractMap.SimpleEntry>(id, normalizedNode)) .when(mock).toNormalized(any(YangInstanceIdentifier.class), any(CompositeNode.class)); doReturn(compositeNode).when(mock).toLegacy(any(YangInstanceIdentifier.class), any(NormalizedNode.class)); doReturn(id).when(mock).toLegacy(any(YangInstanceIdentifier.class)); diff --git a/opendaylight/netconf/config-netconf-connector/src/test/java/org/opendaylight/controller/netconf/confignetconfconnector/operations/runtimerpc/RuntimeRpcElementResolvedTest.java b/opendaylight/netconf/config-netconf-connector/src/test/java/org/opendaylight/controller/netconf/confignetconfconnector/operations/runtimerpc/RuntimeRpcElementResolvedTest.java index 816e118f39..3c4213cbc3 100644 --- a/opendaylight/netconf/config-netconf-connector/src/test/java/org/opendaylight/controller/netconf/confignetconfconnector/operations/runtimerpc/RuntimeRpcElementResolvedTest.java +++ b/opendaylight/netconf/config-netconf-connector/src/test/java/org/opendaylight/controller/netconf/confignetconfconnector/operations/runtimerpc/RuntimeRpcElementResolvedTest.java @@ -9,13 +9,12 @@ package org.opendaylight.controller.netconf.confignetconfconnector.operations.runtimerpc; import static org.junit.Assert.assertEquals; - +import com.google.common.collect.ImmutableMap; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Map; - import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -35,9 +34,7 @@ public class RuntimeRpcElementResolvedTest { return Arrays.asList(new Object[][] { // With namespaces { "/a:modules/a:module[a:name='instanceName'][a:type='moduleType']/b:listener-state[b:peer-id='127.0.0.1']", - new HashMap() {{ - put("listener-state", "127.0.0.1"); - }}}, + new HashMap<>(ImmutableMap.of("listener-state", "127.0.0.1"))}, { "/a:modules/a:module[a:name='instanceName'][a:type='moduleType']", null}, @@ -57,10 +54,7 @@ public class RuntimeRpcElementResolvedTest { { "/modules/module[name=instanceName and type=moduleType]/inner[key=\"b\"]", Collections.singletonMap("inner", "b")}, { "/modules/module[name=instanceName and type=\"moduleType\"]/inner[key2=a]/inner2[key=b]", - new HashMap() {{ - put("inner", "a"); - put("inner2", "b"); - }} + new HashMap<>(ImmutableMap.of("inner", "a", "inner2", "b")) }, }); }