2015-02-09 18 views
18

this blog,他給回調地獄this(複製/粘貼下面的代碼)的例子。但是,沒有提到如何通過使用Reactive Extensions消除問題。如何編寫Observables以避免給定的嵌套和依賴回調?

所以這裏F3取決於F1完成,F4和F5取決於F2完成。

  1. 想知道什麼是Rx的功能等價物。
  2. 如何在Rx中表示F1,F2,F3,F4和F5應該全部被異步拉出?

注:我目前想換我周圍的Rx頭,所以我沒有試着問這個問題之前解決這個例子。

import java.util.concurrent.CountDownLatch; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.LinkedBlockingQueue; 
import java.util.concurrent.ThreadPoolExecutor; 
import java.util.concurrent.TimeUnit; 
import java.util.concurrent.atomic.AtomicReference; 

public class CallbackB { 

    /** 
    * Demonstration of nested callbacks which then need to composes their responses together. 
    * <p> 
    * Various different approaches for composition can be done but eventually they end up relying upon 
    * synchronization techniques such as the CountDownLatch used here or converge on callback design 
    * changes similar to <a href="https://github.com/Netflix/RxJava">Rx</a>. 
    */ 
    public static void run() throws Exception { 
     final ExecutorService executor = new ThreadPoolExecutor(4, 4, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>()); 
     /* the following are used to synchronize and compose the asynchronous callbacks */ 
     final CountDownLatch latch = new CountDownLatch(3); 
     final AtomicReference<String> f3Value = new AtomicReference<String>(); 
     final AtomicReference<Integer> f4Value = new AtomicReference<Integer>(); 
     final AtomicReference<Integer> f5Value = new AtomicReference<Integer>(); 

     try { 
      // get f3 with dependent result from f1 
      executor.execute(new CallToRemoteServiceA(new Callback<String>() { 

       @Override 
       public void call(String f1) { 
        executor.execute(new CallToRemoteServiceC(new Callback<String>() { 

         @Override 
         public void call(String f3) { 
          // we have f1 and f3 now need to compose with others 
          System.out.println("intermediate callback: " + f3 + " => " + ("f4 * f5")); 
          // set to thread-safe variable accessible by external scope 
          f3Value.set(f3); 
          latch.countDown(); 
         } 

        }, f1)); 
       } 

      })); 

      // get f4/f5 after dependency f2 completes 
      executor.execute(new CallToRemoteServiceB(new Callback<Integer>() { 

       @Override 
       public void call(Integer f2) { 
        executor.execute(new CallToRemoteServiceD(new Callback<Integer>() { 

         @Override 
         public void call(Integer f4) { 
          // we have f2 and f4 now need to compose with others 
          System.out.println("intermediate callback: f3" + " => " + (f4 + " * f5")); 
          // set to thread-safe variable accessible by external scope 
          f4Value.set(f4); 
          latch.countDown(); 
         } 

        }, f2)); 
        executor.execute(new CallToRemoteServiceE(new Callback<Integer>() { 

         @Override 
         public void call(Integer f5) { 
          // we have f2 and f5 now need to compose with others 
          System.out.println("intermediate callback: f3" + " => " + ("f4 * " + f5)); 
          // set to thread-safe variable accessible by external scope 
          f5Value.set(f5); 
          latch.countDown(); 
         } 

        }, f2)); 
       } 

      })); 

      /* we must wait for all callbacks to complete */ 
      latch.await(); 
      System.out.println(f3Value.get() + " => " + (f4Value.get() * f5Value.get())); 
     } finally { 
      executor.shutdownNow(); 
     } 
    } 

