Skip to content

Commit

Permalink
Added experimental ES6 Promise
Browse files Browse the repository at this point in the history
  • Loading branch information
saturnism committed Dec 13, 2016
1 parent bcca678 commit a497f79
Showing 1 changed file with 187 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
package org.jdeferred.es6;

import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
* Created by rayt on 12/12/16.
*/
public class Promise<D, F> implements Runnable {
public enum State {
PENDING,
FULFILLED,
REJECTED
}

private static final ExecutorService es = Executors.newCachedThreadPool();

private final BiConsumer<Consumer<D>, Consumer<F>> executor;
private final List<Consumer<D>> fullfillCallbacks = new LinkedList<>();
private final List<Consumer<F>> rejectCallbacks = new LinkedList<>();
private final AtomicReference<State> state = new AtomicReference<>(State.PENDING);
private D result;
private F reason;

public Promise(BiConsumer<Consumer<D>, Consumer<F>> executor) {
this.executor = executor;
es.submit(this);
}

@Override
public void run() {
executor.accept(d -> {
synchronized (this.state) {
if (this.state.compareAndSet(State.PENDING, State.FULFILLED)) {
this.result = d;
this.triggerFulfillCallbacks();
}
}
}, f -> {
synchronized (this.state) {
if (this.state.compareAndSet(State.PENDING, State.REJECTED)) {
this.reason = f;
this.triggerRejectCallbacks();
}
}
});
}

protected void triggerFulfillCallbacks() {
this.fullfillCallbacks.stream().forEach(f -> f.accept(this.result));
}

protected void triggerRejectCallbacks() {
this.rejectCallbacks.stream().forEach(f -> f.accept(this.reason));
}

public Promise<D, F> then(Consumer<D> onFullfilled) {
this.fullfillCallbacks.add(onFullfilled);
synchronized (this.state) {
if (this.state.get() == State.FULFILLED) {
this.triggerFulfillCallbacks();
}
}
return this;
}

public Promise<D, F> then(Consumer<D> onFulfilled, Consumer<F> onRejected) {
this.rejectCallbacks.add(onRejected);
synchronized (this.state) {
if (this.state.get() == State.REJECTED) {
this.triggerRejectCallbacks();
}
}
return this;
}

public Promise<D, F> catches(Consumer<F> onRejected) {
this.rejectCallbacks.add(onRejected);
synchronized (this.state) {
if (this.state.get() == State.REJECTED) {
this.triggerRejectCallbacks();
}
}
return this;
}

public static <D, F> Promise<Collection<D>, F> all(Promise<D, F>... promises) {
return all(Arrays.asList(promises));
}

public static <D, F> Promise<Collection<D>, F> all(Collection<Promise<D, F>> promises) {
return new Promise<Collection<D>, F>((resolve, reject) -> {
final AtomicBoolean rejected = new AtomicBoolean(false);
final AtomicInteger fullfilledCount = new AtomicInteger(0);
List<D> results = new ArrayList<>(promises.size());
promises.forEach(p -> {
p.then(r -> {
if (!rejected.get()) {
results.add(r);
if (promises.size() == fullfilledCount.incrementAndGet()) {
resolve.accept(results);
}
}
}).catches(e -> {
if (rejected.compareAndSet(false, true))
reject.accept(e);
});
});
});
}

public static <D, Exception> Promise<D, Exception> reject(Exception e) {
return new Promise<D, Exception>((resolve, reject) -> {
reject.accept(e);
});
}

public static <D, F> Promise<D, F> resolve(D value) {
return new Promise<D, F>((resolve, reject) -> {
resolve.accept(value);
});
}

public static <D, F> Promise<D, F> race(Promise<D, F>... promises) {
return race(Arrays.asList(promises));
}

public static <D, F> Promise<D, F> race(Collection<Promise<D, F>> promises) {
return new Promise<D, F>((resolve, reject) -> {
final AtomicReference<State> state = new AtomicReference<>(State.PENDING);
promises.forEach(p -> {
p.catches(e -> {
if (state.compareAndSet(State.PENDING, State.REJECTED))
reject.accept(e);
});
p.then(r -> {
if (state.compareAndSet(State.PENDING, State.FULFILLED))
resolve.accept(r);
});
});
});
}

public static void main(String[] args) {
Promise<String, Void> p = new Promise<>(
(resolve, reject) -> {
resolve.accept("Hello");

}
);

p.then(System.out::println);

Promise<Void, Exception> p2 = new Promise<>(
(resolve, reject) -> {
reject.accept(new Exception("ugh"));
}
);
p2.catches(e -> e.printStackTrace());


List<Promise<Integer, Void>> ps = IntStream.range(0, 10).mapToObj(i -> new Promise<Integer, Void>(
(resolve, reject) -> {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
}
resolve.accept(i);
}
)).collect(Collectors.toList());

Promise.reject(new Exception("oops")).catches(e -> e.printStackTrace());

Promise.all(ps).then(r -> r.forEach(System.out::println));

Promise.race(ps).then(System.out::println);
}
}

0 comments on commit a497f79

Please sign in to comment.