Шаблон Bulkhead: семафор vs threadPool


Почему две разные реализации?

В этой статье я буду ссылаться на Spring и Resilence4j.

Если вы читаете эту статью, вы, вероятно, уже знаете о шаблоне bulkhead, о том, какую проблему он призван решить, и о наиболее распространенных реализациях: семафоре и threadPool.

По крайней мере, для меня было непросто понять, когда использовать реализацию семафора, а когда — threadPool.

Я знаю, как работает семафор, и я также понимаю паттерн ThreadPool, поэтому короткий и наиболее очевидный ответ мог бы быть таким: используйте threadPool для ограничения количества асинхронных вызовов и используйте семафор для ограничения синхронных.

Почему же возникли трудности? Причина заключалась в следующих вопросах:

  • Почему бы просто не объединить @Async с реализацией на основе семафора?

  • Можно ли использовать обе аннотации вместе? Если да, то зачем нужна реализация threadPool?

@Async и @Bulkhead объединены.

@Bulkhead(name = "Service3", fallbackMethod = "futureFallback")
    @Async
    public CompletableFuture<String> doSomeWork() {
        System.out.println("Excecuting service 3 - " + Thread.currentThread().getName());   
        Util.mockExternalServiceHttpCall(DELAY);
        return CompletableFuture.completedFuture("ok");
    }

Вход в полноэкранный режим Выход из полноэкранного режима

Полный код.

Да, вы можете использовать обе аннотации вместе. Они позволяют вам генерировать ограниченное количество асинхронных вызовов на основе конфигурации семафора bulkhead.

Однако есть некоторые последствия, которые полезно знать.

SimpleAsyncTaskExecutor.

Когда вы аннотируете метод с помощью аннотации @Async, Spring может использовать различные реализации интерфейса TaskExecutor.
По умолчанию фреймворк использует SimpleAsyncTaslExecutor.

Эта реализация будет создавать новый поток каждый раз при вызове аннотированного метода; этот поток не будет использоваться повторно.

Проблема этого подхода заключается в том, что вы будете создавать новый поток, даже если счетчик семафоров равен 0 (переборка заполнена).

Как видно из приведенной ниже трассировки стека, фреймворк сначала создает поток, а затем вызывает шаблон bulkhead, который определяет, есть ли доступные разрешения для продолжения выполнения. Если счетчик семафоров равен нулю, то переборка отклонит выполнение метода.

Thread [_simpleAsyncTask1] (Suspended (breakpoint at line 18 in Service3))  
    Service3.doSomeWork() line: 18  
    Service3$$FastClassBySpringCGLIB$$6085f5a4.invoke(int, Object, Object[]) line: not available    
    MethodProxy.invoke(Object, Object[]) line: 218  
    CglibAopProxy$CglibMethodInvocation.invokeJoinpoint() line: 793 
    CglibAopProxy$CglibMethodInvocation(ReflectiveMethodInvocation).proceed() line: 163 
    CglibAopProxy$CglibMethodInvocation.proceed() line: 763 
    MethodInvocationProceedingJoinPoint.proceed() line: 89  
    BulkheadAspect.lambda$handleJoinPointCompletableFuture$0(ProceedingJoinPoint) line: 225 
    1457434357.get() line: not available    
    Bulkhead.lambda$decorateCompletionStage$1(Bulkhead, Supplier) line: 100 
    1251257755.get() line: not available    
    SemaphoreBulkhead(Bulkhead).executeCompletionStage(Supplier<CompletionStage<T>>) line: 557  
    BulkheadAspect.handleJoinPointCompletableFuture(ProceedingJoinPoint, Bulkhead) line: 223    
    BulkheadAspect.proceed(ProceedingJoinPoint, String, Bulkhead, Class<?>) line: 162   
    BulkheadAspect.lambda$bulkheadAroundAdvice$5eb13a26$1(ProceedingJoinPoint, String, Bulkhead, Class) line: 129   
    1746723773.apply() line: not available  
    1746723773(CheckedFunction0<R>).lambda$andThen$ca02ab3$1(CheckedFunction1) line: 265    
    1151489454.apply() line: not available  
    BulkheadAspect.executeFallBack(ProceedingJoinPoint, String, Method, CheckedFunction0<Object>) line: 139 

==> here
 BulkheadAspect.bulkheadAroundAdvice(ProceedingJoinPoint, Bulkhead) line: 128   
    NativeMethodAccessorImpl.invoke0(Method, Object, Object[]) line: not available [native method]  
    NativeMethodAccessorImpl.invoke(Object, Object[]) line: 62  
    DelegatingMethodAccessorImpl.invoke(Object, Object[]) line: 43  
    Method.invoke(Object, Object...) line: 566  
    AspectJAroundAdvice(AbstractAspectJAdvice).invokeAdviceMethodWithGivenArgs(Object[]) line: 634  
    AspectJAroundAdvice(AbstractAspectJAdvice).invokeAdviceMethod(JoinPoint, JoinPointMatch, Object, Throwable) line: 624   
    AspectJAroundAdvice.invoke(MethodInvocation) line: 72   
    CglibAopProxy$CglibMethodInvocation(ReflectiveMethodInvocation).proceed() line: 175 
    CglibAopProxy$CglibMethodInvocation.proceed() line: 763 
    ExposeInvocationInterceptor.invoke(MethodInvocation) line: 97   
    CglibAopProxy$CglibMethodInvocation(ReflectiveMethodInvocation).proceed() line: 186 
    CglibAopProxy$CglibMethodInvocation.proceed() line: 763 
    AnnotationAsyncExecutionInterceptor(AsyncExecutionInterceptor).lambda$invoke$0(MethodInvocation, Method) line: 115  
    1466446116.call() line: not available   
    AsyncExecutionAspectSupport.lambda$doSubmit$3(Callable) line: 278   
    409592088.get() line: not available 
    CompletableFuture$AsyncSupply<T>.run() line: 1700   

    ==> here
SimpleAsyncTaskExecutor$ConcurrencyThrottlingRunnable.run() line: 286   
    Thread.run() line: 829  

Вход в полноэкранный режим Выход из полноэкранного режима

После выполнения нагрузочного теста, вызывающего в течение 60 секунд аннотированный метод @Aync и @Bulkhead, на картинке инструмента профилирования видно, что приложение использовало только 34 из 514 созданных потоков. Это, очевидно, представляет собой нерациональное использование ресурсов.

ThreadPoolTaskExecutor

Другим вариантом может быть использование реализации TreadPoolTaskExecutor.
После выполнения того же теста с использованием этой реализации количество созданных потоков значительно уменьшилось (41).

Однако проблема этого подхода заключается в том, что мы используем ненужную избыточность. Использование пула потоков и семафора вместе, на мой взгляд, не имеет реальных преимуществ.

Заключение.

Для ограничения асинхронных вызовов используйте реализацию Bulkhead threadPool вместо комбинации @Async и Bulkhead semaphore.

Оцените статью
devanswers.ru
Добавить комментарий