Testing Utils¶
Base TestCases¶
There are two main test cases available in Sparkly:

- `SparklyTest` creates a new session for each test case.
- `SparklyGlobalSessionTest` uses a single Sparkly session for all test cases to boost performance.
```python
from pyspark.sql import types as T

from sparkly import SparklySession
from sparkly.testing import SparklyTest, SparklyGlobalSessionTest


class MyTestCase(SparklyTest):
    session = SparklySession

    def test(self):
        df = self.spark.read_ext.by_url(...)

        # Compare all fields
        self.assertRowsEqual(
            df.collect(),
            [
                T.Row(col1='row1', col2=1),
                T.Row(col1='row2', col2=2),
            ],
        )

    ...


class MyTestWithReusableSession(SparklyGlobalSessionTest):
    context = SparklySession

    def test(self):
        df = self.spark.read_ext.by_url(...)

    ...
```
DataFrame Assertions¶
Asserting that the dataframe produced by your transformation is equal to some expected output can be unnecessarily complicated at times. Common issues include:

- Ignoring the order in which elements appear in an array. This is particularly useful when that array is generated as part of a `groupBy` aggregation, and you only care that all elements are part of the end result, rather than the order in which Spark encountered them.
- Comparing floats that could be arbitrarily nested in complicated datatypes within a given tolerance; exact matching is either fragile or impossible.
- Ignoring whether a field of a complex datatype is nullable. Spark infers this based on the applied transformations, but it is oftentimes inaccurate. As a result, assertions on complex datatypes might fail, even though in theory they shouldn't have.
- Having rows with different field names compare equal if the values match in alphabetical order of the names (see the unit tests for an example).
- Unhelpful diffs in case of mismatches.
Sparkly addresses these issues by providing `assertRowsEqual`:
```python
from pyspark.sql import types as T

from sparkly import SparklySession
from sparkly.testing import SparklyTest


def my_transformation(spark):
    return spark.createDataFrame(
        data=[
            ('row1', {'field': 'value_1'}, [1.1, 2.2, 3.3]),
            ('row2', {'field': 'value_2'}, [4.1, 5.2, 6.3]),
        ],
        schema=T.StructType([
            T.StructField('id', T.StringType()),
            T.StructField(
                'st',
                T.StructType([
                    T.StructField('field', T.StringType()),
                ]),
            ),
            T.StructField('ar', T.ArrayType(T.FloatType())),
        ]),
    )


class MyTestCase(SparklyTest):
    session = SparklySession

    def test(self):
        df = my_transformation(self.spark)

        self.assertRowsEqual(
            df.collect(),
            [
                T.Row(id='row2', st=T.Row(field='value_2'), ar=[6.0, 5.0, 4.0]),
                T.Row(id='row1', st=T.Row(field='value_1'), ar=[2.0, 3.0, 1.0]),
            ],
            atol=0.5,
        )
```

The assertion passes even though the expected rows are listed in a different order, the array elements are shuffled, and the floats differ by up to 0.3, which is within the `atol=0.5` tolerance.
Instant Iterative Development¶
The slowest part of Spark integration testing is context initialisation.
`SparklyGlobalSessionTest` allows you to keep the same instance of the Spark context between different test cases, but it still kills the context at the end of the run. This is especially annoying if you work in TDD fashion: on each run you have to wait 25-30 seconds until a new context is ready. We added a tool to preserve the Spark context between multiple test runs; a sketch of the workflow follows.
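A typical session with the tool might look like the sketch below. The `up` and `down` subcommands of the `sparkly-testing` CLI are assumptions based on common usage (only `refresh` is named explicitly in this section), and the test path is illustrative:

```bash
# Start a long-lived local Spark context (assumed subcommand).
sparkly-testing up

# Run the tests as many times as needed; the preserved context is reused,
# so only the first run pays the context start-up cost.
py.test tests/my_integration_test.py

# Tear the long-lived context down when you are done (assumed subcommand).
sparkly-testing down
```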
Note

In case you change the `SparklySession` definition (new options, jars or packages), you have to refresh the context via `sparkly-testing refresh`. However, you don't need to refresh the context if `udfs` are changed.
Fixtures¶
“Fixture” is a term borrowed from the Django framework. Fixtures load data into a database before test execution.

There are several storages supported in Sparkly:

- Elastic
- Cassandra (requires `cassandra-driver`)
- MySQL (requires `PyMySql`)
- Kafka (requires `kafka-python`)
```python
from sparkly.testing import MysqlFixture, SparklyTest


class MyTestCase(SparklyTest):
    ...
    fixtures = [
        MysqlFixture(
            'mysql.host',
            'user',
            'password',
            '/path/to/setup_data.sql',
            '/path/to/remove_data.sql',
        )
    ]
    ...
```
`class sparkly.testing.CassandraFixture(host, setup_file, teardown_file)`

Fixture to load data into Cassandra.

Notes

- Depends on `cassandra-driver`.

Examples

```python
>>> class MyTestCase(SparklyTest):
...     fixtures = [
...         CassandraFixture(
...             'cassandra.host',
...             absolute_path(__file__, 'resources', 'setup.cql'),
...             absolute_path(__file__, 'resources', 'teardown.cql'),
...         )
...     ]
...
```

```python
>>> class MyTestCase(SparklyTest):
...     data = CassandraFixture(
...         'cassandra.host',
...         absolute_path(__file__, 'resources', 'setup.cql'),
...         absolute_path(__file__, 'resources', 'teardown.cql'),
...     )
...     def setUp(self):
...         self.data.setup_data()
...     def tearDown(self):
...         self.data.teardown_data()
...
```

```python
>>> def test():
...     fixture = CassandraFixture(...)
...     with fixture:
...         test_stuff()
...
```
`class sparkly.testing.ElasticFixture(host, es_index, es_type, mapping=None, data=None, port=None)`

Fixture for Elastic integration tests.

Examples

```python
>>> class MyTestCase(SparklyTest):
...     fixtures = [
...         ElasticFixture(
...             'elastic.host',
...             'es_index',
...             'es_type',
...             '/path/to/mapping.json',
...             '/path/to/data.json',
...         )
...     ]
...
```
`class sparkly.testing.Fixture`

Base class for fixtures.

“Fixture” is a term borrowed from Django tests: it is data loaded into a database for integration testing.
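Judging by the `setup_data()` / `teardown_data()` calls in the `CassandraFixture` examples above, a custom fixture can presumably be built by overriding those two hooks. The sketch below is hypothetical, with the storage-specific logic left as placeholders:

```python
>>> class MyStorageFixture(Fixture):
...     """Hypothetical fixture for some custom storage."""
...     def __init__(self, rows):
...         self.rows = rows
...     def setup_data(self):
...         # Load self.rows into the storage before the test runs.
...         ...
...     def teardown_data(self):
...         # Remove the loaded rows after the test finishes.
...         ...
```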
`class sparkly.testing.KafkaFixture(host, port=9092, topic=None, key_serializer=None, value_serializer=None, data=None)`

Fixture for Kafka integration tests.

Notes

- Depends on the `kafka-python` lib.
- The JSON file should contain an array of dicts: `[{'key': ..., 'value': ...}]`.

Examples

```python
>>> class MyTestCase(SparklyTest):
...     fixtures = [
...         KafkaFixture(
...             'kafka.host', 'topic',
...             key_serializer=..., value_serializer=...,
...             data='/path/to/data.json',
...         )
...     ]
```
`class sparkly.testing.KafkaWatcher(spark, df_schema, key_deserializer, value_deserializer, host, topic, port=9092)`

Context manager that tracks Kafka data published to a topic.

Provides access to the new items that were written to a Kafka topic by code running within this context.

NOTE: This is mainly useful in integration test cases and may produce unexpected results in production environments, since there are no guarantees about who else may be publishing to a Kafka topic.

Usage:

```python
my_deserializer = lambda item: json.loads(item.decode('utf-8'))
kafka_watcher = KafkaWatcher(
    my_sparkly_session,
    expected_output_dataframe_schema,
    my_deserializer,
    my_deserializer,
    'my.kafkaserver.net',
    'my_kafka_topic',
)
with kafka_watcher:
    # do stuff that publishes messages to 'my_kafka_topic'
    ...
self.assertEqual(kafka_watcher.count, expected_number_of_new_messages)
self.assertDataFrameEqual(kafka_watcher.df, expected_df)
```
`class sparkly.testing.MysqlFixture(host, user, password=None, data=None, teardown=None)`

Fixture for MySQL integration tests.

Notes

- Depends on the `PyMySql` lib.

Examples

```python
>>> class MyTestCase(SparklyTest):
...     fixtures = [
...         MysqlFixture('mysql.host', 'user', 'password', '/path/to/data.sql')
...     ]
...     def test(self):
...         pass
...
```
`class sparkly.testing.SparklyGlobalSessionTest(methodName='runTest')`

Base test case that keeps a single instance of the given session class across all tests.

Integration tests are slow, especially when you have to start/stop the Spark context for each test case. This class allows you to reuse the Spark session across multiple test cases.
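Example (mirroring the reusable-session test case shown at the top of this section; the query is illustrative):

```python
>>> class MyTestWithReusableSession(SparklyGlobalSessionTest):
...     context = SparklySession
...     def test(self):
...         self.spark.sql('SELECT 1 as one').collect()
```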
`class sparkly.testing.SparklyTest(methodName='runTest')`

Base test case for Spark script tests.

Initializes and shuts down the Session specified in the `session` attribute.

Example

```python
>>> from pyspark.sql import types as T
>>> class MyTestCase(SparklyTest):
...     def test(self):
...         self.assertRowsEqual(
...             self.spark.sql('SELECT 1 as one').collect(),
...             [T.Row(one=1)],
...         )
```
`assertDataFrameEqual(actual_df, expected_data, fields=None, ordered=False)`

Ensure that the DataFrame has the right data inside.

`assertDataFrameEqual` is being deprecated. Please use `assertRowsEqual` instead.

Parameters:

- actual_df (pyspark.sql.DataFrame | list[pyspark.sql.Row]) – Dataframe to test data in.
- expected_data (list[dict]) – Expected dataframe rows defined as dicts.
- fields (list[str]) – Compare only certain fields.
- ordered (bool) – Does the order of rows matter?
`assertRowsEqual(first, second, msg=None, ignore_order=True, ignore_order_depth=None, atol=0, rtol=1e-07, equal_nan=True, ignore_nullability=True)`

Assert equal on steroids.

Extends the classic `assertEqual` signature to work better with comparisons involving rows, datatypes, dictionaries, lists and floats by:

- ignoring the order of lists and datatypes recursively,
- comparing floats within a given tolerance,
- assuming NaNs are equal,
- ignoring the nullability requirements of datatypes (since Spark can be inaccurate when inferring them),
- providing better diffs for rows and datatypes.

Float comparisons are inspired by NumPy's `assert_allclose`. The main formula used is `|float1 - float2| <= atol + rtol * |float2|`.

Parameters:

- first – see `unittest.TestCase.assertEqual`.
- second – see `unittest.TestCase.assertEqual`.
- msg – see `unittest.TestCase.assertEqual`.
- ignore_order (bool|True) – ignore the order in lists and datatypes (rows and dicts are inherently orderless).
- ignore_order_depth (int|None) – if `ignore_order` is true, ignore order up to this level of nested lists or datatypes (exclusive). Setting this to 0 or None means ignore order infinitely deep, 1 means ignore order only at the top level, 2 will also ignore order within lists of lists, and so on. The default is to ignore order arbitrarily deep.
- atol (int, float|0) – absolute tolerance in float comparisons.
- rtol (int, float|1e-07) – relative tolerance in float comparisons.
- equal_nan (bool|True) – if set, NaNs will compare equal.
- ignore_nullability (bool|True) – if set, ignore all nullability fields in datatypes. This includes `containsNull` in arrays, `valueContainsNull` in maps and `nullable` in struct fields.

Returns: None iff the two objects are equal.

Raises: AssertionError iff the two objects are not equal. See `unittest.TestCase.assertEqual` for details.
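For illustration, a minimal sketch combining the ordering and tolerance behaviour described above (the test class, rows and tolerance are made up for this example):

```python
>>> from pyspark.sql import types as T
>>> class MyTestCase(SparklyTest):
...     def test_tolerant_compare(self):
...         self.assertRowsEqual(
...             [T.Row(id='a', ar=[1.05, 2.0])],
...             [T.Row(id='a', ar=[2.0, 1.0])],
...             atol=0.1,  # 1.05 vs 1.0 differ by 0.05 <= atol; list order is ignored
...         )
```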
`session`

alias of `SparklySession`