Add warning log - newly created Hyperspace context for different Spark Session #374
sezruby wants to merge 2 commits into microsoft:master
Conversation
      contexts.put(spark, (Thread.currentThread().getId, new HyperspaceContext(spark)))
    }
  } else if (ctx.get._1 != Thread.currentThread().getId) {
    throw HyperspaceException(s"Hyperspace does not support multiple threads " +
@imback82
I guess this is too restrictive. How about writing a warning log and creating a new context in this case?
It seems the checkAnswer API uses a different thread internally.
    }
  } else if (ctx.get._1 != Thread.currentThread().getId) {
    logWarning(s"Hyperspace is not thread safe for threads using one Spark session. " +
      s"Please be aware of it. Current thread id: ${Thread.currentThread().getId}, " +
What is the implication of potentially sharing the HyperspaceContext by threads if we already know that it's not thread safe? Isn't it better to fail faster here instead of failing somewhere else?
checkAnswer won't work if we throw an exception here. I'm not sure how to fix it; one possible scenario (a minimal sketch follows the list):
- setup & check in the main thread
- run with a worker thread
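For illustration, a minimal runnable sketch of that scenario (illustrative data and names, not the actual test code): the DataFrame is set up on the main thread, but a worker thread triggers execution, so plan optimization runs under a thread id that differs from the one recorded when the context was created.

import org.apache.spark.sql.SparkSession

// Sketch only: shows why a thread-id check can fire even in a "single user" flow.
object WorkerThreadScenario {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").appName("sketch").getOrCreate()
    import spark.implicits._

    // Setup & check on the main thread: a per-thread context would be
    // created here with the main thread's id.
    val df = Seq(1, 2, 3).toDF("c1").filter("c1 > 1")

    // Run with a worker thread: optimization and execution happen here,
    // under a different Thread.currentThread().getId.
    val worker = new Thread(new Runnable {
      override def run(): Unit = df.collect()
    })
    worker.start()
    worker.join()
    spark.stop()
  }
}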
are you still blocked on this?
I think throwing an exception is too restrictive. Users might not know how to fix the issue?
I thought it's the user's responsibility when using an API that is not thread safe.
A different thread id doesn't always mean the threads are running concurrently.
I think we need documentation rather than restricting the use case?
A different thread id doesn't always mean the threads are running concurrently.
But the new structure allows multiple threads to access the object concurrently? The existing implementation guarantees a one-to-one mapping via a thread local. So I would keep this condition as it is, and if there is demand for accessing the Hyperspace object from multiple threads, we should think about making it thread-safe instead of documenting it.
Plus, I think random failures are much worse and harder to debug, especially failures caused by thread-safety issues.
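For reference, a minimal sketch of the thread-local pattern described above (illustrative names, not the exact Hyperspace internals): each thread gets at most one context, and the context is rebuilt when that thread switches to a different session, so no two threads can ever share an instance.

import org.apache.spark.sql.SparkSession

// Illustrative only: ThreadLocal enforces a one-to-one thread/context mapping.
class HyperspaceContext(val spark: SparkSession)

object ThreadLocalContextHolder {
  private val context = new ThreadLocal[HyperspaceContext]

  def getContext(spark: SparkSession): HyperspaceContext = {
    val ctx = context.get()
    if (ctx == null || !ctx.spark.equals(spark)) {
      // Recreate the context when this thread's active session changes,
      // because the context depends on session-level configs.
      context.set(new HyperspaceContext(spark))
    }
    context.get()
  }
}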
I agree that analyzing failures from concurrent threads is difficult.
I added the exception change; note that this can be a breaking change for some use cases.
Added breaking change label so we can document it in the release notes.
btw, even if this is a breaking change, it alerts the user to a possible misuse where the Hyperspace context is being recreated, so I think it's worthwhile.
Resolved review threads (outdated unless noted otherwise):
- src/test/scala/com/microsoft/hyperspace/index/plananalysis/ExplainTest.scala (2 threads)
- src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala (2 threads)
- src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala
- src/test/scala/com/microsoft/hyperspace/index/DeltaLakeIntegrationTest.scala (3 threads, one not outdated)
Could you update the PR description as it seems out of date? Basically, we are preventing the Hyperspace context from being re-created in certain scenarios, and making sure too many contexts are not created.
assert(!basePlan.equals(dfWithHyperspaceEnabled.queryExecution.optimizedPlan))
checkAnswer(dfWithHyperspaceDisabled, dfWithHyperspaceEnabled)
val resultEnabled = dfWithHyperspaceEnabled.collect().toSeq.toSet
assert(resultEnabled.equals(resultDisabled))
is this change still needed? if so, why?
In checkAnswer, a new thread tries to build the query plan again, so it causes the exception.
I'm not sure why the other checkAnswer calls have no problem with it.
oh interesting. could you share the code where a new thread is spawned in checkAnswer?
In checkAnswer:
val sparkAnswer = try df.collect().toSeq catch {
  case e: Exception =>
    val errorMessage =
      s"""
         |Exception thrown while executing query:
         |${df.queryExecution}
         |== Exception ==
         |$e
         |${org.apache.spark.sql.catalyst.util.stackTraceToString(e)}
       """.stripMargin
    return Some(errorMessage)
}
It's because of the broadcast join:
[info] at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:146)
[info] at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:387)
[info] at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeBroadcast$1(SparkPlan.scala:144)
[info] at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:155)
[info] at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
[info] at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
[info] at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:140)
[info] at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:117)
[info] at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenInner(BroadcastHashJoinExec.scala:211)
[info] at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:101)
failed thread stack:
..
[info] at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
[info] at com.microsoft.hyperspace.index.rules.FilterIndexRule$.apply(FilterIndexRule.scala:52)
[info] at com.microsoft.hyperspace.index.rules.FilterIndexRule$.apply(FilterIndexRule.scala:38)
[info] at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:87)
[info] at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
[info] at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
[info] at scala.collection.immutable.List.foldLeft(List.scala:89)
[info] at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:84)
[info] at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:76)
[info] at scala.collection.immutable.List.foreach(List.scala:392)
[info] at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:76)
[info] at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:66)
[info] at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:66)
[info] at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
[info] at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
[info] at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
[info] at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
[info] at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3359)
[info] at org.apache.spark.sql.Dataset.collect(Dataset.scala:2782)
[info] at org.apache.spark.sql.delta.PartitionFiltering.filesForScan(PartitionFiltering.scala:40)
[info] at org.apache.spark.sql.delta.PartitionFiltering.filesForScan$(PartitionFiltering.scala:27)
[info] at org.apache.spark.sql.delta.Snapshot.filesForScan(Snapshot.scala:52)
[info] at org.apache.spark.sql.delta.files.TahoeLogFileIndex.matchingFiles(TahoeFileIndex.scala:140)
[info] at org.apache.spark.sql.delta.files.TahoeFileIndex.listFiles(TahoeFileIndex.scala:56)
[info] at org.apache.spark.sql.execution.FileSourceScanExec.selectedPartitions$lzycompute(DataSourceScanExec.scala:193)
[info] at org.apache.spark.sql.execution.FileSourceScanExec.selectedPartitions(DataSourceScanExec.scala:190)
[info] at org.apache.spark.sql.execution.FileSourceScanExec.updateDriverMetrics(DataSourceScanExec.scala:529)
[info] at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:307)
[info] at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:305)
[info] at org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:327)
[info] at org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121)
[info] at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
[info] at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
Seems we shouldn't throw the exception..? 😧😧😧😧😧😧😧
Hmm, then we have to think about this feature again, since it may hit a thread-safety issue even if the user didn't intend it?
For example, this could have gone unnoticed if we didn't throw the exception?
Seems so. How about ThreadLocal[HashMap[Session, Context]]? 😁
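For concreteness, a rough sketch of what that ThreadLocal[HashMap[Session, Context]] structure could look like (illustrative names only, not a proposed implementation): each thread keeps its own session-to-context map, so a context is created lazily per (thread, session) pair.

import scala.collection.mutable
import org.apache.spark.sql.SparkSession

// Illustrative only.
class HyperspaceContext(val spark: SparkSession)

object PerThreadContexts {
  private val contexts =
    new ThreadLocal[mutable.Map[SparkSession, HyperspaceContext]] {
      override def initialValue(): mutable.Map[SparkSession, HyperspaceContext] =
        mutable.Map.empty[SparkSession, HyperspaceContext]
    }

  def getContext(spark: SparkSession): HyperspaceContext =
    // Each thread sees only its own map; entries are added lazily on first use.
    contexts.get().getOrElseUpdate(spark, new HyperspaceContext(spark))
}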
But that would allow multiple threads to share the same Hyperspace context.
So basically, in the Delta code, it does allFiles.toDF() and eventually that dataframe is collect-ed, which triggers our optimizer. In our optimizer, we have an extractor that calls Hyperspace.getContext(spark).sourceProviderManager in ExtractRelation. So instead of case filter @ Filter(condition: Expression, ExtractRelation(relation)), we could do case filter @ Filter if isSupportedRelation, and I think we can avoid the issue.
But I think this is still hacky, and the right way to fix it seems to be making the context thread-safe.
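A hypothetical sketch of that suggested rewrite (my reading of it; isSupportedRelation and the object name are placeholders, not existing Hyperspace helpers): guard the match with a cheap predicate instead of an extractor that needs Hyperspace.getContext(spark) up front.

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, LogicalPlan}

object GuardedFilterRuleSketch {
  // Placeholder predicate; the real rule would consult the source provider
  // manager lazily rather than inside an eager extractor.
  def isSupportedRelation(plan: LogicalPlan): Boolean = plan.isInstanceOf[LeafNode]

  def applyRule(plan: LogicalPlan): LogicalPlan = plan.transformDown {
    // Before: case filter @ Filter(condition: Expression, ExtractRelation(relation)) => ...
    // After (suggested): keep the match cheap and guard it with a predicate.
    case filter @ Filter(_: Expression, child) if isSupportedRelation(child) =>
      filter // the actual index-replacement logic would go here
  }
}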
val resultEnabled = dfWithHyperspaceEnabled.collect().toSeq.sortBy(_.hashCode())
assert(!basePlan.equals(updatedPlan))
assert(resultEnabled.equals(resultDisabled))
checkAnswer(dfWithHyperspaceDisabled, dfWithHyperspaceEnabled)
After executing collect(), checkAnswer doesn't throw the exception 😧
| s"Current limit: ${contexts.size}") | ||
| } | ||
| val newCtx = new HyperspaceContext(spark) | ||
| contexts.put(spark, (threadId, newCtx)) |
Should we check the return value of put and throw an exception if it returns Some? There could be multiple threads sharing the same Spark session that hit if (!ctx.isDefined) at the same time.
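For illustration, one race-free shape of that insert, sketched with a ConcurrentHashMap and illustrative names (not the actual Hyperspace structures): putIfAbsent makes the check-then-put atomic, and a non-null return value plays the role of the Some case mentioned above.

import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.sql.SparkSession

// Illustrative only.
class HyperspaceContext(val spark: SparkSession)

object Contexts {
  private val contexts =
    new ConcurrentHashMap[SparkSession, (Long, HyperspaceContext)]()

  def getOrCreate(spark: SparkSession): HyperspaceContext = {
    val candidate = (Thread.currentThread().getId, new HyperspaceContext(spark))
    // putIfAbsent is atomic: a non-null result means another thread created
    // the entry between our isDefined-style check and the put.
    val existing = contexts.putIfAbsent(spark, candidate)
    if (existing != null) {
      // Mirrors the suggestion above; in Hyperspace this would presumably be a HyperspaceException.
      throw new IllegalStateException(
        s"Context for this session was concurrently created by thread ${existing._1}.")
    }
    candidate._2
  }
}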
// the one HyperspaceContext is using because Hyperspace depends on the
// session's properties such as configs, etc.
context.set(new HyperspaceContext(spark))
if (!ctx.spark.equals(spark)) {
@imback82 I'm thinking about this change and what we can do about it.
I think the problem is that we access the active Spark session when optimizing the query plan, but it can be different from the session that was used to create the Hyperspace object.
So how about:
- not resetting Hyperspace and just using the previous context even with a different active Spark session
- if a user wants to use other configs / another Spark session, they need to redefine the Hyperspace object
I think this is clearer behavior because we do val hs = new Hyperspace(spark).
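For illustration, the usage pattern that proposal implies (spark-shell style snippet; otherSession is just an example of a second session with different configs):

import org.apache.spark.sql.SparkSession
import com.microsoft.hyperspace.Hyperspace

val spark = SparkSession.builder.master("local[*]").getOrCreate()
val hs = new Hyperspace(spark)            // context tied to the session passed in

// To work with other configs or another session, the user would create a
// separate Hyperspace object instead of relying on whichever session is active.
val otherSession = spark.newSession()
val otherHs = new Hyperspace(otherSession)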
- not resetting Hyperspace and just using the previous context even with a different active Spark session

Will it be ok to silently use a different active session?
What is the context for this pull request?
What changes were proposed in this pull request?
Currently, Hyperspace keeps only one HyperspaceContext for each Hyperspace object.
So if an app uses multiple concurrent Spark sessions with one Hyperspace object and one thread, a new Hyperspace context is continuously created for each request.
As the Hyperspace object is not thread-safe, we tried to enforce that "one client thread" should use only "one Spark session", but we can't, because the same SparkSession can be accessed from multiple threads (e.g. Delta Lake broadcast join query execution).
So for now, we leave a warning log in that case, so that users can notice it might be an ineffective use of Hyperspace.
Does this PR introduce any user-facing change?
Yes, a warning log can be emitted if a Hyperspace object is accessed with a different Spark session.
How was this patch tested?
Tested on a local environment.