Could you hire me? Contact me if you like what I’ve done in this article and think I can create value for your company with my skills.

June 16, 2011 / by Zsolt Soczó

Gondolatok a queue alapú kliens-szerviz kommunikációhoz

Az előző post kommentjei alapján (amit nagyon köszönök mindenkinek) nem kaptam sok bátorítást az aszinkron, queue alapú, kérés-választ különválasztó gazdag kliens – szerviz kommunikációhoz, úgy tűnik senki nem csinált ilyet, így nem akarok úttörő lenni a témában.
Ehhez még hozzájárul, hogy hétvégén méregettem az MSMQ teljesítményét. Azért ezt, mert a WCF is erre épít, és pl. az NServiceBus is.

A tesztkód 1.5kByteos üzeneteket rak át egyik sorból a másikba. Az ötlet innen jött, csak többszálasítottam.

A tesztkód:

using System;
using System.Diagnostics;
using System.Messaging;
using System.Threading;

namespace MsmqTran
{
class Program
{
private const int NumberOfTests = 1000;
private const int MaxDop = 10;
private static readonly ManualResetEvent[] WaitForEmpty = new ManualResetEvent[MaxDop];

static void Main()
{
var q1 = new MessageQueue(@”.\private$\test_queue1″);
var q2 = new MessageQueue(@”.\private$\test_queue2″);

Console.WriteLine(“Filling source queue…”);
var b = new byte[1500];
using (var msmqTx = new MessageQueueTransaction())
{
msmqTx.Begin();
for (int i = 0; i < NumberOfTests; i++) { q1.Send(b, msmqTx); } msmqTx.Commit(); } q2.Purge(); Console.WriteLine("Starting to move data from source queue to destination queue"); var sp = Stopwatch.StartNew(); for (int i = 0; i < MaxDop; i++) { WaitForEmpty[i] = new ManualResetEvent(false); ThreadPool.QueueUserWorkItem(o => ProcessMsg(q1, q2, (ManualResetEvent)o), WaitForEmpty[i]);
}

WaitHandle.WaitAll(WaitForEmpty);

Console.WriteLine(“Duration: {0}ms, throughput: {1:F0} messages/s”, sp.ElapsedMilliseconds, 1000.0 * NumberOfTests / sp.ElapsedMilliseconds);
}

private static void ProcessMsg(MessageQueue q1, MessageQueue q2, ManualResetEvent w)
{
while (true)
{
using (var msmqTx = new MessageQueueTransaction())
{
msmqTx.Begin();

Message message;
try
{
message = q1.Receive(TimeSpan.FromMilliseconds(0), msmqTx);
}
catch (MessageQueueException e)
{
Console.WriteLine(e);
w.Set();
break;
}

q2.Send(message, msmqTx);

msmqTx.Commit();
}
}
}
}
}

A gépemen 50 tran/sec-kel megy 1 szálon, és 200 fölé nem nagyon megy. Jó, ez laptop, de relációs adatbáziskezelővel (sql server és oracle is fut a gépen) több ezer tran/seccel mennek a dolgok. Szóval ez elég gázosan lassú. Emellett csúnya leállásokról is írnak a blogokban, amikor beáll az msmq.

Marad a aszinkronított WCF egyelőre, csak a szerverről visszafelé hívásokat tervezem queue alapon megcsinálni, WCF msmq bindinggal. Így tudom értesíteni az appokat polling nélkül. Erre 3 okom van most:
1. Az offline (disconnected) pessimistic lock feloldódott, lehet szerkeszteni valamit.
2. Frissíteni kell a kliens cache-ben valamit.
3. Email jellegű üzenetküldés az appok között.

Köszönöm még egyszer az építő javaslatokat.

Could you hire me? Contact me if you like what I’ve done in this article and think I can create value for your company with my skills.

LEAVE A COMMENT

