Skip to content

Reactividad en Servicios Web Java

Share on twitter
Share on linkedin
Share on email
Share on whatsapp
Reactividad en Servicios Web Java

En este artículo pretendemos ser un punto de partida para el desarrollo de Servicios Web Java utilizando técnicas de programación reactiva. Existen muchas publicaciones respecto a Reactividad en la Web. Por ello, no entraremos en detalle sobre la teoría, aunque sí veremos un poco de contexto y resumen de las características más importantes para tenerlas en mente.

La programación reactiva es un paradigma de programación que lleva tiempo entre nosotros. Se conceptuó en 2010 y en 2014 se publicó el “Reactive Manifesto” que sentaba las bases para el desarrollo de aplicaciones siguiendo el modelo reactivo.

Surgió como consecuencia de la demanda de tiempos de respuesta más rápidos y alta disponibilidad de los sistemas, características logradas con modelos previos de Microservicios, pero dando solución a los problemas de uso excesivo de CPU, bloqueos en operaciones de entrada y salida o sobre uso de memoria (debido a grandes Threads Pools) de los que adolecían éstos modelos.

Sistemas Reactivos

Siguiendo los principios del Reactive Manifesto, los sistemas reactivos deben ser:

  • Responsivos: Deben responder en tiempo y de forma adecuada, gestionando los errores de manera apropiada. Responsabilidad es la base de la utilizada y usabilidad de sistemas.
  • Resilientes: Existen tolerancia y recuperación ante fallos. Se mantiene la responsividad incluso cuando el sistema falla.
  • Eslásticos: Existe la capacidad de expansión según la demanda. La responsividad se mantiene incluso con el aumento de carga de trabajo.
  • Orientados a Mensajes: La comunicación entre componentes se basa en el intercambio de mensajes asíncronos para evitar el acoplamiento y mejorar el aislamiento.

Programación Reactiva

La programación reactiva se centra en el trabajo de flujos asíncronos de orígenes de datos finitos o infinitos. No es algo nuevo, ya existen los Buses de Eventos, eventos generados por clicks, etc. La programación reactiva va más allá, habilitando el trabajo de “streams” o flujos de eventos.

Es posible crear un flujo de cualquier tipo de eventos con los que ir alimentando un sistema. El flujo en sí, podrá ser filtrado, transformado, combinado y delimitado. El portal https://rxmarbles.com/ nos muestra diagramas de las posibles transformaciones de un flujo.

Los eventos se emiten a través del flujo de forma ordenada en el tiempo y tienen como resultado un valor, un error o simplemente una señal de completado. Los servicios se pueden suscribir a dichos eventos, siempre de forma asíncrona, definiendo funciones que se ejecutarán (reaccionarán) ante los tres posibles respuestas de un evento.

En RxJava, los streams se representan mediante objetos Observable. Los objetos Observer representan los suscriptores a los eventos emitidos por los Observables:

Observable.just("Reactividad", "Aplicaciones Web", "Java")
    .map(String::toUpperCase)
    .subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            log.debug("Subscribed!");
        }

        @Override
        public void onNext(String s) {
            log.debug("Recived: {}", s);
        }

        @Override
        public void onError(Throwable e) {
            log.debug("Error: {}", e.getMessage());
        }

        @Override
        public void onComplete() {
            log.debug("Completed!");
        }
    });
RxMarble

Output

Subscribed!
Recived: REACTIVIDAD
Recived: APLICACIONES WEB
Recived: JAVA
Completed!

Frameworks Java Reactivos

Existen varios frameworks que implementan los Streams Reactivos como modelo de programación. Los más conocidos para desarrollar bajo Java, a día de publicación de este artículo, son:

Ejemplo: API REST Reactivo (SpringBoot + RxJava2)

Vamos a implementar un prototipo de API de servicios Rest para una aplicación web con los principios de un sistema reactivo utilizando Spring Boot y RxJava2.

Estructura

Partiendo desde cero, utilizamos el sistema de prototipado rápido de aplicaciones Spring Boot, Spring Initializr. Solamente vamos a necesitar las dependencias de Web Starter, para el soporte Web, y Lombok, para agilizar el código:

Spring tiene sus propias librerías para el desarrollo de aplicaciones reactivas (WebFlux y CloudStream), pero utilizamos RxJava2 por motivos académicos. Agregamos la dependencia de RxJava2 a nuestro proyecto. Si utilizamos Gradle, tendremos el fichero build.gradle similar al siguiente:

