Combine Framework- Part 1

Publisher and Subscriber

Rejaul Hasan
8 min readSep 9, 2021

I am not an expert on writing code in a declarative way. I am trying to learn combine from Combine: Asynchronous Programming with Swift. It’s a short note of this book. I must suggest going through this book. The initial reactive programmer may find it a good book to start with the combine.

Publisher

A publisher publishes or emits events that can include values of interest. It only emits two kinds of events

  1. Values, also referred to as elements.
  2. A completion event.

A publisher can emit zero or more values but only one completion event, which can either be a normal completion event or an error. Once a publisher emits a completion event, it’s finished and can no longer emit any more events.

Subscriber

Subscriber is a protocol that defines the requirements for a type to be able to receive input from a publisher. A publisher only emits an event when there’s at least one subscriber.

Subscribing with Sink

It provides 2 closures. One is for value and another is for completion closure.

Just Publisher

It’s a publisher that emits its output to each subscriber once and then finishes.

assign(to:on:)

It helps you to assign publisher emitted value into any class object property.

example(of: "assign(to:on:)") {
class SomeObject {
var value: String = "" {
didSet {print(value)
}
} }
let object = SomeObject()
let publisher = ["Hello", "world!"].publisher
_ = publisher
.assign(to: \.value, on: object)
}

Here publishers emmit string values and assign catch those values into value property of SomeObject class.

Republishing with assign(to:)

example(of: "assign(to:)") {
class SomeObject {
@Published var value = 0
}
let object = SomeObject()
object.$value
.sink {
print($0)
}
(0..<10).publisher
.assign(to: &object.$value)
}
  1. Define and create an instance of a class with a property annotated with the @Published property wrapper, which creates a publisher for value in addition to being accessible as regular property.
  2. Use the $ prefix on the @Published property to gain access to its underlying publisher, subscribe to it, and print out each value received.
  3. Create a publisher of numbers and assign each value it emits to the value publisher of object. Note the use of & to denote an inout reference to the property.

The assign(to:) operator doesn’t return an AnyCancellable token, because it manages the lifecycle internally and cancels the subscription when the @Published property deinitializes.

Cancellable

When a subscriber is done and no longer wants to receive values from a publisher, it’s a good idea to cancel the subscription to free up resources and stop any corresponding activities from occurring, such as network calls.

Subscriptions return an instance of AnyCancellable as a “cancellation token,” which makes it possible to cancel the subscription when you’re done with it. AnyCancellable conforms to the Cancellable protocol, which requires the cancel() method exactly for that purpose.

Publish protocol internal code

public protocol Publisher {
associatedtype Output
associatedtype Failure : Error
func receive<S>(subscriber: S)
where S: Subscriber,
Self.Failure == S.Failure,
Self.Output == S.Input
}
extension Publisher {
public func subscribe<S>(_ subscriber: S)
where S : Subscriber,
Self.Failure == S.Failure,
Self.Output == S.Input
}
  1. We have output and failure which match with subscriber input and failure.
  2. we have a subscribe method that takes a subscriber as a parameter. So subscribers use this method to request a subscription.
  3. receive is called internally by the publisher from the subscribe method and it attached the subscriber with publisher. Generate a subscription for the subscriber.

Subscriber protocol internal code

public protocol Subscriber: CustomCombineIdentifierConvertible {associatedtype Input
associatedtype Failure: Error
func receive(subscription: Subscription)
func receive(_ input: Self.Input) -> Subscribers.Demand
func receive(completion: Subscribers.Completion<Self.Failure>)
}
  1. As expected, here also have Input and Failure. Which matches with publisher Output and Failure.
  2. The publisher calls receive(subscription:) on the subscriber to give it the subscription.
  3. The publisher calls receive(_:) on the subscriber to send it a new value that the publisher just published.
  4. The publisher calls receive(completion:) on the subscriber to tell it that it has finished producing values, either normally or due to an error.

Subscription protocol

public protocol Subscription: Cancellable,
CustomCombineIdentifierConvertible {
func request(_ demand: Subscribers.Demand)
}

The subscriber calls request(_:) to indicate it is willing to receive more values, up to a max number or unlimited.

In Subscriber, notice that receive(_:) returns a Demand. Even though the max number of values a subscriber is willing to receive is specified when initially calling subscription.request(_:) in receive(_:), you can adjust that max each time a new value is received. It works in an additive way. Every time you give some value through receive(_:) it will add with your previous value. You can not give a negative value.

Creating a custom subscriber

final class IntSubscriber: Subscriber {
typealias Input = Int
typealias Failure = Never
func receive(subscription: Subscription) {
subscription.request(.max(3))
}
func receive(_ input: Int) -> Subscribers.Demand {
print("Received value", input)
return .none
}
func receive(completion: Subscribers.Completion<Never>) {
print("Received completion", completion)
}
}
}

Here we create a custom Subscriber class IntSubscriber. We confirm that input will be Int and it will not produce any kind of error(Failure = Never). Let’s create a publisher which publishes Int value. Then we will create an IntSubscriber object(subscriber) that subscribes to our publisher.

let publisher = (1...6).publisher
let subscriber = IntSubscriber()
publisher.subscribe(subscriber)

You did not receive a completion event. This is because the publisher has a finite number of values, and you specified a demand of .max(3), and you publisher emmit 6 values so it can not finish.

func receive(_ input: Int) -> Subscribers.Demand {
print("Received value", input)
return .unlimited
}

This time you will get completion. Because you are saying that the subscriber will receive unlimited value.

