Server Log Correlation with Cassandra Data
Repeat the server log correlation question, using Spark and getting the input data from the Cassandra table you populated in the last assignment.
Your program should be called correlate_logs_cassandra.py and take command line arguments for the input keyspace and table name. As before, it is run like this:
spark-submit … correlate_logs_cassandra.py <userid> nasalogs
This should be as simple as combining pieces from the Cassandra instructions with your previous implementation of the correlation calculation.
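As a rough sketch of how the two pieces fit together: the first function below loads a Cassandra table as a DataFrame through the spark-cassandra-connector data source, and the second computes the correlation coefficient from six running sums. The function names are made up for illustration, and the sums-based formula is the standard Pearson form; whether it matches your earlier implementation exactly is an assumption.

```python
import math


def load_table(spark, keyspace, table):
    """Return a DataFrame backed by a Cassandra table.

    `spark` is an existing SparkSession that was started with the
    spark-cassandra-connector package available.
    """
    return (spark.read.format('org.apache.spark.sql.cassandra')
            .options(table=table, keyspace=keyspace)
            .load())


def correlation(n, sx, sy, sxx, syy, sxy):
    """Pearson r from n, sum(x), sum(y), sum(x^2), sum(y^2), sum(x*y)."""
    numerator = n * sxy - sx * sy
    denominator = (math.sqrt(n * sxx - sx ** 2)
                   * math.sqrt(n * syy - sy ** 2))
    return numerator / denominator
```

Once the six sums have been aggregated from the loaded DataFrame, `r = correlation(...)` and `r ** 2` are the two values of interest.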
Working With Relational Data
For this question, we will look at the TPC-H data set, which is a benchmark data set for relational databases. Since it's designed for relational databases, the assumption is that we're going to JOIN a lot. CQL doesn't have a JOIN operation, so we're going to have a mismatch.
The data sets we have available are /courses/732/tpch-1 (a scale factor 0.2 data set, 200MB uncompressed), tpch-2 (scale factor 0.5, 500MB), tpch-3 (scale factor 3.0, 3GB), and tpch-4 (scale factor 6.0, 6GB). These were generated with tpch-dbgen.
You will find these data sets loaded into our (reliable) Cassandra cluster in the keyspaces tpch1, tpch2, tpch3, and tpch4. The tables there were created with these CREATE TABLE statements.
For this problem, we imagine that we want to frequently display a particular order with the names of the parts that were ordered. The SQL to get that would be something like:
SELECT o.*, p.name FROM orders o JOIN lineitem l ON (o.orderkey = l.orderkey) JOIN part p ON (l.partkey = p.partkey) WHERE …
Our command line will take the input keyspace, the output directory, and several orderkey values from the data set. You can get those like this:
keyspace = sys.argv[1]
outdir = sys.argv[2]
orderkeys = sys.argv[3:]
The command will be like this:
time spark-submit --packages datastax:spark-cassandra-connector:2.4.0-s_2.11 tpch_orders_df.py tpch1 output-1 151201 986499 28710 193734 810689
We want to produce a summary of each of the orders specified by the orderkey. We want the orderkey, the totalprice (a field on the orders table), and a list of the names of the parts ordered. We'll output each order (on a line in text file(s) in the output directory given on the command line) with the part names comma separated like this:
Order #28710 $43012.31: cream hot dodger peru green, deep firebrick slate dim misty, midnight coral antique misty spring
Order #151201 $193245.63: blue papaya pink plum grey, chartreuse lace almond linen peru, cream honeydew cyan bisque light, dark coral dim maroon salmon, medium lemon aquamarine violet olive
Order #193734 $88031.31: drab papaya spring burnished royal, rosy salmon aquamarine lavender chocolate
Order #810689 $276159.84: almond cornflower black lemon burnished, lemon indian azure orange steel, linen ivory antique spring powder, red wheat navy floral dodger, royal turquoise goldenrod coral steel, turquoise yellow sky royal peach, white puff pink cornflower lemon
Order #986499 $44583.01: peru khaki coral rose midnight
The actual output will be relatively small: we will give no more than ten or so orderkeys, and each order in the data set has at most seven parts. The lines should be sorted by order number; the part names within the line should be alphabetical.
Our cluster has dynamic allocation enabled by default: your jobs get more executors if there are tasks queued, and give them up if they are idle. In order to play nicely on the cluster, please include this setting in your Spark configuration for code in this question:
spark = SparkSession.….config('spark.dynamicAllocation.maxExecutors', 16).getOrCreate()
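One way to keep the settings in one place is sketched below. The helper names and the idea of passing the Cassandra host names in as a list are assumptions made for illustration; `spark.cassandra.connection.host` is the connector's property for the contact points, and the host names themselves must come from the cluster instructions.

```python
def session_settings(cassandra_hosts, max_executors=16):
    """Collect the Spark settings for this question in a plain dict."""
    return {
        # contact points for the spark-cassandra-connector
        'spark.cassandra.connection.host': ','.join(cassandra_hosts),
        # cap on executors under dynamic allocation, as requested above
        'spark.dynamicAllocation.maxExecutors': str(max_executors),
    }


def build_session(app_name, settings):
    """Create (or reuse) a SparkSession with the given settings applied."""
    # Imported here so this module can be loaded without Spark on the path.
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.appName(app_name)
    for key, value in settings.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
```

For example, `build_session('tpch orders', session_settings(['host1', 'host2']))`, where `host1` and `host2` are placeholders for the real node names.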
Attempt #1: Let Spark Do It
For our first attempt, we will fall back on old habits: we can use the spark-cassandra-connector to create DataFrames of the tables we need. Name this program tpch_orders_df.py.
Create DataFrames from the Cassandra data, and use Spark to join the tables and extract the data you need. In this scenario, Cassandra is just providing the data we need for the calculations done in Spark.
Once you have the data joined appropriately, you can use the aggregate function collect_set to get all of the part names for an order in a single row. Then sort by orderkey, switch to an RDD, and you can use a function like this to produce the required output format:
def output_line(orderkey, price, names):
    namestr = ', '.join(sorted(list(names)))
    return 'Order #%d $%.2f: %s' % (orderkey, price, namestr)
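The join and aggregation described above might look roughly like the sketch below. It assumes the three DataFrames have already been loaded from Cassandra and that the columns are named orderkey, partkey, totalprice, and name, as in the SQL earlier; the function names are invented for this example.

```python
def parse_orderkeys(args):
    """Command-line values arrive as strings; the key column holds integers."""
    return [int(a) for a in args]


def order_summaries(orders, lineitem, part, orderkeys):
    """Join the three tables and collect the part names for each order.

    Returns a DataFrame with columns orderkey, totalprice, names,
    sorted by orderkey.
    """
    # Imported here so this module can be loaded without Spark on the path.
    from pyspark.sql import functions

    # Filter on the requested keys before joining, so only those orders
    # need to take part in the join.
    wanted = orders.where(orders['orderkey'].isin(orderkeys))
    joined = wanted.join(lineitem, 'orderkey').join(part, 'partkey')
    return (joined
            .groupBy('orderkey', 'totalprice')
            .agg(functions.collect_set('name').alias('names'))
            .orderBy('orderkey'))
```

Each resulting row is an (orderkey, totalprice, names) triple, so something like `summaries.rdd.map(lambda row: output_line(*row)).saveAsTextFile(outdir)` would produce the text output.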
Why So Fast?
If you run this on the tpch4 keyspace, it probably runs in about a minute.
That doesn't sound right: you likely created DataFrames for the lineitem, part, and orders tables. The lineitem table alone in tpch4 contains 36M records.
Have a look at the execution plan for your final DataFrame in this problem and figure out why this code runs so fast. Hint: your DataFrames are actually small and you should see “PushedFilters” in your execution plan: if not, you have done something to defeat the optimizer. [❓]
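If you would rather check the plan in code than read it by eye, a small helper like the one below can capture what explain() prints. This is only a sketch: the helper names are made up, and it relies on PySpark's DataFrame.explain() writing its plan through Python's print, so redirecting standard output is enough to capture it.

```python
import contextlib
import io


def plan_text(df):
    """Return the text that df.explain() would print."""
    buffer = io.StringIO()
    with contextlib.redirect_stdout(buffer):
        df.explain()
    return buffer.getvalue()


def mentions_pushed_filters(df):
    """True if the physical plan text contains a PushedFilters entry."""
    return 'PushedFilters' in plan_text(df)
```

A True result only says that the entry is present in the plan; you still need to read which filters were pushed to answer the question.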
Reshape The Data
The real problem here is that the data is in the wrong shape for Cassandra. Cassandra doesn't expect foreign keys and joins, and it's not good at dealing with them. All of the above was an attempt to force Cassandra to behave like a relational database, with Spark picking up the slack.
If we needed to do this query often (maybe as part of a production system), this isn't a reasonable way to do it.
The solution is to reshape our data so it can be efficiently queried for the results we need. Doing this requires us to know what queries we're going to do in the future, but we do for this question.
Denormalize The Data
Cassandra has a set data type: we can use it to store the part names in the order table, so they're right there when we need them.
In your own keyspace, create a table orders_parts that has all of the same columns as the original orders table, plus a set of the part names for this order: [❓]
CREATE TABLE orders_parts (
  ⋮
  part_names set<text>,
  ⋮
);
Create a program tpch_denormalize.py that copies all of the data from the orders table and adds the part_names column (containing part names as we found in the previous part) as it's inserted. Your program should take two arguments: the input keyspace (with the TPC-H data), and the output keyspace (where the orders_parts table will be populated).
This will be an expensive but one-time operation: try it on the tpch2 input so we can compare the running times.
We should be able to query the data we need quickly once it's done…
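A minimal sketch of the copy step, assuming the orders, lineitem, and part DataFrames are already loaded and that the orders_parts table has been created in the output keyspace. The function names are illustrative, and the use of append mode is a choice made here so that rows are added to the existing table rather than the write being rejected or the table replaced.

```python
def add_part_names(orders, lineitem, part):
    """Return the orders DataFrame with an extra part_names column."""
    # Imported here so this module can be loaded without Spark on the path.
    from pyspark.sql import functions

    names = (lineitem.join(part, 'partkey')
             .groupBy('orderkey')
             .agg(functions.collect_set('name').alias('part_names')))
    return orders.join(names, 'orderkey')


def write_table(df, keyspace, table='orders_parts'):
    """Append the rows of df to an existing Cassandra table."""
    (df.write.format('org.apache.spark.sql.cassandra')
       .options(table=table, keyspace=keyspace)
       .mode('append')
       .save())
```

With these, the program body is essentially `write_table(add_part_names(orders, lineitem, part), output_keyspace)`. Unlike the first attempt, nothing is filtered here, so the full lineitem table really is read and joined.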
Attempt #2: Now Select What You Need
Repeat the above problem (giving your keyspace instead of tpch, since that's where the data is) and select the relevant data from the orders_parts table. Name this program
You should give the same output as above. There will be very little Spark work here: really just fetching the Cassandra data and outputting. [❓]
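With the denormalized table, the Spark side could shrink to something like the following sketch. It assumes orders_parts has been loaded as a DataFrame with columns orderkey, totalprice, and part_names; the function names are invented for illustration, and to_line simply repeats the formatting of output_line for a whole row.

```python
def select_orders(orders_parts, orderkeys):
    """Pick the requested orders; no join is needed any more."""
    keys = [int(k) for k in orderkeys]  # command-line values are strings
    return (orders_parts
            .where(orders_parts['orderkey'].isin(keys))
            .select('orderkey', 'totalprice', 'part_names')
            .orderBy('orderkey'))


def to_line(row):
    """Format one (orderkey, totalprice, part_names) row for output."""
    orderkey, price, names = row
    return 'Order #%d $%.2f: %s' % (orderkey, price, ', '.join(sorted(names)))
```

The output step is then `select_orders(orders_parts, orderkeys).rdd.map(to_line).saveAsTextFile(outdir)`.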
In a text file answers.txt, answer these questions:
- What did you see in the execution plan for the “join in Spark” solution? Why was the execution so fast (and the memory usage so small)?
- What was the CREATE TABLE statement you used for the orders_parts table?
- What were the running times of the two tpch_orders_* programs on the tpch2 data on the cluster? These orderkeys have results in that data set: 2579142 2816486 586119 441985 2863331.
- Consider the logic that you would have to implement to maintain the denormalized data (assuming that the orders table had the part_names column in the main data set). Write a few sentences on what you'd have to do when inserting/updating/deleting data in this case.
Submit your files to the CourSys activity Assignment 8.