Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AVRO-4069: Remove Reader String Cache from Generic Datum Reader #3194

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public enum StringType {
private final ClassLoader classLoader;

/**
* Set the Java type to be used when reading this schema. Meaningful only only
* Set the Java type to be used when reading this schema. Meaningful only for
* string schemas and map schemas (for the keys).
*/
public static void setStringType(Schema s, StringType stringType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Conversion;
Expand All @@ -46,8 +44,8 @@
private Schema actual;
private Schema expected;
private DatumReader<D> fastDatumReader = null;

private ResolvingDecoder creatorResolver = null;
private final Map<Class, Constructor> stringCtorCache;
private final Thread creator;

public GenericDatumReader() {
Expand All @@ -73,6 +71,7 @@
protected GenericDatumReader(GenericData data) {
this.data = data;
this.creator = Thread.currentThread();
this.stringCtorCache = new HashMap<>();
}

/** Return the {@link GenericData} implementation. */
Expand Down Expand Up @@ -452,13 +451,15 @@
* representation. By default, this calls {@link #readString(Object,Decoder)}.
*/
protected Object readString(Object old, Schema expected, Decoder in) throws IOException {
Class stringClass = this.getReaderCache().getStringClass(expected);
if (stringClass == String.class) {
return in.readString();
}
Class stringClass = this.findStringClass(expected);

// Default is CharSequence / UTF8 so check it first
if (stringClass == CharSequence.class) {
return readString(old, in);
}
if (stringClass == String.class) {
return in.readString();
}
return this.newInstanceFromString(stringClass, in.readString());
}

Expand Down Expand Up @@ -487,99 +488,48 @@
*/
protected Class findStringClass(Schema schema) {
String name = schema.getProp(GenericData.STRING_PROP);
if (name == null)
if (name == null) {
return CharSequence.class;

switch (GenericData.StringType.valueOf(name)) {
case String:
}
if (GenericData.StringType.String.name().equals(name)) {
return String.class;
default:
return CharSequence.class;
}
return CharSequence.class;
}

/**
* This class is used to reproduce part of IdentityHashMap in ConcurrentHashMap
* code.
*/
private static final class IdentitySchemaKey {
private final Schema schema;

private final int hashcode;

public IdentitySchemaKey(Schema schema) {
this.schema = schema;
this.hashcode = System.identityHashCode(schema);
}

@Override
public int hashCode() {
return this.hashcode;
@SuppressWarnings("unchecked")
protected Object newInstanceFromString(Class c, String s) {
// For some of the more common classes, implement specific routines.
// For more complex classes, use reflection.
if (c == Integer.class) {
return Integer.parseInt(s, 10);

Check notice

Code scanning / CodeQL

Missing catch of NumberFormatException Note

Potential uncaught 'java.lang.NumberFormatException'.
}

@Override
public boolean equals(Object obj) {
if (obj == null || !(obj instanceof GenericDatumReader.IdentitySchemaKey)) {
return false;
}
IdentitySchemaKey key = (IdentitySchemaKey) obj;
return this == key || this.schema == key.schema;
if (c == Long.class) {
return Long.parseLong(s, 10);

Check notice

Code scanning / CodeQL

Missing catch of NumberFormatException Note

Potential uncaught 'java.lang.NumberFormatException'.
}
}

// VisibleForTesting
static class ReaderCache {
private final Map<IdentitySchemaKey, Class> stringClassCache = new ConcurrentHashMap<>();

private final Map<Class, Function<String, Object>> stringCtorCache = new ConcurrentHashMap<>();

private final Function<Schema, Class> findStringClass;

public ReaderCache(Function<Schema, Class> findStringClass) {
this.findStringClass = findStringClass;
if (c == Float.class) {
return Float.parseFloat(s);

Check notice

Code scanning / CodeQL

Missing catch of NumberFormatException Note

Potential uncaught 'java.lang.NumberFormatException'.
}

public Object newInstanceFromString(Class c, String s) {
final Function<String, Object> ctor = stringCtorCache.computeIfAbsent(c, this::buildFunction);
return ctor.apply(s);
if (c == Double.class) {
return Double.parseDouble(s);

Check notice

Code scanning / CodeQL

Missing catch of NumberFormatException Note

Potential uncaught 'java.lang.NumberFormatException'.
}

private Function<String, Object> buildFunction(Class c) {
final Constructor cachedCtor = stringCtorCache.computeIfAbsent(c, clazz -> {
final Constructor ctor;
try {
ctor = c.getDeclaredConstructor(String.class);
ctor = clazz.getDeclaredConstructor(String.class);
ctor.setAccessible(true);
} catch (NoSuchMethodException e) {
throw new AvroRuntimeException(e);
}
ctor.setAccessible(true);

return (String s) -> {
try {
return ctor.newInstance(s);
} catch (ReflectiveOperationException e) {
throw new AvroRuntimeException(e);
}
};
}

public Class getStringClass(final Schema s) {
final IdentitySchemaKey key = new IdentitySchemaKey(s);
return this.stringClassCache.computeIfAbsent(key, (IdentitySchemaKey k) -> this.findStringClass.apply(k.schema));
return ctor;
});
try {
return cachedCtor.newInstance(s);
} catch (ReflectiveOperationException e) {
throw new AvroRuntimeException(e);
}
}

private final ReaderCache readerCache = new ReaderCache(this::findStringClass);

// VisibleForTesting
ReaderCache getReaderCache() {
return readerCache;
}

@SuppressWarnings("unchecked")
protected Object newInstanceFromString(Class c, String s) {
return this.getReaderCache().newInstanceFromString(c, s);
}

/**
* Called to read byte arrays. Subclasses may override to use a different byte
* array representation. By default, this calls
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.avro.Schema;
import org.junit.jupiter.api.Test;
Expand All @@ -33,27 +31,9 @@ public class TestGenericDatumReader {

private static final Random r = new Random(System.currentTimeMillis());

@Test
void readerCache() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you delete the test?
Looking at it, I think it should still work. You just need to remove the ctor parameter.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this test is checking the thread-safe nature of the code. This code should not be considered thread-safe, and therefore the test should be removed.

final GenericDatumReader.ReaderCache cache = new GenericDatumReader.ReaderCache(this::findStringClass);
List<Thread> threads = IntStream.rangeClosed(1, 200).mapToObj((int index) -> {
final Schema schema = TestGenericDatumReader.this.build(index);
final WithSchema s = new WithSchema(schema, cache);
return (Runnable) () -> s.test();
}).map(Thread::new).collect(Collectors.toList());
threads.forEach(Thread::start);
threads.forEach((Thread t) -> {
try {
t.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
}

@Test
void newInstanceFromString() {
final GenericDatumReader.ReaderCache cache = new GenericDatumReader.ReaderCache(this::findStringClass);
final GenericDatumReader cache = new GenericDatumReader();

Object object = cache.newInstanceFromString(StringBuilder.class, "Hello");
assertEquals(StringBuilder.class, object.getClass());
Expand All @@ -62,21 +42,6 @@ void newInstanceFromString() {

}

static class WithSchema {
private final Schema schema;

private final GenericDatumReader.ReaderCache cache;

public WithSchema(Schema schema, GenericDatumReader.ReaderCache cache) {
this.schema = schema;
this.cache = cache;
}

public void test() {
this.cache.getStringClass(schema);
}
}

private List<Schema> list = new ArrayList<>();

private Schema build(int index) {
Expand Down
Loading