Table of Contents
1.ObserveOn
2.SubscribeOn
3.使用调度器
检查或设置取消订阅状态
下面的一系列函数作为RxJava的辅助函数,不一定常用,仅仅帮助我们增强代码的功能。
4.Delay
delaySubscription
5.Do
doOnEach
doOnNext
doOnSubscribe
6.Serialize
7.Timeout
8.Using
Home Java javaTutorial An in-depth explanation of RxJava_07 [Multi-threading & auxiliary operations (End)]

An in-depth explanation of RxJava_07 [Multi-threading & auxiliary operations (End)]

Mar 04, 2017 am 10:00 AM

In Android APP, we often need to access the network to obtain data. Requesting network data needs to be operated in a sub-thread. This requirement is broken down as follows:

  1. Network requests are placed in the observer (child thread).

  2. Network request result processing is placed in the observer (main thread).

  3. Subscription (when the network request is completed, it is convenient for the observer to notify the observer)

In order to better realize the above requirements, we Need to know how to use a specific thread to handle the subject and observer. The following article will introduce thread-related operations in RxJava.

1.ObserveOn

Specify the scheduler on which an observer observes this Observable.

In RxJava, to specify the scheduler on which the Observable should call the observer's onNext, onCompleted, and onError methods, you need to use the observeOn operator and pass it a suitable Scheduler.

  • Javadoc: observeOn(Scheduler))

    Observable
        .create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                //main
                Log.i(TAG, "call: "+Thread.currentThread().getName());
                subscriber.onNext("Hello Android !");
            }
        })
        .observeOn(Schedulers.io())
        .subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                //RxIoScheduler-2
                Log.i(TAG, "subscribe call: "+Thread.currentThread().getName());
                Log.i(TAG, "subscribe call: "+s);
            }
        });
    Copy after login

The above code is mainly called in the main thread, so it is printed by the observer is the main thread, and the observeOn function is used, causing the observer to jump to the RxIoScheduler-2 thread to run.

I don’t know if you have noticed that Schedulers.io() in the above code specifies the type of child thread. In addition, there are many thread types. As shown in the following table:

##Schedulers.immediate()In The current thread starts executing the task immediately Schedulers.io() is used for IO-intensive tasks, such as asynchronous blocking IO operations. The thread pool of this scheduler will be based on Needs to grow; for ordinary computing tasks, please use Schedulers.computation(); Schedulers.io() defaults to a CachedThreadScheduler, much like a new thread scheduler with a thread cache Schedulers.newThread()Create a new thread for each taskSchedulers.trampoline()When other queued tasks are completed, Start executing in the queue of the current threadAndroidSchedulers.mainThread()The main thread specified by Android

2.SubscribeOn

指定Observable自身在哪个调度器上执行。

ObserveOn操作符的作用类似,但是功能很有限,它指示Observable在一个指定的调度器上给观察者发通知。

示例代码如下:

Observable
    .create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            //RxComputationScheduler-1
            Log.i(TAG, "call: "+Thread.currentThread().getName());
            subscriber.onNext("Hello Android !");
        }
    })
    //指定被观察者在哪个线程中运行
    .subscribeOn(Schedulers.computation())
    //指定观察者在哪个线程中运行
    .observeOn(Schedulers.io())
    .subscribe(new Action1<String>() {
        @Override
        public void call(String s) {
            //RxIoScheduler-2
            Log.i(TAG, "subscribe call: "+Thread.currentThread().getName());
            Log.i(TAG, "subscribe call: "+s);
        }
    });
Copy after login

3.使用调度器

除了上面介绍的2个函数,你也可以用它们调度你自己的任务。下面的示例展示了Scheduler.Worker的用法:

//模拟在子线程执行任务
Scheduler.Worker worker = Schedulers.newThread().createWorker();
worker.schedule(new Action0() {
    @Override
    public void call() {
        //call:---RxNewThreadScheduler-1
        Log.i(TAG, "call:---"+Thread.currentThread().getName());
    }
});
Copy after login

检查或设置取消订阅状态

Worker类的对象实现了Subscription接口,使用它的isUnsubscribed和unsubscribe方法,所以你可以在订阅取消时停止任务,或者从正在调度的任务内部取消订阅,示例:

