Skip to content

Commit

Permalink
[SPARK-2470] PEP8 fixes to PySpark
Browse files Browse the repository at this point in the history
This pull request aims to resolve all outstanding PEP8 violations in PySpark.

Author: Nicholas Chammas <[email protected]>
Author: nchammas <[email protected]>

Closes apache#1505 from nchammas/master and squashes the following commits:

98171af [Nicholas Chammas] [SPARK-2470] revert PEP 8 fixes to cloudpickle
cba7768 [Nicholas Chammas] [SPARK-2470] wrap expression list in parentheses
e178dbe [Nicholas Chammas] [SPARK-2470] style - change position of line break
9127d2b [Nicholas Chammas] [SPARK-2470] wrap expression lists in parentheses
22132a4 [Nicholas Chammas] [SPARK-2470] wrap conditionals in parentheses
24639bc [Nicholas Chammas] [SPARK-2470] fix whitespace for doctest
7d557b7 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to tests.py
8f8e4c0 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to storagelevel.py
b3b96cf [Nicholas Chammas] [SPARK-2470] PEP8 fixes to statcounter.py
d644477 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to worker.py
aa3a7b6 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to sql.py
1916859 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to shell.py
95d1d95 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to serializers.py
a0fec2e [Nicholas Chammas] [SPARK-2470] PEP8 fixes to mllib
c85e1e5 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to join.py
d14f2f1 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to __init__.py
81fcb20 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to resultiterable.py
1bde265 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to java_gateway.py
7fc849c [Nicholas Chammas] [SPARK-2470] PEP8 fixes to daemon.py
ca2d28b [Nicholas Chammas] [SPARK-2470] PEP8 fixes to context.py
f4e0039 [Nicholas Chammas] [SPARK-2470] PEP8 fixes to conf.py
a6d5e4b [Nicholas Chammas] [SPARK-2470] PEP8 fixes to cloudpickle.py
f0a7ebf [Nicholas Chammas] [SPARK-2470] PEP8 fixes to rddsampler.py
4dd148f [nchammas] Merge pull request apache#5 from apache/master
f7e4581 [Nicholas Chammas] unrelated pep8 fix
a36eed0 [Nicholas Chammas] name ec2 instances and security groups consistently
de7292a [nchammas] Merge pull request apache#4 from apache/master
2e4fe00 [nchammas] Merge pull request apache#3 from apache/master
89fde08 [nchammas] Merge pull request #2 from apache/master
69f6e22 [Nicholas Chammas] PEP8 fixes
2627247 [Nicholas Chammas] broke up lines before they hit 100 chars
6544b7e [Nicholas Chammas] [SPARK-2065] give launched instances names
69da6cf [nchammas] Merge pull request #1 from apache/master
  • Loading branch information
nchammas authored and rxin committed Jul 22, 2014
1 parent c3462c6 commit 5d16d5b
Show file tree
Hide file tree
Showing 18 changed files with 127 additions and 97 deletions.
3 changes: 2 additions & 1 deletion python/pyspark/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,5 @@
from pyspark.storagelevel import StorageLevel


