RxJava

Reactive 글 모음

  1. Iterable와 Observable의 개념
  2. Reactive History
  3. Reactive Streams
  4. RxJava
  5. Spring Reactive
  6. Reactive PublishOn, SubscribeOn

RxJava는…

RxJava는 Reactive Extensions에서 JVM으로 확장된 라이브러리입니다. Reactive Extensions에 대한 내용은 Reactive History에서 정리하였습니다.
RxJava는 Reactive Streams 표준은 아닙니다. 그러므로 Reactive Streams와 뿌리 자체는 다르게 보는게 맞습니다. 물론 하는 역활, 즉 Reactive한 프로그래밍을 하기 위한 목적은 같습니다만 RxJava는 Reactive Extentions를 주도한 Microsoft에서 Jvm으로 옮긴 Netflix가 주도하여 개발되었다고 할수 있습니다. RxJava와 비슷하게 이름이 지어진것들을 보면 Rxjs, RxAndroid, RxSwift등등이 있는데 모두 Reactive Extentions를 뿌리로 두고 해당 환경에 맞도록 바뀌었다고 할수 있습니다.

RxJava를 사용해보기

Iterable와 Observable의 개념 에서는 Iterable과 Observable이 Duality하다고 설명했습니다. Duality에 대한 설명이 저 포스팅에서는 약했지만 아직도 저 개념을 명확하게 설명할 자신은 없군요.. 처리하려는 기능(또는 처리 결과)은 같지만 처리과정에 있어서 서로 상반되는 부분을 duality라고 하면 설명은 부족하지만 이해하기 위한 최소한은 되는지 모르겠군요.
그럼 RxJava의 기본을 살펴보겠습니다

2-3회에 거쳐 Observable한 코드들과 Reactive Stream포스팅을 준비하다 보니 Observable한 코드는 저는 익숙해지는데 글을 보시는 분들은 어떤지 모르겠습니다.
이 코드에서 Obersavable은 Java의 Observable이 아닙니다. RxJava의 Observable입니다.(라이브러리 세팅은 다루지 않겠습니다) 자바의 Observable과의 차이점을 살펴보자면 제네릭타입이 추가가 되었군요, 해당 코드에서는 String인것을 보니 Observer에게 String을 보내주려나봅니다. Observable.create를 통해 Observable을 만들어냅니다. 이 안에는 Observable.OnSubscribe를 인수로 받는데 OnSubscribe는 어디선가 많이 본 코드이군요.(Reactive Streams 를 보시길)
call메소드를 implements받는데 Subscriber를 파라미터로 받습니다. 이부분도 Reactive Streams와 매우 비슷한 구조를 가져가는데 개념적으로 어느것이든 이해하신다면 무엇을 쓰시든 빨리 적응하실수 있으실거 같습니다.
해당 코드에서는 helloWorld를 onNext로 보내줍니다(점점 소름돋습니다… Reactive Streams와 정말 비슷한녀석이군요…)
그리고 마지막으로 onCompleted를 함으로서 처리를 끝내줍니다(더이상 소름돋지 않겠습니다)

Subscriber도 generic으로 타입을 받아주는군요, Subscriber는 onNext, onCompleted, onError를 받습니다. Observable의 문제점을 RxJava는 이렇게 해결해주고 있군요. 마지막으로 observable에 subscriber를 subscribe메소드로 등록해줍니다.

실행을 한다면 ‘Hello, World!’가 onNext에서 출력되겠군요…

해당 코드는 단순히 String값을 보내주는 Observable입니다. 이런 경우 더 단순하게 Observable을 만들수 있습니다

 Observable myObservable = Observable.just("Hello World");

조금더 코드를 다이어트 시키기 위해 이번에는 subscribe 메소드를 보겠습니다

