Hatena::ブログ(Diary)

めもめも

2012-05-02

GlusterFS Swift APIとオリジナルSwiftの差分


何の話かというと


「日本OpenStackユーザ会 第4回勉強会」で、この資料を使って、GlusterFSのSwift APIを紹介したところ、実際にソースを見たSwiftの有識者の方からコメントをいただきました。曰く、


「まだ力ずくで対応させている感があるので、しばらく様子を見たほうがよさそうかと」


・・・というわけで、みなさんにも今後の発展の様子をみていただくために、現状がどの程度力ずくなのか、私の方で、最新のソースからdiffった結果を残しておきたいと思います。


比較するのは、オリジナルの「swift-1.4.8.tar.gz」とGlusterFS用にパッチのあたった下記のRPMです。


# rpm -qa | grep swift
gluster-swift-container-1.4.8-1.el6.noarch
gluster-swift-account-1.4.8-1.el6.noarch
gluster-swift-object-1.4.8-1.el6.noarch
gluster-swift-proxy-1.4.8-1.el6.noarch
gluster-swift-1.4.8-1.el6.noarch
gluster-swift-plugin-1.0-1.noarch

注1)これらのRPMは、まだ一般公開されていません。少し古いバージョンが下記にあるので、ソースを見たい方はこちらを参照ください。(ただし、この記事に記載の最新版とは相当な差異があるようです。)


swift-1.4.5-1.noarch.rpm

swift-plugin-1.0.-1.el6.noarch.rpm


注2)筆者自身は、オリジナルのSwiftのコードに精通しているわけではないので、diffで出てくる差分のどこまでが、GlusterFS対応のためのものか、比較対象のSwiftのコードがずれているために出てきたものかは分かっていません。とりあえず、以下は、すべての差分はGlusterFS対応のものと考えて書いています。勘違いなどあれば、ご指摘ください。




前提知識


先の資料のイラストでは、「proxy01」というサーバがありますが、実際にはこの中に、proxy/account/container/objectの各サーバが同居して動いています。


# ps -efw | grep swif[t]
root     21434     1  0 07:06 ?        00:00:00 /usr/bin/python /usr/bin/swift-proxy-server /etc/swift/proxy-server.conf
root     21435     1  0 07:06 ?        00:00:00 /usr/bin/python /usr/bin/swift-container-server /etc/swift/container-server/1.conf
root     21436     1  0 07:06 ?        00:00:00 /usr/bin/python /usr/bin/swift-account-server /etc/swift/account-server/1.conf
root     21437     1  0 07:06 ?        00:00:00 /usr/bin/python /usr/bin/swift-object-server /etc/swift/object-server/1.conf
root     21454 21437  0 07:06 ?        00:00:00 /usr/bin/python /usr/bin/swift-object-server /etc/swift/object-server/1.conf
root     21455 21436  0 07:06 ?        00:00:00 /usr/bin/python /usr/bin/swift-account-server /etc/swift/account-server/1.conf
root     21456 21435  0 07:06 ?        00:00:00 /usr/bin/python /usr/bin/swift-container-server /etc/swift/container-server/1.conf
root     21457 21434  0 07:06 ?        00:00:00 /usr/bin/python /usr/bin/swift-proxy-server /etc/swift/proxy-server.conf

それぞれのサーバ機能に対して、バックエンドにGlusterFSを使用するようにパッチがあたっています。



Proxyサーバ


まず、Proxyサーバについての結果です。


# diff -up swift/proxy/server.py /usr/lib/python2.6/site-packages/swift/proxy/server.py
--- swift/proxy/server.py	2012-03-22 09:24:46.000000000 +0000
+++ /usr/lib/python2.6/site-packages/swift/proxy/server.py	2012-04-26 07:17:58.000000000 +0000
@@ -1,4 +1,5 @@
 # Copyright (c) 2010-2012 OpenStack, LLC.
+# Copyright (c) 2011 Red Hat, Inc.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -53,11 +54,20 @@ from webob import Request, Response
 
 from swift.common.ring import Ring
 from swift.common.utils import cache_from_env, ContextPool, get_logger, \
-    get_remote_client, normalize_timestamp, split_path, TRUE_VALUES
+    get_remote_client, normalize_timestamp, split_path, TRUE_VALUES, \
+    plugin_enabled
 from swift.common.bufferedhttp import http_connect
-from swift.common.constraints import check_metadata, check_object_creation, \
-    check_utf8, CONTAINER_LISTING_LIMIT, MAX_ACCOUNT_NAME_LENGTH, \
-    MAX_CONTAINER_NAME_LENGTH, MAX_FILE_SIZE
+
+if plugin_enabled():
+    from swift.plugins.constraints import check_object_creation, \
+        MAX_ACCOUNT_NAME_LENGTH, MAX_CONTAINER_NAME_LENGTH, MAX_FILE_SIZE
+else:
+    from swift.common.constraints import check_object_creation, \
+        MAX_ACCOUNT_NAME_LENGTH, MAX_CONTAINER_NAME_LENGTH, MAX_FILE_SIZE
+
+from swift.common.constraints import check_metadata, check_utf8, \
+    CONTAINER_LISTING_LIMIT
+
 from swift.common.exceptions import ChunkReadTimeout, \
     ChunkWriteTimeout, ConnectionTimeout

