Iron Functions
Iron Functions enables WebAssembly-based function execution in Apache Flink, allowing you to author code in languages that Flink does not natively support: TypeScript, Rust, and Go.
It leverages WebAssembly and the Extism ecosystem to provide a secure, high-performance environment for executing custom functions. This approach offers several advantages:
- Security: WebAssembly provides a sandboxed execution environment, so you can run even untrusted code safely.
- Performance: Fast execution thanks to runtime compilation.
- Language Flexibility: Write functions in several supported languages.
- Integration: Seamless integration with both DataStream and Table/SQL APIs.
Because of the secure sandboxed environment, WebAssembly functions have some limitations: they can’t read files, they don’t support multithreading, and they can’t make arbitrary network calls. Network calls are possible only through the HTTP client provided in each language. Consult the Languages section for more details.
Ironfun CLI
The Ironfun CLI helps you get started with Iron Functions quickly: it provides commands to generate new projects and to package them as UDFs (User Defined Functions) for use with the Table/SQL API.
Installation
curl -s https://irontools.dev/ironfun-cli-install.sh | sh
Commands
Generate
Creates a new Iron Functions project.
ironfun generate [OPTIONS] --name <NAME> --language <LANGUAGE>
Options:
- -n, --name <NAME>: Project name.
- -l, --language <LANGUAGE>: Project language (possible values: typescript, rust, go).
- -p, --path <PATH>: Project path. If not provided, the current directory will be used.
Package UDF
Packages an Iron Functions project as a Flink UDF.
ironfun package-udf [OPTIONS] --source-path <SOURCE_PATH> --package-name <PACKAGE_NAME> --class-name <CLASS_NAME>
Options:
- -s, --source-path <SOURCE_PATH>: Project path containing the Irontools function.
- -p, --package-name <PACKAGE_NAME>: Java package name for the UDF.
- -c, --class-name <CLASS_NAME>: Java class name for the UDF.
- -o, --output-path <OUTPUT_PATH>: Output path for the JAR file. If not provided, the current directory will be used.
- -l, --include-license [<INCLUDE_LICENSE>]: Include a license file in the generated JAR. The license string can be provided as an argument. If not provided, the CLI attempts to find the license using the IRONTOOLS_LICENSE and IRONTOOLS_LICENSE_PATH environment variables.
- -u, --uber-jar: Build an Uber JAR. The Uber JAR contains Iron Functions and all of its dependencies, so it can be deployed to any Flink runtime. When enabled, consider also including the license via --include-license.
NOTE: at the moment, the package-udf command is only supported for TypeScript projects.
Examples
Generate a TypeScript Project
ironfun generate --name my-function --language typescript --path src/main/ts
Package as UDF
ironfun package-udf -s . -p com.demo.example -c UdfName -l -u
Languages
Iron Functions currently supports several languages:
- TypeScript
- Rust
- Go
It’s highly recommended to use the Ironfun CLI to generate your projects, as it sets up the necessary structure and dependencies for you.
The process function is the entry point for your Iron Functions project. It interacts with the Flink runtime using the JSON format: an input record is passed as a JSON string (or object), and the output is also a JSON string (or object).
TypeScript Support
TypeScript support is provided using Extism’s JS PDK. Check its documentation to learn more about advanced capabilities like making network calls.
Basic example:
import { FlinkIO } from "iron-functions-sdk";
export function process() {
  const input: any = FlinkIO.read();
  const output = { ...input, name: input.name + "-updated" };
  FlinkIO.write(output);
}
UDF Requirements
If you want to package your TypeScript project as a UDF, you must define the input and output classes implementing
FlinkInput
and FlinkOutput
interfaces. Use these classes when reading data with FlinkIO.read
and writing data with
FlinkIO.write
. The @flinkType
decorator could be used to specify the Flink type of each field.
Advanced example showcasing UDF support and external dependencies:
import { FlinkInput, FlinkIO, FlinkOutput, flinkType } from "iron-functions-sdk";
import { getDistance } from "geolib";

class Input implements FlinkInput {
  @flinkType("ARRAY(DOUBLE)")
  coords_x: number[];

  @flinkType("ARRAY(DOUBLE)")
  coords_y: number[];

  prefix: string;
}

class Output implements FlinkOutput {
  result: string;

  constructor(result: string) {
    this.result = result;
  }
}

export function process() {
  const input: Input = FlinkIO.read();
  const start_coords = { latitude: input.coords_x[0], longitude: input.coords_x[1] };
  const end_coords = { latitude: input.coords_y[0], longitude: input.coords_y[1] };
  const distance = getDistance(start_coords, end_coords);
  const result = input.prefix + distance / 1000 + " km";
  FlinkIO.write(new Output(result));
}
Rust Support
Rust support is provided using Extism’s Rust PDK. Check its documentation to learn more about advanced capabilities like making network calls.
use extism_pdk::*;
use serde_json::Value;

#[plugin_fn]
pub fn process(json: String) -> FnResult<String> {
    Ok(process_internal(json))
}

fn process_internal(json: String) -> String {
    // Parse the input record passed by the Flink runtime as a JSON string
    let row: Value = serde_json::from_str(json.as_str()).unwrap();
    // Omitted, assume some transformation logic producing `result`
    let result: Value = row;
    result.to_string()
}
Go Support
Golang support is provided using Extism’s Go PDK. Check its documentation to learn more about advanced capabilities like making network calls.
package main

import (
    "encoding/json"

    "github.com/extism/go-pdk"
)

//go:wasmexport process
func process() int32 {
    // Read the input record passed by the Flink runtime as JSON bytes
    inputRaw := pdk.Input()
    var row map[string]any
    if err := json.Unmarshal(inputRaw, &row); err != nil {
        pdk.SetError(err)
        return 1
    }
    // Omitted, assume some transformation logic on `row`
    result, _ := json.Marshal(row)
    pdk.Output(result)
    return 0
}
DataStream API
Iron Functions integrates with Flink’s DataStream API. Currently, it provides the following process functions:
- dev.irontools.flink.functions.pojo.IronWasmPojoFunction: for processing POJOs.
- dev.irontools.flink.functions.row.IronWasmRowFunction: for processing Rows.
You can find a complete DataStream API + TypeScript example here.
Basic POJO Function
Once you’ve compiled your Iron Functions project to WebAssembly, you can place the .wasm file in the resources folder, e.g. src/main/resources/wasm/demo.wasm. Then you can use the following code to process a DataStream of POJOs:
DataStream<MyClass> transformedStream = inputStream.process(
    IronWasmPojoFunction.builder()
        .withInputTypeInfo(inputStream.getType())
        .withWasmResourceFile("/wasm/demo.wasm")
        .build()
);
This example assumes that the input and output types are the same.
Row Function with Custom Output
You can override the output type via withOutputTypeInfo
or withOutputDataType
methods. This is needed when the
output type is different from the input type.
DataType outputDataType = DataTypes.ROW(
    DataTypes.FIELD("id", DataTypes.STRING()),
    DataTypes.FIELD("value", DataTypes.STRING())
);

DataStream<Row> transformedStream = inputStream.process(
    IronWasmRowFunction.builder()
        .withInputDataType(inputStream.getType())
        .withOutputDataType(outputDataType)
        .withWasmResourceFile("/wasm/demo.wasm")
        .build()
);
Unnesting Output
You can unnest the output of the function if it returns an array. This is useful when you want to flatten the output and avoid nested structures.
DataType outputDataType = DataTypes.ROW(
    DataTypes.FIELD("id", DataTypes.STRING()),
    DataTypes.FIELD("value", DataTypes.STRING())
);

DataStream<Row> transformedStream = inputStream.process(
    IronWasmRowFunction.builder()
        .withInputDataType(inputStream.getType())
        .withOutputDataType(outputDataType)
        .withWasmResourceFile("/wasm/demo.wasm")
        .unnestOutput() // Enable output unnesting for array results
        .build()
);
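On the function side, an unnest-able result is simply an array of records. Here is a minimal TypeScript sketch, assuming FlinkIO.write accepts an array when unnesting is enabled (field names match the outputDataType above; the payload is purely illustrative):
import { FlinkIO } from "iron-functions-sdk";

export function process() {
  const input: any = FlinkIO.read();
  // Each element becomes its own output row when unnestOutput() is set
  // (assumed behavior; two hard-coded records stand in for real logic).
  FlinkIO.write([
    { id: input.id, value: "first" },
    { id: input.id, value: "second" },
  ]);
}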
Table/SQL API
Iron Functions provides integration with Flink’s Table/SQL API via User Defined Functions (UDFs). Currently, only ScalarFunction is supported.
UDF packaging is done via the Ironfun CLI, which generates a JAR file containing the UDF and, optionally, its dependencies. Consult the Ironfun CLI section for more details.
NOTE: UDF packaging typically requires extra definitions, e.g. FlinkInput and FlinkOutput classes for TypeScript. These classes are used to set the User Defined Function parameters (input) and return types (output). Consult the Languages section for more details.
You can find a complete Table/SQL API + TypeScript example here.
Registering UDFs
Once the UDF JAR is created, you can register it in your Flink SQL environment. You can do this via the SQL CLI or programmatically.
Flink SQL UDF registration example:
CREATE FUNCTION custom_function
AS 'com.example.CustomFunction'
LANGUAGE JAVA
USING JAR '/path/to/your.jar';
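The same statement can also be executed programmatically from a Java application. A minimal sketch using Flink’s TableEnvironment (CREATE FUNCTION ... USING JAR requires a recent Flink release; the function name, class, and JAR path are the placeholders from the SQL example above):
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class RegisterUdf {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Register the packaged UDF from its JAR, mirroring the SQL above
        tableEnv.executeSql(
                "CREATE FUNCTION custom_function "
                        + "AS 'com.example.CustomFunction' "
                        + "LANGUAGE JAVA "
                        + "USING JAR '/path/to/your.jar'");
    }
}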
Then it can be used like a regular SQL function:
SELECT custom_function(column1, column2) AS result
FROM your_table;