Spark Study Notes 2: Worker Startup Flow

1. Startup Script
sbin/start-slaves.sh
# Launch the slaves
if [ "$SPARK_WORKER_INSTANCES" = "" ]; then
  exec "$sbin/slaves.sh" cd "$SPARK_HOME" \; "$sbin/start-slave.sh" 1 "spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT"
else
  if [ "$SPARK_WORKER_WEBUI_PORT" = "" ]; then
    SPARK_WORKER_WEBUI_PORT=8081
  fi
  for ((i=0; i<$SPARK_WORKER_INSTANCES; i++)); do
    "$sbin/slaves.sh" cd "$SPARK_HOME" \; "$sbin/start-slave.sh" $(( $i + 1 ))  "spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT" --webui-port $(( $SPARK_WORKER_WEBUI_PORT + $i ))
  done
fi
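When SPARK_WORKER_INSTANCES is set, the loop starts one Worker per iteration: iteration i (counting from 0) passes instance number i + 1 and web UI port SPARK_WORKER_WEBUI_PORT + i, with 8081 as the fallback base port. The Scala sketch below only reproduces that arithmetic; WorkerInstances and workerInstances are hypothetical names, not Spark code.

```scala
// Hypothetical helper mirroring the arithmetic in sbin/start-slaves.sh:
// iteration i (0-based) gets instance number i + 1 and web UI port basePort + i.
object WorkerInstances {
  def workerInstances(count: Int, webUiPort: Option[Int]): Seq[(Int, Int)] = {
    // The script falls back to 8081 when SPARK_WORKER_WEBUI_PORT is empty
    val basePort = webUiPort.getOrElse(8081)
    (0 until count).map(i => (i + 1, basePort + i))
  }

  def main(args: Array[String]): Unit =
    workerInstances(3, None).foreach { case (n, port) =>
      println(s"Worker-$n -> --webui-port $port")
    }
}
```

Giving each instance its own port avoids web UI port clashes when several Workers share one host.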
Assume each node runs a single Worker (SPARK_WORKER_INSTANCES is not set).
The command actually executed is then:
exec "$sbin/slaves.sh" cd "$SPARK_HOME" \; "$sbin/start-slave.sh" 1 "spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT"
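This statement has two parts:
(1)
exec "$sbin/slaves.sh" cd "$SPARK_HOME"
Log in to each worker host (slaves.sh uses ssh to reach the hosts listed in conf/slaves) and cd into the SPARK_HOME directory.
(2)
"$sbin/start-slave.sh" 1 "spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT"
Run the sbin/start-slave.sh script on the worker host. The escaped \; ends the cd command, so both commands run on the remote host.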
The argument "1" is the worker instance number, used to tell apart the log files of different Worker instances, for example:
spark-xxx-org.apache.spark.deploy.worker.Worker-1-CentOS-02.out
spark-xxx-org.apache.spark.deploy.worker.Worker-1.pid
The "1" in "Worker-1" is the worker number.
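The file names follow a fixed pattern. The sketch below assumes the pattern used by sbin/spark-daemon.sh in Spark 1.x: spark-<ident>-<command>-<instance>-<hostname>.out for the log and spark-<ident>-<command>-<instance>.pid for the pid file, where <ident> defaults to the user name. DaemonFileNames is a hypothetical helper.

```scala
// Hypothetical helper; the naming pattern is an assumption based on Spark 1.x sbin/spark-daemon.sh.
object DaemonFileNames {
  // log: spark-<ident>-<command>-<instance>-<hostname>.out
  def logFile(ident: String, command: String, instance: Int, host: String): String =
    s"spark-$ident-$command-$instance-$host.out"

  // pid: spark-<ident>-<command>-<instance>.pid (no host name)
  def pidFile(ident: String, command: String, instance: Int): String =
    s"spark-$ident-$command-$instance.pid"

  def main(args: Array[String]): Unit = {
    val cmd = "org.apache.spark.deploy.worker.Worker"
    println(logFile("xxx", cmd, 1, "CentOS-02"))
    println(pidFile("xxx", cmd, 1))
  }
}
```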
This number is not passed to the Worker class. In the single-Worker case, the only argument the Worker class receives is spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT.
2. Worker.main
  def main(argStrings: Array[String]) {
    SignalLogger.register(log)
    val conf = new SparkConf
    val args = new WorkerArguments(argStrings, conf)
    val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,
      args.memory, args.masters, args.workDir)
    actorSystem.awaitTermination()
  }
Responsibilities of main:
(1) Create a WorkerArguments object and initialize its members;
(2) Call startSystemAndActor, which creates the ActorSystem and starts the Worker actor.
2.1. WorkerArguments
  var cores = inferDefaultCores()
  var memory = inferDefaultMemory()
(1) Compute the default number of cores;
(2) Compute the default memory size.
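To our understanding, in Spark 1.x inferDefaultCores returns the number of processors available to the JVM, and inferDefaultMemory takes the host's total physical memory minus 1 GB for the operating system, with a floor of 512 MB. The sketch below isolates that arithmetic under that assumption; reading the physical memory size itself is left out, and defaultMemoryMb is a hypothetical name.

```scala
object WorkerDefaults {
  // Default cores: the number of processors visible to the JVM.
  def inferDefaultCores(): Int = Runtime.getRuntime.availableProcessors()

  // Assumed rule: leave 1 GB for the operating system, but never go below 512 MB.
  def defaultMemoryMb(totalPhysicalMb: Int): Int = math.max(totalPhysicalMb - 1024, 512)

  def main(args: Array[String]): Unit = {
    println(s"cores = ${inferDefaultCores()}")
    println(s"memory on an 8 GB host = ${defaultMemoryMb(8192)} MB")
  }
}
```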
  parse(args.toList)

  // This mutates the SparkConf, so all accesses to it must be made after this line
  propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)
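(1) parse parses the command-line arguments passed in by the startup script;
(2) loadDefaultSparkProperties loads Spark runtime properties from a configuration file; by default this file is spark-defaults.conf.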
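WorkerArguments.parse walks the argument list recursively with pattern matching. The sketch below is a trimmed-down, hypothetical version of that style: the Args case class, the subset of options, and plain megabyte integers for --memory (Spark also accepts strings such as 2g) are assumptions made for illustration.

```scala
object ArgParsingSketch {
  // Hypothetical, simplified stand-in for the fields WorkerArguments fills in.
  final case class Args(cores: Int, memoryMb: Int, webUiPort: Int, masters: Seq[String])

  // Consume options from the head of the list and recurse on the rest.
  @annotation.tailrec
  def parse(args: List[String], acc: Args): Args = args match {
    case ("--cores" | "-c") :: value :: tail  => parse(tail, acc.copy(cores = value.toInt))
    case ("--memory" | "-m") :: value :: tail => parse(tail, acc.copy(memoryMb = value.toInt))
    case "--webui-port" :: value :: tail      => parse(tail, acc.copy(webUiPort = value.toInt))
    case master :: tail if master.startsWith("spark://") =>
      // "spark://h1:p1,h2:p2" lists several masters; give each one its own spark:// prefix
      val masters = master.stripPrefix("spark://").split(",").map("spark://" + _).toSeq
      parse(tail, acc.copy(masters = masters))
    case Nil => acc
    case other :: _ => throw new IllegalArgumentException(s"Unrecognized option: $other")
  }
}
```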
2.2. startSystemAndActor
    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port,
      conf = conf, securityManager = securityMgr)
    val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, AkkaUtils.protocol(actorSystem)))
    actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory,
      masterAkkaUrls, systemName, actorName,  workDir, conf, securityMgr), name = actorName)
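(1) Create the ActorSystem via AkkaUtils.createActorSystem, which also returns the port actually bound;
(2) Convert the Master URLs to Akka URLs, then create and start the Worker actor.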
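The masterUrls are spark://host:port strings, while the Worker talks to the Master actor through an Akka actor path. The sketch below assumes the Spark 1.x convention as we understand it: the Master lives in an actor system named sparkMaster under the actor name Master, and the protocol is akka.tcp (akka.ssl.tcp when SSL is on). AkkaUrlSketch is a hypothetical stand-in for Master.toAkkaUrl, not its actual code.

```scala
object AkkaUrlSketch {
  // Accepts exactly "spark://<host>:<port>"
  private val SparkUrl = """spark://([^:/]+):(\d+)""".r

  // Assumed shape: <protocol>://sparkMaster@<host>:<port>/user/Master
  def toAkkaUrl(sparkUrl: String, protocol: String): String = sparkUrl match {
    case SparkUrl(host, port) => s"$protocol://sparkMaster@$host:$port/user/Master"
    case _ => throw new IllegalArgumentException(s"Invalid master URL: $sparkUrl")
  }

  def main(args: Array[String]): Unit =
    println(toAkkaUrl("spark://10.0.0.1:7077", "akka.tcp"))
}
```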
3. Worker Actor
3.1. Key data members
  val executors = new HashMap[String, ExecutorRunner]
  val finishedExecutors = new HashMap[String, ExecutorRunner]
  val drivers = new HashMap[String, DriverRunner]
  val finishedDrivers = new HashMap[String, DriverRunner]
  val appDirectories = new HashMap[String, Seq[String]]
  val finishedApps = new HashSet[String]
3.2. Worker.preStart
    createWorkDir()
    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
    shuffleService.startIfEnabled()
    webUi = new WorkerWebUI(this, workDir, webUiPort)
    webUi.bind()
    registerWithMaster()
(1) Create the Worker node's working directory;
(2) Subscribe to RemotingLifecycleEvent events; RemotingLifecycleEvent is a trait:
sealed trait RemotingLifecycleEvent extends Serializable {
  def logLevel: Logging.LogLevel
}
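The Worker only handles the DisassociatedEvent message.
(3) Start the external shuffle service if it is enabled;
(4) Create and start the WorkerWebUI;
(5) Register with the Master: registerWithMaster calls tryRegisterAllMasters, which sends a registration message to the Master node(s).
3.3. registerWithMaster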
    registrationRetryTimer match {
      case None =>
        registered = false
        tryRegisterAllMasters()
        connectionAttemptCount = 0
        registrationRetryTimer = Some {
          context.system.scheduler.schedule(INITIAL_REGISTRATION_RETRY_INTERVAL,
            INITIAL_REGISTRATION_RETRY_INTERVAL, self, ReregisterWithMaster)
        }
      case Some(_) =>
        logInfo("Not spawning another attempt to register with the master, since there is an" +
          " attempt scheduled already.")
    }
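(1) Call tryRegisterAllMasters to send registration messages to the Master(s);
(2) Create a registration retry timer that periodically (every INITIAL_REGISTRATION_RETRY_INTERVAL) sends a ReregisterWithMaster message to the Worker actor itself;
(3) If a retry timer already exists (Some(_)), do not start another registration attempt.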
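The Option-typed timer acts as a guard: while a retry timer is pending, another call to registerWithMaster does nothing. A minimal sketch of that pattern, using java.util.concurrent in place of the Akka scheduler; RegistrationSketch and its members are hypothetical, and its retry task calls the registration stand-in directly instead of sending a ReregisterWithMaster message.

```scala
import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture, TimeUnit}

// Hypothetical model of registerWithMaster's guard: at most one retry timer at a time.
class RegistrationSketch(scheduler: ScheduledExecutorService, retryIntervalSec: Long) {
  var registered = false
  var registrationRetryTimer: Option[ScheduledFuture[_]] = None
  var registerMessagesSent = 0

  // Stand-in for tryRegisterAllMasters: only counts the RegisterWorker "messages".
  private def tryRegisterAllMasters(): Unit = synchronized { registerMessagesSent += 1 }

  // Returns true if a new registration attempt (and retry timer) was started.
  def registerWithMaster(): Boolean = synchronized {
    registrationRetryTimer match {
      case None =>
        registered = false
        tryRegisterAllMasters()
        val retry = new Runnable { def run(): Unit = tryRegisterAllMasters() }
        registrationRetryTimer = Some(
          scheduler.scheduleAtFixedRate(retry, retryIntervalSec, retryIntervalSec, TimeUnit.SECONDS))
        true
      case Some(_) =>
        false // an attempt is already scheduled; do not spawn another
    }
  }

  // Like changeMaster: once a Master answers, cancel outstanding retries.
  def onRegistered(): Unit = synchronized {
    registered = true
    registrationRetryTimer.foreach(_.cancel(true))
    registrationRetryTimer = None
  }
}
```

A ScheduledFuture plays the role of the Akka Cancellable returned by scheduler.schedule: both can be cancelled once a Master replies.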
3.3.1. tryRegisterAllMasters
    for (masterAkkaUrl <- masterAkkaUrls) {
      logInfo("Connecting to master " + masterAkkaUrl + "...")
      val actor = context.actorSelection(masterAkkaUrl)
      actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
    }
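(1) Obtain a remote reference to each Master actor (context.actorSelection);
(2) Send each Master a RegisterWorker message. If registration succeeds, the Master replies to the Worker with a RegisteredWorker message. workerId is a string, defined as: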
  val workerId = generateWorkerId()
  ...
  def generateWorkerId(): String = {
    "worker-%s-%s-%d".format(createDateFormat.format(new Date), host, port)
  }
Format: worker-<timestamp>-<host>-<port>
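A worked example of the ID format. The sketch assumes the timestamp pattern yyyyMMddHHmmss, which is what createDateFormat uses in Spark 1.x as far as we know; the date, host and port are passed in explicitly here (the real method reads host and port from the Worker), and WorkerIdSketch is hypothetical.

```scala
import java.text.SimpleDateFormat
import java.util.Date

object WorkerIdSketch {
  // A fresh formatter per call, because SimpleDateFormat is not thread-safe.
  // Assumed pattern: yyyyMMddHHmmss.
  private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss")

  def generateWorkerId(now: Date, host: String, port: Int): String =
    "worker-%s-%s-%d".format(createDateFormat.format(now), host, port)

  def main(args: Array[String]): Unit =
    println(generateWorkerId(new Date, "192.168.1.10", 7078))
}
```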
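3.4. Worker message handling
3.4.1. RegisteredWorker message
This message tells the Worker that its registration with the Master succeeded; the main purpose of handling it is to start the heartbeat timer.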
    case RegisteredWorker(masterUrl, masterWebUiUrl) =>
      logInfo("Successfully registered with master " + masterUrl)
      registered = true
      changeMaster(masterUrl, masterWebUiUrl)
      context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat)
      if (CLEANUP_ENABLED) {
        logInfo(s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
        context.system.scheduler.schedule(CLEANUP_INTERVAL_MILLIS millis,
          CLEANUP_INTERVAL_MILLIS millis, self, WorkDirCleanup)
      }
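(1) Set the registration state (registered = true);
(2) Call changeMaster;
(3) Create the heartbeat timer, which sends a SendHeartbeat message to the Worker actor itself every HEARTBEAT_MILLIS milliseconds, starting immediately;
(4) If CLEANUP_ENABLED is set, also schedule a periodic WorkDirCleanup message so that old application directories under workDir are deleted.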
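HEARTBEAT_MILLIS is not defined in the excerpt above. To our understanding, Spark 1.x sets it to a quarter of spark.worker.timeout (60 seconds by default), giving a heartbeat every 15 seconds, so the Master should see several heartbeats within one timeout window. The sketch below assumes that rule and uses a ScheduledExecutorService in place of the Akka scheduler; HeartbeatSketch and the local connected flag are hypothetical.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object HeartbeatSketch {
  // Assumed rule: HEARTBEAT_MILLIS = spark.worker.timeout (seconds) * 1000 / 4
  def heartbeatMillis(workerTimeoutSec: Long): Long = workerTimeoutSec * 1000 / 4

  def main(args: Array[String]): Unit = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val heartbeatsSent = new AtomicInteger(0)
    val connected = true // hypothetical stand-in for the Worker's `connected` flag
    // Initial delay 0, like schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat);
    // a 50 ms period is used here only so the demo finishes quickly.
    val task = new Runnable {
      def run(): Unit = if (connected) heartbeatsSent.incrementAndGet()
    }
    scheduler.scheduleAtFixedRate(task, 0, 50, TimeUnit.MILLISECONDS)
    Thread.sleep(200)
    scheduler.shutdownNow()
    println(s"default heartbeat interval: ${heartbeatMillis(60)} ms")
    println(s"heartbeats sent during the demo: ${heartbeatsSent.get}")
  }
}
```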
3.4.1.1. Worker.changeMaster
    // activeMasterUrl it's a valid Spark url since we receive it from master.
    activeMasterUrl = url
    activeMasterWebUiUrl = uiUrl
    master = context.actorSelection(
      AkkaUrl(activeMasterUrl, AkkaUtils.protocol(context.system)))
    masterAddress = AkkaAddress(activeMasterUrl, AkkaUtils.protocol(context.system))
    connected = true
    // Cancel any outstanding re-registration attempts because we found a new master
    registrationRetryTimer.foreach(_.cancel())
    registrationRetryTimer = None
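Responsibilities:
(1) Create a remote reference to the Master and assign it to master;
(2) Set the connection state (connected) to true;
(3) Cancel the registrationRetryTimer, since a Master has been found.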
3.4.2. SendHeartbeat message
    case SendHeartbeat =>
      if (connected) { master ! Heartbeat(workerId) }
If the Worker is connected, it sends a Heartbeat message to the Master.
3.4.3. ReregisterWithMaster message
    case ReregisterWithMaster =>
      reregisterWithMaster()
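Responsibilities of reregisterWithMaster:
(1) If registration has already succeeded, cancel the registrationRetryTimer;
(2) If it has not, resend the RegisterWorker message to the Master. There are 6 initial reconnection attempts and at most 16 attempts in total: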
  // The first six attempts to reconnect are in shorter intervals (between 5 and 15 seconds)
  // Afterwards, the next 10 attempts are between 30 and 90 seconds.
  // A bit of randomness is introduced so that not all of the workers attempt to reconnect at
  // the same time.
  val INITIAL_REGISTRATION_RETRIES = 6
  val TOTAL_REGISTRATION_RETRIES = INITIAL_REGISTRATION_RETRIES + 10
The first 6 attempts and the following 10 attempts use different retry intervals.
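The comment gives the intended ranges. The sketch below assumes, as in Spark 1.x to our understanding, a per-Worker random multiplier in [0.5, 1.5) applied to base intervals of 10 and 60 seconds, which yields the 5 to 15 and 30 to 90 second ranges. How attempts map to intervals is simplified, and fuzzMultiplier and retryIntervalSec are hypothetical names.

```scala
import scala.util.Random

object RetryIntervalSketch {
  val INITIAL_REGISTRATION_RETRIES = 6
  val TOTAL_REGISTRATION_RETRIES = INITIAL_REGISTRATION_RETRIES + 10

  // Assumed fuzz: a random multiplier in [0.5, 1.5), drawn once per Worker,
  // so that Workers do not all reconnect at the same moment.
  def fuzzMultiplier(rng: Random): Double = rng.nextDouble() + 0.5

  // Interval in seconds before attempt `attempt` (1-based); None once attempts are exhausted.
  def retryIntervalSec(attempt: Int, fuzz: Double): Option[Long] =
    if (attempt <= INITIAL_REGISTRATION_RETRIES) Some(math.round(10 * fuzz))
    else if (attempt <= TOTAL_REGISTRATION_RETRIES) Some(math.round(60 * fuzz))
    else None

  def main(args: Array[String]): Unit = {
    val fuzz = fuzzMultiplier(new Random())
    (1 to TOTAL_REGISTRATION_RETRIES + 1).foreach { n =>
      val desc = retryIntervalSec(n, fuzz) match {
        case Some(sec) => s"retry after $sec s"
        case None      => "give up"
      }
      println(s"attempt $n: $desc")
    }
  }
}
```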
4. Startup complete
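At this point the Worker node has finished starting up and periodically sends heartbeats to the Master node. When an application is submitted through SparkSubmit, the Worker receives a message from the Master telling it to launch an Executor, and the Executor then communicates with the Driver.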

Published: 2024-09-21 05:32:18

Article link: https://www.17tex.com/tex/3/185873.html
