VISASQ Dev Blog

ビザスク開発ブログ

Cloud Pub/Sub Client libraryでのconcurrent.futuresの使われ方を調べた話

はじめに

こんにちは。SREチームの西川です。

Cloud Pub/Sub Python Client library ソースコードリーディングを行う機会があり、library内でのconcurrent.futuresの利用方法についてメモしていたので、ついでに記事としてまとめてみました。

※ この記事は、Python3.9時点の実装を前提にした内容です。

concurrent.futuresについて

https://docs.python.org/ja/3.9/library/concurrent.futures.html

簡単にいうと、process / threadを利用して並列処理を簡単に実装できるlibraryです。

並列処理用のパッケージとしては、processであればmultiprocessing とthreadであればthreadingも例に挙げられます。
ただし、process / threadの生成など低レイヤーをある程度理解した上で処理を実装することが求められます。そういった背景を踏まえて、よりhigh-levelなinterfaceを提供する目的で concurrent.futures が追加されました。

concurrent.futuresのdocumentにもhigh-levelの記載があり

The concurrent.futures module provides a high-level interface for asynchronously executing callables.

docs.python.org/3/library/concurrent.futures.html

concurrent.futuresのPEPにも、プロセス/スレッドの明示的な起動などの並列処理実行以前に多くの作業が必要であるという内容が記載されています。

Motivation
Python currently has powerful primitives to construct multi-threaded and multi-process applications but parallelizing simple operations requires a lot of work i.e. explicitly launching processes/threads, constructing a work/results queue, and waiting for completion or some other termination condition (e.g. failure, timeout). It is also difficult to design an application with a global process/thread limit when each component invents its own parallel execution strategy.

https://www.python.org/dev/peps/pep-3148/

コード的には、下記のようにProcess(Thread)PoolExecutorインスタンスに対して、並列で処理させたい関数を引数で渡してsubmitすると、Executorのclassに応じて、子プロセス / 子スレッドで処理を実行します。

import time
import concurrent.futures

def return_six():
    time.sleep(5)
    return 6


executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
future = executor.submit(return_six)

処理終了時にはresult()で関数の実行結果を取得できます。

print(future.result()) # 6

処理の状態は、running(), done() などのbool値を返すメソッドで確認でき、futures.wait() で全てのタスクの終了を待ち合わせることもできます。

Cloud Pub/Sub Client libraryでの利用

concurrent.futuresはCloud Pub/Sub Client libraryでは、messageに対するstreaming pullの実装などで利用されています。
(今回は、ブログの都合上 streaming pull に関する説明に絞ります)

streaming pullについても少し触れておくと、双方向ストリーミング (bi-directional streaming RPC) 方式で通信を行い、メッセージを取得する方法で高スループットなどの利点があります。

https://cloud.google.com/pubsub/docs/pull#streamingpull

Sample codeから、Libraryの実装を辿ってどのようにconcurrent.futuresが利用されているかを掘り下げていきたいと思います。

Sample codeでは、callback関数をsubscribe() メソッドの引数として渡しています。

https://github.com/googleapis/python-pubsub/blob/main/samples/snippets/subscriber.py#L389-L426

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received {message}.")
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

with subscriber:
    try:
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.

引数として渡されたcallback関数は、StreamingPullManagerが呼び出したopen()の引数として渡されます。
StreamingPullManagerはmessageのpullに加えて、messageの確認応答期限 (lease) の管理などを行います。

https://github.com/googleapis/python-pubsub/blob/main/google/cloud/pubsub_v1/subscriber/client.py#L149-L269

manager = streaming_pull_manager.StreamingPullManager(
    self,
    subscription,
    ... (省略) ...
)

future = futures.StreamingPullFuture(manager)

manager.open(callback=callback, on_callback_error=future.set_exception)

return future

open()の処理内では、渡されたcallback関数を_wrap_callback_errorsの引数として固定した関数(self._callback )を返します。この関数はmessage consumer側で暗黙的に利用されます。
また、後続の処理ではbidi.ResumableBidiRpcインスタンス生成によって、gRPCで利用される各種関数 (e.g. initial_request)が渡されます。

https://github.com/googleapis/python-pubsub/blob/main/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py#L503

self._callback = functools.partial(
    _wrap_callback_errors, callback, on_callback_error
)

具体的には、bidi.BackgroundConsumerインスタンス生成時の引数として渡している、_on_responseでcallbackは利用されています。
(ちなみに、bidiとはbi-directional streaming RPCの省略形で、BackgroundConsumerはこの通信用のヘルパークラスの内の一つです。)

_on_responseの処理を見てみると、まずstreaming pullのresponseからmesssageを取り出して pubsub_v1.subscriber.message.Messageのオブジェクトに詰め替えて、_messages_on_holdにputします。

https://github.com/googleapis/python-pubsub/blob/main/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py#L703

for received_message in received_messages:
    message = google.cloud.pubsub_v1.subscriber.message.Message(
       received_message.message,
       received_message.ack_id,
       received_message.delivery_attempt,
       self._scheduler.queue,
    )
    self._messages_on_hold.put(message)

... (省略) ...
self._maybe_release_messages()

putされたmessageは _maybe_release_messages()の処理内で取り出され、_schedule_message_on_hold()に渡されます。

https://github.com/googleapis/python-pubsub/blob/main/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py#L383

while self.load < _MAX_LOAD:
    msg = self._messages_on_hold.get()
    if not msg:
        break

    self._schedule_message_on_hold(msg)
    released_ack_ids.append(msg.ack_id)

_schedule_message_on_hold では、messageに対する処理をスケジュールする (schedule()) 処理とmessage sizeを処理中のデータ量 の合計(_on_hold_bytes)から減算するという処理を行っています。

https://github.com/googleapis/python-pubsub/blob/main/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py#L407

def _schedule_message_on_hold(self, msg: "google.cloud.pubsub_v1.subscriber.message.Message"):
    self._on_hold_bytes -= msg.size
    ... (省略) ...
    assert self._scheduler is not None
    assert self._callback is not None
    self._scheduler.schedule(self._callback, msg)

そして、schedule() の処理内で、self._executor.submit(callback, *args, **kwargs) (子スレッドでの関数実行) が呼ばれています。

https://github.com/googleapis/python-pubsub/blob/main/google/cloud/pubsub_v1/subscriber/scheduler.py#L128h

def schedule(self, callback: Callable, *args, **kwargs) -> None:
   ... (省略) ...
    try:
        self._executor.submit(callback, *args, **kwargs)

ここまでが_on_responseの処理の中身です。
そして、bidi.BackgroundConsumerに戻り、_on_responseが渡された後から確認していきます。

bidi.BackgroundConsumerインスタンス生成時に同名のメンバ変数に_on_responseが渡された後、self._consumer.start()(messageのconsume処理) が実行されます。

https://github.com/googleapis/python-pubsub/blob/main/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py#L561

start() が呼び出されると、background threadが生成されて、呼び出し可能なオブジェクト (target) として _thread_mainが指定されます。

https://github.com/googleapis/python-api-core/blob/main/google/api_core/bidi.py#L682-L686

def start(self):
    """Start the background thread and begin consuming the thread."""
    with self._operational_lock:
        ready = threading.Event()
        thread = threading.Thread(
            name=_BIDIRECTIONAL_CONSUMER_NAME,
            target=self._thread_main,
            args=(ready,),
        )

_thread_main の処理内では、streamをopenした上で最終的にresponaseを受け取った際の処理(_on_response)が呼ばれるという形になっており、このようにして登録したcallback関数が実行されます。
これが、concurrent.futuresを利用した一連の処理となっています。

https://github.com/googleapis/python-api-core/blob/main/google/api_core/bidi.py#L657

    def _thread_main(self, ready):
        try:
            ready.set()
            self._bidi_rpc.add_done_callback(self._on_call_done)
            self._bidi_rpc.open()

            while self._bidi_rpc.is_active:
                with self._wake:
                    while self._paused:
                        _LOGGER.debug("paused, waiting for waking.")
                        self._wake.wait()
                        _LOGGER.debug("woken.")

                _LOGGER.debug("waiting for recv.")
                response = self._bidi_rpc.recv()
                _LOGGER.debug("recved response.")
                self._on_response(response)

サンプルコードでメソッドの結果を取得する処理として出てきたresult()は、concurrent.futures.Futureを継承したStreamingPullFutureが用意されており、result()を呼び出すことができます。

https://github.com/googleapis/python-pubsub/blob/main/google/cloud/pubsub_v1/subscriber/futures.py#L55

class StreamingPullFuture(futures.Future):
    def __init__(self, manager: "StreamingPullManager"):
        super(StreamingPullFuture, self).__init__()
        self.__manager = manager
        self.__manager.add_close_callback(self._on_close_callback)
        self.__cancelled = False

    def _on_close_callback(self, manager: "StreamingPullManager", result: Any):
        if self.done():
            # The future has already been resolved in a different thread,
            # nothing to do on the streaming pull manager shutdown.
            return

        if result is None:
            self.set_result(True)
        else:
            self.set_exception(result)

    def cancel(self) -> bool:
        self.__cancelled = True
        self.__manager.close()
        return True

    def cancelled(self) -> bool:
        return self.__cancelled

終わりに

concurrent.futuresの概要とCloud pub/subのClient library内での利用方法についてまとめました。

これ以外にも、Streaming pullの subscribe() の処理内で利用されている Leaser / Heartbeaterや、bi-directional streaming RPC用の他のヘルパークラスなどコードリーディングの題材としては面白そうなものがあるので、機会があればまとめてみたいと思います。