BUG 3067: Added support for blocking if schema is not available. 63/22063/2
authorTony Tkacik <ttkacik@cisco.com>
Tue, 5 May 2015 11:57:57 +0000 (13:57 +0200)
committerGerrit Code Review <gerrit@opendaylight.org>
Mon, 8 Jun 2015 10:26:57 +0000 (10:26 +0000)
Codecs are able to send specific exception, which describes
reason of failure for codec failing. If the reason is
missing schema (not yet loaded) we catch it and retry
serialization after schema context is upgraded.

User thread will be blocked maximally for 5 seconds
after that it will be unblocked.

Change-Id: I3494702eae644b495211a1a34c074e268c2f5f46
Signed-off-by: Tony Tkacik <ttkacik@cisco.com>
(cherry picked from commit 6aba3ef282f86a84920fa5a7ccf21c91d459806b)

opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/binding/impl/RuntimeMappingModule.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/AbstractForwardedDataBroker.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/AbstractForwardedTransaction.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/AbstractWriteTransaction.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingDOMDataTreeChangeServiceAdapter.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingDOMMountPointServiceAdapter.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingToNormalizedNodeCodec.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/FutureSchema.java [new file with mode: 0644]
opendaylight/md-sal/sal-binding-broker/src/main/yang/opendaylight-binding-broker-impl.yang
opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/md/sal/binding/impl/test/BindingNormalizedCodecTest.java

index c228244800bda2810139fe93e3e7e20cf90c2452..87ce27d501159a0736b9b73d31545e660716a878 100644 (file)
@@ -51,14 +51,14 @@ public final class RuntimeMappingModule extends AbstractRuntimeMappingModule {
     @Override
     public java.lang.AutoCloseable createInstance() {
         final GeneratedClassLoadingStrategy classLoading = getGlobalClassLoadingStrategy();
-        BindingNormalizedNodeCodecRegistry codecRegistry = new BindingNormalizedNodeCodecRegistry(StreamWriterGenerator.create(SingletonHolder.JAVASSIST));
-        BindingToNormalizedNodeCodec instance = new BindingToNormalizedNodeCodec(classLoading, codecRegistry);
+        final BindingNormalizedNodeCodecRegistry codecRegistry = new BindingNormalizedNodeCodecRegistry(StreamWriterGenerator.create(SingletonHolder.JAVASSIST));
+        final BindingToNormalizedNodeCodec instance = new BindingToNormalizedNodeCodec(classLoading, codecRegistry,getWaitForSchema());
         bundleContext.registerService(SchemaContextListener.class, instance, new Hashtable<String,String>());
         return instance;
     }
 
     private GeneratedClassLoadingStrategy getGlobalClassLoadingStrategy() {
-        ServiceReference<GeneratedClassLoadingStrategy> ref = bundleContext.getServiceReference(GeneratedClassLoadingStrategy.class);
+        final ServiceReference<GeneratedClassLoadingStrategy> ref = bundleContext.getServiceReference(GeneratedClassLoadingStrategy.class);
         return bundleContext.getService(ref);
     }
 
index b37bb045b118c58ccae9b8c3c6e09fa297a69741..7e63e54f4b5c5358e33c632c76d669b602a2bf63 100644 (file)
@@ -65,7 +65,7 @@ public abstract class AbstractForwardedDataBroker implements Delegator<DOMDataBr
             final InstanceIdentifier<?> path, final DataChangeListener listener, final DataChangeScope triggeringScope) {
         final DOMDataChangeListener domDataChangeListener = new TranslatingDataChangeInvoker(store, path, listener,
                 triggeringScope);
-        final YangInstanceIdentifier domPath = codec.toNormalized(path);
+        final YangInstanceIdentifier domPath = codec.toYangInstanceIdentifierBlocking(path);
         final ListenerRegistration<DOMDataChangeListener> domRegistration = domDataBroker.registerDataChangeListener(store,
                 domPath, domDataChangeListener, triggeringScope);
         return new ListenerRegistrationImpl(listener, domRegistration);
index eadde73e42c0daf632da84dc84fadfc08a171f92..3d337d879d88e9d9ff07bb8b18a3b9589b7efcac 100644 (file)
@@ -61,7 +61,7 @@ abstract class AbstractForwardedTransaction<T extends AsyncTransaction<YangInsta
         Preconditions.checkArgument(!path.isWildcarded(), "Invalid read of wildcarded path %s", path);
 
         return MappingCheckedFuture.create(
-                    Futures.transform(readTx.read(store, codec.toNormalized(path)),
+                    Futures.transform(readTx.read(store, codec.toYangInstanceIdentifierBlocking(path)),
                                       codec.deserializeFunction(path)),
                     ReadFailedException.MAPPER);
     }
index 7e1f112c4ad4ffc20d0e67a3069f434292c47ef7..1e3e41909aad90172f0f5d0a19cc9a9d0aaa7fce 100644 (file)
@@ -126,7 +126,7 @@ public abstract class AbstractWriteTransaction<T extends DOMDataWriteTransaction
             final InstanceIdentifier<?> path) {
         Preconditions.checkArgument(!path.isWildcarded(), "Cannot delete wildcarded path %s", path);
 
-        final YangInstanceIdentifier normalized = getCodec().toNormalized(path);
+        final YangInstanceIdentifier normalized = getCodec().toYangInstanceIdentifierBlocking(path);
         getDelegate().delete(store, normalized);
     }
 