Proxyサーバには意外と手は加えられていません。「plugin_enabled()」は、下記のパッチで追加された関数です。


# diff -up swift/common/utils.py /usr/lib/python2.6/site-packages/swift/common/utils.py
--- swift/common/utils.py	2012-03-22 09:24:46.000000000 +0000
+++ /usr/lib/python2.6/site-packages/swift/common/utils.py	2012-04-26 07:17:58.000000000 +0000
@@ -1,4 +1,5 @@
 # Copyright (c) 2010-2012 OpenStack, LLC.
+# Copyright (c) 2011 Red Hat, Inc.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -1138,3 +1139,12 @@ def streq_const_time(s1, s2):
     for (a, b) in zip(s1, s2):
         result |= ord(a) ^ ord(b)
     return result == 0
+
+def plugin_enabled():
+    swift_conf = ConfigParser()
+    swift_conf.read(os.path.join('/etc/swift', 'swift.conf'))
+    try:
+        return swift_conf.get('DEFAULT', 'Enable_plugin', 'no') in TRUE_VALUES
+    except NoOptionError, NoSectionError:
+        return False
+

「swift.conf」で下記のように、「Enable_plugin = yes」を指定すると・・・


# cat /etc/swift/swift.conf 
[DEFAULT]
Enable_plugin = yes

[swift-hash]
# random unique string that can never change (DO NOT LOSE)
swift_hash_path_suffix = gluster

「swift/plugins/」以下のGlusterFS専用コード(下記)が利用されるようになるという仕掛けです。


# ls /usr/lib/python2.6/site-packages/swift/plugins/*py
/usr/lib/python2.6/site-packages/swift/plugins/DiskDir.py
/usr/lib/python2.6/site-packages/swift/plugins/DiskFile.py
/usr/lib/python2.6/site-packages/swift/plugins/Glusterfs.py
/usr/lib/python2.6/site-packages/swift/plugins/__init__.py
/usr/lib/python2.6/site-packages/swift/plugins/constraints.py
/usr/lib/python2.6/site-packages/swift/plugins/utils.py


Accountサーバ


次は、Accountサーバです。


まず、設定ファイルを見ると、pipelineで「egg:swift#gluster」というフィルタが追加されています。


# cat /etc/swift/account-server/1.conf 
[DEFAULT]
devices = /srv/1/node
mount_check = false
bind_port = 6012
user = root
log_facility = LOG_LOCAL2

[pipeline:main]
pipeline = gluster account-server

[app:account-server]
use = egg:swift#account

[filter:gluster]
use = egg:swift#gluster

[account-replicator]
vm_test_mode = yes

[account-auditor]

[account-reaper]

egg-infoを見ると、こいつの実体は、「swift/common/middleware/gluster.py」と分かります。


# cat /usr/lib/python2.6/site-packages/swift-1.4.8-py2.6.egg-info/entry_points.txt 
[paste.app_factory]
account = swift.account.server:app_factory
object = swift.obj.server:app_factory
container = swift.container.server:app_factory
proxy = swift.proxy.server:app_factory

[paste.filter_factory]
formpost = swift.common.middleware.formpost:filter_factory
cname_lookup = swift.common.middleware.cname_lookup:filter_factory
recon = swift.common.middleware.recon:filter_factory
healthcheck = swift.common.middleware.healthcheck:filter_factory
tempurl = swift.common.middleware.tempurl:filter_factory
gluster = swift.common.middleware.gluster:filter_factory
name_check = swift.common.middleware.name_check:filter_factory
catch_errors = swift.common.middleware.catch_errors:filter_factory
ratelimit = swift.common.middleware.ratelimit:filter_factory
memcache = swift.common.middleware.memcache:filter_factory
staticweb = swift.common.middleware.staticweb:filter_factory
domain_remap = swift.common.middleware.domain_remap:filter_factory
tempauth = swift.common.middleware.tempauth:filter_factory
swift3 = swift.common.middleware.swift3:filter_factory

egg-infoの仕組みについては、このあたりを参照ください。


中身をまるごと掲載すると次の通り。ポイントは、「env['fs_object']」に、Glusterfsクラスのインスタンスが入るという点。


/usr/lib/python2.6/site-packages/swift/common/middleware/gluster.py

# Copyright (c) 2011 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from ConfigParser import ConfigParser, NoOptionError, NoSectionError

from swift.common.utils import get_logger, plugin_enabled
from swift import plugins

