FairMQ利用メモ(日本語)

これは FairMQ を利用したアプリケーション開発のための非公式ユーザーガイド(日本語)です。

1. Introduction

FairMQ はC++言語で書かれたMessage Queingライブラリ及びフレームワークです。 原子核実験におけるモンテカルロシミュレーション、データ解析、データ収集のデータワークフローを分散環境で実行することを想定して開発されています。 大規模データワークフローを扱うために以下の特徴があります。

  • 異なるdata transport技術を用いた 非同期メッセージパッシングの抽象化

    FairMQのコアとなる部分は非同期メッセージパッシングライブラリの ZeroMQ に強い影響を受けています。 zeromqを短い言葉で表現するなら、"高機能なsocket"と言えます。 FIFO (queue)によるデータのバッファリングや転送・受信の非同期処理、複数の接続相手との通信などを簡単に実装できます。

    非同期メッセージパッシング

    ここで"非同期的にメッセージがやりとりされる"というのは、送信(受信)関数を呼び出してから実際の送信(受信)処理が完了するまでの間、呼び出し側はブロックされないことを意味します。 例えば、send関数を呼び出してsend queueにメッセージを詰めたらすぐにsend関数が返り、実際の転送処理は別スレッドで実行されます。

    zeromq ではPUSH/PULL, PUB/SUBといったスケーラビリティのある接続パターンが使用可能です。 zeromqではスレッド間通信、(同一ノード上の)プロセス間通信、ノード間通信が可能ですが、ノード間の接続にはEthernetを使用しておりInfinibandには対応していません。 zeromqでサポートされていない部分をカバーしたり、より効率的な通信手段をサポートするために、FairMQではzeromq以外にも shmem, nanomsg, ofi を使った通信も利用可能になっています。 これらのライブラリを使った非同期メッセージパッシングを同じAPI(関数呼び出しインターフェイス)で実現するために、C++のクラス継承やテンプレートを使ってAPIの抽象化がされています。 APIから呼ばれて通信を行う各ライブラリ zeromq, shmem (shared memory), nanomsg, ofi (open fabric interfaces)のことを transport と呼んでいます。

  • 効率的なdata transport (zero-copy, high throughput)

  • データフォーマットに依存しない(agnostic)通信のためのツール

    FairMQでは特別なデータフォーマットを使用していません。 各種のtransportを使って任意のデータを送受信可能です。 (ただしtransportが使うヘッダーやフッターが追加される場合があるため、異なるtransport同士を直接接続することはできません)

  • より高レベルなデータ処理ワークフローのための 構成要素となるdevice

    FairMQではデータ送受信の非同期化・並列化に加えて、アクターモデル(Actor model)による並列データ処理をするためのフレームワークを提供しています。 アクターモデルでは、アクターと呼ばれるオブジェクト同士が並列でメッセージを送受信しあって処理を行います。 FairMQではアクターのことを device と呼んでいます。 Deviceはステートマシンを持ち、ユーザータスクの実行をステートマシンの状態遷移に載せて、パラメータ初期化→ユーザータスクの処理→終了処理といった流れで行います。 FairMQにおけるdeviceは、その初期化設定・状態制御を行う plugin を実行時に組み込んで機能拡張が可能です。

DAQ-Middlewareの使用経験がある場合は 表 1.1 に示す対応表を見るとFairMQを理解しやすいかと思います。

表 1.1 FairMQとDAQ-Middlewareの対応
FairMQ DAQ-MW
Device Component
Channel, Socket Port (InPort, OutPort)
Plugin ServicePort
Control UI, DDS UI DaqOpeartor
DDS RMS xinetd, bootComps

FairMQはFairRootの一部として開発されていましたが、現在はFairRootとは独立に使用可能になりました。

2. Device

FairMQを使ったデータ処理アプリケーションでは、ユーザータスクを内包する device が基本単位になります。 Deviceは FairMQDevice クラスから派生させて作成します。 基本的には1つのプロセス内で実行されるdeviceは1個です。 色々な機能のdeviceを組合せることで高度なデータ処理を実現します。

Deviceはデータの送信・受信を担当する channel と、処理実行のためのステートマシンから構成されます。 channelについての詳細は 後述 します。 また、Deviceの状態を取得したり操作するための汎用的な手段として plugin という方法を用いています。

2.1. Topology

FairMQではdevice同士のデータ入出力に使うchannelの接続図式を topology と呼んでいます。 以下にtopologyの一例を 図 2.1 に示します。 図中のsampler, splitter, buffer, processor, meger, sinkといったものがdeviceに相当します。 図中の矢印の向きにデータが流れていきます。

example topology image

図 2.1 Example topology (FairMQ)

各deviceはユニークなidが必要です。 idはdevice実行時のconfigurationで値を設定する必要があります。 topologyの設定方法には

  • device実行時のコマンドラインオプションで与える方法
  • topologyを記述したJSONファイルをコマンドラインオプションで与える方法
  • DDSで使用するXMLファイルにtopologyを記述する方法 (大規模分散処理向け)

の3通りを任意に組み合わせて行います。

2.2. State Machine

FairMQDeviceは内部に 図 2.2 に示すステートマシンを持っています。

device states

図 2.2 Device state machine (FairMQ)

プロセスが起動すると 図 2.2 の黒丸から開始してIDLE状態になります。 その後FairMQのデフォルトではINITIALIZING_DEVICEとINITIALIZING_TASKという二段階の初期化過程を経てRUNNINGに遷移します。

注釈

FairMQに用意されているControl plugin(またはDDS plugin)を使った場合はRUNNINGまで遷移します。 自作のpluginを使用すればこの挙動を変更できます。

deviceの初期化ではFairMQDevice同士の接続channel (socket)のconfigurationを行い、 taskの初期化ではユーザーのタスクのためのconfigurationを行うといった使い分けがあります。

注釈

RUNNINGが終了してもchannelがopenのままであれば直前の通信でmessage queueに入力されたけれど実際には使用されなかったデータがqueueに残ったままになります。 そのデータをどうするかはユーザーの実装次第です。

FairMQDeviceはFairMQStateMachineの派生クラスになっており、状態遷移の詳細はFairMQStateMachineクラスに記述されています。 状態遷移表の実装にはBoost MSM (Meta State Machine)ライブラリが使用されています。 状態遷移表を 表 2.1 に示します。 表の意味は

  • Start 状態において Event が発生した際に Guard 条件が成立していれば、Next 状態へ遷移します。遷移の際に Action を実行します。
  • guard条件の記述がない場合、event発生の際に無条件に状態遷移します。

となります。

