Scheduling an Rmd file with embedded ETL code is a really useful pattern. It allows the data engineer to document the process side-by-side with the code, and each run of the process produces a rendered artefact in addition to the data file itself.
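For reference, the skeleton of such a document is small; the title and chunk contents below are illustrative:

---
title: "Tweet ETL"
output: html_document
---

```{python}
# the ETL code shown below lives in a chunk like this
# and runs on every scheduled render
```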
This Python code uses the tweepy, nltk, and feather modules to pull data from Twitter, cleanse the data, and dump a file of new data that can be picked up by downstream processes.
Credentials for the Twitter API are kept separate from the ETL code and pushed to the server when this Rmd file is deployed. This allows us to share the Rmd file and ETL code without sharing credentials.
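How that file gets to the server varies by setup; here is a minimal sketch of the one-time step that could produce cred.feather (placeholder values; the column names match what the ETL code reads below):

import pandas as pd
import feather

# Hypothetical one-time setup, run outside the deployed project so the
# secrets never enter version control. Column names match the ETL code
# below (note the access token secret lives under 'access_token_key').
cred = pd.DataFrame({
    "consumer_key": ["XXXXXXXX"],
    "consumer_secret": ["XXXXXXXX"],
    "access_token": ["XXXXXXXX"],
    "access_token_key": ["XXXXXXXX"],
})
feather.write_dataframe(cred, "cred.feather")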
import tweepy
import feather
import pandas as pd
import string
import os
import sys
from collections import Counter
from nltk.corpus import stopwords
from nltk.tokenize import TweetTokenizer

# Python 2 shim so tweet text can be treated as UTF-8 throughout
reload(sys)
sys.setdefaultencoding('utf-8')
# Define Tokenizer and Stop Words
tknzr = TweetTokenizer()
punctuation = list(string.punctuation)
stop = stopwords.words('english') + punctuation
def preprocess(s):
    tokens = tknzr.tokenize(s)
    # remove stop words and shortened urls, and force to lowercase
    tokens = [token.lower() for token in tokens if 'https' not in token and token not in stop]
    return tokens
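# For example (hypothetical input):
#   preprocess("Check out https://t.co/xyz #rstats!")
# returns ['check', '#rstats'] -- 'out' is an NLTK stop word, and the
# shortened url and trailing punctuation are dropped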
#-------------------
# Begin
#---------------------
#set up auth (reading in from a shared file)
cred = feather.read_dataframe("cred.feather")
consumer_key = cred["consumer_key"][0]
consumer_secret = cred["consumer_secret"][0]
access_token = cred["access_token"][0]
access_token_secret = cred["access_token_key"][0]
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
api = tweepy.API(auth)
#initialize variables
res = ""
results = []
#make API call
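# tweepy.Cursor transparently pages through search results;
# .items(100) caps the total number of tweets pulled per run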
rstats = tweepy.Cursor(api.search, q='#rstats', lang='en').items(100)
# concatenate unique tweets
for r in rstats:
    if r.text not in results:
        t = r.text
        t = t.decode('utf-8', 'ignore').encode('utf-8')
        res = res + " " + t
        results.append(r.text)
#process: tokenize the combined text and count word frequencies
words = preprocess(res)
c = Counter(words)
# write results to feather file
df = pd.DataFrame.from_dict(c, orient='index').reset_index()
# replace the previous run's file; guard so the first run doesn't fail
if os.path.exists("/tmp_shared/data.feather"):
    os.remove("/tmp_shared/data.feather")
feather.write_dataframe(df, "/tmp_shared/data.feather")
## /usr/local/lib/python2.7/dist-packages/urllib3/util/ssl_.py:339: SNIMissingWarning: An HTTPS request has been made, but the SNI (Subject Name Indication) extension to TLS is not available on this platform. This may cause the server to present an incorrect TLS certificate, which can cause validation failures. You can upgrade to a newer version of Python to solve this. For more information, see https://urllib3.readthedocs.io/en/latest/advanced-usage.html#ssl-warnings
## SNIMissingWarning
## /usr/local/lib/python2.7/dist-packages/urllib3/util/ssl_.py:137: InsecurePlatformWarning: A true SSLContext object is not available. This prevents urllib3 from configuring SSL appropriately and may cause certain SSL connections to fail. You can upgrade to a newer version of Python to solve this. For more information, see https://urllib3.readthedocs.io/en/latest/advanced-usage.html#ssl-warnings
## InsecurePlatformWarning
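Any downstream process that speaks feather can then pick the file up; here is a minimal sketch of a consumer (path and column labels as produced by reset_index() above):

import feather

# Hypothetical downstream consumer: load the word counts written above.
# reset_index() leaves two columns: 'index' (the word) and 0 (the count).
df = feather.read_dataframe("/tmp_shared/data.feather")
top10 = df.sort_values(0, ascending=False).head(10)
print(top10)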
In addition to the compiled Rmd file, the Connect logs, and the feather file with the data, we’ll also create a plain text log file on the shared drive. The downstream app listens for changes to this file so a user can easily view the run history from the dashboard.
system("date | cat - /tmp_shared/log.txt > temp && mv temp /tmp_shared/log.txt")