Revert "Upgrading clustermanager to use infinispan 6.0.2Final." 73/6973/2
authorEd Warnicke <eaw@cisco.com>
Tue, 13 May 2014 18:51:48 +0000 (13:51 -0500)
committerEd Warnicke <eaw@cisco.com>
Thu, 15 May 2014 00:49:06 +0000 (19:49 -0500)
This reverts commit eed877b90a0f8ed70150d710a9479e5a11409259.

Change-Id: I6671fe7e00f18a596a9fcf21904eff426bdd657c
Signed-off-by: Ed Warnicke <eaw@cisco.com>
opendaylight/clustering/services_implementation/pom.xml
opendaylight/clustering/services_implementation/src/main/java/org/infinispan/interceptors/distribution/BaseDistributionInterceptor.java [new file with mode: 0644]
opendaylight/clustering/services_implementation/src/main/resources/config/infinispan-config.xml
opendaylight/commons/opendaylight/pom.xml

index 4acfae0d29dbb7435e37d87c42e26898a052e1d8..e876929e6daa9a25a28ba667013f716d91984a8d 100644 (file)
@@ -9,7 +9,7 @@
   </parent>
 
   <artifactId>clustering.services-implementation</artifactId>
-  <version>0.4.3-SNAPSHOT</version>
+  <version>0.4.2-SNAPSHOT</version>
   <packaging>bundle</packaging>
   <properties>
     <!-- Sonar properties using jacoco to retrieve integration test results -->
@@ -29,7 +29,7 @@
     <dependency>
       <groupId>org.infinispan</groupId>
       <artifactId>infinispan-core</artifactId>
-      <version>6.0.2.Final</version>
+      <version>5.3.0.Final</version>
     </dependency>
     <dependency>
       <groupId>org.jboss.jbossts.jta</groupId>
@@ -87,7 +87,7 @@
             <!-- why those dependencies ends in the DynamicImport-Package -->
             <!-- rather in the Import-Package -->
             <DynamicImport-Package>*</DynamicImport-Package>
-            <Embed-Dependency>infinispan-core,infinispan-commons,jgroups,jboss-marshalling-river,jboss-marshalling,jboss-logging,staxmapper,narayana-jta;type=!pom;inline=false</Embed-Dependency>
+            <Embed-Dependency>infinispan-core,jgroups,jboss-marshalling-river,jboss-marshalling,jboss-logging,staxmapper,narayana-jta;type=!pom;inline=false</Embed-Dependency>
             <Embed-Transitive>true</Embed-Transitive>
           </instructions>
           <manifestLocation>${project.basedir}/META-INF</manifestLocation>
                     <exclude>org.infinispan:infinispan-core:*</exclude>
                   </excludes>
                   <includes>
-                    <include>org.infinispan:infinispan-core:[6.0.2.Final]</include>
+                    <include>org.infinispan:infinispan-core:[5.3.0.Final]</include>
                   </includes>
                 </bannedDependencies>
               </rules>