subscribe메소드는 몇가지 오버로드된 메소드들이 있습니다.
우리는 Subscriber를 넣어주는 메소드를 사용했군요, 그 이외에 Action1를 통한 각각 메소드를 구현하는 방법이 있군요. Action1은 람다식을 사용할때 많이 사용됩니다. 위에 코드는 onNext만 쓰기 때문에 Action1이 하나만 들어가는 메소드를 람다를 써서 코드를 줄여준다면 이렇게 되겠군요

이게 무슨 Observable하지…??? or 지금까지 Observable을 만들고 Observer를 만들어서 쓰는 코드들을 쭉 봐왔는데 이건 그런게 없는데요… 하시는 분들은 이코드는 위의 코드가 리팩토링되었다는것을 다시한번 느끼시기 바랍니다. RxJava는 처음 코드를 이와 같이 만들어 낼수 있습니다.

만약 String으로 넘어오는 데이터 앞에 MillisTime을 찍고 싶다면?

이렇게 하면 Observable이 어떤 String데이터를 보냈을 때 Subscriber는 그 문자열 앞에 Long한 시간을 찍어줄것입니다

이정도면 아주 훌륭한 코드입니다. 하지만 데이터 처리와 출력을 다르게 하고 싶은 경우가 있을수 있습니다.(이정도도 충분한데…..)
이런경우 map을 이용하여 처리하면 됩니다.
map은 Observable의 메소드를 말하는것입니다. map은 Func1을 파라미터로 받는데 Func1은 자바에서 functional을 쓰려고 할때 씁니다. 즉 람다를 쓰라는 뜻이죠.
map을 이용하여 데이터를 처리하는부분과 출력 부분을 분리시키면 다음과 같은 결과가 됩니다.

map은 몇개든 써도 상관이 없습니다. 만약 지금 위와 같이 변경된 문자열의 length를 구해본다면

이렇게 쉽게 추가할수 있습니다. 기능이 분리되고 객체화 되는것이 보이십니까. 이 처리들이 Observable하게 처리되고 있지만 체감상 느껴지지 않으신가요??

Iterable한 Rxjava

이번에는 조금더 나아가 array한 데이터를 사용해보겠습니다. 이 섹션을 시작하기전 공통으로 사용할 컨셉과 코드상에 메소드를 하나 정의하겠습니다.
우리는 검색포탈 엔진을 만들고 있습니다. 키워드를 던지면 해당 키워드가 있는 URL들을 받아 처리하려고 합니다. 우리는 query라는 메소드를 정의하여 이렇게 개발하였습니다.

원래라면 엄청난 알고리즘(?)을 이용하여 데이터를 가져와야 하는데 우리는 저 세개의 url을 무조건 줄수 있는 Observable을 리턴하기로 하죠.

Subscribe에서는 저 url들을 하나하나 출력해보는 코드를 작성해보죠

이렇게 하면 어떻게 될까요.
query라는 메소드에서는 네이버, 구글, 카카오가 들어있는 Observable을 리턴해줄것이죠, 그리고 urls는 Observable이 들고 있는 객체(Arrays.asList한 데이터, 즉 List타입의 객체겠죠)가 넘어오겠죠, “Hello world”를 보내면 String값이 넘어올것이고, List를 보내면 List가 오겠죠

위에서는 List를 for each로 돌려서 출력했군요.. for문이 맘에 들지 않습니다. collection같은 순차적인 데이터는 java stream을 쓰든 어떻게 하든 바꾸라고 엄마가 그랬는데 말이죠…

Observable에는 from이라는 메소드를 통해 Iterable한 데이터를 처리할 수 있습니다

query는 우리가 정의한거로 봤을때 Observable이고 그 안에 subscribe에 또 Observable을 썼네요. 재미있는 구조입니다. 이 안에 Observable.from을 통해 Iterable한 데이터를 처리하고 있습니다.
자 어렵지만 머리속으로 상상해보겠습니다. Observable.from으로 만들어진 아이는 Subscriber이 등록이 되면 Subscriber의 onNext에게 Iterable의 데이터를 하나씩 꺼내서 보내줍니다. 그러면 Subscriber는 onNext에 구현된 System.ont.println을 처리하겠군요…
Obserable하게 처리되는 구조가 상상이 되시나요… (엄청 중요합니다 모른다면 저 코드를 Observable한 코드로 다시 돌아가 코드를 작성하셔서 꼭 확인하시면서 이해해보시길 바랍니다. 전 이해했으니 안하구 넘어감~)