__all__ = ["SparkConf", "SparkContext", "SQLContext", "RDD", "SchemaRDD", "SparkFiles", "StorageLevel", "Row"]
__all__ = ["SparkConf", "SparkContext", "SQLContext", "RDD", "SchemaRDD",
"SparkFiles", "StorageLevel", "Row"]
9 changes: 5 additions & 4 deletions python/pyspark/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@
spark.executorEnv.VAR4=value4
spark.home=/path
>>> sorted(conf.getAll(), key=lambda p: p[0])
[(u'spark.executorEnv.VAR1', u'value1'), (u'spark.executorEnv.VAR3', u'value3'), (u'spark.executorEnv.VAR4', u'value4'), (u'spark.home', u'/path')]
[(u'spark.executorEnv.VAR1', u'value1'), (u'spark.executorEnv.VAR3', u'value3'), \
(u'spark.executorEnv.VAR4', u'value4'), (u'spark.home', u'/path')]
"""


Expand Down Expand Up @@ -118,9 +119,9 @@ def setExecutorEnv(self, key=None, value=None, pairs=None):
"""Set an environment variable to be passed to executors."""
if (key is not None and pairs is not None) or (key is None and pairs is None):
raise Exception("Either pass one key-value pair or a list of pairs")
elif key != None:
elif key is not None:
self._jconf.setExecutorEnv(key, value)
elif pairs != None:
elif pairs is not None:
for (k, v) in pairs:
self._jconf.setExecutorEnv(k, v)
return self
Expand All @@ -137,7 +138,7 @@ def setAll(self, pairs):

def get(self, key, defaultValue=None):
"""Get the configured value for some key, or return a default otherwise."""
if defaultValue == None: # Py4J doesn't call the right get() if we pass None
if defaultValue is None: # Py4J doesn't call the right get() if we pass None
if not self._jconf.contains(key):
return None
return self._jconf.get(key)
Expand Down
45 changes: 25 additions & 20 deletions python/pyspark/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from pyspark.files import SparkFiles
from pyspark.java_gateway import launch_gateway
from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
PairDeserializer
PairDeserializer
from pyspark.storagelevel import StorageLevel
from pyspark import rdd
from pyspark.rdd import RDD
Expand All @@ -50,12 +50,11 @@ class SparkContext(object):
_next_accum_id = 0
_active_spark_context = None
_lock = Lock()
_python_includes = None # zip and egg files that need to be added to PYTHONPATH

_python_includes = None # zip and egg files that need to be added to PYTHONPATH

def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None,
gateway=None):
environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None,
gateway=None):
"""
Create a new SparkContext. At least the master and app name should be set,
either through the named parameters here or through C{conf}.
Expand Down Expand Up @@ -138,8 +137,8 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
self._accumulatorServer = accumulators._start_update_server()
(host, port) = self._accumulatorServer.server_address
self._javaAccumulator = self._jsc.accumulator(
self._jvm.java.util.ArrayList(),
self._jvm.PythonAccumulatorParam(host, port))
self._jvm.java.util.ArrayList(),
self._jvm.PythonAccumulatorParam(host, port))

self.pythonExec = os.environ.get("PYSPARK_PYTHON", 'python')

Expand All @@ -165,7 +164,7 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
(dirname, filename) = os.path.split(path)
self._python_includes.append(filename)
sys.path.append(path)
if not dirname in sys.path:
if dirname not in sys.path:
sys.path.append(dirname)

# Create a temporary directory inside spark.local.dir:
Expand All @@ -192,15 +191,19 @@ def _ensure_initialized(cls, instance=None, gateway=None):
SparkContext._writeToFile = SparkContext._jvm.PythonRDD.writeToFile

if instance:
if SparkContext._active_spark_context and SparkContext._active_spark_context != instance:
if (SparkContext._active_spark_context and
SparkContext._active_spark_context != instance):
currentMaster = SparkContext._active_spark_context.master
currentAppName = SparkContext._active_spark_context.appName
callsite = SparkContext._active_spark_context._callsite

# Raise error if there is already a running Spark context
raise ValueError("Cannot run multiple SparkContexts at once; existing SparkContext(app=%s, master=%s)" \
" created by %s at %s:%s " \
% (currentAppName, currentMaster, callsite.function, callsite.file, callsite.linenum))
raise ValueError(
"Cannot run multiple SparkContexts at once; "
"existing SparkContext(app=%s, master=%s)"
" created by %s at %s:%s "
% (currentAppName, currentMaster,
callsite.function, callsite.file, callsite.linenum))
else:
SparkContext._active_spark_context = instance