diff --git a/opendaylight/clustering/services_implementation/src/main/java/org/infinispan/interceptors/distribution/BaseDistributionInterceptor.java b/opendaylight/clustering/services_implementation/src/main/java/org/infinispan/interceptors/distribution/BaseDistributionInterceptor.java
new file mode 100644 (file)
index 0000000..d495e45
--- /dev/null
@@ -0,0 +1,252 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009 Red Hat Inc. and/or its affiliates and other
+ * contributors as indicated by the @author tags. All rights reserved.
+ * See the copyright.txt in the distribution for a full listing of
+ * individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.interceptors.distribution;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.infinispan.CacheException;
+import org.infinispan.commands.FlagAffectedCommand;
+import org.infinispan.commands.remote.ClusteredGetCommand;
+import org.infinispan.commands.write.DataWriteCommand;
+import org.infinispan.commands.write.WriteCommand;
+import org.infinispan.container.entries.InternalCacheEntry;
+import org.infinispan.container.entries.InternalCacheValue;
+import org.infinispan.context.InvocationContext;
+import org.infinispan.context.impl.TxInvocationContext;
+import org.infinispan.distribution.DistributionManager;
+import org.infinispan.factories.annotations.Inject;
+import org.infinispan.interceptors.ClusteringInterceptor;
+import org.infinispan.interceptors.locking.ClusteringDependentLogic;
+import org.infinispan.remoting.responses.ClusteredGetResponseValidityFilter;
+import org.infinispan.remoting.responses.ExceptionResponse;
+import org.infinispan.remoting.responses.Response;
+import org.infinispan.remoting.responses.SuccessfulResponse;
+import org.infinispan.remoting.rpc.ResponseFilter;
+import org.infinispan.remoting.rpc.ResponseMode;
+import org.infinispan.remoting.rpc.RpcOptions;
+import org.infinispan.remoting.transport.Address;
+import org.infinispan.transaction.xa.GlobalTransaction;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
+
+/**
+ * Base class for distribution of entries across a cluster.
+ *
+ * @author Manik Surtani
+ * @author Mircea.Markus@jboss.com
+ * @author Pete Muir
+ * @author Dan Berindei <dan@infinispan.org>
+ * @since 4.0
+ */
+public abstract class BaseDistributionInterceptor extends ClusteringInterceptor {
+
+   protected DistributionManager dm;
+
+   protected ClusteringDependentLogic cdl;
+
+   private static final Log log = LogFactory.getLog(BaseDistributionInterceptor.class);
+
+   @Override
+   protected Log getLog() {
+      return log;
+   }
+
+   @Inject
+   public void injectDependencies(DistributionManager distributionManager, ClusteringDependentLogic cdl) {
+      this.dm = distributionManager;
+      this.cdl = cdl;
+   }
+
+   @Override
+   protected final InternalCacheEntry retrieveFromRemoteSource(Object key, InvocationContext ctx, boolean acquireRemoteLock, FlagAffectedCommand command) throws Exception {
+      GlobalTransaction gtx = acquireRemoteLock ? ((TxInvocationContext)ctx).getGlobalTransaction() : null;
+      ClusteredGetCommand get = cf.buildClusteredGetCommand(key, command.getFlags(), acquireRemoteLock, gtx);
+
+      List<Address> targets = new ArrayList<Address>(stateTransferManager.getCacheTopology().getReadConsistentHash().locateOwners(key));
+      // if any of the recipients has left the cluster since the command was issued, just don't wait for its response
+      targets.retainAll(rpcManager.getTransport().getMembers());
+      ResponseFilter filter = new ClusteredGetResponseValidityFilter(targets, rpcManager.getAddress());
+      RpcOptions options = rpcManager.getRpcOptionsBuilder(ResponseMode.WAIT_FOR_VALID_RESPONSE, false)
+            .responseFilter(filter).build();
+      Map<Address, Response> responses = rpcManager.invokeRemotely(targets, get, options);
+
+      if (!responses.isEmpty()) {
+         for (Response r : responses.values()) {
+            if (r instanceof SuccessfulResponse) {
+               InternalCacheValue cacheValue = (InternalCacheValue) ((SuccessfulResponse) r).getResponseValue();
+               return cacheValue.toInternalCacheEntry(key);
+            }
+         }
+      }
+
+      // TODO If everyone returned null, and the read CH has changed, retry the remote get.
+      // Otherwise our get command might be processed by the old owners after they have invalidated their data
+      // and we'd return a null even though the key exists on
+      return null;
+   }
+
+   protected final Object handleNonTxWriteCommand(InvocationContext ctx, DataWriteCommand command) throws Throwable {
+      if (ctx.isInTxScope()) {
+         throw new CacheException("Attempted execution of non-transactional write command in a transactional invocation context");
+      }
+
+      RecipientGenerator recipientGenerator = new SingleKeyRecipientGenerator(command.getKey());
+
+      // see if we need to load values from remote sources first
+      remoteGetBeforeWrite(ctx, command, recipientGenerator);
+
+      // if this is local mode then skip distributing
+      if (isLocalModeForced(command)) {
+         return invokeNextInterceptor(ctx, command);
+      }
+
+      boolean isSync = isSynchronous(command);
+      if (!ctx.isOriginLocal()) {
+         Object returnValue = invokeNextInterceptor(ctx, command);
+         Address primaryOwner = cdl.getPrimaryOwner(command.getKey());
+            if (primaryOwner.equals(rpcManager.getAddress())) {
+                if (command.isConditional() && !command.isSuccessful()) {
+                    log.tracef(
+                            "Skipping the replication of the conditional command as it did not succeed on primary owner (%s).",
+                            command);
+                    return returnValue;
+                }
+                rpcManager.invokeRemotely(recipientGenerator.generateRecipients(), command,
+                        rpcManager.getDefaultRpcOptions(isSync));
+            } else {
+                log.tracef("Didn't invoke RPC because primaryOwner (%s) didn't match this node (%s)", primaryOwner,
+                           rpcManager.getAddress());
+                log.tracef("Hashcode is (%s) for Key (%s)",
+                           command.getKey().hashCode(), command.getKey());
+            }
+         return returnValue;
+      } else {
+         Address primaryOwner = cdl.getPrimaryOwner(command.getKey());
+         if (primaryOwner.equals(rpcManager.getAddress())) {
+            Object result = invokeNextInterceptor(ctx, command);
+            if (command.isConditional() && !command.isSuccessful()) {
+               log.tracef("Skipping the replication of the conditional command as it did not succeed on primary owner (%s).", command);
+               return result;
+            }
+            List<Address> recipients = recipientGenerator.generateRecipients();
+            log.tracef("I'm the primary owner, sending the command to all (%s) the recipients in order to be applied.", recipients);
+            // check if a single owner has been configured and the target for the key is the local address
+            boolean isSingleOwnerAndLocal = cacheConfiguration.clustering().hash().numOwners() == 1
+                  && recipients != null
+                  && recipients.size() == 1
+                  && recipients.get(0).equals(rpcManager.getTransport().getAddress());
+            if (!isSingleOwnerAndLocal) {
+               rpcManager.invokeRemotely(recipients, command, rpcManager.getDefaultRpcOptions(isSync));
+            }
+            return result;
+         } else {
+            log.tracef("I'm not the primary owner, so sending the command to the primary owner(%s) in order to be forwarded", primaryOwner);
+            log.tracef("Hashcode is (%s) for Key (%s)", command.getKey().hashCode(), command.getKey());
+
+            Object localResult = invokeNextInterceptor(ctx, command);
+            boolean isSyncForwarding = isSync || isNeedReliableReturnValues(command);
+            Map<Address, Response> addressResponseMap = rpcManager.invokeRemotely(Collections.singletonList(primaryOwner), command,
+                  rpcManager.getDefaultRpcOptions(isSyncForwarding));
+            if (!isSyncForwarding) return localResult;
+
+            return getResponseFromPrimaryOwner(primaryOwner, addressResponseMap);
+         }
+      }
+   }
+
+   private Object getResponseFromPrimaryOwner(Address primaryOwner, Map<Address, Response> addressResponseMap) {
+      Response fromPrimaryOwner = addressResponseMap.get(primaryOwner);
+      if (fromPrimaryOwner == null) {
+         log.tracef("Primary owner %s returned null", primaryOwner);
+         return null;
+      }
+      if (!fromPrimaryOwner.isSuccessful()) {
+         Throwable cause = fromPrimaryOwner instanceof ExceptionResponse ? ((ExceptionResponse)fromPrimaryOwner).getException() : null;
+         throw new CacheException("Got unsuccessful response from primary owner: " + fromPrimaryOwner, cause);
+      } else {
+         return ((SuccessfulResponse) fromPrimaryOwner).getResponseValue();
+      }
+   }
+
+   protected abstract void remoteGetBeforeWrite(InvocationContext ctx, WriteCommand command, RecipientGenerator keygen) throws Throwable;
+
+   interface RecipientGenerator {
+
+      Collection<Object> getKeys();
+
+      List<Address> generateRecipients();
+   }
+
+   class SingleKeyRecipientGenerator implements RecipientGenerator {
+      private final Object key;
+      private final Set<Object> keys;
+      private List<Address> recipients = null;
+
+      SingleKeyRecipientGenerator(Object key) {
+         this.key = key;
+         keys = Collections.singleton(key);
+      }
+
+      @Override
+      public List<Address> generateRecipients() {
+         if (recipients == null) {
+            recipients = cdl.getOwners(key);
+         }
+         return recipients;
+      }
+
+      @Override
+      public Collection<Object> getKeys() {
+         return keys;
+      }
+   }
+
+   class MultipleKeysRecipientGenerator implements RecipientGenerator {
+
+      private final Collection<Object> keys;
+      private List<Address> recipients = null;
+
+      MultipleKeysRecipientGenerator(Collection<Object> keys) {
+         this.keys = keys;
+      }
+
+      @Override
+      public List<Address> generateRecipients() {
+         if (recipients == null) {
+            recipients = cdl.getOwners(keys);
+         }
+         return recipients;
+      }
+
+      @Override
+      public Collection<Object> getKeys() {
+         return keys;
+      }
+   }
+}
index 5fb0ddff609a8fbbaa7fce1601392cc007e5ff33..5ec4325c7f83a665ffc54d0de10b8a7ea0e31ebe 100644 (file)
@@ -1,4 +1,4 @@
-<infinispan>
+<infinispan xsi:schemaLocation="urn:infinispan:config:5.3 http://www.infinispan.org/schemas/infinispan-config-5.3.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="urn:infinispan:config:5.3">
   <global>
     <transport>
       <properties>
         jmxDomain="org.infinispan"
         cacheManagerName="SampleCacheManager"/>
   </global>
