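Recently I have been trying to do some stream processing on the Samza framework. I have successfully deployed the hello-samza example. However, when I tried to write my own job, I didn't know where to start. I have read this document, but I still don't understand it. So can anyone help me with how to write my own job in Samza:
- What is the structure of my code (source code, lib code, and configuration)?
- Which directory should I push my code into?
- What else do I need to do to get my code running?
Your advice would help me a lot. Thank you very much!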
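Read the documentation again, look at the hello-samza example again, and if you deployed it to YARN, read it once more. All the answers you are looking for are there.
hello-samza has three jobs. Pick one and follow it through: its configuration, its launch script, and so on.
Here is how the hello-samza page launches the Wikipedia feed job: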
deploy/samza/bin/run-job.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/wikipedia-feed.properties
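Among other things, the properties file points to the compiled job/task code. The source code for the Wikipedia feed job/task is here:
Just modify that job, or copy it and modify the copy, to get going.
Downvoting won't read the documentation for you –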
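If you follow the Hello Samza instructions, you will have a working ZooKeeper, Kafka, and YARN/Samza cluster running on your local machine. With that project you can run the Wikipedia feed tasks to test things out.
However, like you, I had some trouble figuring out the proper directory structure and build setup for a new task (without the cluster-management parts). So I created hello-samza-base, which strips out everything from hello-samza that a new task doesn't need. I included instructions for setting up a new task in the README.
Deployment is a bit more complicated. Do some reading on setting up ZooKeeper, Kafka, and YARN clusters.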
Building your own job is quite simple. First, get hello-samza:
git clone https://git.apache.org/samza-hello-samza.git hello-samza
The next step is to set up the system with this command:
bin/grid bootstrap
Make sure everything is running properly with jps.
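If you'd rather not eyeball the jps output, here is a small sketch that does the check for you. It assumes the hello-samza grid shows up in jps under the main-class names QuorumPeerMain (ZooKeeper), Kafka, ResourceManager and NodeManager (YARN); JpsCheck is a hypothetical helper, not part of hello-samza.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: reads `jps` output on stdin and reports which of the
// expected grid processes are missing. The process names are assumptions
// based on the usual main classes of ZooKeeper, Kafka and YARN.
final class JpsCheck {
    static final List<String> EXPECTED =
            Arrays.asList("QuorumPeerMain", "Kafka", "ResourceManager", "NodeManager");

    // Each jps line looks like "<pid> <MainClassName>"; collect the names
    // and return the expected ones that were not seen, in EXPECTED order.
    static List<String> missing(List<String> jpsLines) {
        Set<String> running = new HashSet<>();
        for (String line : jpsLines) {
            String[] parts = line.trim().split("\\s+");
            if (parts.length >= 2) {
                running.add(parts[1]);
            }
        }
        List<String> result = new ArrayList<>();
        for (String name : EXPECTED) {
            if (!running.contains(name)) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        List<String> lines = new ArrayList<>();
        try (BufferedReader in = new BufferedReader(new InputStreamReader(System.in))) {
            String line;
            while ((line = in.readLine()) != null) {
                lines.add(line);
            }
        }
        List<String> gone = missing(lines);
        System.out.println(gone.isEmpty()
                ? "All expected processes are running"
                : "Missing: " + gone);
    }
}
```

Compile it with javac JpsCheck.java and run jps | java JpsCheck.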
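The next step is to remove the apache-rat-plugin from pom.xml and then build your project inside hello-samza. (The Apache RAT plugin checks source files for Apache license headers, so your new files would likely fail that check.)
Once it's removed, you can add a Java file for your job in the src folder (MyTask.java) and a .properties file in the config directory (MyTask.properties).
Here is an empty sample job (MyTask.java):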
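package com.samza;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;
public class MyTask implements StreamTask {
  private static final SystemStream OUTPUT_STREAM = new SystemStream("kafka", "topicOut");
  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
      TaskCoordinator coordinator) throws Exception {
    // Do something useful, e.g. send results to OUTPUT_STREAM via collector.send(...)
  }
}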
Don't forget to write the .properties file.
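As a starting point for that .properties file, here is a sketch that prints one. The key names are modeled on the hello-samza example configs (such as wikipedia-feed.properties) from around Samza 0.10; older releases may use different producer keys, and everything marked PLACEHOLDER (the package path and the input topic topicIn) is an assumption you need to adapt. MyTaskConfig is a hypothetical helper; you can just as well type the same key=value lines by hand.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical generator for MyTask.properties. Key names follow the
// hello-samza example configs; values marked PLACEHOLDER must be adapted.
final class MyTaskConfig {
    static Map<String, String> entries() {
        Map<String, String> c = new LinkedHashMap<>();
        // Run the job on YARN.
        c.put("job.factory.class", "org.apache.samza.job.yarn.YarnJobFactory");
        c.put("job.name", "my-task");
        // PLACEHOLDER: absolute location of the tar.gz built by `mvn clean package`.
        c.put("yarn.package.path",
              "file:///ABSOLUTE/PATH/TO/hello-samza/samza-job-package/target/samza-job-package-0.10.0-dist.tar.gz");
        // The StreamTask implementation and the stream it consumes.
        c.put("task.class", "com.samza.MyTask");
        c.put("task.inputs", "kafka.topicIn"); // PLACEHOLDER input topic
        // Treat messages as plain strings.
        c.put("serializers.registry.string.class", "org.apache.samza.serializers.StringSerdeFactory");
        // The "kafka" system referenced by new SystemStream("kafka", ...).
        c.put("systems.kafka.samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory");
        c.put("systems.kafka.samza.msg.serde", "string");
        c.put("systems.kafka.consumer.zookeeper.connect", "localhost:2181/");
        c.put("systems.kafka.producer.bootstrap.servers", "localhost:9092");
        return c;
    }

    // Renders the entries as key=value lines, one per entry. None of the
    // keys or values above contain characters that need escaping.
    static String toPropertiesText() {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : entries().entrySet()) {
            sb.append(e.getKey()).append('=').append(e.getValue()).append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(toPropertiesText());
    }
}
```

Run java MyTaskConfig > MyTask.properties and move the file into your config directory (src/main/config in hello-samza). If your assembly descriptor lists config files one by one, as hello-samza's appears to, add MyTask.properties there as well so that it ends up in deploy/samza/config.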
If your code has no errors, build it with Maven:
mvn clean package
mkdir -p deploy/samza
tar -xvf ./samza-job-package/target/samza-job-package-0.10.0-dist.tar.gz -C deploy/samza
After that, make sure your servers are running (if they aren't, you can start them with ./bin/grid start all).
Start your job with: deploy/samza/bin/run-job.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/MyTask.properties
and read its output with the Kafka console consumer: deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topicOut
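I created the Samza job as a separate Eclipse project through Maven. Its pom.xml, which loads the dependencies for version 0.9.2, has this content (I had some version problems, so you may have to do some work on it too):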
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.acio.samza</groupId>
<artifactId>samzafroga</artifactId>
<version>0.0.1</version>
<name>samzafroga</name>
<dependencies>
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-core_2.10</artifactId>
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-log4j</artifactId>
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-shell</artifactId>
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-yarn_2.10</artifactId>
<exclusions>
<exclusion>
<artifactId>jdk.tools</artifactId>
<groupId>jdk.tools</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-kv_2.10</artifactId>
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-kv-rocksdb_2.10</artifactId>
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-kafka_2.10</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
</dependency>
<dependency>
<groupId>org.schwering</groupId>
<artifactId>irclib</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-jaxrs</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>${jettyVersion}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-webapp</artifactId>
<version>${jettyVersion}</version>
</dependency>
</dependencies>
<properties>
<!-- maven specific properties -->
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<samza.version>0.8.0</samza.version>
<jettyVersion>7.6.16.v20140903</jettyVersion>
</properties>
<repositories>
<repository>
<id>apache-releases</id>
<url>https://repository.apache.org/content/groups/public</url>
</repository>
<repository>
<id>scala-tools.org</id>
<name>Scala-tools Maven2 Repository</name>
<url>https://oss.sonatype.org/content/groups/scala-tools</url>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>scala-tools.org</id>
<name>Scala-tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</pluginRepository>
</pluginRepositories>
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<version>0.9</version>
<configuration>
<excludes>
<exclude>*.patch</exclude>
<exclude>**/target/**</exclude>
<exclude>*.json</exclude>
<exclude>.vagrant/**</exclude>
<exclude>.git/**</exclude>
<exclude>*.md</exclude>
<exclude>docs/**</exclude>
<exclude>config/**</exclude>
<exclude>bin/**</exclude>
<exclude>.gitignore</exclude>
<exclude>**/.cache/**</exclude>
<exclude>deploy/**</exclude>
<exclude>**/.project</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<!-- plugin to build the tar.gz file filled with examples -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.3</version>
<configuration>
<descriptors>
<descriptor>src/assembly/bin.xml</descriptor>
</descriptors>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-api</artifactId>
<version>${samza.version}</version>
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-core_2.10</artifactId>
<version>${samza.version}</version>
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-log4j</artifactId>
<version>${samza.version}</version>
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-shell</artifactId>
<version>${samza.version}</version>
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-yarn_2.10</artifactId>
<version>${samza.version}</version>
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-kv_2.10</artifactId>
<version>${samza.version}</version>
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-kv-rocksdb_2.10</artifactId>
<version>${samza.version}</version>
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-kafka_2.10</artifactId>
<version>${samza.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.0</version>
</dependency>
<dependency>
<groupId>org.schwering</groupId>
<artifactId>irclib</artifactId>
<version>1.10</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-jaxrs</artifactId>
<version>1.8.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.6.0</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>
The basic code of the job is this:
package xxxx;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;
// Reads each incoming message, wraps it in "xxx-...-xxx" and writes it to the
// Kafka topic "samzaout". The (String) cast assumes a string serde is
// configured for the input stream in the job's .properties file.
public class Redirect implements StreamTask {
  private static final SystemStream OUTPUT_STREAM = new SystemStream("kafka", "samzaout");
  @Override
  public void process(IncomingMessageEnvelope envelope,
                      MessageCollector collector,
                      TaskCoordinator coordinator) {
    String msg = (String) envelope.getMessage();
    // Transformation
    String outmsg = "xxx-" + msg + "-xxx";
    collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, outmsg));
  }
}
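One design choice that helps here: keep the transformation in a plain method so you can test it without Kafka or YARN. The sketch below is stdlib-only Java; RedirectLogic and processAll are hypothetical names, and processAll only imitates the sequence of messages the collector would receive, it is not a Samza API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: the transformation from Redirect.process() as a pure
// method, so it can be exercised without a running cluster.
final class RedirectLogic {
    // Same transformation as in Redirect: wrap the message in "xxx-" ... "-xxx".
    static String transform(String msg) {
        return "xxx-" + msg + "-xxx";
    }

    // Stand-in for repeated process() calls: one output per input message,
    // in order, as collector.send() would receive them.
    static List<String> processAll(List<String> inputs) {
        List<String> sent = new ArrayList<>();
        for (String msg : inputs) {
            sent.add(transform(msg));
        }
        return sent;
    }

    public static void main(String[] args) {
        // prints [xxx-a-xxx, xxx-b-xxx]
        System.out.println(processAll(Arrays.asList("a", "b")));
    }
}
```

Redirect.process() can then call collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, RedirectLogic.transform(msg))), and only the thin Samza wrapper needs the cluster to run.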
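Once it compiles, you need to package it into a jar file and put it somewhere all Samza nodes can access, such as a web server or HDFS.
Reference it from the properties file, which you have to create in order to launch the job. Look for examples on the project web page.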
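To make the "accessible to all nodes" point concrete: the location goes into yarn.package.path, and a file:// path only works if the package exists at that path on every node, which is the case on the one-machine hello-samza grid. The sketch below flags whether a path uses a scheme that is typically reachable cluster-wide. The hdfs/http/https list is an assumption, and serving the package over HTTP may need extra Hadoop/YARN configuration, so check the Samza documentation for your version. PackagePathCheck is a hypothetical helper.

```java
import java.net.URI;
import java.util.Arrays;
import java.util.List;

// Hypothetical check for a yarn.package.path value: every node that runs a
// container must be able to fetch the package, so file:// is treated as
// "local only" and the schemes below as shared (an assumption).
final class PackagePathCheck {
    static final List<String> SHARED_SCHEMES = Arrays.asList("hdfs", "http", "https");

    // Throws IllegalArgumentException if the path is not a valid URI.
    static boolean isSharedLocation(String packagePath) {
        String scheme = URI.create(packagePath).getScheme();
        return scheme != null && SHARED_SCHEMES.contains(scheme.toLowerCase());
    }

    public static void main(String[] args) {
        for (String p : args) {
            System.out.println(p + " -> " + (isSharedLocation(p) ? "shared" : "local only"));
        }
    }
}
```

For example, java PackagePathCheck hdfs://namenode:8020/samza/my-task-dist.tar.gz reports "shared", while a file:///tmp/... path reports "local only".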
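That's the whole point of the hello-samza project: it shows 1) what the architecture looks like (source code, lib code, and configuration), 2) which directory your code gets pushed into (it deploys it successfully), and 3) all the code needed to get it running. –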