04. RxSwift (4): Advanced Operations

1. Combination Operations

Combination operators merge multiple Observable sequences into a single Observable sequence.

1.1 startWith

startWith inserts one or more specified elements at the front of an Observable sequence and then emits the elements of the original sequence in order. For example, if a sequence emits 2 and 3 and you apply startWith(1), subscribers receive 1 first, then 2 and 3.

func example15() {
    Observable.of("2", "3")
        .startWith("1")
        .subscribe(onNext: { print($0) })
        .disposed(by: disposeBag)
}
// Result:
1
2
3
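
A short sketch of how several startWith calls stack, assuming RxSwift is available; the variable names are illustrative and not part of the original example. Each call prepends to the sequence it is applied to, so the call written last supplies the elements that are emitted first.

```swift
import RxSwift

let disposeBag = DisposeBag()
var received: [String] = []

Observable.of("4", "5")
    .startWith("3")        // sequence is now 3, 4, 5
    .startWith("1", "2")   // sequence is now 1, 2, 3, 4, 5
    .subscribe(onNext: { received.append($0) })
    .disposed(by: disposeBag)

print(received) // ["1", "2", "3", "4", "5"]
```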

1.2 merge

merge combines multiple Observable sequences into one new Observable sequence. Elements are forwarded as soon as any source emits them, so the output follows the order in which the events occur on the shared timeline.

func example16() {
    let subject1 = PublishSubject<String>()
    let subject2 = PublishSubject<String>()

    Observable.of(subject1, subject2)
        .merge()
        .subscribe(onNext: { print($0) })
        .disposed(by: disposeBag)

    subject1.onNext("🅰️")
    subject1.onNext("🅱️")
    subject2.onNext("①")
    subject2.onNext("②")
    subject1.onNext("🆎")
    subject2.onNext("③")
}
// Result:
🅰️
🅱️
①
②
🆎
③

1.3 zip

zip combines the elements of multiple Observable sequences pairwise, in order. It waits until every participating sequence has emitted its next element, merges those elements one-to-one into a new element, and then emits the new element.

zip accepts up to 8 Observable sequences. Note that zip pairs events by position: the n-th event of one stream is matched with the n-th event of the other stream.

func example17() {
    let stringSubject = PublishSubject<String>()
    let intSubject = PublishSubject<Int>()

    Observable.zip(stringSubject, intSubject) { stringElement, intElement in
        "\(stringElement) \(intElement)"
    }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

    stringSubject.onNext("🅰️")
    stringSubject.onNext("🅱️")

    intSubject.onNext(1)

    intSubject.onNext(2)

    stringSubject.onNext("🆎")
    intSubject.onNext(3)
}
// zip pairs up stringSubject and intSubject so they are processed together
// Results:
🅰️ 1
🅱️ 2
🆎 3

1.4 combineLatest

combineLatest combines the latest elements of multiple Observable sequences into a new element and emits it whenever any of the source sequences emits, once every source has emitted at least one element. Note: it accepts no more than 8 Observable sequences. The difference from zip is that combineLatest pairs each new event of one stream with the latest event of the other stream, rather than pairing events by position. Compare the results below with those of zip.

func example18() {
    let stringSubject = PublishSubject<String>()
    let intSubject = PublishSubject<Int>()

    Observable.combineLatest(stringSubject, intSubject) { stringElement, intElement in
        "\(stringElement) \(intElement)"
    }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

    stringSubject.onNext("🅰️")
    stringSubject.onNext("🅱️")
    intSubject.onNext(1)
    intSubject.onNext(2)
    stringSubject.onNext("🆎")
}
// Results:
🅱️ 1
🅱️ 2
🆎 2
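
To make the contrast between zip and combineLatest concrete, here is a minimal sketch that feeds the same events into both operators. It assumes RxSwift is available; the subject names and the plain-letter values are illustrative only.

```swift
import RxSwift

let disposeBag = DisposeBag()
let letters = PublishSubject<String>()
let numbers = PublishSubject<Int>()
var zipped: [String] = []
var combined: [String] = []

// zip pairs elements by position: first with first, second with second.
Observable.zip(letters, numbers) { "\($0)\($1)" }
    .subscribe(onNext: { zipped.append($0) })
    .disposed(by: disposeBag)

// combineLatest pairs every new element with the latest element of the other stream.
Observable.combineLatest(letters, numbers) { "\($0)\($1)" }
    .subscribe(onNext: { combined.append($0) })
    .disposed(by: disposeBag)

letters.onNext("A")
letters.onNext("B")
numbers.onNext(1)
numbers.onNext(2)
letters.onNext("C") // zip keeps waiting for a third number

print(zipped)   // ["A1", "B2"]
print(combined) // ["B1", "B2", "C2"]
```

zip never produces a value for "C" because no third number arrives, while combineLatest never produces a value containing "A" because "A" had already been replaced by "B" when the first number arrived.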

2. Transformation Operations

2.1 map

Used to apply a transformation function to each element in an Observable sequence and emit the transformed elements as a new Observable sequence.

func example19() {
    // Multiply each element by itself
    Observable.of(1, 2, 3)
        .map { $0 * $0 }
        .subscribe(onNext: { print($0) })
        .disposed(by: disposeBag)
}
// Results:
1
4
9

2.2 flatMap

flatMap maps each element of the original Observable sequence to a new Observable sequence, then merges all of these inner sequences into a single Observable sequence, emitting their elements in the order in which they occur. In effect, flatMap unwraps the inner sequences; see the comments in the code.

struct Player {
    let score: BehaviorSubject<Int> // Each player wraps an observable score (BehaviorSubject replaces the deprecated Variable)
    init(score: Int) { self.score = BehaviorSubject(value: score) }
}

func example20() {

    let playerA = Player(score: 80)
    let playerB = Player(score: 90)
    let playerC = Player(score: 550)

    let player = BehaviorSubject(value: playerA) // The current player is itself observable

    player.asObservable() // Expose the subject as an observable sequence
        .flatMap { $0.score.asObservable() } // $0 is a Player; flatMap subscribes to each player's score sequence and forwards its elements, so the subscriber receives Int values rather than sequences
        .subscribe(onNext: { print($0) })
        .disposed(by: disposeBag)

    playerA.score.onNext(85)

    player.onNext(playerB) // Switching the player adds another inner sequence; both are still observed

    playerA.score.onNext(95)
    playerA.score.onNext(222)
    player.onNext(playerC)

    playerB.score.onNext(100)
}
// Results:
80
85
90
95
222
550
100

2.3 flatMapLatest

flatMapLatest maps each element of the original Observable sequence to a new Observable sequence, but only emits elements from the most recently mapped sequence and ignores all earlier ones. Changing the example above to flatMapLatest gives this result:

80
85
90
550
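
The flatMapLatest behaviour can be sketched as a standalone snippet. This is a reduced version under stated assumptions: RxSwift is available, the Player type is modelled with a BehaviorSubject, and the names playerA and playerB are hypothetical.

```swift
import RxSwift

struct Player {
    let score: BehaviorSubject<Int>
    init(score: Int) { self.score = BehaviorSubject(value: score) }
}

let disposeBag = DisposeBag()
let playerA = Player(score: 80)
let playerB = Player(score: 90)
let current = BehaviorSubject(value: playerA)
var received: [Int] = []

current
    .flatMapLatest { $0.score } // only the latest player's scores are forwarded
    .subscribe(onNext: { received.append($0) })
    .disposed(by: disposeBag)

playerA.score.onNext(85)   // playerA is current: delivered
current.onNext(playerB)    // switch: playerB's current score (90) is delivered
playerA.score.onNext(95)   // playerA is no longer observed: dropped
playerB.score.onNext(100)  // delivered

print(received) // [80, 85, 90, 100]
```

With flatMap in place of flatMapLatest, the 95 would also be delivered, because the subscription to playerA's scores would stay alive.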

2.4 scan

Used to process the elements of an observable sequence one by one according to the specified accumulation rules and generate a new observable sequence that emits the accumulated results of each step.

func example21() {

    Observable.of(10, 100, 1000)
        .scan(1) { aggregateValue, newValue in
            aggregateValue + newValue
        }
        .subscribe(onNext: { print($0) })
        .disposed(by: disposeBag)
}
// Results:
11
111
1111

3. Filtering And Constraint Operations

3.1 filter

Used to select elements that meet specified conditions from an Observable sequence, and then emit these elements as a new Observable.

func example22() {
    Observable.of(
        "1", "2", "3",
        "🐱", "🐱", "🐱",
        "4", "5", "6")
        .filter {
            $0 == "🐱"
        }
        .subscribe(onNext: { print($0) })
        .disposed(by: disposeBag)
}
// Results:
🐱
🐱
🐱

3.2 distinctUntilChanged

distinctUntilChanged filters out consecutive repeated elements, emitting an element only when it differs from the one immediately before it. ReactiveCocoa (RAC) offers a similar operation.

func example23() {
    Observable.of("🐱", "🐱", "🐱", "🐱", "🐱", "🐱", "🐱")
        .distinctUntilChanged()
        .subscribe(onNext: { print($0) })
        .disposed(by: disposeBag)
}
// Results:
🐱
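
A small sketch with values that actually change shows that only consecutive duplicates are dropped; a value may reappear later in the output. It assumes RxSwift is available, and the integer input is chosen purely for illustration.

```swift
import RxSwift

let disposeBag = DisposeBag()
var received: [Int] = []

Observable.of(1, 1, 2, 2, 1, 3, 3)
    .distinctUntilChanged() // compares each element only with its predecessor
    .subscribe(onNext: { received.append($0) })
    .disposed(by: disposeBag)

print(received) // [1, 2, 1, 3]
```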

3.3 elementAt

Used to obtain the element at the specified index position from an Observable sequence and emit it as a new Observable. This operator takes an index value as a parameter and returns a new Observable that emits only the element at the specified index in the original sequence and then completes immediately.

func example24() {
    Observable.of("1", "2", "3", "4", "5", "6")
        .element(at: 3)
        .subscribe(onNext: { print($0) })
        .disposed(by: disposeBag)
}
// Results:
4

3.4 single

Used to get a single element from an Observable sequence and emit it as a new Observable. This operator is typically used to ensure that the Observable emits only one element, and it will generate an error event if the Observable emits multiple elements or no elements.

func example25() {
    Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
        .single()
        .subscribe(onNext: { print($0) })
        .disposed(by: disposeBag)

}
// Results:
🐱
Unhandled error happened: Sequence contains more than one element. // single fails because the sequence has more than one element
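
The three cases described above (exactly one element, more than one, none) can be sketched as follows. This assumes RxSwift is available; the observe helper and the log array are hypothetical names introduced only for this illustration.

```swift
import RxSwift

let disposeBag = DisposeBag()
var log: [String] = []

// Hypothetical helper: applies single() and records every event it sees.
func observe(_ source: Observable<Int>) {
    source.single()
        .subscribe(
            onNext: { log.append("next \($0)") },
            onError: { _ in log.append("error") },
            onCompleted: { log.append("completed") }
        )
        .disposed(by: disposeBag)
}

observe(Observable<Int>.just(7))   // exactly one element: next, then completed
observe(Observable<Int>.of(1, 2))  // more than one: first element, then error
observe(Observable<Int>.empty())   // no element: error

print(log) // ["next 7", "completed", "next 1", "error", "error"]
```

Supplying an onError handler, as here, also avoids the "Unhandled error happened" message printed in the example above.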

3.5 take

Used to take a specified number of elements from the start of an Observable sequence and emit them as a new Observable. This operator accepts an integer parameter indicating the number of elements to take.

func example26() {
    Observable.of("1", "2", "3", "4", "5", "6")
        .take(3)
        .subscribe(onNext: { print($0) })
        .disposed(by: disposeBag)
}
// Results:
1
2
3

3.6 takeLast

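takeLast accepts an integer parameter indicating how many elements to take from the end of the sequence; that is, it emits only the last few elements. Because it cannot know which elements are the last ones until the source completes, it emits nothing before the completed event.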

func example27() {
    Observable.of("1", "2", "3", "4", "5", "6")
        .takeLast(3)
        .subscribe(onNext: { print($0) })
        .disposed(by: disposeBag)
}
// Results:
4
5
6
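
Since takeLast has to see the end of the sequence before it can emit, a subject-based sketch makes the timing visible. It assumes RxSwift is available; the names subject and beforeCompletion are illustrative.

```swift
import RxSwift

let disposeBag = DisposeBag()
let subject = PublishSubject<Int>()
var received: [Int] = []

subject
    .takeLast(2)
    .subscribe(onNext: { received.append($0) })
    .disposed(by: disposeBag)

subject.onNext(1)
subject.onNext(2)
subject.onNext(3)
let beforeCompletion = received // nothing has been emitted yet

subject.onCompleted()           // the buffered last two elements are emitted now

print(beforeCompletion) // []
print(received)         // [2, 3]
```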

3.7 takeWhile

Used to obtain elements from an Observable sequence based on a certain condition until the condition is no longer met, and emit these elements as a new Observable. This operator accepts a closure argument that returns a Boolean value indicating whether the condition is met.

func example28() {
    Observable.of(1, 2, 3, 4, 5, 6)
        .take(while: { $0 < 4 })
        .subscribe(onNext: { print($0) })
        .disposed(by: disposeBag)
}
// Results:
1
2
3

3.8 takeUntil

takeUntil emits elements from the source Observable sequence until another specified Observable emits an element, at which point the resulting sequence completes.

func example29() {
    let sourceSequence = PublishSubject<String>()
    let referenceSequence = PublishSubject<String>()

    sourceSequence
        .take(until: referenceSequence)
        .subscribe { print($0) }
        .disposed(by: disposeBag)
    sourceSequence.onNext("🐱")
    sourceSequence.onNext("🐰")
    sourceSequence.onNext("🐶")
    referenceSequence.onNext("🔴") // Stop receiving elements
    sourceSequence.onNext("🐸")
    sourceSequence.onNext("🐷")
    sourceSequence.onNext("🐵")
}
// Results:
next(🐱)
next(🐰)
next(🐶)
completed

3.9 skip

Used to skip a specified number of elements from an Observable sequence and emit the remaining elements as a new Observable.

func example30() {
    Observable.of("1", "2", "3", "4", "5", "6")
        .skip(2)
        .subscribe(onNext: { print($0) })
        .disposed(by: disposeBag)
}
// Result:
3
4
5
6

3.10 skipWhile

skipWhile skips elements from the start of an Observable sequence for as long as the specified condition holds, and then emits all remaining elements as a new Observable. This operator accepts a closure that returns a Boolean value indicating whether the current element should be skipped; once the closure returns false, no further elements are skipped.

func example31() {
    Observable.of(1, 2, 3, 4, 5, 6)
        .skip(while: { $0 < 4 })
        .subscribe(onNext: { print($0) })
        .disposed(by: disposeBag)
}
// Result:
4
5
6
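
The difference between skipWhile and filter only shows up when the input is not sorted. The following sketch, which assumes RxSwift 6 naming (skip(while:)) and uses made-up input values, runs both on the same sequence.

```swift
import RxSwift

let disposeBag = DisposeBag()
let source = Observable.of(1, 5, 2, 6)
var skipped: [Int] = []
var filtered: [Int] = []

// Skips only the leading run of elements below 4; later small values pass through.
source.skip(while: { $0 < 4 })
    .subscribe(onNext: { skipped.append($0) })
    .disposed(by: disposeBag)

// Tests every element independently.
source.filter { $0 >= 4 }
    .subscribe(onNext: { filtered.append($0) })
    .disposed(by: disposeBag)

print(skipped)  // [5, 2, 6]
print(filtered) // [5, 6]
```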

3.11 skipUntil

skipUntil skips elements from the source Observable until another Observable emits an element. Specifically, skipUntil takes an Observable as a parameter; once that Observable emits an element, skipUntil starts forwarding elements from the source.

func example32() {
    let sourceSequence = PublishSubject<String>()
    let referenceSequence = PublishSubject<String>()

    sourceSequence
        .skip(until: referenceSequence)
        .subscribe(onNext: { print($0) })
        .disposed(by: disposeBag)

    sourceSequence.onNext("1")
    sourceSequence.onNext("2")
    sourceSequence.onNext("3")

    referenceSequence.onNext("4")

    sourceSequence.onNext("5")
    sourceSequence.onNext("6")
    sourceSequence.onNext("7")
}
// Result:
5
6
7

4. Math Operations

4.1 toArray

toArray collects all elements of an Observable sequence into an array and, once the sequence completes, emits that array as a single element. In other words, it converts a sequence of multiple elements into one array containing all of them. In RxSwift 5 and later the result is a Single, which is why the output below is wrapped in success(...).

func example33() {
    Observable.range(start: 1, count: 10)
        .toArray()
        .subscribe { print($0) }
        .disposed(by: disposeBag)
}
// Result:
success([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
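
If you want the array itself rather than the wrapped event, one option is to subscribe with onSuccess. This is a minimal sketch assuming RxSwift 5 or later, where toArray returns a Single; the variable collected is illustrative.

```swift
import RxSwift

let disposeBag = DisposeBag()
var collected: [Int] = []

Observable.range(start: 1, count: 5)
    .toArray()                                 // a Single<[Int]> under the stated assumption
    .subscribe(onSuccess: { collected = $0 })  // called once, after the source completes
    .disposed(by: disposeBag)

print(collected) // [1, 2, 3, 4, 5]
```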

4.2 reduce

It is used to accumulate (reduce) elements in an Observable sequence according to specified rules, and emit the final result as a new Observable.

func example34() {
    Observable.of(10, 100, 1000)
        .reduce(1, accumulator: +)
        .subscribe(onNext: { print($0) })
        .disposed(by: disposeBag)
}
// Result:
1111
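
reduce and scan apply the same accumulation; they differ only in what they emit. The sketch below, assuming RxSwift is available and using illustrative variable names, runs both on the input from the examples.

```swift
import RxSwift

let disposeBag = DisposeBag()
let source = Observable.of(10, 100, 1000)
var scanned: [Int] = []
var reduced: [Int] = []

// scan emits every intermediate accumulated value.
source.scan(1, accumulator: +)
    .subscribe(onNext: { scanned.append($0) })
    .disposed(by: disposeBag)

// reduce emits only the final value, once the source completes.
source.reduce(1, accumulator: +)
    .subscribe(onNext: { reduced.append($0) })
    .disposed(by: disposeBag)

print(scanned) // [11, 111, 1111]
print(reduced) // [1111]
```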

4.3 concat

concat joins multiple sequences into one sequence; events from the next sequence are not delivered until the previous sequence has emitted its completed event.

Events emitted by the second sequence before the first one completes are ignored. The one exception in the example below is the most recent value: the second sequence is a BehaviorSubject, which replays its latest element to a new subscriber, so the last element it emitted before the first sequence completed is still received.

func example35() {
    let subject1 = BehaviorSubject(value: "🍎")
    let subject2 = BehaviorSubject(value: "🐶")

    let subjects = BehaviorSubject(value: subject1) // BehaviorSubject replaces the deprecated Variable

    subjects.asObservable()
        .concat()
        .subscribe { print($0) }
        .disposed(by: disposeBag)

    subject1.onNext("🍐")
    subject1.onNext("🍊")

    subjects.onNext(subject2)

    subject2.onNext("🐱") // Ignored: subject1 has not completed yet
    subject2.onNext("teng") // Ignored: subject1 has not completed yet
    subject2.onNext("fei") // The latest value before subject1 completes is replayed and received
    subject1.onCompleted()
    subject2.onNext("🐭")
}
// Results:
next(🍎)
next(🍐)
next(🍊)
next(fei)
next(🐭)

5. Connectable Operations

Connectable operators are a special class of operators for controlling situations where multiple observers share the same underlying data source. They convert a normal Observable into a connectable Observable, which emits nothing to its subscribers until connect() is called; all observers subscribed to it then share a single subscription to the source. Here are some common connectable operations:

5.1 publish

publish converts a normal Observable into a connectable Observable. It does not start emitting elements on subscription, but waits for the connect() method to be called.

let disposeBag = DisposeBag()

let observable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
    .take(5)
    .publish() // Use publish to make the Observable connectable

// First observer
observable
    .subscribe(onNext: { value in
        print("Observer 1: \(value)")
    })
    .disposed(by: disposeBag)

// Connect after some delay
DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
    _ = observable.connect() // Start emitting elements
}

// Second observer
DispatchQueue.main.asyncAfter(deadline: .now() + 4) {
    observable
        .subscribe(onNext: { value in
            print("Observer 2: \(value)")
        })
        .disposed(by: disposeBag)
}
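
The timer-based example depends on wall-clock timing. A deterministic sketch of the same idea replaces the timer with a PublishSubject that is driven by hand; this is an assumption made for illustration, and the names source, connectable, first and second are hypothetical.

```swift
import RxSwift

let disposeBag = DisposeBag()
let source = PublishSubject<Int>()
let connectable = source.publish()
var first: [Int] = []
var second: [Int] = []

connectable
    .subscribe(onNext: { first.append($0) })
    .disposed(by: disposeBag)

source.onNext(1) // not connected yet: nobody receives this

connectable.connect().disposed(by: disposeBag)

source.onNext(2) // only the first observer is subscribed

connectable
    .subscribe(onNext: { second.append($0) })
    .disposed(by: disposeBag)

source.onNext(3) // both observers share the same connection

print(first)  // [2, 3]
print(second) // [3]
```

The second observer misses the element emitted before it subscribed, because publish does not cache anything.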

5.2 replay

Converts a normal Observable to a connectable Observable and caches the elements it emits so that subsequent observers can obtain previously emitted elements.

let disposeBag = DisposeBag()

let observable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
    .take(5)
    .replay(2) // Use replay to cache the latest 2 elements

// First observer
observable
    .subscribe(onNext: { value in
        print("Observer 1: \(value)")
    })
    .disposed(by: disposeBag)

// Connect after some delay
DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
    _ = observable.connect() // Start emitting elements
}

// Second observer
DispatchQueue.main.asyncAfter(deadline: .now() + 4) {
    observable
        .subscribe(onNext: { value in
            print("Observer 2: \(value)")
        })
        .disposed(by: disposeBag)
}
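
As a timing-independent sketch of the caching behaviour, the following snippet drives a PublishSubject by hand instead of using a timer. The hand-driven source and all variable names are assumptions made for illustration; a buffer size of 1 is used to keep the output short.

```swift
import RxSwift

let disposeBag = DisposeBag()
let source = PublishSubject<Int>()
let connectable = source.replay(1) // cache only the most recent element
var first: [Int] = []
var second: [Int] = []

connectable
    .subscribe(onNext: { first.append($0) })
    .disposed(by: disposeBag)

connectable.connect().disposed(by: disposeBag)

source.onNext(1)
source.onNext(2)

// A late observer first receives the cached element (2), then live elements.
connectable
    .subscribe(onNext: { second.append($0) })
    .disposed(by: disposeBag)

source.onNext(3)

print(first)  // [1, 2, 3]
print(second) // [2, 3]
```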

5.3 multicast

multicast converts a normal Observable into a connectable Observable and relays its elements through the subject you specify, such as a PublishSubject, ReplaySubject, or BehaviorSubject. Different subjects produce different results. Use the connect() method to start emitting elements.

let disposeBag = DisposeBag()

let subject = PublishSubject<Int>()

let observable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
    .take(5)
    .multicast(subject) // Use multicast and specify the subject

// First observer
observable
    .subscribe(onNext: { value in
        print("Observer 1: \(value)")
    })
    .disposed(by: disposeBag)

// Connect after some delay
DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
    _ = observable.connect() // Start emitting elements
}

// Second observer
DispatchQueue.main.asyncAfter(deadline: .now() + 4) {
    observable
        .subscribe(onNext: { value in
            print("Observer 2: \(value)")
        })
        .disposed(by: disposeBag)
}

6. Error Handling

6.1 catchErrorJustReturn

catchErrorJustReturn replaces an error event with a predefined default value: when the source sequence fails, the operator emits the default value and then completes. It does not resubscribe to the original Observable, so elements that would have followed the error are never delivered. Put simply, on an error event it returns a default value and ends. In RxSwift 6 this operator is named catchAndReturn, and catchErrorJustReturn is deprecated.

let disposeBag = DisposeBag()

let numbers = Observable.of(1, 2, 3, 4, 5)

numbers
    .map { value -> Int in
        if value == 3 {
            throw NSError(domain: "CustomErrorDomain", code: 0, userInfo: nil)
        }
        return value
    }
    .catchErrorJustReturn(-1) // Emit the default value -1 when an error is encountered
    .subscribe(onNext: { element in
        print("The processed element is: \(element)")
    }, onError: { error in
        print("An error occurred: \(error)")
    })
    .disposed(by: disposeBag)
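
To see exactly which events reach the subscriber, the sketch below records them in an array. It assumes RxSwift 6, where the operator is spelled catchAndReturn; SampleError and log are hypothetical names introduced for this illustration.

```swift
import RxSwift

struct SampleError: Error {}

let disposeBag = DisposeBag()
var log: [String] = []

Observable.of(1, 2, 3, 4, 5)
    .map { value -> Int in
        if value == 3 { throw SampleError() } // the sequence fails at the third element
        return value
    }
    .catchAndReturn(-1) // replace the error with -1, then complete
    .subscribe(
        onNext: { log.append("next \($0)") },
        onError: { _ in log.append("error") },
        onCompleted: { log.append("completed") }
    )
    .disposed(by: disposeBag)

print(log) // ["next 1", "next 2", "next -1", "completed"]
```

The elements 4 and 5 never arrive, and the onError handler is never called: the error terminates the source, and the operator ends the sequence with the default value.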

6.2 catchError

catchError lets you run custom error-handling logic when an error occurs: it catches the error and returns a new Observable, and the subscription continues with that Observable. In RxSwift 6 this operator is named catch, and catchError is deprecated.

let disposeBag = DisposeBag()

let numbers = Observable.of(1, 2, 3, 4, 5)

numbers
    .map { value -> Int in
        if value == 3 {
            throw NSError(domain: "CustomErrorDomain", code: 0, userInfo: nil)
        }
        return value
    }
    .catchError { error in
        print("Error caught: \(error)")
        return Observable.of(-1) // Return a new Observable to continue subscribing
    }
    .subscribe(onNext: { element in
        print("The processed element is: \(element)")
    })
    .disposed(by: disposeBag)

6.3 retry

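retry resubscribes to the original Observable when an error occurs so that it can keep receiving elements. The operator keeps resubscribing until the sequence finishes without an error or the maximum number of attempts is reached, in which case the last error is passed on to the subscriber. It is especially common for failed network requests. Note that the argument of retry counts the initial subscription, so retry(3) means the first attempt plus at most two retries; in the example below the first three attempts all fail, so the subscriber receives the error.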

let disposeBag = DisposeBag()

var attempts = 0

let observable = Observable<Int>.create { observer in
    attempts += 1
    if attempts < 4 {
        observer.onError(NSError(domain: "CustomErrorDomain", code: 0, userInfo: nil))
    } else {
        observer.onNext(10)
        observer.onCompleted()
    }
    return Disposables.create()
}

observable
    .retry(3) // At most 3 subscriptions in total: the first attempt plus 2 retries
    .subscribe(
        onNext: { element in
            print("The received element is: \(element)")
        },
        onError: { error in
            print("An error occurred: \(error)")
        },
        onCompleted: {
            print("Operation completed")
        }
    )
    .disposed(by: disposeBag)
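
How the attempt count plays out can be sketched with a source that fails a configurable number of times. This assumes RxSwift is available; flakySource, run, SampleError and log are hypothetical helpers written only for this illustration.

```swift
import RxSwift

struct SampleError: Error {}

let disposeBag = DisposeBag()
var log: [String] = []

// Hypothetical source: fails on its first `failures` subscriptions, then emits 10.
func flakySource(failures: Int) -> Observable<Int> {
    var subscriptions = 0
    return Observable.create { observer in
        subscriptions += 1
        if subscriptions <= failures {
            observer.onError(SampleError())
        } else {
            observer.onNext(10)
            observer.onCompleted()
        }
        return Disposables.create()
    }
}

func run(failures: Int) {
    flakySource(failures: failures)
        .retry(3) // three subscriptions in total
        .subscribe(
            onNext: { log.append("next \($0)") },
            onError: { _ in log.append("error") },
            onCompleted: { log.append("completed") }
        )
        .disposed(by: disposeBag)
}

run(failures: 2) // the third attempt succeeds
run(failures: 3) // all three attempts fail, so the error is delivered

print(log) // ["next 10", "completed", "error"]
```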

7. Debug

7.1 debug

debug is an operator for debugging. It prints the subscriptions, events, and disposals that pass through it, so you can trace an Observable's life cycle and element emissions. It is typically used while developing and debugging RxSwift code to better understand the behavior of Observables.

let disposeBag = DisposeBag()

let observable = Observable<Int>.create { observer in
    observer.on(.next(1))
    observer.on(.next(2))
    observer.on(.completed)
    return Disposables.create()
}

observable
    .debug("Observable")
    .subscribe(
        onNext: { element in
            print("The received element is: \(element)")
        },
        onError: { error in
            print("An error occurred: \(error)")
        },
        onCompleted: {
            print("Operation completed")
        }
    )
    .disposed(by: disposeBag)

7.2 RxSwift.Resources.total

RxSwift.Resources.total is a property that returns the current RxSwift resource count. It lets you monitor how many resources RxSwift has created and not yet released in the current application. These resources include Observable, Disposable, Operators, etc. The count can help you detect potential resource leaks or unreasonable resource usage. Note that it is only available when RxSwift is compiled with the TRACE_RESOURCES flag.

let disposeBag = DisposeBag()

// Create some Observables and Disposables
let observable = Observable<Int>.just(1)
let disposable = observable.subscribe()

// Get the current RxSwift resource count
let resourceCount = RxSwift.Resources.total

print("Current RxSwift resource count: \(resourceCount)")

// Remember to release resources to avoid resource leaks
disposable.dispose()

// Get the RxSwift resource count again
let updatedResourceCount = RxSwift.Resources.total

print("Updated RxSwift resource count: \(updatedResourceCount)")
