2016-11-18 19 views
Chunking a text file with Java 8 streams

I want to chunk a text file (let's say, a log file), selecting only a certain number of lines at a time (let's say, we are splitting the log file into smaller files). I wrote this method in an imperative, command-line style:

package utils; 

import java.io.BufferedReader; 
import java.io.FileReader; 
import java.io.IOException; 
import java.util.function.Consumer; 

public class FileUtils { 

    public static void main(String[] args) { 
     readFileInChunks("D:\\demo.txt", 10000, System.out::println); 
    } 

    public static void readFileInChunks(String filePath, int chunkSize, Consumer<StringBuilder> processor) { 
     try (BufferedReader br = new BufferedReader(new FileReader(filePath))) { 
      StringBuilder lines = new StringBuilder(); 

      String line, firstLine = null; 
      int i; 
      for (i = 0; (line = br.readLine()) != null; i++) { 
       if (firstLine == null) 
        firstLine = line; 

       lines.append(line + "\n"); 

       if ((i + 1) % chunkSize == 0) { 
        processor.accept(lines); 
        lines = new StringBuilder(firstLine + "\n"); 
       } 
      } 

      if (lines.toString() != "") { 
       processor.accept(lines); 
      } 

      br.close(); 

     } catch (IOException e) { 
      e.printStackTrace(); 
     } 
    } 

} 

Having spent years coding in an iterative style, I cannot come up with an implementation of this method based on Java 8's functional stream features.

Is it possible to make the readFileInChunks method return a Stream<String> of chunks? Or to implement readFileInChunks in a functional way?

You do not need to call br.close() manually when br is declared inside the try (...) statement; that is the entire purpose of that language feature. – Holger

Answers


First of all, choose the right tool. If you want to process a text file in chunks, it is much simpler to read the file in chunks in the first place than to read single lines just to reassemble them into chunks afterwards. And if you want the chunks to be scaled to line boundaries, searching for the line break closest to the chunk boundary is simpler than processing every line break.

public static void readFileInChunks(
    String filePath, int chunkSize, Consumer<? super CharSequence> processor) { 

    CharBuffer buf=CharBuffer.allocate(chunkSize); 
    try(FileReader r = new FileReader(filePath)) { 
     readMore: for(;;) { 
      while(buf.hasRemaining()) if(r.read(buf)<0) break readMore; 
      buf.flip(); 
      int oldLimit=buf.limit(); 
      for(int p=oldLimit-1; p>0; p--) 
       if(buf.charAt(p)=='\n' || buf.charAt(p)=='\r') { 
        buf.limit(p+1); 
        break; 
       } 
      processor.accept(buf); 
      buf.position(buf.limit()).limit(oldLimit); 
      buf.compact(); 
     } 
     if(buf.position()>0) { 
      buf.flip(); 
      processor.accept(buf); 
     } 
    } catch (IOException e) { 
     e.printStackTrace(); 
    } 
} 

This code may look a bit more complicated, but it is copying-free. If you want the consumer to keep a reference to the received object, or to do concurrent processing, simply change the processor.accept(buf); line to processor.accept(buf.toString()); so that the actual buffer is not handed to the consumer. This is mandatory anyway if you want to provide the same functionality as a stream. For a stream, the loop has to be converted into a function that provides the next item on demand:

public static Stream<String> fileInChunks(
     String filePath, int chunkSize) throws IOException { 

    FileChannel ch=FileChannel.open(Paths.get(filePath), StandardOpenOption.READ); 
    CharsetDecoder dec = Charset.defaultCharset().newDecoder(); 
    long size = (long)(ch.size()*dec.averageCharsPerByte()); 
    Reader r = Channels.newReader(ch, dec, chunkSize); 
    return StreamSupport.stream(new Spliterators.AbstractSpliterator<String>(
      (size+chunkSize-1)/chunkSize, Spliterator.ORDERED|Spliterator.NONNULL) { 
     CharBuffer buf=CharBuffer.allocate(chunkSize); 
     public boolean tryAdvance(Consumer<? super String> processor) { 
      CharBuffer buf=this.buf; 
      if(buf==null) return false; 
      boolean more=true; 
      while(buf.hasRemaining() && more) try { 
       if(r.read(buf)<0) more=false; 
      } catch(IOException ex) { throw new UncheckedIOException(ex); } 
      if(more) { 
       buf.flip(); 
       int oldLimit=buf.limit(); 
       for(int p=oldLimit-1; p>0; p--) 
        if(buf.charAt(p)=='\n' || buf.charAt(p)=='\r') { 
         buf.limit(p+1); 
         break; 
        } 
       processor.accept(buf.toString()); 
       buf.position(buf.limit()).limit(oldLimit); 
       buf.compact(); 
       return true; 
      } 
      this.buf=null; 
      if(buf.position()>0) { 
       buf.flip(); 
       processor.accept(buf.toString()); 
       return true; 
      } 
      return false; 
     } 
    }, false); 
} 
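The scale-back-to-the-nearest-line-break step used in both snippets above can be tried out in isolation. The sketch below applies the same cut rule to an in-memory string; the class and method names are made up for the demo:

```java
import java.util.ArrayList;
import java.util.List;

public class LineAlignedChunks {

    // Splits text into chunks of at most chunkSize chars, moving each cut
    // back to the last '\n' inside the chunk (the same idea as the
    // buf.limit(p+1) adjustment above); a chunk containing no '\n' at all
    // is emitted as-is.
    static List<String> chunks(String text, int chunkSize) {
        List<String> out = new ArrayList<>();
        int pos = 0;
        while (pos < text.length()) {
            int end = Math.min(pos + chunkSize, text.length());
            if (end < text.length()) {
                int nl = text.lastIndexOf('\n', end - 1);
                if (nl >= pos) end = nl + 1; // cut just after the line break
            }
            out.add(text.substring(pos, end));
            pos = end;
        }
        return out;
    }

    public static void main(String[] args) {
        for (String c : chunks("line1\nline2\nline3\nline4\n", 10))
            System.out.print("[" + c.replace("\n", "\\n") + "]");
    }
}
```

With a chunk size of 10 and six-character lines, every cut falls back to a line boundary, so each chunk ends up holding exactly one complete line.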

One thing you could do is have a custom collector that builds the chunks and then sends them to the consumer, something like this (not compiled, just a sample):

private static final class ToChunksCollector<T> implements Collector<T, List<StringBuilder>, List<StringBuilder>> { 

    private final int chunkSize; 

    public ToChunksCollector(int chunkSize) { 
     this.chunkSize = chunkSize; 
    } 

    @Override 
    public Supplier<List<StringBuilder>> supplier() { 
     return ArrayList::new; 
    } 

    @Override 
    public BiConsumer<List<StringBuilder>, T> accumulator() { 
     return (list, line) -> { 
      if (list.size() == 0) { 
       list.add(new StringBuilder()); 
      } 
      StringBuilder lastBuilder = list.get(list.size() - 1); 
      String[] linesInCurrentBuilder = lastBuilder.toString().split("\n"); 
      // no more room in the current builder: start a new chunk, carrying 
      // over the last line, then append the current line as well 
      if (linesInCurrentBuilder.length == chunkSize) { 
       String lastLine = linesInCurrentBuilder[chunkSize - 1]; 
       StringBuilder builder = new StringBuilder(); 
       builder.append(lastLine).append("\n").append(line).append("\n"); 
       list.add(builder); 
      } else { 
       lastBuilder.append(line).append("\n"); 
      } 
     }; 
    } 

    @Override 
    public BinaryOperator<List<StringBuilder>> combiner() { 
     return (list1, list2) -> { 
      list1.addAll(list2); 
      return list1; 
     }; 
    } 

    @Override 
    public Function<List<StringBuilder>, List<StringBuilder>> finisher() { 
     return Function.identity(); 
    } 

    // TODO add the relevant characteristics 
    @Override 
    public Set<java.util.stream.Collector.Characteristics> characteristics() { 
     return EnumSet.noneOf(Characteristics.class); 
    } 

} 

And then the usage:

public static void readFileInChunks(String filePath, int chunkSize, Consumer<StringBuilder> processor) { 
    try (BufferedReader br = new BufferedReader(new FileReader(filePath))) { 

     List<StringBuilder> builder = br.lines().collect(new ToChunksCollector<>(chunkSize)); 
     builder.stream().forEachOrdered(processor); 

    } catch (IOException e) { 
     e.printStackTrace(); 
    } 
} 
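For reference, the same idea can be written as a small self-contained collector that keeps a line counter instead of re-splitting the builder on every element; all names below are made up for the sketch, and the combiner deliberately rejects parallel use:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ChunkCollectorDemo {

    // A sequential-only collector: appends each line to the last chunk and
    // starts a new chunk every chunkSize lines, tracking a running count
    // instead of splitting the builder's contents on every element.
    static Collector<String, ?, List<String>> toChunks(int chunkSize) {
        class Acc { List<StringBuilder> chunks = new ArrayList<>(); int count; }
        return Collector.of(
                Acc::new,
                (acc, line) -> {
                    if (acc.count % chunkSize == 0)
                        acc.chunks.add(new StringBuilder());
                    acc.chunks.get(acc.chunks.size() - 1).append(line).append('\n');
                    acc.count++;
                },
                (a, b) -> { throw new UnsupportedOperationException("sequential only"); },
                acc -> acc.chunks.stream().map(StringBuilder::toString)
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        List<String> chunks = Stream.of("a", "b", "c", "d", "e").collect(toChunks(2));
        chunks.forEach(c -> System.out.print("[" + c.replace("\n", "|") + "]"));
    }
}
```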

You can define a custom iterator and build a stream based on it:

public static Stream<String> readFileInChunks(String filePath, int chunkSize) throws IOException { 
    BufferedReader br = new BufferedReader(new FileReader(filePath)); 

    Iterator<String> iter = new Iterator<String>() { 
     String nextChunk = null; 

     @Override 
     public boolean hasNext() { 
      if (nextChunk != null) return true; // keep hasNext() idempotent 
      StringBuilder sb = new StringBuilder(); 
      for (int i = 0; i < chunkSize; i++) { 
       try { 
        String nextLine = br.readLine(); 
        if (nextLine == null) break; 
        sb.append(nextLine).append("\n"); 
       } catch (IOException e) { 
        throw new UncheckedIOException(e); 
       } 
      } 
      if (sb.length() == 0) { 
       nextChunk = null; 
       return false; 
      } else { 
       nextChunk = sb.toString(); 
       return true; 
      } 
     } 

     @Override 
     public String next() { 
      if (nextChunk != null || hasNext()) { 
       String chunk = nextChunk; 
       nextChunk = null; 
       return chunk; 
      } else { 
       throw new NoSuchElementException(); 
      } 
     } 
    }; 
    return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
      iter, Spliterator.ORDERED | Spliterator.NONNULL), false) 
      .onClose(() -> { 
       try { 
        br.close(); 
       } catch (IOException e) { 
        throw new UncheckedIOException(e); 
       } 
      }); 
} 
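The same iterator-to-stream pattern can be exercised without touching the file system by reading from an in-memory source; the class and method names below are made up for the demo:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class IteratorChunkDemo {

    // Iterator-to-stream pattern over any BufferedReader, so a plain
    // StringReader can drive it in the demo below.
    static Stream<String> chunks(BufferedReader br, int chunkSize) {
        Iterator<String> iter = new Iterator<String>() {
            String nextChunk;

            @Override
            public boolean hasNext() {
                if (nextChunk != null) return true; // don't read past a buffered chunk
                StringBuilder sb = new StringBuilder();
                try {
                    String line;
                    for (int i = 0; i < chunkSize && (line = br.readLine()) != null; i++)
                        sb.append(line).append('\n');
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
                if (sb.length() == 0) return false;
                nextChunk = sb.toString();
                return true;
            }

            @Override
            public String next() {
                if (!hasNext()) throw new NoSuchElementException();
                String chunk = nextChunk;
                nextChunk = null;
                return chunk;
            }
        };
        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
                iter, Spliterator.ORDERED | Spliterator.NONNULL), false);
    }

    public static void main(String[] args) {
        BufferedReader br = new BufferedReader(new StringReader("a\nb\nc\nd\ne"));
        chunks(br, 2).forEach(c -> System.out.println("--- " + c.replace("\n", "|")));
    }
}
```

Five lines with a chunk size of 2 yield three chunks, the last one holding the single leftover line.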

Another option is to use the protonpack library, which provides a zipWithIndex method:

public static Stream<String> readFileInChunks(String filePath, int chunkSize) throws IOException { 
    return new TreeMap<>(StreamUtils.zipWithIndex(Files.lines(Paths.get(filePath))) 
      .collect(Collectors.groupingBy(el -> el.getIndex()/chunkSize))) 
      .values().stream() 
      .map(list -> list.stream() 
        .map(el -> el.getValue()) 
        .collect(Collectors.joining("\n"))); 
} 

The second solution is more compact, but it collects all the lines into a map while grouping them (and then copies them into a TreeMap to get the correct order), so it is not suitable for processing very large files.
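Without the extra dependency, the same group-by-index idea can be sketched with the JDK alone, with the same caveat that everything is collected in memory first; the helper name is made up:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class GroupByIndexDemo {

    // Pairs each line with a running index and groups consecutive lines
    // into chunks of chunkSize; the stream must stay sequential for the
    // counter to match line order, and the TreeMap keeps chunks in order.
    static List<String> chunks(Stream<String> lines, int chunkSize) {
        AtomicLong index = new AtomicLong();
        return new ArrayList<>(lines.collect(Collectors.groupingBy(
                line -> index.getAndIncrement() / chunkSize,
                TreeMap::new,
                Collectors.joining("\n"))).values());
    }

    public static void main(String[] args) {
        List<String> result = chunks(Stream.of("a", "b", "c", "d", "e"), 2);
        result.forEach(c -> System.out.println("--- " + c.replace("\n", "|")));
    }
}
```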


I have created and tested a solution using Java 8; it is the following:

package com.grs.stackOverFlow.pack01; 

import java.io.IOException; 
import java.nio.file.Files; 
import java.nio.file.Paths; 
import java.util.List; 
import java.util.Optional; 
import java.util.function.Consumer; 

public class FileUtils { 
    private static long processed=1; 

    public static void main(String[] args) throws IOException { 
     readFileInChunks("src/com/grs/stackOverFlow/pack01/demo.txt", 3, System.out::println); 
    } 

    public static void readFileInChunks(String filePath, int chunkSize, Consumer<StringBuilder> processor) throws IOException { 

     List<String> lines = Files.readAllLines(Paths.get(filePath)); 
     String firstLine=lines.get(0); 

     long splitCount=lines.size()<chunkSize?1:lines.size()/chunkSize; 

     for(int i=1;i<=splitCount;i++){ 
      Optional<String> result=lines.stream() 
       .skip(processed) 
       .limit(chunkSize) 
       .reduce((a,b) -> {processed++; return a+ "\n"+ b;}); 
      //reduce increments processed one less time, as it starts with two elements at a time 
      processed++; 
      processor.accept(new StringBuilder("chunk no. = " + i + "\n" + firstLine+ "\n"+ result.orElse(""))); 
     } 

    } 

} 
You are reading all the lines of the file, so you are not chunking. –

@Mark It depends on the processor you pass to the method. The OP used the same logic, so I used the same approach. – grsdev7

@grsdev7 Actually, it gives me an OutOfMemoryError. Thanks for the answer anyway. –