    public static void main(String[] args) { 
     try { 
      run(); 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 
    } 

    private static final class CallToRemoteServiceA implements Runnable { 

     private final Callback<String> callback; 

     private CallToRemoteServiceA(Callback<String> callback) { 
      this.callback = callback; 
     } 

     @Override 
     public void run() { 
      // simulate fetching data from remote service 
      try { 
       Thread.sleep(100); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
      callback.call("responseA"); 
     } 
    } 

    private static final class CallToRemoteServiceB implements Runnable { 

     private final Callback<Integer> callback; 

     private CallToRemoteServiceB(Callback<Integer> callback) { 
      this.callback = callback; 
     } 

     @Override 
     public void run() { 
      // simulate fetching data from remote service 
      try { 
       Thread.sleep(40); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
      callback.call(100); 
     } 
    } 

    private static final class CallToRemoteServiceC implements Runnable { 

     private final Callback<String> callback; 
     private final String dependencyFromA; 

     private CallToRemoteServiceC(Callback<String> callback, String dependencyFromA) { 
      this.callback = callback; 
      this.dependencyFromA = dependencyFromA; 
     } 

     @Override 
     public void run() { 
      // simulate fetching data from remote service 
      try { 
       Thread.sleep(60); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
      callback.call("responseB_" + dependencyFromA); 
     } 
    } 

    private static final class CallToRemoteServiceD implements Runnable { 

     private final Callback<Integer> callback; 
     private final Integer dependencyFromB; 

     private CallToRemoteServiceD(Callback<Integer> callback, Integer dependencyFromB) { 
      this.callback = callback; 
      this.dependencyFromB = dependencyFromB; 
     } 

     @Override 
     public void run() { 
      // simulate fetching data from remote service 
      try { 
       Thread.sleep(140); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
      callback.call(40 + dependencyFromB); 
     } 
    } 

    private static final class CallToRemoteServiceE implements Runnable { 

     private final Callback<Integer> callback; 
     private final Integer dependencyFromB; 

     private CallToRemoteServiceE(Callback<Integer> callback, Integer dependencyFromB) { 
      this.callback = callback; 
      this.dependencyFromB = dependencyFromB; 
     } 

     @Override 
     public void run() { 
      // simulate fetching data from remote service 
      try { 
       Thread.sleep(55); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
      callback.call(5000 + dependencyFromB); 
     } 
    } 

    private static interface Callback<T> { 
     public void call(T value); 
    } 
} 
+0

我不是一個Rx主,但是從他給的一個會話:你可以通過組成Observables來避免回調地獄。如果@benjchristensen在身邊 - 他可能會提供更多細節。 – alfasin 2015-02-09 03:35:13

+2

@alfasin我學習了足夠多的RxJava知道它應該通過構成observable來解決。這裏的問題是如何組成:-) – 2015-02-09 03:36:30

+0

在這種情況下 - 我會將問題從「如何避免回撥地獄?」中更改。到「如何撰寫observables?」) – alfasin 2015-02-09 03:39:49

回答

27

我所引用的博客的原作者有關回調和Java期貨的帖子。這是一個使用flatMap,zip和merge異步執行服務組合的例子。

它提取一個用戶對象,然後同時提取Social和PersonalizedCatalog數據,然後對於PersonalizedCatalog中的每個視頻同時獲取Bookmark,Rating和Metadata,將它們拉到一起,並將所有響應合併爲漸進流輸出服務器發送的事件。

return getUser(userId).flatMap(user -> { 
    Observable<Map<String, Object>> catalog = getPersonalizedCatalog(user) 
      .flatMap(catalogList -> catalogList.videos().<Map<String, Object>> flatMap(
        video -> { 
         Observable<Bookmark> bookmark = getBookmark(video); 
         Observable<Rating> rating = getRatings(video); 
         Observable<VideoMetadata> metadata = getVideoMetadata(video); 
         return Observable.zip(bookmark, rating, metadata, (b, r, m) -> combineVideoData(video, b, r, m)); 
        })); 

    Observable<Map<String, Object>> social = getSocial(user).map(s -> { 
     return s.getDataAsMap(); 
    }); 

    return Observable.merge(catalog, social); 
}).flatMap(data -> { 
    String json = SimpleJson.mapToJson(data); 
    return response.writeStringAndFlush("data: " + json + "\n"); 
}); 

這個例子可以在運行應用程序上下文在https://github.com/Netflix/ReactiveLab/blob/952362b89a4d4115ae0eecf0e73f273ecb27ba98/reactive-lab-gateway/src/main/java/io/reactivex/lab/gateway/routes/RouteForDeviceHome.java#L33

可以看出因爲我不可能提供所有的信息,在這裏你還可以找到表現形式的解釋(與鏈接到視頻)在https://speakerdeck.com/benjchristensen/reactive-streams-with-rx-at-javaone-2014?slide=32

+0

謝謝。我認爲getBookmark(...)等實現內部使用subscribeOn(Schedulers.io())正確嗎?而且,zip操作在調用線程上運行。 – 2015-02-09 23:29:01

+1

是的,他們可以使用subscribeOn與IO調度程序來阻止IO異步。在Netflix,我們通常使用Hystrix爲我們提供相關服務,並提供批量,超時,回退,指標等。如果IO是非阻塞的,例如通過Netty,那麼subscribeOn是不必要的。 – benjchristensen 2015-02-10 05:20:34

5

根據你的代碼。假設遠程呼叫使用Observable完成。

Observable<Integer> callRemoveServiceA() { /* async call */ } 

/* .... */ 

Observable<Integer> callRemoveServiceE(Integer f2) { /* async call */ } 

你想要什麼:

  • 呼叫serviceA然後調用serviceBserviceA
  • 呼叫serviceC的結果,然後調用serviceDserviceEserviceC
  • serviceE結果結果和serviceD,建立新值
  • 顯示屏,serviceB

隨着RxJava結果的新價值,你會與此代碼實現這一點:

Observable<Integer> f3 = callRemoveServiceA() // call serviceA 
      // call serviceB with the result of serviceA 
      .flatMap((f1) -> callRemoveServiceB(f1)); 


Observable<Integer> f4Andf5 = callRemoveServiceC() // call serviceC 
        // call serviceD and serviceE then build a new value 
        .flatMap((f2) -> callRemoveServiceD(f2).zipWith(callRemoveServiceE(f2), (f4, f5) -> f4 * f5)); 

// compute the string to display from f3, and the f4, f5 pair 
f3.zipWith(f4Andf5, (childF3, childF4Andf5) -> childF3 + " => " + childF4Andf5) 
      // display the value 
      .subscribe(System.out::println); 

這裏的重要組成部分,是利用flapMapzip(或zipWith

你可以在flapMap更多的信息在這裏:When do you use map vs flatMap in RxJava?

+0

RWith我可以假設,一旦訂閱,f4Andf5可以比f3更早執行。提供的服務A和B需要更長的時間?我問,因爲有了期貨,服務A和B應該在別人可以跑之前回來。 – 2015-02-09 12:18:03

+0

是的,如果每個遠程服務的呼叫都是異步的。 – dwursteisen 2015-02-09 13:08:20