plugins {
    id 'org.springframework.boot' version '2.1.7.RELEASE'
    id 'io.spring.dependency-management' version '1.0.8.RELEASE'
    id 'java'
}

group = 'com.poc.ractive'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '1.8'

configurations {
    compileOnly {
        extendsFrom annotationProcessor
    }

    // Undertow instead of Tomcat
    compile.exclude group: 'org.springframework.boot', module: 'spring-boot-starter-tomcat'
}

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation('org.springframework.boot:spring-boot-starter-undertow')
    compile group: 'io.reactivex.rxjava2', name: 'rxjava', version: '2.2.11'
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
}

Con RxJava podemos utilizar servidores de aplicaciones como Tomcat, Jetty o Undertow. En este caso utilizaremos Undertow por simple preferencia del autor.

Ya tenemos la base de nuestro proyecto. Vamos a exponer un servicio REST que nos devuelva información de “Cosas”, siendo nuestro modelo el siguiente:

@Getter
@Setter
@ToString
@AllArgsConstructor
@NoArgsConstructor
@Builder
@EqualsAndHashCode(of = "id")
public class Thing implements Serializable {

    private String id;
    private String value;

}

Las anotaciones de Lombok nos permiten ahorrarnos todo el código repetitivo.El siguiente paso es crear el Servicio que gestione nuestra lógica de negocio sobre el modelo.

Añadiremos un Mapa de objetos “Thing” para tener instancias de pruebas:

@Service
@Slf4j
public class ThingsService implements InitializingBean {

    /**
     * Map of Thing mocks
     */
    private Map<String, Thing> thingsMap = new HashMap<>();

    @Override
    public void afterPropertiesSet() {
        // Populates the Map with mocks at bean intialization
        IntStream.range(1, 10).forEach(
                index -> thingsMap.put("thing" + index, Thing.builder()
                        .id("thing" + index)
                        .value("value" + index)
                        .build()));
    }
}

Nos falta el exponer el API Rest para que nuestro servicio pueda ser utilizado por sistemas externos. Primero definimos el tipo de respuesta que va a dar el API:

@Getter
@Setter
@ToString
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class ThingResponse implements Serializable {

    private Thing thing;
    private String message;

}

Y por último creamos el controlador que expone el Servicio Rest:

@RestController
@Slf4j
public class ThingsController {

    /**
     * Service for managing {@link Thing} logic
     */
    private ThingsService thingsService;

    /**
     * Custom constructor for dependency injection
     *
     * @param thingsService Service for managing {@link Thing}
     */
    public ThingsController(@Autowired ThingsService thingsService) {
        this.thingsService = thingsService;
    }

    /**
     * Gets the {@link Thing} related to the given Identifier
     *
     * @param id Entity identifier
     * @return {@link ThingResponse} with http 200 (Ok) if the wanted {@link Thing} instance exists or Http 400
     * (Bad Request) if not.
     */
    @GetMapping(value = "/api/{id}", produces = MediaType.APPLICATION_JSON_VALUE)
    public Single<ResponseEntity<ThingResponse>> getThingById(@PathVariable String id) {
        String transactionId = UUID.randomUUID().toString();
        log.debug("{}: Request received for id: '{}'", transactionId, id);

        // TODO
        
        return null;
    }

