Skip to content

Commit

Permalink
Covid19 collection
Browse files Browse the repository at this point in the history
* TwitterStream collection for covid-19 stream

* WIP: covid19 eventsapi

* Covid SQL query fix and docker version change

* Handle null exception for tweet user object

* Refactored code to restart closed partition connections

* Changing fatal to panic for restarting partitions. Changed some log msgs

* Increasing buffer size of request and error handling
  • Loading branch information
rylo5688 authored Oct 22, 2020
1 parent 6060243 commit 19e6da7
Show file tree
Hide file tree
Showing 22 changed files with 4,623 additions and 6 deletions.
2 changes: 1 addition & 1 deletion EventsAPI/Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
all: jar

TAG_CLIENT = 1.0.32
TAG_CLIENT = 1.0.33
PROJECT_NAME = eventsapi


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public enum Status {
}

private enum MatchKey {
KEYWORDS("tweets"), FOLLOWS("tweets-follow");
KEYWORDS("tweets"), FOLLOWS("tweets-follow"), COVID19("tweets-covid19");
private final String kafkaTopic;
private MatchKey(String kafkaTopic) {
this.kafkaTopic = kafkaTopic;
Expand Down Expand Up @@ -145,6 +145,9 @@ public void setMatchKey(String match_key) {
case "FOLLOWS":
this.matchKey = MatchKey.FOLLOWS;
break;
case "COVID19":
this.matchKey = MatchKey.COVID19;
break;
case "KEYWORDS":
default:
this.matchKey = MatchKey.KEYWORDS;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ public List<Event> getEvents(String eventType) {
"FROM events e INNER JOIN follows j ON j.event_name = e.normalized_name " +
"ORDER BY e.normalized_name";
break;
case "covid19":
query = "SELECT e.name e_name, e.author e_author, e.normalized_name e_normalized_name, e.description e_description, e.status e_status, e.created_at e_created_at, '' AS j_key " +
"FROM events e INNER JOIN covid19 j ON j.event_name = e.normalized_name " +
"ORDER BY e.normalized_name";
break;
case "keywords":
default:
query = "SELECT e.name e_name, e.author e_author, e.normalized_name e_normalized_name, e.description e_description, e.status e_status, e.created_at e_created_at, j.keyword j_key " +
Expand Down Expand Up @@ -94,6 +99,7 @@ public List<Event> getActiveEvents() {
List<Event> ret = new ArrayList<>();
ret.addAll(getActiveKeywordEvents());
ret.addAll(getActiveFollowEvents());
ret.addAll(getActiveCovid19Event());
return ret;
}

Expand Down Expand Up @@ -143,13 +149,39 @@ public List<Event> getActiveFollowEvents() {
));
}

/**
 * Loads every event that is linked to a row in the {@code covid19} table and
 * whose status equals {@code Event.Status.ACTIVE}, ordered by normalized name.
 *
 * <p>Rows are folded into a {@link LinkedHashMap} keyed by
 * {@code e_normalized_name}, so each event appears once and the SQL ordering
 * is preserved in the returned list.
 *
 * @return a new list of active covid19 events; empty when no row matches
 */
public List<Event> getActiveCovid19Event() {
    return postgres.withHandle(handle -> new ArrayList<>(handle.createQuery(
            "SELECT e.name e_name, e.author e_author, e.normalized_name e_normalized_name, e.description e_description, e.status e_status, e.created_at e_created_at " +
            "FROM events e INNER JOIN covid19 c ON c.event_name = e.normalized_name " +
            "WHERE e.status = :activeStatus " +
            "ORDER BY e.normalized_name")
        .bind("activeStatus", Event.Status.ACTIVE.toString())
        // Only the "e"-prefixed Event mapper is needed: the query selects no
        // columns from the joined covid19 table, so no mapper is registered for it.
        .registerRowMapper(BeanMapper.factory(Event.class, "e"))
        .reduceRows(new LinkedHashMap<String, Event>(),
            (map, rowView) -> {
                // Reuse the Event already built for this normalized name, if any.
                Event event = map.computeIfAbsent(
                    rowView.getColumn("e_normalized_name", String.class),
                    id -> rowView.getRow(Event.class)
                );
                // The query yields no keyword/follow term for covid19 events, so an
                // empty placeholder is appended once per row, as the original did.
                event.appendKeywords("");
                event.setMatchKey("covid19");
                return map;
            })
        .values()
    ));
}

public ExtendedEvent getEvent(String normalizedName, String eventType) {
String keywordQuery;

switch(eventType.toLowerCase()) {
case "follows":
keywordQuery = "SELECT follow FROM follows WHERE event_name=:normalizedName ORDER BY follow";
break;
case "covid19":
keywordQuery = "SELECT '' as term FROM covid19 WHERE event_name=:normalizedName ORDER BY term";
break;
case "keywords":
default:
keywordQuery = "SELECT keyword FROM keywords WHERE event_name=:normalizedName ORDER BY keyword";
Expand Down
7 changes: 7 additions & 0 deletions TwitterStreamCovid19/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
.idea
main
mainmac
app.sh
keywords.txt
pkg
vendor/
4 changes: 4 additions & 0 deletions TwitterStreamCovid19/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Empty base image: the Makefile builds `main` with CGO_ENABLED=0, so the
# binary is expected to run without any userland from a base distribution.
FROM scratch
# `scratch` ships no certificate store; provide the CA bundle downloaded by the
# Makefile (presumably needed for outbound TLS connections - confirm in the Go source).
COPY ca-certificates.crt /etc/ssl/certs/
# COPY is used instead of ADD because these are plain local files; ADD's
# archive-extraction and URL features are not wanted here.
COPY main /
CMD ["/main"]
40 changes: 40 additions & 0 deletions TwitterStreamCovid19/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Build, run and publish the twitter-stream-covid19 Docker image.
all: build

# Targets that do not produce a file of the same name.
.PHONY: all build push run runmac clean

TAG = 1.0.1
PROJECT_NAME = twitter-stream-covid19

# Build the image and tag it with the current version.
build: main ca-certificates.crt
	docker build -t projectepic/$(PROJECT_NAME) .
	docker tag projectepic/$(PROJECT_NAME) projectepic/$(PROJECT_NAME):$(TAG)

# Publish both the default tag and the versioned tag.
push: build
	docker push projectepic/$(PROJECT_NAME)
	docker push projectepic/$(PROJECT_NAME):$(TAG)

# Linux binary that is copied into the Docker image.
main: src/TwitterStreamCovid19/TwitterStreamCovid19.go
	cd src/TwitterStreamCovid19/ && GOPATH="$$(pwd)/../../" dep ensure
	cd src/TwitterStreamCovid19/ && GOPATH="$$(pwd)/../../" CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o ../../main TwitterStreamCovid19.go

# macOS binary for local runs.
mainmac: src/TwitterStreamCovid19/TwitterStreamCovid19.go
	cd src/TwitterStreamCovid19/ && GOPATH="$$(pwd)/../../" dep ensure
	cd src/TwitterStreamCovid19/ && GOPATH="$$(pwd)/../../" CGO_ENABLED=0 GOOS=darwin go build -a -installsuffix cgo -o ../../mainmac TwitterStreamCovid19.go

runmac: mainmac
	./mainmac

# Depends on the binary it actually executes.
run: main
	./main

# CA bundle copied into the image; --time-cond only re-downloads when the
# remote file is newer than a local cacert.pem, if one exists.
ca-certificates.crt:
	curl --remote-name --time-cond cacert.pem https://curl.haxx.se/ca/cacert.pem
	mv cacert.pem ca-certificates.crt

# Remove images and build artifacts; -f keeps clean from aborting when an
# artifact has not been built yet.
clean:
	docker rmi projectepic/$(PROJECT_NAME):$(TAG) || :
	docker rmi projectepic/$(PROJECT_NAME) || :
	rm -f ca-certificates.crt
	rm -f main
	rm -f mainmac

20 changes: 20 additions & 0 deletions TwitterStreamCovid19/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Twitter streaming API Client

Client that watches a specific file for userids and connects to the filter Streaming API.

All tweets are forwarded to a specified Kafka queue.


## Run locally

Required: GoLang, `dep`

- `cd src/TwitterStreamCovid19/ && GOPATH="$(pwd)/../../" dep ensure`
- `cp app.sh.template app.sh`
- Modify Twitter credentials
- `chmod +x app.sh`
- `./app.sh`

## Deploy to Docker

- `make push`
7 changes: 7 additions & 0 deletions TwitterStreamCovid19/app.sh.template
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#!/bin/bash
# Local launcher template: copy to app.sh, replace the placeholder values
# below, make it executable, and run it.

# Twitter consumer credentials exported into the client's environment
# (presumably read by TwitterStreamCovid19.go - confirm against the Go source).
export TWITTER_CONSUMER_API_KEY="REPLACE"
export TWITTER_CONSUMER_API_SECRET="REPLACE"

# Run the client from source with GOPATH set to the current directory, so this
# script must be started from the directory that contains src/.
GOPATH="$(pwd)" go run src/TwitterStreamCovid19/TwitterStreamCovid19.go

Loading

0 comments on commit 19e6da7

Please sign in to comment.