spark-instrumented-optimizer/R/pkg/inst/worker/worker.R
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Worker class

# Get current system time
currentTimeSecs <- function() {
  as.numeric(Sys.time())
}

# Get elapsed time
elapsedSecs <- function() {
  proc.time()[3]
}

# Constants
specialLengths <- list(END_OF_STREAM = 0L, TIMING_DATA = -1L)
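
# A note on the framing protocol implied by these constants (an inference from
# this file, not an authoritative spec): payloads on the output stream are
# preceded by an integer. A positive value is the byte length of the payload
# that follows, while the special values above act as control markers:
#   END_OF_STREAM (0)  - no more payloads follow
#   TIMING_DATA  (-1)  - a block of profiling doubles follows (see the end of
#                        this file)
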
# Timing R process boot
bootTime <- currentTimeSecs()
bootElap <- elapsedSecs()
rLibDir <- Sys.getenv("SPARKR_RLIBDIR")
# Set libPaths to include SparkR package as loadNamespace needs this
# TODO: Figure out if we can avoid this by not loading any objects that require
# SparkR namespace
.libPaths(c(rLibDir, .libPaths()))
suppressPackageStartupMessages(library(SparkR))
port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
inputCon <- socketConnection(port = port, blocking = TRUE, open = "rb")
outputCon <- socketConnection(port = port, blocking = TRUE, open = "wb")
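
# Both connections point at the same driver-side port; the driver is expected
# to write the handshake fields below in exactly this order:
#   1. partition index (int)
#   2. deserializer and serializer names (strings)
#   3. packages to load (serialized character vector)
#   4. the compute closure (length-prefixed serialized function)
#   5. broadcast variables, numPartitions, the isEmpty flag, and finally the
#      partition data itself
# This ordering is an observation from the reads below, not a documented
# contract.
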
# read the index of the current partition inside the RDD
partition <- SparkR:::readInt(inputCon)
deserializer <- SparkR:::readString(inputCon)
serializer <- SparkR:::readString(inputCon)
# Include packages as required
packageNames <- unserialize(SparkR:::readRaw(inputCon))
for (pkg in packageNames) {
  suppressPackageStartupMessages(library(as.character(pkg), character.only = TRUE))
}
# read function dependencies
funcLen <- SparkR:::readInt(inputCon)
computeFunc <- unserialize(SparkR:::readRawLen(inputCon, funcLen))
env <- environment(computeFunc)
parent.env(env) <- .GlobalEnv # Attach under global environment.
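
# Reparenting the deserialized closure under .GlobalEnv lets any name it does
# not define locally resolve against this worker's global environment and
# search path, where the SparkR package (and any packages loaded above) are
# attached.
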
# Timing init envs for computing
initElap <- elapsedSecs()
# Read and set broadcast variables
numBroadcastVars <- SparkR:::readInt(inputCon)
if (numBroadcastVars > 0) {
  for (bcast in seq_len(numBroadcastVars)) {
    bcastId <- SparkR:::readInt(inputCon)
    value <- unserialize(SparkR:::readRaw(inputCon))
    setBroadcastValue(bcastId, value)
  }
}
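
# Each broadcast variable arrives as an (int id, serialized R object) pair;
# setBroadcastValue() is a SparkR internal that caches the value under its id
# so lookups from within computeFunc can find it. A hypothetical driver-side
# writer (for illustration only, not the actual JVM code) would mirror the
# reads above:
#
#   SparkR:::writeInt(con, bcastId)
#   SparkR:::writeRaw(con, serialize(value, connection = NULL))
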
# Timing broadcast
broadcastElap <- elapsedSecs()
# Read the number of partitions: -1 means treat the input as a normal RDD; a
# value >= 0 means treat it as a pairwise RDD and hash its tuples into that
# many output partitions.
numPartitions <- SparkR:::readInt(inputCon)

# Despite its name, a nonzero isEmpty flag means the partition actually
# contains data to process.
isEmpty <- SparkR:::readInt(inputCon)

if (isEmpty != 0) {
  if (numPartitions == -1) {
    if (deserializer == "byte") {
      # Read length-prefixed serialized R objects until the end of the stream
      data <- SparkR:::readDeserialize(inputCon)
    } else if (deserializer == "string") {
      data <- as.list(readLines(inputCon))
    } else if (deserializer == "row") {
      data <- SparkR:::readDeserializeRows(inputCon)
    }
    # Timing reading input data for execution
    inputElap <- elapsedSecs()

    output <- computeFunc(partition, data)
    # Timing computing
    computeElap <- elapsedSecs()

    if (serializer == "byte") {
      SparkR:::writeRawSerialize(outputCon, output)
    } else if (serializer == "row") {
      SparkR:::writeRowSerialize(outputCon, output)
    } else {
      # Write lines one-by-one, each prefixed with its length
      lapply(output, function(line) SparkR:::writeString(outputCon, line))
    }
    # Timing output
    outputElap <- elapsedSecs()
  } else {
    if (deserializer == "byte") {
      # Read length-prefixed serialized R objects until the end of the stream
      data <- SparkR:::readDeserialize(inputCon)
    } else if (deserializer == "string") {
      data <- readLines(inputCon)
    } else if (deserializer == "row") {
      data <- SparkR:::readDeserializeRows(inputCon)
    }
    # Timing reading input data for execution
    inputElap <- elapsedSecs()

    res <- new.env()

    # Step 1: hash the data into buckets held in an environment
    hashTupleToEnvir <- function(tuple) {
      # NOTE: computeFunc is the hash function here
      hashVal <- computeFunc(tuple[[1]])
      bucket <- as.character(hashVal %% numPartitions)
      acc <- res[[bucket]]
      # Create a new accumulator if this bucket has not been seen yet
      if (is.null(acc)) {
        acc <- SparkR:::initAccumulator()
      }
      SparkR:::addItemToAccumulator(acc, tuple)
      res[[bucket]] <- acc
    }
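
    # Worked example of the bucketing above: with numPartitions == 3, a tuple
    # whose key hashes to 7 lands in bucket "1" (7 %% 3 == 1), so all tuples
    # whose hashes agree modulo numPartitions accumulate in the same bucket
    # and, later, the same output partition.
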
    invisible(lapply(data, hashTupleToEnvir))
    # Timing computing
    computeElap <- elapsedSecs()
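
    # The loop below emits, per non-empty bucket (again an inference from the
    # writes themselves): the int 2 (the number of fields in the record), the
    # bucket id as an int, and the accumulated tuples as one serialized blob.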
    # Step 2: write out all of the environment as key-value pairs.
    for (name in ls(res)) {
      SparkR:::writeInt(outputCon, 2L)
      SparkR:::writeInt(outputCon, as.integer(name))
      # Truncate the accumulator list to the number of elements we have
      length(res[[name]]$data) <- res[[name]]$counter
      SparkR:::writeRawSerialize(outputCon, res[[name]]$data)
    }
    # Timing output
    outputElap <- elapsedSecs()
  }
} else {
  # Empty partition: reuse the broadcast timestamp so every remaining phase
  # reports zero elapsed time.
  inputElap <- broadcastElap
  computeElap <- broadcastElap
  outputElap <- broadcastElap
}
# Report timing
SparkR:::writeInt(outputCon, specialLengths$TIMING_DATA)
SparkR:::writeDouble(outputCon, bootTime)
SparkR:::writeDouble(outputCon, initElap - bootElap) # init
SparkR:::writeDouble(outputCon, broadcastElap - initElap) # broadcast
SparkR:::writeDouble(outputCon, inputElap - broadcastElap) # input
SparkR:::writeDouble(outputCon, computeElap - inputElap) # compute
SparkR:::writeDouble(outputCon, outputElap - computeElap) # output
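
# The timing record is framed by the TIMING_DATA marker and consists of the
# absolute boot timestamp followed by five elapsed-time deltas (init,
# broadcast, input, compute, output), presumably consumed in the same order by
# the driver side.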
# End of output
SparkR:::writeInt(outputCon, specialLengths$END_OF_STREAM)
close(outputCon)
close(inputCon)