From f4d9d63d21ddc7a2aaad0938cb0f4b526b428003 Mon Sep 17 00:00:00 2001 From: xxx <953396112@qq.com> Date: Sun, 14 Dec 2025 19:47:46 +0800 Subject: [PATCH] [CELEBORN-2143] Create DiskFile sequentially based on createFileOrder --- .../worker/storage/StorageManager.scala | 155 +++++++++++------- .../deploy/worker/storage/StoragePolicy.scala | 36 +++- .../storagePolicy/StoragePolicyCase1.scala | 3 +- .../storagePolicy/StoragePolicyCase2.scala | 4 +- .../storagePolicy/StoragePolicyCase3.scala | 8 +- .../storagePolicy/StoragePolicyCase4.scala | 4 +- 6 files changed, 129 insertions(+), 81 deletions(-) diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala index a0b46dd6dc3..73fa10c1200 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala @@ -238,9 +238,16 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs val activeTypes = conf.availableStorageTypes lazy val localOrDfsStorageAvailable: Boolean = { + localStorageAvailable || dfsStorageAvailable + } + + lazy val localStorageAvailable: Boolean = { + StorageInfo.localDiskAvailable(activeTypes) || !diskInfos.isEmpty + } + + lazy val dfsStorageAvailable: Boolean = { StorageInfo.OSSAvailable(activeTypes) || StorageInfo.S3Available(activeTypes) || - StorageInfo.HDFSAvailable(activeTypes) || StorageInfo.localDiskAvailable(activeTypes) || - hdfsDir.nonEmpty || !diskInfos.isEmpty || s3Dir.nonEmpty || ossDir.nonEmpty + StorageInfo.HDFSAvailable(activeTypes) || hdfsDir.nonEmpty || s3Dir.nonEmpty || ossDir.nonEmpty } override def notifyError(mountPoint: String, diskStatus: DiskStatus): Unit = this.synchronized { @@ -1029,7 +1036,17 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs } else if (location.getStorageInfo.localDiskAvailable() || location.getStorageInfo.HDFSAvailable() || location.getStorageInfo.S3Available() || location.getStorageInfo.OSSAvailable()) { logDebug(s"create non-memory file for ${partitionDataWriterContext.getShuffleKey} ${partitionDataWriterContext.getPartitionLocation.getFileName}") - val createDiskFileResult = createDiskFile( + val createDiskFileResult = createLocalDiskFile( + location, + partitionDataWriterContext.getAppId, + partitionDataWriterContext.getShuffleId, + location.getFileName, + partitionDataWriterContext.getUserIdentifier, + partitionDataWriterContext.getPartitionType, + partitionDataWriterContext.isPartitionSplitEnabled) + (null, createDiskFileResult._1, createDiskFileResult._2, createDiskFileResult._3) + } else if (location.getStorageInfo.HDFSAvailable() || location.getStorageInfo.S3Available() || location.getStorageInfo.OSSAvailable()) { + val createDiskFileResult = createDfsDiskFile( location, partitionDataWriterContext.getAppId, partitionDataWriterContext.getShuffleId, @@ -1069,10 +1086,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs memoryFileInfo } - /** - * @return (Flusher,DiskFileInfo,workingDir) - */ - def createDiskFile( + def createLocalDiskFile( location: PartitionLocation, appId: String, shuffleId: Int, @@ -1102,61 +1116,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs throw new IOException(s"No available disks! suggested mountPoint $suggestedMountPoint") } - if (dirs.isEmpty && location.getStorageInfo.HDFSAvailable()) { - val shuffleDir = - new Path(new Path(hdfsDir, conf.workerWorkingDir), s"$appId/$shuffleId") - FileSystem.mkdirs( - StorageManager.hadoopFs.get(StorageInfo.Type.HDFS), - shuffleDir, - hdfsPermission) - val hdfsFilePath = new Path(shuffleDir, fileName).toString - val hdfsFileInfo = new DiskFileInfo( - userIdentifier, - partitionSplitEnabled, - getFileMeta(partitionType, s"hdfs", conf.shuffleChunkSize), - hdfsFilePath, - StorageInfo.Type.HDFS) - diskFileInfos.computeIfAbsent(shuffleKey, diskFileInfoMapFunc).put( - fileName, - hdfsFileInfo) - return (hdfsFlusher.get, hdfsFileInfo, null) - } else if (dirs.isEmpty && location.getStorageInfo.S3Available()) { - val shuffleDir = - new Path(new Path(s3Dir, conf.workerWorkingDir), s"$appId/$shuffleId") - FileSystem.mkdirs( - StorageManager.hadoopFs.get(StorageInfo.Type.S3), - shuffleDir, - hdfsPermission) - val s3FilePath = new Path(shuffleDir, fileName).toString - val s3FileInfo = new DiskFileInfo( - userIdentifier, - partitionSplitEnabled, - new ReduceFileMeta(conf.shuffleChunkSize), - s3FilePath, - StorageInfo.Type.S3) - diskFileInfos.computeIfAbsent(shuffleKey, diskFileInfoMapFunc).put( - fileName, - s3FileInfo) - return (s3Flusher.get, s3FileInfo, null) - } else if (dirs.isEmpty && location.getStorageInfo.OSSAvailable()) { - val shuffleDir = - new Path(new Path(ossDir, conf.workerWorkingDir), s"$appId/$shuffleId") - FileSystem.mkdirs( - StorageManager.hadoopFs.get(StorageInfo.Type.OSS), - shuffleDir, - hdfsPermission) - val ossFilePath = new Path(shuffleDir, fileName).toString - val ossFileInfo = new DiskFileInfo( - userIdentifier, - partitionSplitEnabled, - new ReduceFileMeta(conf.shuffleChunkSize), - ossFilePath, - StorageInfo.Type.OSS) - diskFileInfos.computeIfAbsent(shuffleKey, diskFileInfoMapFunc).put( - fileName, - ossFileInfo) - return (ossFlusher.get, ossFileInfo, null) - } else if (dirs.nonEmpty && location.getStorageInfo.localDiskAvailable()) { + if (dirs.nonEmpty && location.getStorageInfo.localDiskAvailable()) { val dir = dirs(getNextIndex % dirs.size) val mountPoint = DeviceInfo.getMountPoint(dir.getAbsolutePath, mountPoints) val shuffleDir = new File(dir, s"$appId/$shuffleId") @@ -1210,9 +1170,80 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs } retryCount += 1 } + if (dfsStorageAvailable) { + logWarning("Failed to create localFileWriter", exception) + return (null, null, null) + } throw exception } + def createDfsDiskFile( + location: PartitionLocation, + appId: String, + shuffleId: Int, + fileName: String, + userIdentifier: UserIdentifier, + partitionType: PartitionType, + partitionSplitEnabled: Boolean): (Flusher, DiskFileInfo, File) = { + val shuffleKey = Utils.makeShuffleKey(appId, shuffleId) + if (location.getStorageInfo.HDFSAvailable()) { + val shuffleDir = + new Path(new Path(hdfsDir, conf.workerWorkingDir), s"$appId/$shuffleId") + FileSystem.mkdirs( + StorageManager.hadoopFs.get(StorageInfo.Type.HDFS), + shuffleDir, + hdfsPermission) + val hdfsFilePath = new Path(shuffleDir, fileName).toString + val hdfsFileInfo = new DiskFileInfo( + userIdentifier, + partitionSplitEnabled, + getFileMeta(partitionType, s"hdfs", conf.shuffleChunkSize), + hdfsFilePath, + StorageInfo.Type.HDFS) + diskFileInfos.computeIfAbsent(shuffleKey, diskFileInfoMapFunc).put( + fileName, + hdfsFileInfo) + return (hdfsFlusher.get, hdfsFileInfo, null) + } else if (location.getStorageInfo.S3Available()) { + val shuffleDir = + new Path(new Path(s3Dir, conf.workerWorkingDir), s"$appId/$shuffleId") + FileSystem.mkdirs( + StorageManager.hadoopFs.get(StorageInfo.Type.S3), + shuffleDir, + hdfsPermission) + val s3FilePath = new Path(shuffleDir, fileName).toString + val s3FileInfo = new DiskFileInfo( + userIdentifier, + partitionSplitEnabled, + new ReduceFileMeta(conf.shuffleChunkSize), + s3FilePath, + StorageInfo.Type.S3) + diskFileInfos.computeIfAbsent(shuffleKey, diskFileInfoMapFunc).put( + fileName, + s3FileInfo) + return (s3Flusher.get, s3FileInfo, null) + } else if (location.getStorageInfo.OSSAvailable()) { + val shuffleDir = + new Path(new Path(ossDir, conf.workerWorkingDir), s"$appId/$shuffleId") + FileSystem.mkdirs( + StorageManager.hadoopFs.get(StorageInfo.Type.OSS), + shuffleDir, + hdfsPermission) + val ossFilePath = new Path(shuffleDir, fileName).toString + val ossFileInfo = new DiskFileInfo( + userIdentifier, + partitionSplitEnabled, + new ReduceFileMeta(conf.shuffleChunkSize), + ossFilePath, + StorageInfo.Type.OSS) + diskFileInfos.computeIfAbsent(shuffleKey, diskFileInfoMapFunc).put( + fileName, + ossFileInfo) + return (ossFlusher.get, ossFileInfo, null) + } + (null, null, null) + } + def startDeviceMonitor(): Unit = { deviceMonitor.startCheck() } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala index 08a1d085122..82548cc6dc6 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala @@ -120,10 +120,10 @@ class StoragePolicy(conf: CelebornConf, storageManager: StorageManager, source: } else { null } - case StorageInfo.Type.HDD | StorageInfo.Type.SSD | StorageInfo.Type.HDFS | StorageInfo.Type.OSS | StorageInfo.Type.S3 => - if (storageManager.localOrDfsStorageAvailable) { - logDebug(s"create non-memory file for ${partitionDataWriterContext.getShuffleKey} ${partitionDataWriterContext.getPartitionLocation.getFileName}") - val (flusher, diskFileInfo, workingDir) = storageManager.createDiskFile( + case StorageInfo.Type.HDD | StorageInfo.Type.SSD => + if (storageManager.localStorageAvailable) { + logDebug(s"create local disk file for ${partitionDataWriterContext.getShuffleKey} ${partitionDataWriterContext.getPartitionLocation.getFileName}") + val (flusher, diskFileInfo, workingDir) = storageManager.createLocalDiskFile( location, partitionDataWriterContext.getAppId, partitionDataWriterContext.getShuffleId, @@ -131,10 +131,9 @@ class StoragePolicy(conf: CelebornConf, storageManager: StorageManager, source: partitionDataWriterContext.getUserIdentifier, partitionDataWriterContext.getPartitionType, partitionDataWriterContext.isPartitionSplitEnabled) - partitionDataWriterContext.setWorkingDir(workingDir) - val metaHandler = getPartitionMetaHandler(diskFileInfo) - if (flusher.isInstanceOf[LocalFlusher] - && location.getStorageInfo.localDiskAvailable()) { + if (diskFileInfo != null) { + partitionDataWriterContext.setWorkingDir(workingDir) + val metaHandler = getPartitionMetaHandler(diskFileInfo) new LocalTierWriter( conf, metaHandler, @@ -147,6 +146,25 @@ class StoragePolicy(conf: CelebornConf, storageManager: StorageManager, source: partitionDataWriterContext, storageManager) } else { + null + } + } else { + null + } + case StorageInfo.Type.HDFS | StorageInfo.Type.OSS | StorageInfo.Type.S3 => + if (storageManager.dfsStorageAvailable) { + logDebug(s"create dfs disk file for ${partitionDataWriterContext.getShuffleKey} ${partitionDataWriterContext.getPartitionLocation.getFileName}") + val (flusher, diskFileInfo, workingDir) = storageManager.createDfsDiskFile( + location, + partitionDataWriterContext.getAppId, + partitionDataWriterContext.getShuffleId, + location.getFileName, + partitionDataWriterContext.getUserIdentifier, + partitionDataWriterContext.getPartitionType, + partitionDataWriterContext.isPartitionSplitEnabled) + if (diskFileInfo != null) { + partitionDataWriterContext.setWorkingDir(workingDir) + val metaHandler = getPartitionMetaHandler(diskFileInfo) new DfsTierWriter( conf, metaHandler, @@ -158,6 +176,8 @@ class StoragePolicy(conf: CelebornConf, storageManager: StorageManager, source: diskFileInfo.getStorageType, partitionDataWriterContext, storageManager) + } else { + null } } else { null diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase1.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase1.scala index 640a67113a5..14fe8bf8e07 100644 --- a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase1.scala +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase1.scala @@ -59,7 +59,7 @@ class StoragePolicyCase1 extends CelebornFunSuite { val mockedFlusher = mock[Flusher] val mockedFile = mock[File] when( - mockedStorageManager.createDiskFile( + mockedStorageManager.createLocalDiskFile( any(), any(), any(), @@ -101,7 +101,6 @@ class StoragePolicyCase1 extends CelebornFunSuite { when(mockedPartitionWriterContext.getPartitionLocation).thenAnswer(memoryHintPartitionLocation) when(mockedPartitionWriterContext.getPartitionType).thenAnswer(PartitionType.REDUCE) val conf = new CelebornConf() - val flushLock = new AnyRef conf.set("celeborn.worker.storage.storagePolicy.createFilePolicy", "MEMORY,SSD,HDD,HDFS,OSS,S3") val storagePolicy = new StoragePolicy(conf, mockedStorageManager, mockedSource) val pendingWriters = new AtomicInteger() diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase2.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase2.scala index 7a7ab956daa..45b00c0c949 100644 --- a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase2.scala +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase2.scala @@ -59,7 +59,7 @@ class StoragePolicyCase2 extends CelebornFunSuite { val mockedFlusher = mock[LocalFlusher] val mockedFile = mock[File] when( - mockedStorageManager.createDiskFile( + mockedStorageManager.createLocalDiskFile( any(), any(), any(), @@ -100,7 +100,7 @@ class StoragePolicyCase2 extends CelebornFunSuite { test("test create file order case2") { when(mockedPartitionWriterContext.getPartitionLocation).thenAnswer(localHintPartitionLocatioin) when(mockedPartitionWriterContext.getPartitionType).thenAnswer(PartitionType.REDUCE) - when(mockedStorageManager.localOrDfsStorageAvailable).thenAnswer(true) + when(mockedStorageManager.localStorageAvailable).thenAnswer(true) when(mockedDiskFile.getStorageType).thenAnswer(StorageInfo.Type.HDD) val conf = new CelebornConf() conf.set("celeborn.worker.storage.storagePolicy.createFilePolicy", "SSD,HDD,HDFS,OSS,S3") diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala index 9097037001d..7a6d4fd8284 100644 --- a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala @@ -59,7 +59,7 @@ class StoragePolicyCase3 extends CelebornFunSuite { val mockedFlusher = mock[LocalFlusher] val mockedFile = mock[File] when( - mockedStorageManager.createDiskFile( + mockedStorageManager.createLocalDiskFile( any(), any(), any(), @@ -100,11 +100,10 @@ class StoragePolicyCase3 extends CelebornFunSuite { test("test getEvicted file case1") { when(mockedPartitionWriterContext.getPartitionLocation).thenAnswer(localHintPartitionLocatioin) when(mockedPartitionWriterContext.getPartitionType).thenAnswer(PartitionType.REDUCE) - when(mockedStorageManager.localOrDfsStorageAvailable).thenAnswer(true) + when(mockedStorageManager.localStorageAvailable).thenAnswer(true) when(mockedDiskFile.getStorageType).thenAnswer(StorageInfo.Type.SSD) val mockedMemoryFile = mock[LocalTierWriter] val conf = new CelebornConf() - val flushLock = new AnyRef val storagePolicy = new StoragePolicy(conf, mockedStorageManager, mockedSource) val pendingWriters = new AtomicInteger() val notifier = new FlushNotifier @@ -120,11 +119,10 @@ class StoragePolicyCase3 extends CelebornFunSuite { test("test evict file case2") { when(mockedPartitionWriterContext.getPartitionLocation).thenAnswer(memoryHintPartitionLocation) when(mockedPartitionWriterContext.getPartitionType).thenAnswer(PartitionType.REDUCE) - when(mockedStorageManager.localOrDfsStorageAvailable).thenAnswer(true) + when(mockedStorageManager.localStorageAvailable).thenAnswer(true) when(mockedDiskFile.getStorageType).thenAnswer(StorageInfo.Type.HDD) val mockedMemoryFile = mock[LocalTierWriter] val conf = new CelebornConf() - val flushLock = new AnyRef val storagePolicy = new StoragePolicy(conf, mockedStorageManager, mockedSource) val pendingWriters = new AtomicInteger() val notifier = new FlushNotifier diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase4.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase4.scala index e25837d2b92..bdb718ec71a 100644 --- a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase4.scala +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase4.scala @@ -59,7 +59,7 @@ class StoragePolicyCase4 extends CelebornFunSuite { val mockedFlusher = mock[LocalFlusher] val mockedFile = mock[File] when( - mockedStorageManager.createDiskFile( + mockedStorageManager.createLocalDiskFile( any(), any(), any(), @@ -101,7 +101,7 @@ class StoragePolicyCase4 extends CelebornFunSuite { when(mockedPartitionWriterContext.getPartitionLocation).thenAnswer( memoryDisabledHintPartitionLocation) when(mockedPartitionWriterContext.getPartitionType).thenAnswer(PartitionType.REDUCE) - when(mockedStorageManager.localOrDfsStorageAvailable).thenAnswer(true) + when(mockedStorageManager.localStorageAvailable).thenAnswer(true) when(mockedDiskFile.getStorageType).thenAnswer(StorageInfo.Type.SSD) val conf = new CelebornConf() conf.set("celeborn.worker.storage.storagePolicy.createFilePolicy", "MEMORY,SSD,HDD,HDFS,OSS,S3")