sandbox
Repo to store POC, experiments, tutorials.
- com.hugodesmarques.parallel - Experimento para o artigo "Paralelismo e Concorrência 102: Java parallel streams na prática".
No artigo anterior, "Paralelismo e Concorrência 101", exploramos os conceitos fundamentais sobre esses dois tópicos. Discutimos como esses conceitos permitem que programas realizem múltiplas tarefas simultaneamente, melhorando o desempenho e a eficiência.
Neste segundo artigo da série, vamos nos aprofundar no uso do parallel stream
em Java. Introduzido no Java 8, o parallel stream
é uma funcionalidade que facilita o processamento paralelo de coleções, aproveitando múltiplos núcleos da CPU para melhorar o desempenho de operações em grandes volumes de dados.
Vamos explorar como o parallel stream
funciona, suas vantagens e desvantagens, e como personalizar o pool de threads utilizado. Também discutiremos a técnica de "work-stealing" implementada pelo ForkJoinPool
e sua importância para o balanceamento de carga e eficiência.
Vamos lá que temos muito conteúdo pra cobrir!
O Java 8 introduziu streams como uma nova forma de iterar e realizar operações em coleções de forma declarativa. Os streams fornecem uma API rica para manipulação de dados, permitindo operações como filtro, mapeamento, redução e muito mais.
Os streams podem ser criados no modo sequencial ou no modo de execução paralela. Vejamos como criar e usar ambos os tipos de streams com exemplos.
import java.util.Arrays;
import java.util.List;
public class SequentialStreamExample {
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// Criando um stream sequencial
numbers.stream()
.forEach(n -> System.out.println("Thread: " + Thread.currentThread().getName() + " - Número: " + n));
}
}
Neste exemplo, o método stream() cria um stream sequencial a partir da lista de números. A operação forEach
itera sobre cada elemento da lista e imprime o número juntamente com o nome da thread que está processando o elemento. Como é um stream sequencial, todos os elementos são processados pela mesma thread.
Se rodarmos o exemplo acima, nós veremos o seguinte resultado:
➜ sandbox java SequentialStreamExample
Thread: main - Número: 1
Thread: main - Número: 2
Thread: main - Número: 3
Thread: main - Número: 4
Thread: main - Número: 5
Thread: main - Número: 6
Thread: main - Número: 7
Thread: main - Número: 8
Thread: main - Número: 9
Thread: main - Número: 10
Agora vamos rodar um exemplo com parallel
. Para isso, basta criarmos o nosso stream com parallel stream
:
import java.util.Arrays;
import java.util.List;
public class ParallelStreamExample {
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// Criando um stream paralelo
numbers.parallelStream()
.forEach(n -> System.out.println("Thread: " + Thread.currentThread().getName() + " - Número: " + n));
}
}
Se executarmos esse exemplo múltiplas vezes, a ordem sempre vai ser algo diferente:
➜ sandbox java ParallelStreamExample
Thread: ForkJoinPool.commonPool-worker-2 - Número: 2
Thread: main - Número: 7
Thread: ForkJoinPool.commonPool-worker-6 - Número: 4
Thread: ForkJoinPool.commonPool-worker-3 - Número: 5
Thread: ForkJoinPool.commonPool-worker-4 - Número: 9
Thread: ForkJoinPool.commonPool-worker-1 - Número: 3
Thread: ForkJoinPool.commonPool-worker-9 - Número: 10
Thread: ForkJoinPool.commonPool-worker-8 - Número: 6
Thread: ForkJoinPool.commonPool-worker-7 - Número: 8
Thread: ForkJoinPool.commonPool-worker-5 - Número: 1
Existem outras formas de criar streams, tanto sequenciais quanto paralelos, mas eu deixo isso como dever de casa pra vocês 🤓.
Agora que vimos um exemplo simples com parallel
, como ele se relaciona com os conceitos que discutimos no artigo anterior de paralelismo e concorrência?
parallel stream
utiliza concorrência?
Quando você transforma um Stream em um parallel stream
, se possível, o Java divide a tarefa em várias subtarefas que podem ser executadas simultaneamente. Cada subtarefa é atribuída a uma thread separada, que pode ser executada em um núcleo diferente do processador. O gerenciamento dessas threads envolve conceitos de concorrência, como:
ForkJoinPool
é frequentemente usado para gerenciar threads em parallel stream
.ForkJoinPool
pega tasks da fila de threads ocupadas para serem executadas em threads que estão desocupadas.parallel stream
utiliza paralelismo?
O parallel stream
aproveita o paralelismo ao executar essas subtarefas simultaneamente em múltiplos núcleos. Isso pode resultar em uma execução mais rápida, especialmente para operações que são independentes e podem ser realizadas em paralelo sem interferência entre si.
Por padrão, o parallel stream
utiliza o ForkJoinPool.commonPool()
, que é um pool de threads compartilhado disponível para todas as tarefas de fork/join. Esse pool é configurado para usar um número de threads igual ao número de núcleos disponíveis no processador, o que permite que as tarefas sejam executadas em paralelo de maneira eficiente.
Se quisermos verificar quantas threads o nosso forkJoinPool
terá, basta fazermos print do número de cores disponíveis no Java. Por exemplo, se eu rodar a seguinte linha em qualquer programa Java no meu mac:
System.out.println("Number of cores available: "+ Runtime.getRuntime().availableProcessors());
Number of cores available: 12
Notem, que o número acima pode ser mais complicado um pouco, em especial, quando se envolve containers.
ForkJoinPool
padrão
🛑 Dica: NÃO faça isso!
O tamanho padrão do ForkJoinPool.commonPool()
pode ser modificado configurando a propriedade do sistema java.util.concurrent.ForkJoinPool.common.parallelism
. Isso pode ser feito ao iniciar a JVM com a opção -D, por exemplo:
java -Djava.util.concurrent.ForkJoinPool.common.parallelism=8 MinhaAplicacao
Este comando configura o ForkJoinPool.commonPool()
para utilizar 8 threads.
Modificar o tamanho padrão do ForkJoinPool.commonPool()
pode afetar negativamente outras partes da aplicação ou bibliotecas que também utilizam o pool comum.
O commonPool
é um recurso compartilhado, e alterar seu comportamento pode introduzir problemas de desempenho e concorrência difíceis de diagnosticar. Em vez disso, é recomendável criar e utilizar um ForkJoinPool
personalizado para tarefas específicas que requerem paralelismo ajustado, garantindo que outras partes da aplicação permaneçam estáveis e previsíveis.
✅ Dica: Se necessário, faça isso!
Para modificar o pool de threads localmente você pode usar o método ForkJoinPool#submit
para submeter uma tarefa que executa o parallel stream
no contexto do pool personalizado.
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
public class CustomForkJoinPoolExample {
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// Configurando o ForkJoinPool para usar um número específico de threads
ForkJoinPool customThreadPool = new ForkJoinPool(4);
try {
customThreadPool.submit(() ->
numbers.parallelStream()
.forEach(n -> {
System.out.println("Thread: " + Thread.currentThread().getName() + " - Número: " + n);
})
).get();
} catch (Exception e) {
e.printStackTrace();
} finally {
customThreadPool.shutdown();
}
}
}
Note que quando eu fiz o print anterior, eu tinha 12 cores disponíveis, porém observe que ao executar o código acima, eu só terei 4 threads.
Thread: ForkJoinPool-1-worker-4 - Número: 6
Thread: ForkJoinPool-1-worker-4 - Número: 8
Thread: ForkJoinPool-1-worker-1 - Número: 7
Thread: ForkJoinPool-1-worker-2 - Número: 3
Thread: ForkJoinPool-1-worker-3 - Número: 9
Thread: ForkJoinPool-1-worker-3 - Número: 1
Thread: ForkJoinPool-1-worker-4 - Número: 10
Thread: ForkJoinPool-1-worker-1 - Número: 2
Thread: ForkJoinPool-1-worker-2 - Número: 5
Thread: ForkJoinPool-1-worker-3 - Número: 4
Também note que no primeiro exemplo, nós temos tarefas sendo executadas pela main
thread. Porém, no custom
nossas tarefas são executadas apenas pelos workers definidos no nosso pool.
Agora que sabemos sobre como criar parallel stream
e onde eles são executados (ForkJoinPool
), vamos discutir mais algumas coisas legais sobre essa funcionalidade.
Work-Stealing
no ForkJoinPool
Como mencionamos anteriormente, o ForkJoinPool
é o pool de threads padrão utilizado pelo parallel stream
. Esse pool implementa uma técnica chamada "work-stealing" (roubo de trabalho). Esta técnica é fundamental para garantir a eficiência e o balanceamento de carga entre as threads.
ForkJoinPool
, ela é dividida em sub-tarefas menores distribuídas entre as threads do pool. Cada thread mantém uma fila de tarefas.Este mecanismo de roubo de trabalho ajuda a manter todas as threads ocupadas e a balancear a carga de trabalho de maneira eficiente.
Vamos criar um ForkJoinPool personalizado com um número específico de threads e simular tarefas com diferentes tempos de execução. O objetivo é observar como as threads ociosas roubam trabalho das threads ainda ocupadas.
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
public class WorkStealingExample {
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// Configurando o ForkJoinPool para usar um número específico de threads
ForkJoinPool customThreadPool = new ForkJoinPool(4);
try {
customThreadPool.submit(() ->
numbers.parallelStream()
.forEach(n -> {
try {
if (n % 2 == 0) {
// Simulando uma tarefa que leva tempo
Thread.sleep(2000);
} else {
Thread.sleep(100);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Thread: " + Thread.currentThread().getName() + " - Número: " + n);
})
).get();
} catch (Exception e) {
e.printStackTrace();
} finally {
customThreadPool.shutdown();
}
}
}
Note que se você executar o código acima, vai haver uma tendência das tarefas ímpares terminarem primeiro. Por exemplo, uma das minhas execuções foi:
Thread: ForkJoinPool-1-worker-2 - Número: 3
Thread: ForkJoinPool-1-worker-1 - Número: 7
Thread: ForkJoinPool-1-worker-3 - Número: 9
Thread: ForkJoinPool-1-worker-2 - Número: 5
Thread: ForkJoinPool-1-worker-4 - Número: 6
Thread: ForkJoinPool-1-worker-4 - Número: 1
Thread: ForkJoinPool-1-worker-3 - Número: 10
Thread: ForkJoinPool-1-worker-1 - Número: 2
Thread: ForkJoinPool-1-worker-2 - Número: 4
Thread: ForkJoinPool-1-worker-4 - Número: 8
E o que acontece se o forkJoinPool
não tivesse work-stealing
? Pois bem, as threads com tarefas menores terminariam suas tarefas primeiro e ficariam ociosas, enquanto as threads com tarefas maiores estariam ocupadas, gerando um gargalo desnecessário na aplicação.
Maravilha! Você deve estar pensando "Agora que eu sei tudo isso, eu sempre vou criar parallel streams
para aumentar a performance das minhas aplicações!"
Calma jovem padawan, como sempre, depende...
Sequential
e Parallel
Existe uma linha onde utilizar parallel não é interessante. O Java é excelente em otimizar o código da aplicação, então muitas vezes, um simples sequential stream() vai ter uma performance excelente! Vamos ver um exemplo com código:
// Imports removidos pra manter o código breve. Veja o repo para o código completo.
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Thread)
@Fork(value = 2)
public class SeqParallelBenchmark {
@Param({"100", "1000000"})
private int size;
private List<Integer> data;
@Setup
public void setup() {
data = IntStream.rangeClosed(1, size).boxed().collect(Collectors.toList());
}
@Benchmark
public void test_sequential() {
data.stream().mapToInt(Integer::intValue).sum();
}
@Benchmark
public void test_parallel() {
data.stream().parallel().mapToInt(Integer::intValue).sum();
}
public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
.include(SeqParallelBenchmark.class.getName()) // specify the benchmark class here
.forks(2)
.build();
new Runner(opt).run();
}
}
Quando eu rodei o benchmark acima na minha máquina eu obtive o seguinte resultado:
Benchmark (size) Mode Cnt Score Error Units
SeqParallelBenchmark.test_sequential 100 avgt 10 ≈ 10⁻⁴ ms/op
SeqParallelBenchmark.test_parallel 100 avgt 10 0.022 ± 0.008 ms/op
SeqParallelBenchmark.test_sequential 1000000 avgt 10 0.482 ± 0.016 ms/op
SeqParallelBenchmark.test_parallel 1000000 avgt 10 0.117 ± 0.016 ms/op
Observe que com 100 elementos, o método sequential roda em 0.0001 ms enquanto que o parallel leva 0.022 ms, quase 200x mais lento. Porém, quando rodamos para milhões de elemento, o parallel se sai 4x mais rápido to que o resultado sequencial.
Eu rodei outro benchmark com uma operação diferente dessa vez:
// Imports removidos pra manter o código breve. Veja o repo para o código completo.
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Thread)
public class AnotherSeqParallelBenchmark {
@Param({"100", "1000000"})
private int size;
private List<Integer> data;
@Setup
public void setup() {
data = IntStream.rangeClosed(1, size).boxed().collect(Collectors.toList());
}
@Benchmark
public List<Double> testSequentialStream() {
return data.stream()
.map(Math::sin)
.collect(Collectors.toList());
}
@Benchmark
public List<Double> testParallelStream() {
return data.parallelStream()
.map(Math::sin)
.collect(Collectors.toList());
}
public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
.include(AnotherSeqParallelBenchmark.class.getName()) // specify the benchmark class here
.forks(2)
.build();
new Runner(opt).run();
}
}
E o resultado foi bem similar:
Benchmark (size) Mode Cnt Score Error Units
AnotherSeqParallelBenchmark.testSequentialStream 100 avgt 10 0.001 ± 0.001 ms/op
AnotherSeqParallelBenchmark.testParallelStream 100 avgt 10 0.027 ± 0.005 ms/op
AnotherSeqParallelBenchmark.testParallelStream 1000000 avgt 10 2.234 ± 0.025 ms/op
AnotherSeqParallelBenchmark.testSequentialStream 1000000 avgt 10 16.164 ± 0.573 ms/op
Note como mais uma vez em cima uma de collection pequena o parallel teve uma performance pior que o sequential (~27x pior). Porém, ao se deparar com milhões de cálculo a performance foi ~8x mais rápida do que sequential.
Não utilize parallel
:
Parallel
adiciona um overhead, como vimos acima, com a utilização de uma thread pool, coordenação de tarefas, roubo de tarefas paradas, etc. Dado isso, se sua massa de dados for pequena, é provável que um sequential
execute mais rápido do que um parallel
. Na dúvida, execute testes na sua aplicação para observar se vale a pena.Utilize parallel
:
parallel stream
é o número de núcleos da máquina e sua operação é CPU bound, isso significa que você vai conseguir utilizar todos os núcleos em paralelo, aproveitando ao máximo os recursos disponíveis.Parallel streams
é um modo fácil de embarcar no mundo da programação paralela, já que a funcionalidade abstrai diversas preocupações. Ela não é aplicável para todos os casos, mas quando BEM aplicada pode ser uma mão na roda para aumentar a performance de suas aplicações.
Todos os exemplos deste post estão disponíveis no meu repositório:
Repo to store POC, experiments, tutorials.
No próximo post, vamos explorar mais o uso de threads tradicionais e thread pools com executors. Nos vemos por lá!