At the end of 2023 (almost the beginning of 2024), Gunnar Morling launched his *o*ne *b*illion *r*ows challenge.

The challenge

You can check out all the details on Gunnar’s blog post, but at a high level the idea is to read a file containing one location name and one value per line, separated by a semicolon:

---
Bordeaux;12.3
Marrakesh;25.6
...
---

What is interesting is that the value has at most one decimal digit and at most two integer digits (so -100 < x < 100), and the location name is limited in size too.

The challenge is not just to read the values: the file has one billion rows and the goal is to compute the min/max/average per location.

The figures given in this post were measured on my computer and are only provided for comparison purposes.

The goods

Gunnar provided a default implementation in Java. It is not the fastest - and that was not the goal - but it gives a baseline. The implementation is basically what we would do in plain Java with a bit of Stream and String manipulation. The processing took about 4 minutes, which is already not bad for very simple code that is trivial to maintain and enhance.
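To give an idea of that baseline, here is a minimal sketch in the same spirit - not the reference implementation itself, just plain Stream and String manipulation (class and record names are illustrative):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.stream.Collectors;

class NaiveBaseline {
    // per-location statistics accumulated while streaming the lines
    record Stats(double min, double max, double sum, long count) {}

    static Map<String, Stats> compute(final Path input) throws IOException {
        try (var lines = Files.lines(input)) {
            return lines.collect(Collectors.toMap(
                    line -> line.substring(0, line.indexOf(';')),
                    line -> {
                        final var value = Double.parseDouble(line.substring(line.indexOf(';') + 1));
                        return new Stats(value, value, value, 1);
                    },
                    // merge two partial stats for the same location
                    (a, b) -> new Stats(
                            Math.min(a.min(), b.min()), Math.max(a.max(), b.max()),
                            a.sum() + b.sum(), a.count() + b.count())));
        }
    }
}

The average is then simply sum/count per location.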

Read the file concurrently

Split the file

To process a file quickly, the best option is to use all the power available (I/O and CPU); the challenge context mentioned that 8 CPUs and a "fast enough" disk would be available. So the first step is to make the processing parallel.

This phase is very common in any big data stack (from Hadoop to Apache Spark or Flink). At a high level, the idea is to virtually split the file in chunks and let each "worker" process its segment. What is interesting is that the big data flavor - which often uses a remote filesystem - splits into segments using a heuristic that is not exactly aligned on records (lines here), so a worker can overflow its segment to read the end of a record, and it can skip the beginning of its segment to avoid reading the same record twice.

Assuming we have this content (the file was inlined to make it easier to "split"):

city1;12.3\ncity2;45.6\ncity3;78.9\ncity4;1.2

If we want to create two chunks (normally the number of CPUs/workers, but it is easier to explain with fewer) we would get something like the following, where || represents the split:

city1;12.3\ncity2;45.||6\ncity3;78.9\ncity4;1.2

If each worker reads only its segment, the first worker wouldn’t see 45.6 for city2 but would read 45., and the second worker would start with a broken record. To avoid that, the common algorithm is to "read until the end of the record" at the end of the segment, so city2 gets 45.6 as its value, and to "start after the first end of record" at the beginning (except for the first chunk), so the second worker starts at city3. The last chunk, of course, is read until the end of the file.

In the big data ecosystem this adjustment is done "on the fly" because reading is often costly, but here we read a local file so we can adjust the segments before starting to process them, using whatever way you prefer to navigate in the file around the chunk ends (a Channel or a RandomAccessFile for example).

So after this phase we get:

city1;12.3\ncity2;45.6\n||city3;78.9\ncity4;1.2

and since we don’t care about the separator we can even get:

city1;12.3\ncity2;45.6||city3;78.9\ncity4;1.2

which enables each worker to read its chunk to the end blindly.

This small adjustment to the widespread concurrent file reading pattern lets us drop all the state logic that checks whether we are before/after the record boundary, which simplifies the code a lot and even makes it a bit faster by dropping some conditions (negligible most of the time, though).
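As an illustration, a minimal sketch of that boundary adjustment using a RandomAccessFile (method and variable names are illustrative) could look like:

import java.io.IOException;
import java.io.RandomAccessFile;

// move each chunk end forward to the next '\n' so every chunk contains only whole records
static long[] alignChunkEnds(final RandomAccessFile file, final int chunks) throws IOException {
    final long size = file.length();
    final long[] ends = new long[chunks];
    for (int i = 0; i < chunks - 1; i++) {
        long end = (i + 1) * (size / chunks); // naive even split
        file.seek(end);
        // read until the end of the current record so the next chunk starts on a fresh line
        while (end < size && file.read() != '\n') {
            end++;
        }
        ends[i] = Math.min(end + 1, size); // position just after the '\n'
    }
    ends[chunks - 1] = size; // the last chunk always goes to the end of the file
    return ends;
}

Worker i then blindly processes the range [ends[i - 1], ends[i]) (with 0 as the start of the first chunk).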

Create threads

At that stage we have chunks, so we just have to create threads and read each chunk in its own thread. There are several options:

  • Create an ExecutorService (fixed size or fork/join types for example),

  • Create N Thread instances,

  • Use ForkJoinPool.commonPool() - with a Stream parallel call for example,

  • Use virtual threads (which are just backed by a kind of ForkJoin pool in the end).

Since each worker will basically be 100% computation - we’ll discuss this more soon - all of these options lead to one thread per chunk filling the CPU capacity. The ability to steal work, or to queue pending work until a thread is available, is useless here.

Conclusion: use whatever you prefer here, there will be no real performance difference.

Summary

So overall, in pseudo code, we have something like:

void run() {
    int cpus = getCpuCount();
    var chunks = splitFile(cpus);
    try (var threads = createThreads(cpus)) {
        threads.invokeAll(chunks);
    }
}
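Concretely, with a plain fixed-size ExecutorService it could look like the sketch below, assuming splitFile (from the previous section) returns one Callable per chunk:

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;

void run(final List<Callable<Void>> chunks) throws InterruptedException, ExecutionException {
    final int cpus = Runtime.getRuntime().availableProcessors();
    // one platform thread per chunk is enough since each worker is pure computation
    try (var threads = Executors.newFixedThreadPool(cpus)) {
        for (final var future : threads.invokeAll(chunks)) {
            future.get(); // propagate any worker failure
        }
    }
}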

Moving to this kind of processing is quite efficient. How far it gets you depends on how you implement the parsing, but assuming you do it the simple way you should land between one minute and 30 seconds. Let’s move to the next part to see the difference between both options.

Read the file efficiently

Handle your memory correctly

When optimizing software there are always trade-offs; the most extreme ones generally boil down to "make it easy to read/maintain" vs "make it insanely fast". But when working with data streams (InputStream, Channel and friends) the key is to avoid re-allocating memory whenever possible.

Yes, you read that right: even if Java manages the memory for you, if you can avoid reallocating buffers you will be faster. This is a common optimization in the parsers/writers you use in your applications - even JSON ones. Apache Tomcat and Jackson, for example, use this kind of buffer reuse.

Concretely, what we want is:

var buffer = new byte[MAX_LINE_LENGTH];
int read;
while ((read = in.read(buffer)) >= 0) {
    // do process
}

and NOT:

int read;
while ((read = doRead()) >= 0) {
    // do process
}

// ...
int doRead() {
    var buffer = new byte[MAX_LINE_LENGTH]; // oops
    return in.read(buffer);
}

Similarly, since each read is an I/O access, you will want to limit the number of reads and work in memory as much as possible - and therefore hammer the CPU as planned - so the more you read at once, the better.

So a first implementation can be to use chunks of ~512MB and load them entirely in memory to process them concurrently. There you will reach a 45s execution - which is already very good, and the code stays quite maintainable.

Handle your memory without byte[]

Now, there is a more efficient way to read bytes from a file, inherited from C primitives: mapping the file in memory - the challenge allocated enough memory to enable that.

In Java it looks like:

try (final var channel = FileChannel.open(input, EnumSet.of(StandardOpenOption.READ)); (1)
    final var arena = Arena.ofConfined()) { (2)
  final var segment = channel.map(READ_ONLY, 0, size, arena); (3)
  final long address = segment.address(); (4)
  // process your file
}
1 Create a file channel from your input,
2 Define the lifetime of the file mapping (= once out of the try block the mapping/memory can be released),
3 Map the file content in memory (MemorySegment),
4 Get the pointer to the first byte of the file mapped in memory.

Then you can browse the file. There are multiple options; the simplest is to use MemorySegment directly, but it has the same pitfall as using an ArrayList or ByteBuffer to iterate item by item: each call checks the indices, a useless check which costs some CPU time for nothing. The alternative is to use Unsafe, which has a getByte(address) method that does exactly what we need.

Now that we have a way to navigate the file in memory, we’ll just use MemorySegment#address as our offset (our "0") and visit each chunk using Unsafe#getByte instead of preloading a byte[] as before.
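For reference, a minimal sketch of grabbing Unsafe through reflection and walking a chunk byte by byte (names are illustrative; offsets are absolute addresses derived from MemorySegment#address):

import java.lang.reflect.Field;
import sun.misc.Unsafe;

static final Unsafe UNSAFE = loadUnsafe();

static Unsafe loadUnsafe() {
    try { // sun.misc.Unsafe has no public accessor, so reflection is required
        final Field field = Unsafe.class.getDeclaredField("theUnsafe");
        field.setAccessible(true);
        return (Unsafe) field.get(null);
    } catch (final Exception e) {
        throw new IllegalStateException(e);
    }
}

static void visit(final long chunkStart, final long chunkEnd) {
    for (long offset = chunkStart; offset < chunkEnd; offset++) {
        final byte value = UNSAFE.getByte(offset); // no bounds check, unlike MemorySegment#get
        // ... feed the byte to the record parser
    }
}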

This change is quite impressive because it brings the global execution down to 30s.

Process lines efficiently

Processing a line requires extracting the location and then parsing the value.

A first implementation typically uses something like:

  1. Find the ';',

  2. Substring of the first part,

  3. Double.parseDouble of the last part.

This has 3 steps… and 3 issues:

  • Do not just find the separator: also process the first value in the meantime, it will save one iteration later,

  • Do not use String: it is backed by a byte array but also by a charset decoder which is insanely slow, and we don’t need decoding for most of the processing,

  • Do not parse a simple number (xx.x) as a double: Double.parseDouble handles a lot of syntax, which makes it slower than parsing three digits more or less.

Process the value

Processing the value means constructing an int (we’ll divide it by 10 later). We know that each byte is a digit, and to get its value we just need to subtract '0', because '1' is actually '0' + 1, etc.

So assuming we have our byte array of 4 bytes (xx.x) we just need to do:

int value = (bytes[0]-'0')*100 + (bytes[1]-'0')*10 + (bytes[3]-'0');
For simplicity I won’t go into much more detail, but in the challenge negative values must be handled - so negate the result after the parsing if the first byte is '-' - as well as values whose integer part has a single digit (x.x).
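A sketch of that parsing, handling the sign and the single-digit integer part (reusing the Unsafe-based access from the previous sections; the method name is illustrative):

// parses "-?d?d.d" into an int equal to ten times the actual value
static int readValue(final long start) {
    long offset = start;
    final boolean negative = UNSAFE.getByte(offset) == '-';
    if (negative) {
        offset++;
    }
    int value = UNSAFE.getByte(offset++) - '0';
    final byte next = UNSAFE.getByte(offset++);
    if (next != '.') { // two digits before the dot
        value = value * 10 + (next - '0');
        offset++; // skip the '.'
    }
    value = value * 10 + (UNSAFE.getByte(offset) - '0'); // the single decimal digit
    return negative ? -value : value;
}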

Some extreme optimizers use a SWAR (SIMD Within A Register) algorithm, which is very good for a generic parser, but here the complexity of the simple parser is likely the same as the SWAR one, so I’m not sure it is worth the added complexity.

Process the key

The key processing is quite simple: just extract the bytes between the beginning of the line and the semicolon. The challenge is to merge the values.

For that there are several options, but at a high level they all converge on computing a hash and having a backing structure to store the value aggregator (we’ll speak of that in the next part) which accumulates the values parsed as in the previous part.

You can, indeed, implement a custom structure - but I have to admit that when I benchmarked one against a well-sized HashMap I didn’t see any boost.

So overall our algorithm will be:

// beginning of the worker
final var aggregators = new HashMap<Location, Aggregator>();

// loop
// while() {
    final var location = readLocation();
    final int value = readValue();
    aggregators.computeIfAbsent(location, k -> new Aggregator()).add(value);
// }

But this snippet hides multiple things:

  • computeIfAbsent is slower than the "get, then put if absent" pattern (see the sketch after this list),

  • the location key must be very efficient if you use a HashMap, because if too many keys share the same hash the map will keep adding nodes for this hash (see it as a list behind the index represented by the hash),

  • hash must be quick to compute,

  • equals - which is used after the first put for each hash - must be quick to compare keys.
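For the first bullet, the classic replacement for computeIfAbsent, reusing the names of the previous snippet, is simply:

// avoids the lambda allocation/indirection of computeIfAbsent on the hot path
var aggregator = aggregators.get(location);
if (aggregator == null) {
    aggregator = new Aggregator();
    aggregators.put(location, aggregator);
}
aggregator.add(value);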

At that stage we have a byte[] for the location, so we can use Arrays.hashCode and Arrays.equals, but they keep iterating over the array and are not insanely fast.

To compensate, we can reuse the same algorithm - it is fast enough - cache the hashcode for the entries already stored in the map (so the hashcode computation is not redone) and only use equals when needed.

One trick to avoid iterating once to find the location and once to compute its hashcode is to compute the hashcode while we look for the semicolon:

// start readRecord() {
final long nameStart = memIndex;
int hash = UNSAFE.getByte(memIndex++); // seed the hash with the first byte of the name
byte c;
while ((c = UNSAFE.getByte(memIndex++)) != ';') { // scan until the separator...
    hash *= 31;
    hash += c; // ...accumulating a classic 31-based hash on the way
}
final long nameEnd = memIndex - 1; // offset of the ';'

final var name = new byte[(int) (nameEnd - nameStart)];
// copy the name bytes from the mapped memory into a plain byte[]
UNSAFE.copyMemory(null, nameStart, name, Unsafe.ARRAY_BYTE_BASE_OFFSET, name.length);
// }

This snippet gives us the start and end offsets of the name and loads it into a byte[] - and indeed we could avoid some of the allocations in this pseudo code.

For speed, the map must always be sized so it never needs to be resized (the challenge provided a maximum number of locations), so HashMap#newHashMap(int) (or the HashMap constructor if you do the capacity computation yourself) is highly recommended.

Handle the aggregation correctly

To compute a min/max/average triple you just need to keep storing, for each location, the:

  • min

  • max

  • number of values

  • sum of values

So you could make the map we just talked about a ConcurrentHashMap, share it across the threads and store everything in the aggregators.

However, if you do so you need to lock the aggregator or allocate one aggregator per thread (in an array with one index per worker, for example). To avoid that, the simplest and recommended solution is to create one map per worker and, after all workers have terminated, merge them, since the four values we stored in the aggregators are trivially mergeable before the final computation (don’t forget to divide the values by ten once everything is merged, since we parsed the floats/doubles as integers).
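As an illustration, the per-worker aggregator and its merge step could be as simple as this sketch (hypothetical names; values are still the integer "times ten" representation):

class Aggregator {
    private int min = Integer.MAX_VALUE;
    private int max = Integer.MIN_VALUE;
    private long sum = 0;
    private long count = 0;

    void add(final int value) { // called by a single worker, no lock needed
        min = Math.min(min, value);
        max = Math.max(max, value);
        sum += value;
        count++;
    }

    Aggregator merge(final Aggregator other) { // called once all workers have finished
        min = Math.min(min, other.min);
        max = Math.max(max, other.max);
        sum += other.sum;
        count += other.count;
        return this;
    }

    @Override
    public String toString() { // divide by ten only at the very end
        return (min / 10.) + "/" + (sum / 10. / count) + "/" + (max / 10.);
    }
}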

Congratulations! You just did a map/reduce, and if you followed all this advice you should be around 10s on my computer’s scale!

The bads

There aren’t a lot of bads in this challenge, but I want to highlight a few extremes we sometimes see.

The equals/hashCode disrespect

Remember that we need a very fast hashCode and equals implementation for our locations? We handled hashCode, but one way to make equals very fast is to implement it this way:

@Override
public boolean equals(final Object obj) {
    return true;
}

It can’t be faster, right? This means that, since we precompute the hash, our map key can be this wrapper (of byte[]) class:

class Bytes {
  private final byte[] value;
  private final int hash;

  private Bytes(final byte[] value, final int hash) {
    this.value = value;
    this.hash = hash;
  }

  @Override
  public int hashCode() {
    return hash;
  }

  @Override
  public boolean equals(final Object obj) {
    return true;
  }
}

I qualify it as bad because it assumes:

  • there is no conflict between the hashes of the incoming keys (each key has a different hash),

  • each hash leads to a different cell in the HashMap storage - so the map is well sized for the incoming data,

  • the input dataset (the locations) is known well enough to guarantee the previous points for each execution.

There are, indeed, real-life cases - and this challenge, where the keys are weather station locations that almost never change, could be one - where these assumptions are perfectly acceptable; it is up to you to evaluate whether the associated boost is worth it or not (do you want to gain 1s over 1 billion rows?).

SWAR

The SWAR algorithm by itself is a very good thing, but in the context of this challenge there is no real boost in using it, due to the file format and hypotheses. I won’t copy the algorithm here, but it mainly uses binary operations, its complexity is comparable to what we have without it, and since CPUs already have most of the operations we use to convert the data to a number wired in, the boost is not there - it can even be the opposite.

Conclusion: don’t be greeky (geek + greedy ;)).

Don’t reimplement well-implemented code

The challenge didn’t allow any dependencies, but some participants - not the fastest ones at the time of writing - reimplemented containers (going beyond dispatching into an array, which is ok-ish), and in particular maps.

I didn’t find good proof that it was faster than the built-in structures - a well-sized HashMap was fast enough for me by comparison - but it is also quite hard to maintain, and it will make you miss some performance optimizations of the JVM - we regularly see the JVM enhancing its structures.

So for a challenge it is fun to do, but please not in real-life applications.

Don’t use Stream#parallel until you know why

For this challenge it was not critical, but the Stream#parallel behavior is basically "you’ll be run in parallel but you don’t know in which context nor how".

However, the Stream design enables map/reduce algorithms thanks to Spliterator, so if you want to use it you have to make the parallel call from a ForkJoinWorkerThread you will inherit (but this is cumbersome, and it is generally saner to control the threading of your application so you can bulkhead parts of it, avoid deadlocks and tune it accurately). If you don’t do that, the default reduce tasks (most terminal operations) will end up calling java.util.concurrent.ForkJoinTask#fork, i.e. ForkJoinPool.commonPool(). This is the easiest option and works great for batch-like applications.
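For reference, "inheriting" the pool looks like the sketch below. Note that this relies on an implementation detail rather than a documented contract: the parallel terminal operation forks into the pool of the calling ForkJoinWorkerThread instead of the common pool.

import java.util.concurrent.ForkJoinPool;
import java.util.stream.LongStream;

static long sumInDedicatedPool() throws Exception {
    try (var pool = new ForkJoinPool(8)) {
        // the parallel sum runs on the 8 threads of this pool, not on the common pool
        return pool.submit(() -> LongStream.range(0, 1_000_000).parallel().sum()).get();
    }
}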

Alternatively, you can handle the split yourself from the Spliterator, but it can become hard with hinted streams (sorted, sized, etc.), see java.util.stream.AbstractShortCircuitTask#compute for example - and keep in mind that in a web application you likely don’t want to abuse concurrency there, to preserve the scalability of the web part of your application.

Tomorrow we’ll probably be able to use it within virtual threads, but today virtual threads are just backed by a hidden ForkJoinPool - no, not the same as the commonPool one, see java.lang.VirtualThread#DEFAULT_SCHEDULER, it has its own system properties like jdk.virtualThreadScheduler.parallelism - so we would just move the issue a bit further and not even leverage the continuation capabilities for this challenge (we just consume 100% of the thread resources we use from a worker).

Even if implicit threading is a trend, it is key to always know where your code is executed and in which thread (for example the common pool - which is implicitly used by HttpClient in async mode - will make you lose your contextual ClassLoader, so your thenApply can fail with a ClassNotFoundException). The only case where using a shared, anonymous thread pool is ok is when all the code of the whole stack is fully reactive. For Java it means that when the whole platform is reactive we will be able to default to virtual threads, but we are far from there, so please make sure to control your threads for now.

Conclusion

I’d like to thank Gunnar a lot for this challenge; it was a really interesting time, and I loved highlighting simple algorithms people don’t always know (like map/reduce, which is rarely mastered by everyday developers in my experience).

It is also great to see that performance boosts are not always at the level we expect, and it is a great reminder that you shouldn’t optimize any application without metrics or a very good knowledge of what you are doing. Similarly, make sure to run performance tests in comparable environments (don’t run one test on an unplugged laptop while listening to music and the other plugged in with no application running ;)) and, when possible, on the target hardware, because it can completely change the results (imagine you use GPU vectorization and there is no GPU on the machine you run on in production). No "feeling" there, just metrics and facts.

Another striking conclusion of this challenge is that Java is moving, and in the right direction; MemorySegment is very promising for example and makes working with mapped files way easier than before.

Lastly, since we talked about the goods but also the bads, it is important to keep in mind that in real work we should always think of the "next dev" who will maintain our code and wonder whether the gain is worth it.

I really encourage you to review some of the proposals - including the "naive" original implementation.
