路径:/core/src/main/scala//deploy/Master/
提交应用程序,submit的路径:
路径:/core/src/main/scala//deploy/总结:
Executor在集群中分散启动,有利于task计算的数据本地化。
默认情况下(提交任务的时候没有设置--executor-cores选项),每一个Worker为当前的Application启动一个Executor,这个Executor会使用这个Worker的所有的cores和1G内存。
如果想在Worker上启动多个Executor,提交Application的时候要加--executor-cores这个选项。
默认情况下没有设置--total-executor-cores,一个Application会使用Spark集群中所有的cores。
结论演示使用Spark-submit提交任务演示。也可以使用spark-shell
默认情况每个worker为当前的Application启动一个Executor,这个Executor使用集群中所有的cores和1G内存。
./spark-submit--masterspark://node01:7077--./lib/
运行结果
2.在workr上启动多个Executor,设置--executor-cores参数指定每个executor使用的core数量。
./spark-submit--masterspark://node01:7077--./lib/
运行结果
3.内存不足的情况下启动core的情况。Spark启动是不仅看core配置参数,也要看配置的core的内存是否够用。
./spark-submit--masterspark://node01:7077--./lib/
4.--total-executor-cores集群中共使用多少cores
注意:一个进程不能让集群多个节点共同启动。
./spark-submit--masterspark://node01:7077--exec./lib/任务调度源码分析Action算子开始分析
任务调度可以从一个Action类算子开始。因为Action类算子会触发一个job的执行。
划分stage,以taskSet形式提交任务DAGScheduler类中getMessingParentStages()方法是切割job划分stage。可以结合以下这张图来分析:
二次排序在项目中添加一个文件
排序前文件中内容
编写代码
{defmain(args:Array[String]):Unit={valsconf=newSparkConf().setAppName("SecondSort").setMaster("local")valsc=newSparkContext(sconf)vallines=("")valpairs={x=(newSecondSortKey(("")(0).toInt,("")(1).toInt),x)}valsortedPairs=(false)//valsortedPairs=(_._1,false)(_._2).foreach{println}()}}classSecondSortKey(valfirst:Int,valsecond:Int)extsOrdered[SecondSortKey]withSerializable{defcompare(that:SecondSortKey):Int={if(==0)}}运行效果
topN和分组取topNtopN需求:获取成绩单中,成绩排在前五的学生信息
在项目中添加一个文件,在文件中以K,V(K表示成绩,V表示姓名)对的形式添加数据,如下图:
编写代码
{defmain(args:Array[String]):Unit={valconf=newSparkConf().setAppName("TopN").setMaster("local")valsc=newSparkContext(conf)vallines=("")vallineList=(x=((",")(0),x))valsortRdd=(false)valresultRDD=(x=x._2)for((5)){println(a)}()}}运行结果
分组取topN需求:给每个班级的学生成绩排序
在项目中添加一个文件,在文件中编写K,V(K表示班级,V表示成绩)格式的数据,如下图
编写代码
{SparkContext,SparkConf}objectSparkGroupTopN{defmain(args:Array[String]):Unit={valconf=newSparkConf().setAppName("GroupTopN").setMaster("local")valsc=newSparkContext(conf)vallines=("")vallineList=(x=(("\t")(0),("\t")(1))).groupByKey()valtopList=(x={vart=List[Int]()for(a-x._2){t=t.::()}println(x._1){x=-x}.take(3)}){println}}}运行结果
免责声明:本文章如果文章侵权,请联系我们处理,本站仅提供信息存储空间服务如因作品内容、版权和其他问题请于本站联系