This code computes, for each user, the sum of action counts across pairs of that user's categories (a self-join of the user's category actions with a renamed copy of the same data). It uses a Map[Int, Set[Int]] to record which categories have which children, filters out category pairs that are in a parent/child relationship, and sums the actions of the remaining "other" categories.
# The following code registers a user-defined function (UDF) and uses it in a query.
# (The general business logic is irrelevant to the question.)
# What is problematic about the code such that it might tear down the whole cluster, and how can it be solved?
# (Hint: It has to do with the usage of the categoryNodesWithChildren Map variable.)
/**
 * For each user, self-joins the user's category actions, drops category pairs
 * that are in a direct parent/child relationship (in either direction), sums
 * the "other" category's action counts per remaining pair, and prints the result.
 *
 * FIX (cluster stability): the original code referenced `categoryNodesWithChildren`
 * directly inside the UDF. Spark serializes the entire UDF closure — including the
 * captured Map — into EVERY task it ships to the executors. With a large Map this
 * bloats each task, saturates the driver's network output, and can OOM executors,
 * potentially tearing down the whole cluster. The idiomatic fix is a broadcast
 * variable: the Map is serialized once and cached on each executor, and the UDF
 * closure captures only the lightweight `Broadcast` handle.
 *
 * @param sparkSession active session used to read data and register the UDF
 */
def calculate(sparkSession: SparkSession): Unit = {
  val UserIdColumnName = "userId"
  val CategoryIdColumnName = "categoryId"
  val NumActionsColumnName = "numActions"
  val OtherCategoryIdColumnName = "otherCategoryId"
  val OtherNumActionsColumnName = "otherNumActions"

  // Category tree as a node -> direct-children adjacency map.
  val categoryNodesWithChildren: Map[Int, Set[Int]] =
    Map(0 -> Set(1, 2, 3),
      1 -> Set(4, 5),
      2 -> Set(6, 7),
      3 -> Set(8),
      7 -> Set(9, 10)
    )

  // Broadcast once: each executor fetches a single read-only copy instead of
  // receiving the Map embedded in every serialized task closure.
  val categoryNodesWithChildrenBc =
    sparkSession.sparkContext.broadcast(categoryNodesWithChildren)

  // NOTE(review): despite the parameter names, this returns true when
  // `parentNodeId` is a direct child of `nodeId` (the lookup key is `nodeId`).
  // Behavior is unchanged here because the query applies it symmetrically in
  // both directions; kept as-is to preserve results. Inside the lambda we read
  // the broadcast handle's `.value`, not the driver-side Map.
  sparkSession.udf.register("isChildOf", (nodeId: Int, parentNodeId: Int) =>
    nodeId != parentNodeId && categoryNodesWithChildrenBc.value.getOrElse(nodeId, Set[Int]()).contains(parentNodeId))

  val userCategoryActions = readUserCategoryActions(sparkSession)

  // Second side of the self-join: same rows with category/action columns renamed
  // so both copies can coexist after the join.
  val otherUserCategoryActions = userCategoryActions
    .select(
      col(UserIdColumnName),
      col(CategoryIdColumnName).alias(OtherCategoryIdColumnName),
      col(NumActionsColumnName).alias(OtherNumActionsColumnName)
    )

  // Self-join on userId, keep only pairs with no direct parent/child relation
  // in either direction, then sum the "other" action counts per pair.
  val joinedUserActions = userCategoryActions
    .join(otherUserCategoryActions, UserIdColumnName)
    .where("!(isChildOf(categoryId,otherCategoryId) or isChildOf(otherCategoryId,categoryId))")
    .groupBy(UserIdColumnName, CategoryIdColumnName, OtherCategoryIdColumnName)
    .sum(OtherNumActionsColumnName)
    .withColumnRenamed(s"sum($OtherNumActionsColumnName)", OtherNumActionsColumnName)

  joinedUserActions.show()
}