    /**
     * Builds the Response based on the given {@link Thing} instance.
     *
     * @param thing {@link Thing} instance. Only instances with Id are valid.
     * @return {@link ThingResponse} with http 200 (Ok) if the wanted {@link Thing} instance exists or Http 400
     * (Bad Request) if not.
     */
    private static ResponseEntity<ThingResponse> mapThingResponse(Thing thing) {
        if (thing.getId() != null) {
            ThingResponse tResponse = ThingResponse.builder().thing(thing).message("OK").build();
            return new ResponseEntity<>(tResponse, HttpStatus.OK);
        } else {
            ThingResponse tResponse = ThingResponse.builder().message(thing.getValue()).build();
            return new ResponseEntity<>(tResponse, HttpStatus.BAD_REQUEST);
        }
    }

La estructura del controlador es la típica en aplicaciones Spring Boot. Utilizamos la anotación @RestController para indicar que vamos a exponer servicios Rest e inyectamos la instancia del Servicio que realizará la lógica de Negocio directamente en el Constructor. La anotación @Autowired en este caso es opcional, pero la utilizamos por claridad del código.

Para seguir las buenas prácticas, se generará un identificador único de transacción en cada petición para poder tracear las operaciones.

Hemos creado una función “getThingById” que será invocada mediante un GET a nuestro API y devolverá la respuesta en formato Json. La clase Single<T> es la versión reactiva de la llamada a un método. Equivale a un Observable que emite un solo evento o error y es idónea para éste tipo de aplicaciones.

Por ahora nuestro método devuelve nada, tenemos que completar el “TODO” con la llamada al Servicio. Para ello, vamos a comentar diferentes escenarios y cómo resolverlos:

Operaciones simples

Queremos obtener la información de una Cosa sin realizar operaciones colaterales. Agregamos el método “findById” al Servicio “ThingService“:

/**
 * Gets a {@link Thing} instance by its identifier
 *
 * @param id Thing's identifier
 * @return {@link Thing} instance if exists or IllegalArgumentException if not.
 */
public Single<Thing> findById(final String transactionId, final String id) {
    return Single.create(emitter -> {
        log.debug("{}: Evaluating findById...", transactionId);
        if (thingsMap.containsKey(id)) {
            emitter.onSuccess(thingsMap.get(id));
        } else {
            emitter.onError(new IllegalArgumentException("Thing with id '" + id + "' not found"));
        }
    });
}

En nuestro método, creamos un Reactive Stream de un solo evento mediante “Single.create(…)” y nos suscribimos al flujo. El método “Single.create(…)” acepta una instancia del interfaz funcional “SingleOnSubscribe<T>” con el siguiente descriptor:

/**
 * A functional interface that has a {@code subscribe()} method that receives
 * an instance of a {@link SingleEmitter} instance that allows pushing
 * an event in a cancellation-safe manner.
 *
 * @param <T> the value type pushed
 */
public interface SingleOnSubscribe<T> {