Future Publisher

Future can be used to asynchronously produce a single result and then complete

func test(integer: Int, afterDelay delay: TimeInterval) -> Future<Int, Never>{
Future<Int, Never> { promise in
DispatchQueue.global().asyncAfter(deadline: .now() + delay) {
promise(.success(integer + 1))
}
}
}

Here we define a function test that returns Future publisher. Let’s have a look at Future.

final public class Future<Output, Failure> : Publisher where Failure : Error {public typealias Promise = (Result<Output, Failure>) -> Voidpublic init(_ attemptToFulfill: @escaping (@escaping Future<Output, Failure>.Promise) -> Void)final public func receive<S>(subscriber: S) where Output == S.Input, Failure == S.Failure, S : Subscriber}

So Future init has a closure as a parameter and it has typealias name Promise which is a closure. This is the one he is capable to publish or emmit.

let’s use our publisher

let future = test(integer: 1, afterDelay: 3)future
.sink(receiveCompletion: { print($0) },
receiveValue: { print($0) })
.store(in: &subscriptions)

We get Future publisher and subscribe to it using sink. The output is

2
finished

So, it just provides one output then it calls the completion handler.

Subject

  1. PassthroughSubject: enable you to publish new values on demand. They will happily pass along those values and a completion event.
  2. CurrentValueSubject: You must initialize current value subjects with an initial value; new subscribers immediately get that value or the latest value published by that subject.

A subject acts as a go-between to enable non-Combine imperative code to send values to Combine subscribers.

enum MyError: Error {case test }final class StringSubscriber: Subscriber {
typealias Input = String
typealias Failure = MyError
func receive(subscription: Subscription) {
subscription.request(.max(2))
}
func receive(_ input: String) -> Subscribers.Demand {
print("Received value", input)
return input == "World" ? .max(1) : .none
}
func receive(completion: Subscribers.Completion<MyError>) {
print("Received completion", completion)
}
}
let subscriber = StringSubscriber()let subject = PassthroughSubject<String, MyError>()
subject.subscribe(subscriber)
let subscription = subject
.sink( receiveCompletion: { completion in
print("Received completion (sink)", completion)
}, receiveValue: { value in
print("Received value (sink)", value)
})

Here we create a PassthroughSubject and 2 subscriber subscribes to this subject.

Passthrough subjects enable you to publish new values on demand. They will happily pass along those values and a completion event. As with any publisher, you must declare the type of values and errors it can emit in advance; subscribers must match those types to its input and failure types in order to subscribe to that passthrough subject.

So, you can pass data to subscribers using send() method

subject.send("Hello")
subject.send("World")

After calling completion() whatever you send your subscriber will not get anything. The publisher has the rules that it calls completion block 1 time only and before calling it, you can pass several values. You can see the first section of this article for it.

subject.send(completion: .finished)
subject.send("How about another one?")

So the subscriber will not get “How about another one?”.

Cancel a subscriber

As we talk about earlier that each subscriber returns an AnyCancellable object. You can use it to cancel your subscription manually or you can store it in a collection of AnyCancellable sets. The collection will then automatically cancel each subscription added to it when the collection is about to be deinitialized.

var subscriptions = Set<AnyCancellable>()
let subject = CurrentValueSubject<Int, Never>(0)
subject
.sink(receiveValue: { print($0) })
.store(in: &subscriptions) // 4
}

Here, subscriptions is the collection object. You store your subscription into it. When it deallocates, all subscriptions also cancel out.

Now, send some value

subject.send(1)
subject.send(2)

Unlike a passthrough subject, you can ask a current value subject for its value at any time.

print(subject.value)

Calling send(_:) on a current value subject is one way to send a new value. Another way is to assign a new value to its value property.

subject.value = 3
print(subject.value)

can you also assign a completion event to the value property?

subject.value = .finished

Nope! That produces an error. A CurrentValueSubject’s value property is meant for just that: values. Completion events must still be sent using send(_:).

subject.send(completion: .finished)

Type erasure

when you want to let subscribers subscribe to receive events from a publisher without being able to access additional details about that publisher.

let subject = PassthroughSubject<Int, Never>()
let publisher = subject.eraseToAnyPublisher()
publisher
.sink(receiveValue: { print($0) })
.store(in: &subscriptions)
subject.send(0)

So, Here you create a PassthroughSubject the hide its type by using eraseToAnyPublisher. Then you sink and send your value.

AnyPublisher is a type-erased struct that conforms the Publisher protocol. Type erasure allows you to hide details about the publisher that you may not want to expose to subscribers or downstream publishers.

AnyCancellable is a type-erased class that conforms to Cancellable, which lets callers cancel the subscription without being able to access the underlying subscription to do things like request more items.

One example of when you would want to use type erasure for a publisher is when you want to use a pair of public and private properties, to allow the owner of those properties to send values on the private publisher, and let outside callers only access the public publisher for subscribing but not be able to send values.

AnyPublisher does not have a send(_:) operator, so new values cannot be added to that publisher.

Please read to understand more. Part-2, Part-3, Part-4.

Those who follow or read comment down if you have questions. I am planning to write on testing from iOS Unit Testing by Example. They write for UIKit. I will try to write it in SwiftUI and using Combine. I also love to write more algorithm topics soon. So stay positive and we will learn many things together. If you want to connect, knock me on LinkedIn. Happy coding and stay safe.

--

--

Rejaul Hasan

I work as a Sr. software engineer for iOS platform. Available to discuss about any good opportunities or projects.