cool-2018-10-21-centos7-spark1.5-cluster
Setting up a Spark cluster
Prerequisite: the Hadoop environment variables are already configured.
Stop the firewall and synchronize time on every node (on CentOS 7 the default firewall is firewalld; the older service iptables stop only works if the legacy iptables service is installed):
systemctl stop firewalld
ntpdate 0.asia.pool.ntp.org
Install and configure Scala; this setup uses Scala 2.11.4 with Spark 1.5:
tar -zxvf scala-2.11.4.tgz
mv scala-2.11.4 /home
vim ~/.bash_profile
export SCALA_HOME=/home/scala-2.11.4
export PATH=$PATH:$SCALA_HOME/bin
source ~/.bash_profile
scala -version
Extract the Spark tarball:
tar -zxvf spark-1.5.1-bin-hadoop2.4.tgz
mv spark-1.5.1-bin-hadoop2.4 /home/spark-1.5
cd /home/spark-1.5/conf
cp spark-env.sh.template spark-env.sh
vim spark-env.sh
export JAVA_HOME=/usr/java/jdk1.7.0_79
export SCALA_HOME=/home/scala-2.11.4
export SPARK_MASTER_IP=192.168.25.121
export SPARK_WORKER_MEMORY=1g
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
cp slaves.template slaves
vim slaves
hadoop2
hadoop3
hadoop4
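The slaves file lists worker hostnames, so every node must be able to resolve hadoop2 through hadoop4. Assuming these names map to the worker IPs used in the scp commands below (the hostname-to-IP mapping is assumed from this setup), /etc/hosts on each node would contain entries like:
192.168.25.122 hadoop2
192.168.25.123 hadoop3
192.168.25.124 hadoop4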
Copy Spark to the worker nodes:
scp -r spark-1.5 [email protected]:/home/
scp -r spark-1.5 [email protected]:/home/
scp -r spark-1.5 [email protected]:/home/
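If the scala command is needed on the workers too, copy the Scala directory and the PATH settings over as well (optional for Spark itself, since the Spark daemons bundle their own Scala libraries), e.g. for the first worker:
scp -r /home/scala-2.11.4 [email protected]:/home/
scp ~/.bash_profile [email protected]:~/
and repeat for 192.168.25.123 and 192.168.25.124.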
On the master node, go to Spark's sbin directory and run:
./start-all.sh
Testing
1. Use jps and the master web UI on port 8080 to check whether the cluster started successfully; for example:
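On the master, jps should show a Master process (alongside the Hadoop daemons); on hadoop2 through hadoop4 it should show a Worker process. The master web UI, assuming the default port, is at:
http://192.168.25.121:8080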
2. Launch spark-shell and run a quick word count to check that everything works:
./spark-shell
val lines = sc.textFile("file:///root/hello.txt")
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_,1))
val results = pairs.reduceByKey(_+_)
results.collect
results.saveAsTextFile("hdfs://bigdata002:8020/20181026")
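saveAsTextFile writes a directory of part files rather than a single file, so the result can be checked from the command line:
hadoop fs -ls hdfs://bigdata002:8020/20181026
hadoop fs -cat hdfs://bigdata002:8020/20181026/part-00000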
3. Write Java code and run a test case.
Create a Maven project and add the following dependencies to pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.spark</groupId>
<artifactId>spark-study-java</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>spark-project</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.5.1</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.10.6</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>1.5.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.10</artifactId>
<version>1.5.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.5.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>1.5.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume-sink_2.10</artifactId>
<version>1.5.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.6</version>
</dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20090211</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.4.3</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.4.3</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.4.3</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.1.41</version>
</dependency>
<dependency>
<groupId>fastutil</groupId>
<artifactId>fastutil</artifactId>
<version>5.0.9</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/java</sourceDirectory>
<testSourceDirectory>src/main/test</testSourceDirectory>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.2.1</version>
<executions>
<execution>
<goals>
<goal>exec</goal>
</goals>
</execution>
</executions>
<configuration>
<executable>java</executable>
<includeProjectDependencies>true</includeProjectDependencies>
<includePluginDependencies>false</includePluginDependencies>
<classpathScope>compile</classpathScope>
<mainClass>com.spark.study.App</mainClass>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
package cn.spark.study;
public class App {
public static void main(String[] args) {
System.out.println("Hello World!");
}
}
Because the Hadoop cluster is configured for HA, HDFS is accessed here through the NameNode RPC port 8020 (with HA it could equally be addressed by the nameservice URI configured in fs.defaultFS).
package cn.spark.study.core;
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
public class WordCountCluster {
public static void main(String[] args) {
// In cluster mode the master URL is supplied by spark-submit, so setMaster() is omitted.
SparkConf conf = new SparkConf()
.setAppName("WordCountCluster");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> lines = sc.textFile("hdfs://hadoop2:8020/spark.txt");
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
private static final long serialVersionUID = 1L;
@Override
public Iterable<String> call(String line) throws Exception {
return Arrays.asList(line.split(" "));
}
});
JavaPairRDD<String, Integer> pairs = words.mapToPair(
new PairFunction<String, String, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Integer> call(String word) throws Exception {
return new Tuple2<String, Integer>(word, 1);
}
});
JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(
new Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
// foreach runs on the executors, so these printlns go to the executors' stdout.
wordCounts.foreach(new VoidFunction<Tuple2<String,Integer>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Tuple2<String, Integer> wordCount) throws Exception {
System.out.println(wordCount._1 + " appeared " + wordCount._2 + " times.");
}
});
sc.close();
}
}
Package the project: export a runnable jar from Eclipse, or build it with Maven as shown below.
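Since the maven-assembly-plugin above is bound to the package phase with the jar-with-dependencies descriptor, a plain Maven build should produce the same artifact under target/:
mvn clean package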
Upload spark-study-java-0.0.1-SNAPSHOT-jar-with-dependencies.jar to the Spark cluster.
Create wordcount_cluster.sh:
/home/spark-1.5/bin/spark-submit \
--class cn.spark.study.core.WordCountCluster \
--num-executors 1 \
--driver-memory 100m \
--executor-memory 100m \
--executor-cores 1 \
/root/spark-study-java-0.0.1-SNAPSHOT-jar-with-dependencies.jar
Make the script executable:
chmod 777 wordcount_cluster.sh
Create a test input file spark.txt:
hello you
hello me
hello you
hello world
hello you
hello me
Copy it to hadoop2 and upload it to the HDFS root:
hadoop fs -put spark.txt /
Run ./wordcount_cluster.sh
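Given the spark.txt above, the expected counts are hello 6, you 3, me 2 and world 1. Because foreach prints on the executors, these lines appear in the executors' stdout (viewable through the web UI) rather than necessarily on the driver console:
hello appeared 6 times.
you appeared 3 times.
me appeared 2 times.
world appeared 1 times.
(the order of the lines is not guaranteed)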
This completes the test.
Running in yarn-cluster mode:
/home/spark-1.5/bin/spark-submit \
--class cn.spark.study.core.WordCountCluster \
--master yarn-cluster \
--num-executors 1 \
--driver-memory 100m \
--executor-memory 100m \
--executor-cores 1 \
/root/spark-study-java-0.0.1-SNAPSHOT-jar-with-dependencies.jar
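In yarn-cluster mode the driver runs inside the ApplicationMaster on the cluster, so the job's output does not appear on the submitting console; after the application finishes it can be fetched with the application ID reported by spark-submit or the ResourceManager UI:
yarn logs -applicationId <application_id>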
Troubleshooting: all Linux commands suddenly fail with bash: xxxxx: command not found...
This happens when the PATH environment variable is misconfigured, which can have many causes, such as a system upgrade or a careless edit. There are two fixes.
First, restore PATH for the current session by entering the following at the command line (it re-adds the standard directories where common shell commands live):
export PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin:/root/bin
Second, if no commands work at all, open /etc/profile using the absolute path to vi:
/bin/vi /etc/profile
and add the PATH setting to this system-wide configuration file:
export PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin:/root/bin
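After saving, reload the file so the change takes effect in the current shell:
source /etc/profile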