この記事では、メッセージキューを用いたプロセス間通信について紹介します。
メッセージキューによるプロセス間通信は、Unix系システムでよく使われる方法の一つです。
メッセージキューを使うことで、プロセス間でデータをやり取りすることができます。ここでは、C++でのメッセージキューによるプロセス間通信の概要と簡単な実装例を紹介します。
メッセージキューの概要
メッセージキューは、プロセス間通信を実現するための手法の一つです。メッセージキューでは、キュー構造を利用して、異なるプロセス間でデータを送受信します。
メッセージキューの特性
- キュー構造
-
メッセージはFIFO(First In, First Out)順で処理されるため、送信順序が保持されます。
- 識別子
-
各メッセージには識別子が付与され、受信時に特定の識別子を持つメッセージを取得することができます。
- 永続性
-
メッセージキューはカーネルに存在するため、プロセスが終了してもメッセージは残ります。
- 同期
-
送信側と受信側が非同期に動作でき、送信プロセスがメッセージの受信を待たずに続行できます。
実装例
POSIXメッセージキューを使用したC++での簡単な実装例を紹介します。以下のコードでは、メッセージキューの作成、メッセージの送信、受信を行います。
#include <iostream>
#include <cstring>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>
void send_message() {
const char* queue_name = "/example_queue";
struct mq_attr attr;
attr.mq_flags = 0;
attr.mq_maxmsg = 10;
attr.mq_msgsize = 256;
attr.mq_curmsgs = 0;
mqd_t mq = mq_open(queue_name, O_CREAT | O_WRONLY, 0644, &attr);
if (mq == -1) {
perror("mq_open");
return;
}
const char* message = "Hello, World!";
if (mq_send(mq, message, strlen(message) + 1, 0) == -1) {
perror("mq_send");
return;
}
mq_close(mq);
}
void receive_message() {
const char* queue_name = "/example_queue";
mqd_t mq = mq_open(queue_name, O_RDONLY);
if (mq == -1) {
perror("mq_open");
return;
}
char buffer[256];
ssize_t bytes_read;
bytes_read = mq_receive(mq, buffer, 256, NULL);
if (bytes_read == -1) {
perror("mq_receive");
return;
}
std::cout << "Received message: " << buffer << std::endl;
mq_close(mq);
mq_unlink(queue_name);
}
int main(int argc, char* argv[]) {
if (argc != 2) {
std::cerr << "Usage: " << argv[0] << " <send|receive>" << std::endl;
return 1;
}
if (std::strcmp(argv[1], "send") == 0) {
send_message();
} else if (std::strcmp(argv[1], "receive") == 0) {
receive_message();
} else {
std::cerr << "Invalid argument. Use 'send' or 'receive'." << std::endl;
return 1;
}
return 0;
}
ヘッダファイルのインクルード
#include <iostream>
#include <cstring>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>
標準の入出力、文字列操作、メッセージキュー関連のヘッダ、ファイル制御、ファイル状態、POSIXシステムコールを扱うためのヘッダをインクルードします。
メッセージ送信関数
void send_message() {
const char* queue_name = "/example_queue";
struct mq_attr attr;
attr.mq_flags = 0;
attr.mq_maxmsg = 10;
attr.mq_msgsize = 256;
attr.mq_curmsgs = 0;
mqd_t mq = mq_open(queue_name, O_CREAT | O_WRONLY, 0644, &attr);
if (mq == -1) {
perror("mq_open");
return;
}
const char* message = "Hello, World!";
if (mq_send(mq, message, strlen(message) + 1, 0) == -1) {
perror("mq_send");
return;
}
mq_close(mq);
}
const char* queue_name = "/example_queue";
struct mq_attr attr;
attr.mq_flags = 0;
attr.mq_maxmsg = 10;
attr.mq_msgsize = 256;
attr.mq_curmsgs = 0;
queue_name
変数にメッセージキューの名前を設定しています。この名前は、システム全体で一意である必要があります。
メッセージキューの設定値は以下です。
mq_flags
:フラグを0に設定(デフォルトの動作)mq_maxmsg
:キューに保持できる最大メッセージ数を10に設定mq_msgsize
:1メッセージの最大サイズを256バイトに設定mq_curmsgs
:現在のメッセージ数(初期値は0)
mqd_t mq = mq_open(queue_name, O_CREAT | O_WRONLY, 0644, &attr);
if (mq == -1) {
perror("mq_open");
return;
}
次に、mq_open
関数でメッセージキューを開きます。mq_open
関数の引数は以下です。
queue_name
:キューの名前O_CREAT | O_WRONLY
:キューが存在しない場合は作成し、書き込み専用で開く0644
:キューのパーミッション(オーナーは読み書き可能、他は読み取り可能)&attr
:メッセージキューの属性
const char* message = "Hello, World!";
if (mq_send(mq, message, strlen(message) + 1, 0) == -1) {
perror("mq_send");
return;
}
message
変数に送信するメッセージを入れます。
mq_send
関数でメッセージをキューに送信します。各引数は以下です。
mq
:メッセージキューのディスクリプタmessage
:送信するメッセージstrlen(message) + 1
:メッセージの長さ(NULL文字を含む)0
:メッセージの優先度(ここではデフォルトの0を使用)
mq_close(mq);
最後にメッセージキューを閉じます。
メッセージ受信関数
void receive_message() {
const char* queue_name = "/example_queue";
mqd_t mq = mq_open(queue_name, O_RDONLY);
if (mq == -1) {
perror("mq_open");
return;
}
char buffer[256];
ssize_t bytes_read;
bytes_read = mq_receive(mq, buffer, 256, NULL);
if (bytes_read == -1) {
perror("mq_receive");
return;
}
std::cout << "Received message: " << buffer << std::endl;
mq_close(mq);
mq_unlink(queue_name);
}
const char* queue_name = "/example_queue";
mqd_t mq = mq_open(queue_name, O_RDONLY);
if (mq == -1) {
perror("mq_open");
return;
}
queue_name
変数にメッセージキューの名前を設定します。送信側と同じ名前に設定する必要があります。
mq_open
関数でメッセージキューを開きます。各引数は以下です。
queue_name
:キューの名前O_RDONLY
:読み取り専用で開く
char buffer[256];
ssize_t bytes_read;
メッセージ受信用バッファを宣言します。
bytes_read = mq_receive(mq, buffer, 256, NULL);
if (bytes_read == -1) {
perror("mq_receive");
return;
}
mq_receive
関数でメッセージキューからメッセージを受信します。各引数は以下です。
mq
:メッセージキューのディスクリプタbuffer
:受信したメッセージを格納するバッファ256
:バッファのサイズNULL
:メッセージの優先度を無視するために使用
std::cout << "Received message: " << buffer << std::endl;
buffer
に格納された受信メッセージを標準出力に表示します。
mq_close(mq);
mq_unlink(queue_name);
メッセージキューを閉じ、削除します。
メイン関数
int main(int argc, char* argv[]) {
if (argc != 2) {
std::cerr << "Usage: " << argv[0] << " <send|receive>" << std::endl;
return 1;
}
if (std::strcmp(argv[1], "send") == 0) {
send_message();
} else if (std::strcmp(argv[1], "receive") == 0) {
receive_message();
} else {
std::cerr << "Invalid argument. Use 'send' or 'receive'." << std::endl;
return 1;
}
return 0;
}
メイン関数では、引数が “send” であれば send_message
関数を呼び出し、”receive” であれば receive_message
関数を呼び出すようにします。
実行結果
まず、以下のコマンドで送信側を実行します。
./<実行ファイル名> send
次に受信側を実行します。
./<実行ファイル名> receive
以下のように受信側でメッセージを受信できていることが確認できます。
Received message: Hello, World!