이 코드에도 문제는 있습니다. 아까처럼 이 subscribe는 print만 하고 싶은데 iterable한 데이터 저놈 때문에 코드가 늘어났습니다. 아까는 map을 통해서 해결했습니다. 이번에는 flatMap을 사용해야 합니다.

flatMap을 이용하여 다시한번 기존 subscribe는 출력만을 담당하고 flatMap에서 iterable한 데이터를 뭔가 잘 처리해줬습니다.

String을 사용할때 사용했던 map과 flatMap은 어떤경우 사용해야 할까요…
이 답을 알기 위해 flatMap의 예제는 람다로 처리하지 않았습니다. 큰 차이점은 Iterable을 리턴하는 flatMap과, 자신의 타입(“hello world”일경우 String)을 리턴하는 map이 차이점이겠습니다.
이게 무슨 상황이냐면 Observable은 자신에게 등록된 데이터를 subscriber에 보내줍니다. 위에 단순히 네이버 구글 카카오가 등록된 List가 등록이 되었다면 네이버, 구글, 카카오를 보내는게 아니라 이것들이 들어있는 리스트 객체를 던져버린거죠. 우리는 하나하나 받고 싶었는데 말이죠.. 이런 경우 from은 각각의 요소를 분해해 늘여놓은 하나의 Observable을 만들어냅니다. 그렇게 해서 분해한 데이터를 하나하나 보내게 되는 것이죠.

즉 flatMap을 사용한다면 이론상 iterable한 데이터든, 뭐든 Observable하게만 만들어줄수 있다면 위와 같은 코드를 만들어 낼수 있다는 것입니다. 이것이 지금처럼 String array이든, 파일을 readLine하였든, 데이터베이스에서의 recordset(java에서는 resultSet이라고 하나요..)이든 다 된다는거죠

우리가 아까 했던 시간을 앞에 붙여주는것을 한다면 이렇게 되겠네요

filter

이번에는 filter에 대해 알아보겠습니다. RxJava에서는 filter를 이용해서 데이터를 골라낼수 있습니다.
만약 데이터가 null이 들어온다면 이건 걸러내고 싶습니다. 이런 경우

filter에서 null을 제거한 데이터만 subscriber에게 전달하게 됩니다

take
기존 Reactive Streams를 보면 subscription에 request에 몇개를 달라 파라미터를 줍니다
RxJava에는 take를 이용하여 처리합니다

take에 5를 줬다면 5개를 요청하겠군요. 5번 호출되고 onComplete가 호출되고 끝날것입니다. 우리는 onComplete를 구현하지 않았기 때문에 그냥 끝나겠네요..

doOnXX
다음과 같이 doOnNext를 사용해볼수도 있습니다. doOnNext는 OnNext가 실행될 때 호출이 됩니다. 지금같은 경우 url을 보내기 전에 saveTitle이 먼저 호출되고 그 다음에 subscriber의 onNext가 되겠죠.

doOnNext이외에도 다양한 기능이 있으니 참고하면 좋을거 같습니다.

RxJava에서 Exception처리

제가 본 레퍼런스에서는 최종적인 예제로 Exception처리를 설명해줬습니다.

만약 Observable에서 날라오는 데이터에 대한 유효성 체크를 본다면 해당 코드와 같은 모델이 되지 않을까 합니다. 지금은 s라는 variable에 잔뜩 붙겠습니다만 해당 데이터의 에러여부 전처리나 에러 판단, 후 처리 등을 모두 할수 있을것입니다.

하지만 RxJava는 에러처리도 깔끔하게 만들어 낼수 있습니다