表 2.1 FairMQStateMachineの状態遷移表
Start (*_FSM) Event (*_E) Next (*_FSM) Action (*Fct) Guard
IDLE INIT_DEVICE INITIALIZING_DEVICE InitDevice none
IDLE END EXITING Exiting none
INITIALIZING_DEVICE internal_DEVICE_READY DEVICE_READY DeviceReady none
DEVICE_READY INIT_TASK INITIALIZING_TASK InitTask none
DEVICE_READY RESET_DEVICE RESETTING_DEVICE ResetDevice none
INITIALIZING_TASK internal_READY READY Ready none
READY RUN RUNNING Run none
READY RESET_TASK RESETTING_TASK ResetTask none
RUNNING PAUSE PAUSED Pause none
RUNNING STOP READY Stop none
RUNNING internal_READY READY internalStop none
PAUSED RUN RUNNING Resume none
RESETTING_TASK internal_DEVICE_READY DEVICE_READY DeviceReady none
RESETTING_DEVICE internal_IDLE IDLE Idle none
OK ERROR_FOUND ERROR ErrorFound none

2.3. FairMQDevice派生クラスにおけるカスタマイズ

アプリケーション開発者は、ユーザーのタスクを実行するためのdeviceをFairMQDeviceから派生させて、以下のFairMQDeviceの仮想関数を派生クラス側でoverrideして実現したい処理を実装します。 ここに挙げた各関数には対応する xxxWrapper() というラッパー関数が存在し、その中から呼ばれます。

リスト 2.1 カスタマイズ用の仮想関数
virtual void Init(); // called in InitWrapper()
virtual void InitTask(); // called in InitTaskWrapper()
virtual void Run(); // called in RunWrapper()
virtual void PreRun(); // called in RunWrapper()
virtual bool ConditionalRun(); // called in RunWrapper()
virtual void PostRun(); // called in RunWrapper()
virtual void Pause(); // called in PauseWrapper()
virtual void ResetTask(); // called in ResetTaskWrapper()
virtual void Reset(); // called in ResetWrapper()

それぞれの関数がステートマシンのどの状態において呼ばれるかをまとめたのが 表 2.2 です。 いずれも 図 2.2 において色付きで示された(凡例において"state runs in its own thread"と示されている)状態において実行されます。

表 2.2 カスタマイズ用の仮想関数と関数が実行されるときのステートマシンの状態
methods state
Init() INITIALIZING_DEVICE
InitTask() INITIALIZING_TASK
Run()
PreRun()
ConditionalRun()
PostRun()
RUNNING
Pause() PAUSED
ResetTask() RESETTING_TASK
Reset() RESETTING_DEVICE

2.3.1. Init

この関数は、 INITIALIZING_DEVICE 状態のときに FairMQDevice::InitWrapper() の中で呼ばれます。 InitWrapper() では、 Configuration で設定したchannel (socket)のパラメータ(transport type (zeromq, nanomsg, ...)、 endpointの種別(bind or connect)、address)を使ってchannel (socket)の生成および接続を行った上で Init() が呼ばれます。

リスト 2.2 FairMQDevice::InitWrapper()の疑似コード
void FairMQDevice::InitWrapper()
{
   // 与えられたconfiguration parameterから未初期化のchannelのリストを作成 ;

   // 未初期化のbinding channelに対してsocketを作成してbind ;

   CallStateChangeCallbacks(INITIALIZING_DEVICE); // pluginに状態遷移を通知

   // 未初期化のconnecting channelに対してsocketを作成してconnect ;

   Init();

   ChangeState(internal_DEVICE_READY); // "internal_DEVICE_READY"イベントを発生させて"DEVICE_READY"状態に遷移する
}

Init()をoverrideしない場合は何もしません。 また、通常はInit()をoverrideしなくても構いません。

2.3.2. InitTask

この関数は、 INITIALIZING_TASK 状態のときに FairMQDevice::InitTaskWrapper() の中で呼ばれます。

リスト 2.3 FairMQDevice::InitTask()のコード
void FairMQDevice::InitTaskWrapper()
{
   CallStateChangeCallbacks(INITIALIZING_TASK); // pluginに状態遷移を通知
   InitTask();
   ChangeState(internal_READY); // "internal_READY"イベントを発生させて"READY"状態に遷移する
}

InitTask()をoverrideしない場合は何もしません。 通常はoverrideしてユーザーのタスクに必要なパラメータをコマンドラインオプションやパラメータファイルなどから取得して設定します。

2.3.3. Run, PreRun, ConditionalRun, PostRun

これらの関数は、 RUNNING 状態のときに FairMQDevice::RunWrapper() の中で呼ばれます。

リスト 2.4 FairMQDevice::RunWrapper()の疑似コード
void FairMQDevice::RunWrapper()
{
   CallStateChangeCallbacks(RUNNING); // pluginに状態遷移を通知

   // socket処理レートをログするためのスレッドを生成 ;

   // transports (socket) に Resume(= RUNNINGに遷移)したことを通知 ;

   try
   {
       PreRun();

       if (fDataCallbacks) // あるchannelでメッセージを受信した時に実行するcallback関数を、OnData()関数を使って登録してあるかどうか確認
       {  // メッセージ受信時のcallback関数が一個以上登録されている場合
          bool proceed = true;

          while (CheckCurrentState(RUNNING) && proceed)
          {
              for ( . . . . . ) // callback関数に登録されたchannelのリストでループ
              {
                  // proceed = (メッセージ受信を確認し、callback関数を呼んで処理を実行) ;
              }
          }
       }
       else
       {  // OnDataにcallback関数を登録してない場合

          // (ConditionalRunの実行レート調整のため、現在時刻を取得) ;

          while (CheckCurrentState(RUNNING) & ConditionalRun())
          {
              // (ConditionalRunの実行レート調整のため、必要ならsleep) ;
          }
          Run();
       }
   }
   catch (const std::out_of_range& e) {
       // (channel configurationにエラーがある場合は例外が出るので例外を受け取ってエラーをログに出す) ;
   }

   if (CheckCurrentState(RUNNING))
   {
       ChangeState(internal_READY); // "internal_READY"イベントを発生させて"READY"状態に遷移する
   }

   PostRun();

   // socket処理レートをログするスレッドをjoin ;
}

Run(), PreRun(), ConditionalRun(), PostRun()をoverrideしない場合は何もしません(ConditionalRunはfalseを返します)。 リスト 2.4 にある通り、PreRun()の後に2系統の処理ループ

  • callbackを使った処理ループ
  • ConditionalRunまたはRunによる処理ループ