    /**
     * Called for each SingleObserver that subscribes.
     * @param emitter the safe emitter instance, never null
     * @throws Exception on error
     */
    void subscribe(@NonNull SingleEmitter<T> emitter) throws Exception;
}

Los métodos “onSucces” y “onError” nos permiten indicar si la operación se ha realizado correctamente. Actualizamos nuestro Controlador para invocar el Servicio y mostrar algunas trazas que nos ayuden a entender el funcionamiento interno:

@GetMapping(value = "/api/{id}", produces = MediaType.APPLICATION_JSON_VALUE)
public Single<ResponseEntity<ThingResponse>> getThingById(@PathVariable String id) {
    String transactionId = UUID.randomUUID().toString();
    log.debug("{}: Request received for id: '{}'", transactionId, id);
    Single<Thing> response = thingsService.findById(transactionId, id);
    log.debug("{}: Request for id: '{}' processed.", transactionId, id);

    return response
            .doOnDispose(() -> log.debug("{}: Disposing", transactionId))
            .doOnSubscribe(disposable -> log.debug("{}: Subscribed", transactionId))
            .doOnSuccess(thing -> log.debug("{}: Success", transactionId))
            .doOnTerminate(() -> log.debug("{}: Terminate", transactionId))
            .subscribeOn(Schedulers.io())
            .onErrorReturn(throwable -> Thing.builder().value(throwable.getMessage()).build())
            .map(ThingsController::mapThingResponse);
}

Utilizamos el método “subscribeOn” para indicar el Scheduler (pool de Threads) donde se van a suscribir nuestras operaciones. Existen varios Schedudlers facilitados por el framework de RxJava2. Ojeando la documentación vemos que el más indicado para nuestro tipo de aplicación es “Schedulers.io” ya que provee un pool de threads reutilizable para propósitos de IO.

Iniciamos el servidor y lanzamos la petición. Solicitamos la información del objeto que responde al identificador “thing5“, que existe en el mapa de objetos de prueba:

$> curl -X GET http://localhost:8080/api/thing5
{"thing":{"id":"thing5","value":"value5"},"message":"OK"}

Hasta aquí todo correcto, veamos que muestran las trazas:.

DEBUG [  XNIO-1 task-3] ThingsController: 2afb5b25776e: Request received for id: 'thing5'
DEBUG [  XNIO-1 task-3] ThingsController: 2afb5b25776e: Request for id: 'thing5' processed.
DEBUG [readScheduler-1] ThingsController: 2afb5b25776e: Subscribed
DEBUG [readScheduler-1] ThingsService   : 2afb5b25776e: Evaluating findById...
DEBUG [readScheduler-1] ThingsController: 2afb5b25776e: Success
DEBUG [readScheduler-1] ThingsController: 2afb5b25776e: Terminate 

Los servidores de aplicaciones tienen su propio thread pool para las peticiones que llegan, XNIO-1 para el Undertow que utilizamos. Según llega la petición, Undertow crea un nuevo hilo “XNIO-1 task-3” en el que delega la petición. Vemos que se entra en el método Rest, se crea el Single, y ahí termina la responsabilidad del hilo de Undertow, que vuelve a estar disponible en el pool para nuevas peticiones. Paralelamente, Schedulers.io crea un nuevo hilo, “readScheduler-1”, que ejecuta toda la lógica de negocio. Vemos como el suscriptor secuencialmente se suscribe al flujo de eventos, ejecuta la lógica y devuelve el resultado. El siguiente diagrama de secuencia muestra todo el proceso:

Lo que muestra el diagrama es que el Controlador no queda bloqueado en ningún momento, permitiendo aceptar peticiones que se van ejecutando en paralelo. Igualmente el hilo generado por la petición al servidor queda liberado y disponible para nuevas solicitudes en el pool del Servidor Web. Estas características aprovechan toda la capacidad de procesamiento y memoria de la máquina optimizando recursos y permitiendo una escalabilidad horizontal (responsibidad y elasticidad de sistemas).

La siguiente petición muestra el comportamiento del suscriptor al flujo de eventos cuando la petición devuelve un error:

$> curl -X GET http://localhost:8080/api/unknownThing
{"thing":null,"message":"Thing with id 'unknownThing' not found"}

Output

DEBUG [  XNIO-1 task-5] ThingsController: 786622a7da22: Request received for id: 'unknownThing'
DEBUG [  XNIO-1 task-5] ThingsController: 786622a7da22: Request for id: 'unknownThing' processed.
DEBUG [readScheduler-1] ThingsController: 786622a7da22: Subscribed
DEBUG [readScheduler-1] ThingsService   : 786622a7da22: Evaluating findById...
DEBUG [readScheduler-1] ThingsController: 786622a7da22: Terminate

Operaciones combinadas I

Cuando las peticiones a nuestro API requieren operaciones un poco más complejas donde se invocan varios métodos o incluso varios servicios, podemos combinar las operaciones. Por ejemplo, siguiendo el ejemplo, cada vez que se invoque el API, además de devolver el objeto “Cosa” requerido, podría ser necesario realizar operaciones “doCoolOperationsA”“doCoolOperationsB” y, además, loguear en un histórico la petición:

/**
 * Performs iterative operations where each operation doesn't depends on the previous one.
 *
 * @param transactionId Current Transaction Identifier
 * @param id            Thing's identifier
 * @return {@link Thing} instance if exists or IllegalArgumentException if not.
 */
public Single<Thing> operateById(final String transactionId, final String id) {
    return this.findById(transactionId, id)
            .flatMap(thing -> doCoolOperationsA(transactionId, thing))
            .flatMap(thing -> doCoolOperationsB(transactionId, thing))
            .flatMap(thing -> logById(transactionId, thing));
}
/**
 * Performs a logging of the given Thing's identifier
 *
 * @param transactionId Current Transaction Identifier
 * @param thing         Current {@link Thing} instance
 * @return {@link Thing} instance
 */
private Single<Thing> logById(final String transactionId, Thing thing) {
    return Single.create(emitter -> {
        log.debug("{}: Evaluating logById...", transactionId);
        log.debug("{}: Logging Id -> {}", transactionId, thing.getId());
        emitter.onSuccess(thing);
    });
}
/**
 * Performs cool operations with the given {@link Thing} instance
 *
 * @param transactionId Current Transaction Identifier
 * @param thing         Current {@link Thing} instance
 * @return {@link Thing} instance
 */
private Single<Thing> doCoolOperationsA(final String transactionId, Thing thing) {
    return Single.create(emitter -> {
        log.debug("{}: Evaluating doCoolOperationsA...", transactionId);
        emitter.onSuccess(thing);
    });
}
/**
 * Performs cool operations with the given {@link Thing} instance
 *
 * @param transactionId Current Transaction Identifier
 * @param thing         Current {@link Thing} instance
 * @return {@link Thing} instance
 */
private Single<Thing> doCoolOperationsB(final String transactionId, Thing thing) {
    return Single.create(emitter -> {
        log.debug("{}: Evaluating doCoolOperationsB...", transactionId);
        emitter.onSuccess(thing);
    });
}

Modificamos el controlador para invocar el método “operateById” del Servicio “ThingService” y lanzamos la petición REST:

$> curl -X GET http://localhost:8080/api/thing5
{"thing":{"id":"thing5","value":"value5"},"message":"OK"}

Output

DEBUG [  XNIO-1 task-1] ThingsController: 575923ae3691: Request received for id: 'thing5'
DEBUG [  XNIO-1 task-1] ThingsController: 575923ae3691: Request for id: 'thing5' processed.
DEBUG [readScheduler-1] ThingsController: 575923ae3691: Subscribed
DEBUG [readScheduler-1] ThingsService   : 575923ae3691: Evaluating findById...
DEBUG [readScheduler-1] ThingsService   : 575923ae3691: Evaluating doCoolOperationsA...
DEBUG [readScheduler-1] ThingsService   : 575923ae3691: Evaluating doCoolOperationsB...
DEBUG [readScheduler-1] ThingsService   : 575923ae3691: Evaluating logById...
DEBUG [readScheduler-1] ThingsService   : 575923ae3691: Logging Id -> thing5
DEBUG [readScheduler-1] ThingsController: 575923ae3691: Success
DEBUG [readScheduler-1] ThingsController: 575923ae3691: Terminate

Comprobamos que cada operación se ha suscrito a un hilo diferente (readScheduler-1, readScheduler-2 y readScheduler-3) y se ha ejecutado en paralelo, siendo el último hilo en ejecutarse el que nos da devuelto la respuesta a la invocación del servicio.

Operaciones en paralelo II

En ocasiones, es posible que se necesite combinar los diferentes resultados de operaciones ejecutadas en paralelo. Para ello, disponemos del operador Zip, que utiliza una una función que recibe los valores como resultado de mezclar dos reactive streams para generar un valor nuevo. Es importante tener en cuenta que al ser operaciones asíncronas no es posible predecir el orden con el que se combinan los resultados:

/**
 * Performs concurrent operations using different threads and merging the result in a single response. The
 * result is the combination of all the results of the executed threads.
 *
 * @param transactionId Current Transaction Identifier
 * @param id            Thing's identifier
 * @return {@link Thing} instance if exists or IllegalArgumentException if not.
 */
public Single<Thing> operateByIdMergingZip(final String transactionId, final String id) {
    Thing thing = this.findById(transactionId, id).blockingGet();
    Single<Thing> operationA = doCoolOperationsA(transactionId, thing).subscribeOn(Schedulers.io());
    Single<Thing> operationB = doCoolOperationsB(transactionId, thing).subscribeOn(Schedulers.io());
    Single<Thing> operationC = logById(transactionId, thing).subscribeOn(Schedulers.io());

    return operationA
            .zipWith(operationB, ThingsService::mergeThings)
            .zipWith(operationC, ThingsService::mergeThings);
}

/**
 * Merges two {@link Thing} instances in a new one
 *
 * @param thing1 {@link Thing} in a 'A' state
 * @param thing2 {@link Thing} in a 'B' state
 * @return {@link Thing} in a 'A+B' state
 */
private static Thing mergeThings(Thing thing1, Thing thing2) {
    return new Thing(thing1.getId(), thing1.getValue() + ", " + thing2.getValue());
}

Backpressure

La posibilidad de trabajar con eventos infinitos en nuestros streams reactivos da lugar a escenarios donde los Observables (emisores de eventos) generan eventos mucho más rápido de lo que pueden ser consumidos por nuestros Observers (consumidores). Esta circunstancia es llamada Backpressure y es un escenario a tener en cuenta a la hora de implementar nuestro API reactivo.Existen varios métodos para gestionar backpressure. Por ejemplo, existen estrategias conservadoras que implementan buffers donde los elementos emitidos se van acumulando hasta que un consumidor los procesa y estrategias más agresivas donde los elementos no consumidos a tiempo se descartan o se sobrescriben por los más recientes. La elección de la estrategia correcta dependerá de nuestro sistema.

Sumario

En éste artículo hemos visto los principios de la programación reactiva y los motivos por los que es interesante tenerla en cuenta a la hora de diseñar o mejorar nuestras arquitecturas. Se ha mostrado un ejemplo de API reactiva con Spring Boot y RxJava2, exponiendo varias situaciones más o menos generales y como afrontarlas en código y, por último, hemos hablado de una característica inherente al uso de streams, el backpressure, a tener muy en cuenta.

Comparte el artículo

Share on twitter
Twitter
Share on linkedin
LinkedIn
Share on email
Email
Share on whatsapp
WhatsApp

Una nueva generación de servicios tecnológicos y productos para nuestros clientes