-
- <default>
+  <default>
+    <!-- Configure a synchronous replication cache -->
+    <clustering mode="replication">
+      <sync/>
+    </clustering>
+    <!--
+        Used to register JMX statistics in any available MBean server
+    -->
     <jmxStatistics enabled="true"/>
-        <clustering mode="replication">
-        <sync/>
-        </clustering>
-        </default>
- <namedCache name="transactional-type">
+  </default>
+  <!-- transactionManagerLookupClass="org.infinispan.transaction.lookup.JBossStandaloneJTAManagerLookup" -->
+  <namedCache name="transactional-type">
     <transaction
         transactionManagerLookupClass="org.infinispan.transaction.lookup.JBossStandaloneJTAManagerLookup"
         syncRollbackPhase="true"
         use1PcForAutoCommitTransactions="true"
         autoCommit="true"
         lockingMode="OPTIMISTIC"
-        useEagerLocking="true"
         useSynchronization="true"
         transactionMode="TRANSACTIONAL"
         />
-        <clustering mode="replication">
-        <sync/>
-        </clustering>
   </namedCache>
-
 </infinispan>
index 586a818862f1fe87d69b14febec4178ab5f7a5aa..03a07fe560696fb9ba62b9028cf0f1279ab34989 100644 (file)
@@ -20,7 +20,7 @@
     <bundlescanner.version>0.4.2-SNAPSHOT</bundlescanner.version>
     <checkstyle.version>2.10</checkstyle.version>
     <clustering.services.version>0.5.1-SNAPSHOT</clustering.services.version>
-    <clustering.services_implementation.version>0.4.3-SNAPSHOT</clustering.services_implementation.version>
+    <clustering.services_implementation.version>0.4.2-SNAPSHOT</clustering.services_implementation.version>
     <clustering.stub.version>0.4.2-SNAPSHOT</clustering.stub.version>
     <commons.httpclient.version>0.1.2-SNAPSHOT</commons.httpclient.version>
     <commons.io.version>2.4</commons.io.version>