2016-08-25 47 views
0

我正在使用SFTP連接器作爲出站端點。在Mule中使用動態值,其中MEL不受支持

我需要能夠動態設置身份驗證標識文件和密碼。這是因爲我們正在動態選擇我們發送文件的位置。

看來在我使用的Mule版本中(5.2)不支持在這個特定領域中使用MEL。它適用於其他領域,但不適用於身份驗證。

是否有一個簡單的方法可以讓我動態地填充該字段?

我已經考慮擴展SFTP組件,並通過MEL處理getIdentityFile()方法,但這是我想避免的極端解決方案,如果可以的話。

謝謝!

回答

1

讓這個工作有點任務,但它確實可行。

一般的步驟是使用<service-overrides>標記覆蓋特定的組件,最終允許執行Java代碼來評估標識文件爲MEL表達式。

我們覆蓋包含在騾子SFTP組件Github上庫,這裏的類:https://github.com/muleforge/SFTP/tree/master/src/main/java/org/mule/transport/sftp

首先,您需要使用相應的服務,超越您的SFTP連接器:

<sftp:connector name="SEND_SFTP"> 
    <service-overrides dispatcherFactory="customsftp.CustomSftpDispatcherFactory" /> 
</sftp:connector> 

然後創建允許您實現上述邏輯的Java類:

public class CustomSftpDispatcherFactory extends SftpMessageDispatcherFactory { 

    @Override 
    public MessageDispatcher create(OutboundEndpoint endpoint) throws MuleException 
    { 
     return new CustomSftpMessageDispatcher(endpoint); 
    } 
} 

接下來更新SftpMessageDispatcher班。這個類:

public class CustomSftpMessageDispatcher extends AbstractMessageDispatcher 
{ 

    private SftpConnector connector; 
    private SftpUtil sftpUtil; 

    public CustomSftpMessageDispatcher(OutboundEndpoint endpoint) 
    { 
     super(endpoint); 
     connector = (SftpConnector) endpoint.getConnector(); 
     sftpUtil = new CustomSftpUtil(endpoint); 
    } 

    protected void doDisconnect() throws Exception 
    { 
     // no op 
    } 

    protected void doDispose() 
    { 
     // no op 
    } 

    protected void doDispatch(MuleEvent event) throws Exception 
    { 
     String filename = buildFilename(event); 
     InputStream inputStream = generateInputStream(event); 
     if (logger.isDebugEnabled()) 
     { 
      logger.debug("Writing file to: " + endpoint.getEndpointURI() + " [" + filename + "]"); 
     } 

     SftpClient client = null; 
     boolean useTempDir = false; 
     String transferFilename = null; 

     try 
     { 
      String serviceName = (event.getFlowConstruct() == null) 
           ? "UNKNOWN SERVICE" 
           : event.getFlowConstruct().getName(); 
      SftpNotifier notifier = new SftpNotifier(connector, event.getMessage(), endpoint, serviceName); 
      client = connector.createSftpClient(endpoint, notifier); 
      String destDir = endpoint.getEndpointURI().getPath(); 

      if (logger.isDebugEnabled()) 
      { 
       logger.debug("Connection setup successful, writing file."); 
      } 

      // Duplicate Handling 
      filename = client.duplicateHandling(destDir, filename, sftpUtil.getDuplicateHandling()); 
      transferFilename = filename; 

      useTempDir = sftpUtil.isUseTempDirOutbound(); 
      if (useTempDir) 
      { 
       // TODO move to a init-method like doConnect? 
       // cd to tempDir and create it if it doesn't already exist 
       sftpUtil.cwdToTempDirOnOutbound(client, destDir); 

       // Add unique file-name (if configured) for use during transfer to 
       // temp-dir 
       boolean addUniqueSuffix = sftpUtil.isUseTempFileTimestampSuffix(); 
       if (addUniqueSuffix) 
       { 
        transferFilename = sftpUtil.createUniqueSuffix(transferFilename); 
       } 
      } 

      // send file over sftp 
      // choose appropriate writing mode 
      if (sftpUtil.getDuplicateHandling().equals(SftpConnector.PROPERTY_DUPLICATE_HANDLING_APPEND)) 
      { 
       client.storeFile(transferFilename, inputStream, SftpClient.WriteMode.APPEND); 
      } 
      else 
      { 
       client.storeFile(transferFilename, inputStream); 
      } 

      if (useTempDir) 
      { 
       // Move the file to its final destination 
       client.rename(transferFilename, destDir + "/" + filename); 
      } 

      logger.info("Successfully wrote file '" + filename + "' to " + endpoint.getEndpointURI()); 
     } 
     catch (Exception e) 
     { 
      logger.error("Unexpected exception attempting to write file, message was: " + e.getMessage(), e); 

      sftpUtil.setErrorOccurredOnInputStream(inputStream); 

      if (useTempDir) 
      { 
       // Cleanup the remote temp dir from the not fullt completely 
       // transferred file! 
       String tempDir = sftpUtil.getTempDirOutbound(); 
       sftpUtil.cleanupTempDir(client, transferFilename, tempDir); 
      } 
      throw e; 
     } 
     finally 
     { 
      if (client != null) 
      { 
       // If the connection fails, the client will be null, otherwise 
       // disconnect. 
       connector.releaseClient(endpoint, client); 
      } 

      inputStream.close(); 

     } 

    } 