class Gluster_plugin(object):
    """
    WSGI middleware that marks the request environment as Gluster-enabled.

    When plugin_enabled() returns true, each request's WSGI environ is
    populated with:

    * ``Gluster_enabled`` -- True
    * ``fs_object``       -- a new instance of the class named by
                             ``self.fs_name``, looked up in ``swift.plugins``
    * ``root``            -- the ``mount_path`` option from the [DEFAULT]
                             section of /etc/swift/fs.conf, when that file
                             can be read and defines the option

    When the plugin is disabled the request is passed through untouched.
    """

    def __init__(self, app, conf):
        self.app = app
        self.conf = conf
        # Attribute name looked up in swift.plugins to find the
        # filesystem class (e.g. Glusterfs from swift/plugins/Glusterfs.py).
        self.fs_name = 'Glusterfs'
        self.logger = get_logger(conf, log_route='gluster')

    def __call__(self, env, start_response):
        if not plugin_enabled():
            return self.app(env, start_response)
        env['Gluster_enabled'] = True

        # fs_class is a reference to the filesystem class (Glusterfs,
        # defined in swift/plugins/Glusterfs.py).
        fs_class = getattr(plugins, self.fs_name, False)
        if not fs_class:
            # Format the name into the message; passing it as a second
            # argument would leave a literal '%s' in the error text.
            raise Exception('%s plugin not found' % self.fs_name)

        # env['fs_object'] receives a fresh instance of that class.
        env['fs_object'] = fs_class()

        fs_conf = ConfigParser()
        if fs_conf.read('/etc/swift/fs.conf'):
            try:
                # env['root'] receives the mount point of the volume.
                env['root'] = fs_conf.get('DEFAULT', 'mount_path')
            except (NoSectionError, NoOptionError):
                # A tuple is required here: "except A, B:" would catch only
                # A and bind it to the name B, letting NoOptionError escape.
                self.logger.exception(_('ERROR mount_path not present'))
        return self.app(env, start_response)

def filter_factory(global_conf, **local_conf):
    """Build the paste.deploy filter factory for the Gluster middleware.

    The returned callable wraps a WSGI app in ``Gluster_plugin``. Options
    from the filter's own section (``local_conf``) override those from
    ``global_conf``; ``global_conf`` itself is left unmodified.
    """
    merged = global_conf.copy()
    merged.update(local_conf)

    def gluster_filter(app):
        # Wrap the downstream app using the merged configuration.
        return Gluster_plugin(app, merged)

    return gluster_filter

気になるGlusterfsクラスの実体ですが、下記のpydocから分かるように、GlusterFSボリュームのマウント/アンマウントと、事前定義されているボリュームのリストを取得するメソッドを提供しています。


# pydoc /usr/lib/python2.6/site-packages/swift/plugins/Glusterfs.py | cat
・・・
CLASSES
    __builtin__.object
        Glusterfs
    
    class Glusterfs(__builtin__.object)
     |  Methods defined here:
     |  
     |  __init__(self)
     |  
     |  get_export_from_account_id(self, account)
     |  
     |  get_export_list(self)
     |  
     |  get_export_list_local(self)
     |  
     |  get_export_list_remote(self)
     |  
     |  mount(self, account)
     |  
     |  unmount(self, mount_path)
     |  
     |  ----------------------------------------------------------------------
     |  Data descriptors defined here:
     |  
     |  __dict__
     |      dictionary for instance variables (if defined)
     |  
     |  __weakref__
     |      list of weak references to the object (if defined)

特に、「get_export_from_account_id()」では、「アカウント名=AUTH_<ボリューム名>」という対応でのアカウント名とボリューム名の紐付けがハードコードされていることが分かります。


    def get_export_from_account_id(self, account):
        """Return the export (volume) name that corresponds to *account*.

        The account-to-volume mapping is hard-coded: an account named
        ``'AUTH_' + X`` maps to export ``X``, where ``X`` is one of the
        names returned by ``self.get_export_list()``.

        :param account: Swift account name.
        :returns: the matching export name. NOTE(review): as excerpted
                  here, the method falls through and implicitly returns
                  None when no export matches; the full source may handle
                  that case differently -- confirm.
        :raises AttributeError: if *account* is empty or None.
        """
        if not account:
            # The printed message says "returning", but the code raises.
            print 'account is none, returning'
            raise AttributeError

        # Linear scan of the exports for the one whose AUTH_ name matches.
        for export in self.get_export_list():
            if account == 'AUTH_' + export:
                return export

ここで、いよいよ、Accountサーバ本体のdiffです。


# diff -up swift/account/server.py /usr/lib/python2.6/site-packages/swift/account/server.py
--- swift/account/server.py	2012-03-22 09:24:46.000000000 +0000
+++ /usr/lib/python2.6/site-packages/swift/account/server.py	2012-04-26 07:17:58.000000000 +0000
@@ -1,4 +1,5 @@
 # Copyright (c) 2010-2012 OpenStack, LLC.
