Combine Framework-Part 2

Transforming Operators, Filtering Operators

I am learning Combine from the book Combine: Asynchronous Programming with Swift, and this is a short set of notes on it. I strongly recommend going through the book itself. If you are new to reactive programming, it is a good place to start with Combine.

In Combine, methods that perform an operation on values coming from a publisher are called operators.

Each Combine operator actually returns a publisher. Generally speaking, that publisher receives the upstream values, manipulates the data, and then sends that data downstream.

Transforming Operators

collect()

collect will buffer a stream of individual values into an array of those values once the upstream publisher completes. It will then emit that array downstream.

["A", "B", "C", "D", "E"].publisher
.collect()
.sink(receiveCompletion: { print($0) }, receiveValue: { print($0) })
.store(in: &subscriptions)

Note: Be careful when working with collect() and other buffering operators that do not require specifying a count or limit. They will use an unbounded amount of memory to store received values.

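You can also pass a count to collect. It then groups the emitted values into arrays of at most that size and emits each array as soon as it is full. Any leftover values are emitted in a final, smaller array when the upstream completes.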

.collect(2)

The output will be

[A, B], [C, D], [E]

Mapping values

map(_:)

It works just like Swift’s standard map, except that it operates on values emitted from a publisher.

[123, 4, 56].publisher
  .map { $0 * 10 }
  .sink(receiveValue: { print($0) })
  .store(in: &subscriptions)

Here we multiply each value by 10.

Map key paths

The map family of operators also includes three versions that can map into one, two, or three properties of a value using key paths. Their signatures are map<T>(_:), map<T0, T1>(_:_:) and map<T0, T1, T2>(_:_:_:).
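
To make the key path versions concrete, here is a minimal sketch of the two-property variant. The Coordinate type and the describe function are hypothetical names made up for this illustration; they are not from the book.

```swift
import Combine

// Hypothetical type used only for this illustration.
struct Coordinate {
  let x: Int
  let y: Int
}

// Maps each coordinate into its x and y properties via key paths
// and collects a short description of every (x, y) pair.
func describe(_ coordinates: [Coordinate]) -> [String] {
  var output: [String] = []
  _ = coordinates.publisher
    .map(\.x, \.y) // emits (x, y) tuples
    .sink(receiveValue: { x, y in output.append("\(x),\(y)") })
  return output
}

print(describe([Coordinate(x: 10, y: -8), Coordinate(x: 0, y: 5)]))
// ["10,-8", "0,5"]
```

The one-property version, map(\.x), would emit plain Int values instead of tuples.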

tryMap(_:)

Several operators, including map, have a counterpart try operator that will take a closure that can throw an error. If you throw an error, it will emit that error downstream.

Just("Directory name that does not exist")
  .tryMap { try FileManager.default.contentsOfDirectory(atPath: $0) }
  .sink(receiveCompletion: { print($0) },
        receiveValue: { print($0) })
  .store(in: &subscriptions)

Here we try to get the contents of a directory. Because no directory exists with this name, contentsOfDirectory throws an error, and tryMap passes that error downstream as a failure completion.
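
The directory example depends on the file system, so here is a small self-contained sketch of the same idea. NotANumber and parseAll are hypothetical names I made up for it. It also shows that once tryMap throws, the stream ends with a failure and later values are never processed.

```swift
import Combine

// Hypothetical error type for this sketch.
struct NotANumber: Error {}

// Converts strings to Ints with tryMap and reports what arrived
// downstream and whether the stream ended with a failure.
func parseAll(_ input: [String]) -> (values: [Int], failed: Bool) {
  var values: [Int] = []
  var failed = false
  _ = input.publisher
    .tryMap { text -> Int in
      guard let number = Int(text) else { throw NotANumber() }
      return number
    }
    .sink(
      receiveCompletion: { completion in
        if case .failure = completion { failed = true }
      },
      receiveValue: { values.append($0) }
    )
  return (values, failed)
}

let parsed = parseAll(["1", "2", "x", "4"])
print(parsed.values, parsed.failed) // [1, 2] true
```

Note that "4" never reaches the subscriber, because the error thrown for "x" terminates the stream.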

Flattening publishers

flatMap(maxPublishers:_:)

The flatMap operator can be used to flatten multiple upstream publishers into a single downstream publisher.

A common use case for flatMap in Combine is when you want to pass elements emitted by one publisher to a method that itself returns a publisher, and ultimately subscribe to the elements emitted by that second publisher.

func publisherProducer(data: [Int]) -> AnyPublisher<Int, Never> {
  Just(
    data.reduce(0) { sum, currentValue in
      sum + currentValue
    }
  ).eraseToAnyPublisher()
}

[1, 2, 3, 4, 5].publisher
  .collect()
  .flatMap(publisherProducer)
  .sink {
    print($0)
  }

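Here collect emits all the values as one array, and flatMap passes that array to publisherProducer, which returns a publisher that emits the sum of its elements (15 in this case).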

Note: flatMap flattens the output from all received publishers into a single publisher. This can pose a memory concern, because it will buffer as many publishers as you send it to update the single publisher it emits downstream. The maxPublishers parameter lets you limit how many publishers it will receive.
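
As a rough sketch of how maxPublishers can limit this, the example below allows flatMap to accept only two publishers. The chat function and the subject names are hypothetical and exist only for this illustration. The third subject is sent after the limit is reached, so none of its values are expected to come through.

```swift
import Combine

// Sends three subjects through flatMap, which is limited to two publishers,
// and returns every message that reached the subscriber.
func chat() -> [String] {
  var received: [String] = []
  let channels = PassthroughSubject<CurrentValueSubject<String, Never>, Never>()
  let first = CurrentValueSubject<String, Never>("first: hello")
  let second = CurrentValueSubject<String, Never>("second: hello")
  let third = CurrentValueSubject<String, Never>("third: hello")

  let cancellable = channels
    .flatMap(maxPublishers: .max(2)) { $0 }
    .sink(receiveValue: { received.append($0) })

  channels.send(first)
  channels.send(second)
  channels.send(third)            // over the limit, so it is ignored
  third.send("third: anyone?")    // never reaches the subscriber
  first.send("first: bye")

  cancellable.cancel()
  return received
}

print(chat())
// ["first: hello", "second: hello", "first: bye"]
```

When maxPublishers is omitted it defaults to .unlimited, which is the case the note above warns about.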

Replacing upstream output

replaceNil(with:)

As the name suggests, it replaces each nil value with the value you provide.

["A", nil, "C"].publisher
.eraseToAnyPublisher()
.replaceNil(with: "-")
.sink(receiveValue: { print($0) })
.store(in: &subscriptions)
//Output
//A
//-
//C

Here we replace the nil value with “-”.

replaceEmpty(with:)

You can use the replaceEmpty(with:) operator to replace — or really, insert — a value if a publisher completes without emitting a value.

let empty = Empty<Int, Never>()
empty
  .replaceEmpty(with: 10)
  .sink {
    print($0)
  }.store(in: &subscribers)

Here the Empty publisher completes without publishing anything, so replaceEmpty inserts 10 and passes it to the subscriber.

scan(_:_:)

It will provide the current value emitted by an upstream publisher to a closure, along with the last value returned by that closure.

var dailyGainLoss: Int { .random(in: -10...10) }
let august2019 = (0..<22)
  .map { _ in dailyGainLoss }
  .publisher
august2019
  .scan(50) { latest, current in
    max(0, latest + current)
  }
  .sink(receiveValue: { _ in })
  .store(in: &subscriptions)

Here the initial value of latest is 50, which we pass as the first argument to scan. After that, latest is whatever the closure returned for the previous value.
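
Because the example above uses random values and an empty sink, it prints nothing you can check. A deterministic sketch of the same operator is a running total. The runningTotals function is a hypothetical helper name for this illustration.

```swift
import Combine

// Emits the running total of the input values using scan.
func runningTotals(_ values: [Int]) -> [Int] {
  var totals: [Int] = []
  _ = values.publisher
    .scan(0) { latest, current in latest + current }
    .sink(receiveValue: { totals.append($0) })
  return totals
}

print(runningTotals([1, 2, 3, 4])) // [1, 3, 6, 10]
```

Unlike reduce, scan emits one value for every upstream value rather than a single final result.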

Filtering Operators

filter

It takes a closure returning a Bool and only passes down values that match the provided predicate.

(1...10).publisher
  .filter { $0.isMultiple(of: 3) }
  .sink {
    print($0)
  }
//Output = 3,6,9

removeDuplicates

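As the name suggests, it removes duplicates, but only consecutive ones: a value is dropped when it equals the value emitted immediately before it. The values must conform to Equatable.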

["a","a","b","c"].publisher
.removeDuplicates()
.sink{
print($0)
}
//a b c
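
One detail worth seeing in code is that removeDuplicates only compares each value with the one immediately before it. In this small sketch (collapseRepeats is a hypothetical helper name), the second "a" is dropped, but the last "a" gets through because it follows a "b".

```swift
import Combine

// Collects what removeDuplicates lets through for the given values.
func collapseRepeats(_ values: [String]) -> [String] {
  var output: [String] = []
  _ = values.publisher
    .removeDuplicates()
    .sink(receiveValue: { output.append($0) })
  return output
}

print(collapseRepeats(["a", "a", "b", "a"])) // ["a", "b", "a"]
```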

Compacting and ignoring

compactMap

Quite often, you’ll find yourself dealing with a publisher emitting Optional values. Or even more commonly, you’ll want to perform some operation on your values that might return nil, but who wants to handle all those nils?! compactMap applies a closure to each value and passes on only the non-nil results.

["a","10","1","c"].publisher
.compactMap{
Int($0)
}
.sink{
print($0)
}
//output = 10, 1

ignoreOutput

Sometimes you only want to know whether a publisher has finished its work, and you don’t care about the values it emits. In that case, use ignoreOutput.

["a","10","1","c"].publisher
.ignoreOutput()
.sink {
print($0)
} receiveValue: {
print($0)
}

When the publisher finishes emitting, ignoreOutput() just forwards the completion event. It does not pass any values to its downstream publisher or subscriber, so only the completion is printed.

Finding values

first(where:)

Use first(where:) to find and emit only the first value that matches the provided predicate.

["a","10","1","c"].publisher
.first{
$0 == "1"
}
.sink {
print($0)
} receiveValue: {
print($0)
}

As soon as first(where:) finds a matching value, it sends a cancellation through the subscription, causing the upstream to stop emitting values. Here, once it receives “1”, it emits that value, cancels its upstream subscription and completes, so “c” is never processed.
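
To see this laziness, the sketch below records every value the upstream actually emits by placing handleEvents before first(where:). The firstEven function is a hypothetical helper written only for this illustration.

```swift
import Combine

// Returns the values the upstream emitted before being cancelled,
// together with the first even value that was found (if any).
func firstEven(in values: [Int]) -> (seen: [Int], match: Int?) {
  var seen: [Int] = []
  var match: Int?
  _ = values.publisher
    .handleEvents(receiveOutput: { seen.append($0) })
    .first(where: { $0 % 2 == 0 })
    .sink(receiveValue: { match = $0 })
  return (seen, match)
}

let evenResult = firstEven(in: Array(1...9))
print(evenResult.seen, evenResult.match ?? -1) // [1, 2] 2
```

Only 1 and 2 pass through the upstream. The values 3 to 9 are never emitted, because the subscription is cancelled as soon as 2 matches.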

last(where:)

Its purpose is to find the last value matching a provided predicate.

["a","10","1","c"].publisher
.last{
$0 == "1"
}
.sink {
print($0)
} receiveValue: {
print($0)
}

As opposed to first(where:), this operator is greedy since it must wait for all values to emit to know whether a matching value has been found. For that reason, the upstream must be a publisher that completes at some point.

Note: The upstream publisher needs to complete. Otherwise last(where:) will never publish anything, because it cannot know which match is the last one.
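
A subject makes this waiting visible, because we control when it completes. In the sketch below (lastEvenLog is a hypothetical helper name), nothing is delivered while values are being sent. The match only arrives after the completion event.

```swift
import Combine

// Logs the order of events when last(where:) is used on a subject.
func lastEvenLog() -> [String] {
  var log: [String] = []
  let numbers = PassthroughSubject<Int, Never>()

  let cancellable = numbers
    .last(where: { $0 % 2 == 0 })
    .sink(
      receiveCompletion: { _ in log.append("completed") },
      receiveValue: { log.append("value \($0)") }
    )

  numbers.send(1)
  numbers.send(2)
  numbers.send(4)
  numbers.send(5)
  log.append("sent all")               // nothing has been received yet
  numbers.send(completion: .finished)  // now the last match is known

  cancellable.cancel()
  return log
}

print(lastEvenLog()) // ["sent all", "value 4", "completed"]
```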

Dropping values

Use the drop family of operators when you want to ignore values from one publisher until a second one starts publishing, or when you want to ignore a specific number of values at the start of the stream.

dropFirst

The dropFirst operator takes a count parameter — defaulting to 1 if omitted — and ignores the first count values emitted by the publisher. Only values emitted after count values have been emitted will be allowed through.

["a","10","1","c"].publisher
.dropFirst(2)
.sink {
print($0)
} receiveValue: {
print($0)
}
//output -> 1 c

drop(while:)

This is another extremely useful variation. It takes a predicate closure and ignores values emitted by the publisher for as long as the predicate returns true. The first time the predicate returns false, values begin to flow through the operator, and the predicate is never evaluated again.

["a","10","1","c"].publisher
.drop(while: {
$0 != "1"
})
.sink {
print($0)
} receiveValue: {
print($0)
}

So it says: drop every value until you find “1”. From “1” onwards the values are emitted, so the output is 1, c.
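
This is different from filter, which checks every value. A short sketch with numbers shows it. The dropSmallLeadingValues function is a hypothetical helper name. Once 5 gets through, the later 1 and 2 get through too, even though they are smaller than 3.

```swift
import Combine

// Drops leading values smaller than 3, then lets everything through.
func dropSmallLeadingValues(_ values: [Int]) -> [Int] {
  var output: [Int] = []
  _ = values.publisher
    .drop(while: { $0 < 3 })
    .sink(receiveValue: { output.append($0) })
  return output
}

print(dropSmallLeadingValues([1, 2, 5, 1, 2])) // [5, 1, 2]
```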

drop(untilOutputFrom:)

It skips any values emitted by a publisher until a second publisher starts emitting values, creating a relationship between them.

let secondPublisher = PassthroughSubject<Void, Never>()
let firstPublisher = PassthroughSubject<Int, Never>()
firstPublisher
.drop(untilOutputFrom: secondPublisher)
.sink {
print($0)
} receiveValue: {
print($0)
}.store(in: &subscribers)
firstPublisher.send(1)
firstPublisher.send(2)
firstPublisher.send(3)
secondPublisher.send()
firstPublisher.send(10)
firstPublisher.send(11)
//output 10, 11

So the first three values are dropped, because secondPublisher had not emitted anything when they were sent.

Limiting values

The prefix family of operators is similar to the drop family and provides prefix(_:), prefix(while:) and prefix(untilOutputFrom:). However, instead of dropping values until some condition is met, the prefix operators take values until that condition is met.

prefix(_:)

It will take values only up to the provided amount and then complete.

(1...10).publisher
.prefix(3)
.sink {
print($0)
}
// output = 1, 2, 3

It is also a lazy operator: once it has received the first 3 values, prefix sends a cancel request to the publisher and completes, so the publisher does not pass any further values.

prefix(while:)

It takes a predicate closure and lets values from the upstream publisher through as long as the result of that closure is true. As soon as the result is false, the publisher will complete.

(1...10).publisher
.prefix(while: {
$0 < 3
})
.sink {
print($0)
}
//Output = 1, 2

prefix(untilOutputFrom:)

prefix(untilOutputFrom:) takes values until a second publisher emits.

let secondPublisher = PassthroughSubject<Void, Never>()
let firstPublisher = PassthroughSubject<Int, Never>()
firstPublisher
.prefix(untilOutputFrom: secondPublisher)
.sink {
print($0)
} receiveValue: {
print($0)
}.store(in: &subscribers)
firstPublisher.send(1)
firstPublisher.send(2)
firstPublisher.send(3)
secondPublisher.send()
firstPublisher.send(10)
firstPublisher.send(11)
//output 1, 2, 3, finished

Whether you follow me or are just reading, leave a comment below if you have questions. I am planning to write about testing next, based on iOS Unit Testing by Example. That book is written for UIKit; I will try to do it with SwiftUI and Combine. I would also love to write more on algorithm topics soon. So stay positive and we will learn many things together. If you want to connect, reach out to me on LinkedIn. Happy coding and stay safe.