-
Notifications
You must be signed in to change notification settings - Fork 21
/
Copy pathfb_posts_realtime.py
35 lines (28 loc) · 1.25 KB
/
fb_posts_realtime.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from kafka import KafkaProducer
def init_kafka(port):
    """Create a KafkaProducer and confirm it can deliver a message.

    Args:
        port: Value passed straight through as ``bootstrap_servers`` to
            ``KafkaProducer``. NOTE(review): despite the name this is
            presumably a ``host:port`` string (or list of them) - confirm
            against callers.

    Returns:
        The connected ``KafkaProducer`` instance.
    """
    kafka_producer = KafkaProducer(bootstrap_servers=port)

    # Publish a throwaway message to the 'test' topic and block (up to 60 s)
    # on the result, so a broken connection surfaces here, not on first use.
    pending = kafka_producer.send('test', b'another_message')
    pending.get(timeout=60)

    return kafka_producer
def get_as_json(items):
    """Turn a positional record into a dict keyed by field name.

    Args:
        items: Indexable holding at least 12 values; position ``i`` is
            stored under the i-th name in the field list below.

    Returns:
        A dict with the 12 named fields, in the order listed.

    Raises:
        IndexError: If ``items`` is a sequence with fewer than 12 entries.
    """
    field_names = (
        "status_id",
        "status_message",
        "link_name",
        "status_type",
        "status_link",
        "status_published",
        "num_reactions",
        "num_comments",
        "num_shares",
        "num_likes",
        "num_loves",
        "from_id",
    )
    # Index explicitly (rather than zip) so a too-short input still fails
    # loudly instead of silently yielding a partial record.
    return {name: items[position] for position, name in enumerate(field_names)}
def serialize(items):
    """Encode one positional record as Avro binary.

    The record is first converted to a dict with ``get_as_json`` and then
    written with an Avro ``DatumWriter`` using the schema read from
    ``data/files/fb_scheam.avsc`` (resolved relative to the current working
    directory).

    Args:
        items: Positional record accepted by ``get_as_json``.

    Returns:
        The Avro-encoded record as ``bytes``.

    Raises:
        OSError: If the schema file cannot be opened.
        IndexError: Propagated from ``get_as_json`` for a too-short record.
    """
    # Kept function-local, as in the original; ``io2`` is the stdlib ``io``
    # aliased so it does not clash with ``avro.io``.
    from avro import schema, io
    import io as io2

    # NOTE(review): "scheam" looks like a typo, but the path must match the
    # file on disk, so it is left untouched - confirm before renaming.
    schema_path = "data/files/fb_scheam.avsc"

    # Use a context manager so the schema file handle is closed promptly
    # instead of being left for the garbage collector.
    with open(schema_path) as schema_file:
        parsed_schema = schema.Parse(schema_file.read())

    # Build the record once and reuse it for both the debug print and the
    # actual encoding.
    record = get_as_json(items)
    print(record)

    bytes_writer = io2.BytesIO()
    writer = io.DatumWriter(parsed_schema)
    writer.write(record, io.BinaryEncoder(bytes_writer))
    return bytes_writer.getvalue()
def send_message(producer, message, page_id):
    """Serialize a record and publish it to the 'fb' topic.

    Args:
        producer: Object exposing ``send(topic, key=..., value=...)``.
        message: Positional record handed to ``serialize``.
        page_id: String used (encoded to bytes) as the message key.

    Returns:
        The literal string ``"message sent"``. The send result itself is not
        awaited or inspected here.
    """
    payload = serialize(message)
    key_bytes = str.encode(page_id)
    producer.send('fb', value=payload, key=key_bytes)
    return "message sent"