があり、どちらかが実行されます。 処理ループを抜けるとPostRun()が実行されます。

データ受信時に実行したい処理の記述を簡単にするために、 OnData() を使って実行したい処理をcallbackとして登録する方法もあります。

2.3.3.1. Run(), ConditionalRun(), OnData()の使い分け

これらの関数は、いずれもユーザータスクを記述するための関数です。 それぞれの特徴と、どれを使ったらよいかを決めるためのヒントをここでまとめておきます。

  • Run()を使うケース
    任意のケースに使用できます。 ConditionalRun()やOnData()は、データ処理で頻出する部分を抽出して提供されているため、それらが使える場合はRun()よりもコードが簡潔になります。
  • ConditionalRun()が使えるケース
    この関数を使う場合は繰り返し行われるタスクのwhileループの中を記述するだけで済むという利点があります。 また、処理レートを調整する機能を使いたい場合にも適しています。
  • OnData()が使えるケース
    OnData()関数はそれ自身が処理を行うのではなく、受信channelにメッセージが届いたらそのメッセージに対する処理内容をcallback関数として登録するためのインターフェイスになっています。 メッセージ受信待ち部分をcallback関数の中に書かずに済むため、メッセージ受信時の処理を簡潔に記述できます。 一方、メッセージを受信したときにしかcallback関数は呼ばれないという制約があります。

2.3.4. Pause

この関数は、 PAUSED 状態のときに FairMQDevice::PauseWrapper() の中で呼ばれます。

リスト 2.5 FairMQDevice::PauseWrapper()のコード
void FairMQDevice::PauseWrapper()
{
   CallStateChangeCallbacks(PAUSED); // pluginに状態の遷移を通知

   Pause();
}

Pause()をoverrideしない場合、 リスト 2.6 に示した通りデフォルトではpauseが解除されるまでsleepします。

リスト 2.6 FairMQDevice::Pause()のコード
void FairMQDevice::Pause()
{
   while (CheckCurrentStatus(PAUSED))
   {
       std::this_thread::sleep_for(std::chrono::milliseconds(500));
       LOG(debug) << "paused...";
   }
   LOG(debug) << "Unpausing";
}

この動作を変更したい場合はoverrideしてください。

2.3.5. ResetTask

この関数は、 RESETTING_TASK 状態のときに FairMQDevice::ResetTaskWrapper() の中で呼ばれます。

リスト 2.7 FairMQDevice::ResetTaskWrapper()のコード
void FairMQDevice::ResetTaskWrapper()
{
   CallStateChangeCallbacks(RESETTING_TASK); // pluginに状態の遷移を通知
   ResetTask();
   ChangeState(internal_DEVICE_READY); // "internal_DEVICE_READY"イベントを発生させて"DEVICE_READY"状態に遷移する
}

ResetTask()をoverrideしない場合は何もしません。 ユーザータスクの終了後の後始末が必要であればoverrideしてカスタマイズして下さい。

2.3.6. Reset

この関数は、 RESETTTING_DEVICE 状態のときに FairMQDevice::ResetWrapper() の中で呼ばれます。

リスト 2.8 FairMQDevice::ResetWrapper()のコード
void FairMQDevice::ResetWrapper()
{
   CallStateChangeCallbacks(RESETTING_DEVICE); // pluginに状態の遷移を通知
   Reset();
   ChangeState(internal_IDLE); // "internal_IDLE"イベントを発生させて"IDLE"状態に遷移する
}

Reset()をoverrideしない場合、 リスト 2.9 に示した通りデフォルトではsocketを破棄します。

リスト 2.9 FairMQDevice::Reset()のコード
void FairMQDevice::Reset()
{
   // fChannelsは std::unordered_map<std::string, std::vector<FairMQChannel>>型
   for (auto& mi : fChannels)
   {
       // mi.secondはstd::vector<FairMQChannel>型
       for (auto& vi : mi.second)
       {
           vi.fSocket.reset(); // std::unique_ptr<FairMQSocket>::reset()を呼んでsocketをdestruct (代わりにnullptrをセット)
       }
   }
}

2.3.7. Pluginによる状態遷移

deviceがinteractive modeで実行された場合(defaultはinteractive)、キーボード入力によって状態の遷移が行えます。

  • h - help
  • p - pause
  • r - run
  • s - stop
  • t - reset task
  • d - reset device
  • q - end
  • j - init task
  • i - init device

interactive mode以外の実行方法、例えばプロセスバックグラウンド実行に関しては以下の2種類があります。

  • static (--control static) - deviceの状態シンプルには init → run → reset → exit のchainに沿って遷移します。
  • dds (--control dds) - deviceの状態は外部コマンド(この場合は fairmq-dds-command-ui)によって操作されます。

3. Transport

図 3.1 にTransport関連クラス相関図を示します。

transport interface

図 3.1 Transport interface (FairMQ)

注釈

図 3.1 はUML(Unified Modeling Language)記法が使われています。 UMLはオブジェクト指向手法の分析や設計を表現する統一された方法です。 クラスの継承関係が白塗三角(△)矢印で示されています(破線矢印は依存関係、実線矢印はフィールド(メンバ変数)関係 etc.)。

注釈

FairMQDeviceが直接FairMQSocketをメンバとして持つのでなく、実際にはFairMQChannelというFairMQSocketのコンテナを扱うクラスが間に入ります。

注釈

黄色で示されるuser code部分はFairRootにおける流儀であり、FairRootではFairTaskクラスから派生させたユーザーのtaskクラスを作成してuser codeを実行させる設計でした。 現在のFairMQはFairRootとの依存関係がなく、FairMQDeviceから直接派生させてユーザーのタスクを記述可能です。

中心にあるFairMQDeviceがユーザーのアプリケーション本体となるdevice(の基底クラス)です。 deviceは FairMQTransportFactory というインターフェイスクラスを介して FairMQSocket オブジェクトや FairMQMessage オブジェクトを作成します。 FairMQSocketFairMQMessage もインターフェイスクラスであり、transport type毎にFactroy, Socket, Message, ...の実装クラスが存在します。 図 3.1 では、赤色がzeromq, 緑色がnanomsgによる実装に対応します。

3.1. Transport types

FairMQで利用可能なtransportの種類とそれぞれの特徴を簡単にまとめておきます。

3.1.1. ZeroMQ (zmq)

高速なメッセージパッシングライブラリです。 broker-less zero-copy

3.1.2. Nanomsg (nn)

ZeroMQにinspireされています。

3.1.3. SHMEM

FairMQではboostとzeromqを使って実装されています。

3.1.4. OFI

