Reactivity in Java Web Services

This article is intended as a starting point for developing Java web services with reactive programming techniques. Much has already been published about reactivity on the web, so we will not go into the theory in detail; we will only give some context and summarize the most important features to keep in mind.

Reactive programming is a paradigm that has been around for a while. It was conceived in 2010, and in 2014 the "Reactive Manifesto" was published, laying the foundations for developing applications that follow the reactive model.

It emerged in response to the demand for faster response times and high availability, features already achieved with earlier microservice models, while solving the problems those models suffered from: excessive CPU use, blocking on input and output operations, and memory overuse caused by large thread pools.

Reactive Systems

Following the principles of the Reactive Manifesto, reactive systems must be:

  • Responsive: They must respond in a timely and appropriate manner, handling errors properly. Responsiveness is the basis of the usability of a system.
  • Resilient: They tolerate and recover from failure. Responsiveness is maintained even when part of the system fails.
  • Elastic: They can scale according to demand. Responsiveness is maintained even under increased workload.
  • Message-driven: Communication between components is based on the exchange of asynchronous messages to avoid coupling and improve isolation.

Reactive Programming

Reactive programming focuses on working with asynchronous flows from finite or infinite data sources. This is nothing new: event buses, click events and the like already exist. Reactive programming goes further, making it possible to work with "streams", or flows of events.

It is possible to create a flow of any kind of events to feed a system with. The flow itself can be filtered, transformed, combined and delimited. The portal https://rxmarbles.com/ shows us diagrams of the possible transformations of a flow.

Events are broadcast through the flow in an orderly fashion over time and result in a value, an error or simply a completion signal. Services can subscribe to these events, always asynchronously, by defining functions that will execute (react to) all three possible responses to an event.

In RxJava, streams are represented by Observable objects. Observer objects represent the subscribers to the events broadcast by the Observables:

Observable.just("Reactividad", "Aplicaciones Web", "Java")
    .map(String::toUpperCase)
    .subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            log.debug("Subscribed!");
        }

        @Override
        public void onNext(String s) {
            log.debug("Received: {}", s);
        }

        @Override
        public void onError(Throwable e) {
            log.debug("Error: {}", e.getMessage());
        }

        @Override
        public void onComplete() {
            log.debug("Completed!");
        }
    });

Output

Subscribed!
Received: REACTIVIDAD
Received: APLICACIONES WEB
Received: JAVA
Completed!

Reactive Java Frameworks

There are several frameworks that implement Reactive Streams as a programming model. The best known to develop under Java, at the time of publication of this article, are

Example: Reactive REST API (SpringBoot + RxJava2)

We are going to implement a prototype REST API for a web application, following the principles of a reactive system, using Spring Boot and RxJava2.

Structure

Starting from scratch, we use Spring Initializr, Spring Boot's rapid prototyping tool. We only need the Web starter dependency, for web support, and Lombok, to cut down on boilerplate code:

Spring has its own libraries for developing reactive applications (WebFlux and Spring Cloud Stream), but we use RxJava2 for academic reasons. We add the RxJava2 dependency to our project. With Gradle, the build.gradle file will look similar to the following:

plugins {
    id 'org.springframework.boot' version '2.1.7.RELEASE'
    id 'io.spring.dependency-management' version '1.0.8.RELEASE'
    id 'java'
}

group = 'com.poc.ractive'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '1.8'

configurations {
    compileOnly {
        extendsFrom annotationProcessor
    }

    // Undertow instead of Tomcat
    implementation.exclude group: 'org.springframework.boot', module: 'spring-boot-starter-tomcat'
}

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.boot:spring-boot-starter-undertow'
    implementation 'io.reactivex.rxjava2:rxjava:2.2.11'
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
}

RxJava works with application servers such as Tomcat, Jetty or Undertow. Here we use Undertow, simply as the author's preference.

The basis of our project is now in place. We are going to expose a REST service that returns information about "Things", with the following model:

@Getter
@Setter
@ToString
@AllArgsConstructor
@NoArgsConstructor
@Builder
@EqualsAndHashCode(of = "id")
public class Thing implements Serializable {

    private String id;
    private String value;

}
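For reference, the annotations on this class stand in for roughly the following hand-written code. This is a simplified sketch, not Lombok's exact output: the class name PlainThing is hypothetical, and the code Lombok really generates differs in detail (for example in how hashCode is computed).

```java
import java.util.Objects;

// Hypothetical hand-written counterpart of the Lombok-annotated Thing class
class PlainThing {

    private String id;
    private String value;

    public PlainThing() { }                       // @NoArgsConstructor

    public PlainThing(String id, String value) {  // @AllArgsConstructor
        this.id = id;
        this.value = value;
    }

    // @Getter and @Setter
    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getValue() { return value; }
    public void setValue(String value) { this.value = value; }

    // @EqualsAndHashCode(of = "id"): only the id takes part in equality
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof PlainThing)) return false;
        return Objects.equals(id, ((PlainThing) o).id);
    }

    @Override
    public int hashCode() { return Objects.hashCode(id); }

    // @ToString
    @Override
    public String toString() { return "PlainThing(id=" + id + ", value=" + value + ")"; }

    // @Builder
    public static Builder builder() { return new Builder(); }

    public static class Builder {
        private String id;
        private String value;

        public Builder id(String id) { this.id = id; return this; }
        public Builder value(String value) { this.value = value; return this; }
        public PlainThing build() { return new PlainThing(id, value); }
    }
}
```

Because equality is restricted to the id, two instances with the same id and different values compare as equal.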

Lombok's annotations allow us to save all the repetitive code. The next step is to create the Service that manages our business logic on the model.

We add a map of "Thing" objects to hold test instances:

@Service
@Slf4j
public class ThingsService implements InitializingBean {

    /**
     * Map of Thing mocks
     */
    private Map<String, Thing> thingsMap = new HashMap<>();

    @Override
    public void afterPropertiesSet() {
        // Populates the Map with mocks at bean initialization
        IntStream.range(1, 10).forEach(
                index -> thingsMap.put("thing" + index, Thing.builder()
                        .id("thing" + index)
                        .value("value" + index)
                        .build()));
    }
}

We need to expose the REST API so that external systems can use our service. First we define the type of response the API will return:

@Getter
@Setter
@ToString
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class ThingResponse implements Serializable {

    private Thing thing;
    private String message;

}

Finally, we create the controller that exposes the REST service:

@RestController
@Slf4j
public class ThingsController {

    /**
     * Service for managing {@link Thing} logic
     */
    private ThingsService thingsService;

    /**
     * Custom constructor for dependency injection
     *
     * @param thingsService Service for managing {@link Thing}
     */
    public ThingsController(@Autowired ThingsService thingsService) {
        this.thingsService = thingsService;
    }

    /**
     * Gets the {@link Thing} related to the given Identifier
     *
     * @param id Entity identifier
     * @return {@link ThingResponse} with http 200 (Ok) if the wanted {@link Thing} instance exists or Http 400
     * (Bad Request) if not.
     */
    @GetMapping(value = "/api/{id}", produces = MediaType.APPLICATION_JSON_VALUE)
    public Single<ResponseEntity<ThingResponse>> getThingById(@PathVariable String id) {
        String transactionId = UUID.randomUUID().toString();
        log.debug("{}: Request received for id: '{}'", transactionId, id);

        // TODO
        
        return null;
    }

    /**
     * Builds the Response based on the given {@link Thing} instance.
     *
     * @param thing {@link Thing} instance. Only instances with Id are valid.
     * @return {@link ThingResponse} with http 200 (Ok) if the wanted {@link Thing} instance exists or Http 400
     * (Bad Request) if not.
     */
    private static ResponseEntity<ThingResponse> mapThingResponse(Thing thing) {
        if (thing.getId() != null) {
            ThingResponse tResponse = ThingResponse.builder().thing(thing).message("OK").build();
            return new ResponseEntity<>(tResponse, HttpStatus.OK);
        } else {
            ThingResponse tResponse = ThingResponse.builder().message(thing.getValue()).build();
            return new ResponseEntity<>(tResponse, HttpStatus.BAD_REQUEST);
        }
    }
}

The structure of the controller is typical of Spring Boot applications. We use the @RestController annotation to indicate that we are exposing REST services, and we inject the Service instance that performs the business logic through the constructor. The @Autowired annotation is optional in this case, but we use it for clarity.

In order to follow good practice, a unique transaction identifier will be generated for each request so that transactions can be traced.

We have created a "getThingById" function that is invoked through a GET request to our API and returns the response in JSON format. The Single<T> class is the reactive version of a method call. It is equivalent to an Observable that emits a single event or an error, and it is well suited to this type of application.

For now our method returns nothing; we have to complete the "TODO" with the call to the Service. To do this, we will look at different scenarios and how to solve them:

Simple operations

We want to get the information of a Thing without performing any additional operations. We add the "findById" method to the "ThingsService":

/**
 * Gets a {@link Thing} instance by its identifier
 *
 * @param transactionId Current Transaction Identifier
 * @param id            Thing's identifier
 * @return {@link Thing} instance if exists or IllegalArgumentException if not.
 */
public Single<Thing> findById(final String transactionId, final String id) {
    return Single.create(emitter -> {
        log.debug("{}: Evaluating findById...", transactionId);
        if (thingsMap.containsKey(id)) {
            emitter.onSuccess(thingsMap.get(id));
        } else {
            emitter.onError(new IllegalArgumentException("Thing with id '" + id + "' not found"));
        }
    });
}

In our method, we create a single-event Reactive Stream with "Single.create(…)"; its body runs when an observer subscribes to the stream. The "Single.create(…)" method accepts an instance of the functional interface "SingleOnSubscribe<T>", which has the following descriptor:

/**
 * A functional interface that has a {@code subscribe()} method that receives
 * an instance of a {@link SingleEmitter} instance that allows pushing
 * an event in a cancellation-safe manner.
 *
 * @param <T> the value type pushed
 */
public interface SingleOnSubscribe<T> {

    /**
     * Called for each SingleObserver that subscribes.
     * @param emitter the safe emitter instance, never null
     * @throws Exception on error
     */
    void subscribe(@NonNull SingleEmitter<T> emitter) throws Exception;
}
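The contract described by this interface can be sketched without RxJava. The following is a JDK-only miniature, not the library's implementation: MiniSingle, Emitter and OnSubscribe are hypothetical names, and unlike the real Single it does not guard against more than one signal being emitted. It is only meant to show that create stores a recipe and that the recipe runs once per subscriber.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical JDK-only miniature of the Single.create(...) contract; not RxJava code
class MiniSingle<T> {

    /** Counterpart of SingleEmitter: one of the two signals is expected. */
    interface Emitter<T> {
        void onSuccess(T value);
        void onError(Throwable error);
    }

    /** Counterpart of SingleOnSubscribe: invoked once per subscriber. */
    interface OnSubscribe<T> {
        void subscribe(Emitter<T> emitter) throws Exception;
    }

    private final OnSubscribe<T> source;

    private MiniSingle(OnSubscribe<T> source) {
        this.source = source;
    }

    /** Only stores the recipe; nothing runs yet. */
    static <T> MiniSingle<T> create(OnSubscribe<T> source) {
        return new MiniSingle<>(source);
    }

    /** Runs the recipe for this subscriber and routes the signal to a handler. */
    void subscribe(Consumer<T> successHandler, Consumer<Throwable> errorHandler) {
        Emitter<T> emitter = new Emitter<T>() {
            @Override public void onSuccess(T value) { successHandler.accept(value); }
            @Override public void onError(Throwable error) { errorHandler.accept(error); }
        };
        try {
            source.subscribe(emitter);
        } catch (Exception e) {
            errorHandler.accept(e); // a thrown exception becomes the error signal
        }
    }

    public static void main(String[] args) {
        AtomicInteger runs = new AtomicInteger();
        MiniSingle<String> single = MiniSingle.create(emitter -> {
            runs.incrementAndGet();
            emitter.onSuccess("value5");
        });
        System.out.println("runs before subscribe: " + runs.get());
        single.subscribe(v -> System.out.println("success: " + v),
                         e -> System.out.println("error: " + e.getMessage()));
        System.out.println("runs after subscribe: " + runs.get());
    }
}
```

This laziness is why creating a Single inside a service method is cheap: the work is deferred until something subscribes.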

The "onSuccess" and "onError" methods let us signal whether the operation succeeded or failed. We update our Controller to invoke the Service and log some traces that help us understand the internal workings:

@GetMapping(value = "/api/{id}", produces = MediaType.APPLICATION_JSON_VALUE)
public Single<ResponseEntity<ThingResponse>> getThingById(@PathVariable String id) {
    String transactionId = UUID.randomUUID().toString();
    log.debug("{}: Request received for id: '{}'", transactionId, id);
    Single<Thing> response = thingsService.findById(transactionId, id);
    log.debug("{}: Request for id: '{}' processed.", transactionId, id);

    return response
            .doOnDispose(() -> log.debug("{}: Disposing", transactionId))
            .doOnSubscribe(disposable -> log.debug("{}: Subscribed", transactionId))
            .doOnSuccess(thing -> log.debug("{}: Success", transactionId))
            .doOnTerminate(() -> log.debug("{}: Terminate", transactionId))
            .subscribeOn(Schedulers.io())
            .onErrorReturn(throwable -> Thing.builder().value(throwable.getMessage()).build())
            .map(ThingsController::mapThingResponse);
}

We use the "subscribeOn" method to indicate the Scheduler (thread pool) on which our operations will be subscribed. RxJava2 provides several Schedulers. According to the documentation, the most suitable for our type of application is "Schedulers.io", as it provides a reusable thread pool intended for I/O work.
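The idea of moving work off the calling thread can be illustrated with the JDK alone. The sketch below is an analogy, not RxJava code: it assumes a cached thread pool as a stand-in for the io Scheduler, and the names OffloadSketch, findOnWorker and "io-worker" are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// JDK-only analogy for subscribing on a separate thread pool; not RxJava code
class OffloadSketch {

    /** Runs the lookup on a pooled worker thread and reports which thread did the work. */
    static CompletableFuture<String> findOnWorker(ExecutorService ioPool, String id) {
        return CompletableFuture.supplyAsync(
                () -> id + " resolved on " + Thread.currentThread().getName(), ioPool);
    }

    public static void main(String[] args) {
        // Cached pool with a hypothetical thread name, standing in for the io Scheduler
        ExecutorService ioPool = Executors.newCachedThreadPool(
                runnable -> new Thread(runnable, "io-worker"));
        try {
            CompletableFuture<String> pending = findOnWorker(ioPool, "thing5");
            // The caller gets the future back immediately and is free to do other work
            System.out.println("caller thread: " + Thread.currentThread().getName());
            // join() is used here only so the demo can print the result
            System.out.println(pending.join());
        } finally {
            ioPool.shutdown();
        }
    }
}
```

The caller never executes the lookup itself; it only receives a handle to a result that a worker thread will produce.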

We start the server and launch the request. We ask for the information of the object that responds to the identifier "thing5", which exists in the map of test objects:

$> curl -X GET http://localhost:8080/api/thing5
{"thing":{"id":"thing5","value":"value5"},"message":"OK"}

So far so good. Let's see what the traces show:

DEBUG [  XNIO-1 task-3] ThingsController: 2afb5b25776e: Request received for id: 'thing5'
DEBUG [  XNIO-1 task-3] ThingsController: 2afb5b25776e: Request for id: 'thing5' processed.
DEBUG [readScheduler-1] ThingsController: 2afb5b25776e: Subscribed
DEBUG [readScheduler-1] ThingsService   : 2afb5b25776e: Evaluating findById...
DEBUG [readScheduler-1] ThingsController: 2afb5b25776e: Success
DEBUG [readScheduler-1] ThingsController: 2afb5b25776e: Terminate 

Application servers have their own thread pool for incoming requests; in Undertow, the server we use, its threads are named XNIO-1. When the request arrives, Undertow delegates it to a thread, "XNIO-1 task-3". The trace shows that we enter the REST method and the Single is created, and there the responsibility of the Undertow thread ends: it is again available in the pool for new requests. Meanwhile, Schedulers.io provides a thread, "readScheduler-1" (the log pattern truncates the full name, RxCachedThreadScheduler-1), which executes all the business logic. The subscriber subscribes to the event flow, the logic runs and the result is returned, all in sequence. The following sequence diagram shows the entire process:

What the diagram shows is that the Controller is never blocked, so it can accept requests that are executed in parallel. Likewise, the thread assigned to the request is released and becomes available for new requests in the web server pool. This makes better use of the machine's processing capacity and memory, optimizing resources and enabling horizontal scalability (system responsiveness and elasticity).

The following request shows the subscriber's behavior to the event flow when the request returns an error:

$> curl -X GET http://localhost:8080/api/unknownThing
{"thing":null,"message":"Thing with id 'unknownThing' not found"}

Output

DEBUG [  XNIO-1 task-5] ThingsController: 786622a7da22: Request received for id: 'unknownThing'
DEBUG [  XNIO-1 task-5] ThingsController: 786622a7da22: Request for id: 'unknownThing' processed.
DEBUG [readScheduler-1] ThingsController: 786622a7da22: Subscribed
DEBUG [readScheduler-1] ThingsService   : 786622a7da22: Evaluating findById...
DEBUG [readScheduler-1] ThingsController: 786622a7da22: Terminate

Combined operations I

When requests to our API require slightly more complex operations, invoking several methods or even several services, we can combine the operations. Continuing with the example, each time the API is invoked, besides returning the requested "Thing" object, we may need to perform the "doCoolOperationsA" and "doCoolOperationsB" operations and also log the request:

/**
 * Performs iterative operations where each operation doesn't depend on the previous one.
 *
 * @param transactionId Current Transaction Identifier
 * @param id            Thing's identifier
 * @return {@link Thing} instance if exists or IllegalArgumentException if not.
 */
public Single<Thing> operateById(final String transactionId, final String id) {
    return this.findById(transactionId, id)
            .flatMap(thing -> doCoolOperationsA(transactionId, thing))
            .flatMap(thing -> doCoolOperationsB(transactionId, thing))
            .flatMap(thing -> logById(transactionId, thing));
}
/**
 * Performs a logging of the given Thing's identifier
 *
 * @param transactionId Current Transaction Identifier
 * @param thing         Current {@link Thing} instance
 * @return {@link Thing} instance
 */
private Single<Thing> logById(final String transactionId, Thing thing) {
    return Single.create(emitter -> {
        log.debug("{}: Evaluating logById...", transactionId);
        log.debug("{}: Logging Id -> {}", transactionId, thing.getId());
        emitter.onSuccess(thing);
    });
}
/**
 * Performs cool operations with the given {@link Thing} instance
 *
 * @param transactionId Current Transaction Identifier
 * @param thing         Current {@link Thing} instance
 * @return {@link Thing} instance
 */
private Single<Thing> doCoolOperationsA(final String transactionId, Thing thing) {
    return Single.create(emitter -> {
        log.debug("{}: Evaluating doCoolOperationsA...", transactionId);
        emitter.onSuccess(thing);
    });
}
/**
 * Performs cool operations with the given {@link Thing} instance
 *
 * @param transactionId Current Transaction Identifier
 * @param thing         Current {@link Thing} instance
 * @return {@link Thing} instance
 */
private Single<Thing> doCoolOperationsB(final String transactionId, Thing thing) {
    return Single.create(emitter -> {
        log.debug("{}: Evaluating doCoolOperationsB...", transactionId);
        emitter.onSuccess(thing);
    });
}
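The chaining in "operateById" can be mirrored with the JDK's CompletableFuture, where thenCompose plays a role comparable to flatMap. This is an analogy under that assumption, not RxJava code; ChainSketch and step are hypothetical names, and the step names are reused from the service only as labels.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// JDK-only analogy for chaining dependent asynchronous steps; not RxJava code
class ChainSketch {

    /** Hypothetical async step: records its name, then passes the value on. */
    static CompletableFuture<String> step(String name, String value, List<String> trace) {
        return CompletableFuture.supplyAsync(() -> {
            trace.add(name);
            return value;
        });
    }

    /** Each thenCompose stage starts only after the previous one has completed. */
    static CompletableFuture<String> operateById(String id, List<String> trace) {
        return step("findById", id, trace)
                .thenCompose(thing -> step("doCoolOperationsA", thing, trace))
                .thenCompose(thing -> step("doCoolOperationsB", thing, trace))
                .thenCompose(thing -> step("logById", thing, trace));
    }

    public static void main(String[] args) {
        List<String> trace = new CopyOnWriteArrayList<>();
        String result = operateById("thing5", trace).join();
        System.out.println(result + " via " + trace);
    }
}
```

Because every stage waits for the one before it, the recorded order always matches the order in which the steps were chained.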

We modify the controller to invoke the "operateById" method of the "ThingsService" and launch the REST request:

$> curl -X GET http://localhost:8080/api/thing5
{"thing":{"id":"thing5","value":"value5"},"message":"OK"}

Output

DEBUG [  XNIO-1 task-1] ThingsController: 575923ae3691: Request received for id: 'thing5'
DEBUG [  XNIO-1 task-1] ThingsController: 575923ae3691: Request for id: 'thing5' processed.
DEBUG [readScheduler-1] ThingsController: 575923ae3691: Subscribed
DEBUG [readScheduler-1] ThingsService   : 575923ae3691: Evaluating findById...
DEBUG [readScheduler-1] ThingsService   : 575923ae3691: Evaluating doCoolOperationsA...
DEBUG [readScheduler-1] ThingsService   : 575923ae3691: Evaluating doCoolOperationsB...
DEBUG [readScheduler-1] ThingsService   : 575923ae3691: Evaluating logById...
DEBUG [readScheduler-1] ThingsService   : 575923ae3691: Logging Id -> thing5
DEBUG [readScheduler-1] ThingsController: 575923ae3691: Success
DEBUG [readScheduler-1] ThingsController: 575923ae3691: Terminate

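In this trace, all the chained operations run sequentially on the same thread (readScheduler-1), because none of them is subscribed on a Scheduler of its own. For each operation to be subscribed on a different thread (for example readScheduler-1, readScheduler-2 and readScheduler-3) and executed in parallel, each one needs its own "subscribeOn"; in that case, the last thread to finish is the one that returns the response to the service invocation.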

Parallel operations II

Sometimes we need to combine the results of operations executed in parallel. For this purpose there is the zip operator, which takes a function that receives one value from each of two reactive streams and generates a new value from them. Note that, since these are asynchronous operations, it is not possible to predict the order in which they complete:

/**
 * Performs concurrent operations using different threads and merging the result in a single response. The
 * result is the combination of all the results of the executed threads.
 *
 * @param transactionId Current Transaction Identifier
 * @param id            Thing's identifier
 * @return {@link Thing} instance if exists or IllegalArgumentException if not.
 */
public Single<Thing> operateByIdMergingZip(final String transactionId, final String id) {
    // flatMap keeps the lookup inside the stream: no calling thread is blocked, and a
    // missing id reaches the subscriber as onError instead of being thrown here
    return this.findById(transactionId, id).flatMap(thing -> {
        Single<Thing> operationA = doCoolOperationsA(transactionId, thing).subscribeOn(Schedulers.io());
        Single<Thing> operationB = doCoolOperationsB(transactionId, thing).subscribeOn(Schedulers.io());
        Single<Thing> operationC = logById(transactionId, thing).subscribeOn(Schedulers.io());

        return operationA
                .zipWith(operationB, ThingsService::mergeThings)
                .zipWith(operationC, ThingsService::mergeThings);
    });
}

/**
 * Merges two {@link Thing} instances in a new one
 *
 * @param thing1 {@link Thing} in a 'A' state
 * @param thing2 {@link Thing} in a 'B' state
 * @return {@link Thing} in a 'A+B' state
 */
private static Thing mergeThings(Thing thing1, Thing thing2) {
    return new Thing(thing1.getId(), thing1.getValue() + ", " + thing2.getValue());
}
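The pairing behaviour of zip can be sketched with the JDK's CompletableFuture, where thenCombine plays a comparable role. This is an analogy, not RxJava code; ZipSketch, operation and zip are hypothetical names, and the delays are arbitrary values chosen so that the second operation finishes first.

```java
import java.util.concurrent.CompletableFuture;

// JDK-only analogy for combining two parallel results; not RxJava code
class ZipSketch {

    /** Hypothetical operation that finishes after the given delay. */
    static CompletableFuture<String> operation(String value, long delayMillis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return value;
        });
    }

    /** Pairs both results once the two operations have completed. */
    static CompletableFuture<String> zip(CompletableFuture<String> a, CompletableFuture<String> b) {
        return a.thenCombine(b, (left, right) -> left + ", " + right);
    }

    public static void main(String[] args) {
        // B finishes first, yet A's value still comes first in the merged result
        System.out.println(zip(operation("A", 50), operation("B", 5)).join());
    }
}
```

The operations may complete in any order, but the combining function always receives its arguments in the positions in which the sources were paired.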

Backpressure

The possibility of working with infinite events in our reactive streams gives rise to scenarios where Observables (event emitters) generate events much faster than our Observers (consumers) can process them. Handling this situation is known as backpressure, and it must be taken into account when implementing a reactive API; in RxJava2 it is the Flowable type, rather than Observable, that supports it. There are conservative strategies that buffer the emitted elements until a consumer processes them, and more aggressive strategies where the elements not consumed in time are discarded or overwritten by the most recent ones. The right strategy depends on the system.
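The difference between these strategies can be seen in a small synchronous simulation. This is a JDK-only sketch, not how RxJava implements backpressure: BackpressureSketch, Strategy and pending are hypothetical names, and the model assumes a producer that bursts all its items before the consumer has read any.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// JDK-only simulation of three overflow strategies; not RxJava code
class BackpressureSketch {

    enum Strategy { BUFFER, DROP, LATEST }

    /** Returns what is left waiting for the consumer after a burst of `count` items. */
    static List<Integer> pending(Strategy strategy, int count, int capacity) {
        Deque<Integer> queue = new ArrayDeque<>();
        for (int item = 1; item <= count; item++) {
            if (strategy == Strategy.BUFFER || queue.size() < capacity) {
                queue.addLast(item);      // room left, or unbounded buffering
            } else if (strategy == Strategy.LATEST) {
                queue.removeFirst();      // overwrite the oldest pending item
                queue.addLast(item);
            }
            // Strategy.DROP with a full queue: the new item is discarded
        }
        return new ArrayList<>(queue);
    }

    public static void main(String[] args) {
        for (Strategy strategy : Strategy.values()) {
            System.out.println(strategy + " -> " + pending(strategy, 6, 3));
        }
    }
}
```

With six items and room for three, buffering keeps everything at the cost of memory, dropping keeps the first three, and overwriting keeps the last three.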

Summary

In this article we have seen the principles of reactive programming and the reasons why it is worth considering when designing or improving our architectures. We have shown an example of a reactive API with Spring Boot and RxJava2, covering several fairly general situations and how to handle them in code. Finally, we have discussed backpressure, a characteristic inherent to the use of streams that must be taken into account.
