Back to Portfolio

Making Large Flink Range Joins Cheap with B+Tree State

By Stefan Vercillo | November 2025

You know that feeling when a "simple" Flink join quietly turns into the most expensive line item on your infra bill?

You start with something innocent: two streams, a time window, maybe a few range conditions. You glue together some MapState or ListState, lean on RocksDB, and tell yourself you'll "optimise it later". Then the data grows, the join fan-out explodes, and suddenly you're staring at metrics wondering why this one operator is hoarding memory, hammering RocksDB, and costing actual millions a year to keep alive.

If that's you, you're not alone. Most Flink jobs treat state like a junk drawer: you stuff things in, hope you can find them later, and accept that every lookup is going to be a bit of a rummage.

Here's the twist: state doesn't have to be a junk drawer. It can be an index.

In a large production streaming system I worked on, we hit this wall hard. We had joins that worked functionally, but were completely unacceptable from a cost and latency perspective. The turning point was when we stopped thinking of Flink's managed state as "somewhere to put stuff for later" and started treating it as a B+Tree-backed index. That shift took one of our nastiest joins from eye-watering, seven-figure annual infra spend down to something in the low thousands – without dumbing down the logic.

In this article, you'll see how that idea plays out.

You'll walk through a B+Tree-based state primitive for Flink – B+TreeState – that gives you:

  • Efficient key lookups, range scans, and "closest to" queries.
  • Time-bounded state with event-time–based eviction that doesn't blow up.
  • Support for secondary indexes when you need more than one way to slice your data.
  • The foundation for sorted interval joins and K-nearest-neighbour (KNN) joins that can handle huge result sets without loading the world into memory at once.

This is not a fluffy "here's what a B+Tree is" explainer. You'll see internal vs leaf node design, split factors, overflow pages, page-size–based splitting on top of RocksDB, and how this all wires into real join transforms.

By the end, you won't just know that "B+Trees are efficient". You'll have a concrete, technically deep pattern for treating Flink state as a proper index – and a new way to think about scaling range joins and KNN-style queries without setting your infra budget on fire.

Why Flink's default state struggles with range joins and KNN

Before you get excited about B+Trees, it's worth asking: why isn't Flink's existing state enough?

On paper, you already have a decent toolbox:

  • ValueState<T> – single value per key.
  • ListState<T> – a bag of values per key.
  • MapState<K, V> – a key–value map per key.

Underneath, a RocksDB state backend can persist all of this for you. For many workloads, that's perfectly fine.

But as soon as you move into range joins or K-nearest-neighbour (KNN) joins, these tools start to fight you.

How people usually "hack" range joins with built-in state

Let's say you want to join a query stream against a history of events:

  • "Give me all events for this user in the last 10 minutes, sorted by time."
  • "For this detection, give me the nearest N training examples by distance metric."

With the default state types, you usually end up doing one of a few things:

ListState per key

  • You store all relevant events for a key (user, id, etc.) in a ListState.
  • On a query, you iterate the list and filter by your range condition.
  • If you need sorted output, you sort in memory on every query.

MapState keyed by some timestamp or metric

  • You store events in a MapState<Metric, Event>.
  • On a query, you iterate entries() or values(), filter by range, then sort.
  • For KNN, you might compute all distances, then pick the nearest K.

Multiple operators / joins

  • You chain together built-in join operators and CEP patterns.
  • You push more of the range logic into user functions and state, hoping Flink's runtime will "deal with it".

All of these work at small scale. Then your data grows and the cracks appear.

The core problem: your state is not sorted

ListState and MapState don't give you ordering guarantees. They're essentially containers. If you want sorted behaviour, you have to impose it:

  • Want "all events between t₁ and t₂, in order"? You have to scan everything, pick the ones in the interval, then sort.
  • Want "the K nearest neighbours"? You either compute distances for everything and take the top K, or you maintain some ad-hoc heap structure in state.

