rx java - RxJava Zip with priorities -
here's problem. have number of async operations result gets aggregated single 1 further processed. however, not operations equal , error handling different based on operation fails.
to elaborate, let's have operations a, b , c. if fails, need end processing if b or c fail, continue processing others normal.
currently, achieve using count down latches , lot of state management taking close hundred lines of code. move rxjava based implementation. first thought trying observable.zip
operator treats observables equals , not true in case. other idea chaining calls, works, means operations won't start @ same time resulting in greater overall time.
can guide me on how achieve this?
use .onerrorresumenext
:
observable<t> a, b, c; observable.zip( a, b.onerrorresumenext(t -> observable.just(null)), c.onerrorresumenext(t -> observable.just(null)), (x, y, z) -> <your aggregation>) ...
representing errored observables null
you. use optional
:
observable.zip( a, b.map(x -> optional.of(x)) .onerrorresumenext(t -> observable.just(optional.empty())), c.map(x -> optional.of(x)) .onerrorresumenext(t -> observable.just(optional.empty())), (x, y, z) -> <your aggregation>)
if b
, c
correspond external service calls might want ignore them if take long replacing b
b.timeout(5, timeunit.seconds)
. concise eh!
Comments
Post a Comment