Spark实战

发布时间 2023-06-02 07:12:07作者: strongmore

WordCount程序

这个需求就是类似于我们在学习MapReduce的时候写的案例

需求这样的:读取文件中的所有内容,计算每个单词出现的次数

注意:由于Spark支持Java、Scala这些语言,目前在企业中大部分公司都是使用Scala语言进行开发,个别公司会使用java进行开发,为了加深对Spark的理解,针对课程中的案例,都会先基于Scala代码进行详细的讲解,然后再使用java代码重新实现一遍。

image

注意:由于目前我们下载的spark的安装包中使用的scala是2.11的,所以在这里要选择对应的scala 2.11版本的依赖。

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.4.3</version>
</dependency>

scala的sdk使用2.11.12版本。

Scala代码开发

import org.apache.spark.{SparkConf, SparkContext}

object WordCountScala {

  def main(args: Array[String]): Unit = {
    //第一步:创建SparkContext
    val conf = new SparkConf()
    conf.setAppName("WordCountScala") //设置任务名称
      .setMaster("local") //local表示在本地执行
    val sc = new SparkContext(conf)
    //第二步:加载数据
    val linesRDD = sc.textFile("C:\\D-myfiles\\testjar\\spark\\hello.txt")
    //第三步:对数据进行切割,把一行数据切分成一个一个的单词
    val wordsRDD = linesRDD.flatMap(line => line.split(" "))
    //第四步:迭代words,将每个word转化为(word,1)这种形式
    val pairRDD = wordsRDD.map(word => (word, 1))
    //第五步:根据key(其实就是word)进行分组聚合统计
    val wordCountRDD = pairRDD.reduceByKey((a, b) => a + b)
    //第六步:将结果打印到控制台
    wordCountRDD.foreach(wordCount => println(wordCount._1 + "--" + wordCount._2))
    //第七步:停止SparkContext
    sc.stop()

  }
}

注意:由于此时我们在代码中设置的Master为local,表示会在本地创建一个临时的spark集群运行这个代码,这样有利于代码调试

linesRDD中的数据是这样的:

hello you
hello me

wordsRDD中的数据是这样的:

hello
you
hello
me

pairRDD 中的数据是这样的

(hello,1)
(you,1)
(hello,1)
(me,1)

wordCountRDD 中的数据是这样的

(hello,2)
(you,1)
(me,1)

Java代码开发

import java.util.Arrays;

public class WordCountJava {
    public static void main(String[] args) {
        //第一步:创建SparkContext
        SparkConf conf = new SparkConf();
        conf.setAppName("WordCountScala") //设置任务名称
                .setMaster("local");//local表示在本地执行
        JavaSparkContext sc = new JavaSparkContext(conf);
        //第二步:加载数据
        JavaRDD<String> linesRDD = sc.textFile("C:\\D-myfiles\\testjar\\spark\\hello.txt");
        //第三步:对数据进行切割,把一行数据切分成一个一个的单词
        JavaRDD<String> wordsRDD = linesRDD.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
        //第四步:迭代words,将每个word转化为(word,1)这种形式
        JavaPairRDD<String, Integer> pairRDD = wordsRDD.mapToPair(word -> new Tuple2<>(word, 1));
        //第五步:根据key(其实就是word)进行分组聚合统计
        JavaPairRDD<String, Integer> wordCountRDD = pairRDD.reduceByKey(Integer::sum);
        //第六步:将结果打印到控制台
        wordCountRDD.foreach(wordCount -> System.out.println(wordCount._1 + "--" + wordCount._2));
        //第七步:停止SparkContext
        sc.stop();
    }
}

任务提交

针对任务的提交有这么几种形式

使用idea

直接在idea中执行,方便在本地环境调试代码,我们刚才使用的就是这种方式

使用spark-submit

使用spark-submit提交到集群执行,实际工作中会使用这种方式,那接下来我们需要把我们的代码提交到集群中去执行

这个时候就需要对代码打包了

<build>
        <plugins>
            <!-- java编译插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <!-- 打包插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.5.5</version>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- scala编译插件 -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.1.6</version>
                <configuration>
                    <scalaCompatVersion>2.11</scalaCompatVersion>
                    <scalaVersion>2.11.12</scalaVersion>
                </configuration>
                <executions>
                    <execution>
                        <id>compile-scala</id>
                        <phase>compile</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>test-compile-scala</id>
                        <phase>test-compile</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

注意:这里面的scala版本信息要使用2.11,因为这个spark安装包中依赖的scala是2.11的。

然后把spark-core依赖的作用域设置为provided,修改代码中的输入文件路径信息,因为这个时候无法读取windows中的数据了,把代码修改成动态接收输入文件路径,还需要将 setMaster("local") 注释掉,后面我们会在提交任务的时候动态指定master信息