index ad0ab54e9f700479200f5411316b2430c6908788..02daa4559aba652ade0b7662403aa0431e27a4fd 100644 (file)
@@ -54,7 +54,7 @@ final class BindingDOMDataTreeChangeServiceAdapter implements DataTreeChangeServ
     }
 
     private DOMDataTreeIdentifier toDomTreeIdentifier(final DataTreeIdentifier<?> treeId) {
-        final YangInstanceIdentifier domPath = codec.toYangInstanceIdentifier(treeId.getRootIdentifier());
+        final YangInstanceIdentifier domPath = codec.toYangInstanceIdentifierBlocking(treeId.getRootIdentifier());
         return new DOMDataTreeIdentifier(treeId.getDatastoreType(), domPath);
     }
 }
index 6bb40edabc962493821abfb0b710c67b3c86b2ed..759c470f0610100679c92de06d16a47c71ba7f32 100644 (file)
@@ -45,7 +45,7 @@ public class BindingDOMMountPointServiceAdapter implements MountPointService {
     @Override
     public Optional<MountPoint> getMountPoint(InstanceIdentifier<?> mountPoint) {
 
-        YangInstanceIdentifier domPath = codec.toNormalized(mountPoint);
+        YangInstanceIdentifier domPath = codec.toYangInstanceIdentifierBlocking(mountPoint);
         Optional<DOMMountPoint> domMount = mountService.getMountPoint(domPath);
         if(domMount.isPresent()) {
             return Optional.<MountPoint>fromNullable(bindingMountpoints.getUnchecked(domMount.get()));
index 3795ad3bac8c4787bda7ff27cd22668b398eeb33..776691c0475aef6d969d3262176aaa86ce3bbd6d 100644 (file)
@@ -15,12 +15,16 @@ import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.collect.ImmutableBiMap;
 import java.lang.reflect.Method;
+import java.net.URI;
 import java.util.AbstractMap.SimpleEntry;
+import java.util.Collection;
+import java.util.Date;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import javax.annotation.Nonnull;
 import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizationException;
 import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizationOperation;
@@ -30,6 +34,7 @@ import org.opendaylight.yangtools.binding.data.codec.api.BindingCodecTreeFactory
 import org.opendaylight.yangtools.binding.data.codec.api.BindingCodecTreeNode;
 import org.opendaylight.yangtools.binding.data.codec.api.BindingNormalizedNodeSerializer;
 import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry;
+import org.opendaylight.yangtools.binding.data.codec.impl.MissingSchemaException;
 import org.opendaylight.yangtools.sal.binding.generator.impl.GeneratedClassLoadingStrategy;
 import org.opendaylight.yangtools.sal.binding.generator.util.BindingRuntimeContext;
 import org.opendaylight.yangtools.yang.binding.BindingMapping;
@@ -51,28 +56,51 @@ import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public final class BindingToNormalizedNodeCodec implements BindingCodecTreeFactory, BindingNormalizedNodeSerializer, SchemaContextListener, AutoCloseable {
 
+    private static final long WAIT_DURATION_SEC = 5;
+    private static final Logger LOG = LoggerFactory.getLogger(BindingToNormalizedNodeCodec.class);
+
     private final BindingNormalizedNodeCodecRegistry codecRegistry;
-    private DataNormalizer legacyToNormalized;
+
     private final GeneratedClassLoadingStrategy classLoadingStrategy;
-    private BindingRuntimeContext runtimeContext;
+    private final FutureSchema futureSchema;
     private final LoadingCache<InstanceIdentifier<?>, YangInstanceIdentifier> iiCache = CacheBuilder.newBuilder()
             .softValues().build(new CacheLoader<InstanceIdentifier<?>, YangInstanceIdentifier>() {
 
                 @Override
                 public YangInstanceIdentifier load(final InstanceIdentifier<?> key) throws Exception {
-                    return toYangInstanceIdentifier(key);
+                    return toYangInstanceIdentifierBlocking(key);
                 }
 
             });
 
+    private BindingRuntimeContext runtimeContext;
+    private DataNormalizer legacyToNormalized;
+
     public BindingToNormalizedNodeCodec(final GeneratedClassLoadingStrategy classLoadingStrategy,
             final BindingNormalizedNodeCodecRegistry codecRegistry) {
+        this(classLoadingStrategy,codecRegistry,false);
+
+    }
+
+    public BindingToNormalizedNodeCodec(final GeneratedClassLoadingStrategy classLoadingStrategy,
+            final BindingNormalizedNodeCodecRegistry codecRegistry,final boolean waitForSchema) {
         this.classLoadingStrategy = Preconditions.checkNotNull(classLoadingStrategy,"classLoadingStrategy");
         this.codecRegistry = Preconditions.checkNotNull(codecRegistry,"codecRegistry");
+        this.futureSchema = waitForSchema ? new FutureSchema(WAIT_DURATION_SEC, TimeUnit.SECONDS) : null;
+    }
 
+    final YangInstanceIdentifier toYangInstanceIdentifierBlocking(final InstanceIdentifier<? extends DataObject> binding) {
+        try {
+            return codecRegistry.toYangInstanceIdentifier(binding);
+        } catch (final MissingSchemaException e) {
+            waitForSchema(decompose(binding),e);
+            return codecRegistry.toYangInstanceIdentifier(binding);
+        }
     }
 
     /**
@@ -203,6 +231,9 @@ public final class BindingToNormalizedNodeCodec implements BindingCodecTreeFacto
         legacyToNormalized = new DataNormalizer (arg0);
         runtimeContext = BindingRuntimeContext.create(classLoadingStrategy, arg0);
         codecRegistry.onBindingRuntimeContextUpdated(runtimeContext);
+        if(futureSchema != null) {
+            futureSchema.onRuntimeContextUpdated(runtimeContext);
+        }
     }
 
     public <T extends DataObject> Function<Optional<NormalizedNode<?, ?>>, Optional<T>>  deserializeFunction(final InstanceIdentifier<T> path) {
@@ -244,8 +275,7 @@ public final class BindingToNormalizedNodeCodec implements BindingCodecTreeFacto
 
     // FIXME: This should be probably part of Binding Runtime context
     public ImmutableBiMap<Method, SchemaPath> getRpcMethodToSchemaPath(final Class<? extends RpcService> key) {
-        final QNameModule moduleName = BindingReflections.getQNameModule(key);
-        final Module module = runtimeContext.getSchemaContext().findModuleByNamespaceAndRevision(moduleName.getNamespace(), moduleName.getRevision());
+        final Module module = getModuleBlocking(key);
         final ImmutableBiMap.Builder<Method, SchemaPath> ret = ImmutableBiMap.<Method, SchemaPath>builder();
         try {
             for (final RpcDefinition rpcDef : module.getRpcs()) {
@@ -259,8 +289,7 @@ public final class BindingToNormalizedNodeCodec implements BindingCodecTreeFacto
     }
 
     protected ImmutableBiMap<Method, RpcDefinition> getRpcMethodToSchema(final Class<? extends RpcService> key) {
-        final QNameModule moduleName = BindingReflections.getQNameModule(key);
-        final Module module = runtimeContext.getSchemaContext().findModuleByNamespaceAndRevision(moduleName.getNamespace(), moduleName.getRevision());
+        final Module module = getModuleBlocking(key);
         final ImmutableBiMap.Builder<Method, RpcDefinition> ret = ImmutableBiMap.builder();
         try {
             for (final RpcDefinition rpcDef : module.getRpcs()) {
@@ -273,6 +302,28 @@ public final class BindingToNormalizedNodeCodec implements BindingCodecTreeFacto
         return ret.build();
     }
 
+    private Module getModuleBlocking(final Class<?> modeledClass) {
+        final QNameModule moduleName = BindingReflections.getQNameModule(modeledClass);
+        final URI namespace = moduleName.getNamespace();
+        final Date revision = moduleName.getRevision();
+        Module module = runtimeContext.getSchemaContext().findModuleByNamespaceAndRevision(namespace, revision);
+        if(module == null && futureSchema != null && futureSchema.waitForSchema(namespace,revision)) {
+            module = runtimeContext.getSchemaContext().findModuleByNamespaceAndRevision(namespace, revision);
+        }
+        Preconditions.checkState(module != null, "Schema for %s is not available.", modeledClass);
+        return module;
+    }
+
+    private void waitForSchema(final Collection<Class<?>> binding, final MissingSchemaException e) {
+        if(futureSchema != null) {
+            LOG.warn("Blocking thread to wait for schema convergence updates for {} {}",futureSchema.getDuration(), futureSchema.getUnit());
+            if(!futureSchema.waitForSchema(binding)) {
+                return;
+            }
+        }
+        throw e;
+    }
+
     private Method findRpcMethod(final Class<? extends RpcService> key, final RpcDefinition rpcDef) throws NoSuchMethodException {
         final String methodName = BindingMapping.getMethodName(rpcDef.getQName());
         if(rpcDef.getInput() != null) {
@@ -307,6 +358,7 @@ public final class BindingToNormalizedNodeCodec implements BindingCodecTreeFacto
         return new SimpleEntry<InstanceIdentifier<?>, BindingCodecTreeNode<?>>(bindingPath, codecContext);
     }
 
+    @SuppressWarnings("unchecked")
     public Set<Class<? extends Notification>> getNotificationClasses(final Set<SchemaPath> interested) {
         final Set<Class<? extends Notification>> result = new HashSet<>();
         final Set<NotificationDefinition> knownNotifications = runtimeContext.getSchemaContext().getNotifications();
@@ -316,10 +368,19 @@ public final class BindingToNormalizedNodeCodec implements BindingCodecTreeFacto
                     result.add((Class<? extends Notification>) runtimeContext.getClassForSchema(notification));
                 } catch (final IllegalStateException e) {
                     // Ignore
+                    LOG.warn("Class for {} is currently not known.",notification.getPath(),e);
                 }
             }
         }
         return result;
     }
 
+    private static Collection<Class<?>> decompose(final InstanceIdentifier<?> path) {
+        final Set<Class<?>> clazzes = new HashSet<>();
+        for(final InstanceIdentifier.PathArgument arg : path.getPathArguments()) {
+            clazzes.add(arg.getType());
+        }
+        return clazzes;
+    }
+
 }
diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/FutureSchema.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/FutureSchema.java
new file mode 100644 (file)
index 0000000..4fa76c9
--- /dev/null
@@ -0,0 +1,121 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the terms of the Eclipse
+ * Public License v1.0 which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.binding.impl;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.SettableFuture;
+import java.net.URI;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.opendaylight.yangtools.sal.binding.generator.util.BindingRuntimeContext;
+import org.opendaylight.yangtools.yang.binding.Augmentation;
+
+class FutureSchema implements AutoCloseable {
+
+    private final List<FutureSchemaPredicate> postponedOperations = new CopyOnWriteArrayList<>();
+    private final long duration;
+    private final TimeUnit unit;
+
+    protected FutureSchema(final long time, final TimeUnit unit) {
+        this.duration = time;
+        this.unit = unit;
+    }
+
+    void onRuntimeContextUpdated(final BindingRuntimeContext context) {
+        for (final FutureSchemaPredicate op : postponedOperations) {
+            op.unlockIfPossible(context);
+        }
+    }
+
+    long getDuration() {
+        return duration;
+    }
+
+    TimeUnit getUnit() {
+        return unit;
+    }
+
+    @Override
+    public void close() {
+        for (final FutureSchemaPredicate op : postponedOperations) {
+            op.cancel();
+        }
+    }
+
+    private static boolean isSchemaAvailable(final Class<?> clz, final BindingRuntimeContext context) {
+        final Object schema;
+        if (Augmentation.class.isAssignableFrom(clz)) {
+            schema = context.getAugmentationDefinition(clz);
+        } else {
+            schema = context.getSchemaDefinition(clz);
+        }
+        return schema != null;
+    }
+
+    boolean waitForSchema(final URI namespace, final Date revision) {
+        final FutureSchemaPredicate postponedOp = new FutureSchemaPredicate() {
+
+            @Override
+            public boolean apply(final BindingRuntimeContext input) {
+                return input.getSchemaContext().findModuleByNamespaceAndRevision(namespace, revision) != null;
+            }
+        };
+        return postponedOp.waitForSchema();
+    }
+
+    boolean waitForSchema(final Collection<Class<?>> bindingClasses) {
+        final FutureSchemaPredicate postponedOp = new FutureSchemaPredicate() {
+
+            @Override
+            public boolean apply(final BindingRuntimeContext context) {
+                for (final Class<?> clz : bindingClasses) {
+                    if (!isSchemaAvailable(clz, context)) {
+                        return false;
+                    }
+                }
+                return true;
+            }
+        };
+        return postponedOp.waitForSchema();
+    }
+
+    private abstract class FutureSchemaPredicate implements Predicate<BindingRuntimeContext> {
+
+        final boolean waitForSchema() {
+            try {
+                schemaPromise.get(duration, unit);
+                return true;
+            } catch (final InterruptedException | ExecutionException e) {
+                throw Throwables.propagate(e);
+            } catch (final TimeoutException e) {
+                return false;
+            } finally {
+                postponedOperations.remove(this);
+            }
+        }
+
+        final void unlockIfPossible(final BindingRuntimeContext context) {
+            if (!schemaPromise.isDone() && apply(context)) {
+                schemaPromise.set(null);
+            }
+        }
+
+        final void cancel() {
+            schemaPromise.cancel(true);
+        }
+
+        private final SettableFuture<?> schemaPromise = SettableFuture.create();
+    }
+
+}
index 866cb844d189711458785614edaca761f2f3f850..84cc5c7b99121e08e801b5af82777264712154eb 100644 (file)
@@ -194,7 +194,7 @@ module opendaylight-sal-binding-broker-impl {
                 container schema-service {
                     uses config:service-ref {
                         refine type {
-                            mandatory false;
+                            mandatory true;
                             config:required-identity dom:schema-service;
                         }
                     }
@@ -204,9 +204,13 @@ module opendaylight-sal-binding-broker-impl {
     }
 
 
-    augment "/config:modules/config:module/config:state" {
+    augment "/config:modules/config:module/config:configuration" {
         case runtime-generated-mapping {
             when "/config:modules/config:module/config:type = 'runtime-generated-mapping'";
+            leaf wait-for-schema {
+                default "false";
+                type boolean;
+            }
         }
     }
 
index ad3d6c6eb4bcd7a716c4136bb95a8f699abd7aad..4767e7bb18bd5646acf3d715372c52365b6b8ecf 100644 (file)
@@ -36,8 +36,8 @@ public class BindingNormalizedCodecTest extends AbstractSchemaAwareTest {
 
     @Override
     protected void setupWithSchema(final SchemaContext context) {
-        DataObjectSerializerGenerator streamWriter = StreamWriterGenerator.create(JavassistUtils.forClassPool(ClassPool.getDefault()));
-        BindingNormalizedNodeCodecRegistry registry = new BindingNormalizedNodeCodecRegistry(streamWriter);
+        final DataObjectSerializerGenerator streamWriter = StreamWriterGenerator.create(JavassistUtils.forClassPool(ClassPool.getDefault()));
+        final BindingNormalizedNodeCodecRegistry registry = new BindingNormalizedNodeCodecRegistry(streamWriter);
         codec = new BindingToNormalizedNodeCodec(GeneratedClassLoadingStrategy.getTCCLClassLoadingStrategy(), registry);
         codec.onGlobalContextUpdated(context);
     };
@@ -45,7 +45,7 @@ public class BindingNormalizedCodecTest extends AbstractSchemaAwareTest {
     @Test
     public void testComplexAugmentationSerialization() {
 
-        PathArgument lastArg = codec.toNormalized(BA_TREE_COMPLEX_USES).getLastPathArgument();
+        final PathArgument lastArg = codec.toYangInstanceIdentifier(BA_TREE_COMPLEX_USES).getLastPathArgument();
         assertTrue(lastArg instanceof AugmentationIdentifier);
     }
 
@@ -53,7 +53,7 @@ public class BindingNormalizedCodecTest extends AbstractSchemaAwareTest {
     @Test
     public void testLeafOnlyAugmentationSerialization() {
 
-        PathArgument leafOnlyLastArg = codec.toNormalized(BA_TREE_LEAF_ONLY).getLastPathArgument();
+        final PathArgument leafOnlyLastArg = codec.toYangInstanceIdentifier(BA_TREE_LEAF_ONLY).getLastPathArgument();
         assertTrue(leafOnlyLastArg instanceof AugmentationIdentifier);
         assertTrue(((AugmentationIdentifier) leafOnlyLastArg).getPossibleChildNames().contains(SIMPLE_VALUE_QNAME));
     }