From e8e6060e5a69ea968926ca36906ab639e27f4e8d Mon Sep 17 00:00:00 2001 From: ddrag Date: Fri, 9 Apr 2021 15:12:54 +0200 Subject: [PATCH 1/2] applied scalafmt --- .../hyperspace/index/IndexLogEntry.scala | 238 +++++++-------- .../hyperspace/index/IndexLogEntryTest.scala | 272 ++++++++---------- 2 files changed, 237 insertions(+), 273 deletions(-) diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala index bc6cbc39f..dcbb8cc86 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala @@ -16,22 +16,20 @@ package com.microsoft.hyperspace.index -import java.io.FileNotFoundException - -import scala.annotation.tailrec -import scala.collection.mutable.{HashMap, ListBuffer} -import scala.collection.mutable - import com.fasterxml.jackson.annotation.JsonIgnore +import com.microsoft.hyperspace.HyperspaceException +import com.microsoft.hyperspace.actions.Constants +import com.microsoft.hyperspace.util.PathUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter} import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.types.{DataType, StructType} -import com.microsoft.hyperspace.HyperspaceException -import com.microsoft.hyperspace.actions.Constants -import com.microsoft.hyperspace.util.{PathUtils, SchemaUtils} +import java.io.FileNotFoundException +import scala.annotation.tailrec +import scala.collection.mutable +import scala.collection.mutable.{HashMap, ListBuffer} // IndexLogEntry-specific fingerprint to be temporarily used where fingerprint is not defined. case class NoOpFingerprint() { @@ -53,7 +51,9 @@ case class Content(root: Directory, fingerprint: NoOpFingerprint = NoOpFingerpri rec( new Path(root.name), root, - (f, prefix) => + ( + f, + prefix) => FileInfo(new Path(prefix, f.name).toString, f.size, f.modifiedTime, f.id)).toSet } @@ -91,8 +91,9 @@ object Content { hadoopConfiguration: Configuration, pathFilter: PathFilter = PathUtils.DataPathFilter, throwIfNotExists: Boolean = false): Content = - Content(Directory.fromDirectory(path, fileIdTracker, pathFilter, hadoopConfiguration, - throwIfNotExists)) + Content( + Directory + .fromDirectory(path, fileIdTracker, pathFilter, hadoopConfiguration, throwIfNotExists)) /** * Create a Content object from a specified list of leaf files. Any files not listed here will @@ -102,9 +103,7 @@ object Content { * @param fileIdTracker FileIdTracker to keep mapping of file properties to assigned file ids. * @return Content object with Directory tree from leaf files. */ - def fromLeafFiles( - files: Seq[FileStatus], - fileIdTracker: FileIdTracker): Option[Content] = { + def fromLeafFiles(files: Seq[FileStatus], fileIdTracker: FileIdTracker): Option[Content] = { if (files.nonEmpty) { Some(Content(Directory.fromLeafFiles(files, fileIdTracker))) } else { @@ -210,15 +209,6 @@ object Directory { } } - @tailrec - private def createEmptyDirectory(path: Path, subDirs: Seq[Directory] = Seq()): Directory = { - if (path.isRoot) { - Directory(path.toString, subDirs = subDirs) - } else { - createEmptyDirectory(path.getParent, Seq(Directory(path.getName, subDirs = subDirs))) - } - } - /** * Create a Content object from a specified list of leaf files. Any files not listed here will * NOT be part of the returned object. @@ -230,12 +220,10 @@ object Directory { * @param files List of leaf files. * @param fileIdTracker FileIdTracker to keep mapping of file properties to assigned file ids. * Note: If a new leaf file is discovered, the input fileIdTracker gets - * updated by adding it to the files it is tracking. + * updated by adding it to the files it is tracking. * @return Content object with Directory tree from leaf files. */ - def fromLeafFiles( - files: Seq[FileStatus], - fileIdTracker: FileIdTracker): Directory = { + def fromLeafFiles(files: Seq[FileStatus], fileIdTracker: FileIdTracker): Directory = { require( files.nonEmpty, s"Empty files list found while creating a ${Directory.getClass.getName}.") @@ -291,6 +279,15 @@ object Directory { pathToDirectory(getRoot(files.head.getPath)) } + @tailrec + private def createEmptyDirectory(path: Path, subDirs: Seq[Directory] = Seq()): Directory = { + if (path.isRoot) { + Directory(path.toString, subDirs = subDirs) + } else { + createEmptyDirectory(path.getParent, Seq(Directory(path.getName, subDirs = subDirs))) + } + } + // Return file system root path from any path. E.g. "file:/C:/a/b/c" will have root "file:/C:/". // For linux systems, this root will be "file:/". Other hdfs compatible file systems will have // corresponding roots. @@ -323,8 +320,8 @@ case class FileInfo(name: String, size: Long, modifiedTime: Long, id: Long) { override def equals(o: Any): Boolean = o match { case that: FileInfo => name.equals(that.name) && - size.equals(that.size) && - modifiedTime.equals(that.modifiedTime) + size.equals(that.size) && + modifiedTime.equals(that.modifiedTime) case _ => false } @@ -350,10 +347,11 @@ case class CoveringIndex(properties: CoveringIndex.Properties) { val kindAbbr = "CI" } object CoveringIndex { - case class Properties(columns: Properties.Columns, - schemaString: String, - numBuckets: Int, - properties: Map[String, String]) + case class Properties( + columns: Properties.Columns, + schemaString: String, + numBuckets: Int, + properties: Map[String, String]) object Properties { case class Columns(indexed: Seq[String], included: Seq[String]) @@ -377,9 +375,7 @@ object LogicalPlanFingerprint { * @param appendedFiles Appended files. * @param deletedFiles Deleted files. */ -case class Update( - appendedFiles: Option[Content] = None, - deletedFiles: Option[Content] = None) +case class Update(appendedFiles: Option[Content] = None, deletedFiles: Option[Content] = None) // IndexLogEntry-specific Hdfs that represents the source data. case class Hdfs(properties: Hdfs.Properties) { @@ -439,47 +435,50 @@ case class IndexLogEntry( properties: Map[String, String]) extends LogEntry(IndexLogEntry.VERSION) { - def schema: StructType = - DataType.fromJson(derivedDataset.properties.schemaString).asInstanceOf[StructType] - - def created: Boolean = state.equals(Constants.States.ACTIVE) - - def relations: Seq[Relation] = { - // Only one relation is currently supported. - assert(source.plan.properties.relations.size == 1) - source.plan.properties.relations - } - // FileInfo's 'name' contains the full path to the file. @JsonIgnore lazy val sourceFileInfoSet: Set[FileInfo] = { relations.head.data.properties.content.fileInfos } - @JsonIgnore lazy val sourceFilesSizeInBytes: Long = { sourceFileInfoSet.foldLeft(0L)(_ + _.size) } - - def sourceUpdate: Option[Update] = { - relations.head.data.properties.update - } - - def hasSourceUpdate: Boolean = { - sourceUpdate.isDefined && (appendedFiles.nonEmpty || deletedFiles.nonEmpty) - } - // FileInfo's 'name' contains the full path to the file. @JsonIgnore lazy val appendedFiles: Set[FileInfo] = { sourceUpdate.flatMap(_.appendedFiles).map(_.fileInfos).getOrElse(Set()) } - // FileInfo's 'name' contains the full path to the file. @JsonIgnore lazy val deletedFiles: Set[FileInfo] = { sourceUpdate.flatMap(_.deletedFiles).map(_.fileInfos).getOrElse(Set()) } + @JsonIgnore + lazy val fileIdTracker: FileIdTracker = { + val tracker = new FileIdTracker + tracker.addFileInfo(sourceFileInfoSet ++ content.fileInfos) + tracker + } + + /** + * A mutable map for holding auxiliary information of this index log entry while applying rules. + */ + @JsonIgnore + private val tags: mutable.Map[(LogicalPlan, IndexLogEntryTag[_]), Any] = mutable.Map.empty + + def schema: StructType = + DataType.fromJson(derivedDataset.properties.schemaString).asInstanceOf[StructType] + + def created: Boolean = state.equals(Constants.States.ACTIVE) + + def hasSourceUpdate: Boolean = { + sourceUpdate.isDefined && (appendedFiles.nonEmpty || deletedFiles.nonEmpty) + } + + def sourceUpdate: Option[Update] = { + relations.head.data.properties.update + } def copyWithUpdate( latestFingerprint: LogicalPlanFingerprint, @@ -488,21 +487,20 @@ case class IndexLogEntry( def toFileStatus(f: FileInfo) = { new FileStatus(f.size, false, 0, 1, f.modifiedTime, new Path(f.name)) } - copy( - source = source.copy( - plan = source.plan.copy( - properties = source.plan.properties.copy( - fingerprint = latestFingerprint, - relations = Seq( - relations.head.copy( - data = relations.head.data.copy( - properties = relations.head.data.properties.copy( - update = Some( - Update( - appendedFiles = - Content.fromLeafFiles(appended.map(toFileStatus), fileIdTracker), - deletedFiles = - Content.fromLeafFiles(deleted.map(toFileStatus), fileIdTracker))))))))))) + copy(source = source.copy(plan = source.plan.copy(properties = source.plan.properties.copy( + fingerprint = latestFingerprint, + relations = Seq( + relations.head.copy(data = relations.head.data.copy(properties = + relations.head.data.properties.copy(update = Some(Update( + appendedFiles = Content.fromLeafFiles(appended.map(toFileStatus), fileIdTracker), + deletedFiles = + Content.fromLeafFiles(deleted.map(toFileStatus), fileIdTracker))))))))))) + } + + def relations: Seq[Relation] = { + // Only one relation is currently supported. + assert(source.plan.properties.relations.size == 1) + source.plan.properties.relations } def bucketSpec: BucketSpec = @@ -522,6 +520,23 @@ case class IndexLogEntry( case _ => false } + def hasLineageColumn: Boolean = { + derivedDataset.properties.properties + .getOrElse(IndexConstants.LINEAGE_PROPERTY, IndexConstants.INDEX_LINEAGE_ENABLED_DEFAULT) + .toBoolean + } + + def hasParquetAsSourceFormat: Boolean = { + relations.head.fileFormat.equals("parquet") || + derivedDataset.properties.properties + .getOrElse(IndexConstants.HAS_PARQUET_AS_SOURCE_FORMAT_PROPERTY, "false") + .toBoolean + } + + override def hashCode(): Int = { + config.hashCode + signature.hashCode + numBuckets.hashCode + content.hashCode + } + def numBuckets: Int = derivedDataset.properties.numBuckets def config: IndexConfig = IndexConfig(name, indexedColumns, includedColumns) @@ -536,44 +551,24 @@ case class IndexLogEntry( sourcePlanSignatures.head } - def hasLineageColumn: Boolean = { - derivedDataset.properties.properties.getOrElse( - IndexConstants.LINEAGE_PROPERTY, IndexConstants.INDEX_LINEAGE_ENABLED_DEFAULT).toBoolean - } - - def hasParquetAsSourceFormat: Boolean = { - relations.head.fileFormat.equals("parquet") || - derivedDataset.properties.properties.getOrElse( - IndexConstants.HAS_PARQUET_AS_SOURCE_FORMAT_PROPERTY, "false").toBoolean - } - - @JsonIgnore - lazy val fileIdTracker: FileIdTracker = { - val tracker = new FileIdTracker - tracker.addFileInfo(sourceFileInfoSet ++ content.fileInfos) - tracker + def unsetTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T]): Unit = { + tags.remove((plan, tag)) } - override def hashCode(): Int = { - config.hashCode + signature.hashCode + numBuckets.hashCode + content.hashCode + def setTagValue[T](tag: IndexLogEntryTag[T], value: T): Unit = { + tags((null, tag)) = value } - /** - * A mutable map for holding auxiliary information of this index log entry while applying rules. - */ - @JsonIgnore - private val tags: mutable.Map[(LogicalPlan, IndexLogEntryTag[_]), Any] = mutable.Map.empty - - def setTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T], value: T): Unit = { - tags((plan, tag)) = value + def getTagValue[T](tag: IndexLogEntryTag[T]): Option[T] = { + tags.get((null, tag)).map(_.asInstanceOf[T]) } - def getTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T]): Option[T] = { - tags.get((plan, tag)).map(_.asInstanceOf[T]) + def unsetTagValue[T](tag: IndexLogEntryTag[T]): Unit = { + tags.remove((null, tag)) } - def unsetTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T]): Unit = { - tags.remove((plan, tag)) + def withCachedTag[T](tag: IndexLogEntryTag[T])(f: => T): T = { + withCachedTag(null, tag)(f) } def withCachedTag[T](plan: LogicalPlan, tag: IndexLogEntryTag[T])(f: => T): T = { @@ -586,20 +581,12 @@ case class IndexLogEntry( } } - def setTagValue[T](tag: IndexLogEntryTag[T], value: T): Unit = { - tags((null, tag)) = value - } - - def getTagValue[T](tag: IndexLogEntryTag[T]): Option[T] = { - tags.get((null, tag)).map(_.asInstanceOf[T]) - } - - def unsetTagValue[T](tag: IndexLogEntryTag[T]): Unit = { - tags.remove((null, tag)) + def setTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T], value: T): Unit = { + tags((plan, tag)) = value } - def withCachedTag[T](tag: IndexLogEntryTag[T])(f: => T): T = { - withCachedTag(null, tag)(f) + def getTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T]): Option[T] = { + tags.get((plan, tag)).map(_.asInstanceOf[T]) } } @@ -616,16 +603,15 @@ object IndexLogEntry { * Provides functionality to generate unique file ids for files. */ class FileIdTracker { - private var maxId: Long = -1L - // Combination of file properties, used as key, to identify a // unique file for which an id is generated. type key = ( - String, // Full path. + String, // Full path. Long, // Size. - Long // Modified time. - ) + Long // Modified time. + ) private val fileToIdMap: mutable.HashMap[key, Long] = mutable.HashMap() + private var maxId: Long = -1L def getMaxFileId: Long = maxId @@ -634,8 +620,6 @@ class FileIdTracker { def getFileId(path: String, size: Long, modifiedTime: Long): Option[Long] = fileToIdMap.get((path, size, modifiedTime)) - def setSizeHint(size: Int): Unit = fileToIdMap.sizeHint(size) - /** * Add a set of FileInfos to the fileToIdMap. The assumption is * that the each FileInfo already has a valid file id if an entry @@ -649,8 +633,7 @@ class FileIdTracker { setSizeHint(files.size) files.foreach { f => if (f.id == IndexConstants.UNKNOWN_FILE_ID) { - throw HyperspaceException( - s"Cannot add file info with unknown id. (file: ${f.name}).") + throw HyperspaceException(s"Cannot add file info with unknown id. (file: ${f.name}).") } val key = (f.name, f.size, f.modifiedTime) @@ -668,6 +651,8 @@ class FileIdTracker { } } + def setSizeHint(size: Int): Unit = fileToIdMap.sizeHint(size) + /** * Try to add file properties to fileToIdMap. If the file is already in * the map then return its current id. Otherwise, generate a new id, @@ -678,8 +663,7 @@ class FileIdTracker { */ def addFile(file: FileStatus): Long = { fileToIdMap.getOrElseUpdate( - (file.getPath.toString, file.getLen, file.getModificationTime), - { + (file.getPath.toString, file.getLen, file.getModificationTime), { maxId += 1 maxId }) diff --git a/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala b/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala index ebe4a5eec..8f1d4302b 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala @@ -68,10 +68,10 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter fileIdTracker = new FileIdTracker } - private def toPath(path: file.Path): Path = PathUtils.makeAbsolute(path.toString) - private def toFileStatus(path: file.Path): FileStatus = fs.getFileStatus(toPath(path)) + private def toPath(path: file.Path): Path = PathUtils.makeAbsolute(path.toString) + test("IndexLogEntry spec example") { val schemaString = """{\"type\":\"struct\", @@ -191,34 +191,20 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter Seq( Relation( Seq("rootpath"), - Hdfs( - Hdfs.Properties( - Content( - Directory( - "test", - Seq(FileInfo("f1", 100L, 100L, 0), FileInfo("f2", 100L, 200L, 1)), - Seq() - ) - ), - Some( - Update( - None, - Some(Content(Directory("", Seq(FileInfo("f1", 10, 10, 2))))) - ) - ) - ) - ), + Hdfs(Hdfs.Properties( + Content(Directory( + "test", + Seq(FileInfo("f1", 100L, 100L, 0), FileInfo("f2", 100L, 200L, 1)), + Seq())), + Some(Update(None, Some(Content(Directory("", Seq(FileInfo("f1", 10, 10, 2))))))))), "schema", "type", - Map() - ) - ), + Map())), null, null, LogicalPlanFingerprint( LogicalPlanFingerprint - .Properties(Seq(Signature("provider", "signatureValue"))) - )) + .Properties(Seq(Signature("provider", "signatureValue"))))) val expected = IndexLogEntry( "indexName", @@ -241,15 +227,19 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter } test("Content.files api lists all files from Content object.") { - val content = Content(Directory("file:/", subDirs = - Seq( - Directory("a", - files = Seq(FileInfo("f1", 0, 0, UNKNOWN_FILE_ID), FileInfo("f2", 0, 0, UNKNOWN_FILE_ID)), + val content = Content( + Directory( + "file:/", + subDirs = Seq(Directory( + "a", + files = + Seq(FileInfo("f1", 0, 0, UNKNOWN_FILE_ID), FileInfo("f2", 0, 0, UNKNOWN_FILE_ID)), subDirs = Seq( - Directory("b", - files = - Seq(FileInfo("f3", 0, 0, UNKNOWN_FILE_ID), FileInfo("f4", 0, 0, UNKNOWN_FILE_ID))))) - ))) + Directory( + "b", + files = Seq( + FileInfo("f3", 0, 0, UNKNOWN_FILE_ID), + FileInfo("f4", 0, 0, UNKNOWN_FILE_ID)))))))) val expected = Seq("file:/a/f1", "file:/a/f2", "file:/a/b/f3", "file:/a/b/f4").map(new Path(_)).toSet @@ -261,8 +251,9 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter val nestedDirPath = toPath(nestedDir) val expected = { - val fileInfos = Seq(f3, f4).map(toFileStatus).map(f => - FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)) + val fileInfos = Seq(f3, f4) + .map(toFileStatus) + .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)) val nestedDirDirectory = Directory("nested", fileInfos) val rootDirectory = createDirectory(nestedDirPath, nestedDirDirectory) Content(rootDirectory, NoOpFingerprint()) @@ -276,8 +267,9 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter val nestedDirPath = toPath(nestedDir) val expected = { - val fileInfos = Seq(f3, f4).map(toFileStatus).map(f => - FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)) + val fileInfos = Seq(f3, f4) + .map(toFileStatus) + .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)) val nestedDirDirectory = Directory("nested", fileInfos) val rootDirectory = createDirectory(nestedDirPath, nestedDirDirectory) Content(rootDirectory, NoOpFingerprint()) @@ -291,8 +283,9 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter val nestedDirPath = toPath(nestedDir) val expected = { - val fileInfos = Seq(f3, f4).map(toFileStatus).map(f => - FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)) + val fileInfos = Seq(f3, f4) + .map(toFileStatus) + .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)) val nestedDirDirectory = Directory("nested", fileInfos) createDirectory(nestedDirPath, nestedDirDirectory) } @@ -301,17 +294,21 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter assert(directoryEquals(actual, expected)) } - test("Directory.fromDirectory api creates the correct Directory objects, " + - "recursively listing all leaf files.") { + test( + "Directory.fromDirectory api creates the correct Directory objects, " + + "recursively listing all leaf files.") { val testDirPath = toPath(testDir) val testDirLeafFiles = - Seq(f1, f2).map(toFileStatus).map(f => - FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)) + Seq(f1, f2) + .map(toFileStatus) + .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)) val nestedDirLeafFiles = - Seq(f3, f4).map(toFileStatus).map(f => - FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)) - val testDirDirectory = Directory(name = "testDir", + Seq(f3, f4) + .map(toFileStatus) + .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)) + val testDirDirectory = Directory( + name = "testDir", files = testDirLeafFiles, subDirs = Seq(Directory(name = "nested", files = nestedDirLeafFiles))) val expected = createDirectory(testDirPath, testDirDirectory) @@ -325,12 +322,15 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter val testDirPath = toPath(testDir) val testDirLeafFiles = - Seq(f1, f2).map(toFileStatus).map(f => - FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)) + Seq(f1, f2) + .map(toFileStatus) + .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)) val nestedDirLeafFiles = - Seq(f3, f4).map(toFileStatus).map(f => - FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)) - val testDirDirectory = Directory(name = "testDir", + Seq(f3, f4) + .map(toFileStatus) + .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)) + val testDirDirectory = Directory( + name = "testDir", files = testDirLeafFiles, subDirs = Seq(Directory(name = "nested", files = nestedDirLeafFiles))) @@ -344,12 +344,15 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter test("Directory.fromLeafFiles api does not include other files in the directory.") { val testDirPath = toPath(testDir) - val testDirLeafFiles = Seq(f1).map(toFileStatus).map(f => - FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)) + val testDirLeafFiles = Seq(f1) + .map(toFileStatus) + .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)) val nestedDirLeafFiles = - Seq(f4).map(toFileStatus).map(f => - FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)) - val testDirDirectory = Directory(name = "testDir", + Seq(f4) + .map(toFileStatus) + .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)) + val testDirDirectory = Directory( + name = "testDir", files = testDirLeafFiles, subDirs = Seq(Directory(name = "nested", files = nestedDirLeafFiles))) @@ -360,8 +363,9 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter assert(directoryEquals(actual, expected)) } - test("Directory.fromLeafFiles: throwIfNotExist flag throws exception for non-existent" + - "directory, otherwise works as expected.") { + test( + "Directory.fromLeafFiles: throwIfNotExist flag throws exception for non-existent" + + "directory, otherwise works as expected.") { val testDirPath = toPath(testDir) val nonExistentDir = new Path(testDirPath, "nonexistent") @@ -402,8 +406,9 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter override def accept(path: Path): Boolean = path.getName.startsWith("f1") } - val testDirLeafFiles = Seq(f1).map(toFileStatus).map(f => - FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)) + val testDirLeafFiles = Seq(f1) + .map(toFileStatus) + .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)) val testDirDirectory = Directory(name = "testDir", files = testDirLeafFiles) val expected = createDirectory(testDirPath, testDirDirectory) @@ -413,8 +418,9 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter assert(directoryEquals(actual, expected)) } - test("Directory.fromDirectory and fromLeafFileswhere files are at same level but different" + - "dirs.") { + test( + "Directory.fromDirectory and fromLeafFileswhere files are at same level but different" + + "dirs.") { // File Structure // testDir/temp/a/f1 // testDir/temp/b/f2 @@ -426,11 +432,17 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter val f2 = Files.createFile(Paths.get(b + "/f2")) val aDirectory = - Directory("a", Seq(f1).map(toFileStatus).map(f => - FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))) + Directory( + "a", + Seq(f1) + .map(toFileStatus) + .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))) val bDirectory = - Directory("b", Seq(f2).map(toFileStatus).map(f => - FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))) + Directory( + "b", + Seq(f2) + .map(toFileStatus) + .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))) val tempDirectory = Directory("temp", subDirs = Seq(aDirectory, bDirectory)) val tempDirectoryPath = toPath(tempDir) @@ -458,12 +470,18 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter val f2 = Files.createFile(Paths.get(c + "/f2")) val cDirectory = - Directory("c", Seq(f2).map(toFileStatus).map(f => - FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))) + Directory( + "c", + Seq(f2) + .map(toFileStatus) + .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))) val bDirectory = Directory("b", subDirs = Seq(cDirectory)) val aDirectory = - Directory("a", Seq(f1).map(toFileStatus).map(f => - FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))) + Directory( + "a", + Seq(f1) + .map(toFileStatus) + .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))) val tempDirectory = Directory("temp", subDirs = Seq(aDirectory, bDirectory)) val tempDirectoryPath = toPath(tempDir) @@ -478,8 +496,9 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter FileUtils.deleteDirectory(tempDir.toFile) } - test("Directory.fromDirectory and fromLeafFiles where files belong to multiple" + - "subdirectories.") { + test( + "Directory.fromDirectory and fromLeafFiles where files belong to multiple" + + "subdirectories.") { // File Structure // testDir/temp/a/f1 // testDir/temp/a/b/f2 @@ -494,17 +513,23 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter val f3 = Files.createFile(Paths.get(c + "/f3")) val bDirectory = - Directory("b", Seq(f2).map(toFileStatus).map(f => - FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))) + Directory( + "b", + Seq(f2) + .map(toFileStatus) + .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))) val cDirectory = - Directory("c", Seq(f3).map(toFileStatus).map(f => - FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))) + Directory( + "c", + Seq(f3) + .map(toFileStatus) + .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))) val aDirectory = Directory( "a", - Seq(f1).map(toFileStatus).map(f => - FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)), - Seq(bDirectory, cDirectory) - ) + Seq(f1) + .map(toFileStatus) + .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)), + Seq(bDirectory, cDirectory)) val tempDirectory = Directory("temp", subDirs = Seq(aDirectory)) val tempDirectoryPath = toPath(tempDir) @@ -524,11 +549,7 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter // a/f2 val directory1 = Directory( name = "a", - files = Seq( - FileInfo("f1", 100L, 100L, 1L), - FileInfo("f2", 100L, 100L, 2L) - ) - ) + files = Seq(FileInfo("f1", 100L, 100L, 1L), FileInfo("f2", 100L, 100L, 2L))) // directory2: // a/b/f3 @@ -538,13 +559,7 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter subDirs = Seq( Directory( name = "b", - files = Seq( - FileInfo("f3", 100L, 100L, 3L), - FileInfo("f4", 100L, 100L, 4L) - ) - ) - ) - ) + files = Seq(FileInfo("f3", 100L, 100L, 3L), FileInfo("f4", 100L, 100L, 4L))))) // Expected result of merging directory1 and directory2: // a/f1 @@ -553,20 +568,11 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter // a/b/f4 val expected = Directory( name = "a", - files = Seq( - FileInfo("f1", 100L, 100L, 1L), - FileInfo("f2", 100L, 100L, 2L) - ), + files = Seq(FileInfo("f1", 100L, 100L, 1L), FileInfo("f2", 100L, 100L, 2L)), subDirs = Seq( Directory( name = "b", - files = Seq( - FileInfo("f3", 100L, 100L, 3L), - FileInfo("f4", 100L, 100L, 4L) - ) - ) - ) - ) + files = Seq(FileInfo("f3", 100L, 100L, 3L), FileInfo("f4", 100L, 100L, 4L))))) val actual1 = directory1.merge(directory2) val actual2 = directory2.merge(directory1) @@ -582,14 +588,8 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter // a/b/f3 val directory1 = Directory( name = "a", - files = Seq( - FileInfo("f1", 100L, 100L, 1L), - FileInfo("f2", 100L, 100L, 2L) - ), - subDirs = Seq( - Directory(name = "b", files = Seq(FileInfo("f3", 100L, 100L, 3L))) - ) - ) + files = Seq(FileInfo("f1", 100L, 100L, 1L), FileInfo("f2", 100L, 100L, 2L)), + subDirs = Seq(Directory(name = "b", files = Seq(FileInfo("f3", 100L, 100L, 3L))))) // directory2: // a/f4 @@ -602,17 +602,8 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter subDirs = Seq( Directory( name = "b", - files = Seq( - FileInfo("f5", 100L, 100L, 5L), - FileInfo("f6", 100L, 100L, 6L) - ), - subDirs = Seq(Directory( - name = "c", - files = Seq(FileInfo("f7", 100L, 100L, 7L)) - )) - ) - ) - ) + files = Seq(FileInfo("f5", 100L, 100L, 5L), FileInfo("f6", 100L, 100L, 6L)), + subDirs = Seq(Directory(name = "c", files = Seq(FileInfo("f7", 100L, 100L, 7L))))))) // Expected result of merging directory1 and directory2: // directory1: @@ -628,23 +619,15 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter files = Seq( FileInfo("f1", 100L, 100L, 1L), FileInfo("f2", 100L, 100L, 2L), - FileInfo("f4", 100L, 100L, 4L) - ), + FileInfo("f4", 100L, 100L, 4L)), subDirs = Seq( Directory( name = "b", files = Seq( FileInfo("f3", 100L, 100L, 3L), FileInfo("f5", 100L, 100L, 5L), - FileInfo("f6", 100L, 100L, 6L) - ), - subDirs = Seq( - Directory("c", - files = Seq(FileInfo("f7", 100L, 100L, 7L))) - ) - ) - ) - ) + FileInfo("f6", 100L, 100L, 6L)), + subDirs = Seq(Directory("c", files = Seq(FileInfo("f7", 100L, 100L, 7L))))))) val actual1 = directory1.merge(directory2) val actual2 = directory2.merge(directory1) @@ -659,19 +642,17 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter // a/f2 val directory1 = Directory( name = "a", - files = Seq(FileInfo("f1", 100L, 100L, 1L), FileInfo("f2", 100L, 100L, 2L)) - ) + files = Seq(FileInfo("f1", 100L, 100L, 1L), FileInfo("f2", 100L, 100L, 2L))) // directory2: // b/f3 // b/f4 val directory2 = Directory( name = "b", - files = Seq(FileInfo("f3", 100L, 100L, 3L), FileInfo("f4", 100L, 100L, 4L)) - ) + files = Seq(FileInfo("f3", 100L, 100L, 3L), FileInfo("f4", 100L, 100L, 4L))) - val ex1 = intercept[HyperspaceException] (directory1.merge(directory2)) - val ex2 = intercept[HyperspaceException] (directory2.merge(directory1)) + val ex1 = intercept[HyperspaceException](directory1.merge(directory2)) + val ex2 = intercept[HyperspaceException](directory2.merge(directory1)) assert(ex1.msg.contains("Merging directories with names a and b failed.")) assert(ex2.msg.contains("Merging directories with names b and a failed.")) @@ -683,19 +664,18 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter private def directoryEquals(dir1: Directory, dir2: Directory): Boolean = { dir1.name.equals(dir2.name) && - dir1.files.toSet.equals(dir2.files.toSet) && - dir1.subDirs.size.equals(dir2.subDirs.size) && - dir1.subDirs.sortBy(_.name).zip(dir2.subDirs.sortBy(_.name)).forall{ - case (d1, d2) => directoryEquals(d1, d2) - } + dir1.files.toSet.equals(dir2.files.toSet) && + dir1.subDirs.size.equals(dir2.subDirs.size) && + dir1.subDirs.sortBy(_.name).zip(dir2.subDirs.sortBy(_.name)).forall { case (d1, d2) => + directoryEquals(d1, d2) + } } // Using `directoryPath`, create a Directory tree starting from root and ending at // `leafDirectory`. private def createDirectory(directoryPath: Path, leafDirectory: Directory): Directory = { - TestUtils.splitPath(directoryPath.getParent).foldLeft(leafDirectory) { - (accum, name) => - Directory(name, Seq(), Seq(accum)) + TestUtils.splitPath(directoryPath.getParent).foldLeft(leafDirectory) { (accum, name) => + Directory(name, Seq(), Seq(accum)) } } } From 360755368c27ae6ba96d9e516e4aa40d6235e2c5 Mon Sep 17 00:00:00 2001 From: ddrag Date: Fri, 9 Apr 2021 15:39:43 +0200 Subject: [PATCH 2/2] fixed imports --- .../hyperspace/index/IndexLogEntry.scala | 44 ++++++++++--------- 1 file changed, 23 insertions(+), 21 deletions(-) diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala index dcbb8cc86..d1f2c022c 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala @@ -16,20 +16,22 @@ package com.microsoft.hyperspace.index +import java.io.FileNotFoundException + +import scala.annotation.tailrec +import scala.collection.mutable.{HashMap, ListBuffer} +import scala.collection.mutable + import com.fasterxml.jackson.annotation.JsonIgnore -import com.microsoft.hyperspace.HyperspaceException -import com.microsoft.hyperspace.actions.Constants -import com.microsoft.hyperspace.util.PathUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter} import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.types.{DataType, StructType} -import java.io.FileNotFoundException -import scala.annotation.tailrec -import scala.collection.mutable -import scala.collection.mutable.{HashMap, ListBuffer} +import com.microsoft.hyperspace.HyperspaceException +import com.microsoft.hyperspace.actions.Constants +import com.microsoft.hyperspace.util.{PathUtils, SchemaUtils} // IndexLogEntry-specific fingerprint to be temporarily used where fingerprint is not defined. case class NoOpFingerprint() { @@ -520,6 +522,20 @@ case class IndexLogEntry( case _ => false } + def numBuckets: Int = derivedDataset.properties.numBuckets + + def config: IndexConfig = IndexConfig(name, indexedColumns, includedColumns) + + def indexedColumns: Seq[String] = derivedDataset.properties.columns.indexed + + def includedColumns: Seq[String] = derivedDataset.properties.columns.included + + def signature: Signature = { + val sourcePlanSignatures = source.plan.properties.fingerprint.properties.signatures + assert(sourcePlanSignatures.length == 1) + sourcePlanSignatures.head + } + def hasLineageColumn: Boolean = { derivedDataset.properties.properties .getOrElse(IndexConstants.LINEAGE_PROPERTY, IndexConstants.INDEX_LINEAGE_ENABLED_DEFAULT) @@ -537,20 +553,6 @@ case class IndexLogEntry( config.hashCode + signature.hashCode + numBuckets.hashCode + content.hashCode } - def numBuckets: Int = derivedDataset.properties.numBuckets - - def config: IndexConfig = IndexConfig(name, indexedColumns, includedColumns) - - def indexedColumns: Seq[String] = derivedDataset.properties.columns.indexed - - def includedColumns: Seq[String] = derivedDataset.properties.columns.included - - def signature: Signature = { - val sourcePlanSignatures = source.plan.properties.fingerprint.properties.signatures - assert(sourcePlanSignatures.length == 1) - sourcePlanSignatures.head - } - def unsetTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T]): Unit = { tags.remove((plan, tag)) }