FairMQではInfiniBandを扱うために使用されます。

3.2. Communication Patterns

FairMQDeviceは基本的にはzeromqで使える通信パターンを使用して通信します。

  • PUSH-PULL

    パイプライン型ともよばれます。 1:1で接続している場合は普通のFIFOと同じです。 図 3.3 に示したように複数の接続先をとることが可能です。 なお、queueにこれ以上メッセージを詰めるための空きが無くなった状態(メッセージ・キュー溢れ状態)のことをhigh water mark (HWM)と呼びます。

    PUSH socketが複数のPULL socketと接続している場合、PUSH socketがキューに入れたメッセージの送信先は接続中のHWMでないPULL socketから巡回的に選択され均等に送信されます。 すべてのPULL socketがHWM状態でPULL socketが同期sendをした場合、メッセージキューへのメッセージ追加がブロックされます。

    PULL socketが複数のPUSH sockeと接続している場合は、空でないPUSH socketから均等に受信します。

    PUSH-PULL pattern

    図 3.2 PUSH-PULL (Pipeline) pattern

  • PUB-SUB

    PUB-SUB patternはラジオのブロードキャストに似ています。 図 3.3 にPUB socketは複数接続しているSUB socketに対して同じメッセージを送信します。 PUB socketはSUB socketについての情報を必要とせず、SUB socketがHWMであってもメッセージを送信可能です。 PUB socketはSUB socketが接続してなくてもメッセージ送信を行うため、SUB socketの接続が遅れた場合にいくつかのメッセージを受信しない可能性があることに注意してください。 なお、SUB socketは複数のPUB socketに接続可能です。

    PUB-SUB pattern

    図 3.3 PUB-SUB (Publish-Subscrib) pattern

    PUB-SUB pattern (multiple pub)

    図 3.4 PUB-SUB (Publish-Subscrib) pattern (2)

  • REQ-REP

    図 3.5 に示したように、REQ socketがclient、REP socketがserverとなるclient-serverモデルに相当します。 REQ socketは複数のREP socketに接続可能です。

    • REQ socketは自分の送信したrequest messageに対するreply messageを受け取るまでは、次のrequest message送信がブロックされます
    • REP soecktはrequest messageを受信するまでreply messageの送信がブロックされます
    REQ-REP pattern

    図 3.5 REQ-REP (Request-Reply) pattern

    REQ-REP pattern (multiple rep)

    図 3.6 REQ-REP (Request-Reply) pattern (2)

  • PAIR

    exclusive pairです。

    • 双方向通信 (bidirectional)
    • 接続は1:1のみ
    • REQ-REPと違い、メッセージの送受信の順序に制限はありません

    マルチスレッド間の同期機構であるセマフォやミューテックスの代わりとしてPAIR patternを使うことを想定しています。

    PAIR pattern

    図 3.7 Relay race with PAIR pattern

FairMQではこれらの通信パターンを通信の type と呼んでいます。 表 3.1 にFairMQで利用可能なtypeを示します。

表 3.1 Communitation pattern supported in FairMQ
  zeromq nanomsg shmem ofi
PAIR yes yes yes  
PUSH/PULL yes yes yes  
PUB/SUB yes yes no  
REQ/REP yes yes yes  

表 3.2 は各transport実装で使用可能なアドレス指定方法のまとめです。

表 3.2 Address types
  zeromq nanomsg shmem ofi comment
inproc:// yes yes yes   プロセス内通信(in process)
ipc:// yes yes yes   プロセス間通信(inter process), single node
tcp:// yes yes yes   TCP, single node, multi node

TCPはリモート、ローカルのどちらの通信にも使えますが、ローカルであればipcの方が高速です。

3.3. Message, ownership

Device同士で送受信されるデータは FairMQMessage オブジェクトに格納されてやりとりされます。 FairMQ自身にはdevice同士でやりとりされるデータフォーマットの定義はなく、FairMQMessageには任意のデータを格納可能です。 transport type自身が持つデータフォーマットの制約があるため、例えばFairMQでtransportにZeroMQを使う場合は本来のZeroMQに準拠したデータフォーマットになるためZeroMQ socket同士であれば(他のプログラミング言語のバインディングとも)通信できます。

FairMQMessageオブジェクトの作成にはFairMQDeviceのメンバ関数の NewMessage() を使います。 返値の FairMQMessagePtrstd::uniqut_ptr<FairMQMessage> のエイリアスです。

  1. 引数なし での呼び出し: メッセージの受信時に受信メッセージの格納先として空のメッセージを作成します。 また、メッセージオブジェクトをコピーする際のコピー先として空のメッセージを作成するときにも使います。
リスト 3.1 引数なしNewMessage()
FairMQMessagePtr NewMessage() const;
  1. 指定サイズ分を確保 : 指定されたバイト数でメッセージ用のバッファを確保します。バッファのポインタを使ってデータを書き込んで下さい。
リスト 3.2 サイズ指定NewMessage()
FairMQMessagePtr NewMessage(const size_t size) const;
  1. 既存のバッファの所有権を譲渡 : 既存のバッファ( data )を使ってFairMQMessageオブジェクトを作成します。ZeroMQにおいてはzero-copy操作となります。 リスト 3.3 に示した通り、この場合のNewMessageの引数は4個あります。 第1引数の data が既存バッファのアドレスで、 reinterpret_cast<char*> を使って char* へキャストしてください。 第2引数の size はバッファのバイト数です。 既存バッファ data の所有権をFairMQMessageへと譲渡することになり、バッファを破棄するための関数を第3引数の ffn で与えます。 ffn のsignatureは void ffn(void *data, void* hint) であり、 NewMessage()の第1引数の data と 第4引数の hintffn の引数の data , hint として渡されます。 hintffn の実行時に必要なオブジェクトがあればそのアドレスを渡すために使います (例えば、メッセージ作成に使われる datahint の内部バッファになっていて、 data だけでなく hint ごと削除しなければならない場合に hint を使用します)。 送信用メッセージ作成のほとんどの場合でこのインターフェイスを使用することになります。
リスト 3.3 zero-copyのNewMessage()
using fairmq_free_fn = void(void* data, void* hint);
FairMQMessagePtr NewMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) const;

datahint の使い方について簡単な使用例を示しながら説明します。

