2015-04-28 104 views
4

我想用rxJava來實現這個工作流程,但是我確定如果我濫用或做錯了什麼東西。用RXJava處理緩存

  • 用戶要求登錄
  • 如果loginResult在高速緩存中可用,那麼「發出」緩衝LoginResult
  • 否則實際執行請求的web服務和緩存結果,如果一切全成
  • 如果最多發生3次重試錯誤,如果有第4次,則清除緩存。

這是我完整的代碼片段。

public class LoginTask extends BaseBackground<LoginResult> { 
    private static CachedLoginResult cachedLoginResult = new CachedLoginResult(); 
    private XMLRPCClient xmlrpcClient; 
    private UserCredentialsHolder userCredentialsHolder; 

    @Inject 
    public LoginTask(XMLRPCClient client, UserCredentialsHolder userCredentialsHolder) { 
    this.xmlrpcClient = client; 
    this.userCredentialsHolder = userCredentialsHolder; 
    } 

    @Override 
    public LoginResult performRequest() throws Exception { 
    return UserApi.login(
     xmlrpcClient, 
     userCredentialsHolder.getUserName(), 
     userCredentialsHolder.getPlainPassword()); 


    } 

    @Override 
    public Observable<LoginResult> getObservable() { 
    return cachedLoginResult.getObservable() 
     .onErrorResumeNext(
      Observable.create(
       ((Observable.OnSubscribe<LoginResult>) subscriber -> { 
        try { 
        if (!subscriber.isUnsubscribed()) { 
         subscriber.onNext(performRequest()); // actually performRequest 
        } 
        subscriber.onCompleted(); 
        } catch (Exception e) { 
        subscriber.onError(e); 
        } 
       }) 
      ) 
       .doOnNext(cachedLoginResult::setLoginResult) 
       .retry((attempts, t) -> attempts < 3) 
       .doOnError(throwable -> cachedLoginResult.purgeCache()) 
     ); 
    } 


    private static class CachedLoginResult { 
    private LoginResult lr = null; 
    private long when = 0; 

    private CachedLoginResult() { 
    } 

    public boolean hasCache() { 
     return lr != null && when + TimeUnit.MILLISECONDS.convert(30, TimeUnit.MINUTES) > System.currentTimeMillis(); 
    } 

    public void setLoginResult(LoginResult lr) { 
     if (lr != null) { 
      this.lr = lr; 
      this.when = System.currentTimeMillis(); 
     } 
    } 

    public void purgeCache() { 
     this.lr = null; 
     this.when = 0; 
    } 

    public Observable<LoginResult> getObservable() { 
     return Observable.create(new Observable.OnSubscribe<LoginResult>() { 
     @Override 
     public void call(Subscriber<? super LoginResult> subscriber) { 
      if (!subscriber.isUnsubscribed()) { 
      if (hasCache()) { 
       subscriber.onNext(lr); 
       subscriber.onCompleted(); 
      } else { 
       subscriber.onError(new RuntimeException("No cache")); 
      } 
      } 
     } 
     }); 
    } 
    } 
} 

因爲我wan't能找到任何類似的例子,我開始「耍」 rxjava僅有1天前我不能確定我的執行。

謝謝你的時間。

回答

2

我覺得這個代碼是好的,好工作:)

你是使用權Observable.createLoginTask,否則造成的通話可能被內部緩存,然後retry將沒有太大的幫助......

這是我認爲無論如何是CachedLoginResultObservable。在這裏,您可以通過使用Observable.justObservable.error實用方法,像簡化代碼:

public Observable<LoginResult> getObservable() { 
    if (hasCache()) { 
     return Observable.just(lr); 
    } else { 
     return Observable.error(new RuntimeException("No cache")); 
    } 
} 

注:just商店,你告訴它的價值在內部散發,使resubscriptions總是會產生這個值。這就是我上面所暗示的,例如你不應該這樣做Observable.just(performRequest()).retry(3),因爲performRequest只會被調用一次。

+0

您好西蒙,正確的,如果「錯了,但是用你。只是和.error會使數值在觀察到創建發射。如果我創建主觀察值並在緩存過期後使用它,會發生什麼?我想它會讓我回到過去30分鐘的舊緩存,現在應該過期了嗎? –

+0

你是對的,只是在你調用getObservable()的時候捕獲緩存的值,所以'Observable.create'或'Observable.defer'實際上是有意義的。也看看Akarnokd的答案;) –

0

如果我理解正確,您想要執行一次登錄並以反應方式緩存結果?如果是這樣,這裏是一個例子,我會怎麼做:

import java.util.concurrent.ThreadLocalRandom; 

import rx.*; 
import rx.schedulers.Schedulers; 
import rx.subjects.AsyncSubject; 


public class CachingLogin { 
    static class LoginResult { 

    } 
    /** Guarded by this. */ 
    AsyncSubject<LoginResult> cache; 
    public Observable<LoginResult> login(String username, String password) { 
     AsyncSubject<LoginResult> c; 
     boolean doLogin = false; 
     synchronized (this) { 
      if (cache == null || cache.hasThrowable()) { 
       cache = AsyncSubject.create(); 
       doLogin = true; 
      } 
      c = cache; 
     } 
     if (doLogin) { 
      Observable.just(1).subscribeOn(Schedulers.io()) 
      .map(v -> loginAPI(username, password)) 
      .retry(3).subscribe(c); 
     } 
     return c; 
    } 
    public void purgeCache() { 
     synchronized (this) { 
      cache = null; 
     } 
    } 
    static LoginResult loginAPI(String username, String password) { 
     if (ThreadLocalRandom.current().nextDouble() < 0.3) { 
      throw new RuntimeException("Failed"); 
     } 
     return new LoginResult(); 
    } 
} 
+0

你好akarnokd,謝謝你的回答。它看起來像在rxjava中做事情的好方法:)如果我在調用「登錄」方法時正確理解它,它將立即觸發loginAPI()(如果緩存不存在)。雖然這在大多數情況下可能是有意義的,但我希望獲得支持真正http調用的observable,以便我可以根據用例使用observeOn和subscribeOn。 –