final Scheduler.Worker worker = Schedulers.newThread().createWorker();
Subscription mySubscription = worker.schedule(new Action0() {

    @Override
    public void call() {
        while(!worker.isUnsubscribed()) {
            Log.i(TAG, "do your work !");
            //执行完任务后取消订阅状态
            worker.unsubscribe();
        }
    }

});
Copy after login

下面的一系列函数作为RxJava的辅助函数,不一定常用,仅仅帮助我们增强代码的功能。

4.Delay

Delay操作符让原始Observable在发射每项数据之前都暂停一段指定的时间段。效果是Observable发射的数据项在时间上向前整体平移了一个增量。

示例代码:

Observable
    .create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            subscriber.onNext("Hello");
            subscriber.onNext("Android");
            //如果发送异常 则直接抛出异常 上面的发送无效
            //subscriber.onError(new NullPointerException("MOCK"));
            subscriber.onNext("Android2");
        }
    })
    //整体延迟2秒
    .delay(2000, TimeUnit.MILLISECONDS)
    .subscribe(new Action1<String>() {
        @Override
        public void call(String s) {
            Log.i(TAG, "call: " + s);
        }
    });
Copy after login

delaySubscription

还有一个操作符delaySubscription让你你可以延迟订阅原始Observable。它结合搜一个定义延时的参数。

Observable
    .create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            subscriber.onNext("Hello");
            subscriber.onNext("Android");
        }
    })
    //延迟2秒订阅
    .delaySubscription(2,TimeUnit.SECONDS)
    .subscribe(new Action1<String>() {
        @Override
        public void call(String s) {
            Log.i(TAG, "call: " + s);
        }
    });
Copy after login

5.Do

此操作符可以认为是监听器的一种,它监听onNext()事件和subcribe()事件,会在此两个事件前被调用。此函数分别为:doOnEach(),doOnNext(),doOnSubscribe().

doOnEach

doOnEach操作符让你可以注册一个回调,它产生的Observable每发射一项数据就会调用它一次。

示例代码:

    Observable
        .just("Hello","Android")
        //每发送一次 就会现在Observer的onNext()中调用一次
        .doOnEach(new Observer<String>() {
            @Override
            public void onCompleted() {
                Log.i(TAG, "onCompleted: ");
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG, "onError: ");

            }

            @Override
            public void onNext(String s) {
                Log.i(TAG, "onNext: "+s);
            }
        })
        .subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                Log.i(TAG, "call: " + s);
            }
        });
Copy after login

输出:

com.m520it.rxjava I/IT520: onNext: Hello
com.m520it.rxjava I/IT520: call: Hello
com.m520it.rxjava I/IT520: onNext: Android
com.m520it.rxjava I/IT520: call: Android
com.m520it.rxjava I/IT520: onCompleted:
Copy after login

类似的函数还有:

  • Javadoc: doOnEach(Action1))

  • Javadoc: doOnEach(Observer))

doOnNext

doOnNext操作符类似于doOnEach(Action1),但是它的Action不是接受一个Notification参数,而是接受发射的数据项。

示例代码

Observable.just(1, 2, 3)
    .doOnNext(new Action1<Integer>() {
        @Override
        public void call(Integer item) {
            Log.i(TAG, "doOnNext call: "+item);
        }
    }).subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            Log.i(TAG, "call: "+integer);
        }
    });
Copy after login

输出

com.m520it.rxjava I/IT520: doOnNext call: 1
com.m520it.rxjava I/IT520: call: 1
com.m520it.rxjava I/IT520: doOnNext call: 2
com.m520it.rxjava I/IT520: call: 2
com.m520it.rxjava I/IT520: doOnNext call: 3
com.m520it.rxjava I/IT520: call: 3
Copy after login

doOnSubscribe

doOnSubscribe操作符注册一个动作,当观察者订阅它生成的Observable它就会被调用。

Javadoc: doOnSubscribe(Action0))

Observable.just(1, 2, 3)
    // 订阅之前调用
    .doOnSubscribe(new Action0() {
        @Override
        public void call() {
            Log.i(TAG, "doOnSubscribe call ");
        }
    })
    .subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            Log.i(TAG, "call: "+integer);
        }
    });
Copy after login

输出

 I/IT520: doOnSubscribe call 
 I/IT520: call: 1
 I/IT520: call: 2
 I/IT520: call: 3
Copy after login

6.Serialize

