liuxiaoshui
发布于 2024-07-09 / 55 阅读
0
0

一次Hystrix触发服务降级分析

现象

OpenFeign 调用失败,报错日志如下:

2024-07-09 11:11:15.492 [schedule-pool-37] ERROR c.h.h.a.e.EmployeeDataApiFallbackFactory - [getEmployeeDataByNo,50] - 根据员工卡号获取员工数据异常:{}
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@48a046c7[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@48f9ae15[Wrapped task = null]] rejected from java.util.concurrent.ThreadPoolExecutor@bf74a76[Running, pool size = 10, active threads = 10, queued tasks = 0, completed tasks = 760562]
	at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2055)
	at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:825)
	at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1355)
	at java.base/java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:118)
	at com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler$ThreadPoolWorker.schedule(HystrixContextScheduler.java:172)
	at com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler$HystrixContextSchedulerWorker.schedule(HystrixContextScheduler.java:106)
	at rx.internal.operators.OperatorSubscribeOn.call(OperatorSubscribeOn.java:50)
	at rx.internal.operators.OperatorSubscribeOn.call(OperatorSubscribeOn.java:30)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
	at rx.Observable.unsafeSubscribe(Observable.java:10327)
	at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
	at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
	at rx.Observable.unsafeSubscribe(Observable.java:10327)
	at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
	at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
	at rx.Observable.unsafeSubscribe(Observable.java:10327)
	at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
	at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
	at rx.Observable.unsafeSubscribe(Observable.java:10327)
	at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
	at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
	at rx.Observable.unsafeSubscribe(Observable.java:10327)
	at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
	at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
	at rx.Observable.unsafeSubscribe(Observable.java:10327)
	at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:51)
	at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:35)
	at rx.Observable.unsafeSubscribe(Observable.java:10327)
	at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
	at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
	at rx.Observable.unsafeSubscribe(Observable.java:10327)
	at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
	at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
	at rx.Observable.unsafeSubscribe(Observable.java:10327)
	at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
	at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
	at rx.Observable.unsafeSubscribe(Observable.java:10327)
	at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:51)
	at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:35)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
	at rx.Observable.subscribe(Observable.java:10423)
	at rx.Observable.subscribe(Observable.java:10390)
	at rx.internal.operators.BlockingOperatorToFuture.toFuture(BlockingOperatorToFuture.java:51)
	at rx.observables.BlockingObservable.toFuture(BlockingObservable.java:410)
	at com.netflix.hystrix.HystrixCommand.queue(HystrixCommand.java:378)
	at com.netflix.hystrix.HystrixCommand.execute(HystrixCommand.java:344)
	at org.springframework.cloud.netflix.hystrix.HystrixCircuitBreaker.run(HystrixCircuitBreaker.java:53)
	at org.springframework.cloud.openfeign.FeignCircuitBreakerInvocationHandler.invoke(FeignCircuitBreakerInvocationHandler.java:105)
	at com.sun.proxy.$Proxy185.getEmployeeDataByNo(Unknown Source)
	at com.hy.sche.base.impl.IBaseDataServiceImpl.selectEmployeeDataInfoByDriverCardNo(IBaseDataServiceImpl.java:245)
	at com.hy.sche.simulate.service.impl.SimulateScheServiceImpl.unplannedBusInfoAssemble(SimulateScheServiceImpl.java:818)
	at com.hy.sche.simulate.service.impl.SimulateScheServiceImpl.lambda$selectUnknownInfo$5(SimulateScheServiceImpl.java:641)
	at java.base/java.util.HashMap.forEach(HashMap.java:1337)
	at com.hy.sche.simulate.service.impl.SimulateScheServiceImpl.selectUnknownInfo(SimulateScheServiceImpl.java:639)
	at com.hy.sche.websocket.server.SimulateWebSocketSever.lambda$onOpen$0(SimulateWebSocketSever.java:116)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

分析过程

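从日志可以看出,Feign 调用被 Hystrix 包装成命令后提交到 Hystrix 线程池时被拒绝(RejectedExecutionException),任务提交失败,因此走了 fallback 降级逻辑。异常信息中的 pool size = 10、active threads = 10、queued tasks = 0 也说明:线程池的 10 个线程全部处于繁忙状态,而且没有任何排队中的任务。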

1. Hystrix 有两种隔离策略:线程池隔离和信号量隔离,默认使用线程池隔离(THREAD)。

  • 信号量隔离:控制并发量,通过信号量限制最大并发请求数,不涉及线程池。

  • 线程隔离:通过线程池进行隔离,控制并发量和超时时间。

默认值可以在 com.netflix.hystrix.HystrixCommandProperties 中查看:

/*
 * Excerpt of the HystrixCommandProperties constructor. "..." marks code omitted in
 * this article, so the snippet is not compilable as shown.
 */
protected HystrixCommandProperties(HystrixCommandKey key, Setter builder, String propertyPrefix) {
...
        // Resolves the "execution.isolation.strategy" property for this command key.
        // The builder's value and default_executionIsolationStrategy are passed as candidates.
        // NOTE(review): the exact precedence is implemented inside getProperty, which is not shown here.
        this.executionIsolationStrategy = getProperty(propertyPrefix, key, "execution.isolation.strategy", builder.getExecutionIsolationStrategy(), default_executionIsolationStrategy);

...
}


/*
 * Excerpt of the static initializer that sets HystrixCommandProperties defaults
 * ("..." = omitted lines).
 */
static {
        // Default isolation strategy (THREAD, i.e. thread-pool isolation). It is passed as the
        // last candidate to getProperty(..., "execution.isolation.strategy", ...).
        default_executionIsolationStrategy = HystrixCommandProperties.ExecutionIsolationStrategy.THREAD;
...
}

2.默认的线程隔离参数

/*
 * Excerpt of HystrixThreadPoolProperties showing the built-in thread-pool defaults
 * ("..." = omitted members).
 */
public abstract class HystrixThreadPoolProperties {

    /* defaults */
    static int default_coreSize = 10;            // core size of thread pool
    // maximumSize is only honored when allowMaximumSizeToDivergeFromCoreSize is true.
    // Otherwise getThreadPool creates the executor with maximumPoolSize == coreSize.
    static int default_maximumSize = 10;         // maximum size of thread pool
    static int default_keepAliveTimeMinutes = 1; // minutes to keep a thread alive
    // -1 (like any value <= 0) makes getBlockingQueue return a SynchronousQueue, which
    // holds no tasks. With the default settings, a task submitted while all pool threads
    // are busy is therefore rejected rather than queued.
    static int default_maxQueueSize = -1;        // size of queue (this can't be dynamically changed so we use 'queueSizeRejectionThreshold' to artificially limit and reject)
                                                
...
}

在 Hystrix 的默认线程池隔离策略中,线程池的行为由以下几个参数控制:

coreSize:线程池的核心线程数,默认是 10。

maximumSize:线程池的最大线程数,默认值也是 10。但它只有在 allowMaximumSizeToDivergeFromCoreSize 为 true 时才生效,否则最大线程数始终等于 coreSize。

maxQueueSize:线程池队列的最大长度,默认是 -1。注意,-1 并不表示队列无限大,而是表示不使用缓冲队列(详见下文)。

queueSizeRejectionThreshold:队列拒绝阈值,默认是 5。它只在 maxQueueSize > 0(即使用有界队列)时生效,用于在队列达到该阈值时提前拒绝任务。

需要注意,maxQueueSize 的默认值 -1 并不代表队列无限大。当 maxQueueSize <= 0 时,Hystrix 使用 SynchronousQueue 作为工作队列,而 SynchronousQueue 本身不存储任何任务,所以默认情况下线程池实际上没有缓冲队列(容量相当于 0)。当线程池中的线程都在处理任务时,新任务无法放入队列,ThreadPoolExecutor 会立即触发拒绝策略,导致任务被拒绝并抛出 RejectedExecutionException。

3. 为什么 maxQueueSize 默认为 -1 时,队列容量实际上为 0?下面结合源码进行说明。

默认线程池在 HystrixThreadPoolDefault 的构造方法中初始化:

/*
 * Excerpt of the default HystrixThreadPool implementation. Only the fields and the
 * constructor are shown; the rest of the class (including its closing brace) is omitted.
 */
static class HystrixThreadPoolDefault implements HystrixThreadPool {
    private static final Logger logger = LoggerFactory.getLogger(HystrixThreadPoolDefault.class);

    private final HystrixThreadPoolProperties properties;
    private final BlockingQueue<Runnable> queue;
    private final ThreadPoolExecutor threadPool;
    private final HystrixThreadPoolMetrics metrics;
    private final int queueSize;

    public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
        // Resolve the thread-pool properties (coreSize, maxQueueSize, ...) for this pool key.
        this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
        HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
        // Configured maximum queue size; -1 unless overridden (default_maxQueueSize).
        this.queueSize = properties.maxQueueSize().get();

        // The ThreadPoolExecutor is created by concurrencyStrategy.getThreadPool(...) and handed
        // to the metrics object; the pool is then read back from the metrics object.
        // NOTE(review): getInstance is not shown here; it may return an already-registered
        // metrics instance for this key.
        this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
                concurrencyStrategy.getThreadPool(threadPoolKey, properties),
                properties);
        this.threadPool = this.metrics.getThreadPool();
        // The executor's own work queue, which is a SynchronousQueue when maxQueueSize <= 0.
        this.queue = this.threadPool.getQueue();

        /* strategy: HystrixMetricsPublisherThreadPool */
        HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties);
    }

重点关注下面这行。线程池实际是由 HystrixConcurrencyStrategy#getThreadPool 创建的:

 // Key line: the ThreadPoolExecutor, and therefore its work queue, is built by
 // HystrixConcurrencyStrategy#getThreadPool (shown below).
 this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
                    concurrencyStrategy.getThreadPool(threadPoolKey, properties),
                    properties);
/**
 * Creates the ThreadPoolExecutor that backs a Hystrix thread pool.
 *
 * @param threadPoolKey        key identifying the pool; used for the thread factory and in log messages
 * @param threadPoolProperties supplies coreSize, maximumSize, keepAliveTimeMinutes, maxQueueSize
 *                             and allowMaximumSizeToDivergeFromCoreSize
 * @return a new executor. No RejectedExecutionHandler is passed to the constructor, so
 *         ThreadPoolExecutor's default AbortPolicy throws RejectedExecutionException when a
 *         task cannot be accepted.
 */
public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
        final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);

        final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get();
        final int dynamicCoreSize = threadPoolProperties.coreSize().get();
        final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get();
        final int maxQueueSize = threadPoolProperties.maxQueueSize().get();
        // maxQueueSize <= 0 (the default is -1) yields a SynchronousQueue; see getBlockingQueue.
        final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize);

        if (allowMaximumSizeToDivergeFromCoreSize) {
            final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();
            if (dynamicCoreSize > dynamicMaximumSize) {
                // Invalid configuration: log it and use coreSize for both core and maximum size.
                logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " +
                        dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ".  Maximum size will be set to " +
                        dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
                return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
            } else {
                return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
            }
        } else {
            // maximumSize is ignored here: the pool is fixed at coreSize threads.
            return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
        }
    }
        // getThreadPool obtains its work queue from getBlockingQueue(maxQueueSize), shown next.
        final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize);
/**
 * Chooses the work queue for a Hystrix thread pool.
 *
 * @param maxQueueSize configured maxQueueSize; any value <= 0 (the default is -1) disables queuing
 * @return a SynchronousQueue (direct hand-off that holds no tasks) when maxQueueSize <= 0,
 *         otherwise a LinkedBlockingQueue bounded at maxQueueSize
 */
public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
        /*
         * We are using SynchronousQueue if maxQueueSize <= 0 (meaning a queue is not wanted).
         * <p>
         * SynchronousQueue will do a handoff from calling thread to worker thread and not allow queuing which is what we want.
         * <p>
         * Queuing results in added latency and would only occur when the thread-pool is full at which point there are latency issues
         * and rejecting is the preferred solution.
         */
        if (maxQueueSize <= 0) {// -1 by default (default_maxQueueSize)
            return new SynchronousQueue<Runnable>();
        } else {
            return new LinkedBlockingQueue<Runnable>(maxQueueSize);
        }
    }

默认情况下 maxQueueSize 为 -1。由上面的代码可知,当 maxQueueSize 小于等于 0 时(默认值为 -1),Hystrix 使用 SynchronousQueue 作为工作队列。这个队列不存储元素,只能把任务直接交给正在等待的工作线程。因此,当线程数已达到上限(默认 10,且默认最大线程数等于核心线程数)、又没有空闲线程时,新任务会被立即拒绝。

总结

  • Hystrix 默认核心线程数为 10。

  • 当 10 个线程都在处理任务时,新任务会尝试进入队列。

  • 由于默认使用的 SynchronousQueue 容量为 0,任务无法排队,ThreadPoolExecutor 会立即调用默认拒绝策略 AbortPolicy,抛出 RejectedExecutionException。

  • 被拒绝的任务将触发降级逻辑(如 fallback 方法)。

这种设计确保了线程池不会因为排队任务过多而导致响应时间过长,有利于快速失败和服务的稳定性。


评论