qpid+gluster-swift でファイル転送

元々qpidはMQでファイル転送等の大量データのやりとりのツールではないので、、大容量データ転送ともろもろの後続ジョブ制御を兼ねて、qpidとgluster-swift(gluster UFO)を組み合わせてみた。

実現したい内容

実現したい内容は以下になる。

  • 1. 巨大ファイル(数GB程度を想定)の転送に対応出来る
  • 2. 送信元は、送信先の後処理が終わったかを確認せずに、次の処理に移れる
  • 3. 送信先は、ファイルが到着したらすぐに、事後処理を始める
  • (4: オプション)各計算ノードは巨大ファイルを載せられるだけのローカルディスクは持たず、直接glusterをマウントして使用する
  • 5. 計算ノードが停止していても、送信元はファイル送信を完了出来る

※ 1, 4, 5を実現するために、gluster-swift, 2, 3, 5 を実施するためにqpidを使う形になる。

実施方法

次の順で実施する。

  • 1. 送信元はgluster-swiftに対して HTTPでアップロードする。この後、qpidの特定のキューに対して、contentにglusterのファイルパスを指定したメッセージを書き込む。(ファイルパスはその時の時刻等を含めて、重複しないようにしておく)
  • 3. 送信先は事後処理を書き込んだスクリプトをqpidに対してアタッチしておく。(メッセージが届くと、MQの消費処理の一環として、事後処理が流れる)

※ ただし、実際に使ってみたところ、gluster-swiftでアップロードしたファイルが必ず 600, rootで設定されるため、その他のユーザーで事後処理を行うために、次の処理も必要になった。(計算ノードのジョブをrootで動かす場合は不要, また、gluster-swift経由でchmodが出来ればこの処理は不要になるはず。。)

  • 2. gluster-swift のノードで、 glusterfsに置かれたファイルにchmodを行うプロセスをqpidのserver 処理として稼働させておく。このプロセスは1のキューにアタッチしておき、chmodを実施したあと、3のキューにメッセージを配送する。

なお、テスト環境では次の3つのスクリプトで実施している。

  • 1: transferfile.sh
  • 2: peformchown.py
  • 3: consumefile.py

※ サンプルファイルはブログの後半に添付
※ 同じディレクトリに python-qpidサンプルのspout を置かないと上手く動かない。(swiftコマンドも必要)

まとめ

オープンソースだけだと、ファイル転送/その後の事後処理(非同期) を上手く実施するための仕組みがあまりなかった気がするが(知らないだけかも)、さしあたりこの組み合わせで動くものが出来そうな気がしている。
プロプライエタリ製品を含めると結構ある
価格も安く済みそうだし、おすすめかも。

スクリプトサンプル

transferfile.sh

起動方法:
 $ ./transferfile.sh
////
filename=aaa
container=container$(date +%s)
queuename=myqueuepre
replyto=myqueue

# upload file
swift --auth=https://localhost/auth/v1.0 --user=test:tester --key=testing  upload ${container} ${filename}
if [[ 0 -ne $? ]]
then
 echo "Cant upload file"
 exit 31
fi

# Then say that I uploaded file
./spout --reply-to=${replyto} ${queuename} ${container}/${filename}
if [[ 0 -ne $? ]]
then
 echo "Cant MQ"
 exit 32
fi

performchown.py(serverからの差分のみ):

起動方法:
 (root)# ./performchown.py myqueuepre
////

$ diff -u server performchown.py 
--- server      2013-04-06 19:26:30.089313772 +0900
+++ performchown.py     2013-04-06 20:41:20.259411750 +0900
@@ -18,7 +18,7 @@
 # under the License.
 #
 
-import optparse, sys, traceback
+import optparse, sys, traceback, os
 from qpid.messaging import *
 from qpid.util import URL
 from subprocess import Popen, STDOUT, PIPE
@@ -68,6 +68,13 @@
     result = Message(content)
   else:
     result = Message("unrecognized message type: %s" % msg_type)
+    #######
+    accountname='test'
+    print 'Use this:', msg.content
+    filepath='/mnt/gluster-object/AUTH_%s/%s' % (accountname, msg.content)
+    os.chmod(filepath, 0644)
+    result=msg
+    #######
   return result
 
 try:
@@ -87,7 +94,7 @@
     if snd is not None:
       snd.close()
     ssn.acknowledge()
-except ReceiveError, e:
+except ReceiverError, e:
   print e
 except KeyboardInterrupt:
   pass


consumefile.py(drainからの差分のみ):

起動方法:
 $ ./consumefile.py -f myqueue
////
$ diff -u drain consumefile.py 
--- drain       2013-04-06 19:26:30.084988056 +0900
+++ consumefile.py      2013-04-06 21:00:45.910323049 +0900
@@ -18,7 +18,7 @@
 # under the License.
 #
 
-import optparse
+import optparse, os
 from qpid.messaging import *
 from qpid.util import URL
 from qpid.log import enable, DEBUG, WARN
@@ -85,6 +85,16 @@
     try:
       msg = rcv.fetch(timeout=timeout)
       print opts.format % Formatter(msg)
+      #######
+      accountname='test'
+      print 'Use this:', msg.content
+      filepath='/mnt/gluster-object/AUTH_%s/%s' % (accountname, msg.content)
+      f=file(filepath)
+      print f.read()
+      f.close()
+      ##
+      os.system('swift --auth=https://localhost/auth/v1.0 --user=test:tester --key=testing delete %s' % msg.content.split('/')[0])
+      #######
       count += 1
       ssn.acknowledge()
     except Empty: