qpid+gluster-swift でファイル転送
元々qpidはMQでファイル転送等の大量データのやりとりのツールではないので、、大容量データ転送ともろもろの後続ジョブ制御を兼ねて、qpidとgluster-swift(gluster UFO)を組み合わせてみた。
実現したい内容
実現したい内容は以下になる。
- 1. 巨大ファイル(数GB程度を想定)の転送に対応出来る
- 2. 送信元は、送信先の後処理が終わったかを確認せずに、次の処理に移れる
- 3. 送信先は、ファイルが到着したらすぐに、事後処理を始める
- (4: オプション)各計算ノードは巨大ファイルを載せられるだけのローカルディスクは持たず、直接glusterをマウントして使用する
- 5. 計算ノードが停止していても、送信元はファイル送信を完了出来る
※ 1, 4, 5を実現するために、gluster-swift, 2, 3, 5 を実施するためにqpidを使う形になる。
実施方法
次の順で実施する。
- 1. 送信元はgluster-swiftに対して HTTPでアップロードする。この後、qpidの特定のキューに対して、contentにglusterのファイルパスを指定したメッセージを書き込む。(ファイルパスはその時の時刻等を含めて、重複しないようにしておく)
- 3. 送信先は事後処理を書き込んだスクリプトをqpidに対してアタッチしておく。(メッセージが届くと、MQの消費処理の一環として、事後処理が流れる)
※ ただし、実際に使ってみたところ、gluster-swiftでアップロードしたファイルが必ず 600, rootで設定されるため、その他のユーザーで事後処理を行うために、次の処理も必要になった。(計算ノードのジョブをrootで動かす場合は不要, また、gluster-swift経由でchmodが出来ればこの処理は不要になるはず。。)
- 2. gluster-swift のノードで、 glusterfsに置かれたファイルにchmodを行うプロセスをqpidのserver 処理として稼働させておく。このプロセスは1のキューにアタッチしておき、chmodを実施したあと、3のキューにメッセージを配送する。
なお、テスト環境では次の3つのスクリプトで実施している。
- 1: transferfile.sh
- 2: peformchown.py
- 3: consumefile.py
※ サンプルファイルはブログの後半に添付
※ 同じディレクトリに python-qpidサンプルのspout を置かないと上手く動かない。(swiftコマンドも必要)
まとめ
オープンソースだけだと、ファイル転送/その後の事後処理(非同期) を上手く実施するための仕組みがあまりなかった気がするが(知らないだけかも)、さしあたりこの組み合わせで動くものが出来そうな気がしている。
※ プロプライエタリ製品を含めると結構ある
価格も安く済みそうだし、おすすめかも。
スクリプトサンプル
transferfile.sh
起動方法: $ ./transferfile.sh //// filename=aaa container=container$(date +%s) queuename=myqueuepre replyto=myqueue # upload file swift --auth=https://localhost/auth/v1.0 --user=test:tester --key=testing upload ${container} ${filename} if [[ 0 -ne $? ]] then echo "Cant upload file" exit 31 fi # Then say that I uploaded file ./spout --reply-to=${replyto} ${queuename} ${container}/${filename} if [[ 0 -ne $? ]] then echo "Cant MQ" exit 32 fi
performchown.py(serverからの差分のみ):
起動方法: (root)# ./performchown.py myqueuepre //// $ diff -u server performchown.py --- server 2013-04-06 19:26:30.089313772 +0900 +++ performchown.py 2013-04-06 20:41:20.259411750 +0900 @@ -18,7 +18,7 @@ # under the License. # -import optparse, sys, traceback +import optparse, sys, traceback, os from qpid.messaging import * from qpid.util import URL from subprocess import Popen, STDOUT, PIPE @@ -68,6 +68,13 @@ result = Message(content) else: result = Message("unrecognized message type: %s" % msg_type) + ####### + accountname='test' + print 'Use this:', msg.content + filepath='/mnt/gluster-object/AUTH_%s/%s' % (accountname, msg.content) + os.chmod(filepath, 0644) + result=msg + ####### return result try: @@ -87,7 +94,7 @@ if snd is not None: snd.close() ssn.acknowledge() -except ReceiveError, e: +except ReceiverError, e: print e except KeyboardInterrupt: pass
consumefile.py(drainからの差分のみ):
起動方法: $ ./consumefile.py -f myqueue //// $ diff -u drain consumefile.py --- drain 2013-04-06 19:26:30.084988056 +0900 +++ consumefile.py 2013-04-06 21:00:45.910323049 +0900 @@ -18,7 +18,7 @@ # under the License. # -import optparse +import optparse, os from qpid.messaging import * from qpid.util import URL from qpid.log import enable, DEBUG, WARN @@ -85,6 +85,16 @@ try: msg = rcv.fetch(timeout=timeout) print opts.format % Formatter(msg) + ####### + accountname='test' + print 'Use this:', msg.content + filepath='/mnt/gluster-object/AUTH_%s/%s' % (accountname, msg.content) + f=file(filepath) + print f.read() + f.close() + ## + os.system('swift --auth=https://localhost/auth/v1.0 --user=test:tester --key=testing delete %s' % msg.content.split('/')[0]) + ####### count += 1 ssn.acknowledge() except Empty: