Convert Flux into List, Map – Reactor

In this tutorial, I will show you ways to convert Flux to List, and FLux to Map example that uses collectList(), collectSortedList(), collectMap(), collectMultimap() function..


Ways to convert Flux into Collection

We will use Flux methods such as:

  • collectList(): accumulate sequence into a Mono<List>.
  • collectSortedList(): accumulate sequence and sort into a Mono<List>.
  • collectMap(): convert sequence into a Mono<Map>.
  • collectMultimap(): convert sequence into a Mono<Map> that each Map’s key can be paired with multi-value (in a Collection).

Then the Mono result above will be converted into a real List/Map using block() method.

Declare & Initialize Flux

There are many ways to initialize a Flux, in this tutorial, we’re gonna use a simple way with Flux.just() function.

Flux<String> flux = Flux.just(
						"website_0:bezkoder.com", 
						"meta_0:Java Tutorial",
						"meta_1:Project Reactor");

Flux to List Conversion

Flux collectList()

collectList() will accumulates sequence into a Mono<List>, then we use block() method to subscribe to the Mono and block indefinitely until a next signal is received.

List<String> list1 = flux.collectList().block();
list1.forEach(System.out::println);

Result:

website_0:bezkoder.com
meta_0:Java Tutorial
meta_1:Project Reactor

Flux collectSortedList()

collectSortedList() accumulates sequence and sort into a Mono<List>, then we use block() method to subscribe to the Mono and block it.

List<String> list2 = flux.collectSortedList().block();
list2.forEach(System.out::println);

Result:

meta_0:Java Tutorial
meta_1:Project Reactor
website_0:bezkoder.com

Flux to List without block()

If you don’t want to use block(), you can make it work with Java Disposable.subscribe() method.

List<String> list3 = new ArrayList<>();
flux.collectList().subscribe(list3::addAll);
list3.forEach(System.out::println);

You should know that converting a Flux to a List/Stream makes the whole thing NOT reactive. So the subscribe() method doesn’t help you keep away from blocking. You may or may not want this depending on the use-case.

The block() or subscribe() method won’t return anything if the Flux is infinite. For example:

// never return
Flux.interval(Duration.ofMillis(1000)).collectList().block();

// never return also
Flux.interval(Duration.ofMillis(1000)).collectList().subscribe();

To prevent blocking indefinitely or for too long, just use Duration as a parameter of block() like this: block(Duration.ofMillis(1000)).

If the subscription does not complete on time, it will throw a timeout exception.

Flux to Map Conversion

Flux collectMap()

Function prototype:

Mono<Map> collectMap(keyExtractor, valueExtractor)

– First, the function converts sequence into a Mono<Map>.
– Finally, the Mono becomes real List/Map by block() method.

Map<String, String> map1 = flux
		.collectMap(
				item -> item.split(":")[0], 
				item -> item.split(":")[1])
		.block();
map1.forEach((key, value) -> System.out.println(key + " -> " + value));

Result:

website -> bezkoder.com
meta_1 -> Project Reactor
meta_0 -> Java Tutorial

Flux collectMultimap()

Function prototype:

Mono<Map<Object, Collection>> collectMultimap(keyExtractor, valueExtractor)

collectMultimap(): convert sequence into a Mono<Map> that each Map’s key can be paired with multi-value (in a Collection).

For example, we’re gonna get a Map with website and meta as keys:

Map<String, Collection<String>> map2 = flux
		.collectMultimap(
				item -> item.split("_[0-9]+:")[0], 
				item -> item.split(":")[1])
		.block();
map2.forEach((key, value) -> System.out.println(key + " -> " + value));

Check the result:

website -> [bezkoder.com]
meta -> [Java Tutorial, Project Reactor]

Implementation

Technology

– Java 8
– Maven 3.6.1
– Reactor Core 3.4.0 with the 2020.0.1 release train.

Source Code

package com.bezkoder.reactor;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import reactor.core.publisher.Flux;

public class FluxCollection {
	public static void main(String[] args) {
		Flux<String> flux = Flux.just(
				"website_0:bezkoder.com",
				"meta_0:Java Tutorial",
				"meta_1:Project Reactor");

		System.out.println(">>> flux.collectList() >>>");
		List<String> list1 = flux.collectList().block();
		list1.forEach(System.out::println);

		System.out.println("\n>>> flux.collectSortedList() >>>");
		List<String> list2 = flux.collectSortedList().block();
		list2.forEach(System.out::println);

		System.out.println("\n>>> flux to list without block >>>");
		List<String> list3 = new ArrayList<>();
		flux.collectList().subscribe(list3::addAll);
		list3.forEach(System.out::println);

		System.out.println("\n>>> flux.collectMap() >>>");
		Map<String, String> map1 = flux
				.collectMap(
						item -> item.split(":")[0], 
						item -> item.split(":")[1])
				.block();
		map1.forEach((key, value) -> System.out.println(key + " -> " + value));

		System.out.println("\n>>> flux.collectMultimap() >>>");
		Map<String, Collection<String>> map2 = flux
				.collectMultimap(
						item -> item.split("_[0-9]+:")[0], 
						item -> item.split(":")[1])
				.block();
		map2.forEach((key, value) -> System.out.println(key + " -> " + value));

		System.out.println("\n>>> flux to map without block >>>");
		Map<String, Collection<String>> map3 = new HashMap<>();
		flux.collectMultimap(
				item -> item.split("_[0-9]+:")[0],
				item -> item.split(":")[1])
			.subscribe(map3::putAll);
		map3.forEach((key, value) -> System.out.println(key + " -> " + value));
	}
}

The Result

>>> flux.collectList() >>>
website_0:bezkoder.com
meta_0:Java Tutorial
meta_1:Project Reactor

>>> flux.collectSortedList() >>>
meta_0:Java Tutorial
meta_1:Project Reactor
website_0:bezkoder.com

>>> flux to list without block() >>>
website_0:bezkoder.com
meta_0:Java Tutorial
meta_1:Project Reactor

>>> flux.collectMap() >>>
meta_1 -> Project Reactor
meta_0 -> Java Tutorial
website_0 -> bezkoder.com

>>> flux.collectMultimap() >>>
website -> [bezkoder.com]
meta -> [Java Tutorial, Project Reactor]

>>> flux to map without block() >>>
website -> [bezkoder.com]
meta -> [Java Tutorial, Project Reactor]

Appendix: Getting Reactor

Reactor installation in Maven

– First, import the BOM by adding the following to pom.xml:

<dependencyManagement> 
    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-bom</artifactId>
            <version>2020.0.1</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

– Next, add dependency:

<dependencies>
  <dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
  </dependency>
</dependencies>

Reactor installation in Gradle

– First, apply the plugin from the Gradle Plugin Portal:

plugins {
  id "io.spring.dependency-management" version "1.0.7.RELEASE" 
}

– Next use dependency-management to import the BOM:

dependencyManagement {
  imports {
    mavenBom "io.projectreactor:reactor-bom:2020.0.1"
  }
}

– Finally, add dependency:

dependencies {
  implementation 'io.projectreactor:reactor-core' 
}

Further Reading

Leave a Reply

Your email address will not be published. Required fields are marked *