Polyglot Apache Flink UDF Programming with Iron Functions
Posted by Yaroslav Tkachenko
Stream-processing technologies have been evolving over the years. The latest iteration borrows many ideas from databases; as a result, streaming SQL has become one of the primary interfaces for data processing.
Apache Flink SQL is an extremely powerful way to define data streaming pipelines. Flink SQL is declarative, supports changelog semantics, and relies on decades of database optimization research.
However, sometimes SQL is just not enough (or too hard!). You may need to work with deeply nested data structures, and writing JSON parsing logic in SQL is still quite challenging. Or you may have certain battle-tested libraries you want to use (I’ve seen this many times). Finally, some engineers just prefer imperative languages they’re already comfortable using.
User-Defined Functions (UDFs) are the typical workaround in these cases. You author a piece of logic in Java, package it, register it with your SQL environment… and that’s it! You’ve found a way to bring imperative logic into the declarative world 🙂.
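For instance, a classic Flink UDF in Java is just a class extending ScalarFunction. The snippet below is a minimal sketch with a hypothetical string-reversing function (not something from Iron Functions), using Flink’s standard UDF API:

import org.apache.flink.table.functions.ScalarFunction;

// A minimal Java scalar UDF: reverses a string.
public class ReverseFunction extends ScalarFunction {
  public String eval(String s) {
    return s == null ? null : new StringBuilder(s).reverse().toString();
  }
}

Once registered, e.g. with tEnv.createTemporarySystemFunction("reverse_string", ReverseFunction.class), it can be called from any SQL statement like a built-in function.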
The Iron Functions extension enables you to take it even further.
Iron Functions?
Iron Functions enables WebAssembly-based function execution in Apache Flink, allowing you to author code in languages that Flink typically doesn’t support: TypeScript, Rust, and Go.
It leverages WebAssembly and the Extism ecosystem to provide a secure, high-performance environment for executing custom functions.
Iron Functions supports both the DataStream API and the Table/SQL API. This post focuses only on the latter.
Creating UDFs with Iron Functions is quite straightforward: start with the provided project template, annotate your inputs and outputs, and then use the ironfun CLI to package your project as a Flink UDF. We’ll follow these steps below!
Authoring Workflow and Supported Languages
Out of the box, Apache Flink supports implementing UDFs in Java and Python. And Iron Functions currently supports TypeScript, Rust, and Go. So, why do we need more languages?
It turns out that the list of programming languages a tool supports is still important 🙂 Apache Flink is a complex distributed system, which can be challenging to learn on its own. Imagine also adding the need to learn a new programming language to the list of initial requirements!
True, several years ago Flink was primarily used by data platform engineers with deep Java knowledge; however, this is no longer the case! Data scientists, machine learning engineers, and application engineers all rely on Flink to build real-time data applications.
I’d like to emphasize the last group of users: Flink is increasingly used not just to power internal pipelines, but as a foundation for user-facing data products and services. Application engineers should be able to use the languages they’re comfortable with, and in many cases that’s not Java or Python.
So, how does Iron Functions support all these languages for creating UDFs?
To start, you can generate a project from a template. After installing ironfun, simply run:
ironfun generate --name my-project --language typescript
The generated project gives you pretty much everything you need to run in the DataStream API, but one more step is required to support UDFs in the Table/SQL API: annotating your input and output types.
In TypeScript, it simply means using the provided FlinkInput and FlinkOutput interfaces:
import {FlinkInput, FlinkIO, FlinkOutput, flinkType} from "iron-functions-sdk";

class Input implements FlinkInput {
  @flinkType("ARRAY(DOUBLE)")
  coords: number[];

  name: string;
}

class Output implements FlinkOutput {
  result: string;

  constructor(result: string) {
    this.result = result;
  }
}

export function process() {
  const input: Input = FlinkIO.read();
  const result = ""; // ... compute the result from the input
  FlinkIO.write(new Output(result));
}
The input field types are mapped to the UDF argument types. The output field type (expecting a single field here) is mapped to the UDF return type. And, as you can see, you can manually set more complex types with the @flinkType decorator.
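To make the mapping concrete, here’s a minimal sketch (with hypothetical function, class, and JAR names) of how such a packaged UDF could then be registered and called from a Java Table API program, mirroring the CREATE FUNCTION ... USING JAR pattern shown later in this post:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UdfMappingSketch {
  public static void main(String[] args) {
    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

    // Hypothetical class and JAR names produced by `ironfun package-udf`
    tEnv.executeSql(
        "CREATE FUNCTION my_udf AS 'com.demo.MyUdf' LANGUAGE JAVA USING JAR '/path/to/MyUdf.jar'");

    // Input maps to (ARRAY<DOUBLE>, STRING) arguments; Output maps to a STRING return value
    tEnv.sqlQuery("SELECT my_udf(ARRAY[52.52, 13.405], 'Berlin') AS result").execute().print();
  }
}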
In Rust, derive and attribute macros serve the same purpose:
use iron_functions_sdk::*;

#[flink_input]
#[derive(FlinkTypes)]
struct InputData {
    order_number: i64,
    buyer: String,
    #[flink_type("DATE")]
    purchase_date: String,
}
Golang has similar support, but using structured comments (see below).
After your process function is implemented, you can package the project as a UDF with:
ironfun package-udf --source-path . --package-name com.demo.geo --class-name GeoDistance
The generated JAR file expects the Flink runtime to contain the Iron Functions dependency and the Irontools license information, which is reasonable for Flink deployments you manage yourself. However, if you use a fully managed platform like Confluent Flink, you can make the JAR truly portable by adding the --uber-jar and --include-license flags. In fact, we’ll do exactly that below!
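For reference, here’s what the packaging command from above would look like with both flags added (the short-flag form appears later in this post):

ironfun package-udf --source-path . --package-name com.demo.geo --class-name GeoDistance --uber-jar --include-license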
One neat thing about this workflow is that we didn’t need to understand any WebAssembly concepts. You could say it’s just an implementation detail. Also, modern Rust and Golang versions come with WebAssembly support, so additional tooling is only required for TypeScript functions!
Security and Isolation
Since Iron Functions relies on WebAssembly, you get another big benefit compared to standard Java or Python Flink UDFs: isolation.
By default, WebAssembly executables can’t access any files. Or make network calls. Or access any host memory. Or make system calls. You get the idea. If you think that’s too restrictive, don’t worry: there are ways to expose the functionality you need via a host function (for example, Iron Functions UDFs can make network calls because the Extism runtime provides an HTTP client).
Each executable runs in a fully sandboxed environment that can safely execute untrusted code. This makes it possible to expose certain parts of your Flink pipelines to internal or external users! Imagine giving your users the flexibility to shape the data exactly as they want, without worrying about the security implications.
Portability
I already briefly mentioned it above, but I’d like to reiterate: UDFs that you create with Iron Functions can be fully portable! If you generate a UDF with the --uber-jar and --include-license flags, you get a self-sufficient artifact. This means the same UDF JAR can be used:
- In your local development environment, e.g. IDE.
- In an on-prem or BYOC Flink deployment, e.g. with Flink Kubernetes Operator.
- In any Flink-managed environment that supports UDFs: Confluent, Ververica, DeltaStream, etc.
Examples
Finally, I’d like to showcase a few examples of Iron Functions UDFs deployed in various scenarios.
Distance Calculation in TypeScript
The full example can be found here.
This UDF leverages an external library, geolib (which is installed like any other npm dependency), to calculate the distance between coordinates. The coordinates and an optional prefix are provided as UDF inputs.
The complete main.ts file looks like this:
import {FlinkInput, FlinkIO, FlinkOutput, flinkType} from "iron-functions-sdk";
import { getDistance } from 'geolib';

class Input implements FlinkInput {
  @flinkType("ARRAY(DOUBLE)")
  coords_x: number[];

  @flinkType("ARRAY(DOUBLE)")
  coords_y: number[];

  prefix: string;
}

class Output implements FlinkOutput {
  result: string;

  constructor(result: string) {
    this.result = result;
  }
}

export function process() {
  const input: Input = FlinkIO.read();

  const start_coords = { latitude: input.coords_x[0], longitude: input.coords_x[1] };
  const end_coords = { latitude: input.coords_y[0], longitude: input.coords_y[1] };
  const distance = getDistance(start_coords, end_coords);
  const result = input.prefix + distance / 1000 + " km";

  FlinkIO.write(new Output(result));
}
After compiling the UDF JAR with ironfun, we can upload it to Confluent Flink using a simple CLI command:
confluent flink artifact create geo_distance --cloud aws --region us-east-1 \
--environment env-????? --artifact-file GeoDistance.jar
Then navigate to the Confluent UI and create a function from the uploaded artifact. Once the function is registered, you can run it on real data directly in a Flink SQL statement.
Ethereum Event Log Decoding in Rust
The full example can be found here.
This UDF relies on the ethabi-decode Rust crate to perform event log decoding for a given Ethereum ABI. This is a somewhat common operation when dealing with blockchain data. Rust is widely used in the blockchain ecosystem, so it’s a great choice of language for a UDF like this.
The key parts of the lib.rs file look like this:
use serde_json::Value;
use extism_pdk::*;
use iron_functions_sdk::*;
use serde::{Serialize, Deserialize};
use ethabi_decode::{Event, Param, ParamKind, Token, H256};

#[flink_input]
#[derive(Deserialize)]
struct InputData {
    abi: String,
    topics: String,
    data: String,
}

#[flink_output]
#[derive(Serialize)]
struct OutputData {
    decoded: String,
}

#[derive(Debug, Serialize)]
struct EventParams(Vec<String>);

#[plugin_fn]
pub fn process(input: String) -> FnResult<String> {
    let input: InputData = serde_json::from_str(&input)?;
    let output = process_internal(input);
    Ok(serde_json::to_string(&output)?)
}

fn process_internal(input: InputData) -> OutputData {
    match decode_log(input.abi, input.topics, input.data) {
        Ok(params) => {
            let decoded = params.0.join(",");
            OutputData { decoded }
        }
        Err(e) => {
            error!("Could not decode log: {}", e);
            OutputData {
                decoded: format!("error: {}", e),
            }
        }
    }
}

fn decode_log(
    abi_json_str: String,
    topics_str: String,
    data_hex_str: String
) -> Result<EventParams, String> {
    // ...
}
Let’s package the UDF (using the short flags, pro move 😎):
ironfun package-udf -s . -p com.demo.abi -c AbiDecoder -l -u
This time we’re going to use DeltaStream. Simply upload your JAR in the DeltaStream UI, create a new function from it, and then you can use it in your queries.
Parsing OpenTelemetry Logs in Golang
The full example can be found here.
This UDF demonstrates how a little bit of Golang can be used to parse a popular OpenTelemetry log file format.
The key parts of the main.go file look like this:
package main

import (
    "encoding/json"
    "github.com/extism/go-pdk"
)

// +irontools:flink_input
type InputRawLog struct {
    RawLog string `json:"rawLog"`
}

// OtelLogs is the root struct for the OTEL logs payload
type OtelLogs struct {
    ResourceLogs []ResourceLogs `json:"resourceLogs"`
}

// More type declarations ...

// +irontools:flink_output
type Output struct {
    Result string `json:"result"`
}

//go:wasmexport process
func process() int32 {
    var rawInput InputRawLog
    err := pdk.InputJSON(&rawInput)
    if err != nil {
        pdk.SetError(err)
        return 1
    }

    var logs OtelLogs
    err = json.Unmarshal([]byte(rawInput.RawLog), &logs)
    if err != nil {
        pdk.SetError(err)
        return 1
    }

    var transformedLog Output
    for _, resourceLog := range logs.ResourceLogs {
        for _, scopeLog := range resourceLog.ScopeLogs {
            for _, logRecord := range scopeLog.LogRecords {
                // Extract severity and body, combine them
                transformedLog.Result = "[" + logRecord.SeverityText + "]: " + logRecord.Body.StringValue
            }
        }
    }

    err = pdk.OutputJSON(transformedLog)
    if err != nil {
        pdk.SetError(err)
        return 1
    }

    return 0
}

func main() {}
We’re going to deploy this function in a simple Table API program right from the IDE. Here’s what the whole Flink Table API application may look like:
package dev.irontools.demo;

import java.nio.file.Path;
import java.nio.file.Paths;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class IronFunctionsOtelParsingDemo {
  public static void main(String[] args) {
    EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
    settings.getConfiguration().setString("parallelism.default", "4");
    TableEnvironment tEnv = TableEnvironment.create(settings);

    Path currentPath =
        Paths.get(System.getProperty("user.dir") + "/iron-functions-otel-parsing-udf");

    tEnv.executeSql(
        "CREATE FUNCTION parse_otel_log AS 'com.demo.otel.LogParser' LANGUAGE JAVA USING JAR"
            + " '"
            + currentPath
            + "/LogParser.jar'");

    // Redacted for brevity

    tEnv.sqlQuery(
            "SELECT parse_otel_log('{\"resourceLogs\":...') as log")
        .execute()
        .print();
  }
}
Summary
Iron Functions enables you to author transformation logic in multiple languages that Flink doesn’t support out of the box. I hope this post has convinced you that creating UDFs can be both easy and powerful! I also hope that the isolation and portability you gain will let you use Flink in new use cases.
If you’re interested in exploring Iron Functions further, feel free to contact us. If you’d like to get started right away, you can generate your free trial license here and explore the documentation.