2008-10-05
passengerを読み解く(例のあのプロセスとの通信) vol4
謎
passengerを読み解く(例のあのプロセスが動くまで) vol2 - I am Cruby!
で起動したプロセスは何を行っているんだろうか.
I am Cruby!
ではApacheが生成したプロセスと通信している姿が見られた.
今回はその謎を追う事に.
プロセスの起動先
dup2(fds[0], SERVER_SOCKET_FD); ... // execute! execlp( #if 0 "valgrind", "valgrind", #else m_serverExecutable.c_str(), #endif m_serverExecutable.c_str(), toString(Passenger::getLogLevel()).c_str(), m_spawnServerCommand.c_str(), m_logFile.c_str(), m_rubyCommand.c_str(), m_user.c_str(), statusReportFIFO.c_str(), NULL);
では何が呼び出されたのか.
ApplicationPoolServerExecutableが実行されている.
以下はそのmain部
int main(int argc, char *argv[]) { try { Server server(SERVER_SOCKET_FD, atoi(argv[1]), argv[2], argv[3], argv[4], argv[5], argv[6]); return server.start(); } catch (const exception &e) { P_ERROR(e.what()); return 1; } }
SERVER_SOCKET_FDは 3 である.親プロセスでdup2した事を思い出して欲しい.
serverをnewしている.
Server()
Server(int serverSocket, const unsigned int logLevel, const string &spawnServerCommand, const string &logFile, const string &rubyCommand, const string &user, const string &statusReportFIFO) : pool(spawnServerCommand, logFile, rubyCommand, user) { Passenger::setLogLevel(logLevel); this->serverSocket = serverSocket; this->statusReportFIFO = statusReportFIFO; }
poolについては次回詳しく説明する.
その他はserverSocketを設定したり,状態を設定するファイル名を設定したり.
statusReportFIFOに入っている文字列は以下のパターンである.
/tmp/passenger_status.プロセス名.fifo
server.start()
コードは以下.さすがに長い...orz
int Server::start() { ... /* (1) */ if (!statusReportFIFO.empty()) { statusReportThread = ptr( new Thread( bind(&Server::statusReportThreadMain, this), 1024 * 128 ) ); } while (!this_thread::interruption_requested()) { /* (2) */ int fds[2], ret; char x; /* (3) */ // The received data only serves to wake up the server socket, // and is not important. ret = InterruptableCalls::read(serverSocket, &x, 1); ... // We have an incoming connect request from an // ApplicationPool client. do { ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fds); } while (ret == -1 && errno == EINTR); if (ret == -1) { throw SystemException("Cannot create an anonymous Unix socket", errno); } ... /* (4) */ MessageChannel(serverSocket).writeFileDescriptor(fds[1]); InterruptableCalls::close(fds[1]); /* (5) */ ClientPtr client(new Client(*this, fds[0])); ... { mutex::scoped_lock l(lock); clients.insert(client); } ... /* (6) */ client->start(client); } ... return 0; }
このメソッドを起動して,main()は終了する.
つまりこのコードがforkで起動したプロセスの肝である.
(1)状態を報告するスレッドの起動
if (!statusReportFIFO.empty()) { statusReportThread = ptr( new Thread( bind(&Server::statusReportThreadMain, this), 1024 * 128 ) ); }
この部分では,このプロセスの状態を報告するスレッドを起動する様である.
ここは処理の本質ではなさそうなので,深くは読まない.名前で判断.
(2)メインループ
while (!this_thread::interruption_requested()) { /* (2) */
これも名前で判断.
「リクエストが中断されるまで,処理を繰り返す」
という意味だと思う.
(3)通信待ち受け
// The received data only serves to wake up the server socket, // and is not important. ret = InterruptableCalls::read(serverSocket, &x, 1);
serverSocketは起動した親プロセスと繋がっているソケットである.
ココで親プロセスからの通信を待ち受ける.
実は他プロセスから起動するコードは前回紹介済み.
applicationPoolServer->connect()内の話である.
ApplicationPoolPtr connect() {
...
// Write some random data to wake up the server.
channel.writeRaw("x", 1);
...
}
ココ.この処理が実行されるのは,Apacheが生成した子プロセスなので,親プロセスは一緒であるが無関係である.
ただ,親プロセスでつないでおいたソケットはそのまま,子プロセスに引き継がれるので,無関係ではないか.
腹違いの兄弟?
(4)他プロセスへのファイルディスクプリタ受け渡し
MessageChannel(serverSocket).writeFileDescriptor(fds[1]); InterruptableCalls::close(fds[1]);
ここでは自分を起動したプロセスさんに,socketpairで新しく生成したの片方のファイルディスクプリタを渡す.
/** * Pass a file descriptor. This only works if the underlying file * descriptor is a Unix socket. * * @param fileDescriptor The file descriptor to pass. * @throws SystemException Something went wrong during file descriptor passing. * @throws boost::thread_interrupted * @pre <tt>fileDescriptor >= 0</tt> * @see readFileDescriptor() */ void writeFileDescriptor(int fileDescriptor) { struct msghdr msg; struct iovec vec; char dummy[1]; char control_data[CMSG_SPACE(sizeof(int))]; struct cmsghdr *control_header; int ret; msg.msg_name = NULL; msg.msg_namelen = 0; /* Linux and Solaris require msg_iov to be non-NULL. */ dummy[0] = '\0'; vec.iov_base = dummy; vec.iov_len = sizeof(dummy); msg.msg_iov = &vec; msg.msg_iovlen = 1; msg.msg_control = (caddr_t) &control_data; msg.msg_controllen = sizeof(control_data); msg.msg_flags = 0; control_header = CMSG_FIRSTHDR(&msg); control_header->cmsg_level = SOL_SOCKET; control_header->cmsg_type = SCM_RIGHTS; control_header->cmsg_len = CMSG_LEN(sizeof(int)); memcpy(CMSG_DATA(control_header), &fileDescriptor, sizeof(int)); ret = InterruptableCalls::sendmsg(fd, &msg, 0); }
sendmsg(2)でfdの受け渡しをしている.
受け取り側は前回紹介ずみである.
ApplicationPoolPtr connect() {
...
clientConnection = channel.readFileDescriptor();
...
}
違うプロセス間でfdを受け渡す方法としてはポピュラな方法なんだろうか.
ファイル記述子をUnixドメインソケット経由で渡す - bkブログによると,詳細Unixの第15章に乗っているらしい.
おぉ.glibc内部でも使っているのか.勉強になるなぁ.
(5)Client生成
ClientPtr client(new Client(*this, fds[0]));
clientというのは何だろう?
/***************************************** * Client *****************************************/ /** * Represents a single ApplicationPool client, connected to this server. * * @invariant * The life time of a Client object is guaranteed to be less than * that of its associated Server object. */ class Client { private: ... public: /** * Create a new Client object. * * @param the_server The Server object that this Client belongs to. * @param connection The connection to the ApplicationPool client. * * @note * <tt>connection</tt> will be closed upon destruction */ Client(Server &the_server, int connection) : server(the_server), fd(connection), channel(connection) { thr = NULL; lastSessionID = 0; }
コメントを訳すと,
「一つのApplicationPoolのクライアントを表し,このサーバと繋がっている.」
どうやら,ApplicationPoolとApacheのサブプロセスをつないでいるもののよう.
さっきの,ApplicationPoolServerのpoolメンバの事であろう.
clientを生成してからの続き
ClientPtr client(new Client(*this, fds[0])); ... { mutex::scoped_lock l(lock); clients.insert(client); }
mutexをロックして,clientsにinsertしている.
clientsは?
class Server { private: ... set<ClientPtr> clients;
おぉ,setか.
setはinsertなどすると自動でソートしてくれるクラスだった様な.
C++編(標準ライブラリ) 第9章 set
そうみたい.ふむふむ.
ClientPtrはClientのshared_ptrである.GCのにおいが少し.
(6)Clientのスタート!
/** * Start the thread for handling the connection with this client. * * @param self The iterator of this Client object inside the server's * <tt>clients</tt> set. This is used to remove itself from * the <tt>clients</tt> set once the client has closed the * connection. */ void start(const weak_ptr<Client> self) { thr = new Thread( bind(&Client::threadMain, this, self), CLIENT_THREAD_STACK_SIZE ); }
コメントでは,「clientがコネクトをハンドルするようのThreadを起動する」と書いてある.
ココで一つ疑問.new ThreadってC++にあるんすか?
System.h
using namespace boost; .... /** * Thread class with system call interruption support. */ class Thread: public thread { public: template <class F> explicit Thread(F f, unsigned int stackSize = 0) : thread(f, stackSize) {} /** * Interrupt the thread. This method behaves just like * boost::thread::interrupt(), but will also respect the interruption * points defined in Passenger::InterruptableCalls. * * Note that an interruption request may get lost, depending on the * current execution point of the thread. Thus, one should call this * method in a loop, until a certain goal condition has been fulfilled. * interruptAndJoin() is a convenience method that implements this * pattern. */ void interrupt() { int ret; thread::interrupt(); do { ret = pthread_kill(native_handle(), INTERRUPTION_SIGNAL); } while (ret == EINTR); } /** * Keep interrupting the thread until it's done, then join it. * * @throws boost::thread_interrupted */ void interruptAndJoin() { bool done = false; while (!done) { interrupt(); done = timed_join(posix_time::millisec(10)); } } };
boostのthreadを継承したクラスなのか.boostって知らなかったや.勉強になるなぁ.
あと,bindだけど,関数ポインタみたいなものを作るよう.
なので,新しく作成されたThreadで走るメソッドは以下.
/** * The entry point of the thread that handles the client connection. */ void threadMain(const weak_ptr<Client> self) { vector<string> args; try { while (!this_thread::interruption_requested()) { ... if (!channel.read(args)) { // Client closed connection. break; } ... P_TRACE(4, "Client " << this << ": received message: " << toString(args)); if (args[0] == "get" && args.size() == 7) { processGet(args); } else if (args[0] == "close" && args.size() == 2) { processClose(args); } else if (args[0] == "clear" && args.size() == 1) { processClear(args); } else if (args[0] == "setMaxIdleTime" && args.size() == 2) { processSetMaxIdleTime(args); } else if (args[0] == "setMax" && args.size() == 2) { processSetMax(args); } else if (args[0] == "getActive" && args.size() == 1) { processGetActive(args); } else if (args[0] == "getCount" && args.size() == 1) { processGetCount(args); } else if (args[0] == "setMaxPerApp" && args.size() == 2) { processSetMaxPerApp(atoi(args[1])); } else if (args[0] == "getSpawnServerPid" && args.size() == 1) { processGetSpawnServerPid(args); } else { processUnknownMessage(args); break; } args.clear(); } } catch (const boost::thread_interrupted &) { P_TRACE(2, "Client thread " << this << " interrupted."); } catch (const exception &e) { P_TRACE(2, "Uncaught exception in ApplicationPoolServer client thread:\n" << " message: " << toString(args) << "\n" << " exception: " << e.what()); } /* (1) */ mutex::scoped_lock l(server.lock); ClientPtr myself(self.lock()); if (myself != NULL) { server.clients.erase(myself); } }
この"get" や, "setMaxIdel"などは前回の記事で紹介した,以下の部分とリンクする.
virtual void setMax(unsigned int max) { MessageChannel channel(data->server); mutex::scoped_lock l(data->lock); channel.write("setMax", toString(max).c_str(), NULL); }
これはinitChild内部のserver->connect()で呼び出されるメソッドである.
ふむ,なるほど.こうやってパラメータを設定したりするのか.
(1)
Clientが用済みになった場合,最終的にはself.lock()でweak_ptrからポインタを取り出して,clientsから削除している.
lock()の挙動についてはこちら.letsboost::weak_ptr
削除されれば参照カウンタが0になり,deleteされるはずである.
まとめ
- 起動されたプロセスの役割
次回は
ApplicationPoolServerのpoolメンバに何が入っているのか!
について.
世界を止めない
日立がアプリサーバ新版、Full GC回避し「世界を止めない」
セッションオブジェクトとかを別の領域に確保するらしい.
そんなに大量のセッションオブジェクトを使う機会があるのかな.
数万個以上のセッションオブジェクト?
具体例がよく浮かばんのう.
少数のセッションオブジェクトだったら,きっと普通に使った方が速いだろうし.
でも,コード見てみたいな.
無理かぁ.