13 COMMENTS

  • Tóth Viktor June 18, 2011

    Kicsit átheggesztettem a kódodat, mert úgy vettem észre, hogy az egyik szorulást az okozza, hogy a MessageQueue.Send() kicsit tovább tart, illetve nálad a Receive/Send/Receive/Send iterációk lineárisan vannak, emiatt a Receive-eknek meg kell várni a hosszabb Send-et, pedig a következő Receive már elindítható lenne, amig az előző Send tart.
    Emiatt lehet kicsit gyorsítani, hogy ha a Receive lefutott, akkor azonnal indítani kell a következőt aszinkron módon, és csak utána hívni a Send-et.
    Az alábbi kód nálam 2000 msg/s felett produkál (mondjuk nálam az eredeti kód is 300-400 körül futott, de monsjuk akkor is 5-6-szoros sebességnövekedés volt)
    Bocs a retek kódért (tuple meg ilyenek), de most nem az igényeskedés volt a lényeg ;)


    Console.WriteLine(“Starting to move data from source queue to destination queue”);

    var sp = Stopwatch.StartNew();

    using (var msmqTx = new MessageQueueTransaction())
    {
    var finishedEvent = new SemaphoreSlim(0, 4);

    // Nálam négy kérést betárazva már elérte a maximumot 2 magos gépen.
    q1.BeginReceive(TimeSpan.Zero, new Tuple(q1, q2, msmqTx, finishedEvent), ProcessReceive);
    q1.BeginReceive(TimeSpan.Zero, new Tuple(q1, q2, msmqTx, finishedEvent), ProcessReceive);
    q1.BeginReceive(TimeSpan.Zero, new Tuple(q1, q2, msmqTx, finishedEvent), ProcessReceive);
    q1.BeginReceive(TimeSpan.Zero, new Tuple(q1, q2, msmqTx, finishedEvent), ProcessReceive);

    finishedEvent.Wait();
    } // using msmqTx
    Console.WriteLine(“Duration: {0}ms, throughput: {1:F0} messages/s”, sp.ElapsedMilliseconds, 1000.0 * NumberOfTests / sp.ElapsedMilliseconds);
    } // Main()

    static void ProcessReceive(IAsyncResult ar)
    {
    var state = ar.AsyncState as Tuple;

    try
    {
    // átvesszük az üzenetet
    var message = state.Item1.EndReceive(ar);

    // Mielőtt blokkolnánk a Send-del, elkezdjük a következő olvasást
    state.Item1.BeginReceive(TimeSpan.Zero, state, ProcessReceive);

    // Most már blokkolhat a Send
    state.Item2.Send(message, state.Item3);
    }
    catch (MessageQueueException e)
    {
    Console.WriteLine(“Finished”);
    state.Item4.Release();
    } // catch()
    } // ProcessReceive()

  • Tóth Viktor June 18, 2011

    na jó, szétszedte a kacsacsőreimet. szóval a randa tuple így néz ki a fenti kódban:

    new Tuple<MessageQueue, MessageQueue, MessageQueueTransaction, SemaphoreSlim>(q1, q2, msmqTx, finishedEvent)

  • Tóth Viktor June 18, 2011

    ja, és a szemafort ki kell cserélni CountdownEvent-re.

  • Soczó Zsolt June 19, 2011

    Viktor: köszi a módosításokat, csak ezt nem látom, hogy jön össze a tranzakciós határorokkal a következő async receive?

    A másik kérdés, hogy a MaxDopot feljebb húzva pl. 20-ra nem megy fel az eredeti megoldás sebessége 2000-re?

  • Tóth Viktor June 19, 2011

    huhh, tényleg, a tranzakciókat elegánsan kihagytam. A gond az, hogy a BeginReceive-nek nincs is olyan változata, amelyik tud tranzakciókat. Ezért csináltam egy másik megoldást, de ez már közel sem olyan gyors:


    Console.WriteLine(“Starting to move data from source queue to destination queue”);

    var sp = Stopwatch.StartNew();

    var task = Task.Factory.StartNew(
    () => BridgeSegment(
    new Tuple(
    q1, q2)));
    task.Wait();

    Console.WriteLine(“Duration: {0}ms, throughput: {1:F0} messages/s”, sp.ElapsedMilliseconds, 1000.0 * NumberOfTests / sp.ElapsedMilliseconds);
    } // Main()

    static void BridgeSegment(Tuple state)
    {
    try
    {
    using (var msmqTx = new MessageQueueTransaction())
    {
    msmqTx.Begin();
    var message = state.Item1.Receive(TimeSpan.Zero, msmqTx);

    // meg van a csomag, kezdje el a következő iterációt még a send előtt
    Task.Factory.StartNew(() => BridgeSegment(state), TaskCreationOptions.AttachedToParent);

    state.Item2.Send(message, msmqTx);
    msmqTx.Commit();
    } // using msmqTx
    }
    catch (MessageQueueException e)
    {
    } // catch
    } // BridgeSegment()

    Símán használva olyan 5-600 msg/sec-cel megy. Kis trükköt használva, ha beállítjuk, hogy gyorsan tudjon a thread pool szálakat üzembehelyezni, akkor feltornázható kb 1000-1100 msg/sec-ig:
    (ez kell a program elejére:)

    ThreadPool.SetMinThreads(50, 1000);

    Ugye az a gond, hogy alapból kevés szállal indul a pool, és mind el kezd várni a Send-en (vagy a receive-n vagy a commiton) emiatt hiába van már elküldve a következő taszk, hogy az is vegye a csomagokat, csak várnak a taszkok a queue-ban (mivel a kevés meglévő szál vár a send-receive-commit-on). A te eredeti megoldásoddal is ez a helyzet, emiatt nem nő az eredmény a MaxDop emelésével, mivel a thread pool csak óvatosan emeli a szálak számát, ha nincs beállítva a SetMinThreads. Az eredeti megoldásod a SetMinThread-del használva felmegy 700-800-ig. Hogy miért nem megy fel ez is 1000 fölé, nem jöttem rá.

    Az mindenesetre szomorú, hogy szálak tucatjai nélkül milyen lassú, sok szálat indítva (vagy sok taszkot betárazva) meg valahogy nem olyan elegáns az egész. Van 20 szál, és mind a be van állva..

    Aztán meg arra is kíváncsi vagyok, hogy ha a sok szál közül az egyiken nem ér el a commit-ig, hanem rollback-el, akkor mennyire stabil a dolog a többi szálon :)

  • Tóth Viktor June 19, 2011

    a tuple-kon megint elfelejtettem lt-gt-zni. szóval így néznek most ki:
    Tuple<MessageQueue, MessageQueue>

  • ++i June 21, 2011

    Csak enge zavar az ilyen:

    for (…;…; i++ ) …

    Az’é bizonyos szint fölött már nem divat ez; vagy ez csak amolyan szísárpos lazaság?

    Egyáltalán, minek formázod a kódot, ha a fordító fomázás nélkül is megérti. Az ugye csak amolyan szépségápolás, jó a szemnek, meg könnyebben átlátja mindenki, mit akarsz.
    Akkor miért i++, ha egyszer ++i-t akartál írni?

    Késő is van, fáradt is vagyok; belédkötöttem.

    for (…;…; ++i) …

    vagy ezmég nincs meg az a bizonyos szint?

  • Soczó Zsolt June 21, 2011

    ++i:
    Tudom, hogy C++-ban a pre és postfix ++ más hatékonyságú, főleg iterátoroknál és egyebeknél, amik overloadolják ezeket az operátorokat.

    C#-ban tudomásom szerint nincs jelentősége.

    Más kérdés, hogy koncepcionálisan jobb a ++i, ebben igazad van.

    Azért igyál is valamit, meg aludj egyet. :)

  • hron84 June 22, 2011

    Soci, nem lehetne valami kodkezelot engedni a kommentekhez? Ez igy borzasztoan olvashatatlan, ha az ember tanulna is a beszelgetesbol… :s

  • ++i June 24, 2011

    Na jó, leírom, hogy miért rossz a for végiben az i++.
    Azé’, mert értelmetlen ha aszt mondom a józsinak, hogy ülök a fotelbe, látód, iszom a sörömeet.
    De nema fotelbe ülök, hanem a fotelben. Ugye van különbség! Ha már valaki profi módon műveli a programolást, és el is hiszi magáról, hogy profi (Soci tényleg nagyon profi, még mielőtt valaki azt hiszi, hogy mást gondolok), tehát a Soci ne a fotelbe igya a sörét, hanem a fotelben.
    Amikor azt írom, hogy i++, akkor olyan szándékot fejezek ki, hogy kell az i aktuális értéke, mielőtt megnövelem. Dehát a for végére odatett inkrementálás nem erről szól. Akkor miért írják oda nagyon sokan? Mert tudják, hogy a fordító értelmes állat, és nem fog olyan kódot generálni egy beépített típusnál, ami rontani fogja a teljesítményt.
    De ettől még, ha azt akarom mondani a fordítónak, hogy növelje meg az i értékét eggyel, akkor nem azt mondom neki, hogy a fotbe iszok, hanem a fotelben. (Igyen iszok, nem iszom, de ez egy másik történet.)

  • Tóth Viktor June 26, 2011

    ++i: nem értek egyet a következők miatt: A for ciklusban az i++/++i esetében nem azt fejezem ki, hogy milyen értéke kell az i-nek (növelés előtti vagy utáni). Az i++-szal azt fejezem ki, hogy mi történjen az i-vel (pl növekedjen meg) Nem érdekel a kifejezés értéke (használja valamire a runtime? nem!). A kifejezés mellékhatása kell! Ezt pedig tökéletesen kifejezi az i++. Miből jön a for ciklus C-ben? Ebből:

    int i = 0;
    while (i < 20)
    {

    i = i + 1; /* legyen i eggyel nagyobb */
    }

    Ebből lett ez:
    for (int i = 0; i < 20; i = i + 1)

    De mivel Dennis Ritchie meg a barátja szerette tömörséget, kitalálták ezt:

    i += 1; /* legyen i eggyel nagyobb */

    És mivel ez még mindig nem volt elég, jött ez:

    i++; /* legyen i eggyel nagyobb */

    De mivel a programozásban gyakori ez:

    i = 0;
    while (…)
    {
    f(array[i])
    i = i + 1;
    }

    Ha az i++ pontosan az i = i + 1 rövidítése lenne, akkor a tömörségre vágyó Ritchie nem tudta volna felírni ezt:

    i = 0;
    while (…)
    {
    f(array[i++]);
    }

    Ezért okosan kitalálták, hogy az i++ kifejezés értéke legyen az i eredeti értéke, és akkor majd milyen jól néz ki a fenti ciklus.

    Más esetekben, mint például ebben:

    (i = i + 1) < 20 ami amúgy
    (i += 1) < 20 ami amúgy lehetne
    i++ < 20, ha a fenti példa miatt nem trükköznének a kifejezés értékével.

    Szóval mivel vannak olyan helyzetek, ahol az i++ “eredeti értelmezése” kell, kitalálták, hogy akkor a ++i majd azt jelenti, mint az i += 1, az i++ meg mehet a tömbindexes helyzetekbe.

    Ezekben a helyzetekben tehát számit mi a kifejezés értéke. Más esetekben, ahol csak a mellékhatás érdekes, ott teljesen mindegy, hogy melyik formát használja az ember. És a for ciklus loop expression-je egy olyan hely, ahol csak a mellékhatás érdekes. Emiatt mindegy, hogy i++ vagy ++i. Szerintem.
    (amúgy az i++ levezetése a fentiekben csak spekuláció, nem biztos, hogy így történt)

  • Harasoft October 21, 2011

    Szia Soci, ezt nézted már?
    http://www.rabbitmq.com/dotnet.html