Data Pull Started at 2019-07-22 23:13:09

Scheduling an Rmd file with embedded ETL code is really helpful. It lets the data engineer document the process side-by-side with the code, and each run of the process produces a rendered artefact in addition to the data file.

Code

This Python code uses the tweepy, nltk, and feather modules to pull data from Twitter, cleanse the data, and write a file of new data that can be picked up by downstream processes.

Credentials for the Twitter API are kept separate from the ETL code and pushed to the server when this Rmd file is deployed. This allows us to share the Rmd file and ETL code without sharing credentials.
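The credentials file is just a one-row data frame. A minimal sketch of building it locally might look like the following; the values are placeholders, and the column names simply mirror the lookups the ETL code performs:

```python
import pandas as pd

# Build the one-row credentials frame the ETL code reads back.
# Values are placeholders; column names match the lookups in the script.
cred = pd.DataFrame({
    "consumer_key": ["YOUR_CONSUMER_KEY"],
    "consumer_secret": ["YOUR_CONSUMER_SECRET"],
    "access_token": ["YOUR_ACCESS_TOKEN"],
    "access_token_key": ["YOUR_ACCESS_TOKEN_SECRET"],
})

# Write it where the deployed report can read it, e.g.:
# feather.write_dataframe(cred, "cred.feather")
```

Keeping the secrets in a data file rather than in the script means the rendered report can be shared freely.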

import os
import string
import sys
from collections import Counter

import feather
import pandas as pd
import tweepy
from nltk.corpus import stopwords
from nltk.tokenize import TweetTokenizer

# Python 2 only: force UTF-8 as the default string encoding
reload(sys)
sys.setdefaultencoding('utf-8')

 
# Define Tokenizer and Stop Words
tknzr = TweetTokenizer() 
punctuation = list(string.punctuation)
stop = stopwords.words('english') + punctuation

 
def preprocess(s):
    tokens = tknzr.tokenize(s)
    # drop URLs and stop words, and force to lowercase
    tokens = [token.lower() for token in tokens
              if 'https' not in token and token.lower() not in stop]
    return tokens


#-------------------
# Begin
#-------------------

#set up auth (reading in from a shared file)
cred = feather.read_dataframe("cred.feather")

consumer_key = cred["consumer_key"][0]
consumer_secret = cred["consumer_secret"][0]
access_token = cred["access_token"][0]
access_token_secret = cred["access_token_key"][0]

auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
api = tweepy.API(auth)

#initialize variables
res = ""
results = []

#make API call
rstats = tweepy.Cursor(api.search, q='#rstats', lang='en').items(100)


# concatenate unique tweets
for r in rstats:
    if r.text not in results:
        t = r.text.decode('utf-8', 'ignore').encode('utf-8')
        res = res + " " + t
        results.append(r.text)

#process
words = preprocess(res)
c = Counter(words)

# convert counts to a data frame: one row per word
df = pd.DataFrame.from_dict(c, orient='index').reset_index()


# replace the previous run's data file
if os.path.exists("/tmp_shared/data.feather"):
    os.remove("/tmp_shared/data.feather")
feather.write_dataframe(df, "/tmp_shared/data.feather")
## /usr/local/lib/python2.7/dist-packages/urllib3/util/ssl_.py:339: SNIMissingWarning: An HTTPS request has been made, but the SNI (Subject Name Indication) extension to TLS is not available on this platform. This may cause the server to present an incorrect TLS certificate, which can cause validation failures. You can upgrade to a newer version of Python to solve this. For more information, see https://urllib3.readthedocs.io/en/latest/advanced-usage.html#ssl-warnings
##   SNIMissingWarning
## /usr/local/lib/python2.7/dist-packages/urllib3/util/ssl_.py:137: InsecurePlatformWarning: A true SSLContext object is not available. This prevents urllib3 from configuring SSL appropriately and may cause certain SSL connections to fail. You can upgrade to a newer version of Python to solve this. For more information, see https://urllib3.readthedocs.io/en/latest/advanced-usage.html#ssl-warnings
##   InsecurePlatformWarning
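The cleanse-and-count steps above can be exercised without the Twitter or nltk dependencies. Here is a minimal sketch using a plain whitespace split and a toy stop list as stand-ins for TweetTokenizer and the nltk stop words:

```python
from collections import Counter

import pandas as pd

# Stand-ins for the nltk pieces: a toy stop list and a
# whitespace tokenizer instead of TweetTokenizer.
stop = ["the", "a", "is", "for"]

def preprocess(s):
    # drop URLs and stop words, force to lowercase
    tokens = s.split()
    return [t.lower() for t in tokens
            if 'https' not in t and t.lower() not in stop]

res = "The #rstats package is great https://t.co/abc great for plots"
c = Counter(preprocess(res))

# same shape the ETL code writes out: one row per word
df = pd.DataFrame.from_dict(c, orient='index').reset_index()
print(df)
```

Swapping the toy tokenizer back for TweetTokenizer changes only how the string is split; the counting and data-frame steps are identical.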

Log A Success

In addition to the compiled Rmd file, the Connect logs, and the feather file with the data, we’ll also create a plain text log file on the shared drive. The downstream app listens for changes to this file so a user can easily view the run history from the dashboard.

system("date | cat - /tmp_shared/log.txt > temp && mv temp /tmp_shared/log.txt")
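The same prepend-a-timestamp pattern can be sketched in Python for runs where shelling out isn't convenient. The path below is the shared-drive location used above, and the function name is hypothetical:

```python
import datetime
import os

def log_success(path="/tmp_shared/log.txt"):
    # Prepend a timestamp line so the newest run sits at the top,
    # mirroring: date | cat - log.txt > temp && mv temp log.txt
    stamp = datetime.datetime.now().strftime("%a %b %d %H:%M:%S %Y")
    old = ""
    if os.path.exists(path):
        with open(path) as f:
            old = f.read()
    with open(path, "w") as f:
        f.write(stamp + "\n" + old)
```

Because each run only prepends one line, the downstream dashboard can read the first line of the file to show the most recent run.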