2017-04-17

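Problem implementing spring-integration-aws

I am using Spring Integration AWS to poll an S3 resource, fetch files from the S3 bucket, and process them with Spring Integration. Here is what I have: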

AmazonS3 amazonS3 = new AmazonS3Client(new BasicAWSCredentials(accessKey, secretKey)); 

@Bean 
IntegrationFlow fileReadingFlow() { 
    return IntegrationFlows 
       .from(s3InboundFileSynchronizingMessageSource(), 
         e -> e.poller(p -> p.fixedDelay(30, TimeUnit.SECONDS))) 
      .handle(receiptProcessor()) 
      .get(); 
} 
@Bean 
public S3InboundFileSynchronizer s3InboundFileSynchronizer() { 
    S3InboundFileSynchronizer synchronizer = new S3InboundFileSynchronizer(amazonS3); 
    synchronizer.setDeleteRemoteFiles(false); 
    synchronizer.setPreserveTimestamp(true); 
    synchronizer.setRemoteDirectory(s3BucketName.concat("/").concat(s3InboundFolder)); 
    synchronizer.setFilter(new S3RegexPatternFileListFilter(".*\\.dat\\.{0,1}\\d{0,2}")); 
    return synchronizer; 
} 

@Bean 
public S3InboundFileSynchronizingMessageSource s3InboundFileSynchronizingMessageSource() { 
    S3InboundFileSynchronizingMessageSource messageSource = 
      new S3InboundFileSynchronizingMessageSource(s3InboundFileSynchronizer()); 
    messageSource.setAutoCreateLocalDirectory(false); 
    messageSource.setLocalDirectory(new File(inboundDir)); 
    messageSource.setLocalFilter(new AcceptOnceFileListFilter<File>()); 
    return messageSource; 
} 
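The filter regex in this configuration is meant to accept names ending in `.dat`, optionally followed by a dot and up to two digits. The sketch below checks a few candidate keys with plain `java.util.regex`. It assumes, as a simplification not confirmed in this thread, that the S3 filter performs a full match (`Matcher.matches()`) against the whole object key, including its folder part. The class and method names are made up for illustration.

```java
import java.util.regex.Pattern;

public class ReceiptKeyFilterDemo {

    // Same pattern string as passed to S3RegexPatternFileListFilter above
    static final Pattern RECEIPT_PATTERN = Pattern.compile(".*\\.dat\\.{0,1}\\d{0,2}");

    // Assumption: the whole key must match the pattern, not just a substring of it
    static boolean accepts(String key) {
        return RECEIPT_PATTERN.matcher(key).matches();
    }

    public static void main(String[] args) {
        String[] keys = {
            "receipts/originalReceipts/inbound/receipt1.dat",
            "receipts/originalReceipts/inbound/receipt1.dat.01",
            "receipts/originalReceipts/inbound/receipt1.txt"
        };
        for (String key : keys) {
            System.out.println(key + " -> " + accepts(key));
        }
    }
}
```

Because the pattern starts with `.*`, the folder part of the key does not prevent a match; only the ending matters.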

My S3 bucket and key are:

bucketName = shipmentReceipts 
key = receipts/originalReceipts/inbound/receipt1.dat 

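I am facing two problems with this implementation:
1. The local file name under the inboundDir folder is generated from the full S3 key, so the file ends up under a different, nested path, which results in a FileNotFoundException. I traced this to the following code in AbstractInboundFileSynchronizer.java: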

protected void copyFileToLocalDirectory(String remoteDirectoryPath, F remoteFile, File localDirectory,
        Session<F> session) throws IOException {
    // For S3, getFilename() returns the full object key, e.g. "receipts/originalReceipts/inbound/receipt1.dat"
    String remoteFileName = this.getFilename(remoteFile);
    // The local name is derived from that full key
    String localFileName = this.generateLocalFileName(remoteFileName);
    String remoteFilePath = remoteDirectoryPath != null
            ? (remoteDirectoryPath + this.remoteFileSeparator + remoteFileName)
            : remoteFileName;
    if (!this.isFile(remoteFile)) {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("cannot copy, not a file: " + remoteFilePath);
        }
        return;
    }

    // The key's folders become subdirectories of localDirectory here
    File localFile = new File(localDirectory, localFileName);
    if (!localFile.exists()) { ...

So it ends up looking for the file path C:\SpringAws\S3inbound\receipts\originalReceipts\inbound\receipt1.dat, which it does not find, and throws the FileNotFoundException. Instead, the file should simply be copied into the local directory as C:\SpringAws\S3inbound\receipt1.dat.
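One plausible way to see the mismatch without S3: `new File(parent, child)` treats every `/` in `child` as a directory separator, so using the full key as the local name points at `receipts\originalReceipts\inbound\receipt1.dat` below the local directory, and those parent folders do not exist there. In the sketch below a temporary directory stands in for `C:\SpringAws\S3inbound`, and the helper names are hypothetical; it contrasts that with keeping only the last segment via `File.getName()`.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

public class LocalPathDemo {

    // Local target built from the full key, as when the local name is the whole key
    static File fromFullKey(File localDirectory, String key) {
        return new File(localDirectory, key);
    }

    // Local target built from the last segment of the key only
    static File fromFileName(File localDirectory, String key) {
        return new File(localDirectory, new File(key).getName());
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for C:\SpringAws\S3inbound
        File localDirectory = Files.createTempDirectory("S3inbound").toFile();
        String key = "receipts/originalReceipts/inbound/receipt1.dat";

        File nested = fromFullKey(localDirectory, key);
        File flat = fromFileName(localDirectory, key);

        // The nested target's parent folders were never created
        System.out.println(nested + " -> parent exists: " + nested.getParentFile().exists());
        System.out.println(flat + " -> parent exists: " + flat.getParentFile().exists());
    }
}
```

Opening a `FileOutputStream` on the nested target would fail with `FileNotFoundException` because its parent directory is missing, while the flat target lands directly in the local directory.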

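2. While pulling S3 objects, I noticed that it pulls all objects under shipmentReceipts/receipts instead of only those under shipmentReceipts/receipts/originalReceipts/inbound. Debugging further, I found that the following snippet in S3Session.java is responsible: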

    @Override
    public S3ObjectSummary[] list(String path) throws IOException {
        Assert.hasText(path, "'path' must not be empty String.");
        // "shipmentReceipts/receipts/originalReceipts/inbound" -> [shipmentReceipts, receipts, originalReceipts, inbound]
        String[] bucketPrefix = path.split("/");
        Assert.state(bucketPrefix.length > 0 && bucketPrefix[0].length() >= 3,
                "S3 bucket name must be at least 3 characters long.");

        String bucket = resolveBucket(bucketPrefix[0]);

        ListObjectsRequest listObjectsRequest = new ListObjectsRequest()
                .withBucketName(bucket);
        if (bucketPrefix.length > 1) {
            // Only the second element ("receipts") is used as the prefix
            listObjectsRequest.setPrefix(bucketPrefix[1]);
        }

        /*
        For listing objects, Amazon S3 returns up to 1,000 keys in the response.
        If you have more than 1,000 keys in your bucket, the response will be truncated.
        You should always check for if the response is truncated.
        */
        ObjectListing objectListing;
        List<S3ObjectSummary> objectSummaries = new ArrayList<>();
        do { ...
    
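It sets the prefix to only the path segment right after the first slash (/), i.e. receipts, and drops the rest of the path. How can I work around these issues? Thanks!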
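The effect of `path.split("/")` is easy to reproduce in isolation: only `bucketPrefix[1]` becomes the prefix, so the last two folders are lost. Splitting with a limit of 2 keeps everything after the bucket name. This is only a sketch of that idea with hypothetical method names, not the change that was actually made in spring-integration-aws.

```java
import java.util.Arrays;

public class S3PrefixDemo {

    // Mirrors the snippet above: split on every '/' and use only element [1]
    static String prefixAsListed(String path) {
        String[] bucketPrefix = path.split("/");
        return bucketPrefix.length > 1 ? bucketPrefix[1] : null;
    }

    // Hypothetical alternative: split once, so the prefix keeps all nested folders
    static String fullPrefix(String path) {
        String[] bucketPrefix = path.split("/", 2);
        return bucketPrefix.length > 1 ? bucketPrefix[1] : null;
    }

    public static void main(String[] args) {
        String path = "shipmentReceipts/receipts/originalReceipts/inbound";
        System.out.println(Arrays.toString(path.split("/")));
        System.out.println("prefix used: " + prefixAsListed(path));
        System.out.println("full prefix: " + fullPrefix(path));
    }
}
```

Because S3 prefixes are plain string prefixes, `receipts/originalReceipts/inbound` would also match keys in a sibling folder such as `receipts/originalReceipts/inbound2/`; ending the prefix with `/` avoids that.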

    Answer

    The first problem, with nested paths, is a known issue and has been fixed with the RecursiveDirectoryScanner in the latest 5.0 M3: https://spring.io/blog/2017/04/05/spring-integration-5-0-milestone-3-available

    Meanwhile, you have to specify a LocalFilenameGeneratorExpression like this:

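    // Keep only the part of the S3 key after the last '/' as the local file name
    ExpressionParser parser = new SpelExpressionParser();
    Expression expression = parser.parseExpression("#this.contains('/') ? #this.substring(#this.lastIndexOf('/') + 1) : #this");
    synchronizer.setLocalFilenameGeneratorExpression(expression);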
    

    The S3ObjectSummary contains the key as the full path, without the bucket.
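For readers less familiar with SpEL, the expression above does the same thing as the plain-Java method below when applied to the remote file name (`#this`, which here is the S3 key). Since `lastIndexOf('/')` returns -1 when there is no slash, `substring(-1 + 1)` already returns the whole string, so the `contains('/')` branch mainly helps readability. The class and method names are illustrative only.

```java
public class LocalFilenameDemo {

    // Plain-Java equivalent of:
    // #this.contains('/') ? #this.substring(#this.lastIndexOf('/') + 1) : #this
    static String localFilename(String key) {
        return key.contains("/") ? key.substring(key.lastIndexOf('/') + 1) : key;
    }

    public static void main(String[] args) {
        System.out.println(localFilename("receipts/originalReceipts/inbound/receipt1.dat"));
        System.out.println(localFilename("receipt1.dat"));
    }
}
```

A key that ends in `/` (a folder placeholder object) yields an empty name, so such keys are worth excluding with the file filter.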

    The second, "nested path" problem has been fixed via https://github.com/spring-projects/spring-integration-aws/issues/45. The fix is available in 1.1.0.M1: https://spring.io/blog/2017/03/09/spring-integration-extension-for-aws-1-1-0-m1-available


    Thanks Artem! I did use version 1.1.0.M1 of spring-integration-aws, but still wrote my own class to solve the issues above. – user5758361


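    I'm using Spring Integration 5.0.0.M4 and Spring Integration AWS 1.1.0.M2, and I still have the same problem when using a bucket name like 'abc/def/'. See my answer below for a workaround. I'm streaming, so there is no local file name I could manipulate. –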


    Would you mind raising a GH issue with more details, so we can reproduce it on our side? Thanks –

    Answer

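    As per Artem, I didn't use the latest milestone version of spring-integration-aws; I found it easier to write a custom class extending AbstractInboundFileSynchronizer to solve my problem. Here's the class I created: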

    import java.io.BufferedOutputStream;
    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;

    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.model.S3ObjectSummary;

    import org.springframework.expression.Expression;
    import org.springframework.expression.common.LiteralExpression;
    import org.springframework.integration.aws.support.S3SessionFactory;
    import org.springframework.integration.aws.support.filters.S3PersistentAcceptOnceFileListFilter;
    import org.springframework.integration.file.filters.FileListFilter;
    import org.springframework.integration.file.remote.session.Session;
    import org.springframework.integration.file.remote.session.SessionFactory;
    import org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer;
    import org.springframework.integration.metadata.SimpleMetadataStore;
    import org.springframework.messaging.MessagingException;

    /**
     * Variant of S3InboundFileSynchronizer that stores each S3 object in the local
     * directory under the last segment of its key, without nested folders.
     */
    public class MyAbstractInboundFileSynchronizer extends AbstractInboundFileSynchronizer<S3ObjectSummary> {

        // The superclass keeps these settings in private fields, so this class keeps its own
        // copies (updated by the overridden setters below) for copyFileToLocalDirectory().
        private volatile String remoteFileSeparator = "/";
        private volatile String temporaryFileSuffix = ".writing";
        private volatile boolean deleteRemoteFiles;
        private volatile boolean preserveTimestamp;

        public MyAbstractInboundFileSynchronizer() {
            this(new S3SessionFactory());
        }

        public MyAbstractInboundFileSynchronizer(AmazonS3 amazonS3) {
            this(new S3SessionFactory(amazonS3));
        }

        /**
         * Create a synchronizer with the {@link SessionFactory} used to acquire {@link Session} instances.
         * @param sessionFactory The session factory.
         */
        public MyAbstractInboundFileSynchronizer(SessionFactory<S3ObjectSummary> sessionFactory) {
            super(sessionFactory);
            setRemoteDirectoryExpression(new LiteralExpression(null));
            setFilter(new S3PersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "s3MessageSource"));
        }

        @Override
        public void setRemoteFileSeparator(String remoteFileSeparator) {
            super.setRemoteFileSeparator(remoteFileSeparator);
            this.remoteFileSeparator = remoteFileSeparator;
        }

        @Override
        public void setDeleteRemoteFiles(boolean deleteRemoteFiles) {
            super.setDeleteRemoteFiles(deleteRemoteFiles);
            this.deleteRemoteFiles = deleteRemoteFiles;
        }

        @Override
        public void setPreserveTimestamp(boolean preserveTimestamp) {
            super.setPreserveTimestamp(preserveTimestamp);
            this.preserveTimestamp = preserveTimestamp;
        }

        @Override
        public final void setRemoteDirectoryExpression(Expression remoteDirectoryExpression) {
            super.setRemoteDirectoryExpression(remoteDirectoryExpression);
        }

        @Override
        public final void setFilter(FileListFilter<S3ObjectSummary> filter) {
            super.setFilter(filter);
        }

        @Override
        protected boolean isFile(S3ObjectSummary file) {
            return true;
        }

        @Override
        protected String getFilename(S3ObjectSummary file) {
            if (file == null) {
                return null;
            }
            // Keep only the part of the key after the last '/'
            String key = file.getKey();
            return key.substring(key.lastIndexOf('/') + 1);
        }

        @Override
        protected long getModified(S3ObjectSummary file) {
            return file.getLastModified().getTime();
        }

        @Override
        protected void copyFileToLocalDirectory(String remoteDirectoryPath, S3ObjectSummary remoteFile,
                File localDirectory, Session<S3ObjectSummary> session) throws IOException {
            String remoteFileName = getFilename(remoteFile);
            // Use the bare file name instead of generateLocalFileName(remoteFileName)
            String localFileName = remoteFileName;
            // Join directory and file name with exactly one separator
            String remoteFilePath = remoteFileName;
            if (remoteDirectoryPath != null) {
                remoteFilePath = remoteDirectoryPath.endsWith(this.remoteFileSeparator)
                        ? remoteDirectoryPath + remoteFileName
                        : remoteDirectoryPath + this.remoteFileSeparator + remoteFileName;
            }
            if (!isFile(remoteFile)) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("cannot copy, not a file: " + remoteFilePath);
                }
                return;
            }

            File localFile = new File(localDirectory, localFileName);
            if (!localFile.exists()) {
                // Download into a temporary file first, then rename it into place
                File tempFile = new File(localFile.getAbsolutePath() + this.temporaryFileSuffix);
                try (OutputStream outputStream = new BufferedOutputStream(new FileOutputStream(tempFile))) {
                    session.read(remoteFilePath, outputStream);
                }
                catch (IOException e) {
                    throw new MessagingException("Failure occurred while copying from remote to local directory", e);
                }

                if (tempFile.renameTo(localFile) && this.deleteRemoteFiles) {
                    session.remove(remoteFilePath);
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("deleted " + remoteFilePath);
                    }
                }
                if (this.preserveTimestamp) {
                    localFile.setLastModified(getModified(remoteFile));
                }
            }
        }
    }
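One pitfall when copying a superclass method into a subclass, as this class does: fields such as `deleteRemoteFiles` and `preserveTimestamp` are redeclared in the subclass, while the superclass setters only write the superclass's own private copies. Unless the setters are also overridden, the subclass fields keep their defaults, so a call like `setPreserveTimestamp(true)` from the question's configuration would not reach the copied method. The sketch below shows the effect with made-up `Base`, `ShadowingCopy` and `SyncedCopy` classes standing in for the Spring types.

```java
public class FieldShadowingDemo {

    // Stand-in for the superclass: the setting lives in a private field
    static class Base {
        private boolean preserveTimestamp;

        public void setPreserveTimestamp(boolean preserveTimestamp) {
            this.preserveTimestamp = preserveTimestamp;
        }
    }

    // Redeclares the field but does not override the setter
    static class ShadowingCopy extends Base {
        private boolean preserveTimestamp;

        boolean effectivePreserveTimestamp() {
            return this.preserveTimestamp; // subclass field, never written
        }
    }

    // Overrides the setter so its own copy stays in sync with the superclass
    static class SyncedCopy extends Base {
        private boolean preserveTimestamp;

        @Override
        public void setPreserveTimestamp(boolean preserveTimestamp) {
            super.setPreserveTimestamp(preserveTimestamp);
            this.preserveTimestamp = preserveTimestamp;
        }

        boolean effectivePreserveTimestamp() {
            return this.preserveTimestamp;
        }
    }

    public static void main(String[] args) {
        ShadowingCopy shadowing = new ShadowingCopy();
        shadowing.setPreserveTimestamp(true);
        SyncedCopy synced = new SyncedCopy();
        synced.setPreserveTimestamp(true);
        System.out.println("shadowing copy sees: " + shadowing.effectivePreserveTimestamp());
        System.out.println("synced copy sees: " + synced.effectivePreserveTimestamp());
    }
}
```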
    

    I also updated the LocalFilenameGeneratorExpression as Artem suggested. Thanks!


    I'm using Spring Integration 5.0.0.M4 and Spring Integration AWS 1.1.0.M2, and I still have the same problem when using a bucket name like abc/def/. See my answer for a workaround. –

    Answer

    @user5758361 The first problem you describe, with nested paths, can also be solved by overriding S3FileInfo:

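    import java.io.UncheckedIOException;

    import com.amazonaws.services.s3.model.S3ObjectSummary;
    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.ObjectWriter;
    import org.apache.commons.io.FilenameUtils;

    public class S3FileInfo extends org.springframework.integration.aws.support.S3FileInfo {
        private static final ObjectWriter OBJECT_WRITER = new ObjectMapper().writerFor(S3ObjectSummary.class);

        public S3FileInfo(S3ObjectSummary s3ObjectSummary) {
            super(s3ObjectSummary);
        }

        // Return only the file name part of the key, e.g. "abc/def/file.txt" -> "file.txt"
        @Override
        public String getFilename() {
            return FilenameUtils.getName(super.getFilename());
        }

        // Serialize the underlying S3ObjectSummary directly with Jackson
        @Override
        public String toJson() {
            try {
                return OBJECT_WRITER.writeValueAsString(super.getFileInfo());
            }
            catch (JsonProcessingException e) {
                throw new UncheckedIOException(e);
            }
        }
    }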
    

    toJson is overridden to avoid an NPE for some objects.

    And the streaming message source that uses it:

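    import static java.util.stream.Collectors.toList;

    import java.util.Collection;
    import java.util.Comparator;
    import java.util.List;

    import com.amazonaws.services.s3.model.S3ObjectSummary;
    import org.springframework.integration.file.remote.AbstractFileInfo;
    import org.springframework.integration.file.remote.RemoteFileTemplate;

    public class S3StreamingMessageSource extends org.springframework.integration.aws.inbound.S3StreamingMessageSource {
        public S3StreamingMessageSource(RemoteFileTemplate<S3ObjectSummary> template) {
            super(template, null);
        }

        public S3StreamingMessageSource(RemoteFileTemplate<S3ObjectSummary> template,
                Comparator<AbstractFileInfo<S3ObjectSummary>> comparator) {
            super(template, comparator);
        }

        // Wrap each listed object in the custom S3FileInfo, so file names drop the key's folders
        @Override
        protected List<AbstractFileInfo<S3ObjectSummary>> asFileInfoList(Collection<S3ObjectSummary> collection) {
            return collection.stream()
                    .map(S3FileInfo::new)
                    .collect(toList());
        }
    }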
    

    By the way, I'm using Spring Integration 5.0.0.M4 and Spring Integration AWS 1.1.0.M2, and I still have the same problem when using a bucket like abc/def/.