03. Combine (3) ———— Common types (2)

It will take about 10 minutes to finish reading this article.

Overview

Picking up from the previous article, this one introduces several more common types in the Combine framework.

11. AnySubscriber

AnySubscriber is the type-erased version of Subscriber. It wraps a concrete subscriber and hides its specific type, exposing only the Input and Failure types, which makes code more flexible. You rarely need to create an AnySubscriber yourself, because it is mostly used inside Combine's operators and the framework itself.

You create an AnySubscriber by passing a concrete subscriber to its initializer, which wraps the original subscriber and erases its type.

// MySpecificSubscriber stands for any concrete type that conforms to Subscriber
let specificSubscriber = MySpecificSubscriber<Int, Error>()
let anySubscriber = AnySubscriber(specificSubscriber)

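Like any other subscriber, an AnySubscriber can be attached to a publisher with the subscribe(_:) method, after which it receives the values and the completion event of the data stream. Note that sink and assign(to:on:) do not accept a subscriber; they create their own subscriber internally and return an AnyCancellable.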
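
As a minimal sketch of the paragraph above, an AnySubscriber can also be built directly from closures and then attached with subscribe(_:). The variable names received and finished are illustrative only, and the snippet assumes it runs as top-level script or playground code.

```swift
import Combine

// Illustrative storage for what the subscriber receives
var received: [Int] = []
var finished = false

// Build a type-erased subscriber from closures
let closureSubscriber = AnySubscriber<Int, Never>(
    receiveSubscription: { subscription in
        // Ask the publisher for as many values as it has
        subscription.request(.unlimited)
    },
    receiveValue: { value in
        received.append(value)
        return .none // no additional demand beyond the initial request
    },
    receiveCompletion: { _ in
        finished = true
    }
)

// Attach the subscriber to a publisher
[1, 2, 3].publisher.subscribe(closureSubscriber)

print(received) // [1, 2, 3]
print(finished) // true
```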

12. Subject

A Subject is a special kind of Publisher that also lets outside code inject values into the stream by calling send(_:). A subject can additionally be attached to another publisher with subscribe(_:), in which case it republishes that publisher's elements. It therefore acts as a relay for data in Combine, letting you add, update, and distribute values in the data flow. Combine provides two built-in Subject types, each with a different purpose.

1. PassthroughSubject:

PassthroughSubject is the most common Subject type. It passes each value it receives directly to its current subscribers. It does not cache or replay values, so a subscriber only receives values sent after it subscribed.

import Combine

// Create a PassthroughSubject, specifying its Output and Failure types
let subject = PassthroughSubject<Int, Never>()

// Subscribe to the subject and process each received value
let cancelable = subject
    .sink(receiveValue: { value in
        print("Received value: \(value)")
    })

// Publish values to the subject
subject.send(1)
subject.send(2)
subject.send(3)

// Cancel the subscription
cancelable.cancel()
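
To illustrate that PassthroughSubject does not replay values, here is a small sketch in which a second subscriber joins late. The names early and late are illustrative, and the snippet assumes top-level script or playground code.

```swift
import Combine

let relay = PassthroughSubject<Int, Never>()
var early: [Int] = []
var late: [Int] = []

// The first subscriber is attached before any value is sent
let earlyCancellable = relay.sink { early.append($0) }
relay.send(1)

// The second subscriber joins after 1 was sent and never sees it
let lateCancellable = relay.sink { late.append($0) }
relay.send(2)

// After completion, further values are ignored
relay.send(completion: .finished)
relay.send(3)

print(early) // [1, 2]
print(late)  // [2]
```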

2. CurrentValueSubject:

CurrentValueSubject is similar to PassthroughSubject, but it stores the most recent value. It must be created with an initial value, and it sends the current value to each new subscriber at the moment of subscription. It can be used to represent an observable object that has a current state.

import Combine

// Create a CurrentValueSubject with an initial value
let subject = CurrentValueSubject<Int, Never>(0)

// Subscribe to the subject and process each received value
let cancelable = subject
    .sink(receiveValue: { value in
        print("Received value: \(value)")
    })

// Publish values to the subject
subject.send(1)
subject.send(2)
subject.send(3)

// A new subscriber immediately receives the current value (3 here, not the initial 0)
let newSubscriber = subject
    .sink(receiveValue: { value in
        print("New subscriber received value: \(value)")
    })

// Cancel the subscriptions
cancelable.cancel()
newSubscriber.cancel()

The result is:

Received value: 0
Received value: 1
Received value: 2
Received value: 3
New subscriber received value: 3
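
Because a CurrentValueSubject holds state, that state can also be read and written through its value property. The sketch below assumes top-level script or playground code, and the names state and seen are illustrative.

```swift
import Combine

let state = CurrentValueSubject<Int, Never>(0)
var seen: [Int] = []

// The subscriber receives the current value (0) immediately
let stateCancellable = state.sink { seen.append($0) }

// Assigning to value publishes the new value, just like send(_:)
state.value = 5
state.send(7)

print(state.value) // 7
print(seen)        // [0, 5, 7]
```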

*** Note: ***
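The @Published property wrapper behaves much like a CurrentValueSubject: its publisher sends the current value to each new subscriber and then publishes every subsequent change automatically. One difference is that @Published emits during the property's willSet, so subscribers receive the new value before it is actually stored in the property.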
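
The timing of @Published can be observed with a small experiment. This is only a sketch: the Counter class and the log array are hypothetical names, and it relies on the documented behavior that publishing happens in the property's willSet.

```swift
import Combine

// Hypothetical model type used only for this illustration
final class Counter {
    @Published var count = 0
}

let counter = Counter()
var log: [String] = []

let counterCancellable = counter.$count.sink { newValue in
    // While the new value is being published, the property still holds the old one
    log.append("published \(newValue), stored \(counter.count)")
}

counter.count = 1

print(log)
// ["published 0, stored 0", "published 1, stored 0"]
```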

13. Scheduler

Scheduler is a protocol that defines when and how to execute a closure. It abstracts over both the context in which work runs (for example a dispatch queue, run loop, or operation queue) and the time at which it runs. In Combine, schedulers are used by operators such as receive(on:), subscribe(on:), delay, debounce, and throttle, as well as by timers, to control the timing and execution context of events.

protocol Scheduler<SchedulerTimeType>

Sample code:

import Combine
import Foundation

// Use a background dispatch queue as the scheduler
let scheduler = DispatchQueue.global(qos: .background)

// Schedule a closure to run as soon as possible on that queue.
// schedule(_:) returns Void, so there is nothing to cancel here;
// only the repeating variant schedule(after:interval:tolerance:options:_:)
// returns a Cancellable.
scheduler.schedule {
    print("Task executed on background queue.")
}
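
In practice a scheduler is most often handed to an operator rather than called directly. The following sketch uses receive(on:) to move delivery onto a serial queue; the queue label and variable names are made up for the example, and the semaphore exists only so that a short script waits for the asynchronous delivery.

```swift
import Combine
import Foundation

// Hypothetical serial queue used as the scheduler
let workerQueue = DispatchQueue(label: "example.worker")
let done = DispatchSemaphore(value: 0)

var deliveredValue: Int?
var deliveredOffMainThread = false

let queueCancellable = Just(42)
    .receive(on: workerQueue) // downstream events are delivered on workerQueue
    .sink { value in
        deliveredValue = value
        deliveredOffMainThread = !Thread.isMainThread
        done.signal()
    }

// Block the calling thread until the value has arrived on the queue
done.wait()

print(deliveredValue as Any)  // Optional(42)
print(deliveredOffMainThread) // true when run from the main thread
```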

Combine provides several types related to Scheduler, each with different behavior and uses. Here are some commonly used ones:

1. ImmediateScheduler

ImmediateScheduler is a Scheduler that introduces no latency or asynchrony: it executes each task immediately on the current thread. This is useful for performing synchronous work or for simulating immediate execution in tests.

import Combine

// Get the shared ImmediateScheduler instance
let scheduler = ImmediateScheduler.shared

// Use the scheduler to execute a task.
// The closure runs synchronously, before schedule(_:) returns,
// so there is nothing to cancel afterwards.
scheduler.schedule {
    print("Task executed immediately.")
}
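
One common use of ImmediateScheduler is to make a pipeline synchronous, for example in tests. The sketch below passes it to receive(on:) and records the order of events; the events array is an illustrative name, and top-level script or playground code is assumed.

```swift
import Combine

var events: [String] = []

let immediateCancellable = Just("value")
    .receive(on: ImmediateScheduler.shared) // delivery happens synchronously
    .sink { events.append("received \($0)") }

// This line runs only after the value has already been delivered
events.append("after sink")

print(events) // ["received value", "after sink"]
```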

2. SchedulerTimeIntervalConvertible

SchedulerTimeIntervalConvertible is a protocol for types that can be constructed from a time interval expressed in common units.
Each scheduler measures time with its own type (its SchedulerTimeType, whose Stride represents an interval), and different schedulers may represent intervals differently internally. SchedulerTimeIntervalConvertible gives all of them a common way to express an interval in seconds, milliseconds, microseconds, or nanoseconds.
The protocol defines the following methods:

protocol SchedulerTimeIntervalConvertible {
    static func seconds(_ s: Int) -> Self
    static func seconds(_ s: Double) -> Self
    static func milliseconds(_ ms: Int) -> Self
    static func microseconds(_ us: Int) -> Self
    static func nanoseconds(_ ns: Int) -> Self
}

These methods create an interval value, from seconds, milliseconds, microseconds, or nanoseconds, in whatever representation the conforming type uses. A scheduler's SchedulerTimeType.Stride must conform to SchedulerTimeIntervalConvertible, so you typically implement the protocol only when writing a custom scheduler, to keep its time units consistent.

Here is an example of a custom time-interval type that conforms to SchedulerTimeIntervalConvertible:

import Combine

// Custom time-interval type that conforms to SchedulerTimeIntervalConvertible.
// Every factory method converts its argument to nanoseconds, so that
// intervals created from different units remain comparable.
struct MyTimeUnit: SchedulerTimeIntervalConvertible {
    let nanoseconds: Int

    static func seconds(_ s: Int) -> MyTimeUnit {
        return MyTimeUnit(nanoseconds: s * 1_000_000_000)
    }

    static func seconds(_ s: Double) -> MyTimeUnit {
        return MyTimeUnit(nanoseconds: Int(s * 1_000_000_000))
    }

    static func milliseconds(_ ms: Int) -> MyTimeUnit {
        return MyTimeUnit(nanoseconds: ms * 1_000_000)
    }

    static func microseconds(_ us: Int) -> MyTimeUnit {
        return MyTimeUnit(nanoseconds: us * 1_000)
    }

    static func nanoseconds(_ ns: Int) -> MyTimeUnit {
        return MyTimeUnit(nanoseconds: ns)
    }
}

// Use the custom time-interval type
let myTimeUnit = MyTimeUnit.seconds(2)
print(myTimeUnit.nanoseconds) // Output: 2000000000
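
The protocol pays off mainly in generic code, where the concrete scheduler is not known in advance. The helper halfSecond(for:) below is a hypothetical function written for this sketch; it relies only on the requirement that a scheduler's SchedulerTimeType.Stride conforms to SchedulerTimeIntervalConvertible.

```swift
import Combine
import Foundation

// Hypothetical helper: builds a 500 ms interval for any scheduler type
func halfSecond<S: Scheduler>(for scheduler: S) -> S.SchedulerTimeType.Stride {
    return .milliseconds(500)
}

// For DispatchQueue the result is a DispatchQueue.SchedulerTimeType.Stride
let dispatchStride = halfSecond(for: DispatchQueue.main)

// The same interval expressed in seconds compares equal
let matchesSeconds = dispatchStride == .seconds(0.5)
print(matchesSeconds)
```

The same helper could be passed to operators such as delay(for:scheduler:) or debounce(for:scheduler:), which take an interval of the scheduler's Stride type.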

14. ObservableObject

ObservableObject is a protocol defined in Combine and used heavily by SwiftUI. A conforming type exposes an objectWillChange publisher that emits before the object changes, which is what allows SwiftUI to build reactive interfaces on top of it. It is one of the core concepts of data-driven interfaces in SwiftUI.

import SwiftUI
import Combine

// Create a class that conforms to the ObservableObject protocol
class MyViewModel: ObservableObject {
    // Mark observable properties using the @Published property wrapper
    @Published var name: String = "John"
    @Published var age: Int = 30
}

struct ContentView: View {
    // Bind the ViewModel to the view using the @ObservedObject property wrapper
    @ObservedObject var viewModel = MyViewModel()

    var body: some View {
        VStack {
            Text("Name: \(viewModel.name)")
            Text("Age: \(viewModel.age)")
            Button("Change Name") {
                // Modify the properties in ViewModel and the interface will automatically update
                viewModel.name = "Alice"
            }
        }
    }
}

In the example above, MyViewModel conforms to ObservableObject and marks two properties with the @Published property wrapper. ContentView then observes the view model through the @ObservedObject property wrapper. When the button changes a property on the view model, the view updates automatically to reflect the change. Note that when a view creates and owns its model, as ContentView does here, SwiftUI's @StateObject (iOS 14 and later) is generally the safer choice, because an object created through @ObservedObject can be recreated whenever the view is rebuilt.
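
ObservableObject can also be observed without SwiftUI, through its synthesized objectWillChange publisher. In this sketch the Profile class and the snapshots array are hypothetical names; it shows that the publisher fires before the property actually changes.

```swift
import Combine

// Hypothetical model type used only for this illustration
final class Profile: ObservableObject {
    @Published var name = "John"
}

let profile = Profile()
var snapshots: [String] = []

// objectWillChange emits before any @Published property changes
let profileCancellable = profile.objectWillChange.sink {
    // At this point the property still holds the old value
    snapshots.append(profile.name)
}

profile.name = "Alice"

print(snapshots)    // ["John"]
print(profile.name) // Alice
```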

15. AsyncPublisher/AsyncThrowingPublisher

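AsyncPublisher and AsyncThrowingPublisher wrap a publisher and expose its elements as an asynchronous sequence (a throwing one in the case of AsyncThrowingPublisher). You obtain one through the publisher's values property, which is available on iOS 15 and macOS 12 or later. AsyncPublisher is defined as follows: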

struct AsyncPublisher<P> where P : Publisher, P.Failure == Never

The sample code is as follows:

import Combine

// Create a simple publisher that can never fail (Failure == Never)
let simplePublisher = Just(1)

// The values property wraps the publisher in an AsyncPublisher (an AsyncSequence)
let asyncSequence = simplePublisher.values

// Iterate over the elements with for-await-in inside an asynchronous context.
// Because the publisher cannot fail, neither try nor do/catch is needed.
Task {
    for await element in asyncSequence {
        print("Received element: \(element)")
    }
}
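
For a publisher that can fail, values yields an AsyncThrowingPublisher instead, and the loop needs try. The following is a sketch under a few assumptions: DemoError and collectValues() are hypothetical names, and the last lines use top-level await, which requires a recent Swift toolchain and a main.swift or playground context.

```swift
import Combine

// Hypothetical error type for the illustration
struct DemoError: Error {}

// Collects values until the publisher fails or finishes
func collectValues() async -> (values: [Int], failed: Bool) {
    // tryMap makes the Failure type Error, so values is an AsyncThrowingPublisher
    let publisher = [1, 2, 3].publisher.tryMap { value -> Int in
        if value == 3 { throw DemoError() }
        return value
    }

    var values: [Int] = []
    do {
        for try await value in publisher.values {
            values.append(value)
        }
        return (values, false)
    } catch {
        // The publisher's failure surfaces as a thrown error
        return (values, true)
    }
}

let result = await collectValues()
print("values: \(result.values), failed: \(result.failed)")
// values: [1, 2], failed: true
```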

16. CustomCombineIdentifierConvertible

CustomCombineIdentifierConvertible is a protocol in the Combine framework for uniquely identifying publisher chains. Both Subscriber and Subscription inherit from it, so it comes into play whenever you create a custom type of either kind.

If you create a custom subscription or subscriber type, implement this protocol so that development tools can uniquely identify publisher chains in your application. If your type is a class, Combine provides an implementation of combineIdentifier for you. If your type is a struct, set the identifier as follows:

let combineIdentifier = CombineIdentifier()

CombineIdentifier is the unique identifier type used to identify a publisher's stream. Here is a code sample:

import Combine

// Custom subscriber type that implements the CustomCombineIdentifierConvertible protocol
class MySubscriber<Input, Failure: Error>: Subscriber, CustomCombineIdentifierConvertible {
    typealias ReceiveValueClosure = (Input) -> Void
    typealias CompletionClosure = (Subscribers.Completion<Failure>) -> Void

    private let receiveValue: ReceiveValueClosure
    private let receiveCompletion: CompletionClosure

    // Uniquely identifies this subscriber. Declaring it is optional for a class,
    // because Combine supplies a default; it is spelled out here for illustration.
    let combineIdentifier = CombineIdentifier()

    init(receiveValue: @escaping ReceiveValueClosure, receiveCompletion: @escaping CompletionClosure) {
        self.receiveValue = receiveValue
        self.receiveCompletion = receiveCompletion
    }

    func receive(subscription: Subscription) {
        subscription.request(.unlimited) // Request an unlimited number of elements
    }

    func receive(_ input: Input) -> Subscribers.Demand {
        receiveValue(input)
        return .unlimited
    }

    func receive(completion: Subscribers.Completion<Failure>) {
        receiveCompletion(completion)
    }
}

// Create a publisher
let publisher = Just(42)

// Create a custom subscriber instance
let subscriber = MySubscriber<Int, Never>(
    receiveValue: { value in
        print("Received value: \(value)")
    },
    receiveCompletion: { completion in
        switch completion {
        case .finished:
            print("Subscription completed successfully.")
        case .failure(let error):
            print("Subscription failed with error: \(error)")
        }
    }
)

// Subscribe to the publisher
publisher.subscribe(subscriber)
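
For a struct, the identifier has to be stored explicitly, as described above. The sketch below uses a hypothetical PrintSubscriber struct to show that a copy keeps the same identifier while a separately created instance gets a different one.

```swift
import Combine

// Hypothetical struct-based subscriber used only for this illustration
struct PrintSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    // A struct must store its own identifier
    let combineIdentifier = CombineIdentifier()

    func receive(subscription: Subscription) {
        subscription.request(.unlimited)
    }

    func receive(_ input: Int) -> Subscribers.Demand {
        print("Received value: \(input)")
        return .none
    }

    func receive(completion: Subscribers.Completion<Never>) {
        print("Completed.")
    }
}

let first = PrintSubscriber()
let copyOfFirst = first // copying the struct copies the identifier
let second = PrintSubscriber()

print(first.combineIdentifier == copyOfFirst.combineIdentifier) // true
print(first.combineIdentifier == second.combineIdentifier)      // false
```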

17. Subscription

Subscription is a protocol that represents the subscription relationship between Subscriber and Publisher. Specifically, Subscription describes how to manage subscriptions, including operations such as canceling subscriptions and requesting elements.

public protocol Subscription: Cancellable, CustomCombineIdentifierConvertible {
    func request(_ demand: Subscribers.Demand)
}

request(_:) method: A subscriber calls request(_:) to ask the publisher for elements. The method takes a demand parameter of type Subscribers.Demand, which indicates how many elements the subscriber is willing to receive. A publisher must not send more elements than the subscriber has requested.
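
Demand values are additive: each call to request(_:) and each value returned from receive(_:) adds to what the subscriber has already asked for. A small sketch of how Subscribers.Demand values combine (the constant names are illustrative):

```swift
import Combine

let two = Subscribers.Demand.max(2)
let three = Subscribers.Demand.max(3)

// Finite demands add up
let combined = two + three
print(combined == .max(5)) // true

// Adding anything to unlimited demand stays unlimited
let stillUnlimited = two + .unlimited
print(stillUnlimited == .unlimited) // true

// .none is a demand for zero elements
print(Subscribers.Demand.none == .max(0)) // true

// max is the element count, or nil for unlimited demand
print(three.max as Any)                        // Optional(3)
print(Subscribers.Demand.unlimited.max as Any) // nil
```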

The Subscription protocol is a very important part of Combine. It is responsible for managing the life cycle of subscriptions and the delivery of elements. Subscribers can use the request(_:) method to control the rate at which elements are received, while the cancel() method is used to cancel a subscription when it is no longer needed.

In Combine, there is usually no need to manually implement the Subscription protocol because Combine provides many built-in operators and types to handle subscriptions. However, if you need to create a custom subscription, you may need to implement the Subscription protocol to define the behavior of the subscription.

Below is a simple sample code that demonstrates how to create a custom subscription that conforms to the Subscription protocol and use it to manage the subscription’s lifecycle and request elements.

import Combine

// Custom subscription type conforming to the Subscription protocol.
// It emits consecutive integers, so Input is fixed to Int and Failure to Never.
class MySubscription: Subscription {
    private var subscriber: AnySubscriber<Int, Never>?
    private var nextValue = 1

    init<S>(_ subscriber: S) where S: Subscriber, S.Input == Int, S.Failure == Never {
        self.subscriber = AnySubscriber(subscriber)
    }

    func request(_ demand: Subscribers.Demand) {
        // demand.max is nil for .unlimited, so cap it to keep this demo finite
        let count = demand.max ?? 10
        // Simulate sending elements to the subscriber
        for _ in 0..<count {
            // Stop if the subscription was cancelled in the meantime
            guard let subscriber = subscriber else { return }
            // Additional demand returned by receive(_:) is ignored in this simplified example
            _ = subscriber.receive(nextValue)
            nextValue += 1
        }
    }

    func cancel() {
        // Release the subscriber so that no further elements are delivered
        subscriber = nil
    }
}

// Create a custom publisher
struct MyPublisher: Publisher {
    typealias Output = Int
    typealias Failure = Never

    func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input {
        // Create a custom subscription and pass it to the subscriber
        let subscription = MySubscription(subscriber)
        subscriber.receive(subscription: subscription)
    }
}

// Create a custom subscriber
class MySubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    func receive(subscription: Subscription) {
        // Called once when the subscription starts
        print("Subscription started.")
        subscription.request(.max(5)) // Request up to 5 elements
    }

    func receive(_ input: Input) -> Subscribers.Demand {
        // Process the received element
        print("Received element: \(input)")
        return .none
    }

    func receive(completion: Subscribers.Completion<Failure>) {
        // Called when the publisher completes (this demo publisher never does)
        print("Subscription completed.")
    }
}

// Create a custom publisher instance
let publisher = MyPublisher()

// Create a custom subscriber instance
let subscriber = MySubscriber()

// Subscribe to the publisher
publisher.subscribe(subscriber)
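
The rate control mentioned earlier can also be seen with a built-in publisher. In the sketch below, the hypothetical ThrottledSubscriber asks for one element at a time and stops asking once it has reached its limit, so the rest of the sequence is never delivered.

```swift
import Combine

// Hypothetical subscriber that pulls elements one at a time up to a limit
final class ThrottledSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    private let limit: Int
    private(set) var received: [Int] = []
    private(set) var completed = false

    init(limit: Int) {
        self.limit = limit
    }

    func receive(subscription: Subscription) {
        // Start with a demand of exactly one element
        subscription.request(.max(1))
    }

    func receive(_ input: Int) -> Subscribers.Demand {
        received.append(input)
        // Ask for one more element until the limit is reached
        return received.count < limit ? .max(1) : .none
    }

    func receive(completion: Subscribers.Completion<Never>) {
        completed = true
    }
}

let throttled = ThrottledSubscriber(limit: 3)
(1...10).publisher.subscribe(throttled)

print(throttled.received)  // [1, 2, 3]
print(throttled.completed) // false, because the sequence was never exhausted
```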
