FairMQ利用メモ(日本語)¶
これは FairMQ を利用したアプリケーション開発のための非公式ユーザーガイド(日本語)です。
1. Introduction¶
FairMQ はC++言語で書かれたMessage Queingライブラリ及びフレームワークです。 原子核実験におけるモンテカルロシミュレーション、データ解析、データ収集のデータワークフローを分散環境で実行することを想定して開発されています。 大規模データワークフローを扱うために以下の特徴があります。
異なるdata transport技術を用いた 非同期メッセージパッシングの抽象化
FairMQのコアとなる部分は非同期メッセージパッシングライブラリの ZeroMQ に強い影響を受けています。 zeromqを短い言葉で表現するなら、"高機能なsocket"と言えます。 FIFO (queue)によるデータのバッファリングや転送・受信の非同期処理、複数の接続相手との通信などを簡単に実装できます。
- 非同期メッセージパッシング
ここで"非同期的にメッセージがやりとりされる"というのは、送信(受信)関数を呼び出してから実際の送信(受信)処理が完了するまでの間、呼び出し側はブロックされないことを意味します。 例えば、send関数を呼び出してsend queueにメッセージを詰めたらすぐにsend関数が返り、実際の転送処理は別スレッドで実行されます。
zeromqではPUSH/PULL, PUB/SUBといったスケーラビリティのある接続パターンが使用可能です。 zeromqではスレッド間通信、(同一ノード上の)プロセス間通信、ノード間通信が可能ですが、ノード間の接続にはEthernetを使用しておりInfinibandには対応していません。 zeromqでサポートされていない部分をカバーしたり、より効率的な通信手段をサポートするために、FairMQではzeromq以外にもshmem,nanomsg,ofiを使った通信も利用可能になっています。 これらのライブラリを使った非同期メッセージパッシングを同じAPI(関数呼び出しインターフェイス)で実現するために、C++のクラス継承やテンプレートを使ってAPIの抽象化がされています。 APIから呼ばれて通信を行う各ライブラリ zeromq, shmem (shared memory), nanomsg, ofi (open fabric interfaces)のことを transport と呼んでいます。効率的なdata transport (zero-copy, high throughput)
データフォーマットに依存しない(agnostic)通信のためのツール
FairMQでは特別なデータフォーマットを使用していません。 各種のtransportを使って任意のデータを送受信可能です。 (ただしtransportが使うヘッダーやフッターが追加される場合があるため、異なるtransport同士を直接接続することはできません)
より高レベルなデータ処理ワークフローのための 構成要素となるdevice
FairMQではデータ送受信の非同期化・並列化に加えて、アクターモデル(Actor model)による並列データ処理をするためのフレームワークを提供しています。 アクターモデルでは、アクターと呼ばれるオブジェクト同士が並列でメッセージを送受信しあって処理を行います。 FairMQではアクターのことを device と呼んでいます。 Deviceはステートマシンを持ち、ユーザータスクの実行をステートマシンの状態遷移に載せて、パラメータ初期化→ユーザータスクの処理→終了処理といった流れで行います。 FairMQにおけるdeviceは、その初期化設定・状態制御を行う plugin を実行時に組み込んで機能拡張が可能です。
DAQ-Middlewareの使用経験がある場合は 表 1.1 に示す対応表を見るとFairMQを理解しやすいかと思います。
表 1.1 FairMQとDAQ-Middlewareの対応¶ FairMQ DAQ-MW Device Component Channel, Socket Port (InPort, OutPort) Plugin ServicePort Control UI, DDS UI DaqOpeartor DDS RMS xinetd, bootComps
FairMQはFairRootの一部として開発されていましたが、現在はFairRootとは独立に使用可能になりました。
2. Device¶
FairMQを使ったデータ処理アプリケーションでは、ユーザータスクを内包する device が基本単位になります。
Deviceは FairMQDevice クラスから派生させて作成します。
基本的には1つのプロセス内で実行されるdeviceは1個です。
色々な機能のdeviceを組合せることで高度なデータ処理を実現します。
Deviceはデータの送信・受信を担当する channel と、処理実行のためのステートマシンから構成されます。 channelについての詳細は 後述 します。 また、Deviceの状態を取得したり操作するための汎用的な手段として plugin という方法を用いています。
2.1. Topology¶
FairMQではdevice同士のデータ入出力に使うchannelの接続図式を topology と呼んでいます。 以下にtopologyの一例を 図 2.1 に示します。 図中のsampler, splitter, buffer, processor, meger, sinkといったものがdeviceに相当します。 図中の矢印の向きにデータが流れていきます。
図 2.1 Example topology (FairMQ)
各deviceはユニークなidが必要です。 idはdevice実行時のconfigurationで値を設定する必要があります。 topologyの設定方法には
- device実行時のコマンドラインオプションで与える方法
- topologyを記述したJSONファイルをコマンドラインオプションで与える方法
- DDSで使用するXMLファイルにtopologyを記述する方法 (大規模分散処理向け)
の3通りを任意に組み合わせて行います。
2.2. State Machine¶
FairMQDeviceは内部に 図 2.2 に示すステートマシンを持っています。
図 2.2 Device state machine (FairMQ)
プロセスが起動すると 図 2.2 の黒丸から開始してIDLE状態になります。 その後FairMQのデフォルトではINITIALIZING_DEVICEとINITIALIZING_TASKという二段階の初期化過程を経てRUNNINGに遷移します。
注釈
FairMQに用意されているControl plugin(またはDDS plugin)を使った場合はRUNNINGまで遷移します。 自作のpluginを使用すればこの挙動を変更できます。
deviceの初期化ではFairMQDevice同士の接続channel (socket)のconfigurationを行い、 taskの初期化ではユーザーのタスクのためのconfigurationを行うといった使い分けがあります。
注釈
RUNNINGが終了してもchannelがopenのままであれば直前の通信でmessage queueに入力されたけれど実際には使用されなかったデータがqueueに残ったままになります。 そのデータをどうするかはユーザーの実装次第です。
FairMQDeviceはFairMQStateMachineの派生クラスになっており、状態遷移の詳細はFairMQStateMachineクラスに記述されています。 状態遷移表の実装にはBoost MSM (Meta State Machine)ライブラリが使用されています。 状態遷移表を 表 2.1 に示します。 表の意味は
Start状態においてEventが発生した際にGuard条件が成立していれば、Next状態へ遷移します。遷移の際にActionを実行します。- guard条件の記述がない場合、event発生の際に無条件に状態遷移します。
となります。
| Start (*_FSM) | Event (*_E) | Next (*_FSM) | Action (*Fct) | Guard |
|---|---|---|---|---|
| IDLE | INIT_DEVICE | INITIALIZING_DEVICE | InitDevice | none |
| IDLE | END | EXITING | Exiting | none |
| INITIALIZING_DEVICE | internal_DEVICE_READY | DEVICE_READY | DeviceReady | none |
| DEVICE_READY | INIT_TASK | INITIALIZING_TASK | InitTask | none |
| DEVICE_READY | RESET_DEVICE | RESETTING_DEVICE | ResetDevice | none |
| INITIALIZING_TASK | internal_READY | READY | Ready | none |
| READY | RUN | RUNNING | Run | none |
| READY | RESET_TASK | RESETTING_TASK | ResetTask | none |
| RUNNING | PAUSE | PAUSED | Pause | none |
| RUNNING | STOP | READY | Stop | none |
| RUNNING | internal_READY | READY | internalStop | none |
| PAUSED | RUN | RUNNING | Resume | none |
| RESETTING_TASK | internal_DEVICE_READY | DEVICE_READY | DeviceReady | none |
| RESETTING_DEVICE | internal_IDLE | IDLE | Idle | none |
| OK | ERROR_FOUND | ERROR | ErrorFound | none |
2.3. FairMQDevice派生クラスにおけるカスタマイズ¶
アプリケーション開発者は、ユーザーのタスクを実行するためのdeviceをFairMQDeviceから派生させて、以下のFairMQDeviceの仮想関数を派生クラス側でoverrideして実現したい処理を実装します。
ここに挙げた各関数には対応する xxxWrapper() というラッパー関数が存在し、その中から呼ばれます。
virtual void Init(); // called in InitWrapper()
virtual void InitTask(); // called in InitTaskWrapper()
virtual void Run(); // called in RunWrapper()
virtual void PreRun(); // called in RunWrapper()
virtual bool ConditionalRun(); // called in RunWrapper()
virtual void PostRun(); // called in RunWrapper()
virtual void Pause(); // called in PauseWrapper()
virtual void ResetTask(); // called in ResetTaskWrapper()
virtual void Reset(); // called in ResetWrapper()
それぞれの関数がステートマシンのどの状態において呼ばれるかをまとめたのが 表 2.2 です。 いずれも 図 2.2 において色付きで示された(凡例において"state runs in its own thread"と示されている)状態において実行されます。
| methods | state |
|---|---|
| Init() | INITIALIZING_DEVICE |
| InitTask() | INITIALIZING_TASK |
Run()
PreRun()
ConditionalRun()
PostRun()
|
RUNNING |
| Pause() | PAUSED |
| ResetTask() | RESETTING_TASK |
| Reset() | RESETTING_DEVICE |
2.3.1. Init¶
この関数は、 INITIALIZING_DEVICE 状態のときに FairMQDevice::InitWrapper() の中で呼ばれます。
InitWrapper() では、 Configuration で設定したchannel (socket)のパラメータ(transport type (zeromq, nanomsg, ...)、 endpointの種別(bind or connect)、address)を使ってchannel (socket)の生成および接続を行った上で Init() が呼ばれます。
void FairMQDevice::InitWrapper()
{
// 与えられたconfiguration parameterから未初期化のchannelのリストを作成 ;
// 未初期化のbinding channelに対してsocketを作成してbind ;
CallStateChangeCallbacks(INITIALIZING_DEVICE); // pluginに状態遷移を通知
// 未初期化のconnecting channelに対してsocketを作成してconnect ;
Init();
ChangeState(internal_DEVICE_READY); // "internal_DEVICE_READY"イベントを発生させて"DEVICE_READY"状態に遷移する
}
Init()をoverrideしない場合は何もしません。 また、通常はInit()をoverrideしなくても構いません。
2.3.2. InitTask¶
この関数は、 INITIALIZING_TASK 状態のときに FairMQDevice::InitTaskWrapper() の中で呼ばれます。
void FairMQDevice::InitTaskWrapper()
{
CallStateChangeCallbacks(INITIALIZING_TASK); // pluginに状態遷移を通知
InitTask();
ChangeState(internal_READY); // "internal_READY"イベントを発生させて"READY"状態に遷移する
}
InitTask()をoverrideしない場合は何もしません。 通常はoverrideしてユーザーのタスクに必要なパラメータをコマンドラインオプションやパラメータファイルなどから取得して設定します。
2.3.3. Run, PreRun, ConditionalRun, PostRun¶
これらの関数は、 RUNNING 状態のときに FairMQDevice::RunWrapper() の中で呼ばれます。
void FairMQDevice::RunWrapper()
{
CallStateChangeCallbacks(RUNNING); // pluginに状態遷移を通知
// socket処理レートをログするためのスレッドを生成 ;
// transports (socket) に Resume(= RUNNINGに遷移)したことを通知 ;
try
{
PreRun();
if (fDataCallbacks) // あるchannelでメッセージを受信した時に実行するcallback関数を、OnData()関数を使って登録してあるかどうか確認
{ // メッセージ受信時のcallback関数が一個以上登録されている場合
bool proceed = true;
while (CheckCurrentState(RUNNING) && proceed)
{
for ( . . . . . ) // callback関数に登録されたchannelのリストでループ
{
// proceed = (メッセージ受信を確認し、callback関数を呼んで処理を実行) ;
}
}
}
else
{ // OnDataにcallback関数を登録してない場合
// (ConditionalRunの実行レート調整のため、現在時刻を取得) ;
while (CheckCurrentState(RUNNING) & ConditionalRun())
{
// (ConditionalRunの実行レート調整のため、必要ならsleep) ;
}
Run();
}
}
catch (const std::out_of_range& e) {
// (channel configurationにエラーがある場合は例外が出るので例外を受け取ってエラーをログに出す) ;
}
if (CheckCurrentState(RUNNING))
{
ChangeState(internal_READY); // "internal_READY"イベントを発生させて"READY"状態に遷移する
}
PostRun();
// socket処理レートをログするスレッドをjoin ;
}
Run(), PreRun(), ConditionalRun(), PostRun()をoverrideしない場合は何もしません(ConditionalRunはfalseを返します)。 リスト 2.4 にある通り、PreRun()の後に2系統の処理ループ
- callbackを使った処理ループ
- ConditionalRunまたはRunによる処理ループ
があり、どちらかが実行されます。 処理ループを抜けるとPostRun()が実行されます。
データ受信時に実行したい処理の記述を簡単にするために、 OnData() を使って実行したい処理をcallbackとして登録する方法もあります。
2.3.3.1. Run(), ConditionalRun(), OnData()の使い分け¶
これらの関数は、いずれもユーザータスクを記述するための関数です。 それぞれの特徴と、どれを使ったらよいかを決めるためのヒントをここでまとめておきます。
- Run()を使うケース任意のケースに使用できます。 ConditionalRun()やOnData()は、データ処理で頻出する部分を抽出して提供されているため、それらが使える場合はRun()よりもコードが簡潔になります。
- ConditionalRun()が使えるケースこの関数を使う場合は繰り返し行われるタスクのwhileループの中を記述するだけで済むという利点があります。 また、処理レートを調整する機能を使いたい場合にも適しています。
- OnData()が使えるケースOnData()関数はそれ自身が処理を行うのではなく、受信channelにメッセージが届いたらそのメッセージに対する処理内容をcallback関数として登録するためのインターフェイスになっています。 メッセージ受信待ち部分をcallback関数の中に書かずに済むため、メッセージ受信時の処理を簡潔に記述できます。 一方、メッセージを受信したときにしかcallback関数は呼ばれないという制約があります。
2.3.4. Pause¶
この関数は、 PAUSED 状態のときに FairMQDevice::PauseWrapper() の中で呼ばれます。
void FairMQDevice::PauseWrapper()
{
CallStateChangeCallbacks(PAUSED); // pluginに状態の遷移を通知
Pause();
}
Pause()をoverrideしない場合、 リスト 2.6 に示した通りデフォルトではpauseが解除されるまでsleepします。
void FairMQDevice::Pause()
{
while (CheckCurrentStatus(PAUSED))
{
std::this_thread::sleep_for(std::chrono::milliseconds(500));
LOG(debug) << "paused...";
}
LOG(debug) << "Unpausing";
}
この動作を変更したい場合はoverrideしてください。
2.3.5. ResetTask¶
この関数は、 RESETTING_TASK 状態のときに FairMQDevice::ResetTaskWrapper() の中で呼ばれます。
void FairMQDevice::ResetTaskWrapper()
{
CallStateChangeCallbacks(RESETTING_TASK); // pluginに状態の遷移を通知
ResetTask();
ChangeState(internal_DEVICE_READY); // "internal_DEVICE_READY"イベントを発生させて"DEVICE_READY"状態に遷移する
}
ResetTask()をoverrideしない場合は何もしません。 ユーザータスクの終了後の後始末が必要であればoverrideしてカスタマイズして下さい。
2.3.6. Reset¶
この関数は、 RESETTTING_DEVICE 状態のときに FairMQDevice::ResetWrapper() の中で呼ばれます。
void FairMQDevice::ResetWrapper()
{
CallStateChangeCallbacks(RESETTING_DEVICE); // pluginに状態の遷移を通知
Reset();
ChangeState(internal_IDLE); // "internal_IDLE"イベントを発生させて"IDLE"状態に遷移する
}
Reset()をoverrideしない場合、 リスト 2.9 に示した通りデフォルトではsocketを破棄します。
void FairMQDevice::Reset()
{
// fChannelsは std::unordered_map<std::string, std::vector<FairMQChannel>>型
for (auto& mi : fChannels)
{
// mi.secondはstd::vector<FairMQChannel>型
for (auto& vi : mi.second)
{
vi.fSocket.reset(); // std::unique_ptr<FairMQSocket>::reset()を呼んでsocketをdestruct (代わりにnullptrをセット)
}
}
}
2.3.7. Pluginによる状態遷移¶
deviceがinteractive modeで実行された場合(defaultはinteractive)、キーボード入力によって状態の遷移が行えます。
h- helpp- pauser- runs- stopt- reset taskd- reset deviceq- endj- init taski- init device
interactive mode以外の実行方法、例えばプロセスバックグラウンド実行に関しては以下の2種類があります。
- static (
--control static) - deviceの状態シンプルには init → run → reset → exit のchainに沿って遷移します。 - dds (
--control dds) - deviceの状態は外部コマンド(この場合はfairmq-dds-command-ui)によって操作されます。
3. Transport¶
図 3.1 にTransport関連クラス相関図を示します。
図 3.1 Transport interface (FairMQ)
注釈
図 3.1 はUML(Unified Modeling Language)記法が使われています。 UMLはオブジェクト指向手法の分析や設計を表現する統一された方法です。 クラスの継承関係が白塗三角(△)矢印で示されています(破線矢印は依存関係、実線矢印はフィールド(メンバ変数)関係 etc.)。
注釈
FairMQDeviceが直接FairMQSocketをメンバとして持つのでなく、実際にはFairMQChannelというFairMQSocketのコンテナを扱うクラスが間に入ります。
注釈
黄色で示されるuser code部分はFairRootにおける流儀であり、FairRootではFairTaskクラスから派生させたユーザーのtaskクラスを作成してuser codeを実行させる設計でした。 現在のFairMQはFairRootとの依存関係がなく、FairMQDeviceから直接派生させてユーザーのタスクを記述可能です。
中心にあるFairMQDeviceがユーザーのアプリケーション本体となるdevice(の基底クラス)です。
deviceは FairMQTransportFactory というインターフェイスクラスを介して FairMQSocket オブジェクトや FairMQMessage オブジェクトを作成します。
FairMQSocket も FairMQMessage もインターフェイスクラスであり、transport type毎にFactroy, Socket, Message, ...の実装クラスが存在します。
図 3.1 では、赤色がzeromq, 緑色がnanomsgによる実装に対応します。
3.1. Transport types¶
FairMQで利用可能なtransportの種類とそれぞれの特徴を簡単にまとめておきます。
3.1.1. ZeroMQ (zmq)¶
高速なメッセージパッシングライブラリです。 broker-less zero-copy
3.1.2. Nanomsg (nn)¶
ZeroMQにinspireされています。
3.1.3. SHMEM¶
FairMQではboostとzeromqを使って実装されています。
3.1.4. OFI¶
FairMQではInfiniBandを扱うために使用されます。
3.2. Communication Patterns¶
FairMQDeviceは基本的にはzeromqで使える通信パターンを使用して通信します。
PUSH-PULL
パイプライン型ともよばれます。 1:1で接続している場合は普通のFIFOと同じです。 図 3.3 に示したように複数の接続先をとることが可能です。 なお、queueにこれ以上メッセージを詰めるための空きが無くなった状態(メッセージ・キュー溢れ状態)のことをhigh water mark (HWM)と呼びます。
PUSH socketが複数のPULL socketと接続している場合、PUSH socketがキューに入れたメッセージの送信先は接続中のHWMでないPULL socketから巡回的に選択され均等に送信されます。 すべてのPULL socketがHWM状態でPULL socketが同期sendをした場合、メッセージキューへのメッセージ追加がブロックされます。
PULL socketが複数のPUSH sockeと接続している場合は、空でないPUSH socketから均等に受信します。
PUB-SUB
PUB-SUB patternはラジオのブロードキャストに似ています。 図 3.3 にPUB socketは複数接続しているSUB socketに対して同じメッセージを送信します。 PUB socketはSUB socketについての情報を必要とせず、SUB socketがHWMであってもメッセージを送信可能です。 PUB socketはSUB socketが接続してなくてもメッセージ送信を行うため、SUB socketの接続が遅れた場合にいくつかのメッセージを受信しない可能性があることに注意してください。 なお、SUB socketは複数のPUB socketに接続可能です。
REQ-REP
図 3.5 に示したように、REQ socketがclient、REP socketがserverとなるclient-serverモデルに相当します。 REQ socketは複数のREP socketに接続可能です。
- REQ socketは自分の送信したrequest messageに対するreply messageを受け取るまでは、次のrequest message送信がブロックされます
- REP soecktはrequest messageを受信するまでreply messageの送信がブロックされます
PAIR
exclusive pairです。
- 双方向通信 (bidirectional)
- 接続は1:1のみ
- REQ-REPと違い、メッセージの送受信の順序に制限はありません
マルチスレッド間の同期機構であるセマフォやミューテックスの代わりとしてPAIR patternを使うことを想定しています。
FairMQではこれらの通信パターンを通信の type と呼んでいます。 表 3.1 にFairMQで利用可能なtypeを示します。
| zeromq | nanomsg | shmem | ofi | |
|---|---|---|---|---|
| PAIR | yes | yes | yes | |
| PUSH/PULL | yes | yes | yes | |
| PUB/SUB | yes | yes | no | |
| REQ/REP | yes | yes | yes |
表 3.2 は各transport実装で使用可能なアドレス指定方法のまとめです。
| zeromq | nanomsg | shmem | ofi | comment | |
|---|---|---|---|---|---|
inproc:// |
yes | yes | yes | プロセス内通信(in process) | |
ipc:// |
yes | yes | yes | プロセス間通信(inter process), single node | |
tcp:// |
yes | yes | yes | TCP, single node, multi node |
TCPはリモート、ローカルのどちらの通信にも使えますが、ローカルであればipcの方が高速です。
3.3. Message, ownership¶
Device同士で送受信されるデータは FairMQMessage オブジェクトに格納されてやりとりされます。
FairMQ自身にはdevice同士でやりとりされるデータフォーマットの定義はなく、FairMQMessageには任意のデータを格納可能です。
transport type自身が持つデータフォーマットの制約があるため、例えばFairMQでtransportにZeroMQを使う場合は本来のZeroMQに準拠したデータフォーマットになるためZeroMQ socket同士であれば(他のプログラミング言語のバインディングとも)通信できます。
FairMQMessageオブジェクトの作成にはFairMQDeviceのメンバ関数の NewMessage() を使います。
返値の FairMQMessagePtr は std::uniqut_ptr<FairMQMessage> のエイリアスです。
- 引数なし での呼び出し: メッセージの受信時に受信メッセージの格納先として空のメッセージを作成します。 また、メッセージオブジェクトをコピーする際のコピー先として空のメッセージを作成するときにも使います。
FairMQMessagePtr NewMessage() const;
- 指定サイズ分を確保 : 指定されたバイト数でメッセージ用のバッファを確保します。バッファのポインタを使ってデータを書き込んで下さい。
FairMQMessagePtr NewMessage(const size_t size) const;
- 既存のバッファの所有権を譲渡 : 既存のバッファ(
data)を使ってFairMQMessageオブジェクトを作成します。ZeroMQにおいてはzero-copy操作となります。 リスト 3.3 に示した通り、この場合のNewMessageの引数は4個あります。 第1引数のdataが既存バッファのアドレスで、reinterpret_cast<char*>を使ってchar*へキャストしてください。 第2引数のsizeはバッファのバイト数です。 既存バッファdataの所有権をFairMQMessageへと譲渡することになり、バッファを破棄するための関数を第3引数のffnで与えます。ffnのsignatureはvoid ffn(void *data, void* hint)であり、 NewMessage()の第1引数のdataと 第4引数のhintがffnの引数のdata,hintとして渡されます。hintはffnの実行時に必要なオブジェクトがあればそのアドレスを渡すために使います (例えば、メッセージ作成に使われるdataがhintの内部バッファになっていて、dataだけでなくhintごと削除しなければならない場合にhintを使用します)。 送信用メッセージ作成のほとんどの場合でこのインターフェイスを使用することになります。
using fairmq_free_fn = void(void* data, void* hint); FairMQMessagePtr NewMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) const;
dataとhintの使い方について簡単な使用例を示しながら説明します。リスト 3.4 は基本型からメッセージを作成する例です。
uint64_t型(64 bit符号なし整数型)の変数bufferを初期値2286としてnewで作成し、それを元にメッセージを作成しています。 この場合、NewMessageの第1引数dataにはbufferのアドレス(をchar*にキャストしたもの)、第2引数にはバイト数(64 bitなので8 byte)を指定します。 第3引数にはbufferをdeleteするための削除関数を指定します(ここでは削除関数をラムダ式で記述しています)。 この削除関数の第1引数にはNewMessageの第1引数がvoid*として渡されるので、 削除関数内で作成時の型のポインタであるuint64_t*にキャストしてdeleteを行うようにしています。 なお、NewMessageの第4引数(および削除関数の第2引数)であるhintは使用しないので省略するかnullptrを指定します。auto buffer = new uint64_t {2286}; FairMQMessagePtr msg(NewMessage(reinterpret_cast<char*>(buffer), sizeof(uint64_t), [](void* data, void*) { delete reinterpret_cast<uint64_t*>(data); }, nullptr));リスト 3.5 は基本型配列からメッセージを作成する例です。
uint32_t型(32 bit符号なし整数型)の配列(ここでは要素数100)をnewで作成し、それを元にメッセージを作成しています。NewMessageの第1引数には配列の先頭アドレス(をキャストしたもの)、第2引数には配列のバイト数、第3引数は削除関数を与えています。 この例では配列用のnewでbufferを生成したので、その削除に使うのは配列用のdelete []になります。 この例でもNewMessageの第4引数は使用しません。int n = 100; uint32_t* buffer = new uint32_t[n]; // 配列の内容を変更する操作 ; FairMQMessagePtr msg(NewMessage(reinterpret_cast<char*>(buffer), n * sizeof(uint32_t), [](void* data, void*) { delete [] reinterpret_cast<uint32_t*>(data); }, nullptr));リスト 3.6 は
std::vector<uint32_t>からメッセージを作成する例です。NewMessageの第1引数には std::vectorの持つ内部バッファの先頭アドレスを渡します。 それにはC++11で追加されたメンバ関数であるdata()を使うと良いでしょう (C++11以前では、std::vectorの内部バッファの先頭アドレスを取得するには&a[0]のようにa[0](std::vectoraの先頭要素)のアドレスを&演算子で取得する記法がありました)。 第2引数は内部バッファのバイト数を渡します。 削除関数ではnewで生成したbufferを削除する必要があるため、NewMessageの第4引数を使って削除関数の第2引数であるhintにstd::vectorのアドレスが渡されるようにします。 削除関数では受け取ったhintをstd::vector<uint32_t>*にキャストしてdeleteを適用するようにします。auto buffer = new std::vector<uint32_t>; // vectorの内容を変更する操作 ; FairMQMessagePtr msg(NewMessage(reinterpret_cast<char*>(buffer->data()), buffer->size() * sizeof(uint32_t), [](void*, void* hint) { delete reinterpret_cast<std::vector<uint32_t>*>(hint); }, buffer));リスト 3.7 は
std::stringからメッセージを作成する例です。 std::vectorの場合と同様になります。auto buffer = new std::string("hello world"); // new 時点で文字列は空でもよい // 必要であれば文字列の追加・変更などの操作 ; FairMQMessagePtr msg(NewMessage(reinterpret_cast<char*>(buffer->data()), buffer->size(), [](void*, void* hint) { delete reinterpret_cast<std::string*>(hint); }, buffer));
以上では、 NewMessage の説明をしてきました。
以下に挙げる NewSimpleMessage() と NewStaticMessage() といったメンバ関数でも既存バッファからメッセージを作成しますが、リスト 3.3 と違い所有権の移動は発生しません。
NewMessage() と上手く使い分けるといいでしょう。
- 既存のバッファをコピー : 既存バッファ
dataのコピーを作成してメッセージにします。dataの所有権は移動しません。 この関数ではコピー操作が発生するためdataのサイズが小さい場合に適しています。
template <typename T> FairMQMessagePtr NewSimpleMessage(const T& data) const; // この関数内でdataからnewでdataCopyを作成し、 // 3. のNewMessage()の引数リストにおいて、 // ・ dataにdataCopy // ・ ffnにFairMQSimpleMsgCleanup() (dataCopyをdeleteする関数) // を指定しているのと同じことをしています。template <typename T> static void FairMQSimpleMsgCleanup(void* /*data*/, void* obj) { delete static_cast<T*>(obj); }
- 既存バッファを参照 : 返値のメッセージは既存バッファ
dataのアドレスを指していますが所有権は移動しません。dataは転送が完了するまで寿命を保証する必要があります (ただし転送は非同期に行われ、いつ完了したかを知る方法はありません)。 このインターフェイスはサードパーティー製ツールが管理する連続したメモリ領域の転送に有用です。dataの指すオブジェクトが、そのデータメンバーとしてポインター(参照)をもつようなshallow-typeの場合は転送されませんので注意が必要です。
3.4. Multi-part message¶
複数のメッセージをひとつに結合して送って、受信した側では結合前の個々のメッセージに分割して利用するという場面がよくあります。
その場合、メッセージの区切りがどこにあるかといったデータフォーマットやプロトコルの設計と、結合されたメッセージを実際に分割処理するコードを書くなどの手間がかかります。
ZeroMQではmulti-part messageという機能を使うと、n個のメッセージの構成を維持したまま送受信可能です。
ZeroMQをベースにしたFairMQにもmulti-part messageの機能があり、FairMQMessageのstd::vectorを扱うための FairMQPart というクラスがあります。
注釈
正確には FairMQPart は std::vector<FairMQMessagePtr> のwrapperクラスです。
なお、 FairMQMessagePtr は std::unique_ptr<FairMQMessage> のエイリアスです。
3.5. Channel¶
channelはFairMQにおける通信のエンドポイントを表現します。
その使い方はUnix network socketに似ています。
deviceは複数のchannelを持つことができ、それぞれ他のdeviceからの接続を待つ bind endpoint か、あるいは他のdeviceへの接続を行う connect endpoint になります。
bind/connectはデータの流れる向きとは関係ありません。
channelは名前とsub channelのindexで管理されます。
同じ名前のchannelにsub channelが複数存在する場合、そのsub channelたちはtransport typeが同じである必要があります。
3.6. Poller¶
pollerはchannelがメッセージを受信したかどうか、あるいはメッセージを送信可能かどうかを確認するために使います。 一つのpollerで複数のchannelを同時に監視できますが、それらは同じtransport typeである必要があります。
4. Configuration¶
4.1. Device Configuration¶
Deviceのconfigurationは主にコマンドラインオプションで与えます。 Device毎にコマンドラインオプションは拡張可能です。
4.2. Communication Channels Configuration¶
channelのconfigurationに使っているparserとして以下の2種類があります。 parserはユーザーで拡張可能です。
4.2.1. JSON Parser¶
ここではJSONファイルによるchannelの設定例を示します。
{
"fairMQOptions": {
"devices": [
{
"id": "sampler1",
"channels": [
{
"name": "data",
"sockets": [
{
"type": "push",
"method": "bind",
"address": "tcp://*:5555",
"sndBufSize": 1000,
"rcvBufSize": 1000,
"rateLogging": 1
}
]
}
]
},
{
"id": "sink1",
"channels": [
{
"name": "data",
"sockets": [
{
"type": "pull",
"method": "connect",
"address": "tcp://localhost:5555",
"sndBufSize": 1000,
"rcvBufSize": 1000,
"rateLogging": 1
}
]
}
]
}
]
}
}
JSONファイルによるconfigurationは複数のdeviceの設定を記述することができます。
device IDを使ってどのdeviceのパラメータかを特定します。 コマンドラインによるconfigurationでは
--idパラメータとして与えます。 JSONファイルによるconfigurationではidエントリーがそれに対応します。JSONファイルによるconfigurationでは
idの代わりに、keyを使うことで、複数のdeviceに対して共通の設定を適用可能です。socketのオプションには、type, method, address の記述が必須です。 必須項目でないsocketのオプションに対して設定を省略した場合は、そのchannelにおけるデフォルトのパラメータが使用されます。
あるchannelに複数のsub-channelが存在するとき、sub-channelに共通の設定を直接記述可能です。以下に例を示します。
"channels": [{ "name": "data", "type": "push", "method": "bind", "sockets": [{ "address": "tcp://*:5555", "address": "tcp://*:5556", "address": "tcp://*:5557" }] }]
4.2.2. SuboptParser¶
このparserはコマンドラインから直接configurationするのに使います。
key=value という書式をカンマで区切って並べます。
以下に示すのは、 --channel-config というオプションに対してkey/valueペアを列挙したものです。
--chanel-config name=output,type=push,method=bind,address=tcp://127.0.0.1:5555
4.3. コマンドラインオプションのカスタマイズ¶
ここではユーザータスクで使用するパラメータをmain()関数の引数となるコマンドラインオプションで与える方法を説明します。 FairMQではコマンドラインオプションを効率的に解析するツールとしてBoost.Program_optionsライブラリを使用しています。
4.3.1. Boost.Program_options入門¶
- まずはBoost.Program_optionsライブラリの使い方を簡単に説明します。
boost::program_options::options_description型変数を定義し、そのメソッドadd_options()の後に("オプションの名前", boost::program_options::value<オプションの型>(), "オプションの説明")("オプションの名前", boost::program_options::value<オプションの型>()->default_value(デフォルト値), "デフォルト値を持つオプションの説明")("オプションの名前", boost::program_options::value<オプションの型>()->multitoken(), "コンテナのような複数要素を持つオプションの説明")
といった書式を列挙し、最後のステートメントにセミコロン(;)を書きます。
一見奇妙に見えますが、 options.add_options()(オプション1)(オプション2)(オプション3) .... (オプションN); というふうになります(途中で改行OK)。
記述例を リスト 4.1 に示します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 | #include <boost/program_options.hpp>
int main (int argc, char* argv[])
{
// define options
boost::program_options::options_description options("hoge option");
options.add_options()
("option1", boost::program_options::value<int>(), "description of option1") // (1) integer without default value
("option2", boost::program_options::value<double>()->default_value(1.234), "description of option2") // (2) double floating point number with default value
("foo", boost::program_options::value<std::string>()->default_value("hoge"), "description of foo") // (3) string with default value
("bar", boost::program_options::value<std::vector<std::string>>()->maultitoken(), "description of bar") // (4) vector of string without default value
("hoge", "description of hoge") // (5) used as a boolean flag
("help,h", "Print this help") // (6) option key of "help" with shortened key of "h"
; // The statement ends with ';'
// parse command line
boost::program_options::variables_map vm;
boost::program_options::store(boost::program_options::parse_command_line(argc, argv, options), vm);
boost::program_options::notify(vm);
// get result of parser
if (vm.count("help")) { // check existence of "help"
std::cout << options << std::endl;
}
auto option1 = vm["option1"].as<int>(); // get "option1" of integer type
auto bar = vm["bar"].as<std::vector<std::string>>(); // get "bar" of vector<string> type
return 0;
}
|
- それぞれ
- "option1" という名前のオプションを
int型変数として受け取る - "option2" は
double型変数で、デフォルト値として1.234を設定 - "foo" は
std::string型 - "bar" は
std::vector<std::string>型 - コマンドラインオプションに"hoge" が含まれているかどうかをboolean flagとして使う
- "help"オプションの短縮形として"h"を使用するための書式
- "option1" という名前のオプションを
というオプションに対応します。仮にこのソースコードをコンパイルして app という名前の実行ファイルを作成した場合、
app --option1 3 --foo su3 app --hoge --bar hello world --option2 0.938 app --help app -h
といった使い方ができます。 それぞれの引数の順番は入れ替えても構いません。
コマンドライン解析の結果は boost::program_options::variables_map 型変数に格納されます。
boost::progrram_options::variables_map 型は std::string をkey, あらゆる値を入れられるheterogeneous containerの any 型コンテナのwrapperである boost::program_options::variable_value をvalueとする std::map<std::string, boost::program_options::variable_value> からの派生クラスです。
コマンドラインオプションとして与えられたかどうかを確認するには std::map の count() が使えます。
値を取得するには、オプション名をkeyとして boost::program_options::variables_map にアクセスし、 boost::program_options::variable_value の as<T>() メソッドを使用します。
注釈
boost::program_options といった名前空間は長いので、実際にソースコードを書く場合は using xxxx; 、 using namespace xxxx; や namespace = xxxx; などで省略形を使用するほうがいいかもしれません。
4.3.2. FairMQにおけるprogram_options¶
オプションの追加は boost::programs_option と同じインターフェイスになります。
後述のdeviceの開発例 にもありますが、main()関数が記述された runFairMQDevice.h にある addCustomOptions という関数の中でユーザータスク用のオプションを記述します。
リスト 4.2 に記述例を載せます。
void addCustomOptions(boost::program_options::options_description& options) { options.add_options() ("hoge", boost::program_options::value<int>(), "hoge option") ("huga", boost::program_options::value<double>(), "huga option"); }
オプションの取得に関しては、 boost::programs_optoin::variables_map のwrapperである FairMQProgOptions というクラスが用意されており、FairMQDeviceのメンバ変数 fConfig としてアクセス可能です。
FairMQProgOptions::GetValue<T>("オプション名") というメソッドを使います。
主に InitTask() でユーザータスクの初期化のためにパラメータを設定するといった用途に使います。
auto foo = fConfig->GetValue<int>("hoge"); auto bar = fConfig->GetValue<double>("huga");
4.4. Introspection¶
deviceの実行バイナリファイナルはどういったオプションが利用可能かを表示させることができます。
-h [ --help]: 全ての利用可能なオプションと、それに対する説明及び各オプションのデフォルト値(もしあれば)を表示します。--print-options: 全ての利用可能なオプションをmachine-readable formatで表示します。<option>:<computed-value>:<<type>>:<description>--print-channels: RegisterChannelEndpoint で登録したchannelの情報を<channel name>:<minimum sub-channels>:<maximum sub- channels>という書式で表示します。
void YourDevice::RegisterChannelEndPoints()
{
// provide channel name, minimum and maximum number of subchannels
RegisterChannelEndpoint("channelA", 1, 10000);
RegisterChannelEndpoint("channelB", 1, 1);
}
6. Logging¶
FairMQLogger.h ファイルから fair::Logger ライブラリをlogging用に読み込んでいます。
log機能は全て LOG(severity) という書式のマクロを介して使用します。
このマクロによるlog出力はスレッドセーフです。
log出力は std::cout やファイル、他にカスタムのsinkに対して行うことができます。
6.1. Log severity¶
logのseverityは以下のAPIで制御します。
fiar::Logger::SetConsoleSeverity("<severity level>");
// and/or
fiar::Logger::SetFileSeverity("<severity level>");
// and/or
fiar::Logger::SetCustomSeverity("<customSinkName>", "<severity level>");
severity levelは次のうちのいずれかになります。
"nolog",
"fatal",
"error",
"warn",
"state",
"info",
"debug",
"debug1",
"debut2",
"debut3",
"debut4",
"trace",
設定したseverityから上のseverity levelに対してloggingが行われます。 fatal seveirtyは常にlogが出力されます。 nologの場合は完全にlog出力を停止します。
FairMQDeviceの場合は、コマンドラインオプションを使って --severity <>level という書式でseverityの設定が可能です。
6.2. Log verbosity¶
log verbosityの制御は以下の関数で行います。
fair::Logger::SetVerbosity("<verbosity level>");
全てのsinkに対して同じ方法になります。 verbosity levelには以の下4通りがあり、それぞれの出力書式も合わせて示してあります。
| level | description |
|---|---|
| low | [severity] message |
| medium | [HH:MM:SS][severity] message |
| high | [process name][HH:MM:SS:μS][severity] message |
| veryhigh | [process name][HH:MM:SS:μS][severity][file:line:function] message |
6.3. Color¶
コンソールにlogを出力する場合はカラー出力のon/offが可能です。
Logger::SetConsoleColor(true); // onにする場合
FairMQDeviceのコマンドラインオプションで与える場合は、 --color <true/false> という書式になります。
6.4. File output¶
logをファイルに出力するには
Logger::InitFileSink("<severity level>", "<filename>", true);
第1引数でseverity levelを設定します。
第2引数で出力先のファイル名を設定します。
第3引数が true の場合はファイル名にタイムスタンプが追加されます。
FairMQDeviceのコマンドラインオプションで与える場合は、 --log-to-file <filename_prefix> という書式になります。
これを指定するとコンソール出力は自動的にoffになります。
6.5. Custom sinks¶
カスタムのログ出力は、 Logger::AddCustomSink("sink name", "<severity>", callback) という関数で追加可能です。
callback 関数は、文字列と fair::LogMetaData 構造体を引数に取る関数(std::function)です。
以下の例はseverityに "trance" を指定することで全てのseverityに対して文字列 content と metadata を出力します。
Logger::AddCustomSink("MyCustomSink", "trace", [](const sttring& content, const LogMetaData& metaData)
{
cout << "content: " << content << endl;
cout << "available metadata: " << endl;
cout << "std::time_t timestamp: " << metadata.timestamp << endl;
cout << "std::chrono::microseconds us: " << metadata.us.count() << endl;
cout << "std::string process_name: " << metadata.process_name << endl;
cout << "std::string file: " << metadata.file << endl;
cout << "std::string line: " << metadata.line << endl;
cout << "std::string func: " << metadata.func << endl;
cout << "std::string severity_name: " << metadata.severity_name << endl;
cout << "fair::Severity severity: " << static_cast<int>(metadata.severity) << endl;
});
Custom sinkだけを有効にしたい場合は、consoleとfileのsinkのseverityを "nolog" に設定します。
7. Development¶
7.1. Device作成例¶
ここではFairMQの枠組みでユーザーのタスクを実行するためのdeviceを作成する方法を説明します。
具体例として、 Sampler と Sink という2つのdeviceを作成します。
(FairMQに用意されているexamples/1-1/Sampler, Sinkと同じです)
- SamplerからSinkに向けて、文字列をメッセージとして転送します。
- どのような文字列を送るかをSamplerのコマンドラインオプション
textで指定します。 - Samplerが送信に使うchannelの名前は
data - Sinkが受信に使うchannelの名前は
data
一般的にはdevice毎に以下の3つのファイルを作成します。 (C++を理解していれば他の構成でも構いません)
- Sampler: Sampler.h, Sampler.cxx, runSampler.cxx
- Sink: Sink.h, Sink.cxx, runSink.cxx
7.1.1. Sampler deviceの作成¶
Samplerクラスのヘッダーファイルです。クラスの定義を記述します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | #ifndef Sampler_h
#define Sampler_h
#include <FairMQDevice.h>
class Sampler : public FairMQDevice
{
public:
Sampler();
~Sampler() = default;
protected:
void InitTask() override;
bool ConditionalRun() override;
private:
std::string fText;
uint64_t fMaxIterations;
uint64_t fNumIterations;
};
#endif
|
続いてSamplerクラスの実装ファイルです。メンバ関数の定義を記述します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 | #include <thread>
#include <chrono>
#include "Sampler.h"
using namespace std::chrono_literals; // (C++14)
Sampler::Sampler()
: fText()
, fMaxIterations(0)
, fNumIterations(0)
{
}
void Sampler::InitTask()
{
fText = fConfig->GetValue<std::string>("text");
fMaxIteration = fConfig->GetValue<uint64_t>("max-iterations");
}
bool Sampler::ConditionalRun()
{
std::string* text = new std::string(fText);
FairMQMessagePtr msg(NewMessage(const_cast<char*>(text->data()),
text->length(),
[](void* /*data*/, void* object) { delete static_cast<std::string*>(object); },
text));
LOG(info) << "Sending \"" << fText << "\"";
if (Send(msg, "data") < 0) {
return false;
}
else if (fMaxIterations >0 && ++fNumIterations >= fMaxIterations) {
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
return false;
}
std::this_thread::sleep_for(1s);
}
|
ConditionalRun() では text からメッセージオブジェクトの msg を作成して Send() で送信しています。
Send() は送信キューに詰めるだけで、キューがいっぱいでなければすぐに関数が返ります。
実際に送信動作が行われるのは一度 ConditionalRun() から抜けた後かもしれません。
text の寿命は ConditionalRun() から抜けた後でも有効であることを保証しなければならないため、 text はヒープ領域に作成し、 NewMessage() で msg に所有権を移動させています。
また、送信完了時に破棄しなければならないオブジェクトは NewMesage() の第1引数として与えたデータ本体である text->data() (の指す領域)ではなく、 new で作成した text であるため、 text を適切に delete するための関数をラムダ式で与え、 NewMessage() の第4引数(=破棄関数の第2引数。ここでは void* object )として text を与えています。
最後にmain()関数が記述された runFairMQDevice.h をインクルードして、Samplerを生成するfactory関数 getDevice() と、Samplerのタスク実行に必要なオプションを追加する関数 addCustomOptions() を記述します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | #include <runFairMQDevice.h>
#include "Sampler.h"
void addCustomOptions(boost::program_options::options_description& options)
{
options.add_options()
("text", boost::program_options::value<std::string>()->default_value("Help"), "Text to send out")
("max-iterations", boost::program_options::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
{
return new Sampler;
}
|
7.1.2. Sink deviceの作成¶
Sinkクラスのヘッダーファイルです。クラスの定義を記述します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | #ifndef Sink_h
#define Sink_h
#include <FairMQDevice.h>
class Sink : public FairMQDevice
{
public:
Sink();
~Sink() = default;
protected:
void InitTask() override;
bool HandleData(FairMQMessagePtr&, int);
private:
uint64_t fMaxIterations;
uint64_t fNumIterations;
};
#endif
|
続いてSinkクラスの実装ファイルです。メンバ関数の定義を記述します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | #include "Sink.h"
Sink::Sink()
: fMaxIterations(0)
, fNumIterations(0)
{
Ondata("data", &Sink:HandleData);
}
void Sink::InitTask()
{
fMaxIterations = fConfig->GetValue<uint64_t>("max-iterations");
}
bool Sink::HandleData(FairMQMessagePtr& msg, int /*index*/)
{
LOG(info) << "Received: \"" << std::string(reinterpret_cast<char*>(msg->Data()), msg->GetSize()) << std::endl;
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations)
{
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
return false;
}
return true;
}
|
Sinkのコンストラクタで data channelでメッセージを受信したら HandleData() を実行するように OnData() を使って HandleData() をcallbackとして登録しています。
OnData を使うとメッセージを Receive する部分を記述しなくて済むので簡潔になります。
受信したメッセージは HandleData() の引数として渡されるので、それを処理する部分を記述します。
この例ではログに出力するだけの処理です。
受信したデータを格納している FairMQMessagePtr は std:unique_ptr<FairMQMessage> のエイリアスで、どこからも参照されなくなった時点で自動的に破棄されます。
最後にmain()関数が記述された runFairMQDevice.h をインクルードして、Sinkを生成するfactory関数 getDevice() と、Sinkのタスク実行に必要なオプションを追加する関数 addCustomOptions() を記述します。
1 2 3 4 5 6 7 8 9 10 11 12 13 | #include <runFairMQDevice.h>
#include "Sink.h"
void addCustomOptions(boost::program_options::options_description& options)
{
options.add_options()
("max-iterations", boost::program_options::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
{
return new Sink;
}
|
7.1.3. Sampler - Sinkの実行¶
7.2. CMakeLists.txtの書き方¶
CMakeLists.txtの書き方を簡単に説明します。
7.3. その他のdeviceの例¶
7.4. カスタムのPluginの作成¶
7.5. Testing¶
- unit test (単体テスト)では数10のプロセスによる本格的な分散システムを起動するのが適さないことがよくあります。
- deviceをインスタンシエートしないでテストする例
transport factoryをヒープ上に作成する場合は、channelの正常なシャットダウンのためにchannelを先に破棄する必要があります。
deviceで使える Send/Receive と New*Message/New*Poller APIは全てchannelにもあります。





