Fiber starts another Fiber; getting the value via fiber.get() throws NPE #313

Open
BigPotato opened this issue Aug 25, 2018 · 2 comments

BigPotato commented Aug 25, 2018

java.lang.NullPointerException
	at co.paralleluniverse.strands.Strand.parkNanos(Strand.java:670) ~[quasar-core-0.7.9-jdk8.jar:0.7.9]
	at co.paralleluniverse.strands.ConditionSynchronizer.awaitNanos(ConditionSynchronizer.java:79) ~[quasar-core-0.7.9-jdk8.jar:0.7.9]
	at co.paralleluniverse.strands.dataflow.Val.get(Val.java:188) ~[quasar-core-0.7.9-jdk8.jar:0.7.9]
	at co.paralleluniverse.fibers.Fiber.get(Fiber.java:1396) ~[quasar-core-0.7.9-jdk8.jar:0.7.9]

When I call fiber.get() or fiber.get(10, TimeUnit), it does not wait; it throws an NPE.

In my system, each user request fans out into many concurrent RPCs, so I am trying to adopt Fiber to improve throughput. This problem has confused me for a week. I need some help, thanks in advance!

This is my code snippet. JMonitor.add("fiber.odd.exception") is triggered, but JMonitor.add("fiber.odd.timeout") is not.

public class FiberHelper {

private static Logger logger = LogManager.getLogger(FiberHelper.class);

private static FiberScheduler fiberScheduler = new FiberForkJoinScheduler("fiber-scheduler", Runtime.getRuntime().availableProcessors() * 2,
        (Thread t, Throwable e) -> {
            logger.error("fiber exception! " + t.getName(), e);
        }, null, false);

@Suspendable
public static Fiber runFiber(String name, SuspendableRunnable runnable) {
    Fiber fiber = new Fiber<>(name, fiberScheduler, runnable).start();
    return fiber;
}

@Suspendable
public static <T> Fiber<T> runFiber(String name, SuspendableCallable<T> callable) {
    return new Fiber<T>(name, fiberScheduler, callable).start();
}

@Suspendable
public static <T> Fiber<T> runFiberCallable(String name, SuspendableCallable<T> callable, CountDownLatch latch) {
    return new Fiber<T>(name, fiberScheduler, () -> {
        T r = callable.run();
        logger.info(name + " -> " + r);
        latch.countDown();
        return r;
    }).start();
}

public static Builder newBuilder() {
    return new Builder();
}
public static class Builder {
    private Map<String, SuspendableCallable> callableMap = Maps.newHashMap();
    private CountDownLatch latch;
    private Map<String, Fiber> fibers = null;
    private Map<String, Optional> resultMap = null;
    private boolean isStarted = false;

    public Builder submitJob(String name, SuspendableCallable r) {
        if (isStarted) {
            throw new RuntimeException(" this builder is started, cannot submit more jobs!");
        }
        if (callableMap.containsKey(name)) {
            throw new RuntimeException(name  + " duplicated in job list!");
        }
        callableMap.put(name, r);
        return this;
    }

    @Suspendable
    public Builder start() {
        if (isStarted) {
            throw new RuntimeException(" this builder is started, cannot call start()");
        }
        if (callableMap.isEmpty()) {
            logger.info("no fiber to start");
            return this;
        }
        latch = new CountDownLatch(callableMap.size());
        resultMap = Maps.newConcurrentMap();
        fibers = callableMap.entrySet().stream().collect(Collectors.toMap(e -> e.getKey(), e ->
            new Fiber(e.getKey(), fiberScheduler, new SuspendableCallable() {
                @Override
                @Suspendable
                public Object run() throws SuspendExecution, InterruptedException {
                    if (logger.isDebugEnabled()) {
                        logger.debug(Thread.currentThread().getName() + " ###fiber start:" + e.getKey());
                    }
                    Object r = e.getValue().run();
                    if (logger.isDebugEnabled()) {
                        logger.debug(Thread.currentThread().getName() + " ###fiber finish:" + e.getKey() + " -> " + r);
                    }
                    resultMap.put(e.getKey(), Optional.ofNullable(r));
                    logger.info(e.getKey() + " put value to resultMap");
                    latch.countDown();
                    return r;
                }
            }).start()
        ));
        return this;
    }

    @Suspendable
    public boolean awaitMs(long ts) {
        return await(ts, TimeUnit.MILLISECONDS);
    }

    @Suspendable
    public boolean await(long ts, TimeUnit unit) {
        boolean isDone = false;
        try {
            long ts1 = System.currentTimeMillis();
            isDone = latch.await(ts, unit);
            long ts2 = System.currentTimeMillis();
            if (logger.isDebugEnabled()) {
                String log = fibers.entrySet().stream().map(e -> e.getKey() + " isDone? " + e.getValue().isDone()).collect(Collectors.toList()).toString();
                logger.info(ts + " , fiber wait on latch isDone? " + isDone + ", cost " + (ts2 - ts1) + ", " + log);
            }
        } catch (InterruptedException e) {
            logger.error("fail when wait fibers!", e);
        }
        return isDone;
    }

    @Suspendable
    public Map<String, Fiber> getFibers() {
        if (logger.isDebugEnabled()) {
            logger.debug("fibers:" + fibers.keySet());
        }
        return fibers;
    }

    @Suspendable
    public Map<String, Fiber> getFibers(boolean onlyDone) {
        return fibers.entrySet()
                     .stream()
                     .filter(e -> !onlyDone || e.getValue().isDone())
                     .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
    }

    public long getUndoneJobCount() {
        return latch.getCount();
    }

    private boolean switchOn = true;

    @Suspendable
    private <T> T getFiberResult(String name, T defaultValue, long timeoutMs) {
        if (fibers == null || !fibers.containsKey(name)) {
            Exception ex = new IllegalStateException(name  + " fiber not exist!");
            logger.error("get value exception!", ex);
            return defaultValue;
        }

        Optional<T> result = resultMap.getOrDefault(name, Optional.ofNullable(defaultValue));
        T r = result.isPresent() ? result.get() : defaultValue;

        if (r == null || r == defaultValue) {
            logger.error(name + " can not get value from resultMap!");
            Fiber<T> fiber = fibers.get(name);
            long ts1 = System.currentTimeMillis();
            try {
                r = fiber.get(timeoutMs > 0 ? timeoutMs : 10, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                JMonitor.add("fiber.odd.timeout");
                logger.error(name + " fiber get timeout!");
            } catch (Exception e) {
                JMonitor.add("fiber.odd.exception");
                //logger.error(name + " fiber get exception!", e);
            }
            long ts2 = System.currentTimeMillis();
            logger.error(name + " fiber get cost:"+ (ts2 - ts1));
        }

        return r;
    }

    @Suspendable
    public <T> T getFiberResultByName(String name, T defaultValue, long timeoutMs) {
        if (switchOn) {
            return getFiberResult(name, defaultValue, timeoutMs);
        }

        if (fibers == null || !fibers.containsKey(name)) {
            Exception ex = new IllegalStateException(name  + " fiber not exist!");
            logger.error("get value exception!", ex);
            return defaultValue;
        }
        Fiber<T> fiber = fibers.get(name);
        T r = defaultValue;

        if (timeoutMs <= 0) {
            if (!fiber.isDone()) {
                if (latch.getCount() == 0) {
                    logger.info(name + " odd!!! latch count is 0, but the future is not done!");
                    try {
                        r = fiber.get(10, TimeUnit.MILLISECONDS);
                    } catch (TimeoutException e) {
                        logger.info(name + " odd!!! latch count is 0, but the future is not done! try another 10ms, still timeout!");
                        JMonitor.add("fiber.odd.timeout");
                    } catch (Exception ex) {
                        logger.info(name + " odd!!! latch count is 0, but the future is not done! try another 10ms, exception!", ex);
                        JMonitor.add("fiber.odd.exception");
                    }
                } else {
                    logger.error(name + " fiber is not done!");
                }
            } else {
                try {
                    r = fiber.get();
                } catch (Exception e) {
                    logger.error(name + " fail to get value from fiber!", e);
                }
            }
        } else {
            try {
                r = fiber.get(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (Exception e) {
                logger.error(name + " fail to get value from fiber with timeout!", e);
            }
        }

        if (logger.isDebugEnabled()) {
            logger.debug(name + " get value -> " + r);
        }
        return r;
    }

    @Suspendable
    public <T> T getFiberResultByName(String name) {
        return getFiberResultByName(name, null, -1);
    }

    @Suspendable
    public <T> T getFiberResultByName(String name, long timeoutMs) {
        return getFiberResultByName(name, null, timeoutMs);
    }

    @Suspendable
    public <T> T getFiberResultByName(String name, T defaultValue) {
        return getFiberResultByName(name, defaultValue, -1);
    }
}

}

@BigPotato
Author

Also, when I use co.paralleluniverse.strands.concurrent.CountDownLatch and call

isDone = latch.await(ts, unit);

I get the same NPE:

java.lang.NullPointerException
	at co.paralleluniverse.strands.Strand.parkNanos(Strand.java:670) ~[quasar-core-0.7.9-jdk8.jar:0.7.9]
	at co.paralleluniverse.strands.concurrent.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1039) ~[quasar-core-0.7.9-jdk8.jar:0.7.9]
	at co.paralleluniverse.strands.concurrent.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1357) ~[quasar-core-0.7.9-jdk8.jar:0.7.9]
	at co.paralleluniverse.strands.concurrent.CountDownLatch.await(CountDownLatch.java:268) ~[quasar-core-0.7.9-jdk8.jar:0.7.9]
	at com.meituan.service.mobile.search.thread.FiberHelper$Builder.await(FiberHelper.java:113) ~[classes/:?]
	at com.meituan.service.mobile.search.thread.FiberHelper$Builder.awaitMs(FiberHelper.java:105) ~[classes/:?]

With java.util.concurrent.CountDownLatch, this exception does not occur.
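
For comparison, the java.util.concurrent variant mentioned here can be sketched with only the JDK. This is a plain-thread illustration under stated assumptions, not the code from this issue: LatchFanOut and runAll are hypothetical names, and an ordinary thread pool stands in for the fiber scheduler. Keep in mind that java.util.concurrent.CountDownLatch parks the calling thread, so awaiting it from inside a fiber would block a scheduler worker thread instead of suspending the fiber.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: fan out named jobs on plain threads and wait on a JDK latch.
final class LatchFanOut {

    /** Runs every job on its own thread and waits up to timeoutMs for all of them.
     *  Returns only the results that were available when the wait ended. */
    static Map<String, Object> runAll(Map<String, Callable<?>> jobs, long timeoutMs) {
        Map<String, Object> results = new ConcurrentHashMap<>();
        if (jobs.isEmpty()) {
            return new HashMap<>(results);
        }
        CountDownLatch latch = new CountDownLatch(jobs.size());
        ExecutorService pool = Executors.newFixedThreadPool(jobs.size());
        try {
            jobs.forEach((name, job) -> pool.execute(() -> {
                try {
                    Object r = job.call();
                    if (r != null) {
                        results.put(name, r); // ConcurrentHashMap rejects null values
                    }
                } catch (Exception e) {
                    // Report the failure instead of swallowing it.
                    System.err.println(name + " failed: " + e);
                } finally {
                    latch.countDown(); // always count down, even on failure
                }
            }));
            // Blocks the calling thread; returns false on timeout.
            latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        } finally {
            pool.shutdownNow(); // interrupt jobs that are still running
        }
        return new HashMap<>(results); // snapshot, so late writers cannot change it
    }
}
```

A job that has not finished when the timeout expires is simply absent from the returned map, which mirrors the resultMap lookup in the builder above.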

@kimffy24

This must be caused by incorrect instrumentation.
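
One way to test that hypothesis is to check at runtime which methods were actually rewritten. The sketch below assumes that Quasar marks each instrumented method with a runtime-visible annotation named co.paralleluniverse.fibers.Instrumented; it matches the annotation by name, so it compiles without Quasar on the classpath. InstrumentationCheck and its methods are hypothetical names, not part of Quasar. Running the JVM with -Dco.paralleluniverse.fibers.verifyInstrumentation=true should also make Quasar report uninstrumented call sites, at some performance cost.

```java
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Hypothetical diagnostic helper, not part of Quasar.
final class InstrumentationCheck {

    // Assumption: Quasar's instrumentation adds this annotation to every method it rewrites.
    static final String MARKER = "co.paralleluniverse.fibers.Instrumented";

    /** True if the method carries the marker annotation (matched by class name). */
    static boolean isInstrumented(Method m) {
        for (Annotation a : m.getDeclaredAnnotations()) {
            if (a.annotationType().getName().equals(MARKER)) {
                return true;
            }
        }
        return false;
    }

    /** Names of the non-synthetic declared methods of cls that lack the marker. */
    static List<String> uninstrumentedMethods(Class<?> cls) {
        List<String> names = new ArrayList<>();
        for (Method m : cls.getDeclaredMethods()) {
            if (!m.isSynthetic() && !isInstrumented(m)) {
                names.add(m.getName());
            }
        }
        return names;
    }
}
```

Not every method needs instrumentation, so the list is only a starting point: any method on it that is annotated @Suspendable and sits on the call path to fiber.get() or latch.await(), such as await or getFiberResult in the builder above, would be a candidate for the problem.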