+# Copyright (c) 2011 Red Hat, Inc.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -31,7 +32,7 @@ import simplejson
 
 from swift.common.db import AccountBroker
 from swift.common.utils import get_logger, get_param, hash_path, \
-    normalize_timestamp, split_path, storage_directory
+    normalize_timestamp, split_path, storage_directory, plugin_enabled
 from swift.common.constraints import ACCOUNT_LISTING_LIMIT, \
     check_mount, check_float, check_utf8
 from swift.common.db_replicator import ReplicatorRpc
@@ -39,6 +40,8 @@ from swift.common.db_replicator import R
 
 DATADIR = 'accounts'
 
+if plugin_enabled():
+    from swift.plugins.DiskDir import DiskAccount
 
 class AccountController(object):
     """WSGI controller for the account server."""
@@ -52,8 +55,12 @@ class AccountController(object):
             self.mount_check, logger=self.logger)
         self.auto_create_account_prefix = \
             conf.get('auto_create_account_prefix') or '.'
+        self.fs_object = None
 
     def _get_account_broker(self, drive, part, account):
+        if self.fs_object:
+            return DiskAccount(self.root, account, self.fs_object);
+
         hsh = hash_path(account)
         db_dir = storage_directory(DATADIR, part, hsh)
         db_path = os.path.join(self.root, drive, db_dir, hsh + '.db')
@@ -121,9 +128,15 @@ class AccountController(object):
                 if broker.is_deleted():
                     return HTTPConflict(request=req)
             metadata = {}
-            metadata.update((key, (value, timestamp))
-                for key, value in req.headers.iteritems()
-                if key.lower().startswith('x-account-meta-'))
+            if not self.fs_object:
+                metadata.update((key, (value, timestamp))
+                    for key, value in req.headers.iteritems()
+                    if key.lower().startswith('x-account-meta-'))
+            else:
+                metadata.update((key, value)
+                    for key, value in req.headers.iteritems()
+                    if key.lower().startswith('x-account-meta-'))
+
             if metadata:
                 broker.update_metadata(metadata)
             if created:
@@ -153,6 +166,9 @@ class AccountController(object):
             broker.stale_reads_ok = True
         if broker.is_deleted():
             return HTTPNotFound(request=req)
+        if self.fs_object:
+            broker.list_containers_iter(None, None,None,
+                                        None, None)
         info = broker.get_info()
         headers = {
             'X-Account-Container-Count': info['container_count'],
@@ -164,9 +180,16 @@ class AccountController(object):
             container_ts = broker.get_container_timestamp(container)
             if container_ts is not None:
                 headers['X-Container-Timestamp'] = container_ts
-        headers.update((key, value)
-            for key, (value, timestamp) in broker.metadata.iteritems()
-            if value != '')
+        if not self.fs_object:
+            headers.update((key, value)
+                for key, (value, timestamp) in broker.metadata.iteritems()
+                if value != '')
+        else:
+            headers.update((key, value)
+                for key, value in broker.metadata.iteritems()
+                if value != '')
+
+
         return HTTPNoContent(request=req, headers=headers)
 
     def GET(self, req):
@@ -190,9 +213,15 @@ class AccountController(object):
             'X-Account-Bytes-Used': info['bytes_used'],
             'X-Timestamp': info['created_at'],
             'X-PUT-Timestamp': info['put_timestamp']}
-        resp_headers.update((key, value)
-            for key, (value, timestamp) in broker.metadata.iteritems()
-            if value != '')
+        if not self.fs_object:
+            resp_headers.update((key, value)
+                for key, (value, timestamp) in broker.metadata.iteritems()
+                if value != '')
+        else:
+            resp_headers.update((key, value)
+                for key, value in broker.metadata.iteritems()
+                if value != '')
+
         try:
             prefix = get_param(req, 'prefix')
             delimiter = get_param(req, 'delimiter')
@@ -224,6 +253,7 @@ class AccountController(object):
                                   content_type='text/plain', request=req)
         account_list = broker.list_containers_iter(limit, marker, end_marker,
                                                    prefix, delimiter)
+
         if out_content_type == 'application/json':
             json_pattern = ['"name":%s', '"count":%s', '"bytes":%s']
             json_pattern = '{' + ','.join(json_pattern) + '}'
@@ -298,15 +328,29 @@ class AccountController(object):
             return HTTPNotFound(request=req)
         timestamp = normalize_timestamp(req.headers['x-timestamp'])
         metadata = {}