リスト 3.4 は基本型からメッセージを作成する例です。 uint64_t 型(64 bit符号なし整数型)の変数 buffer を初期値2286として new で作成し、それを元にメッセージを作成しています。 この場合、 NewMessage の第1引数 data には buffer のアドレス(を char* にキャストしたもの)、第2引数にはバイト数(64 bitなので8 byte)を指定します。 第3引数には bufferdelete するための削除関数を指定します(ここでは削除関数をラムダ式で記述しています)。 この削除関数の第1引数には NewMessage の第1引数が void* として渡されるので、 削除関数内で作成時の型のポインタである uint64_t* にキャストして delete を行うようにしています。 なお、 NewMessage の第4引数(および削除関数の第2引数)である hint は使用しないので省略するかnullptrを指定します。

リスト 3.4 基本型からメッセージオブジェクトを作成する例
auto buffer = new uint64_t {2286};

FairMQMessagePtr msg(NewMessage(reinterpret_cast<char*>(buffer),
                                sizeof(uint64_t),
                                [](void* data, void*) { delete reinterpret_cast<uint64_t*>(data); },
                                nullptr));

リスト 3.5 は基本型配列からメッセージを作成する例です。 uint32_t 型(32 bit符号なし整数型)の配列(ここでは要素数100)を new で作成し、それを元にメッセージを作成しています。 NewMessage の第1引数には配列の先頭アドレス(をキャストしたもの)、第2引数には配列のバイト数、第3引数は削除関数を与えています。 この例では配列用の newbuffer を生成したので、その削除に使うのは配列用の delete [] になります。 この例でも NewMessage の第4引数は使用しません。

リスト 3.5 基本型の配列からメッセージオブジェクトを作成する例
int n = 100;
uint32_t* buffer = new uint32_t[n];

// 配列の内容を変更する操作 ; 

FairMQMessagePtr msg(NewMessage(reinterpret_cast<char*>(buffer),
                                n * sizeof(uint32_t),
                                [](void* data, void*) { delete [] reinterpret_cast<uint32_t*>(data); },
                                nullptr));

リスト 3.6std::vector<uint32_t> からメッセージを作成する例です。 NewMessage の第1引数には std::vectorの持つ内部バッファの先頭アドレスを渡します。 それにはC++11で追加されたメンバ関数である data() を使うと良いでしょう (C++11以前では、std::vectorの内部バッファの先頭アドレスを取得するには &a[0] のように a[0] (std::vector a の先頭要素)のアドレスを & 演算子で取得する記法がありました)。 第2引数は内部バッファのバイト数を渡します。 削除関数では new で生成した buffer を削除する必要があるため、 NewMessage の第4引数を使って削除関数の第2引数である hintstd::vector のアドレスが渡されるようにします。 削除関数では受け取った hintstd::vector<uint32_t>* にキャストして delete を適用するようにします。

リスト 3.6 std::vectorからメッセージオブジェクトを作成する例
auto buffer = new std::vector<uint32_t>;

// vectorの内容を変更する操作 ;

FairMQMessagePtr msg(NewMessage(reinterpret_cast<char*>(buffer->data()),
                                buffer->size() * sizeof(uint32_t),
                                [](void*, void* hint) { delete reinterpret_cast<std::vector<uint32_t>*>(hint); },
                                buffer));

リスト 3.7std::string からメッセージを作成する例です。 std::vectorの場合と同様になります。

リスト 3.7 std::stringからメッセージオブジェクトを作成する例
auto buffer = new std::string("hello world"); // new 時点で文字列は空でもよい

// 必要であれば文字列の追加・変更などの操作 ; 

FairMQMessagePtr msg(NewMessage(reinterpret_cast<char*>(buffer->data()),
                                buffer->size(),
                                [](void*, void* hint) { delete reinterpret_cast<std::string*>(hint); },
                                buffer));

以上では、 NewMessage の説明をしてきました。 以下に挙げる NewSimpleMessage()NewStaticMessage() といったメンバ関数でも既存バッファからメッセージを作成しますが、リスト 3.3 と違い所有権の移動は発生しません。 NewMessage() と上手く使い分けるといいでしょう。

  1. 既存のバッファをコピー : 既存バッファ data のコピーを作成してメッセージにします。 data の所有権は移動しません。 この関数ではコピー操作が発生するため data のサイズが小さい場合に適しています。
リスト 3.8 NewSimpleMessage()
template <typename T>
FairMQMessagePtr NewSimpleMessage(const T& data) const;
// この関数内でdataからnewでdataCopyを作成し、
// 3. のNewMessage()の引数リストにおいて、
// ・ dataにdataCopy
// ・ ffnにFairMQSimpleMsgCleanup() (dataCopyをdeleteする関数)
// を指定しているのと同じことをしています。
リスト 3.9 FairMQSimpleMsgCleanup()
template <typename T>
static void FairMQSimpleMsgCleanup(void* /*data*/, void* obj)
{
   delete static_cast<T*>(obj);
}
  1. 既存バッファを参照 : 返値のメッセージは既存バッファ data のアドレスを指していますが所有権は移動しません。 data は転送が完了するまで寿命を保証する必要があります (ただし転送は非同期に行われ、いつ完了したかを知る方法はありません)。 このインターフェイスはサードパーティー製ツールが管理する連続したメモリ領域の転送に有用です。 data の指すオブジェクトが、そのデータメンバーとしてポインター(参照)をもつようなshallow-typeの場合は転送されませんので注意が必要です。
リスト 3.10 NewStaticMessage()
template <typename T>
FairMQMessagePtr NewStaticMessage(const T& data) const;
// 3. のNewMessage()の引数リストのffnに
// FairMQNoCleanup() (何もしない関数)を指定しているのと同じことをしています。
リスト 3.11 FairMQNoCleanup()
static void FairMQNoCleanup(void* /*data*/, void* /*obj*/) {}

3.3.1. Messageのコピー

メッセージオブジェクトのコピーには FairMQMessage::Copy() を使用します。 リスト 3.12msg0msg へコピーする例です。

リスト 3.12 FairMQMessageのコピーのサンプルコード
// FairMQMessagePtr msg0; 

FairMQMessagePtr msg(NewMessage());
msg->Copy(*msg0);

3.4. Multi-part message

複数のメッセージをひとつに結合して送って、受信した側では結合前の個々のメッセージに分割して利用するという場面がよくあります。 その場合、メッセージの区切りがどこにあるかといったデータフォーマットやプロトコルの設計と、結合されたメッセージを実際に分割処理するコードを書くなどの手間がかかります。 ZeroMQではmulti-part messageという機能を使うと、n個のメッセージの構成を維持したまま送受信可能です。 ZeroMQをベースにしたFairMQにもmulti-part messageの機能があり、FairMQMessageのstd::vectorを扱うための FairMQPart というクラスがあります。

注釈