That's fine if each key has tens of entries. It's painful when keys have thousands or millions of entries.

You pay for that every time you:

  • Do a full scan of a large ListState or MapState.
  • Allocate big in-memory collections just to sort.
  • Repeat the same expensive work on every query event.

The hidden tax: RocksDB calls and memory blow-ups

Now layer in RocksDB.

Flink's state backend exposes nice abstractions, but on the hot path you're still:

  • Fetching blobs of data from RocksDB.
  • Deserialising them into JVM objects.
  • Doing CPU-heavy filters and sorts.
  • Writing them back when state changes.

When your "state model" is basically "dump everything into a list or map", three things happen:

Reads get fat

  • Each query pulls a lot of data out of RocksDB just to throw most of it away after filtering.
  • You can't naturally start at "the first candidate in the range" – you always start at "everything".

CPU gets noisy

  • Sorting large collections on every query quickly dominates CPU.
  • Distance calculations for KNN get repeated over the same neighbours.

Memory gets spiky

  • To produce sorted output, you usually need to materialise a big intermediate result in memory.
  • One bad key with huge cardinality can cause per-key explosions.

You feel this as latency spikes, GC churn, and infra cost.

Many-to-many joins make it worse

Built-in join operators in Flink are designed around matches, not sorted ranges.

They're great when:

  • You join on equality keys.
  • The number of matches per key is small or bounded.

They're much less comfortable when:

  • A single input record can match thousands of records on the other side.
  • You need results in order (by time or by distance).
  • You care about range semantics – "everything between these bounds", not just "everything equal to this key".

You can bolt on sorting and filtering after the join, but that means:

  • More shuffles.
  • More intermediate state.
  • More operators to maintain.

At that point, you're essentially reinventing an index – just doing it the hard way.

Why KNN and "closest to" queries are especially painful

KNN-style behaviour is particularly awkward with the default state types:

Suppose you want the K nearest points to a query value on some numeric axis (time, distance, score, etc.).

  • With a plain MapState, you don't know where the "closest" values sit without scanning everything.
  • You can't efficiently walk "outwards" from a query point:
    • Get the closest, then the next closest, then the next, stopping when you've found K.
  • Instead, you compute distance for every candidate, stash them somewhere, sort, then limit.

This is exactly what a B+Tree is designed to avoid.

The short version

Flink's built-in state types are great containers, but they're not indexes:

  • They don't know about ordering.
  • They don't give you native range scans.
  • They don't have a notion of "nearest key".

So when you ask them to power large range joins or KNN joins:

  • You end up over-reading from RocksDB.
  • You do too much CPU work per query.
  • You materialise too much in memory.

And you pay for it in latency and cost.

That's the gap B+TreeState is trying to fill.

What is B+TreeState (and how is it implemented)?

At a high level, B+TreeState is just this:

A Flink managed state that stores your data in a B+Tree instead of a list or map, so you get fast point lookups, range scans, and "closest to" queries.

You still use Flink's usual state machinery (RuntimeContext, descriptors, RocksDB backend), but under the hood your state is a B+Tree.

The core state interface

In code, you work with something like this:

public interface BPlusTreeState<K extends Comparable<K>, V> {

    BPlusTreeState<K, V> create(
            RuntimeContext context,
            BPlusTreeStateDescriptor<K, V> stateDescriptor);

    void insertEntry(Entry<K, V> entry);

    // remove all instances of this exact entry
    void removeEntry(Entry<K, V> entry);

    // inclusive range query
    Iterator<V> getValuesInRange(K lower, K upper);

    // flexible bounds
    Iterator<V> getValuesInRange(
            K lower, boolean lowerInclusive,
            K upper, boolean upperInclusive);

    // remove all entries for a given key
    boolean removeEntriesForKey(K key);

    // bulk delete in a range
    boolean removeEntriesInRange(
            K lower, boolean lowerInclusive,
            K upper, boolean upperInclusive);