-        metadata.update((key, (value, timestamp))
-            for key, value in req.headers.iteritems()
-            if key.lower().startswith('x-account-meta-'))
+        if not self.fs_object:
+            metadata.update((key, (value, timestamp))
+                for key, value in req.headers.iteritems()
+                if key.lower().startswith('x-account-meta-'))
+        else:
+            metadata.update((key, value)
+                for key, value in req.headers.iteritems()
+                if key.lower().startswith('x-account-meta-'))
         if metadata:
             broker.update_metadata(metadata)
         return HTTPNoContent(request=req)
 
+    def plugin(self, env):
+        if env.get('Gluster_enabled', False):
+            self.fs_object = env.get('fs_object')
+            self.root = env.get('root')
+            self.mount_check = False
+        else:
+            self.fs_object = None
+
     def __call__(self, env, start_response):
         start_time = time.time()
+        self.plugin(env)
         req = Request(env)
         self.logger.txn_id = req.headers.get('x-trans-id', None)
         if not check_utf8(req.path_info):

下の方から見ていくと、まず、「__call__」で関数コールされた時に、「plugin()」関数を呼んで、その中で、「self.fs_object(Glusterfsインスタンス)」「self.root(マウントポイント)」をセットしていることが分かります。後は、「self.fs_object」の有無で、GlusterFSの場合かどうかの処理の分岐が追加されています。


ポイントとなる変更部分は、次のところで、account_brokerとして、独自のDiskAccountインスタンスが使用される点です。


     def _get_account_broker(self, drive, part, account):
+        if self.fs_object:
+            return DiskAccount(self.root, account, self.fs_object);
+

こいつは、たとえば、アカウントが保持するコンテナのリストを返す際には、マウントポイント以下のディレクトリをスキャンするなどの動きをします。つまり、コンテナ情報を独自のデータベースに保持するのではなく、GlusterFSのボリュームの中を直接参照するような動きです。同様に、コンテナのメタデータは、コンテナに対応するディレクトリの拡張属性に直接格納しています。DiskAccountは「plugins/DiskDir.py」で定義されているので、詳細はそちらを読んでください。


その他の変更は、メタデータ更新の際に必要だった「timestamp」がなくなっている点です。オリジナルのコードでは、同時に更新リクエストが入った際に、データベースを「遅いもの勝ち」で更新するために、更新リクエストにタイムスタンプを付与していましたが、今の場合、そのあたりは、GlusterFS自身の排他制御にまかせているようです。



Containerサーバ


次は、Containerサーバについての結果です。


# diff -up swift/container/server.py /usr/lib/python2.6/site-packages/swift/container/server.py
--- swift/container/server.py	2012-03-22 09:24:46.000000000 +0000
+++ /usr/lib/python2.6/site-packages/swift/container/server.py	2012-04-26 07:17:58.000000000 +0000
@@ -1,4 +1,5 @@
 # Copyright (c) 2010-2012 OpenStack, LLC.
+# Copyright (c) 2011 Red Hat, Inc.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -31,7 +32,8 @@ from webob.exc import HTTPAccepted, HTTP
 
 from swift.common.db import ContainerBroker
 from swift.common.utils import get_logger, get_param, hash_path, \
-    normalize_timestamp, storage_directory, split_path, validate_sync_to
+    normalize_timestamp, storage_directory, split_path, validate_sync_to, \
+    plugin_enabled
 from swift.common.constraints import CONTAINER_LISTING_LIMIT, \
     check_mount, check_float, check_utf8
 from swift.common.bufferedhttp import http_connect
@@ -40,6 +42,9 @@ from swift.common.db_replicator import R
 
 DATADIR = 'containers'
 
+if plugin_enabled():
+    from swift.plugins.DiskDir import DiskDir
+
 
 class ContainerController(object):
     """WSGI Controller for the container server."""
@@ -62,6 +67,7 @@ class ContainerController(object):
             ContainerBroker, self.mount_check, logger=self.logger)
         self.auto_create_account_prefix = \
             conf.get('auto_create_account_prefix') or '.'
+        self.fs_object = None
 
     def _get_container_broker(self, drive, part, account, container):
         """
@@ -73,6 +79,11 @@ class ContainerController(object):
         :param container: container name
         :returns: ContainerBroker object
         """
+        if self.fs_object:
+            return DiskDir(self.root, drive, part, account,
+                           container, self.logger,
+                           fs_object = self.fs_object)
+
         hsh = hash_path(account, container)
         db_dir = storage_directory(DATADIR, part, hsh)
         db_path = os.path.join(self.root, drive, db_dir, hsh + '.db')
@@ -211,10 +222,18 @@ class ContainerController(object):
                 if broker.is_deleted():
                     return HTTPConflict(request=req)
             metadata = {}
