はじめに
こんにちは。SREチームの西川です。
Cloud Pub/Sub Python Client library のソースコードリーディングを行う機会があり、library内でのconcurrent.futuresの利用方法についてメモしていたので、ついでに記事としてまとめてみました。
※ この記事は、Python3.9時点の実装を前提にした内容です。
concurrent.futuresについて
https://docs.python.org/ja/3.9/library/concurrent.futures.html
簡単にいうと、process / threadを利用して並列処理を簡単に実装できるlibraryです。
並列処理用のパッケージとしては、processであればmultiprocessing
とthreadであればthreading
も例に挙げられます。
ただし、process / threadの生成など低レイヤーをある程度理解した上で処理を実装することが求められます。そういった背景を踏まえて、よりhigh-levelなinterfaceを提供する目的で concurrent.futures
が追加されました。
concurrent.futuresのdocumentにもhigh-level
の記載があり
The concurrent.futures module provides a high-level interface for asynchronously executing callables.
docs.python.org/3/library/concurrent.futures.html
concurrent.futuresのPEPにも、プロセス/スレッドの明示的な起動などの並列処理実行以前に多くの作業が必要であるという内容が記載されています。
Motivation
https://www.python.org/dev/peps/pep-3148/
Python currently has powerful primitives to construct multi-threaded and multi-process applications but parallelizing simple operations requires a lot of work i.e. explicitly launching processes/threads, constructing a work/results queue, and waiting for completion or some other termination condition (e.g. failure, timeout). It is also difficult to design an application with a global process/thread limit when each component invents its own parallel execution strategy.
コード的には、下記のようにProcess(Thread)PoolExecutor
のインスタンスに対して、並列で処理させたい関数を引数で渡してsubmitすると、Executorのclassに応じて、子プロセス / 子スレッドで処理を実行します。
import time
import concurrent.futures
def return_six():
time.sleep(5)
return 6
executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
future = executor.submit(return_six)
処理終了時にはresult()
で関数の実行結果を取得できます。
print(future.result()) # 6
処理の状態は、running()
, done()
などのbool値を返すメソッドで確認でき、futures.wait()
で全てのタスクの終了を待ち合わせることもできます。
Cloud Pub/Sub Client libraryでの利用
concurrent.futures
はCloud Pub/Sub Client libraryでは、messageに対するstreaming pullの実装などで利用されています。
(今回は、ブログの都合上 streaming pull に関する説明に絞ります)
streaming pullについても少し触れておくと、双方向ストリーミング (bi-directional streaming RPC) 方式で通信を行い、メッセージを取得する方法で高スループットなどの利点があります。
https://cloud.google.com/pubsub/docs/pull#streamingpull
Sample codeから、Libraryの実装を辿ってどのようにconcurrent.futures
が利用されているかを掘り下げていきたいと思います。
Sample codeでは、callback関数をsubscribe()
メソッドの引数として渡しています。
https://github.com/googleapis/python-pubsub/blob/main/samples/snippets/subscriber.py#L389-L426
def callback(message: pubsub_v1.subscriber.message.Message) -> None:
print(f"Received {message}.")
message.ack()
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")
with subscriber:
try:
streaming_pull_future.result(timeout=timeout)
except TimeoutError:
streaming_pull_future.cancel() # Trigger the shutdown.
streaming_pull_future.result() # Block until the shutdown is complete.
引数として渡されたcallback関数は、StreamingPullManager
が呼び出したopen()
の引数として渡されます。StreamingPullManager
はmessageのpullに加えて、messageの確認応答期限 (lease) の管理などを行います。
manager = streaming_pull_manager.StreamingPullManager(
self,
subscription,
... (省略) ...
)
future = futures.StreamingPullFuture(manager)
manager.open(callback=callback, on_callback_error=future.set_exception)
return future
open()
の処理内では、渡されたcallback関数を_wrap_callback_errors
の引数として固定した関数(self._callback
)を返します。この関数はmessage consumer側で暗黙的に利用されます。
また、後続の処理ではbidi.ResumableBidiRpc
のインスタンス生成によって、gRPCで利用される各種関数 (e.g. initial_request
)が渡されます。
self._callback = functools.partial(
_wrap_callback_errors, callback, on_callback_error
)
具体的には、bidi.BackgroundConsumer
のインスタンス生成時の引数として渡している、_on_response
でcallbackは利用されています。
(ちなみに、bidi
とはbi-directional streaming RPCの省略形で、BackgroundConsumer
はこの通信用のヘルパークラスの内の一つです。)
_on_response
の処理を見てみると、まずstreaming pullのresponseからmesssageを取り出して pubsub_v1.subscriber.message.Message
のオブジェクトに詰め替えて、_messages_on_hold
にputします。
for received_message in received_messages:
message = google.cloud.pubsub_v1.subscriber.message.Message(
received_message.message,
received_message.ack_id,
received_message.delivery_attempt,
self._scheduler.queue,
)
self._messages_on_hold.put(message)
... (省略) ...
self._maybe_release_messages()
putされたmessageは _maybe_release_messages()
の処理内で取り出され、_schedule_message_on_hold()
に渡されます。
while self.load < _MAX_LOAD:
msg = self._messages_on_hold.get()
if not msg:
break
self._schedule_message_on_hold(msg)
released_ack_ids.append(msg.ack_id)
_schedule_message_on_hold
では、messageに対する処理をスケジュールする (schedule()
) 処理とmessage sizeを処理中のデータ量 の合計(_on_hold_bytes
)から減算するという処理を行っています。
def _schedule_message_on_hold(self, msg: "google.cloud.pubsub_v1.subscriber.message.Message"):
self._on_hold_bytes -= msg.size
... (省略) ...
assert self._scheduler is not None
assert self._callback is not None
self._scheduler.schedule(self._callback, msg)
そして、schedule()
の処理内で、self._executor.submit(callback, *args, **kwargs)
(子スレッドでの関数実行) が呼ばれています。
https://github.com/googleapis/python-pubsub/blob/main/google/cloud/pubsub_v1/subscriber/scheduler.py#L128h
def schedule(self, callback: Callable, *args, **kwargs) -> None:
... (省略) ...
try:
self._executor.submit(callback, *args, **kwargs)
ここまでが_on_response
の処理の中身です。
そして、bidi.BackgroundConsumer
に戻り、_on_response
が渡された後から確認していきます。
bidi.BackgroundConsumer
のインスタンス生成時に同名のメンバ変数に_on_response
が渡された後、self._consumer.start()
(messageのconsume処理) が実行されます。
start()
が呼び出されると、background threadが生成されて、呼び出し可能なオブジェクト (target
) として _thread_main
が指定されます。
https://github.com/googleapis/python-api-core/blob/main/google/api_core/bidi.py#L682-L686
def start(self):
"""Start the background thread and begin consuming the thread."""
with self._operational_lock:
ready = threading.Event()
thread = threading.Thread(
name=_BIDIRECTIONAL_CONSUMER_NAME,
target=self._thread_main,
args=(ready,),
)
_thread_main
の処理内では、streamをopenした上で最終的にresponaseを受け取った際の処理(_on_response
)が呼ばれるという形になっており、このようにして登録したcallback関数が実行されます。
これが、concurrent.futures
を利用した一連の処理となっています。
https://github.com/googleapis/python-api-core/blob/main/google/api_core/bidi.py#L657
def _thread_main(self, ready):
try:
ready.set()
self._bidi_rpc.add_done_callback(self._on_call_done)
self._bidi_rpc.open()
while self._bidi_rpc.is_active:
with self._wake:
while self._paused:
_LOGGER.debug("paused, waiting for waking.")
self._wake.wait()
_LOGGER.debug("woken.")
_LOGGER.debug("waiting for recv.")
response = self._bidi_rpc.recv()
_LOGGER.debug("recved response.")
self._on_response(response)
サンプルコードでメソッドの結果を取得する処理として出てきたresult()
は、concurrent.futures.Future
を継承したStreamingPullFuture
が用意されており、result()
を呼び出すことができます。
class StreamingPullFuture(futures.Future):
def __init__(self, manager: "StreamingPullManager"):
super(StreamingPullFuture, self).__init__()
self.__manager = manager
self.__manager.add_close_callback(self._on_close_callback)
self.__cancelled = False
def _on_close_callback(self, manager: "StreamingPullManager", result: Any):
if self.done():
# The future has already been resolved in a different thread,
# nothing to do on the streaming pull manager shutdown.
return
if result is None:
self.set_result(True)
else:
self.set_exception(result)
def cancel(self) -> bool:
self.__cancelled = True
self.__manager.close()
return True
def cancelled(self) -> bool:
return self.__cancelled
終わりに
concurrent.futures
の概要とCloud pub/subのClient library内での利用方法についてまとめました。
これ以外にも、Streaming pullの subscribe()
の処理内で利用されている Leaser / Heartbeaterや、bi-directional streaming RPC用の他のヘルパークラスなどコードリーディングの題材としては面白そうなものがあるので、機会があればまとめてみたいと思います。