Expand Down Expand Up @@ -290,7 +293,7 @@ def textFile(self, name, minPartitions=None):
Read a text file from HDFS, a local file system (available on all
nodes), or any Hadoop-supported file system URI, and return it as an
RDD of Strings.
>>> path = os.path.join(tempdir, "sample-text.txt")
>>> with open(path, "w") as testFile:
... testFile.write("Hello world!")
Expand Down Expand Up @@ -584,11 +587,12 @@ def addPyFile(self, path):
HTTP, HTTPS or FTP URI.
"""
self.addFile(path)
(dirname, filename) = os.path.split(path) # dirname may be directory or HDFS/S3 prefix
(dirname, filename) = os.path.split(path) # dirname may be directory or HDFS/S3 prefix

if filename.endswith('.zip') or filename.endswith('.ZIP') or filename.endswith('.egg'):
self._python_includes.append(filename)
sys.path.append(os.path.join(SparkFiles.getRootDirectory(), filename)) # for tests in local mode
# for tests in local mode
sys.path.append(os.path.join(SparkFiles.getRootDirectory(), filename))

def setCheckpointDir(self, dirName):
"""
Expand Down Expand Up @@ -649,9 +653,9 @@ def setJobGroup(self, groupId, description, interruptOnCancel=False):
Cancelled
If interruptOnCancel is set to true for the job group, then job cancellation will result
in Thread.interrupt() being called on the job's executor threads. This is useful to help ensure
that the tasks are actually stopped in a timely manner, but is off by default due to HDFS-1208,
where HDFS may respond to Thread.interrupt() by marking nodes as dead.
in Thread.interrupt() being called on the job's executor threads. This is useful to help
ensure that the tasks are actually stopped in a timely manner, but is off by default due
to HDFS-1208, where HDFS may respond to Thread.interrupt() by marking nodes as dead.
"""
self._jsc.setJobGroup(groupId, description, interruptOnCancel)

Expand Down Expand Up @@ -688,7 +692,7 @@ def cancelAllJobs(self):
"""
self._jsc.sc().cancelAllJobs()

def runJob(self, rdd, partitionFunc, partitions = None, allowLocal = False):
def runJob(self, rdd, partitionFunc, partitions=None, allowLocal=False):
"""
Executes the given partitionFunc on the specified set of partitions,
returning the result as an array of elements.
Expand All @@ -703,7 +707,7 @@ def runJob(self, rdd, partitionFunc, partitions = None, allowLocal = False):
>>> sc.runJob(myRDD, lambda part: [x * x for x in part], [0, 2], True)
[0, 1, 16, 25]
"""
if partitions == None:
if partitions is None:
partitions = range(rdd._jrdd.partitions().size())
javaPartitions = ListConverter().convert(partitions, self._gateway._gateway_client)

Expand All @@ -714,6 +718,7 @@ def runJob(self, rdd, partitionFunc, partitions = None, allowLocal = False):
it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal)
return list(mappedRDD._collect_iterator_through_file(it))


def _test():
import atexit
import doctest
Expand Down
12 changes: 6 additions & 6 deletions python/pyspark/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ def should_exit():


def compute_real_exit_code(exit_code):
# SystemExit's code can be integer or string, but os._exit only accepts integers
import numbers
if isinstance(exit_code, numbers.Integral):
return exit_code
else:
return 1
# SystemExit's code can be integer or string, but os._exit only accepts integers
import numbers
if isinstance(exit_code, numbers.Integral):
return exit_code
else:
return 1


def worker(listen_sock):
Expand Down
1 change: 1 addition & 0 deletions python/pyspark/java_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from threading import Thread
from py4j.java_gateway import java_import, JavaGateway, GatewayClient


def launch_gateway():
SPARK_HOME = os.environ["SPARK_HOME"]

Expand Down
4 changes: 3 additions & 1 deletion python/pyspark/join.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@

from pyspark.resultiterable import ResultIterable


def _do_python_join(rdd, other, numPartitions, dispatch):
vs = rdd.map(lambda (k, v): (k, (1, v)))
ws = other.map(lambda (k, v): (k, (2, v)))
return vs.union(ws).groupByKey(numPartitions).flatMapValues(lambda x : dispatch(x.__iter__()))
return vs.union(ws).groupByKey(numPartitions).flatMapValues(lambda x: dispatch(x.__iter__()))


def python_join(rdd, other, numPartitions):
Expand Down Expand Up @@ -85,6 +86,7 @@ def make_mapper(i):
vrdds = [rdd.map(make_mapper(i)) for i, rdd in enumerate(rdds)]
union_vrdds = reduce(lambda acc, other: acc.union(other), vrdds)
rdd_len = len(vrdds)

