2015-12-04
0

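Recently I have been trying to do some stream processing with the Samza framework. I successfully deployed the hello-samza example. However, when I tried to write my own job, I didn't know where to start. I have read this document, but I still don't understand it. Can anyone help me with how to write my own job in Samza: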

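  1. What is the structure of my code (source code, library code and configuration)?
  2. Which directory do I put my code in?
  3. What else do I need to do to get my code running?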

Your advice would help me a lot. Thank you very much!

+0

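That is the whole point of the hello-samza project: it shows 1) what the architecture looks like (source code, library code and configuration), 2) which directory your code goes into (you deployed it successfully, after all), and 3) all the code needed to get it running.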

Answers

-2

Please read the documentation again, look at the hello-samza example again, and read up on deploying it to YARN. All the answers you are looking for are there.

hello-samza has three jobs. Pick one and follow it through: its configuration, its launch script, and so on.

Here is how the hello-samza page starts the Wikipedia feed job:

deploy/samza/bin/run-job.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/wikipedia-feed.properties 

Among other things, the properties file shows which compiled job/task code gets run. The source code for the Wikipedia feed job/task is here:

https://github.com/apache/samza-hello-samza/blob/master/src/main/java/samza/examples/wikipedia/task/WikipediaFeedStreamTask.java
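To see how a properties file points at task code, here is a small plain-Java sketch. It is not how Samza loads config (Samza uses the `PropertiesConfigFactory` named on the command line above); it only reads the two keys that tie the job to its code: `task.class` (the `StreamTask` implementation to instantiate) and `task.inputs` (comma-separated `system.stream` names). The excerpt is modeled on hello-samza's `wikipedia-feed.properties`, so check it against the real file; splitting each input at the first dot is my understanding of how Samza separates the system name from the stream name.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Sketch: read the keys that connect a Samza properties file to its task code.
final class JobConfigPeek {
    // Excerpt modeled on hello-samza's wikipedia-feed.properties (assumed; check the real file)
    static final String EXCERPT =
        "job.name=wikipedia-feed\n"
        + "task.class=samza.examples.wikipedia.task.WikipediaFeedStreamTask\n"
        + "task.inputs=wikipedia.#en.wikipedia,wikipedia.#en.wiktionary,wikipedia.#en.wikinews\n";

    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            // StringReader does no real I/O, but Properties.load declares IOException
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // "system.stream" -> {system, stream}, split at the first dot only
    static String[] splitSystemStream(String name) {
        int dot = name.indexOf('.');
        if (dot <= 0) {
            throw new IllegalArgumentException("expected system.stream, got: " + name);
        }
        return new String[] {name.substring(0, dot), name.substring(dot + 1)};
    }

    public static void main(String[] args) {
        Properties props = load(EXCERPT);
        System.out.println("task.class = " + props.getProperty("task.class"));
        for (String input : props.getProperty("task.inputs").split(",")) {
            String[] ss = splitSystemStream(input.trim());
            System.out.println("input: system=" + ss[0] + " stream=" + ss[1]);
        }
    }
}
```

So `task.class` is the file you open in the GitHub link, and each input such as `wikipedia.#en.wikipedia` names the system (`wikipedia`) and the stream (`#en.wikipedia`) the task reads from.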

Just modify that job, or copy it and modify the copy, to get going.

+0

Downvoting won't read the documentation for you.

0

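If you follow the Hello Samza instructions, you will have a working Zookeeper, Kafka and YARN/Samza cluster running on your local machine. With that project you can run the Wikipedia feed tasks to test things out.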

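However, like you, I had some trouble figuring out the proper directory structure and build setup for a new task (without the cluster-management parts). So I created hello-samza-base, which strips out everything from hello-samza that a new task does not need. I included instructions for setting up a new task in its README.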

Deployment is a bit more complicated. Do some reading on setting up Zookeeper, Kafka and YARN clusters.

3

Building your own job is pretty simple. First, get hello-samza:

git clone https://git.apache.org/samza-hello-samza.git hello-samza 

Next, set up the system with this command:

bin/grid bootstrap 

Make sure everything is running properly with jps.

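Next, remove the apache-rat-plugin from pom.xml (so that its license-header check does not reject your new files), then build your project inside hello-samza.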

Once it is removed, add a Java file for the job in the src folder (MyTask.java) and a .properties file in the config directory (MyTask.properties).

Here is an empty sample job (MyTask.java):

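package com.samza;

import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

public class MyTask implements StreamTask {
    private static final SystemStream OUTPUT_STREAM = new SystemStream("kafka", "topicOut");
    public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
                        TaskCoordinator coordinator) throws Exception {
        // Do something useful, e.g. collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, envelope.getMessage()));
    }
}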

Don't forget to write the .properties file as well.
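As a starting point for MyTask.properties, the sketch below renders a minimal config in Java (to keep one language here; you can just as well type the lines by hand). The key names follow hello-samza's `wikipedia-feed.properties` as I understand the 0.10-era config, so verify them against the Samza configuration reference for your version. The job name `my-task`, the input topic `topicIn` and the package path are hypothetical; the `kafka` system and `string` serde are chosen to match `new SystemStream("kafka", "topicOut")` in MyTask. Because run-job.sh is pointed at `deploy/samza/config/MyTask.properties`, the build also has to copy this file into the package's config directory, the same way it does for the hello-samza properties files.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: render a minimal MyTask.properties. Key names are assumed from
// hello-samza's wikipedia-feed.properties; values are illustrative.
final class MyTaskProperties {
    static Map<String, String> config(String packagePath) {
        Map<String, String> c = new LinkedHashMap<>(); // keeps insertion order for output
        // Job: run on YARN and ship the packaged tarball to the containers
        c.put("job.factory.class", "org.apache.samza.job.yarn.YarnJobFactory");
        c.put("job.name", "my-task");                 // hypothetical name
        c.put("yarn.package.path", packagePath);
        // Task: the class Samza instantiates and the stream it reads (topicIn is hypothetical)
        c.put("task.class", "com.samza.MyTask");
        c.put("task.inputs", "kafka.topicIn");
        // Messages are plain strings, so register a string serde
        c.put("serializers.registry.string.class", "org.apache.samza.serializers.StringSerdeFactory");
        // The "kafka" system used by MyTask's OUTPUT_STREAM
        c.put("systems.kafka.samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory");
        c.put("systems.kafka.samza.msg.serde", "string");
        c.put("systems.kafka.consumer.zookeeper.connect", "localhost:2181/");
        c.put("systems.kafka.producer.bootstrap.servers", "localhost:9092");
        return c;
    }

    // One key=value line per entry, in insertion order
    static String render(Map<String, String> config) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : config.entrySet()) {
            sb.append(e.getKey()).append('=').append(e.getValue()).append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Hypothetical default path; pass the real tarball URL as the first argument
        String path = args.length > 0 ? args[0] : "file:///tmp/my-task-dist.tar.gz";
        System.out.print(render(config(path)));
    }
}
```

Redirect the output of `main` into MyTask.properties, then adjust topics and hosts to your setup.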

If your code has no errors, build it with Maven:

mvn clean package 
mkdir -p deploy/samza 
tar -xvf ./samza-job-package/target/samza-job-package-0.10.0-dist.tar.gz -C deploy/samza 

After that, once your servers are running (if they are not, you can start them with ./bin/grid start all), you can deploy the job with:

deploy/samza/bin/run-job.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/MyTask.properties

and read the output with the Kafka console consumer (the topic must match the one in OUTPUT_STREAM): deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topicOut

0

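I created my Samza job as an Eclipse project generated with Maven. Below is its pom.xml, which I used to load the 0.9.2 dependencies (I had some version problems, so you may need to adjust it; note that the file as shown sets samza.version to 0.8.0):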

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
     <modelVersion>4.0.0</modelVersion> 
     <groupId>com.acio.samza</groupId> 
     <artifactId>samzafroga</artifactId> 
     <version>0.0.1</version> 
     <name>samzafroga</name> 
     <dependencies> 
     <dependency> 
      <groupId>org.apache.samza</groupId> 
      <artifactId>samza-api</artifactId> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.samza</groupId> 
      <artifactId>samza-core_2.10</artifactId> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.samza</groupId> 
      <artifactId>samza-log4j</artifactId> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.samza</groupId> 
      <artifactId>samza-shell</artifactId> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.samza</groupId> 
      <artifactId>samza-yarn_2.10</artifactId> 
      <exclusions> 
      <exclusion> 
       <artifactId>jdk.tools</artifactId> 
       <groupId>jdk.tools</groupId> 
      </exclusion> 
     </exclusions> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.samza</groupId> 
      <artifactId>samza-kv_2.10</artifactId> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.samza</groupId> 
      <artifactId>samza-kv-rocksdb_2.10</artifactId> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.samza</groupId> 
      <artifactId>samza-kafka_2.10</artifactId> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.kafka</groupId> 
      <artifactId>kafka_2.10</artifactId> 
     </dependency> 
     <dependency> 
      <groupId>org.schwering</groupId> 
      <artifactId>irclib</artifactId> 
     </dependency> 
     <dependency> 
      <groupId>org.slf4j</groupId> 
      <artifactId>slf4j-api</artifactId> 
     </dependency> 
     <dependency> 
      <groupId>org.slf4j</groupId> 
      <artifactId>slf4j-log4j12</artifactId> 
     </dependency> 
     <dependency> 
      <groupId>org.codehaus.jackson</groupId> 
      <artifactId>jackson-jaxrs</artifactId> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.hadoop</groupId> 
      <artifactId>hadoop-hdfs</artifactId> 
     </dependency> 
     <dependency> 
      <groupId>org.eclipse.jetty</groupId> 
      <artifactId>jetty-server</artifactId> 
      <version>${jettyVersion}</version> 
     </dependency> 
     <dependency> 
      <groupId>org.eclipse.jetty</groupId> 
      <artifactId>jetty-webapp</artifactId> 
      <version>${jettyVersion}</version> 
     </dependency> 
     </dependencies> 
     <properties> 
     <!-- maven specific properties --> 
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 
     <samza.version>0.8.0</samza.version> 
     <jettyVersion>7.6.16.v20140903</jettyVersion> 
     </properties> 
     <repositories> 
     <repository> 
      <id>apache-releases</id> 
      <url>https://repository.apache.org/content/groups/public</url> 
     </repository> 
     <repository> 
      <id>scala-tools.org</id> 
      <name>Scala-tools Maven2 Repository</name> 
      <url>https://oss.sonatype.org/content/groups/scala-tools</url> 
     </repository> 
     </repositories> 

     <pluginRepositories> 
     <pluginRepository> 
      <id>scala-tools.org</id> 
      <name>Scala-tools Maven2 Repository</name> 
      <url>http://scala-tools.org/repo-releases</url> 
     </pluginRepository> 
     </pluginRepositories> 
     <build> 
     <pluginManagement> 
      <plugins> 
      <plugin> 
       <groupId>org.apache.rat</groupId> 
       <artifactId>apache-rat-plugin</artifactId> 
       <version>0.9</version> 
       <configuration> 
       <excludes> 
        <exclude>*.patch</exclude> 
        <exclude>**/target/**</exclude> 
        <exclude>*.json</exclude> 
        <exclude>.vagrant/**</exclude> 
        <exclude>.git/**</exclude> 
        <exclude>*.md</exclude> 
        <exclude>docs/**</exclude> 
        <exclude>config/**</exclude> 
        <exclude>bin/**</exclude> 
        <exclude>.gitignore</exclude> 
        <exclude>**/.cache/**</exclude> 
        <exclude>deploy/**</exclude> 
        <exclude>**/.project</exclude> 
       </excludes> 
       </configuration> 
      </plugin> 
      </plugins> 
     </pluginManagement> 
     <plugins> 

      <!-- plugin to build the tar.gz file filled with examples --> 
      <plugin> 
      <artifactId>maven-assembly-plugin</artifactId> 
      <version>2.3</version> 
      <configuration> 
       <descriptors> 
       <descriptor>src/assembly/bin.xml</descriptor> 
       </descriptors> 
       <descriptorRefs> 
       <descriptorRef>jar-with-dependencies</descriptorRef> 
       </descriptorRefs> 
      </configuration> 
      <executions> 
       <execution> 
       <id>make-assembly</id> 
       <phase>package</phase> 
       <goals> 
        <goal>single</goal> 
       </goals> 
       </execution> 
      </executions> 
      </plugin> 
     </plugins> 
     </build> 
     <dependencyManagement> 
     <dependencies> 
      <dependency> 
       <groupId>org.apache.samza</groupId> 
       <artifactId>samza-api</artifactId> 
       <version>${samza.version}</version> 
      </dependency> 
      <dependency> 
       <groupId>org.apache.samza</groupId> 
       <artifactId>samza-core_2.10</artifactId> 
       <version>${samza.version}</version> 
      </dependency> 
      <dependency> 
       <groupId>org.apache.samza</groupId> 
       <artifactId>samza-log4j</artifactId> 
       <version>${samza.version}</version> 
      </dependency> 
      <dependency> 
       <groupId>org.apache.samza</groupId> 
       <artifactId>samza-shell</artifactId> 
       <version>${samza.version}</version> 
      </dependency> 
      <dependency> 
       <groupId>org.apache.samza</groupId> 
       <artifactId>samza-yarn_2.10</artifactId> 
       <version>${samza.version}</version> 
      </dependency> 
      <dependency> 
       <groupId>org.apache.samza</groupId> 
       <artifactId>samza-kv_2.10</artifactId> 
       <version>${samza.version}</version> 
      </dependency> 
      <dependency> 
       <groupId>org.apache.samza</groupId> 
       <artifactId>samza-kv-rocksdb_2.10</artifactId> 
       <version>${samza.version}</version> 
      </dependency> 
      <dependency> 
       <groupId>org.apache.samza</groupId> 
       <artifactId>samza-kafka_2.10</artifactId> 
       <version>${samza.version}</version> 
      </dependency> 
      <dependency> 
       <groupId>org.apache.kafka</groupId> 
       <artifactId>kafka_2.10</artifactId> 
       <version>0.8.2.0</version> 
      </dependency> 
      <dependency> 
       <groupId>org.schwering</groupId> 
       <artifactId>irclib</artifactId> 
       <version>1.10</version> 
      </dependency> 
      <dependency> 
       <groupId>org.slf4j</groupId> 
       <artifactId>slf4j-api</artifactId> 
       <version>1.6.2</version> 
      </dependency> 
      <dependency> 
       <groupId>org.slf4j</groupId> 
       <artifactId>slf4j-log4j12</artifactId> 
       <version>1.6.2</version> 
      </dependency> 
      <dependency> 
       <groupId>org.codehaus.jackson</groupId> 
       <artifactId>jackson-jaxrs</artifactId> 
       <version>1.8.5</version> 
      </dependency> 
      <dependency> 
       <groupId>org.apache.hadoop</groupId> 
       <artifactId>hadoop-hdfs</artifactId> 
       <version>2.6.0</version> 
      </dependency> 
     </dependencies> 
     </dependencyManagement> 
    </project> 

The basic code of the job is this:

package xxxx;

import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

public class Redirect implements StreamTask {
    // Every output message goes to the "samzaout" topic of the system named "kafka"
    private static final SystemStream OUTPUT_STREAM = new SystemStream("kafka", "samzaout");

    @Override
    public void process(IncomingMessageEnvelope envelope,
                        MessageCollector collector,
                        TaskCoordinator coordinator) {
        // The cast assumes the input system is configured with a string serde
        String msg = (String) envelope.getMessage();

        // Transformation
        String outmsg = "xxx-" + msg + "-xxx";
        collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, outmsg));
    }
}
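One way to check the transformation in Redirect without Kafka or YARN is to keep it in a plain static method and call that from `process`. This is only a suggested structure, not part of the original job; `RedirectLogic` is a hypothetical helper name.

```java
// Sketch: Redirect's transformation as a plain static method, testable without a cluster.
// RedirectLogic is a hypothetical helper; Redirect.process would call RedirectLogic.transform(msg).
final class RedirectLogic {
    private RedirectLogic() {
    }

    // Same transformation as Redirect.process: wrap the message in "xxx-" ... "-xxx"
    static String transform(String msg) {
        return "xxx-" + msg + "-xxx";
    }

    public static void main(String[] args) {
        System.out.println(transform("hello")); // prints xxx-hello-xxx
    }
}
```

The task class then only handles Samza plumbing (reading the envelope, sending to OUTPUT_STREAM), while the logic can be covered by ordinary unit tests.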

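Once it compiles, package it (the maven-assembly-plugin configured above builds the archive) and put the package in a location every Samza node can access, such as a web server or HDFS.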

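Reference that location (via yarn.package.path) from the properties file, which you have to create in order to launch the job. Look at the examples on the project web page.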
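The point about location can be expressed as a quick sanity check. The sketch below assumes that every YARN node fetches the package itself, so a `file://` path only works when each node has the same file at the same path (as on a single-machine hello-samza setup). The set of "shared" schemes is an assumption to adjust for your cluster: HTTP in particular only works if YARN's Hadoop config registers a filesystem for it (the Samza multi-node YARN tutorial uses `org.apache.samza.util.hadoop.HttpFileSystem` as `fs.http.impl`; verify this for your version). The example paths are hypothetical.

```java
import java.net.URI;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

// Sketch: flag a yarn.package.path that only one machine could read.
final class PackagePathCheck {
    // Assumed schemes that every node can fetch from; adjust to your cluster
    private static final Set<String> SHARED_SCHEMES = new HashSet<>(Arrays.asList("hdfs", "http"));

    static boolean reachableFromAllNodes(String packagePath) {
        String scheme = URI.create(packagePath).getScheme();
        return scheme != null && SHARED_SCHEMES.contains(scheme.toLowerCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        // Hypothetical example paths
        String[] paths = {
            "file:///home/me/my-job-dist.tar.gz",
            "hdfs://namenode:8020/samza/my-job-dist.tar.gz"
        };
        for (String p : paths) {
            System.out.println(p + " -> " + (reachableFromAllNodes(p) ? "ok for a cluster" : "local only"));
        }
    }
}
```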