Merge "Enhance debug capabilities"
authorMadhu Venugopal <vmadhu@cisco.com>
Tue, 24 Sep 2013 13:00:48 +0000 (13:00 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Tue, 24 Sep 2013 13:00:48 +0000 (13:00 +0000)
opendaylight/clustering/services_implementation/pom.xml
opendaylight/clustering/services_implementation/src/main/java/org/infinispan/interceptors/distribution/BaseDistributionInterceptor.java [new file with mode: 0644]
opendaylight/commons/opendaylight/pom.xml

index d6bd287434a499c19b1aa1576b0aa11ee303344d..fddaa6c9dc7e657f51576f5f1ed209a0409fe47a 100644 (file)
         </execution>
       </executions>
     </plugin>
+    <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-enforcer-plugin</artifactId>
+        <version>${enforcer.version}</version>
+        <executions>
+          <execution>
+            <id>enforce-banned-dependencies</id>
+            <goals>
+              <goal>enforce</goal>
+            </goals>
+            <configuration>
+              <rules>
+                <bannedDependencies>
+                  <excludes>
+                    <exclude>org.infinispan:infinispan-core:*</exclude>
+                  </excludes>
+                  <includes>
+                    <include>org.infinispan:infinispan-core:[5.3.0.Final]</include>
+                  </includes>
+                </bannedDependencies>
+              </rules>
+              <fail>true</fail>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
   </plugins>
   </build>
   <dependencies>
diff --git a/opendaylight/clustering/services_implementation/src/main/java/org/infinispan/interceptors/distribution/BaseDistributionInterceptor.java b/opendaylight/clustering/services_implementation/src/main/java/org/infinispan/interceptors/distribution/BaseDistributionInterceptor.java
new file mode 100644 (file)
index 0000000..e105637
--- /dev/null
@@ -0,0 +1,246 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009 Red Hat Inc. and/or its affiliates and other
+ * contributors as indicated by the @author tags. All rights reserved.
+ * See the copyright.txt in the distribution for a full listing of
+ * individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.interceptors.distribution;
+
+import org.infinispan.CacheException;
+import org.infinispan.commands.FlagAffectedCommand;
+import org.infinispan.commands.remote.ClusteredGetCommand;
+import org.infinispan.commands.write.DataWriteCommand;
+import org.infinispan.commands.write.WriteCommand;
+import org.infinispan.container.entries.InternalCacheEntry;
+import org.infinispan.container.entries.InternalCacheValue;
+import org.infinispan.context.InvocationContext;
+import org.infinispan.context.impl.TxInvocationContext;
+import org.infinispan.distribution.DistributionManager;
+import org.infinispan.factories.annotations.Inject;
+import org.infinispan.interceptors.ClusteringInterceptor;
+import org.infinispan.interceptors.locking.ClusteringDependentLogic;
+import org.infinispan.remoting.responses.ClusteredGetResponseValidityFilter;
+import org.infinispan.remoting.responses.ExceptionResponse;
+import org.infinispan.remoting.responses.Response;
+import org.infinispan.remoting.responses.SuccessfulResponse;
+import org.infinispan.remoting.rpc.ResponseFilter;
+import org.infinispan.remoting.rpc.ResponseMode;
+import org.infinispan.remoting.rpc.RpcOptions;
+import org.infinispan.remoting.transport.Address;
+import org.infinispan.transaction.xa.GlobalTransaction;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
+import java.util.*;
+
+/**
+ * Base class for distribution of entries across a cluster.
+ *
+ * @author Manik Surtani
+ * @author Mircea.Markus@jboss.com
+ * @author Pete Muir
+ * @author Dan Berindei <dan@infinispan.org>
+ * @since 4.0
+ */
+public abstract class BaseDistributionInterceptor extends ClusteringInterceptor {
+
+   protected DistributionManager dm;
+
+   protected ClusteringDependentLogic cdl;
+
+   private static final Log log = LogFactory.getLog(BaseDistributionInterceptor.class);
+
+   @Override
+   protected Log getLog() {
+      return log;
+   }
+
+   @Inject
+   public void injectDependencies(DistributionManager distributionManager, ClusteringDependentLogic cdl) {
+      this.dm = distributionManager;
+      this.cdl = cdl;
+   }
+
+   @Override
+   protected final InternalCacheEntry retrieveFromRemoteSource(Object key, InvocationContext ctx, boolean acquireRemoteLock, FlagAffectedCommand command) throws Exception {
+      GlobalTransaction gtx = acquireRemoteLock ? ((TxInvocationContext)ctx).getGlobalTransaction() : null;
+      ClusteredGetCommand get = cf.buildClusteredGetCommand(key, command.getFlags(), acquireRemoteLock, gtx);
+
+      List<Address> targets = new ArrayList<Address>(stateTransferManager.getCacheTopology().getReadConsistentHash().locateOwners(key));
+      // if any of the recipients has left the cluster since the command was issued, just don't wait for its response
+      targets.retainAll(rpcManager.getTransport().getMembers());
+      ResponseFilter filter = new ClusteredGetResponseValidityFilter(targets, rpcManager.getAddress());
+      RpcOptions options = rpcManager.getRpcOptionsBuilder(ResponseMode.WAIT_FOR_VALID_RESPONSE, false)
+            .responseFilter(filter).build();
+      Map<Address, Response> responses = rpcManager.invokeRemotely(targets, get, options);
+
+      if (!responses.isEmpty()) {
+         for (Response r : responses.values()) {
+            if (r instanceof SuccessfulResponse) {
+               InternalCacheValue cacheValue = (InternalCacheValue) ((SuccessfulResponse) r).getResponseValue();
+               return cacheValue.toInternalCacheEntry(key);
+            }
+         }
+      }
+
+      // TODO If everyone returned null, and the read CH has changed, retry the remote get.
+      // Otherwise our get command might be processed by the old owners after they have invalidated their data
+      // and we'd return a null even though the key exists on
+      return null;
+   }
+
+   protected final Object handleNonTxWriteCommand(InvocationContext ctx, DataWriteCommand command) throws Throwable {
+      if (ctx.isInTxScope()) {
+         throw new CacheException("Attempted execution of non-transactional write command in a transactional invocation context");
+      }
+
+      RecipientGenerator recipientGenerator = new SingleKeyRecipientGenerator(command.getKey());
+
+      // see if we need to load values from remote sources first
+      remoteGetBeforeWrite(ctx, command, recipientGenerator);
+
+      // if this is local mode then skip distributing
+      if (isLocalModeForced(command)) {
+         return invokeNextInterceptor(ctx, command);
+      }
+
+      boolean isSync = isSynchronous(command);
+      if (!ctx.isOriginLocal()) {
+         Object returnValue = invokeNextInterceptor(ctx, command);
+         Address primaryOwner = cdl.getPrimaryOwner(command.getKey());
+            if (primaryOwner.equals(rpcManager.getAddress())) {
+                if (command.isConditional() && !command.isSuccessful()) {
+                    log.tracef(
+                            "Skipping the replication of the conditional command as it did not succeed on primary owner (%s).",
+                            command);
+                    return returnValue;
+                }
+                rpcManager.invokeRemotely(recipientGenerator.generateRecipients(), command,
+                        rpcManager.getDefaultRpcOptions(isSync));
+            } else {
+                log.errorf("Didn't invoke RPC because primaryOwner (%s) didn't match this node (%s)", primaryOwner,
+                           rpcManager.getAddress());
+                log.errorf("Hashcode is (%s) for Key (%s) .. it could be inconsistent in the cluster!",
+                           command.getKey().hashCode(), command.getKey());
+            }
+         return returnValue;
+      } else {
+         Address primaryOwner = cdl.getPrimaryOwner(command.getKey());
+         if (primaryOwner.equals(rpcManager.getAddress())) {
+            Object result = invokeNextInterceptor(ctx, command);
+            if (command.isConditional() && !command.isSuccessful()) {
+               log.tracef("Skipping the replication of the conditional command as it did not succeed on primary owner (%s).", command);
+               return result;
+            }
+            List<Address> recipients = recipientGenerator.generateRecipients();
+            log.tracef("I'm the primary owner, sending the command to all (%s) the recipients in order to be applied.", recipients);
+            // check if a single owner has been configured and the target for the key is the local address
+            boolean isSingleOwnerAndLocal = cacheConfiguration.clustering().hash().numOwners() == 1
+                  && recipients != null
+                  && recipients.size() == 1
+                  && recipients.get(0).equals(rpcManager.getTransport().getAddress());
+            if (!isSingleOwnerAndLocal) {
+               rpcManager.invokeRemotely(recipients, command, rpcManager.getDefaultRpcOptions(isSync));
+            }
+            return result;
+         } else {
+            log.tracef("I'm not the primary owner, so sending the command to the primary owner(%s) in order to be forwarded", primaryOwner);
+            log.tracef("Hashcode is (%s) for Key (%s)", command.getKey().hashCode(), command.getKey());
+
+            Object localResult = invokeNextInterceptor(ctx, command);
+            boolean isSyncForwarding = isSync || isNeedReliableReturnValues(command);
+            Map<Address, Response> addressResponseMap = rpcManager.invokeRemotely(Collections.singletonList(primaryOwner), command,
+                  rpcManager.getDefaultRpcOptions(isSyncForwarding));
+            if (!isSyncForwarding) return localResult;
+
+            return getResponseFromPrimaryOwner(primaryOwner, addressResponseMap);
+         }
+      }
+   }
+
+   private Object getResponseFromPrimaryOwner(Address primaryOwner, Map<Address, Response> addressResponseMap) {
+      Response fromPrimaryOwner = addressResponseMap.get(primaryOwner);
+      if (fromPrimaryOwner == null) {
+         log.tracef("Primary owner %s returned null", primaryOwner);
+         return null;
+      }
+      if (!fromPrimaryOwner.isSuccessful()) {
+         Throwable cause = fromPrimaryOwner instanceof ExceptionResponse ? ((ExceptionResponse)fromPrimaryOwner).getException() : null;
+         throw new CacheException("Got unsuccessful response from primary owner: " + fromPrimaryOwner, cause);
+      } else {
+         return ((SuccessfulResponse) fromPrimaryOwner).getResponseValue();
+      }
+   }
+
+   protected abstract void remoteGetBeforeWrite(InvocationContext ctx, WriteCommand command, RecipientGenerator keygen) throws Throwable;
+
+   interface RecipientGenerator {
+
+      Collection<Object> getKeys();
+
+      List<Address> generateRecipients();
+   }
+
+   class SingleKeyRecipientGenerator implements RecipientGenerator {
+      private final Object key;
+      private final Set<Object> keys;
+      private List<Address> recipients = null;
+
+      SingleKeyRecipientGenerator(Object key) {
+         this.key = key;
+         keys = Collections.singleton(key);
+      }
+
+      @Override
+      public List<Address> generateRecipients() {
+         if (recipients == null) {
+            recipients = cdl.getOwners(key);
+         }
+         return recipients;
+      }
+
+      @Override
+      public Collection<Object> getKeys() {
+         return keys;
+      }
+   }
+
+   class MultipleKeysRecipientGenerator implements RecipientGenerator {
+
+      private final Collection<Object> keys;
+      private List<Address> recipients = null;
+
+      MultipleKeysRecipientGenerator(Collection<Object> keys) {
+         this.keys = keys;
+      }
+
+      @Override
+      public List<Address> generateRecipients() {
+         if (recipients == null) {
+            recipients = cdl.getOwners(keys);
+         }
+         return recipients;
+      }
+
+      @Override
+      public Collection<Object> getKeys() {
+         return keys;
+      }
+   }
+}
index 3ae9c6ee6ca7d2ae70c43c3fce5e227b2bfb0725..59b068a4c9cf24fa804537cdc4c20c6827570c3a 100644 (file)
@@ -56,6 +56,7 @@
     <releaseplugin.version>2.3.2</releaseplugin.version>
     <commons.lang.version>3.1</commons.lang.version>
     <jacoco.version>0.5.3.201107060350</jacoco.version>
+    <enforcer.version>1.3.1</enforcer.version>
   </properties>
 
   <pluginRepositories>