现象
openfeign调用失败,报错日志如下:
2024-07-09 11:11:15.492 [schedule-pool-37] ERROR c.h.h.a.e.EmployeeDataApiFallbackFactory - [getEmployeeDataByNo,50] - 根据员工卡号获取员工数据异常:{}
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@48a046c7[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@48f9ae15[Wrapped task = null]] rejected from java.util.concurrent.ThreadPoolExecutor@bf74a76[Running, pool size = 10, active threads = 10, queued tasks = 0, completed tasks = 760562]
at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2055)
at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:825)
at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1355)
at java.base/java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:118)
at com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler$ThreadPoolWorker.schedule(HystrixContextScheduler.java:172)
at com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler$HystrixContextSchedulerWorker.schedule(HystrixContextScheduler.java:106)
at rx.internal.operators.OperatorSubscribeOn.call(OperatorSubscribeOn.java:50)
at rx.internal.operators.OperatorSubscribeOn.call(OperatorSubscribeOn.java:30)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10327)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10327)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10327)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10327)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10327)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10327)
at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:51)
at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:35)
at rx.Observable.unsafeSubscribe(Observable.java:10327)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
at rx.Observable.unsafeSubscribe(Observable.java:10327)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10327)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10327)
at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:51)
at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:35)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.subscribe(Observable.java:10423)
at rx.Observable.subscribe(Observable.java:10390)
at rx.internal.operators.BlockingOperatorToFuture.toFuture(BlockingOperatorToFuture.java:51)
at rx.observables.BlockingObservable.toFuture(BlockingObservable.java:410)
at com.netflix.hystrix.HystrixCommand.queue(HystrixCommand.java:378)
at com.netflix.hystrix.HystrixCommand.execute(HystrixCommand.java:344)
at org.springframework.cloud.netflix.hystrix.HystrixCircuitBreaker.run(HystrixCircuitBreaker.java:53)
at org.springframework.cloud.openfeign.FeignCircuitBreakerInvocationHandler.invoke(FeignCircuitBreakerInvocationHandler.java:105)
at com.sun.proxy.$Proxy185.getEmployeeDataByNo(Unknown Source)
at com.hy.sche.base.impl.IBaseDataServiceImpl.selectEmployeeDataInfoByDriverCardNo(IBaseDataServiceImpl.java:245)
at com.hy.sche.simulate.service.impl.SimulateScheServiceImpl.unplannedBusInfoAssemble(SimulateScheServiceImpl.java:818)
at com.hy.sche.simulate.service.impl.SimulateScheServiceImpl.lambda$selectUnknownInfo$5(SimulateScheServiceImpl.java:641)
at java.base/java.util.HashMap.forEach(HashMap.java:1337)
at com.hy.sche.simulate.service.impl.SimulateScheServiceImpl.selectUnknownInfo(SimulateScheServiceImpl.java:639)
at com.hy.sche.websocket.server.SimulateWebSocketSever.lambda$onOpen$0(SimulateWebSocketSever.java:116)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
分析过程
从日志可以明显看出:Hystrix 线程池已满(pool size = 10, active threads = 10, queued tasks = 0),发起 feign 调用时任务提交被拒绝(RejectedExecutionException),故走了 fallback 逻辑。
1.Hystrix 有两种隔离策略:线程隔离和信号量隔离。默认是线程隔离
信号量隔离:控制并发量,通过信号量限制最大并发请求数,不涉及线程池。
线程隔离:通过线程池进行隔离,控制并发量和超时时间。
可以通过com.netflix.hystrix.HystrixCommandProperties里查看:
protected HystrixCommandProperties(HystrixCommandKey key, Setter builder, String propertyPrefix) {
...
this.executionIsolationStrategy = getProperty(propertyPrefix, key, "execution.isolation.strategy", builder.getExecutionIsolationStrategy(), default_executionIsolationStrategy);
...
}
static {
default_executionIsolationStrategy = HystrixCommandProperties.ExecutionIsolationStrategy.THREAD;
...
}
2.默认的线程隔离参数
public abstract class HystrixThreadPoolProperties {
/* defaults */
static int default_coreSize = 10; // core size of thread pool
static int default_maximumSize = 10; // maximum size of thread pool
static int default_keepAliveTimeMinutes = 1; // minutes to keep a thread alive
static int default_maxQueueSize = -1; // size of queue (this can't be dynamically changed so we use 'queueSizeRejectionThreshold' to artificially limit and reject)
...
}
在 Hystrix 的默认线程池隔离策略中,线程池的行为由以下几个参数控制:
coreSize:线程池的核心线程数,默认是 10。
maximumSize:线程池的最大线程数,默认是 10(与 coreSize 的默认值相同);只有在 allowMaximumSizeToDivergeFromCoreSize 为 true 时才会生效,否则最大线程数直接取 coreSize。
maxQueueSize:线程池队列的最大大小,默认是 -1。注意 -1 并不表示队列无限大,而是表示不使用排队队列(底层使用不存储元素的 SynchronousQueue)。
queueSizeRejectionThreshold:队列拒绝阈值,默认是 5;该参数在 maxQueueSize 为 -1 时不适用。
需要注意:maxQueueSize 的默认值是 -1,Hystrix 并不会把它改成 0,而是在 maxQueueSize <= 0 时使用不存储元素的 SynchronousQueue 作为工作队列,效果上相当于队列容量为 0。当线程池中的线程都在处理任务时,如果新的任务到来,既无法放入队列,也无法再创建新线程(最大线程数等于核心线程数),这时就会触发拒绝策略(默认 AbortPolicy),导致任务被拒绝并抛出 RejectedExecutionException。
3.maxQueueSize 的默认值是 -1,为什么默认情况下线程池队列的实际容量却相当于 0?下面从源码中给出解释。
默认线程池的初始化在:
static class HystrixThreadPoolDefault implements HystrixThreadPool {
private static final Logger logger = LoggerFactory.getLogger(HystrixThreadPoolDefault.class);
private final HystrixThreadPoolProperties properties;
private final BlockingQueue<Runnable> queue;
private final ThreadPoolExecutor threadPool;
private final HystrixThreadPoolMetrics metrics;
private final int queueSize;
/**
 * Creates the default Hystrix thread pool wrapper for the given thread pool key.
 *
 * <p>The underlying {@link ThreadPoolExecutor} is not constructed here directly: it is
 * obtained from the configured {@code HystrixConcurrencyStrategy} and handed to
 * {@code HystrixThreadPoolMetrics.getInstance}, then read back from the metrics object.
 *
 * @param threadPoolKey key identifying this thread pool
 * @param propertiesDefaults default property values passed to the properties factory
 */
public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
// Resolve the thread pool properties for this key, using the supplied defaults.
this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
// maxQueueSize is read once here and stored in a final field.
this.queueSize = properties.maxQueueSize().get();
// Key line: concurrencyStrategy.getThreadPool(...) is where the executor and its work queue are created.
this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
concurrencyStrategy.getThreadPool(threadPoolKey, properties),
properties);
// The executor and its queue are taken from the metrics instance rather than kept from the call above.
this.threadPool = this.metrics.getThreadPool();
this.queue = this.threadPool.getQueue();
/* strategy: HystrixMetricsPublisherThreadPool */
HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties);
}
关注这行:
this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
concurrencyStrategy.getThreadPool(threadPoolKey, properties),
properties);
/**
 * Builds the {@link ThreadPoolExecutor} backing a Hystrix thread pool from its properties.
 *
 * <p>The work queue comes from {@code getBlockingQueue(maxQueueSize)}. The maximum pool size
 * equals {@code coreSize} unless {@code allowMaximumSizeToDivergeFromCoreSize} is true and
 * {@code maximumSize} is at least {@code coreSize}. No {@code RejectedExecutionHandler} is
 * passed to any of the constructors below, so the JDK default handler applies.
 *
 * @param threadPoolKey key of the thread pool; used for the thread factory and in the error log
 * @param threadPoolProperties properties supplying coreSize, maximumSize, keepAliveTimeMinutes and maxQueueSize
 * @return a newly constructed executor
 */