正確には FairMQPartstd::vector<FairMQMessagePtr> のwrapperクラスです。 なお、 FairMQMessagePtrstd::unique_ptr<FairMQMessage> のエイリアスです。

3.5. Channel

channelはFairMQにおける通信のエンドポイントを表現します。 その使い方はUnix network socketに似ています。 deviceは複数のchannelを持つことができ、それぞれ他のdeviceからの接続を待つ bind endpoint か、あるいは他のdeviceへの接続を行う connect endpoint になります。 bind/connectはデータの流れる向きとは関係ありません。 channelは名前とsub channelのindexで管理されます。 同じ名前のchannelにsub channelが複数存在する場合、そのsub channelたちはtransport typeが同じである必要があります。

3.6. Poller

pollerはchannelがメッセージを受信したかどうか、あるいはメッセージを送信可能かどうかを確認するために使います。 一つのpollerで複数のchannelを同時に監視できますが、それらは同じtransport typeである必要があります。

4. Configuration

4.1. Device Configuration

Deviceのconfigurationは主にコマンドラインオプションで与えます。 Device毎にコマンドラインオプションは拡張可能です。

4.2. Communication Channels Configuration

channelのconfigurationに使っているparserとして以下の2種類があります。 parserはユーザーで拡張可能です。

4.2.1. JSON Parser

ここではJSONファイルによるchannelの設定例を示します。

{
    "fairMQOptions": {
        "devices": [
            {
                "id": "sampler1",
                "channels": [
                    {
                        "name": "data",
                        "sockets": [
                            {
                                "type": "push",
                                "method": "bind",
                                "address": "tcp://*:5555",
                                "sndBufSize": 1000,
                                "rcvBufSize": 1000,
                                "rateLogging": 1
                            }
                        ]
                    }
                ]
            },
            {
                "id": "sink1",
                "channels": [
                    {
                        "name": "data",
                        "sockets": [
                            {
                                "type": "pull",
                                "method": "connect",
                                "address": "tcp://localhost:5555",
                                "sndBufSize": 1000,
                                "rcvBufSize": 1000,
                                "rateLogging": 1
                            }
                        ]
                    }
                ]
            }
        ]
    }
}

JSONファイルによるconfigurationは複数のdeviceの設定を記述することができます。

  • device IDを使ってどのdeviceのパラメータかを特定します。 コマンドラインによるconfigurationでは --id パラメータとして与えます。 JSONファイルによるconfigurationでは id エントリーがそれに対応します。

  • JSONファイルによるconfigurationでは id の代わりに、 key を使うことで、複数のdeviceに対して共通の設定を適用可能です。

  • socketのオプションには、type, method, address の記述が必須です。 必須項目でないsocketのオプションに対して設定を省略した場合は、そのchannelにおけるデフォルトのパラメータが使用されます。

  • あるchannelに複数のsub-channelが存在するとき、sub-channelに共通の設定を直接記述可能です。以下に例を示します。

    "channels": [{
        "name": "data",
        "type": "push",
        "method": "bind",
        "sockets": [{
            "address": "tcp://*:5555",
            "address": "tcp://*:5556",
            "address": "tcp://*:5557"
        }]
    }]
    

4.2.2. SuboptParser

このparserはコマンドラインから直接configurationするのに使います。 key=value という書式をカンマで区切って並べます。 以下に示すのは、 --channel-config というオプションに対してkey/valueペアを列挙したものです。

--chanel-config name=output,type=push,method=bind,address=tcp://127.0.0.1:5555

4.3. コマンドラインオプションのカスタマイズ

ここではユーザータスクで使用するパラメータをmain()関数の引数となるコマンドラインオプションで与える方法を説明します。 FairMQではコマンドラインオプションを効率的に解析するツールとしてBoost.Program_optionsライブラリを使用しています。

4.3.1. Boost.Program_options入門

まずはBoost.Program_optionsライブラリの使い方を簡単に説明します。
boost::program_options::options_description 型変数を定義し、そのメソッド add_options() の後に
("オプションの名前", boost::program_options::value<オプションの型>(), "オプションの説明") ("オプションの名前", boost::program_options::value<オプションの型>()->default_value(デフォルト値), "デフォルト値を持つオプションの説明") ("オプションの名前", boost::program_options::value<オプションの型>()->multitoken(), "コンテナのような複数要素を持つオプションの説明")

といった書式を列挙し、最後のステートメントにセミコロン(;)を書きます。 一見奇妙に見えますが、 options.add_options()(オプション1)(オプション2)(オプション3) .... (オプションN); というふうになります(途中で改行OK)。 記述例を リスト 4.1 に示します。

リスト 4.1 boost::program_optionsの使用例
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#include <boost/program_options.hpp>

int main (int argc, char* argv[])
{
   // define options
   boost::program_options::options_description options("hoge option");
   options.add_options()
       ("option1", boost::program_options::value<int>(), "description of option1")  // (1) integer without default value
       ("option2", boost::program_options::value<double>()->default_value(1.234), "description of option2")  // (2) double floating point number with default value
       ("foo",  boost::program_options::value<std::string>()->default_value("hoge"), "description of foo")  // (3) string with default value
       ("bar",  boost::program_options::value<std::vector<std::string>>()->maultitoken(), "description of bar") // (4) vector of string without default value
       ("hoge", "description of hoge") // (5) used as a boolean flag
       ("help,h", "Print this help") // (6) option key of "help" with shortened key of "h"
       ; // The statement ends with ';'

   // parse command line
   boost::program_options::variables_map vm;
   boost::program_options::store(boost::program_options::parse_command_line(argc, argv, options), vm);
   boost::program_options::notify(vm);

   // get result of parser
   if (vm.count("help")) { // check existence of "help"
       std::cout << options << std::endl;
   }

   auto option1 = vm["option1"].as<int>(); // get "option1" of integer type
   auto bar     = vm["bar"].as<std::vector<std::string>>(); // get "bar" of vector<string> type

   return 0;
}
それぞれ
  1. "option1" という名前のオプションを int 型変数として受け取る
  2. "option2" は double 型変数で、デフォルト値として1.234を設定
  3. "foo" は std::string
  4. "bar" は std::vector<std::string>
  5. コマンドラインオプションに"hoge" が含まれているかどうかをboolean flagとして使う
  6. "help"オプションの短縮形として"h"を使用するための書式

というオプションに対応します。仮にこのソースコードをコンパイルして app という名前の実行ファイルを作成した場合、

app --option1 3 --foo su3
app --hoge --bar hello world --option2 0.938
app --help
app -h

といった使い方ができます。 それぞれの引数の順番は入れ替えても構いません。