해당코드에는 onErrorReturn을 보셔야 합니다. 만약 에러가 날경우 어떻게 처리해야 할까요.. 코드에서 보면 onNext를 두번 잘 호출합니다만, onError를 강제로 발생시키는 코드가 있습니다. onError가 난다면 원래라면 onError가 호출되어야 하지만 이 코드에서는 onErrorReturn이 받아서 에러일경우 “return”이라는 문자열을 onNext로 보내줍니다.

해당코드에는 onErrorResumeNext가 구현되어 있는데요, 여기서는 에러가 날경우 Observable을 리턴해서 이 Observable이 진행되도록 합니다. 실행결과를 보면

이 출력됩니다. 이 코드를 보고 순간 앗 그러면 onError 다음에 또 onNext를 하면 어떻게 되지? 라는 궁금증을 아주 살짝 했지만 원래 onError나 onComplete가 되면 Observable은 데이터를 더이상 보내지 않는다는 스펙이 있기 때문에 접었습니다. 그래도 궁금하신 분들은 해보시면 됩니다.

만약 서버가 잠깐 끊겨서 그런것이라던가, 큐가 너무 쌓여서 잠깐 안된것이라면? 다시 시도하는 코드가 있어야죠…

retry를 이용해서 에러가 날경우 재시도를 합니다. retry는 숫자를 인수로 받을수 있는데 이경우에는 n번 재시도를 하며 없으면 무한으로 재시도를 합니다.
retry의 조건을 조금 다양하게 만들고 싶으시다면 다음과 같이 구현하셔도 됩니다

retryWhen을 이용하여 디테일한 시간도 정할수 있습니다

정리

RxJava의 기본부터 Iterable한 데이터를 처리하는 법을 알아보았습니다. 또 Exception을 처리하는 방법도 알게 되었습니다. RxJava는 Java진영에서 가장 많이 사용되고 있는 Reactive Programming 라이브러리입니다. 저는 처음에는 리엑티브 프로그래밍을 하면 거대한 데이터를 처리하기에 적합한 방식으로만 개념을 이해하고 있었습니다. 하지만 api를 익히고 많은 에제들을 보면서 stream을 처리하는 좋은 방법으로 많이 와닿고 있습니다. 최근 java8이(java8이 이제 최근은 아니죠…) 나오면서 스트림, 람다를 적용할수 있게 되었고 이는 functional programming을 할수 있도록 하고 있습니다. 지금 작성한 코드들은 우리가 비즈니스 로직을 처리하는데 많이 쓰이는 아주 간단한 작업들입니다. 이런 작업들을 RxJava를 이용하여 개발한다면 우리가 작성한 코드의 구조화와 가독성이 충분히 증가할수 있으리라 생각합니다.

참고

RxJava와 Java8 stream이 매우 비슷한 모델을 가지고 갑니다.
이 차이점이 궁금했고 sof가 설명해줍니다.
http://stackoverflow.com/questions/30216979/difference-between-java-8-streams-and-rxjava-observables

코드는 github에 올려놨지만 개인 스터디에서 작성한 코드이며 포스팅을 작성할때 코드가 좀 바꼈는데…
push를 날려야 하나 말아야 하나 모르겠습니다.
우선 링크
https://github.com/devload/RxJavaExample/tree/master/src/main/java

답글 남기기

아래 항목을 채우거나 오른쪽 아이콘 중 하나를 클릭하여 로그 인 하세요:

WordPress.com 로고

WordPress.com의 계정을 사용하여 댓글을 남깁니다. 로그아웃 /  변경 )

Google photo

Google의 계정을 사용하여 댓글을 남깁니다. 로그아웃 /  변경 )

Twitter 사진

Twitter의 계정을 사용하여 댓글을 남깁니다. 로그아웃 /  변경 )

Facebook 사진

Facebook의 계정을 사용하여 댓글을 남깁니다. 로그아웃 /  변경 )

%s에 연결하는 중

WordPress.com 제공.

위로 ↑

%d 블로거가 이것을 좋아합니다: