[CELEBORN-2261] Parallelize reserve slots #3598
@@ -19,12 +19,13 @@ package org.apache.celeborn.service.deploy.worker
import java.io.IOException
import java.util.{ArrayList => jArrayList, HashMap => jHashMap, List => jList, Set => jSet}
import java.util.concurrent._
import java.util.concurrent.{CopyOnWriteArrayList, _}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicIntegerArray, AtomicReference}
import java.util.function.BiFunction
import java.util.function.{BiConsumer, BiFunction, Consumer, Supplier}

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.util.Try

import io.netty.util.{HashedWheelTimer, Timeout, TimerTask}
import org.roaringbitmap.RoaringBitmap

@@ -62,6 +63,7 @@ private[deploy] class Controller(
var partitionLocationInfo: WorkerPartitionLocationInfo = _
var timer: HashedWheelTimer = _
var commitThreadPool: ThreadPoolExecutor = _
var reserveSlotsThreadPool: ThreadPoolExecutor = _
var commitFinishedChecker: ScheduledExecutorService = _
var asyncReplyPool: ScheduledExecutorService = _
val minPartitionSizeToEstimate = conf.minPartitionSizeToEstimate

@@ -86,6 +88,10 @@ private[deploy] class Controller(
asyncReplyPool = worker.asyncReplyPool
shutdown = worker.shutdown

reserveSlotsThreadPool =
  Executors.newFixedThreadPool(conf.workerReserveSlotsIoThreadPoolSize).asInstanceOf[

Member: use Celeborn's util to create a thread pool instead of raw java.util.concurrent classes, to properly set the daemon flag, name prefix, exception handler, etc.
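A minimal sketch of what this suggestion could look like. The helper object and method names below are illustrative assumptions, not the actual Celeborn utility:

// Hypothetical helper: builds a fixed pool whose threads get a name prefix and
// a daemon flag, instead of the anonymous threads from Executors.newFixedThreadPool.
import java.util.concurrent.{Executors, ThreadFactory, ThreadPoolExecutor}
import java.util.concurrent.atomic.AtomicInteger

object NamedThreadPools {
  def newDaemonFixedThreadPool(nThreads: Int, prefix: String): ThreadPoolExecutor = {
    val counter = new AtomicInteger(0)
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, s"$prefix-${counter.incrementAndGet()}")
        t.setDaemon(true)
        t
      }
    }
    Executors.newFixedThreadPool(nThreads, factory).asInstanceOf[ThreadPoolExecutor]
  }
}

// Usage sketch in Controller.init, assuming the pool-size config stays the same:
// reserveSlotsThreadPool = NamedThreadPools.newDaemonFixedThreadPool(
//   conf.workerReserveSlotsIoThreadPoolSize, "worker-reserve-slots")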
Member: maybe use …; when something goes wrong, we will lose the full stack trace if we run the task in another thread.
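A short sketch of the stack-trace concern (names and messages here are illustrative only): an exception thrown inside supplyAsync carries only the pool thread's frames, so the caller typically re-wraps the cause when joining to record its own context as well.

// Sketch: the RuntimeException raised in the pool thread has no caller frames;
// re-wrapping it on the calling thread attaches the caller's stack on top of
// the asynchronous cause before logging or rethrowing.
import java.util.concurrent.{CompletableFuture, CompletionException, Executors}
import java.util.function.Supplier

object StackTraceSketch extends App {
  val pool = Executors.newFixedThreadPool(2)
  val future = CompletableFuture.supplyAsync[String](
    new Supplier[String] {
      override def get(): String = throw new RuntimeException("create writer failed")
    },
    pool)
  try {
    future.join()
  } catch {
    case e: CompletionException =>
      // Wrap the async cause in a new exception created on this thread, so the
      // logged trace also shows where the caller was waiting.
      new RuntimeException("reserve slots failed on caller side", e.getCause).printStackTrace()
  } finally {
    pool.shutdown()
  }
}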

  ThreadPoolExecutor]

commitFinishedChecker = worker.commitFinishedChecker
commitFinishedChecker.scheduleWithFixedDelay(
  new Runnable {

@@ -193,111 +199,145 @@ private[deploy] class Controller(
context.reply(ReserveSlotsResponse(StatusCode.NO_AVAILABLE_WORKING_DIR, msg))
return
}
val primaryLocs = new jArrayList[PartitionLocation]()
try {
for (ind <- 0 until requestPrimaryLocs.size()) {
var location = partitionLocationInfo.getPrimaryLocation(
shuffleKey,
requestPrimaryLocs.get(ind).getUniqueId)
if (location == null) {
location = requestPrimaryLocs.get(ind)
val writer = storageManager.createPartitionDataWriter(
applicationId,
shuffleId,
location,
splitThreshold,
splitMode,
partitionType,
rangeReadFilter,
userIdentifier,
partitionSplitEnabled,
isSegmentGranularityVisible)
primaryLocs.add(new WorkingPartition(location, writer))
} else {
primaryLocs.add(location)
}
}
} catch {
case e: Exception =>
logError(s"CreateWriter for $shuffleKey failed.", e)

def collectResults(
tasks: ArrayBuffer[CompletableFuture[PartitionLocation]],
createdWriters: CopyOnWriteArrayList[PartitionDataWriter],
startTime: Long) = {
val primaryFuture = CompletableFuture.allOf(tasks.toSeq: _*)
.whenComplete(new BiConsumer[Void, Throwable] {
override def accept(ignore: Void, error: Throwable): Unit = {
if (error != null) {
createdWriters.forEach(new Consumer[PartitionDataWriter] {
override def accept(fileWriter: PartitionDataWriter) {
fileWriter.destroy(new IOException(
s"Destroy FileWriter $fileWriter caused by " +
s"reserving slots failed for $shuffleKey.",
error))
}
})
} else {
val timeToReserveLocations = System.currentTimeMillis() - startTime;
logInfo(
s"Reserved ${tasks.size} slots for $shuffleKey in $timeToReserveLocations ms (with ${conf.workerReserveSlotsIoThreadPoolSize} threads)")

Member: calculation of …

}
createdWriters.clear()
}
})
primaryFuture
}
if (primaryLocs.size() < requestPrimaryLocs.size()) {
val msg = s"Not all primary partition satisfied for $shuffleKey"
logWarning(s"[handleReserveSlots] $msg, will destroy writers.")
primaryLocs.asScala.foreach { partitionLocation =>
val fileWriter = partitionLocation.asInstanceOf[WorkingPartition].getFileWriter
fileWriter.destroy(new IOException(s"Destroy FileWriter $fileWriter caused by " +
s"reserving slots failed for $shuffleKey."))
}
context.reply(ReserveSlotsResponse(StatusCode.RESERVE_SLOTS_FAILED, msg))
return

logInfo(s"Reserving ${requestPrimaryLocs.size()} primary slots for $shuffleKey")
val startReservePrimaryLocs = System.currentTimeMillis
val primaryCfTasks = ArrayBuffer[CompletableFuture[PartitionLocation]]()
val primaryWriters: CopyOnWriteArrayList[PartitionDataWriter] =
new CopyOnWriteArrayList[PartitionDataWriter]
(0 until requestPrimaryLocs.size()).foreach { ind =>
primaryCfTasks.append(CompletableFuture.supplyAsync[PartitionLocation](
new Supplier[PartitionLocation] {
override def get(): PartitionLocation = {
var location: PartitionLocation = partitionLocationInfo.getPrimaryLocation(
shuffleKey,
requestPrimaryLocs.get(ind).getUniqueId)
if (location == null) {
location = requestPrimaryLocs.get(ind)
val writer = storageManager.createPartitionDataWriter(
applicationId,
shuffleId,
location,
splitThreshold,
splitMode,
partitionType,
rangeReadFilter,
userIdentifier,
partitionSplitEnabled,
isSegmentGranularityVisible)
primaryWriters.add(writer)
new WorkingPartition(location, writer)
} else {
location
}
}
},
reserveSlotsThreadPool))
}

val replicaLocs = new jArrayList[PartitionLocation]()
try {
for (ind <- 0 until requestReplicaLocs.size()) {
var location =
partitionLocationInfo.getReplicaLocation(
shuffleKey,
requestReplicaLocs.get(ind).getUniqueId)
if (location == null) {
location = requestReplicaLocs.get(ind)
val writer = storageManager.createPartitionDataWriter(
applicationId,
shuffleId,
location,
splitThreshold,
splitMode,
partitionType,
rangeReadFilter,
userIdentifier,
partitionSplitEnabled,
isSegmentGranularityVisible)
replicaLocs.add(new WorkingPartition(location, writer))
logInfo(s"Reserving ${requestReplicaLocs.size()} replica slots for $shuffleKey")
val startReserveReplicLocs = System.currentTimeMillis
val replicaCfTasks = ArrayBuffer[CompletableFuture[PartitionLocation]]()
val replicaWriters: CopyOnWriteArrayList[PartitionDataWriter] =
new CopyOnWriteArrayList[PartitionDataWriter]
(0 until requestReplicaLocs.size()).foreach { ind =>
replicaCfTasks.append(CompletableFuture.supplyAsync[PartitionLocation](
new Supplier[PartitionLocation] {
override def get(): PartitionLocation = {
var location =
partitionLocationInfo.getReplicaLocation(
shuffleKey,
requestReplicaLocs.get(ind).getUniqueId)
if (location == null) {
location = requestReplicaLocs.get(ind)
val writer = storageManager.createPartitionDataWriter(
applicationId,
shuffleId,
location,
splitThreshold,
splitMode,
partitionType,
rangeReadFilter,
userIdentifier,
partitionSplitEnabled,
isSegmentGranularityVisible)
replicaWriters.add(writer)
new WorkingPartition(location, writer)
} else {
location
}
}
},
reserveSlotsThreadPool))
}

// collect results
val primaryFuture: CompletableFuture[Void] =
collectResults(primaryCfTasks, primaryWriters, startReservePrimaryLocs)
val replicaFuture: CompletableFuture[Void] =
collectResults(replicaCfTasks, replicaWriters, startReserveReplicLocs)

val future = CompletableFuture.allOf(primaryFuture, replicaFuture)
future.whenComplete(new BiConsumer[Void, Throwable] {
override def accept(ignore: Void, error: Throwable): Unit = {
if (error != null) {
logError(s"An error occurred while reserving slots for $shuffleKey", error)
val msg = s"An error occurred while reserving slots for $shuffleKey: $error";
context.reply(ReserveSlotsResponse(StatusCode.RESERVE_SLOTS_FAILED, msg))
} else {
replicaLocs.add(location)
val primaryLocs = primaryCfTasks.map(cf => cf.join()).asJava
val replicaLocs = replicaCfTasks.map(cf => cf.join()).asJava
// reserve success, update status
partitionLocationInfo.addPrimaryPartitions(shuffleKey, primaryLocs)
partitionLocationInfo.addReplicaPartitions(shuffleKey, replicaLocs)
shufflePartitionType.put(shuffleKey, partitionType)
shufflePushDataTimeout.put(
shuffleKey,
if (pushDataTimeout <= 0) defaultPushdataTimeout else pushDataTimeout)
workerInfo.allocateSlots(
shuffleKey,
Utils.getSlotsPerDisk(requestPrimaryLocs, requestReplicaLocs))
workerSource.incCounter(
WorkerSource.SLOTS_ALLOCATED,
primaryLocs.size() + replicaLocs.size())

logInfo(s"Reserved ${primaryLocs.size()} primary location " +
s"${primaryLocs.asScala.map(_.getUniqueId).mkString(",")} and ${replicaLocs.size()} replica location " +
s"${replicaLocs.asScala.map(_.getUniqueId).mkString(",")} for $shuffleKey ")
if (log.isDebugEnabled()) {
logDebug(s"primary: $primaryLocs\nreplica: $replicaLocs.")
}
context.reply(ReserveSlotsResponse(StatusCode.SUCCESS))
}
}
} catch {
case e: Exception =>
logError(s"CreateWriter for $shuffleKey failed.", e)
}
if (replicaLocs.size() < requestReplicaLocs.size()) {
val msg = s"Not all replica partition satisfied for $shuffleKey"
logWarning(s"[handleReserveSlots] $msg, destroy writers.")
primaryLocs.asScala.foreach { partitionLocation =>
val fileWriter = partitionLocation.asInstanceOf[WorkingPartition].getFileWriter
fileWriter.destroy(new IOException(s"Destroy FileWriter $fileWriter caused by " +
s"reserving slots failed for $shuffleKey."))
}
replicaLocs.asScala.foreach { partitionLocation =>
val fileWriter = partitionLocation.asInstanceOf[WorkingPartition].getFileWriter
fileWriter.destroy(new IOException(s"Destroy FileWriter $fileWriter caused by " +
s"reserving slots failed for $shuffleKey."))
}
context.reply(ReserveSlotsResponse(StatusCode.RESERVE_SLOTS_FAILED, msg))
return
}

// reserve success, update status
partitionLocationInfo.addPrimaryPartitions(shuffleKey, primaryLocs)
partitionLocationInfo.addReplicaPartitions(shuffleKey, replicaLocs)
shufflePartitionType.put(shuffleKey, partitionType)
shufflePushDataTimeout.put(
shuffleKey,
if (pushDataTimeout <= 0) defaultPushdataTimeout else pushDataTimeout)
workerInfo.allocateSlots(
shuffleKey,
Utils.getSlotsPerDisk(requestPrimaryLocs, requestReplicaLocs))
workerSource.incCounter(WorkerSource.SLOTS_ALLOCATED, primaryLocs.size() + replicaLocs.size())

logInfo(s"Reserved ${primaryLocs.size()} primary location " +
s"${primaryLocs.asScala.map(_.getUniqueId).mkString(",")} and ${replicaLocs.size()} replica location " +
s"${replicaLocs.asScala.map(_.getUniqueId).mkString(",")} for $shuffleKey ")
if (log.isDebugEnabled()) {
logDebug(s"primary: $primaryLocs\nreplica: $replicaLocs.")
}
context.reply(ReserveSlotsResponse(StatusCode.SUCCESS))
})
}

private def commitFiles(

@@ -828,4 +868,24 @@ private[deploy] class Controller(
mapIdx += 1
}
}

override def onStop(): Unit = {
if (reserveSlotsThreadPool != null) {
reserveSlotsThreadPool.shutdown()
try {
if (!reserveSlotsThreadPool.awaitTermination(
conf.workerGracefulShutdownTimeoutMs,
TimeUnit.MILLISECONDS)) {
logWarning("ReserveSlotsThreadPool shutdown timeout, forcing shutdown.")
reserveSlotsThreadPool.shutdownNow()
}
} catch {
case e: InterruptedException =>
logWarning("ReserveSlotsThreadPool shutdown interrupted, forcing shutdown.", e)
reserveSlotsThreadPool.shutdownNow()
Thread.currentThread().interrupt()
}
}
super.onStop()
}
}
Review comment: dot for namespaces, camelCase for words.
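For illustration of this naming convention only (the key below is hypothetical and not necessarily the key introduced by this PR): dots separate namespace segments, and camelCase joins the words inside a segment, matching the conf.workerReserveSlotsIoThreadPoolSize accessor used above.

// Hypothetical config key following the suggested convention:
// namespaces are separated with dots, multi-word segments use camelCase.
val reserveSlotsThreadsKey = "celeborn.worker.reserveSlots.ioThreads"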