Aunque los Streams de Java fueron diseñadas para ser utilizadas una sola vez, los programadores aún preguntan cómo reutilizar un Stream. Desde una simple búsqueda en la web, podemos encontrar muchas publicaciones con este mismo problema formuladas de diferentes maneras: “¿Hay alguna forma de utilizar un Strem en Java?“, “Copiar un Stream para evitar que el Stream haya sido ya operado o cerrado”, “Java 8 Stream IllegalException”, y otros.

Por lo tanto, si queremos utilizar un Stream<T> varías veces, entonces debemos:

  1. Rehacer el cálculo para generar la secuencia a partir de la fuente de datos, o
  2. Almacenar el resultado intermedio en una colección.

Sin embargo, ambas opciones tiene inconvenientes y ninguna de ellas es la más adecuada para todos los casos de uso. Usando el primer enfoque, ¿Qué sucede si los datos llegan a través de la red, o desde un archivo de base de datos, u otra fuente externa?. En este caso, debemos regresar a la fuente de datos nuevamente, que puede tener más sobrecarga que almacenar el resultado intermedio en una colección. Entonces, para múltiples recorridos en datos inmutables, tal vez la segunda solución sea más ventajosa. Sin embargo, si solo vamos a usar los datos una vez, entonces obtenemos un problema de eficiencia con este enfoque, ya que no debimos almacenar el origen de datos en la memoria.

Aquí, exploramos una tercera alternativa que memoriza elementos bajo demanda solo cuando se accede a ellos a través de un recorrido transversal. Analizamos las limitaciones y ventajas de cada enfoque en los datos resultantes de una solicitud HTTP, utilizaremos un Supplier<Stream<…>>.

Caso de Uso de Streams

Las operaciones de Stream son muy prácticas para realizar consultas sobre secuencias de elementos, y muchas veces los programadores buscan una forma de consumir un flujo una vez más. Además, en otros entornos de programación, como .NET y JavaScript, los programadores no se ven afectados por esta limitación (recorrido único), porque las operaciones de flujo no se proporcionan en el nivel del Iterador, si no en el nivel Iterable, que se puede atravesar múltiples veces. Por lo tanto, en .NET o JavaScript, podemos operar en los mismos datos con diferentes consultas sin violar ningún estado interno, ya que cada vez que se calcula una consulta, obtendrá un iterador nuevo de la fuente.

Para resaltar los inconvenientes de los enfoques alternativos de reutilización de flujos Java, presentamos un caso de uso basado en una solicitud HTTP. Para ello, utilizaremos la API en línea de World Weather para obtener una secuencia de elementos de información meteorológica en un formato de datos CSV. Particularmente, estamos interesados en una secuencia de temperaturas en grados Celsius en marzo de Lisboa, Portugal. Por lo tanto, debemos realizar una solicitud HTTP GET al URI http://api.worldweatheronline.com/premium/v1/past-weather.ashx con los parámetros de consulta q=37.107, -7.933 correspondientes a la ciudad de Lisboa, fecha=2018-03-01 y enddate=2018-03-01 que define el intervalo de fechas, tp=24 para obtener datos en períodos de 24 horas, format=csv para especificar el formato de los datos, y finalmente key= con el API Key.

Para realizar esta solicitud HTTP, utilizaremos una API no bloqueante, como AsyncHttpClient, y la respuesta será una secuencia de líneas en formato CSV (por ejemplo, Stream<String>). Dado que estamos utilizando una API asíncrona, no queremos esperar a que se complete la respuesta; por lo tanto, el resultado de la solicitud HTTP se envuelve en una promesa de esa respuesta. En Java, una promesa se representa mediante una instancia de CompletableFuture (por ejemplo, CompletableFuture<Stream<String>>) y se obtiene de la siguiente solicitud HTTP con AsyncHttpClient:

Ahora, para obtener una secuencia de temperaturas, debemos analizar el CSV de acuerdo con las siguientes reglas:

  1. Ignore las líneas que comienzan con #, que corresponden a los comentarios
  2. Salta la línea
  3. Filtra líneas alternativas
  4. Extraiga el tercer valor correspondiente a una temperatura en Celsius
  5. Convierta a un número entero

Para realizar estas transformaciones sin esperar la finalización de la respuesta, utilizaremos el método thenApply() de CompetableFuture que pasa una función que convertirá Stream<String> en un IntStream. Por lo tanto, cuando la csv resultante esté disponible (es decir, Stream<String>, continuará procesando la transformación sin bloquear:

Finalmente, podemos construir una clase de servicio Meteorológico que proporcione un método asíncrono auxiliar getTemperaturaAsync (doble latitud, doble registro, LocalDate desde, LocalDate hasta) para obtener una secuencia de temperaturas para una ubicación determinada y un intervalo de fechas:

Enfoque 1: Supplier<Stream<>>

Dado el método getTemperaturesAsync(), podemos obtener una secuencia de temperaturas en Lisboa en marzo como:

Ahora, cada vez que queremos realizar una consulta, podemos obtener la secuencia resultante de CompetableFuture. En el siguiente ejemplo, estamos obteniendo la temperatura máxima en marzo y contando cuántos días alcanzaron esta temperatura.

Sin embargo, la segunda consulta arrojará una excepción porque la transmisión de lisbonTempsInMarch.join() ya se ha operado. Para evitar esta excepción, debemos obtener un nuevo Stream que combine todas las operaciones intermedias con la fuente de datos. Esto significa que tenmos que hacer una solicitud HTTP y repetir todas las transformaciones sobre la respuesta HTTP. Para ello, utilizaremos un Supplier<CompletableFuture<Stream<T>>> que envuelve la solicitud y las transformaciones posteriores en un proveedor.

Y ahora, cada vez que queremos ejecutar una nueva consulta, podemos realizar una nueva solicitud HTTP a través del método get() del proveedor lisbonTempsInMarch, luego obtener el flujo resultante de la respuesta a través de join(), y finalmente invocar las operaciones del flujo deseado como:

Y ahora, cada vez que queremos la invocación consecutiva de get() y join(), podemos poner el método de llamada a join() dentro del proveedor como:

Y ahora podemos escribir:

Brevemente, de acuerdo con este enfoque, estamos creando una nueva cadena de flujo (con todas las transformaciones especificadas en getTemperaturesAsync()) cada vez que queremos consumir un stream. Esta idea se basa en un reclamo de la documentación de Java sobre las operaciones y las canalizaciones de Stream que establece:

Si necesitas atravesar la misma fuente de datos nuevamente, debe regresar a la fuente de datos para obtener un nuevo Stream.

Sin embargo, esta técnica obliga a la recreación de toda la canalización a la fuente de datos, lo que genera IO inevitable debido a la solicitud HTTP. Como los datos de la información meteorológica pasada son inmutables, esta solicitud HTTP es inútil porque siempre obtendremos la misma secuencia de temperatura.


Este artículo se encuentra basado en How to Reuse Java Streams .

Como se reutiliza los Streams de Java
Si te gusto, comparte ...Email this to someone
email
Share on Facebook
Facebook
Tweet about this on Twitter
Twitter
Share on LinkedIn
Linkedin
Share on Google+
Google+
Etiquetado en:        

Deja un comentario

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *

Facebook