CentOS 7 Spark 1.5 cluster setup (2018-10-21)

Setting up the Spark cluster

Prerequisite: the Hadoop environment variables are already configured.
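
A minimal sketch of what that typically looks like in ~/.bash_profile (the /home/hadoop-2.4 install path is an assumption; substitute your actual Hadoop directory):

export HADOOP_HOME=/home/hadoop-2.4
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin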

Stop the firewall and synchronize the system clocks:

service iptables stop

ntpdate 0.asia.pool.ntp.org
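
Note that CentOS 7 uses firewalld by default, so if service iptables stop reports that the unit does not exist, the equivalent commands are:

systemctl stop firewalld
systemctl disable firewalld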

Install and configure Scala; Scala 2.11.4 is used here (note that the prebuilt spark-1.5.1-bin-hadoop2.4 package is compiled against Scala 2.10, which is why the Maven dependencies below use the _2.10 artifacts).

tar -zxvf scala-2.11.4.tgz

mv scala-2.11.4 /home

vim ~/.bash_profile

export SCALA_HOME=/home/scala-2.11.4
export PATH=$PATH:$SCALA_HOME/bin

source ~/.bash_profile

scala -version

Unpack the Spark tarball:

tar -zxvf spark-1.5.1-bin-hadoop2.4.tgz

mv spark-1.5.1-bin-hadoop2.4 /home/spark-1.5

cd /home/spark-1.5/conf

cp spark-env.sh.template spark-env.sh

vim spark-env.sh

export JAVA_HOME=/usr/java/jdk1.7.0_79
export SCALA_HOME=/home/scala-2.11.4
export SPARK_MASTER_IP=192.168.25.121
export SPARK_WORKER_MEMORY=1g
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

cp slaves.template slaves

vim slaves

hadoop2
hadoop3
hadoop4

Copy the Spark directory to the other nodes:

scp -r spark-1.5 [email protected]:/home/

scp -r spark-1.5 [email protected]:/home/

scp -r spark-1.5 [email protected]:/home/

On the master node, run the start script from Spark's sbin directory:

./start-all.sh

Testing


1. Use jps and the web UI on port 8080 to check whether the cluster started successfully.
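
For example (process names as reported by jps in a standalone deployment; the URL uses the master IP configured above):

jps
# master node: Master
# slave nodes: Worker

http://192.168.25.121:8080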

2. Start spark-shell and check that it works:

./spark-shell

val lines = sc.textFile("file:///root/hello.txt")
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_,1))
val results = pairs.reduceByKey(_+_)
results.collect
results.saveAsTextFile("hdfs://bigdata002:8020/20181026")
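
The saved output can then be inspected from HDFS (same path as in the example above):

hadoop fs -cat hdfs://bigdata002:8020/20181026/part-*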

3. Write Java code and run a test case

Create a Maven project.
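
One way to create the project skeleton is Maven's quickstart archetype (groupId and artifactId chosen here to match the pom below):

mvn archetype:generate -DgroupId=cn.spark -DartifactId=spark-study-java -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false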

Add the dependencies in pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>cn.spark</groupId>
	<artifactId>spark-study-java</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>spark-project</name>
	<url>http://maven.apache.org</url>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	</properties>

	<dependencies>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>3.8.1</version>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-core_2.10</artifactId>
			<version>1.5.1</version>
		</dependency>

		<dependency>
			<groupId>org.scala-lang</groupId>
			<artifactId>scala-library</artifactId>
			<version>2.10.6</version>
		</dependency>

		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-sql_2.10</artifactId>
			<version>1.5.1</version>
		</dependency>
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-hive_2.10</artifactId>
			<version>1.5.1</version>
		</dependency>
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-streaming_2.10</artifactId>
			<version>1.5.1</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-client</artifactId>
			<version>2.4.1</version>
		</dependency>
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-streaming-kafka_2.10</artifactId>
			<version>1.5.1</version>
		</dependency>
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-streaming-flume-sink_2.10</artifactId>
			<version>1.5.2</version>
		</dependency>
		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
			<version>5.1.6</version>
		</dependency>
		<dependency>
			<groupId>org.json</groupId>
			<artifactId>json</artifactId>
			<version>20090211</version>
		</dependency>
		<dependency>
			<groupId>com.fasterxml.jackson.core</groupId>
			<artifactId>jackson-core</artifactId>
			<version>2.4.3</version>
		</dependency>
		<dependency>
			<groupId>com.fasterxml.jackson.core</groupId>
			<artifactId>jackson-databind</artifactId>
			<version>2.4.3</version>
		</dependency>
		<dependency>
			<groupId>com.fasterxml.jackson.core</groupId>
			<artifactId>jackson-annotations</artifactId>
			<version>2.4.3</version>
		</dependency>
		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>fastjson</artifactId>
			<version>1.1.41</version>
		</dependency>
		<dependency>
			<groupId>fastutil</groupId>
			<artifactId>fastutil</artifactId>
			<version>5.0.9</version>
		</dependency>
	</dependencies>

	<build>
		<sourceDirectory>src/main/java</sourceDirectory>
		<testSourceDirectory>src/main/test</testSourceDirectory>

		<plugins>
			<plugin>
				<artifactId>maven-assembly-plugin</artifactId>
				<configuration>
					<descriptorRefs>
						<descriptorRef>jar-with-dependencies</descriptorRef>
					</descriptorRefs>
					<archive>
						<manifest>
							<mainClass></mainClass>
						</manifest>
					</archive>
				</configuration>
				<executions>
					<execution>
						<id>make-assembly</id>
						<phase>package</phase>
						<goals>
							<goal>single</goal>
						</goals>
					</execution>
				</executions>
			</plugin>

			<plugin>
				<groupId>org.codehaus.mojo</groupId>
				<artifactId>exec-maven-plugin</artifactId>
				<version>1.2.1</version>
				<executions>
					<execution>
						<goals>
							<goal>exec</goal>
						</goals>
					</execution>
				</executions>
				<configuration>
					<executable>java</executable>
					<includeProjectDependencies>true</includeProjectDependencies>
					<includePluginDependencies>false</includePluginDependencies>
					<classpathScope>compile</classpathScope>
					<mainClass>cn.spark.study.App</mainClass>
				</configuration>
			</plugin>

			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<configuration>
					<source>1.7</source>
					<target>1.7</target>
				</configuration>
			</plugin>

		</plugins>
	</build>

</project>
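
A minimal App class to verify that the project compiles and runs:
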
package cn.spark.study;

public class App {
	public static void main(String[] args) {
		System.out.println("Hello World!");
	}
}

Because the Hadoop cluster is configured for HA, port 8020 is used here to access HDFS (the hostname form below assumes hadoop2 is the active NameNode). The WordCountCluster class reads spark.txt from HDFS, counts the words, and prints each count:

package cn.spark.study.core;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

public class WordCountCluster {
	
	public static void main(String[] args) {
		
		SparkConf conf = new SparkConf()
				.setAppName("WordCountCluster");  
		
		JavaSparkContext sc = new JavaSparkContext(conf);

		JavaRDD<String> lines = sc.textFile("hdfs://hadoop2:8020/spark.txt");
		
		JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
			
			private static final long serialVersionUID = 1L;
			
			@Override
			public Iterable<String> call(String line) throws Exception {
				return Arrays.asList(line.split(" "));  
			}
			
		});

		JavaPairRDD<String, Integer> pairs = words.mapToPair(
				
				new PairFunction<String, String, Integer>() {

					private static final long serialVersionUID = 1L;
		
					@Override
					public Tuple2<String, Integer> call(String word) throws Exception {
						return new Tuple2<String, Integer>(word, 1);
					}
					
				});
		
		JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(
				
				new Function2<Integer, Integer, Integer>() {
					
					private static final long serialVersionUID = 1L;
		
					@Override
					public Integer call(Integer v1, Integer v2) throws Exception {
						return v1 + v2;
					}
					
				});

		wordCounts.foreach(new VoidFunction<Tuple2<String,Integer>>() {
			
			private static final long serialVersionUID = 1L;
			
			@Override
			public void call(Tuple2<String, Integer> wordCount) throws Exception {
				System.out.println(wordCount._1 + " appeared " + wordCount._2 + " times.");    
			}
			
		});
		
		sc.close();
	}
	
}

Eclipse packaging workflow

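The same jar can also be built from the command line, since the assembly plugin above is bound to the package phase:

mvn clean package

This produces target/spark-study-java-0.0.1-SNAPSHOT-jar-with-dependencies.jar.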

Upload spark-study-java-0.0.1-SNAPSHOT-jar-with-dependencies.jar to the Spark cluster.
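
For example (assuming the jar is copied to /root on the master node, which is the path used by the submit script below):

scp target/spark-study-java-0.0.1-SNAPSHOT-jar-with-dependencies.jar [email protected]:/root/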

Write a wordcount_cluster.sh script. (No --master option is given here, so the job will not run on the standalone cluster unless spark.master is set elsewhere; to target the cluster explicitly, add --master spark://192.168.25.121:7077.)

/home/spark-1.5/bin/spark-submit \
--class cn.spark.study.core.WordCountCluster \
--num-executors 1 \
--driver-memory 100m \
--executor-memory 100m \
--executor-cores 1 \
/root/spark-study-java-0.0.1-SNAPSHOT-jar-with-dependencies.jar

Make the script executable:

chmod 777 wordcount_cluster.sh

Create a spark.txt file:

hello you
hello me
hello you
hello world
hello you
hello me

Upload it to HDFS (on hadoop2):

hadoop fs -put spark.txt /
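
The upload can be confirmed with:

hadoop fs -ls /spark.txt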

Run ./wordcount_cluster.sh

This completes the test.

Running in yarn-cluster mode

/home/spark-1.5/bin/spark-submit \
--class cn.spark.study.core.WordCountCluster \
--master yarn-cluster \
--num-executors 1 \
--driver-memory 100m \
--executor-memory 100m \
--executor-cores 1 \
/root/spark-study-java-0.0.1-SNAPSHOT-jar-with-dependencies.jar
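
In yarn-cluster mode the driver runs inside a YARN container, so the counts printed by foreach appear in the container logs rather than in the submitting shell; they can be retrieved afterwards with the application ID shown in the submit output or in the ResourceManager UI:

yarn logs -applicationId <application_id>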

Linux suddenly stops recognizing any command, showing bash: xxxxx: command not found...

This happens because the system's PATH environment variable is no longer configured correctly; it can have many causes, such as a system upgrade or a careless operation. There are two ways to fix it.

Option 1: type the following directly at the shell prompt and press Enter (it restores PATH to the standard directories where the common shell commands live):

export PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin:/root/bin

Option 2: if no commands can be used at all, open profile with vi via its absolute path:

/bin/vi /etc/profile

and add the PATH setting to the system configuration file:

export PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin:/root/bin