コマンドライン解析の結果は boost::program_options::variables_map 型変数に格納されます。 boost::progrram_options::variables_map 型は std::string をkey, あらゆる値を入れられるheterogeneous containerの any 型コンテナのwrapperである boost::program_options::variable_value をvalueとする std::map<std::string, boost::program_options::variable_value> からの派生クラスです。 コマンドラインオプションとして与えられたかどうかを確認するには std::mapcount() が使えます。 値を取得するには、オプション名をkeyとして boost::program_options::variables_map にアクセスし、 boost::program_options::variable_valueas<T>() メソッドを使用します。

注釈

boost::program_options といった名前空間は長いので、実際にソースコードを書く場合は using xxxx;using namespace xxxx;namespace = xxxx; などで省略形を使用するほうがいいかもしれません。

4.3.2. FairMQにおけるprogram_options

オプションの追加は boost::programs_option と同じインターフェイスになります。 後述のdeviceの開発例 にもありますが、main()関数が記述された runFairMQDevice.h にある addCustomOptions という関数の中でユーザータスク用のオプションを記述します。 リスト 4.2 に記述例を載せます。

リスト 4.2 FairMQDeviceでのcustom optionの追加例
void addCustomOptions(boost::program_options::options_description& options)
{
   options.add_options()
       ("hoge", boost::program_options::value<int>(), "hoge option")
       ("huga", boost::program_options::value<double>(), "huga option");
}

オプションの取得に関しては、 boost::programs_optoin::variables_map のwrapperである FairMQProgOptions というクラスが用意されており、FairMQDeviceのメンバ変数 fConfig としてアクセス可能です。 FairMQProgOptions::GetValue<T>("オプション名") というメソッドを使います。 主に InitTask() でユーザータスクの初期化のためにパラメータを設定するといった用途に使います。

リスト 4.3 FairMQDeviceでのcustom optionの取得例
   auto foo = fConfig->GetValue<int>("hoge");
   auto bar = fConfig->GetValue<double>("huga");

4.4. Introspection

deviceの実行バイナリファイナルはどういったオプションが利用可能かを表示させることができます。

  • -h [ --help] : 全ての利用可能なオプションと、それに対する説明及び各オプションのデフォルト値(もしあれば)を表示します。
  • --print-options : 全ての利用可能なオプションをmachine-readable formatで表示します。 <option>:<computed-value>:<<type>>:<description>
  • --print-channels : RegisterChannelEndpoint で登録したchannelの情報を <channel name>:<minimum sub-channels>:<maximum sub- channels> という書式で表示します。
void YourDevice::RegisterChannelEndPoints()
{
  // provide channel name, minimum and maximum number of subchannels
  RegisterChannelEndpoint("channelA", 1, 10000);
  RegisterChannelEndpoint("channelB", 1, 1);
}

5. Plugin

5.1. Control plugin

5.2. DDSとDDS plugin

6. Logging

FairMQLogger.h ファイルから fair::Logger ライブラリをlogging用に読み込んでいます。 log機能は全て LOG(severity) という書式のマクロを介して使用します。 このマクロによるlog出力はスレッドセーフです。 log出力は std::cout やファイル、他にカスタムのsinkに対して行うことができます。

6.1. Log severity

logのseverityは以下のAPIで制御します。

fiar::Logger::SetConsoleSeverity("<severity level>");
// and/or
fiar::Logger::SetFileSeverity("<severity level>");
// and/or
fiar::Logger::SetCustomSeverity("<customSinkName>", "<severity level>");

severity levelは次のうちのいずれかになります。

"nolog",
"fatal",
"error",
"warn",
"state",
"info",
"debug",
"debug1",
"debut2",
"debut3",
"debut4",
"trace",

設定したseverityから上のseverity levelに対してloggingが行われます。 fatal seveirtyは常にlogが出力されます。 nologの場合は完全にlog出力を停止します。

FairMQDeviceの場合は、コマンドラインオプションを使って --severity <>level という書式でseverityの設定が可能です。

6.2. Log verbosity

log verbosityの制御は以下の関数で行います。

fair::Logger::SetVerbosity("<verbosity level>");

全てのsinkに対して同じ方法になります。 verbosity levelには以の下4通りがあり、それぞれの出力書式も合わせて示してあります。

表 6.1 Log verbosity level
level description
low [severity] message
medium [HH:MM:SS][severity] message
high [process name][HH:MM:SS:μS][severity] message
veryhigh [process name][HH:MM:SS:μS][severity][file:line:function] message

6.3. Color

コンソールにlogを出力する場合はカラー出力のon/offが可能です。

Logger::SetConsoleColor(true); // onにする場合

FairMQDeviceのコマンドラインオプションで与える場合は、 --color <true/false> という書式になります。

6.4. File output

logをファイルに出力するには

Logger::InitFileSink("<severity level>", "<filename>", true);

第1引数でseverity levelを設定します。 第2引数で出力先のファイル名を設定します。 第3引数が true の場合はファイル名にタイムスタンプが追加されます。

FairMQDeviceのコマンドラインオプションで与える場合は、 --log-to-file <filename_prefix> という書式になります。 これを指定するとコンソール出力は自動的にoffになります。

6.5. Custom sinks

カスタムのログ出力は、 Logger::AddCustomSink("sink name", "<severity>", callback) という関数で追加可能です。 callback 関数は、文字列と fair::LogMetaData 構造体を引数に取る関数(std::function)です。 以下の例はseverityに "trance" を指定することで全てのseverityに対して文字列 contentmetadata を出力します。

Logger::AddCustomSink("MyCustomSink", "trace", [](const sttring& content, const LogMetaData& metaData)
{
  cout << "content: " << content << endl;

  cout << "available metadata: " << endl;
  cout << "std::time_t timestamp: " << metadata.timestamp << endl;
  cout << "std::chrono::microseconds us: " << metadata.us.count() << endl;
  cout << "std::string process_name: " << metadata.process_name << endl;
  cout << "std::string file: " << metadata.file << endl;
  cout << "std::string line: " << metadata.line << endl;
  cout << "std::string func: " << metadata.func << endl;
  cout << "std::string severity_name: " << metadata.severity_name << endl;
  cout << "fair::Severity severity: " << static_cast<int>(metadata.severity) << endl;
});

Custom sinkだけを有効にしたい場合は、consoleとfileのsinkのseverityを "nolog" に設定します。

7. Development

7.1. Device作成例