-            metadata.update((key, (value, timestamp))
-                for key, value in req.headers.iteritems()
-                if key.lower() in self.save_headers or
-                   key.lower().startswith('x-container-meta-'))
+            #Note: check the structure of req.headers
+            if not self.fs_object:
+                metadata.update((key, (value, timestamp))
+                    for key, value in req.headers.iteritems()
+                    if key.lower() in self.save_headers or
+                       key.lower().startswith('x-container-meta-'))
+            else:
+                metadata.update((key, value)
+                   for key, value in req.headers.iteritems()
+                   if key.lower() in self.save_headers or
+                      key.lower().startswith('x-container-meta-'))
+
             if metadata:
                 if 'X-Container-Sync-To' in metadata:
                     if 'X-Container-Sync-To' not in broker.metadata or \
@@ -222,6 +241,7 @@ class ContainerController(object):
                             broker.metadata['X-Container-Sync-To'][0]:
                         broker.set_x_container_sync_points(-1, -1)
                 broker.update_metadata(metadata)
+
             resp = self.account_update(req, account, container, broker)
             if resp:
                 return resp
@@ -245,6 +265,11 @@ class ContainerController(object):
         broker.stale_reads_ok = True
         if broker.is_deleted():
             return HTTPNotFound(request=req)
+
+        if self.fs_object:
+            broker.list_objects_iter(None, None, None, None,
+                                     None, None)
+
         info = broker.get_info()
         headers = {
             'X-Container-Object-Count': info['object_count'],
@@ -252,10 +277,17 @@ class ContainerController(object):
             'X-Timestamp': info['created_at'],
             'X-PUT-Timestamp': info['put_timestamp'],
         }
-        headers.update((key, value)
-            for key, (value, timestamp) in broker.metadata.iteritems()
-            if value != '' and (key.lower() in self.save_headers or
-                                key.lower().startswith('x-container-meta-')))
+        if not self.fs_object:
+            headers.update((key, value)
+                for key, (value, timestamp) in broker.metadata.iteritems()
+                if value != '' and (key.lower() in self.save_headers or
+                                    key.lower().startswith('x-container-meta-')))
+        else:
+            headers.update((key, value)
+                for key, value in broker.metadata.iteritems()
+                if value != '' and (key.lower() in self.save_headers or
+                                    key.lower().startswith('x-container-meta-')))
+
         return HTTPNoContent(request=req, headers=headers)
 
     def GET(self, req):
@@ -268,6 +300,7 @@ class ContainerController(object):
                                 request=req)
         if self.mount_check and not check_mount(self.root, drive):
             return Response(status='507 %s is not mounted' % drive)
+
         broker = self._get_container_broker(drive, part, account, container)
         broker.pending_timeout = 0.1
         broker.stale_reads_ok = True
@@ -280,10 +313,17 @@ class ContainerController(object):
             'X-Timestamp': info['created_at'],
             'X-PUT-Timestamp': info['put_timestamp'],
         }
-        resp_headers.update((key, value)
-            for key, (value, timestamp) in broker.metadata.iteritems()
-            if value != '' and (key.lower() in self.save_headers or
-                                key.lower().startswith('x-container-meta-')))
+        if not self.fs_object:
+            resp_headers.update((key, value)
+                for key, (value, timestamp) in broker.metadata.iteritems()
+                if value != '' and (key.lower() in self.save_headers or
+                               key.lower().startswith('x-container-meta-')))
+        else:
+            resp_headers.update((key, value)
+                for key, value in broker.metadata.iteritems()
+                if value != '' and (key.lower() in self.save_headers or
+                               key.lower().startswith('x-container-meta-')))
+
         try:
             path = get_param(req, 'path')
             prefix = get_param(req, 'prefix')
@@ -414,10 +454,17 @@ class ContainerController(object):
             return HTTPNotFound(request=req)
         timestamp = normalize_timestamp(req.headers['x-timestamp'])
         metadata = {}
-        metadata.update((key, (value, timestamp))
-            for key, value in req.headers.iteritems()
-            if key.lower() in self.save_headers or
-               key.lower().startswith('x-container-meta-'))
+        if not self.fs_object:
+            metadata.update((key, (value, timestamp))
+                for key, value in req.headers.iteritems()
+                if key.lower() in self.save_headers or
+                   key.lower().startswith('x-container-meta-'))
+        else:
+             metadata.update((key, value)
+                for key, value in req.headers.iteritems()
+                if key.lower() in self.save_headers or
+                   key.lower().startswith('x-container-meta-'))
+
         if metadata:
             if 'X-Container-Sync-To' in metadata:
                 if 'X-Container-Sync-To' not in broker.metadata or \
@@ -427,8 +474,19 @@ class ContainerController(object):
             broker.update_metadata(metadata)
         return HTTPNoContent(request=req)
 
+    def plugin(self, env):
+        if env.get('Gluster_enabled', False):
+            self.fs_object = env.get('fs_object')
+            if not self.fs_object:
+                raise NoneTypeError
+            self.root = env.get('root')
+            self.mount_check = False
+        else:
+            self.fs_object = None
+
     def __call__(self, env, start_response):
         start_time = time.time()
