By default, Hadoop uses the FileOutputCommitter to manage the promotion of files created in a single task attempt to the final output of a query. It does this in a way that handles task and job failures and supports speculative execution: it lists directories and renames their contents into the final destination when tasks, and then jobs, are committed.
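The list-and-rename commit described above can be sketched on a local filesystem. This is a simplified illustration, not Hadoop's actual implementation: the directory layout, the `committed` staging directory, and the function names are assumptions made for the example.

```python
import os
import shutil
import tempfile


def commit_task(attempt_dir: str, job_pending_dir: str) -> None:
    """Promote one task attempt by renaming its files into the job's pending area."""
    os.makedirs(job_pending_dir, exist_ok=True)
    for name in sorted(os.listdir(attempt_dir)):      # list the attempt directory
        os.rename(os.path.join(attempt_dir, name),    # rename each entry
                  os.path.join(job_pending_dir, name))


def commit_job(job_pending_dir: str, final_dir: str) -> None:
    """Promote all committed task output into the final destination."""
    os.makedirs(final_dir, exist_ok=True)
    for name in sorted(os.listdir(job_pending_dir)):
        os.rename(os.path.join(job_pending_dir, name),
                  os.path.join(final_dir, name))


def run_demo() -> list:
    """Two speculative attempts write the same part file; only one is committed."""
    with tempfile.TemporaryDirectory() as root:
        final_dir = os.path.join(root, "output")
        temp_root = os.path.join(final_dir, "_temporary")
        pending = os.path.join(temp_root, "committed")  # hypothetical layout
        attempts = []
        for attempt in ("attempt_0", "attempt_1"):
            attempt_dir = os.path.join(temp_root, attempt)
            os.makedirs(attempt_dir)
            with open(os.path.join(attempt_dir, "part-00000"), "w") as handle:
                handle.write(attempt)
            attempts.append(attempt_dir)
        commit_task(attempts[0], pending)   # the winning attempt is committed
        shutil.rmtree(attempts[1])          # the losing attempt is aborted
        commit_job(pending, final_dir)
        shutil.rmtree(temp_root)            # clean up temporary data
        return sorted(os.listdir(final_dir))


if __name__ == "__main__":
    print(run_demo())
```

On a real filesystem each rename is a cheap metadata operation, which is what makes this scheme safe and fast there.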
Using the "classic" FileOutputCommitter to commit work to Amazon S3 risks loss or corruption of generated data. To address these problems, the hadoop-aws module now has explicit support for committing work to Amazon S3 through the S3A filesystem client: the S3A Committers.
Even though Hadoop's S3A client can make an S3 bucket appear to be a Hadoop-compatible filesystem, it is still an object store, and has some limitations when acting as a Hadoop-compatible filesystem. The key things to be aware of are:
- Operations on directories are potentially slow and non-atomic.
- Not all file operations are supported, like rename().
- Data is not visible in the object store until the entire output stream has been written.
- Amazon S3 is eventually consistent. Objects are replicated across servers for availability, but changes to a replica take time to propagate to the other replicas; the object store is inconsistent during this process. The inconsistency issues surface when listing, reading, updating, or deleting files. To mitigate the inconsistency issues, you can configure S3Guard. To learn more, refer to S3Guard: Consistency and Metadata Caching for S3A.
- Neither the per-file and per-directory permissions supported by HDFS nor its more sophisticated ACL mechanism are supported.
- Bandwidth between your workload clusters and Amazon S3 is limited and can vary significantly depending on network and VM load.
For these reasons, while Amazon S3 can be used as the source and store for persistent data, it cannot be used as a direct replacement for a cluster-wide filesystem such as HDFS, or be used as fs.defaultFS.
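To make the "slow and non-atomic" point concrete, here is a toy in-memory model of an object store. It assumes a client that emulates a directory rename as a listing followed by a copy and a delete per object, which is how S3A has traditionally handled it. `ToyObjectStore`, `rename_directory`, and the `fail_after` knob are hypothetical names for this sketch and are not part of any S3 or Hadoop API.

```python
class ToyObjectStore:
    """Flat key -> bytes map: there are no real directories, only key prefixes."""

    def __init__(self):
        self.objects = {}
        self.requests = 0  # counts simulated round trips to the store

    def put(self, key, data):
        self.requests += 1
        self.objects[key] = data

    def list_prefix(self, prefix):
        self.requests += 1
        return sorted(k for k in self.objects if k.startswith(prefix))

    def copy(self, src, dst):
        self.requests += 1
        self.objects[dst] = self.objects[src]

    def delete(self, key):
        self.requests += 1
        del self.objects[key]


def rename_directory(store, src_prefix, dst_prefix, fail_after=None):
    """Emulate a directory rename as list + per-object copy + delete.

    fail_after is a hypothetical knob that simulates a crash once that many
    objects have been moved.
    """
    moved = 0
    for key in store.list_prefix(src_prefix):
        if fail_after is not None and moved == fail_after:
            raise RuntimeError("simulated failure mid-rename")
        store.copy(key, dst_prefix + key[len(src_prefix):])
        store.delete(key)
        moved += 1
    return moved


if __name__ == "__main__":
    store = ToyObjectStore()
    for i in range(3):
        store.put(f"tmp/part-{i}", b"data")
    try:
        rename_directory(store, "tmp/", "out/", fail_after=1)
    except RuntimeError:
        pass
    # The "directory" is now split between the source and the destination.
    print(sorted(store.objects))
```

The number of requests grows with the number of objects, and a failure partway through leaves output in both locations, which is why a rename-based commit is unsafe against an object store.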
The S3A committers come in two families:
- Staging committers: Directory and Partitioned, contributed by Netflix.
- Magic committer: the Hadoop community solution. It requires S3Guard for consistency.

The available committers are:
- Directory Committer: Buffers working data to the local disk, uses HDFS to propagate commit information from tasks to the job committer, and manages conflicts across the entire destination directory tree.
- Partitioned Committer: Identical to the Directory committer except that conflicts are managed on a partition-by-partition basis. This allows it to be used for in-place updates of existing datasets. It is only suitable for use with Spark.
- Magic Committer: Data is written directly to S3, but “magically” retargeted at the final destination. Conflicts are managed across the directory tree. It requires a consistent S3 object store, which means S3Guard is a mandatory prerequisite.
- EMRFS: The EMR File System (EMRFS) is an implementation of HDFS that all Amazon EMR clusters use for reading and writing regular files from Amazon EMR directly to Amazon S3. It is only available on EMR and is not open source, so it is not considered here for now (this may change).
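The difference between managing conflicts across the whole destination tree and managing them partition by partition can be sketched as follows. This is a simplified model based on my reading of the Hadoop staging committer documentation, where the conflict modes are `fail`, `append`, and `replace`; the function names and the example paths are hypothetical, and the real committers apply further checks.

```python
def conflict_scope(committer, dest, partitions_written):
    """Return the path prefixes in which existing data counts as a conflict."""
    root = dest.rstrip("/")
    if committer == "directory":
        return [root + "/"]                      # the whole destination tree
    if committer == "partitioned":
        return [f"{root}/{p.strip('/')}/" for p in partitions_written]
    raise ValueError(f"not a staging committer: {committer}")


def resolve_conflicts(committer, conflict_mode, dest,
                      existing_files, partitions_written):
    """Return the existing files to delete before the job commits.

    Raises FileExistsError in "fail" mode when conflicting data is present.
    """
    scopes = conflict_scope(committer, dest, partitions_written)
    conflicts = sorted(f for f in existing_files
                       if any(f.startswith(s) for s in scopes))
    if conflict_mode == "append":
        return []                 # keep existing data, add new files next to it
    if conflict_mode == "replace":
        return conflicts          # existing data in scope is removed
    if conflict_mode == "fail":
        if conflicts:
            raise FileExistsError(f"existing data in scope: {conflicts}")
        return []
    raise ValueError(f"unknown conflict mode: {conflict_mode}")


if __name__ == "__main__":
    dest = "s3a://spark-k8s-data/table"          # hypothetical table path
    existing = [dest + "/year=2020/a.parquet", dest + "/year=2021/b.parquet"]
    # A job that only writes the year=2021 partition:
    print(resolve_conflicts("directory", "replace", dest, existing, ["year=2021"]))
    print(resolve_conflicts("partitioned", "replace", dest, existing, ["year=2021"]))
```

With `replace`, the directory model discards both existing files while the partitioned model only discards the file in the partition being rewritten, which is what makes in-place updates of existing datasets possible.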
Option 1: S3 Basic Authentication
"spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
"spark.hadoop.fs.s3a.secret.key": "<your_aws_secret_key>"
"spark.hadoop.fs.s3a.access.key" : "<your_aws_access_key_id>"
Option 2: S3 Instance Profile Authentication
"spark.hadoop.fs.s3a.aws.credentials.provider": "com.amazonaws.auth.InstanceProfileCredentialsProvider"
Note: Make sure you attach the required S3 policy to your node group roles.
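As a rough sketch, the two authentication options can be generated from one helper and turned into `spark-submit` arguments. The property names and provider classes are the ones listed above; the helper names `s3a_auth_conf` and `to_spark_submit_args` are hypothetical, and how you supply the keys (for example from a secret store) is left open.

```python
SIMPLE_PROVIDER = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
INSTANCE_PROFILE_PROVIDER = "com.amazonaws.auth.InstanceProfileCredentialsProvider"
PREFIX = "spark.hadoop.fs.s3a."


def s3a_auth_conf(access_key=None, secret_key=None):
    """Build the S3A credential settings for Option 1 or Option 2.

    With no keys, fall back to the instance profile provider (Option 2);
    with both keys, use basic authentication (Option 1).
    """
    if access_key is None and secret_key is None:
        return {PREFIX + "aws.credentials.provider": INSTANCE_PROFILE_PROVIDER}
    if not access_key or not secret_key:
        raise ValueError("basic authentication needs both an access key and a secret key")
    return {
        PREFIX + "aws.credentials.provider": SIMPLE_PROVIDER,
        PREFIX + "access.key": access_key,
        PREFIX + "secret.key": secret_key,
    }


def to_spark_submit_args(conf):
    """Render a config mapping as repeated --conf key=value arguments."""
    args = []
    for key in sorted(conf):
        args.extend(["--conf", f"{key}={conf[key]}"])
    return args


if __name__ == "__main__":
    print(" ".join(to_spark_submit_args(s3a_auth_conf())))
```

Instance profile authentication keeps long-lived keys out of the job configuration, at the cost of relying on the node group role having the right policy.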
"spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a": "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory"
"spark.hadoop.fs.s3a.committer.name": "directory"
"spark.hadoop.fs.s3a.committer.staging.conflict-mode": "append"
Note: The Magic Committer is not available in Spark 2.4.x.
"spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a": "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory"
"spark.sql.sources.commitProtocolClass": "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol"
"spark.sql.parquet.output.committer.class": "org.apache.hadoop.mapreduce.lib.output.BindingPathOutputCommitter"
"spark.hadoop.fs.s3a.committer.name": "directory"
"spark.hadoop.fs.s3a.committer.staging.conflict-mode": "append"
Limitations:
The staging committers should be used together with a consistent store. Using S3 as fs.defaultFS, for example "spark.hadoop.fs.defaultFS": "s3a://spark-k8s-data/", is not recommended.
Because EKS doesn't provide a cluster-wide filesystem such as HDFS, an alternative is to mount EFS as a consistent layer for S3.
With EFS, first mount the EFS volume to the pods, and then set spark.hadoop.fs.defaultFS to the path of the EFS volume.
"spark.hadoop.fs.s3a.committer.staging.tmp.path": "tmp/staging"
"spark.hadoop.fs.s3a.buffer.dir": "/tmp"
"spark.hadoop.fs.defaultFS": "/mnt/efs/path/"
"spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a": "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory"
"spark.sql.sources.commitProtocolClass": "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol"
"spark.sql.parquet.output.committer.class": "org.apache.hadoop.mapreduce.lib.output.BindingPathOutputCommitter"
"spark.hadoop.fs.s3a.committer.name": "magic"
"spark.hadoop.fs.s3a.committer.magic.enabled": "true"
"spark.hadoop.fs.s3a.metadatastore.impl": "org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore"
"spark.hadoop.fs.s3a.s3guard.ddb.region": "us-west-2"
"spark.hadoop.fs.s3a.s3guard.ddb.table.create": "true"