ここではFairMQの枠組みでユーザーのタスクを実行するためのdeviceを作成する方法を説明します。 具体例として、 SamplerSink という2つのdeviceを作成します。 (FairMQに用意されているexamples/1-1/Sampler, Sinkと同じです)

  • SamplerからSinkに向けて、文字列をメッセージとして転送します。
  • どのような文字列を送るかをSamplerのコマンドラインオプション text で指定します。
  • Samplerが送信に使うchannelの名前は data
  • Sinkが受信に使うchannelの名前は data

一般的にはdevice毎に以下の3つのファイルを作成します。 (C++を理解していれば他の構成でも構いません)

  • Sampler: Sampler.h, Sampler.cxx, runSampler.cxx
  • Sink: Sink.h, Sink.cxx, runSink.cxx

7.1.1. Sampler deviceの作成

Samplerクラスのヘッダーファイルです。クラスの定義を記述します。

リスト 7.1 Sampler.h
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#ifndef Sampler_h
#define Sampler_h

#include <FairMQDevice.h>

class Sampler : public FairMQDevice
{
 public:
  Sampler();
  ~Sampler() = default;

 protected:
  void InitTask() override;
  bool ConditionalRun() override;

 private:
  std::string fText;
  uint64_t fMaxIterations;
  uint64_t fNumIterations;
     
};

#endif

続いてSamplerクラスの実装ファイルです。メンバ関数の定義を記述します。

リスト 7.2 Sampler.cxx
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#include <thread>
#include <chrono>

#include "Sampler.h"

using namespace std::chrono_literals; // (C++14)

Sampler::Sampler()
  : fText()
  , fMaxIterations(0)
  , fNumIterations(0)
{
}

void Sampler::InitTask()
{
  fText = fConfig->GetValue<std::string>("text");
  fMaxIteration = fConfig->GetValue<uint64_t>("max-iterations");
}

bool Sampler::ConditionalRun()
{
  std::string* text = new std::string(fText);

  FairMQMessagePtr msg(NewMessage(const_cast<char*>(text->data()),
                                  text->length(),
                                  [](void* /*data*/, void* object) { delete static_cast<std::string*>(object); },
                                  text));

  LOG(info) << "Sending \"" << fText << "\"";

  if (Send(msg, "data") < 0) {
    return false;
  }
  else if (fMaxIterations >0 && ++fNumIterations >= fMaxIterations) {
    LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
    return false;
  }

  std::this_thread::sleep_for(1s);
}

ConditionalRun() では text からメッセージオブジェクトの msg を作成して Send() で送信しています。 Send() は送信キューに詰めるだけで、キューがいっぱいでなければすぐに関数が返ります。 実際に送信動作が行われるのは一度 ConditionalRun() から抜けた後かもしれません。 text の寿命は ConditionalRun() から抜けた後でも有効であることを保証しなければならないため、 text はヒープ領域に作成し、 NewMessage()msg に所有権を移動させています。 また、送信完了時に破棄しなければならないオブジェクトは NewMesage() の第1引数として与えたデータ本体である text->data() (の指す領域)ではなく、 new で作成した text であるため、 text を適切に delete するための関数をラムダ式で与え、 NewMessage() の第4引数(=破棄関数の第2引数。ここでは void* object )として text を与えています。

最後にmain()関数が記述された runFairMQDevice.h をインクルードして、Samplerを生成するfactory関数 getDevice() と、Samplerのタスク実行に必要なオプションを追加する関数 addCustomOptions() を記述します。

リスト 7.3 runSampler.cxx
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
#include <runFairMQDevice.h>
#include "Sampler.h"

void addCustomOptions(boost::program_options::options_description& options)
{
  options.add_options()
    ("text", boost::program_options::value<std::string>()->default_value("Help"), "Text to send out")
    ("max-iterations", boost::program_options::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
     
}

FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
{
  return new Sampler;
}

7.1.2. Sink deviceの作成

Sinkクラスのヘッダーファイルです。クラスの定義を記述します。

リスト 7.4 Sink.h
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
#ifndef Sink_h
#define Sink_h

#include <FairMQDevice.h>

class Sink : public FairMQDevice
{
 public:
  Sink();
  ~Sink() = default;

 protected:
  void InitTask() override;
  bool HandleData(FairMQMessagePtr&, int);

 private:
  uint64_t fMaxIterations;
  uint64_t fNumIterations;
  
};

#endif

続いてSinkクラスの実装ファイルです。メンバ関数の定義を記述します。

リスト 7.5 Sink.cxx
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include "Sink.h"

Sink::Sink()
  : fMaxIterations(0)
  , fNumIterations(0)
{
  Ondata("data", &Sink:HandleData);
}

void Sink::InitTask()
{
  fMaxIterations = fConfig->GetValue<uint64_t>("max-iterations");
}

bool Sink::HandleData(FairMQMessagePtr& msg, int /*index*/)
{
  LOG(info) << "Received: \"" << std::string(reinterpret_cast<char*>(msg->Data()), msg->GetSize()) << std::endl;
  if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations)
    {
      LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
      return false;
    }
  return true;
}

Sinkのコンストラクタで data channelでメッセージを受信したら HandleData() を実行するように OnData() を使って HandleData() をcallbackとして登録しています。 OnData を使うとメッセージを Receive する部分を記述しなくて済むので簡潔になります。 受信したメッセージは HandleData() の引数として渡されるので、それを処理する部分を記述します。 この例ではログに出力するだけの処理です。 受信したデータを格納している FairMQMessagePtrstd:unique_ptr<FairMQMessage> のエイリアスで、どこからも参照されなくなった時点で自動的に破棄されます。

最後にmain()関数が記述された runFairMQDevice.h をインクルードして、Sinkを生成するfactory関数 getDevice() と、Sinkのタスク実行に必要なオプションを追加する関数 addCustomOptions() を記述します。

リスト 7.6 runSink.cxx
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
#include <runFairMQDevice.h>
#include "Sink.h"

void addCustomOptions(boost::program_options::options_description& options)
{
  options.add_options()
    ("max-iterations", boost::program_options::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}

FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
{
  return new Sink;
}

7.1.3. Sampler - Sinkの実行

7.2. CMakeLists.txtの書き方

CMakeLists.txtの書き方を簡単に説明します。

7.3. その他のdeviceの例

7.4. カスタムのPluginの作成

7.5. Testing

unit test (単体テスト)では数10のプロセスによる本格的な分散システムを起動するのが適さないことがよくあります。
deviceをインスタンシエートしないでテストする例

transport factoryをヒープ上に作成する場合は、channelの正常なシャットダウンのためにchannelを先に破棄する必要があります。 deviceで使える Send/ReceiveNew*Message/New*Poller APIは全てchannelにもあります。