+        self.plugin(env)
         req = Request(env)
         self.logger.txn_id = req.headers.get('x-trans-id', None)
         if not check_utf8(req.path_info):

基本的な仕掛けは、Accountサーバと同じようです。container_brokerに独自のDiskDirインスタンスを返しています。



Objectサーバ


最後にObjectサーバです。


# diff -up swift/obj/server.py /usr/lib/python2.6/site-packages/swift/obj/server.py
--- swift/obj/server.py	2012-03-22 09:24:46.000000000 +0000
+++ /usr/lib/python2.6/site-packages/swift/obj/server.py	2012-04-26 07:17:58.000000000 +0000
@@ -1,4 +1,5 @@
 # Copyright (c) 2010-2012 OpenStack, LLC.
+# Copyright (c) 2011 Red Hat, Inc.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -26,6 +27,7 @@ from hashlib import md5
 from tempfile import mkstemp
 from urllib import unquote
 from contextlib import contextmanager
+from ConfigParser import ConfigParser
 
 from webob import Request, Response, UTC
 from webob.exc import HTTPAccepted, HTTPBadRequest, HTTPCreated, \
@@ -37,16 +39,23 @@ from eventlet import sleep, Timeout, tpo
 
 from swift.common.utils import mkdirs, normalize_timestamp, \
     storage_directory, hash_path, renamer, fallocate, \
-    split_path, drop_buffer_cache, get_logger, write_pickle
+    split_path, drop_buffer_cache, get_logger, write_pickle, \
+    plugin_enabled
 from swift.common.bufferedhttp import http_connect
-from swift.common.constraints import check_object_creation, check_mount, \
-    check_float, check_utf8
+if plugin_enabled():
+    from swift.plugins.constraints import check_object_creation
+    from swift.plugins.utils import X_TYPE, X_OBJECT_TYPE, FILE, DIR, MARKER_DIR, \
+         OBJECT, DIR_TYPE, FILE_TYPE
+else:
+    from swift.common.constraints import check_object_creation
+
+from swift.common.constraints import  check_mount, check_float, check_utf8
+
 from swift.common.exceptions import ConnectionTimeout, DiskFileError, \
     DiskFileNotExist
 from swift.obj.replicator import tpooled_get_hashes, invalidate_hash, \
     quarantine_renamer
 
-
 DATADIR = 'objects'
 ASYNCDIR = 'async_pending'
 PICKLE_PROTOCOL = 2
@@ -339,6 +348,9 @@ class DiskFile(object):
                 raise
         raise DiskFileNotExist('Data File does not exist.')
 
+if plugin_enabled():
+    from swift.plugins.DiskFile import Gluster_DiskFile
+
 
 class ObjectController(object):
     """Implements the WSGI application for the Swift Object Server."""
@@ -377,6 +389,17 @@ class ObjectController(object):
             'expiring_objects'
         self.expiring_objects_container_divisor = \
             int(conf.get('expiring_objects_container_divisor') or 86400)
+        self.fs_object = None
+
+    def get_DiskFile_obj(self, path, device, partition, account, container, obj,
+                         logger, keep_data_fp=False, disk_chunk_size=65536):
+        if self.fs_object:
+            return Gluster_DiskFile(path, device, partition, account, container,
+                            obj, logger, keep_data_fp,
+                            disk_chunk_size, fs_object = self.fs_object);
+        else:
+            return DiskFile(path, device, partition, account, container,
+                            obj, logger, keep_data_fp, disk_chunk_size)
 
     def async_update(self, op, account, container, obj, host, partition,
                      contdevice, headers_out, objdevice):
@@ -493,7 +516,7 @@ class ObjectController(object):
                                   content_type='text/plain')
         if self.mount_check and not check_mount(self.devices, device):
             return Response(status='507 %s is not mounted' % device)
