Did you think about closing your Java stream?
Java streams are awesome for orchestrating processing flows, merging data coming from multiple sources and so on...however, like any new API, they can hide some surprises. Let's see why java.util.stream.Stream has a close() method.
Quick reminder on streams
Your first contact with streams is pretty close to an enriched Iterator API: it is a not-yet-materialized sequence of values you can iterate over and, with Stream, process to produce a new structure.
The hello world is generally not far from:
public static void main(final String[] args) {
    Stream.of("a", "b").forEach(System.out::println);
}
If you execute it, you will of course see this output:
a
b
The next step is to start using the Stream operators (map, flatMap etc...):
public static void main(final String[] args) {
    Stream.of("a", "b")
        .map(String::length)
        .forEach(System.out::println);
}
Which will output the length of each string:
1
1
Then you finally end up using collectors or reducers, which are ways to combine the values into a single one or into a container:
public static void main(final String[] args) {
    final List<Integer> collected = Stream.of("a", "b")
        .map(String::length)
        .collect(toList());
    System.out.println(collected);
}
Which aggregates the previous computation into a list, so the output is very similar:
[1, 1]
All good, so why am I writing a blog post on something we have seen everywhere for many years? Because this usage is the one everybody expects - even the one you will find in most libraries and documentation!
java.util.stream and close method
A stream is designed to be an API, so it must stay generic. This is why there is a close() method on it: it makes it possible to expose data which may require a remote access or any other resource usage. Here is how the previous code should be written:
public static void main(final String[] args) {
    final List<Integer> collected;
    try (final Stream<String> source = Stream.of("a", "b")) {
        collected = source.map(String::length).collect(toList());
    }
    System.out.println(collected);
}
The main difference is to wrap the stream in a try-with-resources block to ensure the stream is closed at the end of the block. It also implies the stream is consumed inside the block - thanks to the collect() in our sample - otherwise the stream would be closed before being visited, which would lead to an exception.
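To make that last point concrete, here is a minimal sketch (not part of the original sample) of what happens when the terminal operation is moved outside the block; with the standard JDK streams, consuming the already-closed stream throws an IllegalStateException:
final Stream<String> source;
try (final Stream<String> stream = Stream.of("a", "b")) {
    source = stream; // no terminal operation inside the block
}
// the try-with-resources already closed the stream, so this terminal operation
// fails with an IllegalStateException (stream already operated upon or closed)
source.forEach(System.out::println);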
Is it a big deal? Yes and no. A lot of built-in streams don't need it and, if you only use them internally, not closing them is as harmless as using a StringReader and not closing it: the close method just resets some internal state to encourage a faster garbage collection, and if your object does not live longer than the method, it will be eligible for collection anyway. However, if you start to compose streams across methods, the other methods can rely on the Stream contract and on the fact that close() is assumed to be called. This means that, even if your own stream does not need any cleanup, you still need to call close(). Here is a very simple example - just consider the methods are in different classes to make it more "real":
getContracts().forEach(System.out::println);
Here we want to get a stream of contracts and process them - log them, as in the previous sample.
Now, if we are in a microservice environment as the trend encourages nowadays, we will load our contract identifiers from our database (select contract_id from user for example). Let's illustrate it with a stream not requiring a close call:
private Stream<String> requestData() {
    return Stream.of("id1", "id2");
}
Then we'll need to load the contracts from another service; let's assume we call /contract/{id} for each of them:
private Stream<Contract> getContracts() {
    final Client client = ClientBuilder.newClient();
    return requestData()
        .onClose(client::close) // the cleanup is delegated to the stream: the caller must close it
        .map(id -> client.target("http://contract.company.com/{id}")
            .resolveTemplate("id", id)
            .request(APPLICATION_JSON_TYPE)
            .get(Contract.class));
}
And here is the issue: the link between the method loading the identifiers and the one loading the contracts (likely two different services like UserService and ContractService) relies on the Stream contract, which includes onClose(). However, since the stream is never closed, the cleanup will never be done. This example uses JAX-RS 2, and the client.close() call is really needed when you set up an asynchronous client (CXF with an async HTTP client for instance), otherwise you will leak threads.
The solution is to update the external (higher-level) caller to close the stream:
try (final Stream<Contract> contractStream = getContracts()) {
    contractStream.forEach(System.out::println);
}
It is important to never close a stream at a level which just enriches the stream without any terminal operator (collect, reduce, forEach, close, toArray, ...). If this simple rule is not respected, you will eagerly close the stream and make it no longer consumable by the caller, which will simply break the code; a sketch of such an intermediate layer follows.
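To illustrate the rule - a hypothetical sketch reusing the getContracts() method from above, with a getContractSummaries() helper made up for the occasion - an intermediate layer only enriches the stream and leaves the close() to the final caller:
// intermediate layer: it enriches the stream (map) but does NOT close it,
// the close() responsibility stays with the code running the terminal operation
private Stream<String> getContractSummaries() {
    return getContracts().map(Contract::toString);
}

// final caller: runs the terminal operation and closes the stream
try (final Stream<String> summaries = getContractSummaries()) {
    summaries.forEach(System.out::println);
}
Since the close handlers registered with onClose() stay attached to the stream pipeline, the map() in the intermediate method does not lose them: closing the returned stream still closes the JAX-RS client.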
This is the general rule at least and, as with any good rule - in particular for a French guy ;) - there is an exception: the flatMap case.
java.util.stream, close and flatMap
As a quick reminder, the flatMap operator replaces each element of the current stream with the content of the stream returned by the mapper for that element.
For example, if we "flatMap" each integer of a stream to the stream containing all the integers between 1 and that integer, we obtain a new stream composed of all these substreams:
Stream.of(1, 2, 3)
    .flatMap(value -> IntStream.rangeClosed(1, value).boxed())
    .forEach(System.out::println);
// output
1
1
2
1
2
3
Each integer was replaced by the sequence from 1 to N, and all these flatMap results were concatenated into a single stream before we printed each value of the resulting stream.
This is very powerful and enables you to write advanced code in a very simple way but, regarding our close point, it raises a question: who is responsible for closing the mapped streams? The most natural answer would be to add an onClose handler and close the mapped streams at the outer level, but the code would then need to aggregate the intermediate streams to close them all at the end, or some more complex logic. Not friendly, and it completely breaks the stream goodness. This is why the Java API does not do that and automatically closes the stream returned by a flatMap mapper for you:
Stream.of(1, 2, 3)
    .flatMap(value -> IntStream.rangeClosed(1, value)
        .onClose(() -> System.out.println("close #" + value))
        .boxed())
    .forEach(System.out::println);
The output will be:
1
close #1
1
2
close #2
1
2
3
close #3
No need to even close the enclosing stream: once the flatMapped stream has been consumed, it is closed. Neat, right?
It also has a direct implication which can make your code more fluent: you can always create a stream of a single element which does not require to be closed and flatMap it to your stream requiring to be closed:
Stream.of(0)
    .flatMap(ignored -> requestData()
        .onClose(() -> System.out.println("close")))
    .forEach(System.out::println);
Will output:
id1
id2
close
This example, and particularly the Stream.of(0), is not great but, if you think about it, you always have some "static" data as an incoming source (an identifier, a where clause, etc...). So if you start your stream processing early enough, you will nest your other streams - the ones needing to be closed - and close() will be called automatically, as in the sketch below.
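As a hedged illustration - reusing requestData() from above and a made-up loadContract(id) helper which would also return a Stream<Contract> needing to be closed - starting the pipeline from the static identifier lets flatMap close every nested stream for us:
Stream.of("user-42") // the "static" incoming data, here a user identifier
    .flatMap(userId -> requestData() // closed automatically once consumed by flatMap
        .onClose(() -> System.out.println("close ids of " + userId)))
    .flatMap(id -> loadContract(id) // hypothetical helper returning a closeable Stream<Contract>
        .onClose(() -> System.out.println("close contract " + id)))
    .forEach(System.out::println);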
Conclusion
Java Stream is really a very powerful API and it can be used in libraries and core code to combine flows all along your application and benefit the most from it. However, you have to keep in mind it is designed to be a generic flow of data and thus can be backed by all kinds of technologies - including websockets, SSE, etc... This is why it is important to respect the Stream contract for any stream coming from outside your own method (inside, you control it), to know what you can and have to do - and to make sure you close any stream you read properly.