System.Threading.Channelsの紹介
System.Threading.ChannelsはC#標準のProducer/Consumerパターンの実装です。
.NET5.0または.NET Core3.0以降から使えます。Unityなどそれ以前の環境でもパッケージを入れて使えます。
Producer/Consumerパターンとは
Producer/Consumerパターンは、データを出力する「Producer(生産者)」とデータを受け取って処理する「Consumer(消費者)」の間に「Channel(通信路)」を挟むデザインパターンです。
このパターンはレストランの厨房にたとえて説明されます。「Producer」はレストランの料理人になります。「Consumer」は配膳係です。料理人は料理を作ってテーブルに置き、配膳係は置かれた料理を取って客のところまで運んでいきます。
このテーブルが「Channel」です。作られた料理(= データ)が受け取られるまで一時的に保管しています。
テーブルがあることによって様々なメリットが生まれます。配膳係は料理人1人1人に完成したか尋ねる代わりに、テーブルを確認して完成した料理を持って行けます。料理人からしても配膳係が来るのを待たずに完成した料理をテーブルに置いてすぐ次の料理に取りかかれます。配膳が追いつかずテーブルが一杯ならテーブルが空くまで料理を休むこともできます。
System.Threading.Channelsではこのテーブルを作成できます。これには以下のような特徴があります。
- Producerがデータを非同期に入力できます。Channelsはそれを溜めておき、Consumerが非同期に取得できます。取得されたデータはChannelsから消えます。
- 先に入力されたデータが先に取得されます。
- 溜めておけるデータの数を制限できます。無制限にもできます。
- データが一杯のときの挙動を指定できます。(空くまで書き込みをブロックする/入力されたデータを捨てる/溜められているデータを捨てる)
- ProducerやConsumerが1つだけのときは、オプション指定することで高速な特化実装を使えます。
- これ以上データが存在しないことを通知できます。
ProducerとConsumerが互いに知らない状態にできるのもProducer/Consumerパターンのメリットになります。それぞれの事情に応じてProducerやConsumerの数が増減しても柔軟に対応できます。
Publish/Subscribeパターンとは
Publish/SubscribeパターンはProducer/Consumerパターンと混同しやすいので、少し脱線して簡単に説明しておきます。
Publish/Subscribeパターンはデータの処理というよりイベントの処理に使うことが多いです。 「Publisher(発行者)」がイベントを発行すると「MessageBroker(仲介者)」を介して「Subscriber(購読者)」に渡されます。
先に説明したProducer/Consumerパターンでは、Producerが出力したデータを最初に見に来たConsumer1つだけが受け取っていました。こちらのPublish/Subscribeパターンでは、Publisherがイベントを発行するとそれを見ているSubscriber全てがそのデータを受け取ります。
このように、1つのデータを1つだけが受け取るか全員が受け取るかがProducer/ConsumerとPulisher/Subscriberの違いになります。
C#のeventはPublish/Subscribeパターンとして見なせます。他にもReactivePropertyやUnityのUnityEvent、UniRx.MessageBroker、MessagePipeなどの実装があります。
System.Threading.Channelsのインターフェース
話を戻してSystem.Threading.Channelsについて見ていきます。
System.Threading.Channelsの使い方は用意されているインターフェースを見ると大体わかります。
public abstract class Channel<T>
{
public ChannelReader<T> Reader { get; }
public ChannelWriter<T> Writer { get; }
}
Channel<T>はProducerがデータを出力するのに使うWriterと、Consumerがデータを受け取るのに使うReaderを持っています。これをProducerとConsumerに渡して使ってもらいます。
public abstract class ChannelWriter<T>
{
public abstract bool TryWrite(T item);
public virtual ValueTask WriteAsync(T item, CancellationToken cancellationToken = default);
public abstract ValueTask<bool> WaitToWriteAsync(CancellationToken cancellationToken = default);
public void Complete(Exception error);
public virtual bool TryComplete(Exception error);
}
ChannelWriter<T>はTryWriteやWriteAsyncでデータを出力できます。これ以上データが出力されなくなったらCompleteかTryCompleteで完了を通知できます。
public abstract class ChannelReader<T>
{
public abstract bool TryRead(out T item);
public virtual ValueTask<T> ReadAsync(CancellationToken cancellationToken = default)
public abstract ValueTask<bool> WaitToReadAsync(CancellationToken cancellationToken = default);
public virtual IAsyncEnumerable<T> ReadAllAsync(CancellationToken cancellationToken = default);
public virtual Task Completion { get; }
}
ChannelReader<T>はTryReadやReadAsyncでデータを受け取れます。
public static class Channel
{
public static Channel<T> CreateUnbounded<T>();
public static Channel<T> CreateUnbounded<T>(UnboundedChannelOptions options);
public static Channel<T> CreateBounded<T>(int capacity);
public static Channel<T> CreateBounded<T>(BoundedChannelOptions options);
}
staticのChannel.CreateUnboundedかChannel.CreateBoundedを使ってChannelを生成できます。Unboundedはデータ数が無制限、Boundedは制限有りです。
System.Threading.Channelsのサンプルコード
実際に使ってみたコード例を以下に挙げます。
using System;
using System.Threading.Channels;
using System.Threading.Tasks;
public class ChannelSample
{
private Channel<int> channel;
public Task ChannelSampleAsync()
{
// データの数が無制限のChannelを生成する
channel = Channel.CreateUnbounded<int>(new UnboundedChannelOptions
{
// 今回はProducerが1つだけなのでSingleWriter = trueにする
SingleWriter = true,
});
// Producerは1つ
_ = ProduceAsync(channel.Writer);
// Consumerは3つ
Task task1 = ConsumeAsync(0, channel.Reader);
Task task2 = ConsumeAsync(1, channel.Reader);
Task task3 = ConsumeAsync(2, channel.Reader);
// Consumerが全て終わるまで待つ
return Task.WhenAll(task1, task2, task3);
}
// ランダムな数値を10回出力するProducer
private static async Task ProduceAsync(ChannelWriter<int> writer)
{
var random = new Random();
for (int i = 0; i < 10; ++i)
{
int value = random.Next();
await writer.WriteAsync(value);
}
// これ以上出力しないので完了する
writer.Complete();
}
// 出力された数値を読み取って処理するConsumer
private static async Task ConsumeAsync(int id, ChannelReader<int> reader)
{
// 次のデータが来るまで待ってtrueを返す。完了していたらfalseが返る
while (await reader.WaitToReadAsync())
{
if (reader.TryRead(out int value))
{
// 何らかの重い処理をしてから出力する
await Task.Delay(100);
Console.WriteLine($"ReadAsync({id}) received {value}");
}
}
Console.WriteLine($"ReadAsync({id}) completed");
}
}
ChannelSampleAsyncを呼び出すと以下のような出力が得られます。
ReadAsync(1) received 55682499
ReadAsync(2) received 190475826
ReadAsync(0) received 301238786
ReadAsync(1) received 153571653
ReadAsync(0) received 1237136147
ReadAsync(2) received 403956259
ReadAsync(0) received 59638378
ReadAsync(2) received 653195099
ReadAsync(2) completed
ReadAsync(1) received 946549418
ReadAsync(1) completed
ReadAsync(0) received 855244822
ReadAsync(0) completed
参考リンク
・An Introduction to System.Threading.Channels
Producer/ConsumerパターンについてChannelsを使わずに実装しながら丁寧に説明しています。