From: Giovanni Meo Date: Mon, 23 Sep 2013 20:57:03 +0000 (+0200) Subject: Enhance debug capabilities X-Git-Tag: releasepom-0.1.0~27^2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=1decf2b08abb69e7d2845d6aa993ad52e817cf75 Enhance debug capabilities - Make sure infinispan complains very loud when it think an hashcode is not consistent in the cluster. - For now this patch is controller specific, but i will propose to upstream in infinispan as well. - Given BaseDistributionInterceptor.java came from version 5.3.0.Final enforce it in the build Change-Id: I029abac8d311ec3f49795ba325f075541fd3784b Signed-off-by: Giovanni Meo --- diff --git a/opendaylight/clustering/services_implementation/pom.xml b/opendaylight/clustering/services_implementation/pom.xml index d6bd287434..fddaa6c9dc 100644 --- a/opendaylight/clustering/services_implementation/pom.xml +++ b/opendaylight/clustering/services_implementation/pom.xml @@ -100,6 +100,32 @@ + + org.apache.maven.plugins + maven-enforcer-plugin + ${enforcer.version} + + + enforce-banned-dependencies + + enforce + + + + + + org.infinispan:infinispan-core:* + + + org.infinispan:infinispan-core:[5.3.0.Final] + + + + true + + + + diff --git a/opendaylight/clustering/services_implementation/src/main/java/org/infinispan/interceptors/distribution/BaseDistributionInterceptor.java b/opendaylight/clustering/services_implementation/src/main/java/org/infinispan/interceptors/distribution/BaseDistributionInterceptor.java new file mode 100644 index 0000000000..e10563797d --- /dev/null +++ b/opendaylight/clustering/services_implementation/src/main/java/org/infinispan/interceptors/distribution/BaseDistributionInterceptor.java @@ -0,0 +1,246 @@ +/* + * JBoss, Home of Professional Open Source + * Copyright 2009 Red Hat Inc. and/or its affiliates and other + * contributors as indicated by the @author tags. All rights reserved. + * See the copyright.txt in the distribution for a full listing of + * individual contributors. + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.infinispan.interceptors.distribution; + +import org.infinispan.CacheException; +import org.infinispan.commands.FlagAffectedCommand; +import org.infinispan.commands.remote.ClusteredGetCommand; +import org.infinispan.commands.write.DataWriteCommand; +import org.infinispan.commands.write.WriteCommand; +import org.infinispan.container.entries.InternalCacheEntry; +import org.infinispan.container.entries.InternalCacheValue; +import org.infinispan.context.InvocationContext; +import org.infinispan.context.impl.TxInvocationContext; +import org.infinispan.distribution.DistributionManager; +import org.infinispan.factories.annotations.Inject; +import org.infinispan.interceptors.ClusteringInterceptor; +import org.infinispan.interceptors.locking.ClusteringDependentLogic; +import org.infinispan.remoting.responses.ClusteredGetResponseValidityFilter; +import org.infinispan.remoting.responses.ExceptionResponse; +import org.infinispan.remoting.responses.Response; +import org.infinispan.remoting.responses.SuccessfulResponse; +import org.infinispan.remoting.rpc.ResponseFilter; +import org.infinispan.remoting.rpc.ResponseMode; +import org.infinispan.remoting.rpc.RpcOptions; +import org.infinispan.remoting.transport.Address; +import org.infinispan.transaction.xa.GlobalTransaction; +import org.infinispan.util.logging.Log; +import org.infinispan.util.logging.LogFactory; + +import java.util.*; + +/** + * Base class for distribution of entries across a cluster. + * + * @author Manik Surtani + * @author Mircea.Markus@jboss.com + * @author Pete Muir + * @author Dan Berindei + * @since 4.0 + */ +public abstract class BaseDistributionInterceptor extends ClusteringInterceptor { + + protected DistributionManager dm; + + protected ClusteringDependentLogic cdl; + + private static final Log log = LogFactory.getLog(BaseDistributionInterceptor.class); + + @Override + protected Log getLog() { + return log; + } + + @Inject + public void injectDependencies(DistributionManager distributionManager, ClusteringDependentLogic cdl) { + this.dm = distributionManager; + this.cdl = cdl; + } + + @Override + protected final InternalCacheEntry retrieveFromRemoteSource(Object key, InvocationContext ctx, boolean acquireRemoteLock, FlagAffectedCommand command) throws Exception { + GlobalTransaction gtx = acquireRemoteLock ? ((TxInvocationContext)ctx).getGlobalTransaction() : null; + ClusteredGetCommand get = cf.buildClusteredGetCommand(key, command.getFlags(), acquireRemoteLock, gtx); + + List
targets = new ArrayList
(stateTransferManager.getCacheTopology().getReadConsistentHash().locateOwners(key)); + // if any of the recipients has left the cluster since the command was issued, just don't wait for its response + targets.retainAll(rpcManager.getTransport().getMembers()); + ResponseFilter filter = new ClusteredGetResponseValidityFilter(targets, rpcManager.getAddress()); + RpcOptions options = rpcManager.getRpcOptionsBuilder(ResponseMode.WAIT_FOR_VALID_RESPONSE, false) + .responseFilter(filter).build(); + Map responses = rpcManager.invokeRemotely(targets, get, options); + + if (!responses.isEmpty()) { + for (Response r : responses.values()) { + if (r instanceof SuccessfulResponse) { + InternalCacheValue cacheValue = (InternalCacheValue) ((SuccessfulResponse) r).getResponseValue(); + return cacheValue.toInternalCacheEntry(key); + } + } + } + + // TODO If everyone returned null, and the read CH has changed, retry the remote get. + // Otherwise our get command might be processed by the old owners after they have invalidated their data + // and we'd return a null even though the key exists on + return null; + } + + protected final Object handleNonTxWriteCommand(InvocationContext ctx, DataWriteCommand command) throws Throwable { + if (ctx.isInTxScope()) { + throw new CacheException("Attempted execution of non-transactional write command in a transactional invocation context"); + } + + RecipientGenerator recipientGenerator = new SingleKeyRecipientGenerator(command.getKey()); + + // see if we need to load values from remote sources first + remoteGetBeforeWrite(ctx, command, recipientGenerator); + + // if this is local mode then skip distributing + if (isLocalModeForced(command)) { + return invokeNextInterceptor(ctx, command); + } + + boolean isSync = isSynchronous(command); + if (!ctx.isOriginLocal()) { + Object returnValue = invokeNextInterceptor(ctx, command); + Address primaryOwner = cdl.getPrimaryOwner(command.getKey()); + if (primaryOwner.equals(rpcManager.getAddress())) { + if (command.isConditional() && !command.isSuccessful()) { + log.tracef( + "Skipping the replication of the conditional command as it did not succeed on primary owner (%s).", + command); + return returnValue; + } + rpcManager.invokeRemotely(recipientGenerator.generateRecipients(), command, + rpcManager.getDefaultRpcOptions(isSync)); + } else { + log.errorf("Didn't invoke RPC because primaryOwner (%s) didn't match this node (%s)", primaryOwner, + rpcManager.getAddress()); + log.errorf("Hashcode is (%s) for Key (%s) .. it could be inconsistent in the cluster!", + command.getKey().hashCode(), command.getKey()); + } + return returnValue; + } else { + Address primaryOwner = cdl.getPrimaryOwner(command.getKey()); + if (primaryOwner.equals(rpcManager.getAddress())) { + Object result = invokeNextInterceptor(ctx, command); + if (command.isConditional() && !command.isSuccessful()) { + log.tracef("Skipping the replication of the conditional command as it did not succeed on primary owner (%s).", command); + return result; + } + List
recipients = recipientGenerator.generateRecipients(); + log.tracef("I'm the primary owner, sending the command to all (%s) the recipients in order to be applied.", recipients); + // check if a single owner has been configured and the target for the key is the local address + boolean isSingleOwnerAndLocal = cacheConfiguration.clustering().hash().numOwners() == 1 + && recipients != null + && recipients.size() == 1 + && recipients.get(0).equals(rpcManager.getTransport().getAddress()); + if (!isSingleOwnerAndLocal) { + rpcManager.invokeRemotely(recipients, command, rpcManager.getDefaultRpcOptions(isSync)); + } + return result; + } else { + log.tracef("I'm not the primary owner, so sending the command to the primary owner(%s) in order to be forwarded", primaryOwner); + log.tracef("Hashcode is (%s) for Key (%s)", command.getKey().hashCode(), command.getKey()); + + Object localResult = invokeNextInterceptor(ctx, command); + boolean isSyncForwarding = isSync || isNeedReliableReturnValues(command); + Map addressResponseMap = rpcManager.invokeRemotely(Collections.singletonList(primaryOwner), command, + rpcManager.getDefaultRpcOptions(isSyncForwarding)); + if (!isSyncForwarding) return localResult; + + return getResponseFromPrimaryOwner(primaryOwner, addressResponseMap); + } + } + } + + private Object getResponseFromPrimaryOwner(Address primaryOwner, Map addressResponseMap) { + Response fromPrimaryOwner = addressResponseMap.get(primaryOwner); + if (fromPrimaryOwner == null) { + log.tracef("Primary owner %s returned null", primaryOwner); + return null; + } + if (!fromPrimaryOwner.isSuccessful()) { + Throwable cause = fromPrimaryOwner instanceof ExceptionResponse ? ((ExceptionResponse)fromPrimaryOwner).getException() : null; + throw new CacheException("Got unsuccessful response from primary owner: " + fromPrimaryOwner, cause); + } else { + return ((SuccessfulResponse) fromPrimaryOwner).getResponseValue(); + } + } + + protected abstract void remoteGetBeforeWrite(InvocationContext ctx, WriteCommand command, RecipientGenerator keygen) throws Throwable; + + interface RecipientGenerator { + + Collection getKeys(); + + List
generateRecipients(); + } + + class SingleKeyRecipientGenerator implements RecipientGenerator { + private final Object key; + private final Set keys; + private List
recipients = null; + + SingleKeyRecipientGenerator(Object key) { + this.key = key; + keys = Collections.singleton(key); + } + + @Override + public List
generateRecipients() { + if (recipients == null) { + recipients = cdl.getOwners(key); + } + return recipients; + } + + @Override + public Collection getKeys() { + return keys; + } + } + + class MultipleKeysRecipientGenerator implements RecipientGenerator { + + private final Collection keys; + private List
recipients = null; + + MultipleKeysRecipientGenerator(Collection keys) { + this.keys = keys; + } + + @Override + public List
generateRecipients() { + if (recipients == null) { + recipients = cdl.getOwners(keys); + } + return recipients; + } + + @Override + public Collection getKeys() { + return keys; + } + } +} diff --git a/opendaylight/commons/opendaylight/pom.xml b/opendaylight/commons/opendaylight/pom.xml index 3ae9c6ee6c..59b068a4c9 100644 --- a/opendaylight/commons/opendaylight/pom.xml +++ b/opendaylight/commons/opendaylight/pom.xml @@ -56,6 +56,7 @@ 2.3.2 3.1 0.5.3.201107060350 + 1.3.1