强制一个Observable连续调用并保证行为正确

一个Observable可以异步调用它的观察者的方法,可能是从不同的线程调用。这可能会让Observable行为不正确,它可能会在某一个onNext调用之前尝试调用onCompleted或onError方法,或者从两个不同的线程同时调用onNext方法。使用Serialize操作符,你可以纠正这个Observable的行为,保证它的行为是正确的且是同步的。

RxJava提供具有此功能的函数为serialize()

7.Timeout

如果原始Observable过了指定的一段时长没有发射任何数据,Timeout操作符会以一个onError通知终止这个Observable。

RxJava中的实现为timeout函数,我们可以使用该函数作为网络请求的超时异常处理。

示例代码:

//每次发送之后 下一次发送不能超过2秒 如果超过则跳转到onError()
Observable.interval(2, TimeUnit.SECONDS)
        .timeout(2, TimeUnit.SECONDS)
        .subscribe(new Action1<Long>() {
            @Override
            public void call(Long aLong) {
                Log.i(TAG, "call: "+aLong);
            }
        }, new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                //call throwable: null
                Log.i(TAG, "call throwable: " + throwable.getLocalizedMessage());
            }
        });
Copy after login

8.Using

Using操作符让你可以指示Observable创建一个只在它的生命周期内存在的资源,当Observable终止时这个资源会被自动释放。

using(Func0,Func1,Action1)操作符接受三个参数:

  1. 一个用于创建一次性资源的工厂函数

  2. 一个用于创建Observable的工厂函数

  3. 一个用于释放资源的函数

当一个观察者订阅using返回的Observable时,using将会使用Observable工厂函数创建观察者要观察的Observable,同时使用资源工厂函数创建一个你想要创建的资源。当观察者取消订阅这个Observable时,或者当观察者终止时(无论是正常终止还是因错误而终止),using使用第三个函数释放它创建的资源。

示例代码:

final Observable<Long> observable = Observable.using(new Func0<String>() {
    //创建一次性资源
    @Override
    public String call() {
        return "Hello Android !";
    }
}, new Func1<String, Observable<Long>>() {
    //创建被观察者
    @Override
    public Observable<Long> call(String s) {
        Log.i(TAG, "Func1 call: " + s);
        return Observable.interval(1, TimeUnit.SECONDS);
    }
}, new Action1<String>() {
    //用于销毁一次性资源
    @Override
    public void call(String s) {
        Log.i(TAG, "Action1 call: " + s);
    }
});

