ソケット通信がユーザスレッドにディスパッチされる仕組み

ローカルファイルに書いてたメモをこっちにはって捨てる。



ソケット通信はすべてgScoketThreadというスレッドで行われる。

ただし、コネクションを作るのはUIスレッド。

nsHttpChannel::AsyncOpen を呼び出すことで始まる。
呼び出すと nsHttpChannel::Connect が呼ばれる。

~/mozilla/netwerk/protocol/http/src/nsHttpChannel.cpp

nsHttpChannel::Connect(PRBool firstTime)
{
... snip ....

    // hit the net...
    rv = SetupTransaction();
    if (NS_FAILED(rv)) return rv;

    rv = gHttpHandler->InitiateTransaction(mTransaction, mPriority);
    if (NS_FAILED(rv)) return rv;

    return mTransactionPump->AsyncRead(this, nsnull);
}

~/mozilla/netwerk/base/src/nsInputStreamPump.cpp

nsInputStreamPump::AsyncRead(nsIStreamListener *listener, nsISupports *ctxt)
{
.... snip ....
    // grab event queue (we must do this here by contract, since all notifications
    // must go to the thread which called AsyncRead)
    mTargetThread = do_GetCurrentThread();
    NS_ENSURE_STATE(mTargetThread);

    rv = EnsureWaiting();
    if (NS_FAILED(rv)) return rv;

mTargetThread に自分のスレッドを設定します。自分はUIスレッドです。
asyncReadなので、ここの中では読みません。コールバックで呼ばれて読みます。

nsresult
nsInputStreamPump::EnsureWaiting()
{
    // no need to worry about multiple threads... an input stream pump lives
    // on only one thread.

    if (!mWaiting) {
        nsresult rv = mAsyncStream->AsyncWait(this, 0, 0, mTargetThread);
        if (NS_FAILED(rv)) {
            NS_ERROR("AsyncWait failed");
            return rv;
        }
        mWaiting = PR_TRUE;
    }
    return NS_OK;
}

ここでAsyncWaitです。InputStreamPumpの名前のとおり、データをInputStreamから読める状態になったらwaitから抜けます。


ネットワークからのパケット到着を管理しているのはここ。これがgSocketThreadの本体です。

~/mozilla/netwerk/base/src/nsSocketTransportService2.cpp

スレッドのmain関数である::Runで同じようなループをしています。

nsSocketTransportService::Run()
{
... snip ...
    // hook ourselves up to observe event processing for this thread
    nsCOMPtr<nsIThreadInternal> threadInt = do_QueryInterface(thread);
    threadInt->SetObserver(this);

    for (;;) {
... snip ...
        // wait for and process the next pending event
        NS_ProcessNextEvent(thread);
    }

自分自身をobserverにしているのでNS_ProcessNextEventを呼び出すたびにOnProcessNextEventが呼ばれます。

nsSocketTransportService::OnProcessNextEvent(nsIThreadInternal *thread,
                                             PRBool mayWait, PRUint32 depth)
{
    // DoPollIteration doesn't support being called recursively.  This case
    // should only happen when someone (e.g., PSM) is issuing a synchronous
    // proxy call from this thread to the main thread.
    if (depth > 1)
        return NS_OK;

    // Favor processing existing sockets before other events.
    DoPollIteration(PR_FALSE);

    PRBool val;
    while (mayWait && NS_SUCCEEDED(thread->HasPendingEvents(&val)) && !val)
        DoPollIteration(PR_TRUE);

    return NS_OK;
}

いかにもな名前が出てきました。

nsSocketTransportService::DoPollIteration(PRBool wait)
{
... snip ...
        //
        // service "active" sockets...
        //
        for (i=0; i<PRInt32(mActiveCount); ++i) {
            PRPollDesc &desc = mPollList[i+1];
            SocketContext &s = mActiveList[i];
            if (n > 0 && desc.out_flags != 0) {
                s.mElapsedTime = 0;
                s.mHandler->OnSocketReady(desc.fd, desc.out_flags);
            }
            

activeになったソケットに対してOnSocketReadyが呼ばれます。
これでnsInputStreamPumpに結びついている(と思う)nsInputStream::OnSocketReadyが呼ばれます。

~/mozilla/netwerk/base/src/nsSocketTransport2.cpp

// called on the socket transport thread...
//
//   condition : failure code if socket has been closed
//
void
nsSocketInputStream::OnSocketReady(nsresult condition)
{
    LOG(("nsSocketInputStream::OnSocketReady [this=%x cond=%x]\n",
        this, condition));

    NS_ASSERTION(PR_GetCurrentThread() == gSocketThread, "wrong thread");

    nsCOMPtr<nsIInputStreamCallback> callback;
    {
        nsAutoLock lock(mTransport->mLock);

        // update condition, but be careful not to erase an already
        // existing error condition.
        if (NS_SUCCEEDED(mCondition))
            mCondition = condition;

        // ignore event if only waiting for closure and not closed.
        if (NS_FAILED(mCondition) || !(mCallbackFlags & WAIT_CLOSURE_ONLY)) {
            callback = mCallback;
            mCallback = nsnull;
            mCallbackFlags = 0;
        }
    }

    if (callback)
        callback->OnInputStreamReady(this);
}

nCallbackにはあらかじめ nsInputStreamReadyEvent が設定されてます。
inputStreamがreadyになったときに発生するイベント、という名前で、名前のとおりです。

~/mozilla/xpcom/io/nsStreamUtils.cpp

    NS_IMETHOD OnInputStreamReady(nsIAsyncInputStream *stream)
    {
        mStream = stream;

        nsresult rv =
            mTarget->Dispatch(this, NS_DISPATCH_NORMAL);
        if (NS_FAILED(rv)) {
            NS_WARNING("Dispatch failed");
            return NS_ERROR_FAILURE;
        }

        return NS_OK;
    }

mTargetはスレッドです。AsyncWaitで設定されたやつ。Dispatchは、そのスレッドのイベントキューにイベントを追加する関数。

~/mozilla/xpcom/threads/nsThread.cpp

nsThread::Dispatch(nsIRunnable *event, PRUint32 flags)
{
  LOG(("THRD(%p) Dispatch [%p %x]\n", this, event, flags));

  NS_ENSURE_ARG_POINTER(event);

  PRBool dispatched;
  if (flags & DISPATCH_SYNC) {
    nsThread *thread = nsThreadManager::get()->GetCurrentThread();
    NS_ENSURE_STATE(thread);

    // XXX we should be able to do something better here... we should
    //     be able to monitor the slot occupied by this event and use
    //     that to tell us when the event has been processed.

    nsRefPtr<nsThreadSyncDispatch> wrapper =
        new nsThreadSyncDispatch(thread, event);
    if (!wrapper)
      return NS_ERROR_OUT_OF_MEMORY;
    dispatched = PutEvent(wrapper);

    while (wrapper->IsPending())
      NS_ProcessNextEvent(thread);
  } else {
    NS_ASSERTION(flags == NS_DISPATCH_NORMAL, "unexpected dispatch flags");
    dispatched = PutEvent(event);
  }

  if (NS_UNLIKELY(!dispatched))
    return NS_ERROR_OUT_OF_MEMORY;

  return NS_OK;
}

これでイベントはUIスレッドで処理されてjs問題は解決。