Skip to content

Commit

Permalink
[SPARK-4437] update doc for WholeCombineFileRecordReader
Browse files Browse the repository at this point in the history
update doc for WholeCombineFileRecordReader

Author: Davies Liu <[email protected]>
Author: Josh Rosen <[email protected]>

Closes apache#3301 from davies/fix_doc and squashes the following commits:

1d7422f [Davies Liu] Merge pull request #2 from JoshRosen/whole-text-file-cleanup
dc3d21a [Josh Rosen] More genericization in ConfigurableCombineFileRecordReader.
95d13eb [Davies Liu] address comment
bf800b9 [Davies Liu] update doc for WholeCombineFileRecordReader
  • Loading branch information
Davies Liu authored and JoshRosen committed Dec 16, 2014
1 parent c246b95 commit ed36200
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package org.apache.spark.input

import scala.collection.JavaConversions._

import org.apache.hadoop.conf.{Configuration, Configurable}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.InputSplit
import org.apache.hadoop.mapreduce.JobContext
Expand All @@ -38,18 +37,13 @@ private[spark] class WholeTextFileInputFormat

override protected def isSplitable(context: JobContext, file: Path): Boolean = false

private var conf: Configuration = _
def setConf(c: Configuration) {
conf = c
}
def getConf: Configuration = conf

override def createRecordReader(
split: InputSplit,
context: TaskAttemptContext): RecordReader[String, String] = {

val reader = new WholeCombineFileRecordReader(split, context)
reader.setConf(conf)
val reader =
new ConfigurableCombineFileRecordReader(split, context, classOf[WholeTextFileRecordReader])
reader.setConf(getConf)
reader
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.input

import org.apache.hadoop.conf.{Configuration, Configurable}
import org.apache.hadoop.conf.{Configuration, Configurable => HConfigurable}
import com.google.common.io.{ByteStreams, Closeables}

import org.apache.hadoop.io.Text
Expand All @@ -27,6 +27,18 @@ import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, CombineFileRecor
import org.apache.hadoop.mapreduce.RecordReader
import org.apache.hadoop.mapreduce.TaskAttemptContext


/**
* A trait to implement [[org.apache.hadoop.conf.Configurable Configurable]] interface.
*/
private[spark] trait Configurable extends HConfigurable {
private var conf: Configuration = _
def setConf(c: Configuration) {
conf = c
}
def getConf: Configuration = conf
}

/**
* A [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] for reading a single whole text file
* out in a key-value pair, where the key is the file path and the value is the entire content of
Expand All @@ -38,12 +50,6 @@ private[spark] class WholeTextFileRecordReader(
index: Integer)
extends RecordReader[String, String] with Configurable {

private var conf: Configuration = _
def setConf(c: Configuration) {
conf = c
}
def getConf: Configuration = conf

private[this] val path = split.getPath(index)
private[this] val fs = path.getFileSystem(context.getConfiguration)

Expand Down Expand Up @@ -87,29 +93,24 @@ private[spark] class WholeTextFileRecordReader(


/**
* A [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] for reading a single whole text file
* out in a key-value pair, where the key is the file path and the value is the entire content of
* the file.
* A [[org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader CombineFileRecordReader]]
* that can pass Hadoop Configuration to [[org.apache.hadoop.conf.Configurable Configurable]]
* RecordReaders.
*/
private[spark] class WholeCombineFileRecordReader(
private[spark] class ConfigurableCombineFileRecordReader[K, V](
split: InputSplit,
context: TaskAttemptContext)
extends CombineFileRecordReader[String, String](
context: TaskAttemptContext,
recordReaderClass: Class[_ <: RecordReader[K, V] with HConfigurable])
extends CombineFileRecordReader[K, V](
split.asInstanceOf[CombineFileSplit],
context,
classOf[WholeTextFileRecordReader]
recordReaderClass
) with Configurable {

private var conf: Configuration = _
def setConf(c: Configuration) {
conf = c
}
def getConf: Configuration = conf

override def initNextRecordReader(): Boolean = {
val r = super.initNextRecordReader()
if (r) {
this.curReader.asInstanceOf[WholeTextFileRecordReader].setConf(conf)
this.curReader.asInstanceOf[HConfigurable].setConf(getConf)
}
r
}
Expand Down

0 comments on commit ed36200

Please sign in to comment.