初心者用kotlin

kotlinのメモです。

1207 views

Flowableのサンプル1

Flowableは、RxJavaの一部で、非同期データのストリームを扱うためのクラスです。特に、データの発行速度が受信者の処理能力を超える場合にバックプレッシャー制御を提供します。

以下は、簡単なFlowableの使用例です。

import io.reactivex.Flowable

fun main(args: Array<String>) {
    val flow = Flowable.just(1, 2, 3) // 1, 2, 3のデータを発行
    flow.subscribe {
        println(it) // データを受信して表示
    }

    Thread.sleep(1000) // スレッドを1秒間スリープ(非同期処理のための猶予時間)
}

このサンプルでは、Flowable.just(1, 2, 3)により、1, 2, 3の3つの値が順次発行され、それを購読者が受け取って表示しています。

実行結果

1
2
3

Flowableのサンプル2

次に、Flowableのより複雑な例を示します。ここでは、独自のFlowableを定義し、購読者(サブスクライバ)と一緒に動作する例です。

import io.reactivex.*
import io.reactivex.schedulers.Schedulers
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription

// FlowableOnSubscribeは、データ提供者(エミッター)を定義するためのインターフェース。
class MyFlow : FlowableOnSubscribe<String> {
    override fun subscribe(emitter: FlowableEmitter<String>) {
        // データを発射する(例: 0から100までの数値を文字列に変換して発射)
        for (i in 0..100) {
            emitter.onNext(i.toString())
        }

        // データの発行が終わったことを通知
        emitter.onComplete()
    }
}

// データを購読するサブスクライバの定義
class MySubScriber<String> : Subscriber<String> {
    var subscription: Subscription? = null

    override fun onComplete() {
        println("すべてのデータを受信しました")
    }

    override fun onSubscribe(s: Subscription?) {
        // 購読開始時の処理。まず1つのデータをリクエストする。
        subscription = s
        subscription?.request(1)
        println("購読を開始しました")
    }

    override fun onNext(t: String) {
        // データを受信した際の処理
        println("データを受信: $t")
        // 次のデータをリクエスト
        subscription?.request(1)
    }

    override fun onError(t: Throwable?) {
        // エラー発生時の処理
        println("エラーが発生しました: ${t?.message}")
    }

    // 購読をキャンセルするためのメソッド
    fun dispose() {
        subscription?.cancel()
    }
}

fun main(args: Array<String>) {
    // データ提供者(MyFlow)のインスタンスを作成
    val myFlow = MyFlow()

    // 購読者(MySubScriber)のインスタンスを作成
    val mySubscriber = MySubScriber<String>()

    // Flowableを作成(MyFlowからデータを生成するFlowable)
    val flow = Flowable.create<String>(myFlow, BackpressureStrategy.BUFFER)

    // Flowableを購読し、別スレッドでデータを受信・処理する
    flow.observeOn(Schedulers.computation())  // 購読者は別スレッドで実行される
        .subscribe(mySubscriber)             // 購読を開始

    Thread.sleep(1000)  // 処理が終了するまでスレッドを待機
}

このサンプルの詳細な説明

  1. データの提供者 (MyFlow)

    • FlowableOnSubscribe<String>を実装しており、データ(0から100までの数字を文字列に変換したもの)を提供します。
    • emitter.onNext()でデータを発射し、すべてのデータが発射された後にonComplete()で完了を通知します。
  2. 購読者 (MySubScriber)

    • Subscriber<String>を実装しており、データの受信、エラー処理、完了通知を行います。
    • onSubscribeで最初のデータを1つだけリクエストし、その後はデータを受け取るたびに次のデータをリクエストしています(subscription?.request(1))。
  3. バックプレッシャー戦略

    • BackpressureStrategy.BUFFERを使用しており、受信側が処理できる速度よりもデータ発行が速い場合、データをバッファに蓄えるようにしています。
  4. 非同期処理

    • flow.observeOn(Schedulers.computation())により、データの処理が別スレッドで行われるため、UIスレッドなど他の処理をブロックしません。

実行結果(例)

購読を開始しました
データを受信: 0
データを受信: 1
データを受信: 2
...
データを受信: 100
すべてのデータを受信しました

このように、Flowableを使うことで、データストリームの発行・購読のモデルを実現できます。サンプル1では基本的な使い方を、サンプル2では独自のFlowableを使った高度な使い方を紹介しています。

Page 22 of 36.

前のページ 次のページ



[添付ファイル]


お問い合わせ

プロフィール

すぺぺぺ

自己紹介

本サイトの作成者。
プログラムは趣味と勉強を兼ねて、のんびり本サイトを作っています。
フレームワークはdjango。
ChatGPTで自動プログラム作成に取り組み中。

サイト/ブログ

https://www.osumoi-stdio.com/novel/

ツイッター

@darkimpact0626