    // cheap point lookup
    Optional<V> getFirstInstanceOfKey(K key);
}

Compared to MapState, the key difference is: range is first-class. You don't "get everything and filter"; you ask for a range and walk it.

Inside the B+Tree: nodes and splits

Under the hood, the tree is a classic B+Tree split into internal nodes and leaf nodes, both persisted in Flink state.

B+Tree Structure Visualization

graph TD subgraph "Internal Nodes" Root["Root
Keys: [50, 100]"] Internal1["Internal
Keys: [20, 35]"] Internal2["Internal
Keys: [70, 85]"] Internal3["Internal
Keys: [120, 150]"] end subgraph "Leaf Nodes (Linked)" Leaf1["Leaf
[10, 15, 18]"] Leaf2["Leaf
[20, 25, 30]"] Leaf3["Leaf
[35, 40, 45]"] Leaf4["Leaf
[50, 55, 60]"] Leaf5["Leaf
[70, 75, 80]"] Leaf6["Leaf
[85, 90, 95]"] Leaf7["Leaf
[100, 105, 110]"] Leaf8["Leaf
[120, 130, 140]"] Leaf9["Leaf
[150, 160, 170]"] end Root --> Internal1 Root --> Internal2 Root --> Internal3 Internal1 --> Leaf1 Internal1 --> Leaf2 Internal1 --> Leaf3 Internal2 --> Leaf4 Internal2 --> Leaf5 Internal2 --> Leaf6 Internal3 --> Leaf7 Internal3 --> Leaf8 Internal3 --> Leaf9 Leaf1 -.->|sibling| Leaf2 Leaf2 -.->|sibling| Leaf3 Leaf3 -.->|sibling| Leaf4 Leaf4 -.->|sibling| Leaf5 Leaf5 -.->|sibling| Leaf6 Leaf6 -.->|sibling| Leaf7 Leaf7 -.->|sibling| Leaf8 Leaf8 -.->|sibling| Leaf9 style Root fill:#e1f5ff style Internal1 fill:#e1f5ff style Internal2 fill:#e1f5ff style Internal3 fill:#e1f5ff style Leaf1 fill:#c8e6c9 style Leaf2 fill:#c8e6c9 style Leaf3 fill:#c8e6c9 style Leaf4 fill:#c8e6c9 style Leaf5 fill:#c8e6c9 style Leaf6 fill:#c8e6c9 style Leaf7 fill:#c8e6c9 style Leaf8 fill:#c8e6c9 style Leaf9 fill:#c8e6c9

B+Tree showing internal nodes (blue) for routing and leaf nodes (green) storing actual data, connected via sibling pointers

Node abstraction

public interface BPlusTreeNode<K extends Comparable<K>, V> {

    boolean isFull();              // when to split
    SplitResult<K> split();        // how to split
    void persistToState();         // write this node to Flink state

    // ... navigation helpers, lookup hooks, etc.
}

Every node knows:

  • when it's full,
  • how to split into two nodes,
  • and how to write itself back to RocksDB-backed state.

Internal nodes

Internal nodes guide the search; they store keys and pointers to children, but no actual records:

public class InternalNode<K extends Comparable<K>, V>
        implements BPlusTreeNode<K, V> {

    private NodeType type;                    // INTERNAL
    private Capacity maxCapacity;

    private List<K> entryKeys;                // split keys
    private List<BPlusTreeNode<K, V>> childrenNodes;

    private Optional<InternalNode<K, V>> parent; // empty if root

    // ... searchChildForKey, insertChild, split, etc.
}

When you do a lookup or insert:

  1. You start at the root internal node.
  2. You binary-search entryKeys.
  3. You follow the right child pointer down.
  4. Repeat until you hit a leaf.

Internal nodes usually have a higher split factor (more keys per node) so the tree stays shallow.

Leaf nodes

Leaf nodes hold the actual key–value entries and are linked horizontally for fast scans:

public class LeafNode<K extends Comparable<K>, V>
        implements BPlusTreeNode<K, V> {

    private final InternalNode<K, V> parent;
    private List<Entry<K, V>> lookupTable;

    private Capacity maxCapacity;

    private Optional<LeafNode<K, V>> leftSibling;
    private Optional<LeafNode<K, V>> rightSibling;

    private List<LeafNode<K, V>> overflowPages;

    // ... insertEntry, findRangeStart, split, etc.
}

Key points:

  • lookupTable stores sorted Entry<K,V> values.
  • leftSibling / rightSibling are what make range scans cheap: once you find the starting key, you just walk across leaves.
  • overflowPages handle "hot" keys with a huge number of values without blowing up a single node.

When a leaf is full, it splits:

  • Entries are divided into two roughly equal halves (or based on page size – more on that later).
  • The parent internal node gets a new split key and child pointer.
  • Sibling links (leftSibling / rightSibling) are rewired so sequential scans stay efficient.

From here, you can start to see how range joins and KNN join behaviour drop out naturally:

  1. Find the leaf that contains (or would contain) your query key.
  2. Use the leaf and its siblings to stream values in order.
  3. Stop when you reach your upper bound or your K limit.

Range Scan Operation

graph TD Root["Root Node
Range Query: [35, 85]"] Internal1["Internal Node"] Internal2["Internal Node"] Leaf1["Leaf 1
[10, 20, 30]"] Leaf2["Leaf 2
[35, 40, 45]
✓ START"] Leaf3["Leaf 3
[50, 60, 70]
✓ SCAN"] Leaf4["Leaf 4
[75, 80, 85]
✓ END"] Leaf5["Leaf 5
[90, 100, 110]"] Root -->|Navigate| Internal1 Root -->|Navigate| Internal2 Internal1 --> Leaf1 Internal1 --> Leaf2 Internal2 --> Leaf3 Internal2 --> Leaf4 Internal2 --> Leaf5 Leaf1 -.->|Skip| Leaf2 Leaf2 -.->|Scan| Leaf3 Leaf3 -.->|Scan| Leaf4 Leaf4 -.->|Stop| Leaf5 style Root fill:#fff9c4 style Leaf2 fill:#81c784 style Leaf3 fill:#81c784 style Leaf4 fill:#81c784 style Leaf1 fill:#e0e0e0 style Leaf5 fill:#e0e0e0

Range scan for values [35, 85]: navigate to start leaf, then walk sibling pointers until reaching upper bound

Splitting nodes by page size, not record count

So far, you've treated maxCapacity as "how many entries can this node hold?". That's the classic textbook view.

In a real Flink + RocksDB setup, that's not actually what you care about.

What you really care about is:

How many bytes can I pack into a node before it becomes a bad fit for RocksDB and starts to hurt latency?

Your records are not fixed-size:

  • Some keys are small, some are big.
  • Some values are tiny, some are massive.

If you say "each leaf can hold 1,000 entries", you can easily end up with:

  • 1,000 small entries → node is mostly empty from a page perspective.
  • 1,000 huge entries → node blows past a sensible page size and becomes expensive to read/write.

Why page-size-based splitting works better

Instead of "split when I have N entries", you say:

"Split when the serialised size of this node crosses a configured page size."

This plays nicely with RocksDB for a few reasons:

RocksDB loves sequential reads

When nodes are sized to fit into these "pages", scanning across adjacent leaves turns into efficient sequential I/O, not a random-access mess.

Variable-length records behave sanity-wise

  • Big records don't accidentally cause your "N entries" heuristic to create bloated nodes.
  • Tiny records don't leave you wasting half a page because you hit an arbitrary entry count.

Tree height stays under control

  • By packing nodes by bytes, not just count, you keep the B+Tree compact.
  • Fewer nodes ⇒ fewer RocksDB fetches ⇒ lower latency per query.

What this looks like in practice

You make Capacity more like "budget in bytes" than "budget in entries":

In your LeafNode.insertEntry(...):

  1. Insert the entry in sorted order into lookupTable.
  2. Update currentSizeBytes.
  3. If isFull() flips to true, call split().

The split() then:

  • Walks lookupTable and finds a split point such that:
    • Left node ≈ half page,
    • Right node ≈ half page,
  • Rebuilds two new leaf nodes with updated Capacity tracking.
  • Updates the parent InternalNode and sibling links.

For internal nodes, you can do something similar but allow them to be denser (more keys per page), because they only store keys + pointers, not full values.

Two different split factors still make sense

You still end up with effectively different split factors for:

  • Internal nodes → more keys per page (cheap to store).
  • Leaf nodes → fewer entries per page (values are heavier).

The difference now is that split factors fall out of bytes and layout, not arbitrary integers you picked up from a data structures textbook.

The net effect

By splitting on page size:

  • Your B+Tree is better packed.
  • Range scans read fewer pages and do fewer RocksDB calls.
  • Latency is more predictable even when record sizes vary a lot.

And you get all of that without changing your public API. From the outside, it's still just:

Iterator<V> it = tree.getValuesInRange(lower, upper);

The node layout is an internal optimisation – but it's a big part of why this pattern holds up under real, messy production workloads.

Page-Size Based Node Splitting

graph TD subgraph "Before Split" Before["Leaf Node
Size: 4KB / 4KB (FULL)
Entries: [e1, e2, e3, ..., e20]"] end subgraph "After Split" Left["Left Leaf
Size: 2KB
Entries: [e1...e10]"] Right["Right Leaf
Size: 2KB
Entries: [e11...e20]"] Parent["Parent Internal
New split key: e11"] end Before -->|Split on
page size| Left Before -->|Split on
page size| Right Parent --> Left Parent --> Right Left -.->|sibling| Right style Before fill:#ffcdd2 style Left fill:#c8e6c9 style Right fill:#c8e6c9 style Parent fill:#e1f5ff

When a leaf reaches its page size limit (4KB), it splits into two ~2KB nodes, maintaining sibling links

Secondary indexes: dual B+Trees for richer queries

So far everything has focused on how B+TreeState stores data and keeps RocksDB happy. To make this usable for real joins, you also need to decide what you index on.

In practice, you usually end up with two very different needs:

  • Lifecycle: you want to evict old data efficiently based on event time.
  • Query semantics: you want to look things up by some other dimension – a distance metric, a score, a numeric key for KNN, etc.

If you try to satisfy both with a single index key, you lose somewhere. The pattern that worked best for me was:

Keep event time as the primary index (for eviction), and add a second B+TreeState as a secondary index for queries.

Primary vs secondary tree

Think in terms of two B+Trees living side by side:

  • Primary tree (event-time index)
    • Key: event time (optionally with a tie-breaker like ID).
    • Value: the full record, or a compact representation of it.
    • Responsibility: lifecycle and eviction, driven by the watermark.
  • Secondary tree (query index)
    • Key: the query dimension (distance, score, some numeric expression, etc.).
    • Value: a pointer to the same record – usually its primary key.
    • Responsibility: efficient range and KNN-style lookups.

On the write path, you do a dual write:

// build keys
PrimaryKey primaryKey   = new PrimaryKey(eventTime, id);
SecondaryKey secondaryKey = new SecondaryKey(metricValue, id);

// write to primary (event-time) index
primaryTree.insertEntry(Entry.of(primaryKey, record));

// mirror into secondary (metric) index
secondaryTree.insertEntry(Entry.of(secondaryKey, primaryKey));

The secondary index never owns the record; it just knows how to find it via the primary key.

Querying through the secondary index

Reads for "normal" range joins can go through the primary index (event-time windows). Reads for KNN or metric-based joins go through the secondary tree instead:

// e.g. KNN-style range on the metric axis
Iterator<PrimaryKey> keys =
        secondaryTree.getValuesInRange(lowerMetric, upperMetric);

// fetch full records from primary tree or a cache
while (keys.hasNext()) {
    PrimaryKey pk = keys.next();
    Record record = primaryTree.getFirstInstanceOfKey(pk).orElseThrow();
    // ... emit joined row
}

Because the secondary tree is sorted by the metric, you can also implement a “walk outwards from the query point” KNN, stopping after K neighbours or when you hit some metric bound. You never scan the whole keyspace just to find the closest values.

Eviction stays simple: primary tree drives deletion

The nice part about this design is that eviction logic stays anchored to event time. The watermark advances, you compute a cutoff, and you drop everything older than that from the primary tree with a range delete.

As you evict from the primary index, you use the record to reconstruct the secondary key and delete that from the secondary tree as well:

Iterator<Entry<PrimaryKey, Record>> expired =
        primaryTree.getValuesInRange(MIN_EVENT_TIME, evictionCutoff);

while (expired.hasNext()) {
    Entry<PrimaryKey, Record> entry = expired.next();

    // remove from primary index
    primaryTree.removeEntry(entry);

    // derive secondary key and remove mirror entry
    SecondaryKey sk = buildSecondaryKey(entry.getValue());
    secondaryTree.removeEntry(Entry.of(sk, entry.getKey()));
}

You never have to “hunt” for stale entries in the secondary index. The primary tree tells you exactly what needs to disappear, and the secondary index just mirrors that.

Trade-offs and why it’s still worth it

You do pay a cost for this:

  • Extra writes: every insert becomes two inserts (primary + secondary).
  • Extra state: you hold two sets of keys and pointers.
  • More moving parts: you have to keep the trees logically in sync.

But you gain something very hard to fake with plain MapState and ListState:

  • Event-time–driven eviction that is a simple left-to-right range delete on the primary tree.
  • Fast, sorted access by whatever metric you care about for range and KNN joins.
  • A clean separation between “how long does this data live?” and “how do I want to query it?”.

In other words: the primary index keeps your state bounded and cheap, the secondary index makes your queries expressive and fast. Together, they give you a sane foundation for building sorted interval joins and KNN joins in Flink without turning every operator into a hand-rolled database.

So what does B+TreeState actually buy you?

At this point you've seen a lot of internals: node layouts, page-size-based splits, secondary indexes. It's worth stepping back and asking the only question that really matters: what did all of this actually change for the job?

1. Range and KNN joins became the cheap path

Before B+TreeState, range and K-nearest-neighbour joins were "expensive by default": full scans of unsorted state, big in-memory sorts, and a lot of wasted RocksDB I/O. With a B+Tree-backed index, those same operations turn into simple range walks over leaf nodes. You touch far fewer pages, you don't materialise everything just to throw most of it away, and you stream results back already sorted.

2. State growth became predictable instead of scary

By indexing on event time and treating eviction as a left-to-right range delete, state is bounded by a window in event time rather than by "however much data has gone through this operator since it was deployed". That makes cost and latency a function of clear parameters (window size, key cardinality, K limits), not a mysterious function of job age and traffic spikes.

3. New join patterns became realistic

Once state behaves like an index, patterns that were previously awkward or unsafe become routine: large many-to-many joins with sorted output, "give me page N of all matches for this key", KNN-style joins on a secondary metric, and joins where the result cardinality can be huge but you only ever pull one page at a time into memory. Instead of hand-rolling a bespoke operator for each of these, you can express them as different ways of walking the same underlying index.

The bigger lesson

The bigger lesson for me wasn't "B+Trees are cool", it was that the shape of your state needs to match the shape of your queries. If you keep doing range and proximity joins on top of unordered bags of state, you'll keep fighting RocksDB, GC, and infra bills. When you make the index a first-class concept inside your Flink job, a lot of those "hard problems" collapse into straightforward tree operations.

Thanks for making it this far!