Sterowanie zachowaniem handlerów¶
Na szczegóły zachowania handlerów można wpływać za pomocą atrybutów dodanych nad metodą handlera.
Ponawianie przetwarzania komunikatów¶
Zastosowanie¶
Jeśli przetworzenie komunikatu przez handler może się nie udać z przyczyn, które są chwilowe i po jakimś czasie same ustępują (np. deadlock z konkurencyjną transakcją w bazie danych) warto skonfigurować automatyczne ponawianie przetworzenia komunikatów. Ponowne przetworzenie odbywa się tylko wtedy, gdy poprzednie przetworzenie zakończy się błędem (czyli zgłoszeniem wyjątku). Pierwsze poprawne przetworzenie komunikatu powoduje, że kolejne próby przetworzenia nie będą już podejmowane.
Sposób użycia¶
IncrementalRetry(int retryLimit, int initialInterval, int intervalIncrement, params Type[] exceptions) - Umożliwia ponowne przetwarzanie komunikatów w przypadku błędu przetworzenia.
- retryLimit - Liczba ponowień (handler wykona się o 1 raz więcej).
- initialInterval - Czas w milisekundach po którym ma nastąpić pierwsze ponowienie.
- intervalIncrement - Czas w milisekundach o który rośnie przerwa przed każdym kolejnym ponowieniem.
- exceptions - Typy wyjątków, które mają powodować ponowne przetworzenie komunikatu. W przypadku braku ponowne przetworzenie komunikatu powodują wszystkie wyjątki.
[IncrementalRetry(3, 1000, 2000, typeof(InvalidOperationException))]
public HandlerResult TestCommandHandler(ConsumeContext<TESTS.TestCommand> context)
{
throw new InvalidOperationException();
return HandlerResult.Handled;
}
TestCommand powyższy handler uruchomi się 4 razy. Odstępy pomiędzy wykonaniami to kolejno 1 sekunda, 3 sekundy i 5 sekund.
W kodzie handlera mamy dostęp do informacji, czy jest to pierwsza czy kolejna próba przetworzenia. W tym celu należy wywołać metodę int context.GetRetryAttempt().
Zwraca numer ponownej próby. Jeśli zwróci 0 to jest to pierwszy raz, czyli nie było ponowienia. Jeśli >= 1 to jest to ponowna próba.
Limit współbieżności¶
Zastosowanie¶
Jeśli przetworzenie jednego komunikatu trwa długo, a zależy nam na tym, aby w krótkim czasie przetworzyć dużą liczbę komunikatów, to warto skonfigurować możliwość równoległego przetwarzania dla danego handlera. Należy jednak uważać, gdyż równolegle przetwarzanie powoduje, że nie da się zachować chronologii przetwarzania. Ponadto jeśli przetwarzanie powoduje zapis do bazy danych, to zrównoleglenie przetwarzania może nasilić powstawanie deadlocków w bazie danych lub dłuższe oczekiwanie na równoległy dostęp do tych samych tabel w bazie danych. Domyślnie przetwarzanie równoległe jest wyłączone, więc w danym momencie jeden handler przetwarza jednocześnie tylko jeden komunikat.
Sposób użycia¶
ConcurrencyLimit(int limit) - Umożliwia zwiększenie liczby równoległych wykonań handlera.
* limit - Maksymalna liczba równoległych wykonań handlera.
W przypadkach gdy:
-
limit wynosi 1 - komunkaty są przetwarzane w takiej kolejności, w jakiej zostały wysłane na szynę EDA (jest to wartość domyślna),
-
limit jest większy od 1 - oznacza to więcej niż jeden równoległy wątek przetwarzający komunikaty, co może spowodować, że w przypadku dużej ilości komunikatów oczekujących na przetworzenie, kolejność przetwarzania jest nieprzewidywalna.
Domyślnie dany handler nie wykona się dopóki nie skończy się wykonywanie poprzedniego wywołania, co odpowiada atrybutowi [ConcurrencyLimit(1)].
[ConcurrencyLimit(4)]
public HandlerResult TestCommandHandler(ConsumeContext<TESTS.TestCommand> context)
{
// ...
return HandlerResult.Handled;
}
Jeśli będziemy wysyłać komendy TestCommand odpowiednio szybko, to w powyższym przykładzie może być do czterech jednoczesnie uruchomionych handlerów TestCommandHandler.
Deduplikacja komunikatów¶
Istnieje możliwość skonfigurowania handlera w taki sposób, aby pomijał część lub całość kodu w przypadku gdy wysyłamy duża liczbę podobnych komunikatów. Aby to zrealizować należy po stworzeniu komunikatu, ale jeszcze przed jego wysłaniem wywołać metodę void SetDeduplicationIdentifier(string value) w argumencie podając tak zwany klucz deduplikacji. Klucz deduplikacji to dowolny ciąg znaków. Jeśli wiele komunikatów otrzyma ten sam klucz deduplikacji, to metoda handlera będzie mogła zignorować nadmiarowe komunikaty o tym samym kluczu. Jeśli komunikaty będą miały różne klucze deduplikacji, to handler wykona się co najmniej raz dla każdego komunikatu o zadanym kluczu. Następnie w metodzie handlera można skorzystać z metody bool ShouldIgnore(), która sprawdzi czy komunikat o tym samym kluczu deduplikacji był już wcześniej wykonywany w danym hendlerze i jeśli tak, to zwróci wartość true gdy komunikat został wysłany jeszcze przed rozpoczęniem jego ostatniego przetwarzania.
Przykładwym kluczem deduplikacji jest REF lub symbol zamówienia. Jeśli dane zamówienie podlega częstym zmianom i otrzymujemy wiele zdarzeń mówiących o tym, że dane zamówienie uległo zmianie, to możemy zdeduplikować te zdarzenia i obsłużyć dane zamówienie tylko raz.
Przykład wysyłania komunikatu z kluczem deduplikacji.
public void RiseCustomEvent()
{
var eventMessage = new CustomEvent();
eventMessage.SetDeduplicationIdentifier($"DeduplicationID"); // Ustawiamy klucz deduplikacji
EDA.RaiseEvent(eventMessage); // Wysyłamy zdarzenie
}
Przykład metody handlera ignorującej nadmiarowe komunikaty.
public HandlerResult DeduplicationCommandHandler(ConsumeContext<TestProject.DeduplicationCommand> context)
{
if (context.ShouldIgnore()) //Inaczej: consumeContext.SentTime < consumeContext.GetLastExecutionStartTime();
{
return HandlerResult.Ignored; // Wychodzimy z wartością Ignored jeśli komunikat jest nadmiarowy.
}
// ...
return HandlerResult.Handled;
}
Opóźnianie wykonań handlera¶
Aby dodatkowo zminimalizować ilość wywołań metody danego handlera można zdefiniować opóźnienie jego wywołania względem pojawienia się zdarzenia. Opcja ta jest przydatna w przypadku gdy mamy ustawioną deduplikacje komunikatów. Jeśli ustawimy, że handler wywoła się 10 sekund po przyjściu na szynę zdarzenia, to nawet jeśli w ciągu tych 10 sekund przyjdzie na szynę 100 zdarzeń z tym samym kluczem deduplikacji, handler wykona się tylko raz. Im dłuższe opóźnienie, tym rzadziej będzie wywoływany handler przy natłoku zdarzeń, które potencjalnie wyzwalają ten handler.
Wait(int time) - Wydłuża czas pomiędzy przyjściem zdarzenia na szynę, a uruchomieniem handlera.
* time - Czas w milisekundach.
[Wait(10000)]
public HandlerResult TestCommandHandler(ConsumeContext<TESTS.TestCommand> context)
{
// ...
return HandlerResult.Handled;
}
Powyższy handler wykona się najwyżej raz na 10 sekund.
Fizyczne i logiczne kolejki komunikatów¶
Fizyczne i logiczne kolejki komunikatów stosujemy wtedy, kiedy zależy nam na tym, aby komunikaty o różnyh typach były przetwarzane w odpowiedniej kolejności. Np jeśli chcemy najpierw zmienić status zamówienia (OrderStateChangeCommand), następnie zmodyfikować zamówienie (OrderChangeCommand) i ponownie zmienić jego status (OrderStateChangeCommand), to bez kolejek fizycznych nie jesteśmy w stanie zapewnić właściwej kolejności tych komunikatów. Dzięki zdefiniowaniu kolejki fizycznej, powyższe komunikaty wyślemy na jedną kolejkę fizyczną o określonej nazwie.
W ramach jednej kolejki fizycznej może funkcjonować wiele kolejek logicznych. Kolejki logiczne stosujemy wtedy, gdy chcemy rozróżnić paczki komunikatów dokonujące zmian w ramach konkretnych dokumentów (np konkretnego zamówienia, konkretnej faktury, itp). Kolejki logiczne charakteryzują się tym, że jeśli jeden komunikat w danej kolejce logicznej zostanie przetworzony niepoprawnie, to wszystkie kolejne w tej samej kolejce logicznej zostaną wstrzymane. Chodzi o to aby dla danej kolejki logicznej zachować spójność przetwarzania i albo wykonać poprawnie i chronologicznie wszystkie komunikaty, albo wstrzymać się na pierwszym błędzie. Odblokowanie kolejki logicznej odbywa się ręcznie z poziomu monitora EDA.
Kolejki fizyczne¶
Kolejka fizyczna - Kolejka pod którą dodatkowo (poza własną kolejką handlera) rejestrują sie przypisane do niej handlery i reagują na wysyłane na nią komunikaty. Kolejki te są tworzone w obrębie jednego obiektu biznesowego (adaptera), to znaczy, że jeśli dwa adaptery definiują kolejkę fizyczną o tej samej nazwie, to i tak są to różne kolejki fizyczne.
Założenia kolejek fizycznych:
-
1 typ komunikatu = 1 handler
-
przetwarzają komunikaty chronologicznie, aby zachować kolejność przetwarzania zgodną z kolejnością wysłania
Ważne
Przy złamaniu tego założenia np. gdy dla 1 komunikatu jest zarejestrowanych wiele handlerów, pojedynczy komunikat zostanie przetworzony przez wszystkie handlery reagujące na dany typ komunikatu!
Handler przypisujemy do kolejki fizycznej za pomocą atrybutu PhysicalQueueName w argumencie podając nazwę kolejki fizycznej. Taki atrybut jest jednocześnie definicją takiej kolejki. Każdy handler może należeć do więcej niż jednej kolejki fizycznej.
Przykład
Przykład zakładania kolejek nazwanych
[PhysicalQueueName("QueueName1")]
public HandlerResult QueueCommand1Handler(ConsumeContext<TestProject.QueueCommand1> context)
{
return HandlerResult.Handled;
}
[PhysicalQueueName("PhysicalQueue1")]
[PhysicalQueueName("PhysicalQueue2")]
public HandlerResult QueueCommand2Handler(ConsumeContext<TestProject.QueueCommand2> context)
{
return HandlerResult.Handled;
}
Wysyłanie komendy na kolejkę fizyczną odbywa się podając nastepującą ścieżkę w metodzie SendCommand : [Projekt].[Obiekt].[Nazwa kolejki fizycznej]
Przykład
public void SendCommand()
{
var command = new TestProject.QueueCommand1 { id = 1, Text = "komenda testowa" };
EDA.SendCommand("TestProject.ADAPTER.PhysicalQueue1", command);
}
Kolejki logiczne¶
Kolejka logiczna - to sposób na połączenie komunikatów w logiczne grupy w ramach jednej kolejki fizycznej. W ramach jednej kolejki logicznej błąd przetwarzania na jednym z komunikatów powoduje automatyczne wstrzymanie przetwarzania kolejnych komunikatów w ramach tej samej kolejki logicznej. Komunikaty przypisane do innych kolejek logicznych nie są wstrzymywane.
Kolejki logiczne NIE są współdzielone między kolejkami fizycznymi, a tym samym nie są współdzielone między obiektami. Logiczne kolejki mogą przetwarzać wiele różnych komunikatów (o ile tylko handlery są odpowiednio oznaczone tym samym atrybutem)
Przypisanie komendy do kolejki logicznej odbywa się poprzez wykorzystanie metody SetLogicalQueueIdentifier i podanie nazwy kolejki logicznej.
Przykład
public void SendCommand()
{
var command = new TestProject.QueueCommand1 { id = 1, Text = "komenda testowa" };
command.SetLogicalQueueIdentifier("LogicalQueue1");
EDA.SendCommand("TestProject.ADAPTER.PhysicalQueue1", command);
}
Ważne!
Aby wysłać więcej komend na daną kolejkę logiczną należy do każdej z nich przypisać kolejkę logiczną za pomocą wyżej wymienionej metody!
Blokowanie kolejek logicznych¶
Kolejki logiczne posiadają mechanizm blokowania, tzn. że w razie błędu ze strony handlera, wykonywanie dalszych komend na danej kolejce logicznej zostaje wstrzymane, a wszystkie następne komendy dostaną znacznik Wstrzymany przez kolejkę logiczną, który będzie widoczny w monitorze EDA.
Odblokowanie logicznej kolejki jest możliwe w monitorze EDA poprzez ponowienie komunikatu lub dodanie do następnej wiadomości SetResumeProcessingQueue. Następna komenda wykonana z błędem ponownie zablokuje kolejkę logiczną.
Przykład
public void SendReleaseCommand()
{
var command = new TestProject.QueueCommand1 { Text = "Test", id = 1 };
cmd.SetLogicalQueueIdentifier("LogicalQueue1");
cmd.SetResumeProcessingQueue();
EDA.SendCommand("TestProject.ADAPTER1.QueueName1", command);
}