    private InputStream generateInputStream(MuleEvent event) 
    { 
     Object data = event.getMessage().getPayload(); 
     // byte[], String, or InputStream payloads supported. 

     byte[] buf; 
     InputStream inputStream; 

     if (data instanceof byte[]) 
     { 
      buf = (byte[]) data; 
      inputStream = new ByteArrayInputStream(buf); 
     } 
     else if (data instanceof InputStream) 
     { 
      inputStream = (InputStream) data; 
     } 
     else if (data instanceof String) 
     { 
      inputStream = new ByteArrayInputStream(((String) data).getBytes()); 
     } 
     else 
     { 
      throw new IllegalArgumentException(
        "Unexpected message type: java.io.InputStream, byte[], or String expected. Got " 
        + data.getClass().getName()); 
     } 
     return inputStream; 
    } 

    private String buildFilename(MuleEvent event) 
    { 
     MuleMessage muleMessage = event.getMessage(); 
     String outPattern = (String) endpoint.getProperty(SftpConnector.PROPERTY_OUTPUT_PATTERN); 
     if (outPattern == null) 
     { 
      outPattern = (String) muleMessage.getProperty(SftpConnector.PROPERTY_OUTPUT_PATTERN, connector.getOutputPattern()); 
     } 
     String filename = connector.getFilenameParser().getFilename(muleMessage, outPattern); 
     if (filename == null) 
     { 
      filename = (String) event.getMessage().findPropertyInAnyScope(SftpConnector.PROPERTY_FILENAME, 
                      null); 
     } 
     return filename; 
    } 

    protected MuleMessage doSend(MuleEvent event) throws Exception 
    { 
     doDispatch(event); 
     return event.getMessage(); 
    } 
} 

該類的唯一重要的部分是在構造函數中,構建自定義SFTP UTIL:

public class CustomSftpUtil extends SftpUtil { 

    private SftpConnector connector; 
    private MuleContext context; 

    public CustomSftpUtil(ImmutableEndpoint endpoint) { 
     super(endpoint); 

     connector = (SftpConnector) endpoint.getConnector(); 
     context = endpoint.getMuleContext(); 

     handleIdentityFile(); 
     handlePassphrase(); 
    } 

    private void handleIdentityFile() { 
     String evaluatedExpression = evaluateMelExpression(connector.getIdentityFile()); 
     connector.setIdentityFile(evaluatedExpression); 
    } 

    private void handlePassphrase() { 
     String evaluatedExpression = evaluateMelExpression(connector.getPassphrase()); 
     connector.setPassphrase(evaluatedExpression); 
    } 

    private String evaluateMelExpression(String melExpression) {   
     return context.getExpressionManager().evaluate(melExpression, RequestContext.getEvent()).toString(); 
    } 
} 

這個類的唯一重要的部分是身份文件的評估(和口令)作爲MEL表達式,這允許我們通過MEL表達式動態設置身份文件和密碼。