RxSwift

[RxSwift] Time Based Operators

thoonk: 2022. 7. 18. 15:09
반응형

Time Based Operators 에 관해 정리한 내용을 기록합니다.

 

Time Based Operators

 

Interval 

주어진 시간 간격을 두고 주기마다 방출되는 Operator

completed 되지 않고 무한한 시퀀스를 생성한다.

disposed 하지 않으면 구독 이후 계속 반복하여 방출한다.

 

public static func interval(_ period: RxTimeInterval, scheduler: SchedulerType) -> Observable<Element> {
    return Timer(
        period: period, // 이벤트가 방출되는 주기
        scheduler: scheduler // 스케쥴러 설정
    )
}

Observable<Int>
    .interval(.seconds(3), scheduler: MainScheduler.instance)
    .take(3)
    .subscribe(onNext: {
        print($0)
    })
    .disposed(by: bag)

// 3초에 한 번씩 방출됨.
/*
0
1
2
*/

 

Timer 

주어진 시간 후에 특정 이벤트를 방출하는 Operator

 

public static func timer(_ dueTime: RxTimeInterval, period: RxTimeInterval? = nil, scheduler: SchedulerType) -> Observable<Element> {
    return Timer(
        dueTime: dueTime, // 처음 값이 방출되기까지의 시간
        period: period, // 이벤트가 방출되는 주기
        scheduler: scheduler // 타이머가 작동될 스케쥴러
    )
}
	
Observable<Int>
    .timer(
        .seconds(1),
        period: .seconds(1),
        scheduler: MainScheduler.instance
    )
    .take(5)
    .subscribe(onNext: {
        print($0)
    })
    .disposed(by: bag)

/*
0
1
2
3
4
5
*/

 

Timeout

특정 시간을 초과하면 에러를 방출하는 Operator 

 

public func timeout(
	_ dueTime: RxTimeInterval, // 초과 시간 설정
	scheduler: SchedulerType // 작동할 스케쥴러
) -> Observable<Element> {
		return Timeout(source: self.asObservable(), dueTime: dueTime, other: Observable.error(RxError.timeout), scheduler: scheduler)
}

let button = UIButton(type: .system)
button.setTitle("눌러주세요", for: .normal)
button.sizeToFit()

PlaygroundPage.current.liveView = button

button.rx.tap
    .do(onNext: {
        print("tapped")
    })
    .timeout(.seconds(5), scheduler: MainScheduler.instance)
    .subscribe(onNext: {
        print($0)
    })
    .disposed(by: bag)

// 시간 초과되기 전 버튼 클릭했을 때
// tapped
// ()

// 시간이 초과되었을 때 에러 이벤트 방출
// Unhandled error happened: Sequence timeout.

 

Buffer

Observable 에서 방출하는 이벤트를 주기적으로 한번에 묶어서 Array로 방출하는 Operator

 

public func buffer(
    timeSpan: RxTimeInterval, // 이벤트를 모으는 시간 설정
    count: Int, // 이벤트를 모을 최대 개수
    scheduler: SchedulerType // 작동될 스케쥴러
    ) -> Observable<[Element]> {
    BufferTimeCount(
        source: self.asObservable(), 
        timeSpan: timeSpan, 
        count: count, 
        scheduler: scheduler
    )
}

Observable<Int>
    .interval(.seconds(1), scheduler: MainScheduler.instance)
    .buffer(timeSpan: .seconds(6), count: 2, scheduler: MainScheduler.instance)
    .take(3)
    .subscribe(onNext: {
        print($0)
    })
    .disposed(by: bag)

/*
[0, 1]
[2, 3]
[4, 5]
*/

 

Window

Observable에서 방출되는 이벤트를 주기적이고 개별적인 Observable 타입으로 방출

동작은 Buffer와 비슷하지만, 반환 타입이 다르다. 

 

public func window(
    timeSpan: RxTimeInterval, 
    count: Int, 
    scheduler: SchedulerType
) -> Observable<Observable<Element>> {
    return WindowTimeCount(
        source: self.asObservable(), 
        timeSpan: timeSpan, 
        count: count, 
        scheduler: scheduler
    )
}

Observable<Int>
    .interval(.seconds(1), scheduler: MainScheduler.instance)
    .window(
        timeSpan: .seconds(2),
        count: 2,
        scheduler: MainScheduler.instance
    )
    .take(5)
    .subscribe(onNext: {
        $0.subscribe(onNext: { event in
            print(event)
        }, onCompleted: {
            print("end subdividing")
        })
        .disposed(by: bag)
    })
    .disposed(by: bag)


/*
0
end subdividing
1
2
end subdividing
3
4
end subdividing
5
6
end subdividing
7
8
end subdividing
*/

 

Debounce

특정 시간 동안 연속적인 이벤트가 발생했을 때, 다른 이벤트가 발생되지 않는다면 가장 마지막 이벤트가 방출됨.

주로 텍스트 필드 입력에 사용됨. (자동완성, 연관검색어 노출)

 

public func debounce(
	_ dueTime: RxTimeInterval,
	scheduler: SchedulerType
) -> Observable<Element> {
		return Debounce(
			source: self.asObservable(),
			dueTime: dueTime, 
			scheduler: scheduler
	)
}