//第一步:创建SparkContext
val conf = new SparkConf()
conf.setAppName("WordCountScala")//设置任务名称
 //.setMaster("local")//local表示在本地执行
val sc = new SparkContext(conf);
//第二步:加载数据
var path = "C:\D-myfiles\testjar\spark\hello.txt"
if(args.length==1){
 path = args(0)
}
val linesRDD = sc.textFile(path)
//第一步:创建SparkContext:
//注意,针对java代码需要获取JavaSparkContext
SparkConf conf = new SparkConf();
conf.setAppName("WordCountJava");
 //.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
//第二步:加载数据
String path = "C:\D-myfiles\testjar\spark\hello.txt";
if(args.length==1){
 path = args[0];
}
JavaRDD<String> linesRDD = sc.textFile(path);

由于spark-submit命令后面的参数有点多,所以在这我们最好是写一个脚本去提交任务

spark-submit \
--class com.imooc.bigdata.spark.WordCountScala \
--master yarn \
--deploy-mode client \
--executor-memory 1G \
--num-executors 1 \
bigdata-spark.jar \
hdfs://bigdata01:9000/hello.txt

此时任务会被提交到YARN集群中,可以看到任务执行成功了。

但是注意:此时想要查看foreach中打印的结果目前是看不到的,需要通过spark自己的任务界面才能看到,现在只有在任务运行中的时候,我们才能进到spark的任务界面,任务执行结束之后我们就进不去了,一会我们再解决这个问题。

使用spark-shell

这种方式方便在集群环境中调试代码

有一些代码对环境没有特殊依赖的时候我们可以直接使用第一种方式,在idea中调试代码

但是有时候代码需要依赖线上的一些环境,例如:需要依赖线上的数据库中的数据,由于权限问题,我们在本地是无法连接的,这个时候想要调试代码的话,可以选择使用spark-shell的方式,直接在线上服务器中开启一个spark的交互式命令行窗口

默认是使用local模式开启本地集群,在spark-shell命令行中sparkContex是已经创建好的了,可以直接通过sc使用

注意:使用spark-shell的时候,也可以选择指定开启本地spark集群,或者连接standalone集群,或者使用on yarn模式,都是可以的

bin/spark-shell
scala> val linesRDD = sc.textFile("/hello.txt")
linesRDD: org.apache.spark.rdd.RDD[String] = /hello.txt MapPartitionsRDD[5] at t                                                                                                                                                             extFile at <console>:24

scala> val wordsRDD = linesRDD.flatMap(line => line.split(" "))
wordsRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[6] at flatMap at <                                                                                                                                                             console>:25

scala> val pairRDD = wordsRDD.map(word => (word, 1))
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[7] at map at                                                                                                                                                              <console>:25

scala> val wordCountRDD = pairRDD.reduceByKey((a, b) => a + b)
wordCountRDD: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[8] at reduce                                                                                                                                                             ByKey at <console>:25

scala> wordCountRDD.foreach(wordCount => println(wordCount._1 + "--" + wordCount                                                                                                                                                             ._2))
[Stage 0:=======================================>                   (2 + 1) / 3]                                                                                                                                                             me--1
you--1
hello--2
scala>

文件路径为hdfs路径

我们尝试使用on yarn模式,这个时候执行的代码就是在YARN上执行的了

bin/spark-shell --master yarn --deploy-mode client

输入代码和上面一样,但最终结果不会打印到控制台。如果想要使用spark-shell连接spark的standalone集群的话,只需要通过–master参数指定集群主节点的url信息即可。

Spark historyServer

刚才我们使用on yarn模式的时候会发现看不到输出的日志信息,这主要是因为没有开启spark的historyserver,我们只开启了hadoop的historyserver。

先对 spark-defaults.conf.template 重命名,去掉后缀,增加以下内容

spark.eventLog.enabled=true
spark.eventLog.compress=true
spark.eventLog.dir=hdfs://bigdata01:9000/tmp/logs/root/logs
spark.history.fs.logDirectory=hdfs://bigdata01:9000/tmp/logs/root/logs
spark.yarn.historyServer.address=http://bigdata01:18080

注意:在哪个节点上启动spark的historyserver进程,spark.yarn.historyServer.address的值里面就指定哪个节点的主机名信息

在spark-env.sh中增加以下内容

export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080 -Dspark.history.fs.logDirectory=hdfs://bigdata01:9000/tmp/logs/root/logs"

启动spark的history服务

sbin/start-history-server.sh

重新使用on yarn模式向集群提交任务,查看spark的任务界面

spark-submit \
--class com.imooc.bigdata.spark.WordCountScala \
--master yarn \
--deploy-mode client \
--executor-memory 1G \
--num-executors 1 \
bigdata-spark.jar \
hdfs://bigdata01:9000/hello.txt

参考

Spark client mode 和 cluster mode 的区别