Spark vs. MapReduce Series – Part I – Get some data

With this series I will try to show how to use Cassandra for storing data and how to use MapReduce or Spark to analyse it. First I have to store data in an Apache Cassandra database, and my decision was to store tweets. That sounds easy, but with CQL you have to think about your data model and how you want to store your data. Unlike with Thrift, which is going away, you can’t store data without having any idea of what it will look like.

I used Python to store tweets in my database and hit some issues when inserting data directly with CQL INSERT statements: they produce a lot of lines of code, and you have to get every data format exactly right. Talking to Jon Haddad, who’s a Technical Evangelist here at DataStax, pointed me to CQLEngine. CQLEngine is a kind of CQL wrapper and allowed me to create a data model directly in my Python code. So I used cassandra, tweepy, json and cqlengine to build some code (note that you have to create a keyspace in Cassandra first, based on your needs; a minimal sketch of that follows below):
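For reference, here is a minimal sketch of creating such a keyspace with the Python driver. The contact point YOURDSESERVER and the keyspace name tweets are only placeholders, and SimpleStrategy with a replication factor of 1 is suitable for a single test node only:

from cassandra.cluster import Cluster

# Create the keyspace the ingest script expects; the name and the
# replication settings are placeholders, adjust them to your environment
cluster = Cluster(['YOURDSESERVER'])
session = cluster.connect()
session.execute("""
    CREATE KEYSPACE IF NOT EXISTS tweets
    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}
""")
cluster.shutdown()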


import tweepy
import json

from cqlengine import columns
from cqlengine.models import Model
from cqlengine import connection
from cqlengine.management import sync_table

# Data model for a tweet; cqlengine maps this class to a Cassandra table
class Tweet(Model):
    truncated = columns.Boolean()
    text = columns.Text()
    in_reply_to_status_id = columns.BigInt()
    tweet_id = columns.BigInt(primary_key=True)  # Twitter's numeric tweet id
    favorite_count = columns.Integer()
    source = columns.Text()
    retweeted = columns.Boolean()
    # 'entities' arrives as a nested JSON object, so it is stored
    # serialized as text instead of as a CQL collection
    entities = columns.Text()
    in_reply_to_screen_name = columns.Text()
    id_str = columns.Text()
    retweet_count = columns.Integer()
    in_reply_to_user_id = columns.BigInt()
    favorited = columns.Boolean()
    user = columns.Map(columns.Bytes, columns.Bytes)
    geo = columns.Integer()
    in_reply_to_user_id_str = columns.Text()
    possibly_sensitive = columns.Boolean()
    lang = columns.Text()
    created_at = columns.Text()
    filter_level = columns.Text()
    in_reply_to_status_id_str = columns.Text()
    place = columns.Map(columns.Text, columns.Text)

class myListener(tweepy.StreamListener):
    def on_data(self, data):
        twdata = json.loads(data)
        print "write tweet into cassandra..."
        print twdata
        # Model.create() validates the values and persists the row in one step
        Tweet.create(
            truncated=twdata['truncated'],
            text=twdata['text'],
            in_reply_to_status_id=twdata['in_reply_to_status_id'],
            tweet_id=twdata['id'],
            favorite_count=twdata['favorite_count'],
            source=twdata['source'],
            retweeted=twdata['retweeted'],
            entities=json.dumps(twdata['entities']),  # nested object, stored as JSON text
            in_reply_to_screen_name=twdata['in_reply_to_screen_name'],
            id_str=twdata['id_str'],
            retweet_count=twdata['retweet_count'],
            in_reply_to_user_id=twdata['in_reply_to_user_id'],
            favorited=twdata['favorited'],
            created_at=twdata['created_at']
        )
        return True  # keep the stream connected

    def on_error(self, status):
        print status

# Connect cqlengine to your cluster and keyspace (placeholders below),
# then create the table for the Tweet model if it does not exist yet
connection.setup(['YOURDSESERVER'], 'KEYSPACE', protocol_version=1)
sync_table(Tweet)

consumer_key = 'YOURKEY'
consumer_secret = 'YOURSECRET'
access_token = 'YOURTOKEN'
access_token_secret = 'YOURTOKENSECRET'

auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)

# The REST API object is not needed for streaming, so we attach the
# listener directly to the stream and track some keywords
listen = myListener()
stream = tweepy.Stream(auth, listen)
stream.filter(track=['hadoop', 'bigdata', 'cassandra', 'nosql'])
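Twitter streaming connections drop from time to time. If you want the script to keep running unattended, you could wrap the filter call in a simple retry loop instead of calling it once as above; this is just a sketch, and the 10-second pause is an arbitrary choice:

import time

# Restart the stream after network errors instead of exiting
while True:
    try:
        stream.filter(track=['hadoop', 'bigdata', 'cassandra', 'nosql'])
    except Exception as e:
        print "stream error, reconnecting in 10 seconds:", e
        time.sleep(10)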


If needed, you can find the descriptions of the data types and entities here: https://dev.twitter.com/docs

This script will now pump the tweets into your Cassandra database. You can check the stored values with DataStax DevCenter by selecting all rows of your table, or query a few rows directly with cqlengine, as sketched after the screenshot.


(Screenshot: DataStax DevCenter showing the stored tweets)
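If you prefer to verify from Python instead, here is a minimal sketch that reuses the Tweet model and the placeholder connection settings from the script above:

from cqlengine import connection

# Assumes the Tweet model from the ingest script is importable or defined
# in the same file; YOURDSESERVER and KEYSPACE are the same placeholders
connection.setup(['YOURDSESERVER'], 'KEYSPACE', protocol_version=1)

print Tweet.objects.count()            # how many tweets are stored so far
for tweet in Tweet.objects.all().limit(5):
    print tweet.tweet_id, tweet.text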

With this data we are now able to run some MapReduce or Spark tasks on it. Part II will take a look at MapReduce first…

