This repository was archived by the owner on Jun 14, 2024. It is now read-only.
Issue400 just scalafmt formated #412
Closed: dmytroDragan wants to merge 3 commits into microsoft:master from dmytroDragan:issue400-just-scalafmt-formated
@@ -53,7 +53,9 @@ case class Content(root: Directory, fingerprint: NoOpFingerprint = NoOpFingerpri
rec(
new Path(root.name),
root,
(f, prefix) =>
(
f,
prefix) =>
FileInfo(new Path(prefix, f.name).toString, f.size, f.modifiedTime, f.id)).toSet
}

@@ -91,8 +93,9 @@ object Content {
hadoopConfiguration: Configuration,
pathFilter: PathFilter = PathUtils.DataPathFilter,
throwIfNotExists: Boolean = false): Content =
Content(Directory.fromDirectory(path, fileIdTracker, pathFilter, hadoopConfiguration,
throwIfNotExists))
Content(
Directory
.fromDirectory(path, fileIdTracker, pathFilter, hadoopConfiguration, throwIfNotExists))

/**
* Create a Content object from a specified list of leaf files. Any files not listed here will

@@ -102,9 +105,7 @@ object Content {
* @param fileIdTracker FileIdTracker to keep mapping of file properties to assigned file ids.
* @return Content object with Directory tree from leaf files.
*/
def fromLeafFiles(
files: Seq[FileStatus],
fileIdTracker: FileIdTracker): Option[Content] = {
def fromLeafFiles(files: Seq[FileStatus], fileIdTracker: FileIdTracker): Option[Content] = {
if (files.nonEmpty) {
Some(Content(Directory.fromLeafFiles(files, fileIdTracker)))
} else {

@@ -210,15 +211,6 @@ object Directory {
}
}

@tailrec
private def createEmptyDirectory(path: Path, subDirs: Seq[Directory] = Seq()): Directory = {
if (path.isRoot) {
Directory(path.toString, subDirs = subDirs)
} else {
createEmptyDirectory(path.getParent, Seq(Directory(path.getName, subDirs = subDirs)))
}
}

/**
* Create a Content object from a specified list of leaf files. Any files not listed here will
* NOT be part of the returned object.

@@ -230,12 +222,10 @@ object Directory {
* @param files List of leaf files.
* @param fileIdTracker FileIdTracker to keep mapping of file properties to assigned file ids.
* Note: If a new leaf file is discovered, the input fileIdTracker gets
* updated by adding it to the files it is tracking.
* updated by adding it to the files it is tracking.
* @return Content object with Directory tree from leaf files.
*/
def fromLeafFiles(
files: Seq[FileStatus],
fileIdTracker: FileIdTracker): Directory = {
def fromLeafFiles(files: Seq[FileStatus], fileIdTracker: FileIdTracker): Directory = {
require(
files.nonEmpty,
s"Empty files list found while creating a ${Directory.getClass.getName}.")

@@ -291,6 +281,15 @@ object Directory {
pathToDirectory(getRoot(files.head.getPath))
}

@tailrec
private def createEmptyDirectory(path: Path, subDirs: Seq[Directory] = Seq()): Directory = {
if (path.isRoot) {
Directory(path.toString, subDirs = subDirs)
} else {
createEmptyDirectory(path.getParent, Seq(Directory(path.getName, subDirs = subDirs)))
}
}

// Return file system root path from any path. E.g. "file:/C:/a/b/c" will have root "file:/C:/".
// For linux systems, this root will be "file:/". Other hdfs compatible file systems will have
// corresponding roots.

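(Aside, not part of the diff: the relocated createEmptyDirectory above is a tail-recursive climb from a leaf path up to the file system root, wrapping the subtree built so far at each step. A hypothetical trace, using an invented Linux-style path consistent with the root comment above:)

// Illustrative trace only; the path "file:/a/b/c" is made up.
// createEmptyDirectory(new Path("file:/a/b/c"))
//   calls createEmptyDirectory(new Path("file:/a/b"), Seq(Directory("c", subDirs = Seq())))
//   calls createEmptyDirectory(new Path("file:/a"), Seq(Directory("b", subDirs = Seq(Directory("c")))))
//   calls createEmptyDirectory(new Path("file:/"), Seq(Directory("a", subDirs = Seq(Directory("b", ...)))))
//   returns Directory("file:/", subDirs = Seq(Directory("a", subDirs = Seq(Directory("b", ...)))))
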
@@ -323,8 +322,8 @@ case class FileInfo(name: String, size: Long, modifiedTime: Long, id: Long) {
override def equals(o: Any): Boolean = o match {
case that: FileInfo =>
name.equals(that.name) &&
size.equals(that.size) &&
modifiedTime.equals(that.modifiedTime)
size.equals(that.size) &&
modifiedTime.equals(that.modifiedTime)
case _ => false
}

@@ -350,10 +349,11 @@ case class CoveringIndex(properties: CoveringIndex.Properties) {
val kindAbbr = "CI"
}
object CoveringIndex {
case class Properties(columns: Properties.Columns,
schemaString: String,
numBuckets: Int,
properties: Map[String, String])
case class Properties(
Contributor: Same here. No changes required. Here is how this part looks after I applied the formatter: no changes.

columns: Properties.Columns,
schemaString: String,
numBuckets: Int,
properties: Map[String, String])

object Properties {
case class Columns(indexed: Seq[String], included: Seq[String])

@@ -377,9 +377,7 @@ object LogicalPlanFingerprint {
* @param appendedFiles Appended files.
* @param deletedFiles Deleted files.
*/
case class Update(
appendedFiles: Option[Content] = None,
deletedFiles: Option[Content] = None)
case class Update(appendedFiles: Option[Content] = None, deletedFiles: Option[Content] = None)
Contributor: Same here. I don't have any changes. Please check your formatting settings.
|
|
||
| // IndexLogEntry-specific Hdfs that represents the source data. | ||
| case class Hdfs(properties: Hdfs.Properties) { | ||
|
|
@@ -444,47 +442,50 @@ case class IndexLogEntry(
properties: Map[String, String])
extends LogEntry(IndexLogEntry.VERSION) {

def schema: StructType =
DataType.fromJson(derivedDataset.properties.schemaString).asInstanceOf[StructType]

def created: Boolean = state.equals(Constants.States.ACTIVE)

def relations: Seq[Relation] = {
// Only one relation is currently supported.
assert(source.plan.properties.relations.size == 1)
source.plan.properties.relations
}

// FileInfo's 'name' contains the full path to the file.
@JsonIgnore
lazy val sourceFileInfoSet: Set[FileInfo] = {
relations.head.data.properties.content.fileInfos
}

@JsonIgnore
lazy val sourceFilesSizeInBytes: Long = {
sourceFileInfoSet.foldLeft(0L)(_ + _.size)
}

def sourceUpdate: Option[Update] = {
relations.head.data.properties.update
}

def hasSourceUpdate: Boolean = {
sourceUpdate.isDefined && (appendedFiles.nonEmpty || deletedFiles.nonEmpty)
}

// FileInfo's 'name' contains the full path to the file.
@JsonIgnore
lazy val appendedFiles: Set[FileInfo] = {
sourceUpdate.flatMap(_.appendedFiles).map(_.fileInfos).getOrElse(Set())
}

// FileInfo's 'name' contains the full path to the file.
@JsonIgnore
lazy val deletedFiles: Set[FileInfo] = {
sourceUpdate.flatMap(_.deletedFiles).map(_.fileInfos).getOrElse(Set())
}
@JsonIgnore
lazy val fileIdTracker: FileIdTracker = {
val tracker = new FileIdTracker
tracker.addFileInfo(sourceFileInfoSet ++ content.fileInfos)
tracker
}

/**
* A mutable map for holding auxiliary information of this index log entry while applying rules.
*/
@JsonIgnore
private val tags: mutable.Map[(LogicalPlan, IndexLogEntryTag[_]), Any] = mutable.Map.empty

def schema: StructType =
DataType.fromJson(derivedDataset.properties.schemaString).asInstanceOf[StructType]

def created: Boolean = state.equals(Constants.States.ACTIVE)

def hasSourceUpdate: Boolean = {
sourceUpdate.isDefined && (appendedFiles.nonEmpty || deletedFiles.nonEmpty)
}

def sourceUpdate: Option[Update] = {
relations.head.data.properties.update
}

def copyWithUpdate(
latestFingerprint: LogicalPlanFingerprint,

@@ -493,21 +494,20 @@ case class IndexLogEntry(
def toFileStatus(f: FileInfo) = {
new FileStatus(f.size, false, 0, 1, f.modifiedTime, new Path(f.name))
}
copy(
source = source.copy(
plan = source.plan.copy(
properties = source.plan.properties.copy(
fingerprint = latestFingerprint,
relations = Seq(
relations.head.copy(
data = relations.head.data.copy(
properties = relations.head.data.properties.copy(
update = Some(
Update(
appendedFiles =
Content.fromLeafFiles(appended.map(toFileStatus), fileIdTracker),
deletedFiles =
Content.fromLeafFiles(deleted.map(toFileStatus), fileIdTracker)))))))))))
copy(source = source.copy(plan = source.plan.copy(properties = source.plan.properties.copy(
Contributor: Not here either: I don't have any changes after applying the reformatting.

fingerprint = latestFingerprint,
relations = Seq(
relations.head.copy(data = relations.head.data.copy(properties =
relations.head.data.properties.copy(update = Some(Update(
appendedFiles = Content.fromLeafFiles(appended.map(toFileStatus), fileIdTracker),
deletedFiles =
Content.fromLeafFiles(deleted.map(toFileStatus), fileIdTracker)))))))))))
}

def relations: Seq[Relation] = {
// Only one relation is currently supported.
assert(source.plan.properties.relations.size == 1)
source.plan.properties.relations
}

def bucketSpec: BucketSpec =

@@ -543,43 +543,40 @@ case class IndexLogEntry(
}

def hasLineageColumn: Boolean = {
derivedDataset.properties.properties.getOrElse(
IndexConstants.LINEAGE_PROPERTY, IndexConstants.INDEX_LINEAGE_ENABLED_DEFAULT).toBoolean
derivedDataset.properties.properties
.getOrElse(IndexConstants.LINEAGE_PROPERTY, IndexConstants.INDEX_LINEAGE_ENABLED_DEFAULT)
.toBoolean
}

def hasParquetAsSourceFormat: Boolean = {
relations.head.fileFormat.equals("parquet") ||
derivedDataset.properties.properties.getOrElse(
IndexConstants.HAS_PARQUET_AS_SOURCE_FORMAT_PROPERTY, "false").toBoolean
}

@JsonIgnore
lazy val fileIdTracker: FileIdTracker = {
val tracker = new FileIdTracker
tracker.addFileInfo(sourceFileInfoSet ++ content.fileInfos)
tracker
derivedDataset.properties.properties
.getOrElse(IndexConstants.HAS_PARQUET_AS_SOURCE_FORMAT_PROPERTY, "false")
.toBoolean
}

override def hashCode(): Int = {
config.hashCode + signature.hashCode + numBuckets.hashCode + content.hashCode
}

/**
* A mutable map for holding auxiliary information of this index log entry while applying rules.
*/
@JsonIgnore
private val tags: mutable.Map[(LogicalPlan, IndexLogEntryTag[_]), Any] = mutable.Map.empty
def unsetTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T]): Unit = {
tags.remove((plan, tag))
}

def setTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T], value: T): Unit = {
tags((plan, tag)) = value
def setTagValue[T](tag: IndexLogEntryTag[T], value: T): Unit = {
tags((null, tag)) = value
}

def getTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T]): Option[T] = {
tags.get((plan, tag)).map(_.asInstanceOf[T])
def getTagValue[T](tag: IndexLogEntryTag[T]): Option[T] = {
tags.get((null, tag)).map(_.asInstanceOf[T])
}

def unsetTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T]): Unit = {
tags.remove((plan, tag))
def unsetTagValue[T](tag: IndexLogEntryTag[T]): Unit = {
tags.remove((null, tag))
}

def withCachedTag[T](tag: IndexLogEntryTag[T])(f: => T): T = {
withCachedTag(null, tag)(f)
}

def withCachedTag[T](plan: LogicalPlan, tag: IndexLogEntryTag[T])(f: => T): T = {

@@ -592,20 +589,12 @@ case class IndexLogEntry(
}
}

def setTagValue[T](tag: IndexLogEntryTag[T], value: T): Unit = {
tags((null, tag)) = value
}

def getTagValue[T](tag: IndexLogEntryTag[T]): Option[T] = {
tags.get((null, tag)).map(_.asInstanceOf[T])
}

def unsetTagValue[T](tag: IndexLogEntryTag[T]): Unit = {
tags.remove((null, tag))
def setTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T], value: T): Unit = {
tags((plan, tag)) = value
}

def withCachedTag[T](tag: IndexLogEntryTag[T])(f: => T): T = {
withCachedTag(null, tag)(f)
def getTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T]): Option[T] = {
tags.get((plan, tag)).map(_.asInstanceOf[T])
}
}

@@ -646,16 +635,15 @@ object IndexLogEntry {
* Provides functionality to generate unique file ids for files.
*/
class FileIdTracker {
private var maxId: Long = -1L

// Combination of file properties, used as key, to identify a
// unique file for which an id is generated.
type key = (
String, // Full path.
String, // Full path.
Long, // Size.
Long // Modified time.
)
Long // Modified time.
)
private val fileToIdMap: mutable.HashMap[key, Long] = mutable.HashMap()
private var maxId: Long = -1L

def getMaxFileId: Long = maxId

@@ -664,8 +652,6 @@ class FileIdTracker {
def getFileId(path: String, size: Long, modifiedTime: Long): Option[Long] =
fileToIdMap.get((path, size, modifiedTime))

def setSizeHint(size: Int): Unit = fileToIdMap.sizeHint(size)

/**
* Add a set of FileInfos to the fileToIdMap. The assumption is
* that the each FileInfo already has a valid file id if an entry

@@ -679,8 +665,7 @@ class FileIdTracker {
setSizeHint(files.size)
files.foreach { f =>
if (f.id == IndexConstants.UNKNOWN_FILE_ID) {
throw HyperspaceException(
s"Cannot add file info with unknown id. (file: ${f.name}).")
throw HyperspaceException(s"Cannot add file info with unknown id. (file: ${f.name}).")
}

val key = (f.name, f.size, f.modifiedTime)

@@ -698,6 +683,8 @@ class FileIdTracker {
}
}

def setSizeHint(size: Int): Unit = fileToIdMap.sizeHint(size)

/**
* Try to add file properties to fileToIdMap. If the file is already in
* the map then return its current id. Otherwise, generate a new id,

@@ -708,8 +695,7 @@ class FileIdTracker {
*/
def addFile(file: FileStatus): Long = {
fileToIdMap.getOrElseUpdate(
(file.getPath.toString, file.getLen, file.getModificationTime),
{
(file.getPath.toString, file.getLen, file.getModificationTime), {
maxId += 1
maxId
})

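(Aside, not part of the diff: for readers skimming the hunks above, here is a minimal usage sketch of FileIdTracker based only on the signatures visible in this diff. The file path, size, and timestamp are invented for illustration.)

import org.apache.hadoop.fs.{FileStatus, Path}

val tracker = new FileIdTracker

// addFile assigns a fresh id the first time a (path, size, modifiedTime) key is seen.
val status = new FileStatus(1024L, false, 0, 1L, 1600000000000L, new Path("file:/data/part-0000.parquet"))
val id = tracker.addFile(status)

// The same key looks up the id that was assigned; unknown keys return None.
assert(tracker.getFileId("file:/data/part-0000.parquet", 1024L, 1600000000000L).contains(id))
assert(tracker.getMaxFileId == id)
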
This is not scalafmt. I applied styling on the code and here is how this part looks: no extra spaces. Please check your scalafmt.
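(Aside, not part of the review thread: to make the disagreement concrete, the fragment below shows the two shapes of the lambda from the first hunk, the compact form the reviewer reports scalafmt leaves untouched and the split form proposed in this PR. Indentation is approximate, since the shape scalafmt actually emits depends on the project's .scalafmt.conf; treat this as an illustration of the two layouts, not as canonical formatter output.)

// Compact shape (reviewer reports no change after formatting):
rec(
  new Path(root.name),
  root,
  (f, prefix) =>
    FileInfo(new Path(prefix, f.name).toString, f.size, f.modifiedTime, f.id)).toSet

// Split shape (as proposed in this PR):
rec(
  new Path(root.name),
  root,
  (
      f,
      prefix) =>
    FileInfo(new Path(prefix, f.name).toString, f.size, f.modifiedTime, f.id)).toSet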