Mark Alan Richards

CompletableFuture does it block

computer-science java

What happens with this (full code below)?

  @Test
public void obviouslySecondFirst() {
allOf(
supplyAsync(() -> first).thenAccept(addDelayed(concurrentLinkedQueue, delay)),
supplyAsync(() -> second).thenAccept(concurrentLinkedQueue::add)
).join();
assertThat(copyOf(concurrentLinkedQueue), equalTo(of(second, first)));
}

This will randomly swap between returning ["second","first"] and ["first","second"] and therefore, randomly block second

Repeat it...

  @Test
public void obviouslySecondFirstWithWaitBeforeCall() {
final CompletableFuture<String> suppliedFirst = supplyAsyncFirst();
delay(delay);
allOf(
suppliedFirst.thenAccept(addDelayed(concurrentLinkedQueue, delay)),
supplyAsyncSecond().thenAccept(concurrentLinkedQueue::add)
).join();
assertThat(copyOf(concurrentLinkedQueue), equalTo(of(second, first)));
}

This is a bad design!

If the methods were split between two classes (AsyncCompletableFuture and SyncCompletableFuture) then I might forgive this as I could easily code review the differences, but they're all thrown in the same one.

To make matters worse, some methods don't explicitely have an async option.

So there's a method exceptionally(), but no exceptionallyAsync(), will that block when you do supplyAsync(()->x).exceptionally(t->blockingLogging(t))?

Confusing chaining

  @Test
public void secondsecondShouldBeFirstFirst() {
allOf(
supplyAsync(() -> first).thenApply(addDelayed(concurrentLinkedQueue, delay * 2)).thenApply(addDelayed(concurrentLinkedQueue, delay * 2)),
supplyAsync(() -> second).thenApply(addDelayed(concurrentLinkedQueue, delay)).thenApply(addDelayed(concurrentLinkedQueue, delay * 2))
).join();
assertThat(copyOf(concurrentLinkedQueue), equalTo(of(second, second, first, first)));
}

This will randomly block and randomly not: sometimes returning [first,first,second,second] and sometimes [second,first,second,first]

The code ...

package com.example;

import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.supplyAsync;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.eclipse.collections.api.factory.Lists.immutable;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Function;
import java.util.function.Supplier;
import org.eclipse.collections.api.list.ImmutableList;
import org.junit.jupiter.api.Test;

public class SupplyItAsyncMaybeTest {
private void delay(int seconds) {
try {
SECONDS.sleep(seconds);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}

static <T> ImmutableList<? extends T> ofAll(Iterable<? extends T> ts) {
return immutable.ofAll(ts);
}

static <T> ImmutableList<T> of(T... ts) {
return immutable.of(ts);
}

final String first = "first", second = "second";
final int delay = 2;
final ConcurrentLinkedQueue<String> concurrentLinkedQueue = new ConcurrentLinkedQueue<>();

private Supplier<String> supplyFirstAfterDelay(int seconds, final String initalValue) {
return () -> {
delay(seconds);
return initalValue;
};
}

private Function<String, String> addDelayed(
final ConcurrentLinkedQueue<String> concurrentLinkedQueue, final int seconds) {
return (e) -> {
delay(seconds);
concurrentLinkedQueue.add(e);
return e;
};
}

@Test
public void secondShouldBeFirst() {
allOf(
supplyAsync(() -> first).thenApply(addDelayed(concurrentLinkedQueue, delay)),
supplyAsync(() -> second).thenApply(concurrentLinkedQueue::add))
.join();
assertThat(ofAll(concurrentLinkedQueue), equalTo(of(second, first)));
}

@Test
public void secondsecondShouldBeFirstFirst() {
allOf(
supplyAsync(() -> first)
.thenApply(addDelayed(concurrentLinkedQueue, delay * 2))
.thenApply(addDelayed(concurrentLinkedQueue, delay * 2)),
supplyAsync(() -> second)
.thenApply(addDelayed(concurrentLinkedQueue, delay))
.thenApply(addDelayed(concurrentLinkedQueue, delay * 2)))
.join();
assertThat(ofAll(concurrentLinkedQueue), equalTo(of(second, second, first, first)));
}

@Test
public void secondsecondShouldBeFirstFirstAlways() {
CompletableFuture<String> stringCompletableFuture = supplyAsync(() -> first);
delay(delay);
allOf(
stringCompletableFuture
.thenApply(addDelayed(concurrentLinkedQueue, delay * 2))
.thenApply(addDelayed(concurrentLinkedQueue, delay * 2)),
supplyAsync(() -> second)
.thenApply(addDelayed(concurrentLinkedQueue, delay))
.thenApply(addDelayed(concurrentLinkedQueue, delay * 2)))
.join();
assertThat(ofAll(concurrentLinkedQueue), equalTo(of(second, second, first, first)));
}

@Test
public void secondsecondShouldBeFirstFirstDelayedFutureSupplier() {
allOf(
supplyAsync(supplyFirstAfterDelay(delay, first))
.thenApply(addDelayed(concurrentLinkedQueue, delay * 2))
.thenApply(addDelayed(concurrentLinkedQueue, delay * 2)),
supplyAsync(supplyFirstAfterDelay(delay, second))
.thenApply(addDelayed(concurrentLinkedQueue, delay))
.thenApply(addDelayed(concurrentLinkedQueue, delay * 2)))
.join();
assertThat(ofAll(concurrentLinkedQueue), equalTo(of(second, second, first, first)));
}

@Test
public void secondIsNeverFirst() {
final CompletableFuture<String> suppliedFirst = supplyAsync(() -> first);
delay(delay);
allOf(
suppliedFirst.thenApply(addDelayed(concurrentLinkedQueue, delay)),
supplyAsync(() -> second).thenAccept(concurrentLinkedQueue::add))
.join();
assertThat(ofAll(concurrentLinkedQueue), equalTo(of(second, first)));
}

@Test
public void secondIsNeverFirstWhenDelayIsLonger() {
final CompletableFuture<String> suppliedFirst =
supplyAsync(supplyFirstAfterDelay(delay, first));
delay(delay * 2);
allOf(
suppliedFirst.thenApply(addDelayed(concurrentLinkedQueue, delay)),
supplyAsync(() -> second).thenAccept(concurrentLinkedQueue::add))
.join();
assertThat(ofAll(concurrentLinkedQueue), equalTo(of(second, first)));
}

@Test
public void asyncSimple() {
allOf(
supplyAsync(() -> first).thenApply(addDelayed(concurrentLinkedQueue, delay)),
supplyAsync(() -> second).thenAcceptAsync(concurrentLinkedQueue::add))
.join();
assertThat(ofAll(concurrentLinkedQueue), equalTo(of(second, first)));
}

@Test
public void asyncWithADelay() {
final CompletableFuture<String> suppliedFirst = supplyAsync(() -> first);
delay(delay * 2);
allOf(
suppliedFirst.thenApply(addDelayed(concurrentLinkedQueue, delay)),
supplyAsync(() -> second).thenAcceptAsync(concurrentLinkedQueue::add))
.join();
assertThat(ofAll(concurrentLinkedQueue), equalTo(of(second, first)));
}

@Test
public void asyncWithMultipleDelays() {
CompletableFuture<String> stringCompletableFuture =
supplyAsync(supplyFirstAfterDelay(delay, first));
delay(delay * 2);
allOf(
stringCompletableFuture.thenApply(addDelayed(concurrentLinkedQueue, delay)),
supplyAsync(() -> second).thenApply(concurrentLinkedQueue::add))
.join();
assertThat(ofAll(concurrentLinkedQueue), equalTo(of(second, first)));
}
}