Prompt: After the article is finished, the table of contents can be automatically generated. How to generate it can refer to the help document on the right.
Article Directory
- Foreword
- What is rxjava?
- II. Use steps
- 1.Bring in the library
- 2.Read in the data
- Sum up
Foreword
This article describes the use of rxjava in project development, as well as detailed code.
What is rxjava?
RxJava is a framework for implementing asynchronous operations based on event streams. Its role is to implement asynchronous operations, similar to AsyncTask in Android. It uses observable sequences on the Java Virtual Machine (JVM) to build asynchronous, event-based programs. RxJava combines the best of the observer pattern, the iterator pattern, and the functional pattern. It was first used by Netflix to reduce the number of REST calls. It was later migrated to the Java platform and has been widely used.
Some of the key features of RxJava include support for Java 8 Lambda expressions, support for asynchronous and synchronous programming, a single dependency, and a clean, elegant code style. In addition, RxJava also solves the problem of “callback hell”. Asynchronous processing no longer requires a set of callbacks, but uses chain calls to complete the callbacks of different threads.
For Android developers, RxJava is often used in conjunction with RxAndroid, a responsive extension for RxJava on the Android platform. However, despite the programming convenience of RxJava, its complexity has caused some developers to have reservations about it.
。
II. Use steps
1.Bring in the library
The code is as follows (example):
implementation 'io.reactivex.rxjava2:rxjava:2.2.21'
2.Complex asynchronous processing in rxjava
Requirement: Connect as soon as you enter the page (asynchronous return results: failed, successful, and connecting). When you click the button, there are several States: 1. Connection failed-restart the connection.
1.1 Successful connection — the method of adjusting reading
1.2 Connection failed — UI failed to prompt
- Connecting
2.1 Successful connection — the method of adjusting reading
2.2 Connection failed — UI failed to prompt
- Successful connection — the method of adjusting reading
Implementation mode 1
class MainActivity : AppCompatActivity() {
private val subHandle = SubHandle()
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
subHandle.mConsumer = null
subHandle.connect().subscribe()
}
fun btRead(view: View) {
subHandle.handleStatus {
subHandle.read()
}
}
/**
* Just connect to the page,when clicking the button,There are several states:
* 1、Connection failed--Restart connection,
* 1.1 connection succeeded --How to read
* 1.2 Connection failed --UIPrompt failed
* 2、connecting
* 2.1 connection succeeded --How to read
* 2.2 Connection failed --UIPrompt failed
* 3、connection succeeded --How to read
*/
class SubHandle {
var mConsumer: ((Int) -> Unit)? = null
private var status = AtomicInteger(-1) // 0Connection failed 1Connecting 2connection succeeded
private var disposable: Disposable? = null
fun connect(): Observable<Int> {
status.set(1)
Log.e("TAG", "=connect=")
return Observable.interval(5, TimeUnit.SECONDS)
.take(1)
.map {
val random = Random(System.currentTimeMillis())
val randomNumber = random.nextInt(3) // generate a0arrive2random integer between
Log.e("TAG", "==funAoutput$randomNumber")
randomNumber
}
.subscribeOn(Schedulers.io())
.doOnNext {
if (it == 2) {
status.set(2)
mConsumer?.invoke(status.get())
} else {
status.set(0)
Log.e("TAG", "Failed to connect to reader,GiveUIhint")
}
}
}
fun handleStatus(consumer: (Int) -> Unit) {
mConsumer = consumer
when (status.get()) {
0 -> {
Log.e("TAG", "Connection failed,Retrying connection")
disposable?.dispose()
disposable = connect().subscribe()
}
1 -> Log.e("TAG", "connecting")
2 -> mConsumer?.invoke(status.get())
}
}
fun read() {
Log.e("TAG", "Start reading")
}
}
}
Implementation mode 2
class MainActivity : AppCompatActivity() {
private var canRead = false
private var connectStatus = 0 //1 represent SUCC, 2 represent FAIL, 0 represent CONNECTING
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
connect()
}
private fun connect() {
Log.e("TAG", "=connect=")
Thread(Runnable {
Thread.sleep(5000) // hibernate5seconds
Observable.just(randomStatus())
.doOnNext { connectStatus = it }
.filter {
Log.e("TAG", "itstate" + it)
it == 1 && canRead
}
.subscribeOn(Schedulers.io())
.doOnNext { read() }
.subscribe()
}).start()
}
fun btRead(view: View) {
canRead = true
Log.e("TAG", "click button" + connectStatus)
when (connectStatus) {
1 -> read() // 1 represent SUCC
2 -> connect() // 2 represent FAIL
else -> {}
}
}
private fun read() {
Log.e("TAG", "Start reading")
}
private fun randomStatus(): Int {
val random = Random(System.currentTimeMillis())
return random.nextInt(3) //generate a0arrive2random integer between
}
}
3. Processing of multiple continuous pop-ups
Implemented using rxjava:
class MainActivity : AppCompatActivity() {
private val compositeDisposable = CompositeDisposable()
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
}
@SuppressLint("CheckResult")
fun btRead(view: View) {
Log.e("TAG", "jjjjjj")
showFirstDialog()
.flatMap { showSecondDialog() }
.flatMap { showThirdDialog() }.subscribe({
Log.e("TAG", "3All pop-up windows have been selected and OK")
}, { error ->
Log.e("TAG", "Clicked Cancel$error")
})
}
private fun showFirstDialog(): Observable<Unit> {
return Observable.create<Unit> { emitter ->
val dialog = AlertDialog.Builder(this)
.setMessage("The first pop-up window")
.setPositiveButton("Sure") { _, _ ->
emitter.onNext(Unit) // Send event,Indicates that the OK button was clicked
}
.setNegativeButton("Cancel") { _, _ ->
emitter.onError(Throwable("1Cancel")) // Send error event,Indicates that the cancel button has been clicked
}
.setOnCancelListener {
emitter.onError(Throwable("1Cancel")) // Send error event,Indicates that the return key has been clicked
}
.create()
dialog.show()
emitter.setCancellable { dialog.dismiss() } // Close popup when unsubscribing
}
}
private fun showSecondDialog(): Observable<Unit> {
return Observable.create<Unit> { emitter ->
val dialog = AlertDialog.Builder(this)
.setMessage("The second pop-up window")
.setPositiveButton("Sure") { _, _ ->
emitter.onNext(Unit)
}
.setNegativeButton("Cancel") { _, _ ->
emitter.onError(Throwable("2Cancel"))
}
.setOnCancelListener {
emitter.onError(Throwable("2Cancel"))
}
.create()
dialog.show()
emitter.setCancellable { dialog.dismiss() }
}
}
private fun showThirdDialog(): Observable<Unit> {
return Observable.create<Unit> { emitter ->
val dialog = AlertDialog.Builder(this)
.setMessage("The third pop-up window")
.setPositiveButton("Sure") { _, _ ->
emitter.onNext(Unit)
}
.setNegativeButton("Cancel") { _, _ ->
emitter.onError(Throwable("3Cancel"))
}
.setOnCancelListener {
emitter.onError(Throwable("3Cancel"))
}
.create()
dialog.show()
emitter.setCancellable { dialog.dismiss() }
}
}
}
Coroutine implementation:
fun btRead(view: View) {
lifecycleScope.launch {
try {
showAlertDialog(this@MainActivity, "hint1", "The first pop-up window")
showAlertDialog(this@MainActivity, "hint1", "The second pop-up window")
showAlertDialog(this@MainActivity, "hint1", "The third pop-up window")
} catch (e: Exception) {
Log.e("showAlertDialog", "2222111An exception occurs")
}
}
}
private suspend fun showAlertDialog(context: Context, title: String, message: String): Boolean =
suspendCancellableCoroutine { ctn ->
val activityRef = WeakReference(context as MainActivity)
val alertDialog = AlertDialog.Builder(context)
.setTitle(title)
.setMessage(message)
.setPositiveButton("Sure") { dialog, _ ->
// Logical processing of clicking the OK button
dialog.dismiss()
activityRef.get()?.let {
ctn.resume(true) {}
}
}
.setNegativeButton("Cancel") { dialog, _ ->
// Logic processing for clicking cancel button
dialog.dismiss()
activityRef.get()?.let {
ctn.resumeWithException(Exception(message + "Cancel"))
}
}
.setOnCancelListener {
activityRef.get()?.let {
ctn.resumeWithException(Exception("Cancel masking layer"))
}
}.create()
alertDialog.show()
}
Sum up
RxJava is a Reactive Extensions library based on the Java language. It is used to implement asynchronous programming and streaming processing. It improves the readability and maintainability of the code by processing events and data streams in the form of data sequences.