public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);
final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get();
final int dynamicCoreSize = threadPoolProperties.coreSize().get();
final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get();
final int maxQueueSize = threadPoolProperties.maxQueueSize().get();
// The queue type depends on maxQueueSize: SynchronousQueue when <= 0, bounded LinkedBlockingQueue otherwise.
final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize);
if (allowMaximumSizeToDivergeFromCoreSize) {
// maximumSize is only consulted when divergence from coreSize is explicitly allowed.
final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();
if (dynamicCoreSize > dynamicMaximumSize) {
// Invalid configuration: log it and fall back to coreSize for both core and maximum.
logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " +
dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ". Maximum size will be set to " +
dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
} else {
return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
}
} else {
// Divergence not allowed: fixed-size pool with core == maximum == coreSize.
return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
}
}
final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize);
/**
 * Chooses the work queue implementation for a Hystrix thread pool.
 *
 * @param maxQueueSize configured maximum queue size; values {@code <= 0} mean no queue is wanted
 * @return a {@link SynchronousQueue} when {@code maxQueueSize <= 0}, otherwise a
 *         {@link LinkedBlockingQueue} bounded to {@code maxQueueSize}
 */
public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
/*
* We are using SynchronousQueue if maxQueueSize <= 0 (meaning a queue is not wanted).
* <p>
* SynchronousQueue will do a handoff from calling thread to worker thread and not allow queuing which is what we want.
* <p>
* Queuing results in added latency and would only occur when the thread-pool is full at which point there are latency issues
* and rejecting is the preferred solution.
*/
if (maxQueueSize <= 0) {// the default maxQueueSize is -1, so this branch is taken by default
return new SynchronousQueue<Runnable>();
} else {
return new LinkedBlockingQueue<Runnable>(maxQueueSize);
}
}
默认情况下 maxQueueSize 为 -1。在这段代码中,当 maxQueueSize 小于等于 0 时(默认值为 -1),Hystrix 使用 SynchronousQueue 作为队列。这个队列不存储元素,因此新任务到来时,如果线程池已满(所有线程都在处理任务),任务会立即被拒绝。
总结
Hystrix 默认核心线程数为 10。
当 10 个线程都在处理任务时,新任务会尝试进入队列。
由于默认使用的 SynchronousQueue 容量为 0,任务无法排队,会被立即拒绝。被拒绝的任务将触发降级逻辑(如 fallback 方法)。
这种设计确保了线程池不会因为排队任务过多而导致响应时间过长,有利于快速失败和服务的稳定性。