Iron Serde
Iron Serde provides high-performance Kafka serialization and deserialization libraries for Apache Flink. These libraries don’t require any additional configuration and can be used as drop-in replacements for the default Flink libraries.
When Iron Serde first observes a new schema (either provided by a Schema Registry or inferred from a class), it generates a specialized class that is used only for deserializing that specific schema. The generated class is compact, has minimal branching, and is JIT-compiled quickly.
NOTE: currently, Iron Serde mostly focuses on deserialization. Let us know if you need more serialization support!
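To make the per-schema specialization described above more concrete, here is a minimal Java sketch. It is not Iron Serde's implementation: instead of generating a class, it resolves each schema once into a list of field readers and caches the result, which only approximates the effect. The toy FieldType enum and all class and method names are assumptions made for illustration; the wire format follows Avro's binary encoding for long and string values.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical illustration: none of these names come from Iron Serde.
final class SpecializedDecoderSketch {
    enum FieldType { LONG, STRING }

    // One decoder per schema, built the first time that schema is observed.
    private static final Map<List<FieldType>, Function<ByteBuffer, Object[]>> CACHE =
            new ConcurrentHashMap<>();

    static Function<ByteBuffer, Object[]> decoderFor(List<FieldType> schema) {
        return CACHE.computeIfAbsent(List.copyOf(schema), SpecializedDecoderSketch::build);
    }

    // Resolves the schema once into a flat list of field readers, so the
    // per-record path is a plain loop with no schema inspection.
    private static Function<ByteBuffer, Object[]> build(List<FieldType> schema) {
        List<Function<ByteBuffer, Object>> readers = new ArrayList<>();
        for (FieldType type : schema) {
            if (type == FieldType.LONG) {
                readers.add(buf -> readLong(buf));
            } else {
                readers.add(buf -> readString(buf));
            }
        }
        return buf -> {
            Object[] row = new Object[readers.size()];
            for (int i = 0; i < row.length; i++) {
                row[i] = readers.get(i).apply(buf);
            }
            return row;
        };
    }

    // Avro long: variable-length, zigzag-encoded integer.
    static long readLong(ByteBuffer buf) {
        long raw = 0;
        int shift = 0;
        byte b;
        do {
            b = buf.get();
            raw |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (raw >>> 1) ^ -(raw & 1); // undo zigzag
    }

    // Avro string: a long byte length followed by that many UTF-8 bytes.
    static String readString(ByteBuffer buf) {
        byte[] bytes = new byte[(int) readLong(buf)];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        List<FieldType> schema = List.of(FieldType.LONG, FieldType.STRING);
        // Avro binary for (42, "hi"): zigzag(42) = 0x54, zigzag(2) = 0x04, then the UTF-8 bytes.
        byte[] payload = {0x54, 0x04, 'h', 'i'};
        Object[] row = decoderFor(schema).apply(ByteBuffer.wrap(payload));
        System.out.println(row[0] + ", " + row[1]); // prints: 42, hi
    }
}
```

The point of the sketch is the split between a one-time build step per schema and a branch-light per-record path. A generated class would go further by removing the indirection through the reader list.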
Formats Supported
Avro
Currently supported libraries:
dev.irontools.flink.serde.avro.IronAvroDeserializationSchema
- replaces AvroDeserializationSchema
dev.irontools.flink.serde.avro.IronConfluentRegistryAvroDeserializationSchema
- replaces ConfluentRegistryAvroDeserializationSchema
dev.irontools.flink.serde.avro.IronRegistryAvroDeserializationSchema
- replaces RegistryAvroDeserializationSchema
Generic Record Deserialization
Schema schema = new Schema.Parser().parse(schemaString);
DeserializationSchema<GenericRecord> deserializer =
    IronAvroDeserializationSchema.forGeneric(schema);
Specific Record Deserialization
DeserializationSchema<YourSpecificRecord> deserializer =
    IronAvroDeserializationSchema.forSpecific(YourSpecificRecord.class);
Schema Registry Integration
DeserializationSchema<GenericRecord> deserializer =
    IronConfluentRegistryAvroDeserializationSchema.forGeneric(
        schema,
        "http://schema-registry:8081"
    );
Kafka Source Usage Example
// The source element type must match the deserializer: forGeneric(...) produces GenericRecord.
KafkaSource<GenericRecord> source = KafkaSource.<GenericRecord>builder()
    .setBootstrapServers("localhost:9092")
    .setTopics("your-topic")
    .setGroupId("your-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(
        IronConfluentRegistryAvroDeserializationSchema.forGeneric(
            schema,
            "http://schema-registry:8081"
        )
    )
    .build();
Gotchas
NOTE: currently, the Avro serdes require a full JDK at runtime, while the community Flink Docker images ship with only a JRE by default. Let us know if you need alternative Docker images built with a JDK.
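To check whether a given container image meets this requirement, a small preflight program can help. The sketch below assumes that the relevant difference is the presence of the system Java compiler, which the standard javax.tools API exposes in a JDK and which is absent from a JRE-only image. This page does not say which JDK component the Avro serdes actually rely on, so treat the check as a heuristic. The class name JdkPreflightCheck is hypothetical.

```java
import javax.tools.JavaCompiler;
import javax.tools.ToolProvider;

// Hypothetical helper, not part of Iron Serde.
final class JdkPreflightCheck {
    // Returns true when the running JVM exposes the system Java compiler.
    // ToolProvider returns null when no compiler is available, as on a JRE-only image.
    static boolean hasSystemCompiler() {
        JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
        return compiler != null;
    }

    public static void main(String[] args) {
        System.out.println("java.home = " + System.getProperty("java.home"));
        System.out.println("system compiler available = " + hasSystemCompiler());
    }
}
```

Running this inside the Flink image you plan to deploy shows quickly whether it was built on a JDK or a JRE base.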
Performance Benchmark
Setup: ubuntu-22.04, 2 CPUs, 7GB RAM, x64 arch; Java version: 17; JMH version: 1.37.
Measuring three kinds of schemas: trivial (a few simple fields), standard (several fields), and complex (many fields including arrays, nested records, and logical types).
Benchmark Mode Cnt Score Error Units
FlinkAvroGenericRecordBenchmark.measureAvroDeserializationComplex avgt 10 610.802 ± 10.620 ns/op
FlinkAvroGenericRecordBenchmark.measureAvroDeserializationStandard avgt 10 384.341 ± 4.519 ns/op
FlinkAvroGenericRecordBenchmark.measureAvroDeserializationTrivial avgt 10 131.794 ± 4.027 ns/op
FlinkAvroGenericRecordBenchmark.measureIronAvroDeserializationComplex avgt 10 224.669 ± 7.045 ns/op
FlinkAvroGenericRecordBenchmark.measureIronAvroDeserializationStandard avgt 10 134.520 ± 3.171 ns/op
FlinkAvroGenericRecordBenchmark.measureIronAvroDeserializationTrivial avgt 10 44.185 ± 0.609 ns/op
FlinkAvroSpecificRecordBenchmark.measureAvroDeserializationComplex avgt 10 768.978 ± 12.671 ns/op
FlinkAvroSpecificRecordBenchmark.measureAvroDeserializationStandard avgt 10 406.098 ± 3.875 ns/op
FlinkAvroSpecificRecordBenchmark.measureAvroDeserializationTrivial avgt 10 165.906 ± 2.508 ns/op
FlinkAvroSpecificRecordBenchmark.measureIronAvroDeserializationComplex avgt 10 304.938 ± 7.490 ns/op
FlinkAvroSpecificRecordBenchmark.measureIronAvroDeserializationStandard avgt 10 119.207 ± 1.684 ns/op
FlinkAvroSpecificRecordBenchmark.measureIronAvroDeserializationTrivial avgt 10 39.230 ± 0.821 ns/op
FlinkRegistryAvroGenericRecordBenchmark.measureAvroDeserializationComplex avgt 10 612.538 ± 11.979 ns/op
FlinkRegistryAvroGenericRecordBenchmark.measureAvroDeserializationStandard avgt 10 391.808 ± 3.314 ns/op
FlinkRegistryAvroGenericRecordBenchmark.measureAvroDeserializationTrivial avgt 10 154.710 ± 4.052 ns/op
FlinkRegistryAvroGenericRecordBenchmark.measureIronAvroDeserializationComplex avgt 10 300.058 ± 3.173 ns/op
FlinkRegistryAvroGenericRecordBenchmark.measureIronAvroDeserializationStandard avgt 10 208.013 ± 1.567 ns/op
FlinkRegistryAvroGenericRecordBenchmark.measureIronAvroDeserializationTrivial avgt 10 81.824 ± 1.517 ns/op
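The scores above are average times per operation, so lower is better, and the speedup is the baseline score divided by the Iron Serde score. The following sketch works through that arithmetic for the three GenericRecord rows. The class and method names are made up for illustration, and the calculation ignores the reported error margins.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for reading the benchmark table; not part of Iron Serde.
final class AvroSpeedupSketch {
    // Speedup = baseline time / Iron Serde time, both in ns/op.
    static double speedup(double baselineNs, double ironNs) {
        return baselineNs / ironNs;
    }

    public static void main(String[] args) {
        // {baseline, Iron Serde} scores copied from the FlinkAvroGenericRecordBenchmark rows.
        Map<String, double[]> rows = new LinkedHashMap<>();
        rows.put("Generic/Complex", new double[] {610.802, 224.669});
        rows.put("Generic/Standard", new double[] {384.341, 134.520});
        rows.put("Generic/Trivial", new double[] {131.794, 44.185});

        rows.forEach((name, scores) ->
                System.out.printf("%-18s %.2fx%n", name, speedup(scores[0], scores[1])));
    }
}
```

Applied to every pair in the table, the same division gives speedups between roughly 1.9x (Schema Registry, standard schema) and 4.2x (SpecificRecord, trivial schema).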
JSON
Currently supported libraries:
dev.irontools.flink.serde.json.IronJsonDeserializationSchema
- replaces JsonDeserializationSchema
dev.irontools.flink.serde.json.IronJsonRowDataDeserializationSchema
- replaces JsonRowDataDeserializationSchema
dev.irontools.flink.serde.json.debezium.IronDebeziumJsonDeserializationSchema
- replaces DebeziumJsonDeserializationSchema
dev.irontools.flink.serde.json.IronJsonSerializationSchema
- replaces JsonSerializationSchema
POJO Deserialization
DeserializationSchema<YourClass> deserializer =
    new IronJsonDeserializationSchema<>(YourClass.class);
RowData Deserialization
DeserializationSchema<RowData> deserializationSchema =
    new IronJsonRowDataDeserializationSchema(
        rowType, InternalTypeInfo.of(rowType), false, false, TimestampFormat.ISO_8601);
Debezium Deserialization
DeserializationSchema<RowData> deserializationSchema =
    new IronDebeziumJsonDeserializationSchema(
        rowType,
        Collections.emptyList(),
        InternalTypeInfo.of(rowType),
        schemaInclude,
        false,
        TimestampFormat.ISO_8601);
POJO Serialization
SerializationSchema<YourClass> serializer =
    new IronJsonSerializationSchema<>();
Kafka Source Usage Example
// The source element type must match the deserializer: here it produces YourClass.
KafkaSource<YourClass> source = KafkaSource.<YourClass>builder()
    .setBootstrapServers("localhost:9092")
    .setTopics("your-topic")
    .setGroupId("your-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(
        new IronJsonDeserializationSchema<>(YourClass.class)
    )
    .build();
Performance Benchmark
Setup: ubuntu-22.04, 2 CPUs, 7GB RAM, x64 arch; Java version: 17; JMH version: 1.37.
Measuring three kinds of schemas: trivial (a few simple fields), standard (several fields), and complex (many fields including arrays, nested records, and logical types).
Benchmark Mode Cnt Score Error Units
FlinkJsonDeserializerPojoBenchmark.measureIronJsonDeserializationStandard avgt 10 175.937 ± 0.619 ns/op
FlinkJsonDeserializerPojoBenchmark.measureJsonDeserializationStandard avgt 10 575.212 ± 2.184 ns/op
FlinkJsonRowDataBenchmark.measureIronJsonRowDataDeserializationComplex avgt 10 657.295 ± 1.909 ns/op
FlinkJsonRowDataBenchmark.measureIronJsonRowDataDeserializationStandard avgt 10 316.403 ± 2.437 ns/op
FlinkJsonRowDataBenchmark.measureIronJsonRowDataDeserializationTrivial avgt 10 113.398 ± 0.441 ns/op
FlinkJsonRowDataBenchmark.measureJsonRowDataDeserializationComplex avgt 10 1329.446 ± 7.110 ns/op
FlinkJsonRowDataBenchmark.measureJsonRowDataDeserializationStandard avgt 10 800.959 ± 2.608 ns/op
FlinkJsonRowDataBenchmark.measureJsonRowDataDeserializationTrivial avgt 10 260.678 ± 1.193 ns/op
FlinkJsonSerializerPojoBenchmark.measureIronJsonSerializationStandard avgt 10 113.348 ± 0.492 ns/op
FlinkJsonSerializerPojoBenchmark.measureJsonSerializationStandard avgt 10 340.445 ± 1.061 ns/op
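An average time in ns/op can also be read as a throughput ceiling for a single thread: operations per second equal one billion divided by the score. The sketch below applies this conversion to the two POJO deserialization rows. The class and method names are hypothetical. The result covers only the deserialization step measured by the microbenchmark, so it is an upper bound and not a prediction of end-to-end job throughput.

```java
// Hypothetical helper for reading the benchmark table; not part of Iron Serde.
final class JsonThroughputSketch {
    // Converts an average time per operation (ns/op) into operations per second.
    static double opsPerSecond(double nsPerOp) {
        return 1_000_000_000.0 / nsPerOp;
    }

    public static void main(String[] args) {
        double flinkNs = 575.212; // measureJsonDeserializationStandard
        double ironNs = 175.937;  // measureIronJsonDeserializationStandard

        System.out.printf("Flink JSON: %.2f M records/s per thread%n", opsPerSecond(flinkNs) / 1e6);
        System.out.printf("Iron JSON:  %.2f M records/s per thread%n", opsPerSecond(ironNs) / 1e6);
    }
}
```

On these numbers, the standard POJO schema goes from about 1.74 million to about 5.68 million records per second per thread for deserialization alone.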