BUG 3067: Added support for blocking if schema is not available. 63/22063/2
authorTony Tkacik <ttkacik@cisco.com>
Tue, 5 May 2015 11:57:57 +0000 (13:57 +0200)
committerGerrit Code Review <gerrit@opendaylight.org>
Mon, 8 Jun 2015 10:26:57 +0000 (10:26 +0000)
Codecs are able to send specific exception, which describes
reason of failure for codec failing. If the reason is
missing schema (not yet loaded) we catch it and retry
serialization after schema context is upgraded.

User thread will be blocked maximally for 5 seconds
after that it will be unblocked.

Change-Id: I3494702eae644b495211a1a34c074e268c2f5f46
Signed-off-by: Tony Tkacik <ttkacik@cisco.com>
(cherry picked from commit 6aba3ef282f86a84920fa5a7ccf21c91d459806b)

opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/binding/impl/RuntimeMappingModule.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/AbstractForwardedDataBroker.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/AbstractForwardedTransaction.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/AbstractWriteTransaction.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingDOMDataTreeChangeServiceAdapter.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingDOMMountPointServiceAdapter.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingToNormalizedNodeCodec.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/FutureSchema.java [new file with mode: 0644]
opendaylight/md-sal/sal-binding-broker/src/main/yang/opendaylight-binding-broker-impl.yang
opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/md/sal/binding/impl/test/BindingNormalizedCodecTest.java

index c228244..87ce27d 100644 (file)
@@ -51,14 +51,14 @@ public final class RuntimeMappingModule extends AbstractRuntimeMappingModule {
     @Override
     public java.lang.AutoCloseable createInstance() {
         final GeneratedClassLoadingStrategy classLoading = getGlobalClassLoadingStrategy();
-        BindingNormalizedNodeCodecRegistry codecRegistry = new BindingNormalizedNodeCodecRegistry(StreamWriterGenerator.create(SingletonHolder.JAVASSIST));
-        BindingToNormalizedNodeCodec instance = new BindingToNormalizedNodeCodec(classLoading, codecRegistry);
+        final BindingNormalizedNodeCodecRegistry codecRegistry = new BindingNormalizedNodeCodecRegistry(StreamWriterGenerator.create(SingletonHolder.JAVASSIST));
+        final BindingToNormalizedNodeCodec instance = new BindingToNormalizedNodeCodec(classLoading, codecRegistry,getWaitForSchema());
         bundleContext.registerService(SchemaContextListener.class, instance, new Hashtable<String,String>());
         return instance;
     }
 
     private GeneratedClassLoadingStrategy getGlobalClassLoadingStrategy() {
-        ServiceReference<GeneratedClassLoadingStrategy> ref = bundleContext.getServiceReference(GeneratedClassLoadingStrategy.class);
+        final ServiceReference<GeneratedClassLoadingStrategy> ref = bundleContext.getServiceReference(GeneratedClassLoadingStrategy.class);
         return bundleContext.getService(ref);
     }
 
index b37bb04..7e63e54 100644 (file)
@@ -65,7 +65,7 @@ public abstract class AbstractForwardedDataBroker implements Delegator<DOMDataBr
             final InstanceIdentifier<?> path, final DataChangeListener listener, final DataChangeScope triggeringScope) {
         final DOMDataChangeListener domDataChangeListener = new TranslatingDataChangeInvoker(store, path, listener,
                 triggeringScope);
-        final YangInstanceIdentifier domPath = codec.toNormalized(path);
+        final YangInstanceIdentifier domPath = codec.toYangInstanceIdentifierBlocking(path);
         final ListenerRegistration<DOMDataChangeListener> domRegistration = domDataBroker.registerDataChangeListener(store,
                 domPath, domDataChangeListener, triggeringScope);
         return new ListenerRegistrationImpl(listener, domRegistration);
index eadde73..3d337d8 100644 (file)
@@ -61,7 +61,7 @@ abstract class AbstractForwardedTransaction<T extends AsyncTransaction<YangInsta
         Preconditions.checkArgument(!path.isWildcarded(), "Invalid read of wildcarded path %s", path);
 
         return MappingCheckedFuture.create(
-                    Futures.transform(readTx.read(store, codec.toNormalized(path)),
+                    Futures.transform(readTx.read(store, codec.toYangInstanceIdentifierBlocking(path)),
                                       codec.deserializeFunction(path)),
                     ReadFailedException.MAPPER);
     }
index 7e1f112..1e3e419 100644 (file)
@@ -126,7 +126,7 @@ public abstract class AbstractWriteTransaction<T extends DOMDataWriteTransaction
             final InstanceIdentifier<?> path) {
         Preconditions.checkArgument(!path.isWildcarded(), "Cannot delete wildcarded path %s", path);
 
-        final YangInstanceIdentifier normalized = getCodec().toNormalized(path);
+        final YangInstanceIdentifier normalized = getCodec().toYangInstanceIdentifierBlocking(path);
         getDelegate().delete(store, normalized);
     }
 