observable.subscribe(new Subscriber<Long>() {
    @Override
    public void onCompleted() {
        Log.i(TAG, "onCompleted: ");
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onNext(Long aLong) {
        Log.i(TAG, "onNext: "+aLong);
        //取消订阅后 才能执行被观察者的销毁资源方法
        unsubscribe();
    }
});
Copy after login

在安卓APP中,我们经常需要通过访问网络获取数据,请求网络数据需要在子线程中操作,以下将这需求进行分解:

  1. 将网络请求放在被观察者中(子线程)。

  2. 网络请求结果处理放在观察者中(主线程)。

  3. 订阅(当网络请求完成后,方便被观察者通知观察者)

为了更好的实现上面的需求,我们需要知道如何在被观察者与观察者如何使用特定的线程来处理。下面的文章将介绍RxJava中线程相关的操作。

 以上就是深入浅出RxJava_07[多线程&辅助操作(完)]的内容,更多相关内容请关注PHP中文网(www.php.cn)!


Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn

Hot AI Tools

Undresser.AI Undress

Undresser.AI Undress

AI-powered app for creating realistic nude photos

AI Clothes Remover

AI Clothes Remover

Online AI tool for removing clothes from photos.

Undress AI Tool

Undress AI Tool

Undress images for free

Clothoff.io

Clothoff.io

AI clothes remover

Video Face Swap

Video Face Swap

Swap faces in any video effortlessly with our completely free AI face swap tool!

Hot Tools

Notepad++7.3.1

Notepad++7.3.1

Easy-to-use and free code editor

SublimeText3 Chinese version

SublimeText3 Chinese version

Chinese version, very easy to use

Zend Studio 13.0.1

Zend Studio 13.0.1

Powerful PHP integrated development environment

Dreamweaver CS6

Dreamweaver CS6

Visual web development tools

SublimeText3 Mac version

SublimeText3 Mac version

God-level code editing software (SublimeText3)

C++ function exceptions and multithreading: error handling in concurrent environments C++ function exceptions and multithreading: error handling in concurrent environments May 04, 2024 pm 04:42 PM

Function exception handling in C++ is particularly important for multi-threaded environments to ensure thread safety and data integrity. The try-catch statement allows you to catch and handle specific types of exceptions when they occur to prevent program crashes or data corruption.

How to implement multi-threading in PHP? How to implement multi-threading in PHP? May 06, 2024 pm 09:54 PM

PHP multithreading refers to running multiple tasks simultaneously in one process, which is achieved by creating independently running threads. You can use the Pthreads extension in PHP to simulate multi-threading behavior. After installation, you can use the Thread class to create and start threads. For example, when processing a large amount of data, the data can be divided into multiple blocks and a corresponding number of threads can be created for simultaneous processing to improve efficiency.

How can concurrency and multithreading of Java functions improve performance? How can concurrency and multithreading of Java functions improve performance? Apr 26, 2024 pm 04:15 PM

Concurrency and multithreading techniques using Java functions can improve application performance, including the following steps: Understand concurrency and multithreading concepts. Leverage Java's concurrency and multi-threading libraries such as ExecutorService and Callable. Practice cases such as multi-threaded matrix multiplication to greatly shorten execution time. Enjoy the advantages of increased application response speed and optimized processing efficiency brought by concurrency and multi-threading.

How do PHP functions behave in a multi-threaded environment? How do PHP functions behave in a multi-threaded environment? Apr 16, 2024 am 10:48 AM

In a multi-threaded environment, the behavior of PHP functions depends on their type: Normal functions: thread-safe, can be executed concurrently. Functions that modify global variables: unsafe, need to use synchronization mechanism. File operation function: unsafe, need to use synchronization mechanism to coordinate access. Database operation function: Unsafe, database system mechanism needs to be used to prevent conflicts.

Usage of JUnit unit testing framework in multi-threaded environment Usage of JUnit unit testing framework in multi-threaded environment Apr 18, 2024 pm 03:12 PM

There are two common approaches when using JUnit in a multi-threaded environment: single-threaded testing and multi-threaded testing. Single-threaded tests run on the main thread to avoid concurrency issues, while multi-threaded tests run on worker threads and require a synchronized testing approach to ensure shared resources are not disturbed. Common use cases include testing multi-thread-safe methods, such as using ConcurrentHashMap to store key-value pairs, and concurrent threads to operate on the key-value pairs and verify their correctness, reflecting the application of JUnit in a multi-threaded environment.

How to deal with shared resources in multi-threading in C++? How to deal with shared resources in multi-threading in C++? Jun 03, 2024 am 10:28 AM

Mutexes are used in C++ to handle multi-threaded shared resources: create mutexes through std::mutex. Use mtx.lock() to obtain a mutex and provide exclusive access to shared resources. Use mtx.unlock() to release the mutex.

Challenges and countermeasures of C++ memory management in multi-threaded environment? Challenges and countermeasures of C++ memory management in multi-threaded environment? Jun 05, 2024 pm 01:08 PM

In a multi-threaded environment, C++ memory management faces the following challenges: data races, deadlocks, and memory leaks. Countermeasures include: 1. Use synchronization mechanisms, such as mutexes and atomic variables; 2. Use lock-free data structures; 3. Use smart pointers; 4. (Optional) implement garbage collection.

Challenges and strategies for testing multi-threaded programs in C++ Challenges and strategies for testing multi-threaded programs in C++ May 31, 2024 pm 06:34 PM

Multi-threaded program testing faces challenges such as non-repeatability, concurrency errors, deadlocks, and lack of visibility. Strategies include: Unit testing: Write unit tests for each thread to verify thread behavior. Multi-threaded simulation: Use a simulation framework to test your program with control over thread scheduling. Data race detection: Use tools to find potential data races, such as valgrind. Debugging: Use a debugger (such as gdb) to examine the runtime program status and find the source of the data race.

See all articles
Scheduler typeEffect
Schedulers.computation( ) Used for computing tasks, such as event loops or callback processing, not for IO operations (please use Schedulers.io() for IO operations); the default number of threads is equal to the number of processors
Schedulers.from(executor)Use the specified Executor as the scheduler