微信搜索bigdata029 | 邀请体验:数阅–数据管理、OLAP分析与可视化平台 | 订阅本站 | 赞助作者:赞助作者

Spark动态资源分配-Dynamic Resource Allocation

Spark lxw1234@qq.com 9036℃ 6评论

关键字:spark、资源分配、dynamic resource allocation

Spark中,所谓资源单位一般指的是executors,和Yarn中的Containers一样,在Spark On Yarn模式下,通常使用–num-executors来指定Application使用的executors数量,而–executor-memory和–executor-cores分别用来指定每个executor所使用的内存和虚拟CPU核数。相信很多朋友至今在提交Spark应用程序时候都使用该方式来指定资源。

假设有这样的场景,如果使用Hive,多个用户同时使用hive-cli做数据开发和分析,只有当用户提交执行了Hive SQL时候,才会向YARN申请资源,执行任务,如果不提交执行,无非就是停留在Hive-cli命令行,也就是个JVM而已,并不会浪费YARN的资源。现在想用Spark-SQL代替Hive来做数据开发和分析,也是多用户同时使用,如果按照之前的方式,以yarn-client模式运行spark-sql命令行(http://lxw1234.com/archives/2015/08/448.htm),在启动时候指定–num-executors 10,那么每个用户启动时候都使用了10个YARN的资源(Container),这10个资源就会一直被占用着,只有当用户退出spark-sql命令行时才会释放。

spark-sql On Yarn,能不能像Hive一样,执行SQL的时候才去申请资源,不执行的时候就释放掉资源呢,其实从Spark1.2之后,对于On Yarn模式,已经支持动态资源分配(Dynamic Resource Allocation),这样,就可以根据Application的负载(Task情况),动态的增加和减少executors,这种策略非常适合在YARN上使用spark-sql做数据开发和分析,以及将spark-sql作为长服务来使用的场景。

本文以Spark1.5.0和hadoop-2.3.0-cdh5.0.0,介绍在spark-sql On Yarn模式下,如何使用动态资源分配策略。

YARN的配置

首先需要对YARN的NodeManager进行配置,使其支持Spark的Shuffle Service。

  • 修改每台NodeManager上的yarn-site.xml:

##修改
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle,spark_shuffle</value>
</property>
##增加
<property>
<name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
<value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>
<property>
<name>spark.shuffle.service.port</name>
<value>7337</value>
</property>

  • 将$SPARK_HOME/lib/spark-1.5.0-yarn-shuffle.jar拷贝到每台NodeManager的${HADOOP_HOME}/share/hadoop/yarn/lib/下。
  • 重启所有NodeManager。

Spark的配置

配置$SPARK_HOME/conf/spark-defaults.conf,增加以下参数:

spark.shuffle.service.enabled true   //启用External shuffle Service服务
spark.shuffle.service.port 7337 //Shuffle Service服务端口,必须和yarn-site中的一致
spark.dynamicAllocation.enabled true  //开启动态资源分配
spark.dynamicAllocation.minExecutors 1  //每个Application最小分配的executor数
spark.dynamicAllocation.maxExecutors 30  //每个Application最大并发分配的executor数
spark.dynamicAllocation.schedulerBacklogTimeout 1s 
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout 5s
  • 动态资源分配策略:

开启动态分配策略后,application会在task因没有足够资源被挂起的时候去动态申请资源,这种情况意味着该application现有的executor无法满足所有task并行运行。spark一轮一轮的申请资源,当有task挂起或等待spark.dynamicAllocation.schedulerBacklogTimeout(默认1s)时间的时候,会开始动态资源分配;之后会每隔spark.dynamicAllocation.sustainedSchedulerBacklogTimeout(默认1s)时间申请一次,直到申请到足够的资源。每次申请的资源量是指数增长的,即1,2,4,8等。
之所以采用指数增长,出于两方面考虑:其一,开始申请的少是考虑到可能application会马上得到满足;其次要成倍增加,是为了防止application需要很多资源,而该方式可以在很少次数的申请之后得到满足。

  • 资源回收策略

当application的executor空闲时间超过spark.dynamicAllocation.executorIdleTimeout(默认60s)后,就会被回收。

使用spark-sql On Yarn执行SQL,动态分配资源

./spark-sql --master yarn-client \
--executor-memory 1G \
-e "SELECT COUNT(1) FROM ut.t_ut_site_log where pt >= '2015-12-09' and pt <= '2015-12-10'"

spark

该查询需要123个Task。

spark

从AppMaster的WEB界面可以看到,总共有31个Executors,其中一个是Driver,既有30个Executors并发执行,而30,正是在spark.dynamicAllocation.maxExecutors参数中配置的最大并发数。如果一个查询只有10个Task,那么只会向Yarn申请10个executors的资源。

需要注意:
如果使用
./spark-sql –master yarn-client –executor-memory 1G
进入spark-sql命令行,在命令行中执行任何SQL查询,都不会执行,原因是spark-sql在提交到Yarn时候,已经被当成一个Application,而这种,除了Driver,是不会被分配到任何executors资源的,所有,你提交的查询因为没有executor而不能被执行。

而这个问题,我使用Spark的ThriftServer(HiveServer2)得以解决。

使用Thrift JDBC方式执行SQL,动态分配资源

首选以yarn-client模式,启动Spark的ThriftServer服务,也就是HiveServer2.

  • 配置ThriftServer监听的端口号和地址
vi $SPARK_HOME/conf/spark-env.sh
export HIVE_SERVER2_THRIFT_PORT=10000
export HIVE_SERVER2_THRIFT_BIND_HOST=0.0.0.0
  • 以yarn-client模式启动ThriftServer
cd $SPARK_HOME/sbin/
./start-thriftserver.sh \
--master yarn-client \
--conf spark.driver.memory=3G \
--conf spark.shuffle.service.enabled=true \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.minExecutors=1 \
--conf spark.dynamicAllocation.maxExecutors=30 \
--conf spark.dynamicAllocation.sustainedSchedulerBacklogTimeout=5s

启动后,ThriftServer会在Yarn上作为一个长服务来运行:

spark

  • 使用beeline通过JDBC连接spark-sql
cd $SPARK_HOME/bin
./beeline -u jdbc:hive2://localhost:10000 -n lxw1234

spark

执行查询:
select count(1) from ut.t_ut_site_log where pt = ‘2015-12-10′;
该任务有64个Task:

spark

而监控页面上的并发数仍然是30:

spark

执行完后,executors数只剩下1个,应该是缓存数据,其余的全部被回收:

spark

这样,多个用户可以通过beeline,JDBC连接到Thrift Server,执行SQL查询,而资源也是动态分配的。

需要注意的是,在启动ThriftServer时候指定的spark.dynamicAllocation.maxExecutors=30,是整个ThriftServer同时并发的最大资源数,如果多个用户同时连接,则会被多个用户共享竞争,总共30个。

 

这样,也算是解决了多用户同时使用spark-sql,并且动态分配资源的需求了。

Spark动态资源分配官方文档:http://spark.apache.org/docs/1.5.0/job-scheduling.html#dynamic-resource-allocation

您可以关注 lxw的大数据田地 ,或者 加入邮件列表 ,随时接收博客更新的通知邮件。

 

 

如果觉得本博客对您有帮助,请 赞助作者

转载请注明:lxw的大数据田地 » Spark动态资源分配-Dynamic Resource Allocation

喜欢 (13)
分享 (0)
发表我的评论
取消评论
表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址
(6)个小伙伴在吐槽
  1. 我这个这么配置以后用yarn-client方式启动thrif-server发现UI界面资源是涨上去了,但是hadoop页面log有异常YARN executor launch context: env: CLASSPATH -> /usr/lib/spark/spark-1.5.1-bin-hadoop2.6/lib/spark-assembly-1.5.1-hadoop2.6.0.jar{{PWD}}{{PWD}}/__spark__.jar$HADOOP_CONF_DIR$HADOOP_COMMON_HOME/*$HADOOP_COMMON_HOME/lib/*$HADOOP_HDFS_HOME/*$HADOOP_HDFS_HOME/lib/*$HADOOP_MAPRED_HOME/*$HADOOP_MAPRED_HOME/lib/*$HADOOP_YARN_HOME/*$HADOOP_YARN_HOME/lib/*$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/* SPARK_LOG_URL_STDERR -> http://datanode01:8042/node/containerlogs/container_1453691835395_0001_01_000006/root/stderr?start=-4096 SPARK_YARN_STAGING_DIR -> .sparkStaging/application_1453691835395_0001 SPARK_YARN_CACHE_FILES_FILE_SIZES -> 183999807 SPARK_USER -> root SPARK_YARN_CACHE_FILES_VISIBILITIES -> PRIVATE SPARK_YARN_MODE -> true SPARK_YARN_CACHE_FILES_TIME_STAMPS -> 1453692302988 SPARK_LOG_URL_STDOUT -> http://datanode01:8042/node/containerlogs/container_1453691835395_0001_01_000006/root/stdout?start=-4096 SPARK_YARN_CACHE_FILES -> hdfs://namenode-cluster/user/root/.sparkStaging/application_1453691835395_0001/spark-assembly-1.5.1-hadoop2.6.0.jar#__spark__.jar command: {{JAVA_HOME}}/bin/java -server -XX:OnOutOfMemoryError='kill %p' -Xms32768m -Xmx32768m -Djava.io.tmpdir={{PWD}}/tmp '-Dspark.driver.port=56117' '-Dspark.shuffle.service.port=7337' -Dspark.yarn.app.container.log.dir= org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url akka.tcp://sparkDriver@192.168.85.81:56117/user/CoarseGrainedScheduler --executor-id 3 --hostname datanode01 --cores 1 --app-id application_1453691835395_0001 --user-class-path file:$PWD/__app__.jar 1> /stdout 2> /stderr =============================================================================== 16/01/25 11:31:41 INFO impl.ContainerManagementProtocolProxy: Opening proxy : datanode01:50198 Exception in thread "ContainerLauncher #4" java.lang.Error: org.apache.spark.SparkException: Exception while starting container container_1453691835395_0001_01_000006 on host datanode01 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1116) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) at java.lang.Thread.run(Thread.java:722) Caused by: org.apache.spark.SparkException: Exception while starting container container_1453691835395_0001_01_000006 on host datanode01 at org.apache.spark.deploy.yarn.ExecutorRunnable.startContainer(ExecutorRunnable.scala:123) at org.apache.spark.deploy.yarn.ExecutorRunnable.run(ExecutorRunnable.scala:67) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) ... 2 more Caused by: org.apache.hadoop.yarn.exceptions.InvalidAuxServiceException: The auxService:spark_shuffle does not exist at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:525) at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:168) at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106) at org.apache.hadoop.yarn.client.api.impl.NMClientImpl.startContainer(NMClientImpl.java:206) at org.apache.spark.deploy.yarn.ExecutorRunnable.startContainer(ExecutorRunnable.scala:120) ... 4 more 不知道是什么原因呢?
    泰格2016-01-25 13:48 回复
    • The auxService:spark_shuffle does not exist 这个是NodeManager没找到spark yarn-shuffle的jar包。 需要放置在NodeManager的CLASSPATH中,并且重启NodeManager。
      lxw1234@qq.com2016-01-25 13:59 回复
      • 你好,我也遇见了这个问题,我用的cdh5.8.4、但是我spark是单独的,版本是2.1.0.我去cdh的/opt/cloudera/parcels/CDH-5.8.4-1.cdh5.8.4.p0.5/lib/hadoop-yarn/lib目录看到有个spark-1.6.0-cdh5.8.4-yarn-shuffle.jar外连接,但是我程序运行后,就是报spark_shuffle does not exist at 异常。
        ccc2017-04-12 18:51 回复
  2. 你好 需要注意那段是什么意思?没看明白 spark-sql 交互式的跑不运行sql么?我看了没问题啊
    战士2016-07-29 15:06 回复
  3. 需要注意 里边的问题,好像在spark 2里边已经解决。我这里打开动态分配,跑client模式的spark-sql可以计算。
    John2016-12-16 14:12 回复
  4. ./spark-sql –master yarn-client –executor-memory 1G 进入spark-sql命令行,在命令行中执行任何SQL查询,都不会执行,原因是spark-sql在提交到Yarn时候,已经被当成一个Application,而这种,除了Driver,是不会被分配到任何executors资源的,所有,你提交的查询因为没有executor而不能被执行。 可以执行的把?
    翟玉勇2017-02-08 17:41 回复