diff --git a/bin/dev/run-tests-from-scratch b/bin/dev/run-tests-from-scratch index b6172b23..ab526c02 100755 --- a/bin/dev/run-tests-from-scratch +++ b/bin/dev/run-tests-from-scratch @@ -16,7 +16,7 @@ SBT_OPTS_DEFAULT="-Xms512M -Xmx2048M -Xss1M -XX:+CMSClassUnloadingEnabled -XX:Ma SPARK_MEM_DEFAULT=4g SHARK_MASTER_MEM_DEFAULT=4g SPARK_KV_JAVA_OPTS_DEFAULT=("-Dspark.local.dir=/tmp " "-Dspark.kryoserializer.buffer.mb=10 -XX:MaxPermSize=1g ") -SPARK_GIT_URL_DEFAULT="https://github.com/apache/incubator-spark.git spark" +SPARK_GIT_URL_DEFAULT="https://github.com/apache/spark.git spark" HIVE_GIT_URL_DEFAULT="https://github.com/amplab/hive.git -b shark-0.11" SPARK_HADOOP_VERSION_DEFAULT="1.0.4" SPARK_WITH_YARN_DEFAULT=false diff --git a/project/SharkBuild.scala b/project/SharkBuild.scala index fe9dcfbc..123d4604 100755 --- a/project/SharkBuild.scala +++ b/project/SharkBuild.scala @@ -29,13 +29,13 @@ import sbtassembly.Plugin.AssemblyKeys._ object SharkBuild extends Build { // Shark version - val SHARK_VERSION = "0.9.1" + val SHARK_VERSION = "1.0.0-SNAPSHOT" val SHARK_ORGANIZATION = "edu.berkeley.cs.shark" val HIVE_VERSION = "0.11.0-shark-0.9.1" - val SPARK_VERSION = "0.9.1" + val SPARK_VERSION = "1.0.0-SNAPSHOT" val SCALA_VERSION = "2.10.3" diff --git a/src/main/scala/shark/SharkEnv.scala b/src/main/scala/shark/SharkEnv.scala index 14293e9f..cdc8da96 100755 --- a/src/main/scala/shark/SharkEnv.scala +++ b/src/main/scala/shark/SharkEnv.scala @@ -129,7 +129,7 @@ object SharkEnv extends LogHelper { var sc: SharkContext = _ - val shuffleSerializerName = classOf[ShuffleSerializer].getName + val shuffleSerializer = new ShuffleSerializer(new SparkConf()) val memoryMetadataManager = new MemoryMetadataManager diff --git a/src/main/scala/shark/api/JavaTableRDD.scala b/src/main/scala/shark/api/JavaTableRDD.scala index 50be2d4f..85bcba6c 100644 --- a/src/main/scala/shark/api/JavaTableRDD.scala +++ b/src/main/scala/shark/api/JavaTableRDD.scala @@ -54,7 +54,7 @@ class JavaTableRDD(val rdd: RDD[Row], val schema: Array[ColumnDesc]) * Return a new RDD containing only the elements that satisfy a predicate. */ def filter(f: JFunction[Row, java.lang.Boolean]): JavaTableRDD = - wrapRDD(rdd.filter((x => f(x).booleanValue()))) + wrapRDD(rdd.filter((x => f.call(x).booleanValue()))) /** * Return a sampled subset of this RDD. diff --git a/src/main/scala/shark/execution/CoGroupedRDD.scala b/src/main/scala/shark/execution/CoGroupedRDD.scala index 873a15ae..5c760512 100644 --- a/src/main/scala/shark/execution/CoGroupedRDD.scala +++ b/src/main/scala/shark/execution/CoGroupedRDD.scala @@ -81,7 +81,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner) new OneToOneDependency(rdd) } else { logDebug("Adding shuffle dependency with " + rdd) - new ShuffleDependency[Any, Any](rdd, part, SharkEnv.shuffleSerializerName) + new ShuffleDependency[Any, Any](rdd, part, SharkEnv.shuffleSerializer) } } } @@ -117,7 +117,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner) } values } - val serializer = SparkEnv.get.serializerManager.get(SharkEnv.shuffleSerializerName, SparkEnv.get.conf) + val serializer =SharkEnv.shuffleSerializer for ((dep, depNum) <- split.deps.zipWithIndex) dep match { case NarrowCoGroupSplitDep(rdd, itsSplitIndex, itsSplit) => { // Read them from the parent diff --git a/src/main/scala/shark/execution/GroupByPostShuffleOperator.scala b/src/main/scala/shark/execution/GroupByPostShuffleOperator.scala index 3a604441..7332632a 100755 --- a/src/main/scala/shark/execution/GroupByPostShuffleOperator.scala +++ b/src/main/scala/shark/execution/GroupByPostShuffleOperator.scala @@ -201,7 +201,7 @@ class GroupByPostShuffleOperator extends GroupByPreShuffleOperator val partitioner = new ReduceKeyPartitioner(numReduceTasks) val repartitionedRDD = new ShuffledRDD[Any, Any, (Any, Any)](inputRdd, partitioner) - .setSerializer(SharkEnv.shuffleSerializerName) + .setSerializer(SharkEnv.shuffleSerializer) if (distinctKeyAggrs.size > 0) { // If there are distinct aggregations, do sort-based aggregation. diff --git a/src/main/scala/shark/execution/HadoopTableReader.scala b/src/main/scala/shark/execution/HadoopTableReader.scala index d96596f5..be67d21e 100644 --- a/src/main/scala/shark/execution/HadoopTableReader.scala +++ b/src/main/scala/shark/execution/HadoopTableReader.scala @@ -90,7 +90,7 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient _localHConf val fs = hiveTable.getPath().getFileSystem(hiveConf) if (!fs.exists(hiveTable.getPath())) - return new EmptyRDD(SharkEnv.sc) + return SharkEnv.sc.emptyRDD // Create local references to member variables, so that the entire `this` object won't be // serialized in the closure below. @@ -150,7 +150,7 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient _localHConf val fs = partPath.getFileSystem(hiveConf) if (!fs.exists(partPath)) - return new EmptyRDD(SharkEnv.sc) + return SharkEnv.sc.emptyRDD val inputPathStr = applyFilterIfNeeded(partPath, filterOpt) val ifc = partDesc.getInputFileFormatClass @@ -223,7 +223,7 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient _localHConf }.toSeq // Even if we don't use any partitions, we still need an empty RDD if (hivePartitionRDDs.size == 0) { - new EmptyRDD[Object](SharkEnv.sc) + SharkEnv.sc.emptyRDD[Object] } else { new UnionRDD(hivePartitionRDDs(0).context, hivePartitionRDDs) } diff --git a/src/main/scala/shark/execution/LimitOperator.scala b/src/main/scala/shark/execution/LimitOperator.scala index c78c0ab4..36e809a1 100755 --- a/src/main/scala/shark/execution/LimitOperator.scala +++ b/src/main/scala/shark/execution/LimitOperator.scala @@ -38,7 +38,7 @@ class LimitOperator extends UnaryOperator[LimitDesc] { val inputRdd = executeParents().head._2 inputRdd.mapPartitions({ iter => iter.take(limitNum) }, preservesPartitioning = true) } else { - new EmptyRDD(SharkEnv.sc) + SharkEnv.sc.emptyRDD } } diff --git a/src/main/scala/shark/execution/RDDUtils.scala b/src/main/scala/shark/execution/RDDUtils.scala index 1306f9bd..ada09d35 100755 --- a/src/main/scala/shark/execution/RDDUtils.scala +++ b/src/main/scala/shark/execution/RDDUtils.scala @@ -76,7 +76,7 @@ object RDDUtils { def repartition[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)], part: Partitioner) : RDD[(K, V)] = { - new ShuffledRDD[K, V, (K, V)](rdd, part).setSerializer(SharkEnv.shuffleSerializerName) + new ShuffledRDD[K, V, (K, V)](rdd, part).setSerializer(SharkEnv.shuffleSerializer) } /** @@ -88,7 +88,7 @@ object RDDUtils { { val part = new RangePartitioner(rdd.partitions.length, rdd) val shuffled = new ShuffledRDD[K, V, (K, V)](rdd, part) - .setSerializer(SharkEnv.shuffleSerializerName) + .setSerializer(SharkEnv.shuffleSerializer) shuffled.mapPartitions(iter => { val buf = iter.toArray buf.sortWith((x, y) => x._1.compareTo(y._1) < 0).iterator diff --git a/src/main/scala/shark/execution/SharkDDLTask.scala b/src/main/scala/shark/execution/SharkDDLTask.scala index 799c689e..f93fb94f 100644 --- a/src/main/scala/shark/execution/SharkDDLTask.scala +++ b/src/main/scala/shark/execution/SharkDDLTask.scala @@ -105,7 +105,7 @@ private[shark] class SharkDDLTask extends HiveTask[SharkDDLWork] val memoryTable = SharkEnv.memoryMetadataManager.createMemoryTable( dbName, tableName, cacheMode) // An empty table has a MemoryTable table entry with 'tableRDD' referencing an EmptyRDD. - memoryTable.put(new EmptyRDD(SharkEnv.sc)) + memoryTable.put(SharkEnv.sc.emptyRDD) } } } @@ -134,7 +134,7 @@ private[shark] class SharkDDLTask extends HiveTask[SharkDDLWork] MemoryMetadataManager.makeTableKey(dbName, tableName), Some(partKeyStr)) } else { val partitionedTable = getPartitionedTableWithAssertions(dbName, tableName) - partitionedTable.putPartition(partKeyStr, new EmptyRDD(SharkEnv.sc)) + partitionedTable.putPartition(partKeyStr, SharkEnv.sc.emptyRDD) } } diff --git a/src/main/scala/shark/execution/TableReader.scala b/src/main/scala/shark/execution/TableReader.scala index a5ea0938..938a8631 100644 --- a/src/main/scala/shark/execution/TableReader.scala +++ b/src/main/scala/shark/execution/TableReader.scala @@ -115,13 +115,11 @@ class OffHeapTableReader(@transient _tableDesc: TableDesc, _storageClient: OffHe if (hivePartitionRDDs.size > 0) { new UnionRDD(hivePartitionRDDs.head.context, hivePartitionRDDs) } else { - new EmptyRDD[Object](SharkEnv.sc) + SharkEnv.sc.emptyRDD[Object] } } } - - /** Helper class for scanning tables stored in Spark's block manager */ class HeapTableReader(@transient _tableDesc: TableDesc) extends TableReader { @@ -208,7 +206,7 @@ class HeapTableReader(@transient _tableDesc: TableDesc) extends TableReader { if (hivePartitionRDDs.size > 0) { new UnionRDD(hivePartitionRDDs.head.context, hivePartitionRDDs) } else { - new EmptyRDD[Object](SharkEnv.sc) + SharkEnv.sc.emptyRDD[Object] } } diff --git a/src/main/scala/shark/execution/serialization/ShuffleSerializer.scala b/src/main/scala/shark/execution/serialization/ShuffleSerializer.scala index e4eba584..d6179ecd 100644 --- a/src/main/scala/shark/execution/serialization/ShuffleSerializer.scala +++ b/src/main/scala/shark/execution/serialization/ShuffleSerializer.scala @@ -49,7 +49,7 @@ import shark.execution.{ReduceKey, ReduceKeyReduceSide} * into a hash table. We want to reduce the size of the hash table. Having the BytesWritable wrapper * would increase the size of the hash table by another 16 bytes per key-value pair. */ -class ShuffleSerializer(conf: SparkConf) extends Serializer { +class ShuffleSerializer(conf: SparkConf) extends Serializer with Serializable { // A no-arg constructor since conf is not needed in this serializer. def this() = this(null) @@ -58,7 +58,7 @@ class ShuffleSerializer(conf: SparkConf) extends Serializer { } -class ShuffleSerializerInstance extends SerializerInstance { +class ShuffleSerializerInstance extends SerializerInstance with Serializable { override def serialize[T](t: T): ByteBuffer = throw new UnsupportedOperationException @@ -77,7 +77,7 @@ class ShuffleSerializerInstance extends SerializerInstance { } -class ShuffleSerializationStream(stream: OutputStream) extends SerializationStream { +class ShuffleSerializationStream(stream: OutputStream) extends SerializationStream with Serializable { override def writeObject[T](t: T): SerializationStream = { // On the write-side, the ReduceKey should be of type ReduceKeyMapSide. @@ -108,7 +108,7 @@ class ShuffleSerializationStream(stream: OutputStream) extends SerializationStre } -class ShuffleDeserializationStream(stream: InputStream) extends DeserializationStream { +class ShuffleDeserializationStream(stream: InputStream) extends DeserializationStream with Serializable { override def readObject[T](): T = { // Return type is (ReduceKeyReduceSide, Array[Byte])