index ad0ab54..02daa45 100644 (file)
@@ -54,7 +54,7 @@ final class BindingDOMDataTreeChangeServiceAdapter implements DataTreeChangeServ
     }
 
     private DOMDataTreeIdentifier toDomTreeIdentifier(final DataTreeIdentifier<?> treeId) {
-        final YangInstanceIdentifier domPath = codec.toYangInstanceIdentifier(treeId.getRootIdentifier());
+        final YangInstanceIdentifier domPath = codec.toYangInstanceIdentifierBlocking(treeId.getRootIdentifier());
         return new DOMDataTreeIdentifier(treeId.getDatastoreType(), domPath);
     }
 }
index 6bb40ed..759c470 100644 (file)
@@ -45,7 +45,7 @@ public class BindingDOMMountPointServiceAdapter implements MountPointService {
     @Override
     public Optional<MountPoint> getMountPoint(InstanceIdentifier<?> mountPoint) {
 
-        YangInstanceIdentifier domPath = codec.toNormalized(mountPoint);
+        YangInstanceIdentifier domPath = codec.toYangInstanceIdentifierBlocking(mountPoint);
         Optional<DOMMountPoint> domMount = mountService.getMountPoint(domPath);
         if(domMount.isPresent()) {
             return Optional.<MountPoint>fromNullable(bindingMountpoints.getUnchecked(domMount.get()));
index 3795ad3..776691c 100644 (file)
@@ -15,12 +15,16 @@ import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.collect.ImmutableBiMap;
 import java.lang.reflect.Method;
+import java.net.URI;
 import java.util.AbstractMap.SimpleEntry;
+import java.util.Collection;
+import java.util.Date;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import javax.annotation.Nonnull;
 import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizationException;
 import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizationOperation;
@@ -30,6 +34,7 @@ import org.opendaylight.yangtools.binding.data.codec.api.BindingCodecTreeFactory
 import org.opendaylight.yangtools.binding.data.codec.api.BindingCodecTreeNode;
 import org.opendaylight.yangtools.binding.data.codec.api.BindingNormalizedNodeSerializer;
 import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry;
+import org.opendaylight.yangtools.binding.data.codec.impl.MissingSchemaException;
 import org.opendaylight.yangtools.sal.binding.generator.impl.GeneratedClassLoadingStrategy;
 import org.opendaylight.yangtools.sal.binding.generator.util.BindingRuntimeContext;
 import org.opendaylight.yangtools.yang.binding.BindingMapping;
@@ -51,28 +56,51 @@ import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public final class BindingToNormalizedNodeCodec implements BindingCodecTreeFactory, BindingNormalizedNodeSerializer, SchemaContextListener, AutoCloseable {
 
+    private static final long WAIT_DURATION_SEC = 5;
+    private static final Logger LOG = LoggerFactory.getLogger(BindingToNormalizedNodeCodec.class);
+
     private final BindingNormalizedNodeCodecRegistry codecRegistry;
-    private DataNormalizer legacyToNormalized;
+
     private final GeneratedClassLoadingStrategy classLoadingStrategy;
-    private BindingRuntimeContext runtimeContext;
+    private final FutureSchema futureSchema;
     private final LoadingCache<InstanceIdentifier<?>, YangInstanceIdentifier> iiCache = CacheBuilder.newBuilder()
             .softValues().build(new CacheLoader<InstanceIdentifier<?>, YangInstanceIdentifier>() {
 
                 @Override
                 public YangInstanceIdentifier load(final InstanceIdentifier<?> key) throws Exception {
-                    return toYangInstanceIdentifier(key);
+                    return toYangInstanceIdentifierBlocking(key);
                 }
 
             });
 
+    private BindingRuntimeContext runtimeContext;
+    private DataNormalizer legacyToNormalized;
+
     public BindingToNormalizedNodeCodec(final GeneratedClassLoadingStrategy classLoadingStrategy,
             final BindingNormalizedNodeCodecRegistry codecRegistry) {
+        this(classLoadingStrategy,codecRegistry,false);
+
+    }
+
+    public BindingToNormalizedNodeCodec(final GeneratedClassLoadingStrategy classLoadingStrategy,
+            final BindingNormalizedNodeCodecRegistry codecRegistry,final boolean waitForSchema) {
         this.classLoadingStrategy = Preconditions.checkNotNull(classLoadingStrategy,"classLoadingStrategy");
         this.codecRegistry = Preconditions.checkNotNull(codecRegistry,"codecRegistry");
+        this.futureSchema = waitForSchema ? new FutureSchema(WAIT_DURATION_SEC, TimeUnit.SECONDS) : null;
+    }
 
+    final YangInstanceIdentifier toYangInstanceIdentifierBlocking(final InstanceIdentifier<? extends DataObject> binding) {
+        try {
+            return codecRegistry.toYangInstanceIdentifier(binding);
+        } catch (final MissingSchemaException e) {
+            waitForSchema(decompose(binding),e);
+            return codecRegistry.toYangInstanceIdentifier(binding);
+        }
     }
 
     /**
@@ -203,6 +231,9 @@ public final class BindingToNormalizedNodeCodec implements BindingCodecTreeFacto
         legacyToNormalized = new DataNormalizer (arg0);
         runtimeContext = BindingRuntimeContext.create(classLoadingStrategy, arg0);
         codecRegistry.onBindingRuntimeContextUpdated(runtimeContext);
+        if(futureSchema != null) {
+            futureSchema.onRuntimeContextUpdated(runtimeContext);
+        }
     }
 
     public <T extends DataObject> Function<Optional<NormalizedNode<?, ?>>, Optional<T>>  deserializeFunction(final InstanceIdentifier<T> path) {
@@ -244,8 +275,7 @@ public final class BindingToNormalizedNodeCodec implements BindingCodecTreeFacto
 
     // FIXME: This should be probably part of Binding Runtime context
     public ImmutableBiMap<Method, SchemaPath> getRpcMethodToSchemaPath(final Class<? extends RpcService> key) {
-        final QNameModule moduleName = BindingReflections.getQNameModule(key);
-        final Module module = runtimeContext.getSchemaContext().findModuleByNamespaceAndRevision(moduleName.getNamespace(), moduleName.getRevision());
+        final Module module = getModuleBlocking(key);
         final ImmutableBiMap.Builder<Method, SchemaPath> ret = ImmutableBiMap.<Method, SchemaPath>builder();
         try {
             for (final RpcDefinition rpcDef : module.getRpcs()) {
@@ -259,8 +289,7 @@ public final class BindingToNormalizedNodeCodec implements BindingCodecTreeFacto
     }
 
     protected ImmutableBiMap<Method, RpcDefinition> getRpcMethodToSchema(final Class<? extends RpcService> key) {
-        final QNameModule moduleName = BindingReflections.getQNameModule(key);
-        final Module module = runtimeContext.getSchemaContext().findModuleByNamespaceAndRevision(moduleName.getNamespace(), moduleName.getRevision());
+        final Module module = getModuleBlocking(key);
         final ImmutableBiMap.Builder<Method, RpcDefinition> ret = ImmutableBiMap.builder();
         try {
             for (final RpcDefinition rpcDef : module.getRpcs()) {
@@ -273,6 +302,28 @@ public final class BindingToNormalizedNodeCodec implements BindingCodecTreeFacto
         return ret.build();
     }
 
+    private Module getModuleBlocking(final Class<?> modeledClass) {
+        final QNameModule moduleName = BindingReflections.getQNameModule(modeledClass);
+        final URI namespace = moduleName.getNamespace();
+        final Date revision = moduleName.getRevision();
+        Module module = runtimeContext.getSchemaContext().findModuleByNamespaceAndRevision(namespace, revision);
+        if(module == null && futureSchema != null && futureSchema.waitForSchema(namespace,revision)) {
+            module = runtimeContext.getSchemaContext().findModuleByNamespaceAndRevision(namespace, revision);
+        }
+        Preconditions.checkState(module != null, "Schema for %s is not available.", modeledClass);
+        return module;
+    }
+
+    private void waitForSchema(final Collection<Class<?>> binding, final MissingSchemaException e) {
+        if(futureSchema != null) {
+            LOG.warn("Blocking thread to wait for schema convergence updates for {} {}",futureSchema.getDuration(), futureSchema.getUnit());
+            if(!futureSchema.waitForSchema(binding)) {
+                return;
+            }
+        }
+        throw e;
+    }
+
     private Method findRpcMethod(final Class<? extends RpcService> key, final RpcDefinition rpcDef) throws NoSuchMethodException {
         final String methodName = BindingMapping.getMethodName(rpcDef.getQName());
         if(rpcDef.getInput() != null) {
@@ -307,6 +358,7 @@ public final class BindingToNormalizedNodeCodec implements BindingCodecTreeFacto
         return new SimpleEntry<InstanceIdentifier<?>, BindingCodecTreeNode<?>>(bindingPath, codecContext);
     }
 
+    @SuppressWarnings("unchecked")
     public Set<Class<? extends Notification>> getNotificationClasses(final Set<SchemaPath> interested) {
         final Set<Class<? extends Notification>> result = new HashSet<>();
         final Set<NotificationDefinition> knownNotifications = runtimeContext.getSchemaContext().getNotifications();
@@ -316,10 +368,19 @@ public final class BindingToNormalizedNodeCodec implements BindingCodecTreeFacto
                     result.add((Class<? extends Notification>) runtimeContext.getClassForSchema(notification));
                 } catch (final IllegalStateException e) {
                     // Ignore
+                    LOG.warn("Class for {} is currently not known.",notification.getPath(),e);
                 }
             }
         }
         return result;
     }
 
+    private static Collection<Class<?>> decompose(final InstanceIdentifier<?> path) {
+        final Set<Class<?>> clazzes = new HashSet<>();
+        for(final InstanceIdentifier.PathArgument arg : path.getPathArguments()) {
+            clazzes.add(arg.getType());
+        }
+        return clazzes;
+    }
+
 }
diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/FutureSchema.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/FutureSchema.java
new file mode 100644 (file)
index 0000000..4fa76c9
--- /dev/null
@@ -0,0 +1,121 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the terms of the Eclipse
+ * Public License v1.0 which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.binding.impl;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.SettableFuture;
+import java.net.URI;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.opendaylight.yangtools.sal.binding.generator.util.BindingRuntimeContext;
+import org.opendaylight.yangtools.yang.binding.Augmentation;
+
+class FutureSchema implements AutoCloseable {
+
+    private final List<FutureSchemaPredicate> postponedOperations = new CopyOnWriteArrayList<>();
+    private final long duration;
+    private final TimeUnit unit;
+
+    protected FutureSchema(final long time, final TimeUnit unit) {
+        this.duration = time;
+        this.unit = unit;
+    }
+
+    void onRuntimeContextUpdated(final BindingRuntimeContext context) {
+        for (final FutureSchemaPredicate op : postponedOperations) {
+            op.unlockIfPossible(context);
+        }
+    }
+
+    long getDuration() {
+        return duration;
+    }
+
+    TimeUnit getUnit() {
+        return unit;
+    }
+
+    @Override
+    public void close() {
+        for (final FutureSchemaPredicate op : postponedOperations) {
+            op.cancel();
+        }
+    }
+
+    private static boolean isSchemaAvailable(final Class<?> clz, final BindingRuntimeContext context) {
+        final Object schema;
+        if (Augmentation.class.isAssignableFrom(clz)) {
+            schema = context.getAugmentationDefinition(clz);
+        } else {
+            schema = context.getSchemaDefinition(clz);
+        }
+        return schema != null;
+    }
+
+    boolean waitForSchema(final URI namespace, final Date revision) {
+        final FutureSchemaPredicate postponedOp = new FutureSchemaPredicate() {
+
+            @Override
+            public boolean apply(final BindingRuntimeContext input) {
+                return input.getSchemaContext().findModuleByNamespaceAndRevision(namespace, revision) != null;
+            }
+        };
+        return postponedOp.waitForSchema();
+    }
+
+    boolean waitForSchema(final Collection<Class<?>> bindingClasses) {
+        final FutureSchemaPredicate postponedOp = new FutureSchemaPredicate() {
+
+            @Override
+            public boolean apply(final BindingRuntimeContext context) {
+                for (final Class<?> clz : bindingClasses) {
+                    if (!isSchemaAvailable(clz, context)) {
+                        return false;
+                    }
+                }
+                return true;
+            }
+        };
+        return postponedOp.waitForSchema();
+    }
+
+    private abstract class FutureSchemaPredicate implements Predicate<BindingRuntimeContext> {
+
+        final boolean waitForSchema() {
+            try {
+                schemaPromise.get(duration, unit);
+                return true;
+            } catch (final InterruptedException | ExecutionException e) {
+                throw Throwables.propagate(e);
+            } catch (final TimeoutException e) {
+                return false;
+            } finally {
+                postponedOperations.remove(this);
+            }
+        }
+
+        final void unlockIfPossible(final BindingRuntimeContext context) {
+            if (!schemaPromise.isDone() && apply(context)) {
+                schemaPromise.set(null);
+            }
+        }
+
+        final void cancel() {
+            schemaPromise.cancel(true);
+        }
+
+        private final SettableFuture<?> schemaPromise = SettableFuture.create();
+    }
+
+}
index 866cb84..84cc5c7 100644 (file)
@@ -194,7 +194,7 @@ module opendaylight-sal-binding-broker-impl {
                 container schema-service {
                     uses config:service-ref {
                         refine type {
-                            mandatory false;
+                            mandatory true;
                             config:required-identity dom:schema-service;
                         }
                     }
@@ -204,9 +204,13 @@ module opendaylight-sal-binding-broker-impl {
     }
 
 
-    augment "/config:modules/config:module/config:state" {
+    augment "/config:modules/config:module/config:configuration" {
         case runtime-generated-mapping {
             when "/config:modules/config:module/config:type = 'runtime-generated-mapping'";
+            leaf wait-for-schema {
+                default "false";
+                type boolean;
+            }
         }
     }
 
index ad3d6c6..4767e7b 100644 (file)
@@ -36,8 +36,8 @@ public class BindingNormalizedCodecTest extends AbstractSchemaAwareTest {
 
     @Override
     protected void setupWithSchema(final SchemaContext context) {
-        DataObjectSerializerGenerator streamWriter = StreamWriterGenerator.create(JavassistUtils.forClassPool(ClassPool.getDefault()));
-        BindingNormalizedNodeCodecRegistry registry = new BindingNormalizedNodeCodecRegistry(streamWriter);
+        final DataObjectSerializerGenerator streamWriter = StreamWriterGenerator.create(JavassistUtils.forClassPool(ClassPool.getDefault()));
+        final BindingNormalizedNodeCodecRegistry registry = new BindingNormalizedNodeCodecRegistry(streamWriter);
         codec = new BindingToNormalizedNodeCodec(GeneratedClassLoadingStrategy.getTCCLClassLoadingStrategy(), registry);
         codec.onGlobalContextUpdated(context);
     };
@@ -45,7 +45,7 @@ public class BindingNormalizedCodecTest extends AbstractSchemaAwareTest {
     @Test
     public void testComplexAugmentationSerialization() {
 
-        PathArgument lastArg = codec.toNormalized(BA_TREE_COMPLEX_USES).getLastPathArgument();
+        final PathArgument lastArg = codec.toYangInstanceIdentifier(BA_TREE_COMPLEX_USES).getLastPathArgument();
         assertTrue(lastArg instanceof AugmentationIdentifier);
     }
 
@@ -53,7 +53,7 @@ public class BindingNormalizedCodecTest extends AbstractSchemaAwareTest {
     @Test
     public void testLeafOnlyAugmentationSerialization() {
 
-        PathArgument leafOnlyLastArg = codec.toNormalized(BA_TREE_LEAF_ONLY).getLastPathArgument();
+        final PathArgument leafOnlyLastArg = codec.toYangInstanceIdentifier(BA_TREE_LEAF_ONLY).getLastPathArgument();
         assertTrue(leafOnlyLastArg instanceof AugmentationIdentifier);
         assertTrue(((AugmentationIdentifier) leafOnlyLastArg).getPossibleChildNames().contains(SIMPLE_VALUE_QNAME));
     }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.