let button = UIButton(type: .system)
button.setTitle("Debounce", for: .normal)
button.sizeToFit()

var count = 0

button.rx.tap
    .debounce(.seconds(3), scheduler: MainScheduler.instance)
    .subscribe(onNext: { _ in
        count += 1
        print(count)
    })
    .disposed(by: bag)

PlaygroundPage.current.liveView = button

/*
1
2
3

...

*/

 

Throttle

첫 이벤트를 바로 방출 후에 특정 시간 동안의 연속된 이벤트에 대해 마지막 이벤트만 방출

버튼 입력 중복 방지, 스크롤 빨리 내리면 3초의 간격으로 몇 천 개의 데이터 요청할 때 사용

 

public func throttle(
		_ dueTime: RxTimeInterval, // 첫 방출 후 블락할 시간
		latest: Bool = true, // latest: true -> 디폴트, latest: falsee -> 한 번만 반영                                                                                    
		scheduler: SchedulerType // 작동할 스케쥴러
) -> Observable<Element> {
        Throttle(
					source: self.asObservable(),
					dueTime: dueTime,
					latest: latest,
					scheduler: scheduler
			)
    }

let button = UIButton(type: .system)
button.setTitle("Throttle", for: .normal)
button.sizeToFit()

var count = 0

button.rx.tap
    .throttle(.seconds(3), scheduler: MainScheduler.instance)
    .subscribe(onNext: { _ in
        count += 1
        print(count)
    })
    .disposed(by: bag)

PlaygroundPage.current.liveView = button

/*
1
2
3

...

*/

위 코드에서 처음 버튼 눌렀을 때, 이벤트 방출 후 3초 동안 입력을 받지 않고 3초 지난 후 마지막 이벤트를 한 번 방출함.

 

Delay 

특정 시간 동안 시퀀스 자체를 지연시키는 Operator

public func delay(
	_ dueTime: RxTimeInterval, // 지연될 시간
	scheduler: SchedulerType // 작동할 스케쥴러
) -> Observable<Element> {
	return Delay(source: self.asObservable(), dueTime: dueTime, scheduler: scheduler)
}

Observable<Int>
    .interval(.seconds(1), scheduler: MainScheduler.instance)
    .delay(.seconds(3), scheduler: MainScheduler.instance)
		.take(3)
    .subscribe(onNext: {
        print($0)
    })
    
    .disposed(by: bag)

// 3초 뒤 
// 0
// 1
// 2

 

DelaySubscription

특정 시간 동안 구독 시점을 지연시키는 Operator

 

public func delaySubscription(
	_ dueTime: RxTimeInterval,
	scheduler: SchedulerType
) -> Observable<Element> {
        DelaySubscription(source: self.asObservable(), dueTime: dueTime, scheduler: scheduler)
}

let delaySource = PublishSubject<String>()

var delayCnt = 0
let delayTimerSource = DispatchSource.makeTimerSource()
delayTimerSource.schedule(deadline: .now(), repeating: .seconds(1))
delayTimerSource.setEventHandler {
    delayCnt += 1
    delaySource.onNext("\(delayCnt)")
}
delayTimerSource.resume()

delaySource
    .delaySubscription(.seconds(5), scheduler: MainScheduler.instance)
    .take(5)
    .subscribe(onNext: {
        print($0)
    })
    .disposed(by: bag)

// 구독 시점 5초 뒤로 지연
// 6
// 7
// 8
// 9 
// 10

Delay와 비슷하지만 구독 시점을 지연시키는 점에서 다르다. 

 

Replay

이벤트를 방출한 이후에 구독했을 때, 해당 시퀀스를 다시 방출해주는 Operator

 

public func replay(_ bufferSize: Int) -> ConnectableObservable<Element> {
		self.multicast { ReplaySubject.create(bufferSize: bufferSize) }
}

let replaySubject = PublishSubject<String>()
let replayOperator = replaySubject
    .replay(2)
replayOperator.connect()

replaySubject.onNext("RxSwift")
replaySubject.onNext("Hellow")
replaySubject.onNext("World")

replayOperator
    .subscribe(onNext: {
        print($0)
    })
    .disposed(by: bag)

replaySubject.onNext("End.")

// Hello 
// World
// End.

replayOperator를 구독했을 때, replay의 버퍼만큼 이벤트가 방출되는 것을 확인할 수 있다.

‘End.’는 구독 시점 이후에 발생한 이벤트이므로 버퍼와 상관없이 방출된다.

 

ReplayAll

Replay operator와 작동이 유사하지만 버퍼 크기와 상관 없이 모든 이벤트를 다시 방출해주는 Operator

버퍼 크기가 없어 메모리 이슈 발생의 위험성이 존재한다.

 

let doctorStrange = PublishSubject<String>()
let timeStone = doctorStrange.replayAll()
timeStone.connect()

doctorStrange.onNext("Dormammu")
doctorStrange.onNext("I've come to bargain.")
doctorStrange.onNext("Dormammu")
doctorStrange.onNext("I've come to bargain.")

timeStone
    .subscribe(onNext: {
        print($0)
    })
    .disposed(by: bag)

// Dormammu
// I've come to bargain.
// Dormammu
// I've come to bargain.

 

 

부족한 점 피드백해주시면 감사합니다👍

반응형