Skip to content
This repository was archived by the owner on Jun 14, 2024. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 96 additions & 110 deletions src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ case class Content(root: Directory, fingerprint: NoOpFingerprint = NoOpFingerpri
rec(
new Path(root.name),
root,
(f, prefix) =>
(
f,
prefix) =>
FileInfo(new Path(prefix, f.name).toString, f.size, f.modifiedTime, f.id)).toSet
}

Expand Down Expand Up @@ -91,8 +93,9 @@ object Content {
hadoopConfiguration: Configuration,
pathFilter: PathFilter = PathUtils.DataPathFilter,
throwIfNotExists: Boolean = false): Content =
Content(Directory.fromDirectory(path, fileIdTracker, pathFilter, hadoopConfiguration,
throwIfNotExists))
Content(
Directory
.fromDirectory(path, fileIdTracker, pathFilter, hadoopConfiguration, throwIfNotExists))

/**
* Create a Content object from a specified list of leaf files. Any files not listed here will
Expand All @@ -102,9 +105,7 @@ object Content {
* @param fileIdTracker FileIdTracker to keep mapping of file properties to assigned file ids.
* @return Content object with Directory tree from leaf files.
*/
def fromLeafFiles(
files: Seq[FileStatus],
fileIdTracker: FileIdTracker): Option[Content] = {
def fromLeafFiles(files: Seq[FileStatus], fileIdTracker: FileIdTracker): Option[Content] = {
if (files.nonEmpty) {
Some(Content(Directory.fromLeafFiles(files, fileIdTracker)))
} else {
Expand Down Expand Up @@ -210,15 +211,6 @@ object Directory {
}
}

// Builds a chain of empty Directory nodes from the filesystem root down to
// `path`, hanging the accumulated `subDirs` under the current node at each
// step. Tail-recursive, so arbitrarily deep paths are safe.
@tailrec
private def createEmptyDirectory(path: Path, subDirs: Seq[Directory] = Seq()): Directory =
  if (!path.isRoot)
    createEmptyDirectory(path.getParent, Seq(Directory(path.getName, subDirs = subDirs)))
  else
    Directory(path.toString, subDirs = subDirs)

/**
* Create a Content object from a specified list of leaf files. Any files not listed here will
* NOT be part of the returned object.
Expand All @@ -230,12 +222,10 @@ object Directory {
* @param files List of leaf files.
* @param fileIdTracker FileIdTracker to keep mapping of file properties to assigned file ids.
* Note: If a new leaf file is discovered, the input fileIdTracker gets
* updated by adding it to the files it is tracking.
* updated by adding it to the files it is tracking.
* @return Content object with Directory tree from leaf files.
*/
def fromLeafFiles(
files: Seq[FileStatus],
fileIdTracker: FileIdTracker): Directory = {
def fromLeafFiles(files: Seq[FileStatus], fileIdTracker: FileIdTracker): Directory = {
require(
files.nonEmpty,
s"Empty files list found while creating a ${Directory.getClass.getName}.")
Expand Down Expand Up @@ -291,6 +281,15 @@ object Directory {
pathToDirectory(getRoot(files.head.getPath))
}

// Builds a chain of single-child, empty Directory nodes from the filesystem
// root down to `path`, attaching `subDirs` beneath the node for `path` itself.
// Tail-recursive: each step wraps the accumulated subtree and moves one level
// up via `path.getParent` until `path.isRoot`.
@tailrec
private def createEmptyDirectory(path: Path, subDirs: Seq[Directory] = Seq()): Directory = {
if (path.isRoot) {
Directory(path.toString, subDirs = subDirs)
} else {
createEmptyDirectory(path.getParent, Seq(Directory(path.getName, subDirs = subDirs)))
}
}

// Return file system root path from any path. E.g. "file:/C:/a/b/c" will have root "file:/C:/".
// For linux systems, this root will be "file:/". Other hdfs compatible file systems will have
// corresponding roots.
Expand Down Expand Up @@ -323,8 +322,8 @@ case class FileInfo(name: String, size: Long, modifiedTime: Long, id: Long) {
override def equals(o: Any): Boolean = o match {
case that: FileInfo =>
name.equals(that.name) &&
size.equals(that.size) &&
modifiedTime.equals(that.modifiedTime)
size.equals(that.size) &&
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not scalafmt. I applied styling on the code and here it looks like this:

  override def equals(o: Any): Boolean = o match {
    case that: FileInfo =>
      name.equals(that.name) &&
      size.equals(that.size) &&
      modifiedTime.equals(that.modifiedTime)
    case _ => false
  }

No extra spaces.

Please check your scalafmt.

modifiedTime.equals(that.modifiedTime)
case _ => false
}

Expand All @@ -350,10 +349,11 @@ case class CoveringIndex(properties: CoveringIndex.Properties) {
val kindAbbr = "CI"
}
object CoveringIndex {
case class Properties(columns: Properties.Columns,
schemaString: String,
numBuckets: Int,
properties: Map[String, String])
case class Properties(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here. No changes required. Here is how this part looks after I applied scalafmt reformat command:

object CoveringIndex {
  case class Properties(columns: Properties.Columns,
    schemaString: String,
    numBuckets: Int,
    properties: Map[String, String])

  object Properties {
    case class Columns(indexed: Seq[String], included: Seq[String])
  }
}

No changes.

columns: Properties.Columns,
schemaString: String,
numBuckets: Int,
properties: Map[String, String])

object Properties {
case class Columns(indexed: Seq[String], included: Seq[String])
Expand All @@ -377,9 +377,7 @@ object LogicalPlanFingerprint {
* @param appendedFiles Appended files.
* @param deletedFiles Deleted files.
*/
case class Update(
appendedFiles: Option[Content] = None,
deletedFiles: Option[Content] = None)
case class Update(appendedFiles: Option[Content] = None, deletedFiles: Option[Content] = None)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here. I don't have any changes.

Please check you formatting settings.


// IndexLogEntry-specific Hdfs that represents the source data.
case class Hdfs(properties: Hdfs.Properties) {
Expand Down Expand Up @@ -444,47 +442,50 @@ case class IndexLogEntry(
properties: Map[String, String])
extends LogEntry(IndexLogEntry.VERSION) {

// Index schema, deserialized from the JSON schema string stored on the
// derived dataset's properties.
def schema: StructType =
DataType.fromJson(derivedDataset.properties.schemaString).asInstanceOf[StructType]

// True when this log entry is in the ACTIVE state, i.e. the index was fully created.
def created: Boolean = state.equals(Constants.States.ACTIVE)

// Source relations backing this index. The assert enforces the current
// single-relation limitation stated in the comment below.
def relations: Seq[Relation] = {
// Only one relation is currently supported.
assert(source.plan.properties.relations.size == 1)
source.plan.properties.relations
}

// All source data files captured at index-creation time, taken from the single
// supported relation. FileInfo's 'name' contains the full path to the file.
@JsonIgnore
lazy val sourceFileInfoSet: Set[FileInfo] = {
relations.head.data.properties.content.fileInfos
}

// Total size in bytes of all source data files tracked by this entry.
@JsonIgnore
lazy val sourceFilesSizeInBytes: Long = {
  sourceFileInfoSet.iterator.map(_.size).sum
}

// Pending source-data update (appended/deleted files) recorded on the
// relation, if any; None when the source is unchanged since index creation.
def sourceUpdate: Option[Update] = {
relations.head.data.properties.update
}

// True when an Update is recorded AND it actually lists appended or deleted
// files — an empty Update does not count as a source change.
def hasSourceUpdate: Boolean = {
sourceUpdate.isDefined && (appendedFiles.nonEmpty || deletedFiles.nonEmpty)
}

// Files appended to the source since index creation; empty when no update (or
// no appended portion) is recorded. FileInfo's 'name' contains the full path.
@JsonIgnore
lazy val appendedFiles: Set[FileInfo] = {
  sourceUpdate.flatMap(_.appendedFiles).fold(Set.empty[FileInfo])(_.fileInfos)
}

// Files deleted from the source since index creation; empty when no update (or
// no deleted portion) is recorded. FileInfo's 'name' contains the full path.
@JsonIgnore
lazy val deletedFiles: Set[FileInfo] = {
  sourceUpdate.flatMap(_.deletedFiles).fold(Set.empty[FileInfo])(_.fileInfos)
}
// Tracker seeded with ids for every known file: both source data files and the
// index content files. Lazily built once per log entry.
@JsonIgnore
lazy val fileIdTracker: FileIdTracker = {
val tracker = new FileIdTracker
tracker.addFileInfo(sourceFileInfoSet ++ content.fileInfos)
tracker
}

/**
 * A mutable map for holding auxiliary information of this index log entry while applying rules.
 * Keyed by (plan, tag); the plan-independent accessors below use a null plan as
 * their key, so a null first component means "not tied to any particular plan".
 */
@JsonIgnore
private val tags: mutable.Map[(LogicalPlan, IndexLogEntryTag[_]), Any] = mutable.Map.empty

def schema: StructType =
DataType.fromJson(derivedDataset.properties.schemaString).asInstanceOf[StructType]

def created: Boolean = state.equals(Constants.States.ACTIVE)

def hasSourceUpdate: Boolean = {
sourceUpdate.isDefined && (appendedFiles.nonEmpty || deletedFiles.nonEmpty)
}

def sourceUpdate: Option[Update] = {
relations.head.data.properties.update
}

def copyWithUpdate(
latestFingerprint: LogicalPlanFingerprint,
Expand All @@ -493,21 +494,20 @@ case class IndexLogEntry(
def toFileStatus(f: FileInfo) = {
new FileStatus(f.size, false, 0, 1, f.modifiedTime, new Path(f.name))
}
copy(
source = source.copy(
plan = source.plan.copy(
properties = source.plan.properties.copy(
fingerprint = latestFingerprint,
relations = Seq(
relations.head.copy(
data = relations.head.data.copy(
properties = relations.head.data.properties.copy(
update = Some(
Update(
appendedFiles =
Content.fromLeafFiles(appended.map(toFileStatus), fileIdTracker),
deletedFiles =
Content.fromLeafFiles(deleted.map(toFileStatus), fileIdTracker)))))))))))
copy(source = source.copy(plan = source.plan.copy(properties = source.plan.properties.copy(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Neither here - I don't have any changes after applying the reformatting.

fingerprint = latestFingerprint,
relations = Seq(
relations.head.copy(data = relations.head.data.copy(properties =
relations.head.data.properties.copy(update = Some(Update(
appendedFiles = Content.fromLeafFiles(appended.map(toFileStatus), fileIdTracker),
deletedFiles =
Content.fromLeafFiles(deleted.map(toFileStatus), fileIdTracker)))))))))))
}

def relations: Seq[Relation] = {
// Only one relation is currently supported.
assert(source.plan.properties.relations.size == 1)
source.plan.properties.relations
}

def bucketSpec: BucketSpec =
Expand Down Expand Up @@ -543,43 +543,40 @@ case class IndexLogEntry(
}

def hasLineageColumn: Boolean = {
derivedDataset.properties.properties.getOrElse(
IndexConstants.LINEAGE_PROPERTY, IndexConstants.INDEX_LINEAGE_ENABLED_DEFAULT).toBoolean
derivedDataset.properties.properties
.getOrElse(IndexConstants.LINEAGE_PROPERTY, IndexConstants.INDEX_LINEAGE_ENABLED_DEFAULT)
.toBoolean
}

def hasParquetAsSourceFormat: Boolean = {
relations.head.fileFormat.equals("parquet") ||
derivedDataset.properties.properties.getOrElse(
IndexConstants.HAS_PARQUET_AS_SOURCE_FORMAT_PROPERTY, "false").toBoolean
}

@JsonIgnore
lazy val fileIdTracker: FileIdTracker = {
val tracker = new FileIdTracker
tracker.addFileInfo(sourceFileInfoSet ++ content.fileInfos)
tracker
derivedDataset.properties.properties
.getOrElse(IndexConstants.HAS_PARQUET_AS_SOURCE_FORMAT_PROPERTY, "false")
.toBoolean
}

override def hashCode(): Int = {
config.hashCode + signature.hashCode + numBuckets.hashCode + content.hashCode
}

/**
* A mutable map for holding auxiliary information of this index log entry while applying rules.
*/
@JsonIgnore
private val tags: mutable.Map[(LogicalPlan, IndexLogEntryTag[_]), Any] = mutable.Map.empty
// Drops the tag value stored for the given (plan, tag) pair, if present.
def unsetTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T]): Unit = {
  val key = (plan, tag)
  tags -= key
}

def setTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T], value: T): Unit = {
tags((plan, tag)) = value
def setTagValue[T](tag: IndexLogEntryTag[T], value: T): Unit = {
tags((null, tag)) = value
}

def getTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T]): Option[T] = {
tags.get((plan, tag)).map(_.asInstanceOf[T])
def getTagValue[T](tag: IndexLogEntryTag[T]): Option[T] = {
tags.get((null, tag)).map(_.asInstanceOf[T])
}

def unsetTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T]): Unit = {
tags.remove((plan, tag))
def unsetTagValue[T](tag: IndexLogEntryTag[T]): Unit = {
tags.remove((null, tag))
}

// Plan-independent variant: caches under the null-plan key by delegating to
// the (plan, tag) overload with a null plan.
def withCachedTag[T](tag: IndexLogEntryTag[T])(f: => T): T = {
withCachedTag(null, tag)(f)
}

def withCachedTag[T](plan: LogicalPlan, tag: IndexLogEntryTag[T])(f: => T): T = {
Expand All @@ -592,20 +589,12 @@ case class IndexLogEntry(
}
}

// Plan-independent variant: stores the value under the null-plan key.
def setTagValue[T](tag: IndexLogEntryTag[T], value: T): Unit = {
tags((null, tag)) = value
}

// Plan-independent variant: reads the value stored under the null-plan key,
// cast back to the tag's value type.
def getTagValue[T](tag: IndexLogEntryTag[T]): Option[T] = {
tags.get((null, tag)).map(_.asInstanceOf[T])
}

def unsetTagValue[T](tag: IndexLogEntryTag[T]): Unit = {
tags.remove((null, tag))
def setTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T], value: T): Unit = {
tags((plan, tag)) = value
}

def withCachedTag[T](tag: IndexLogEntryTag[T])(f: => T): T = {
withCachedTag(null, tag)(f)
def getTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T]): Option[T] = {
tags.get((plan, tag)).map(_.asInstanceOf[T])
}
}

Expand Down Expand Up @@ -646,16 +635,15 @@ object IndexLogEntry {
* Provides functionality to generate unique file ids for files.
*/
class FileIdTracker {
private var maxId: Long = -1L

// Combination of file properties, used as key, to identify a
// unique file for which an id is generated.
type key = (
String, // Full path.
String, // Full path.
Long, // Size.
Long // Modified time.
)
Long // Modified time.
)
private val fileToIdMap: mutable.HashMap[key, Long] = mutable.HashMap()
private var maxId: Long = -1L

// Highest file id assigned so far; -1L when no file has been tracked yet.
def getMaxFileId: Long = maxId

Expand All @@ -664,8 +652,6 @@ class FileIdTracker {
// Looks up the id for a file identified by (full path, size, modified time);
// None when that exact combination has not been tracked.
def getFileId(path: String, size: Long, modifiedTime: Long): Option[Long] =
fileToIdMap.get((path, size, modifiedTime))

// Pre-sizes the underlying map before a bulk insert to reduce rehashing.
def setSizeHint(size: Int): Unit = fileToIdMap.sizeHint(size)

/**
* Add a set of FileInfos to the fileToIdMap. The assumption is
* that the each FileInfo already has a valid file id if an entry
Expand All @@ -679,8 +665,7 @@ class FileIdTracker {
setSizeHint(files.size)
files.foreach { f =>
if (f.id == IndexConstants.UNKNOWN_FILE_ID) {
throw HyperspaceException(
s"Cannot add file info with unknown id. (file: ${f.name}).")
throw HyperspaceException(s"Cannot add file info with unknown id. (file: ${f.name}).")
}

val key = (f.name, f.size, f.modifiedTime)
Expand All @@ -698,6 +683,8 @@ class FileIdTracker {
}
}

def setSizeHint(size: Int): Unit = fileToIdMap.sizeHint(size)

/**
* Try to add file properties to fileToIdMap. If the file is already in
* the map then return its current id. Otherwise, generate a new id,
Expand All @@ -708,8 +695,7 @@ class FileIdTracker {
*/
def addFile(file: FileStatus): Long = {
fileToIdMap.getOrElseUpdate(
(file.getPath.toString, file.getLen, file.getModificationTime),
{
(file.getPath.toString, file.getLen, file.getModificationTime), {
maxId += 1
maxId
})
Expand Down
Loading