def dispatch(seq):
bufs = [[] for i in range(rdd_len)]
for (n, v) in seq:
Expand Down
4 changes: 3 additions & 1 deletion python/pyspark/mllib/_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ def _deserialize_double_vector(ba, offset=0):
nb = len(ba) - offset
if nb < 5:
raise TypeError("_deserialize_double_vector called on a %d-byte array, "
"which is too short" % nb)
"which is too short" % nb)
if ba[offset] == DENSE_VECTOR_MAGIC:
return _deserialize_dense_vector(ba, offset)
elif ba[offset] == SPARSE_VECTOR_MAGIC:
Expand Down Expand Up @@ -272,6 +272,7 @@ def _serialize_labeled_point(p):
header_float[0] = p.label
return header + serialized_features


def _deserialize_labeled_point(ba, offset=0):
"""Deserialize a LabeledPoint from a mutually understood format."""
from pyspark.mllib.regression import LabeledPoint
Expand All @@ -283,6 +284,7 @@ def _deserialize_labeled_point(ba, offset=0):
features = _deserialize_double_vector(ba, offset + 9)
return LabeledPoint(label, features)


def _copyto(array, buffer, offset, shape, dtype):
"""
Copy the contents of a vector to a destination bytearray at the
Expand Down
1 change: 1 addition & 0 deletions python/pyspark/mllib/linalg.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ def stringify(vector):
else:
return "[" + ",".join([str(v) for v in vector]) + "]"


def _test():
import doctest
(failure_count, test_count) = doctest.testmod(optionflags=doctest.ELLIPSIS)
Expand Down
2 changes: 0 additions & 2 deletions python/pyspark/mllib/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
from pyspark.serializers import NoOpSerializer



class MLUtils:
"""
Helper methods to load, save and pre-process data used in MLlib.
Expand Down Expand Up @@ -154,7 +153,6 @@ def saveAsLibSVMFile(data, dir):
lines = data.map(lambda p: MLUtils._convert_labeled_point_to_libsvm(p))
lines.saveAsTextFile(dir)


@staticmethod
def loadLabeledPoints(sc, path, minPartitions=None):
"""
Expand Down
24 changes: 12 additions & 12 deletions python/pyspark/rddsampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@
import sys
import random


class RDDSampler(object):
def __init__(self, withReplacement, fraction, seed=None):
try:
import numpy
self._use_numpy = True
except ImportError:
print >> sys.stderr, "NumPy does not appear to be installed. Falling back to default random generator for sampling."
print >> sys.stderr, (
"NumPy does not appear to be installed. "
"Falling back to default random generator for sampling.")
self._use_numpy = False

self._seed = seed if seed is not None else random.randint(0, sys.maxint)
Expand Down Expand Up @@ -61,7 +64,7 @@ def getUniformSample(self, split):
def getPoissonSample(self, split, mean):
if not self._rand_initialized or split != self._split:
self.initRandomGenerator(split)

if self._use_numpy:
return self._random.poisson(mean)
else:
Expand All @@ -80,30 +83,27 @@ def getPoissonSample(self, split, mean):
num_arrivals += 1

return (num_arrivals - 1)

def shuffle(self, vals):
if self._random is None:
self.initRandomGenerator(0) # this should only ever called on the master so
# the split does not matter

if self._use_numpy:
self._random.shuffle(vals)
else:
self._random.shuffle(vals, self._random.random)

def func(self, split, iterator):
if self._withReplacement:
if self._withReplacement:
for obj in iterator:
# For large datasets, the expected number of occurrences of each element in a sample with
# replacement is Poisson(frac). We use that to get a count for each element.
count = self.getPoissonSample(split, mean = self._fraction)
# For large datasets, the expected number of occurrences of each element in
# a sample with replacement is Poisson(frac). We use that to get a count for
# each element.
count = self.getPoissonSample(split, mean=self._fraction)
for _ in range(0, count):
yield obj
else:
for obj in iterator:
if self.getUniformSample(split) <= self._fraction:
yield obj




3 changes: 3 additions & 0 deletions python/pyspark/resultiterable.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import collections


class ResultIterable(collections.Iterable):
"""
A special result iterable. This is used because the standard iterator can not be pickled
Expand All @@ -27,7 +28,9 @@ def __init__(self, data):
self.data = data
self.index = 0
self.maxindex = len(data)

def __iter__(self):
return iter(self.data)

def __len__(self):
return len(self.data)
Loading

0 comments on commit 5d16d5b

Please sign in to comment.