-        file = DiskFile(self.devices, device, partition, account, container,
+        file = self.get_DiskFile_obj(self.devices, device, partition, account, container,
                         obj, self.logger, disk_chunk_size=self.disk_chunk_size)
 
         if 'X-Delete-At' in file.metadata and \
@@ -548,7 +571,7 @@ class ObjectController(object):
         if new_delete_at and new_delete_at < time.time():
             return HTTPBadRequest(body='X-Delete-At in past', request=request,
                                   content_type='text/plain')
-        file = DiskFile(self.devices, device, partition, account, container,
+        file = self.get_DiskFile_obj(self.devices, device, partition, account, container,
                         obj, self.logger, disk_chunk_size=self.disk_chunk_size)
         orig_timestamp = file.metadata.get('X-Timestamp')
         upload_expiration = time.time() + self.max_upload_time
@@ -580,12 +603,29 @@ class ObjectController(object):
             if 'etag' in request.headers and \
                             request.headers['etag'].lower() != etag:
                 return HTTPUnprocessableEntity(request=request)
-            metadata = {
-                'X-Timestamp': request.headers['x-timestamp'],
-                'Content-Type': request.headers['content-type'],
-                'ETag': etag,
-                'Content-Length': str(os.fstat(fd).st_size),
-            }
+            content_type = request.headers['content-type']
+            if self.fs_object and not content_type:
+                content_type = FILE_TYPE
+            if not self.fs_object:
+                metadata = {
+                    'X-Timestamp': request.headers['x-timestamp'],
+                    'Content-Type': request.headers['content-type'],
+                    'ETag': etag,
+                    'Content-Length': str(os.fstat(fd).st_size),
+                }
+            else:
+                metadata = {
+                    'X-Timestamp': request.headers['x-timestamp'],
+                    'Content-Type': request.headers['content-type'],
+                    'ETag': etag,
+                    'Content-Length': str(os.fstat(fd).st_size),
+                    X_TYPE: OBJECT,
+                    X_OBJECT_TYPE: FILE,
+                }
+
+            if self.fs_object and \
+                request.headers['content-type'].lower() == DIR_TYPE:
+                metadata.update({X_OBJECT_TYPE: MARKER_DIR})
             metadata.update(val for val in request.headers.iteritems()
                     if val[0].lower().startswith('x-object-meta-') and
                     len(val[0]) > 14)
@@ -626,9 +666,9 @@ class ObjectController(object):
                         content_type='text/plain')
         if self.mount_check and not check_mount(self.devices, device):
             return Response(status='507 %s is not mounted' % device)
-        file = DiskFile(self.devices, device, partition, account, container,
-                        obj, self.logger, keep_data_fp=True,
-                        disk_chunk_size=self.disk_chunk_size)
+        file = self.get_DiskFile_obj(self.devices, device, partition, account, container,
+                             obj, self.logger, keep_data_fp=True,
+                             disk_chunk_size=self.disk_chunk_size)
         if file.is_deleted() or ('X-Delete-At' in file.metadata and
                 int(file.metadata['X-Delete-At']) <= time.time()):
             if request.headers.get('if-match') == '*':
@@ -702,7 +742,7 @@ class ObjectController(object):
             return resp
         if self.mount_check and not check_mount(self.devices, device):
             return Response(status='507 %s is not mounted' % device)
-        file = DiskFile(self.devices, device, partition, account, container,
+        file = self.get_DiskFile_obj(self.devices, device, partition, account, container,
                         obj, self.logger, disk_chunk_size=self.disk_chunk_size)
         if file.is_deleted() or ('X-Delete-At' in file.metadata and
                 int(file.metadata['X-Delete-At']) <= time.time()):
@@ -744,7 +784,7 @@ class ObjectController(object):
         if self.mount_check and not check_mount(self.devices, device):
             return Response(status='507 %s is not mounted' % device)
         response_class = HTTPNoContent
-        file = DiskFile(self.devices, device, partition, account, container,
+        file = self.get_DiskFile_obj(self.devices, device, partition, account, container,
                         obj, self.logger, disk_chunk_size=self.disk_chunk_size)
         if 'x-if-delete-at' in request.headers and \
                 int(request.headers['x-if-delete-at']) != \
@@ -797,9 +837,18 @@ class ObjectController(object):
             raise hashes
         return Response(body=pickle.dumps(hashes))
 
+    def plugin(self, env):
+        if env.get('Gluster_enabled', False):
+            self.fs_object = env.get('fs_object')
+            self.devices = env.get('root')
+            self.mount_check = False
+        else:
+            self.fs_object = None
+
     def __call__(self, env, start_response):
         """WSGI Application entry point for the Swift Object Server."""
         start_time = time.time()
+        self.plugin(env)
         req = Request(env)
         self.logger.txn_id = req.headers.get('x-trans-id', None)
         if not check_utf8(req.path_info):

ここでは、オブジェクトの読み書きに使うクラスが、オリジナルの「DiskFile」から「Gluster_DiskFile」に置き換えられています(get_DiskFile_obj()で切り替えています)。これは、「plugins/DiskFile.py」で定義されており、中身を見ると、オブジェクトの保存処理がGlusterFS上のファイルの書き込み処理に置き換えられています。特に、GlusterFSを使う場合は、ディレクトリごとアップロードすると、ディレクトリ構造を保持したまま、ボリューム内に格納されますが、そのあたりの処理もこの中で行われています。



まとめ


というわけで、GlusterFS対応のためにどのクラスが差し替えられているかは、だいたい分かりました。あとは差し替えられたクラスの中身を追っていけば、GlusterFS対応時の独自の動きがわかるはずです。このあたりはまたの機会に。。。。

スパム対策のためのダミーです。もし見えても何も入力しないでください
ゲスト


画像認証

トラックバック - http://d.hatena.ne.jp/enakai00/20120502/1335963519
リンク元