Merge pull request #167 from scrapinghub/codecs-setting
kafka codec option, docs update
sibiryakov authored Jun 24, 2016
2 parents 0aee6d6 + f13d58f commit 39aa31b
Showing 5 changed files with 31 additions and 14 deletions.
11 changes: 11 additions & 0 deletions docs/source/topics/frontera-settings.rst
@@ -140,6 +140,17 @@ Default: ``5.0``

Time the process should block waiting for the requested amount of data to arrive from the message bus.

+.. setting:: KAFKA_CODEC_LEGACY
+
+KAFKA_CODEC_LEGACY
+------------------
+
+Default: ``CODEC_NONE``
+
+Compression codec for the legacy kafka-python 0.x client; one of ``CODEC_NONE``, ``CODEC_SNAPPY``
+or ``CODEC_GZIP``, imported from ``kafka.protocol``.
+
+
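For example, to enable Snappy compression with the legacy client, a crawler's settings module could override this default (an illustrative sketch, not part of the diff; ``python-snappy`` must be installed separately, since this commit drops it from the ``kafka`` extras)::

    from kafka.protocol import CODEC_SNAPPY

    KAFKA_CODEC_LEGACY = CODEC_SNAPPY
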
.. setting:: LOGGING_CONFIG

LOGGING_CONFIG
6 changes: 3 additions & 3 deletions docs/source/topics/production-broad-crawling.rst
@@ -73,7 +73,7 @@ Now, let's create a Frontera workers settings file under ``frontera`` subfolder
#--------------------------------------------------------
# Url storage
#--------------------------------------------------------
-BACKEND = 'distributed_frontera.contrib.backends.hbase.HBaseBackend'
+BACKEND = 'frontera.contrib.backends.hbase.HBaseBackend'
HBASE_DROP_ALL_TABLES = False
HBASE_THRIFT_PORT = 9090
HBASE_THRIFT_HOST = 'localhost'
@@ -98,7 +98,7 @@ Configure Frontera spiders
==========================
The next step is to create a Frontera settings file and point Scrapy to it::

-from distributed_frontera.settings.default_settings import MIDDLEWARES
+from frontera.settings.default_settings import MIDDLEWARES

MAX_NEXT_REQUESTS = 256

@@ -110,7 +110,7 @@ Next step is to create Frontera settings file and point Scrapy to it.::
#--------------------------------------------------------
# Crawl frontier backend
#--------------------------------------------------------
-BACKEND = 'distributed_frontera.backends.remote.KafkaOverusedBackend'
+BACKEND = 'frontera.contrib.backends.remote.messagebus.MessageBusBackend'

#--------------------------------------------------------
# Logging
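The hunk above ends at the logging section. How Scrapy is pointed at this file is not shown here; a common arrangement (a sketch assuming Frontera's standard Scrapy integration, with a hypothetical ``frontier.settings`` module path) is to reference it from the Scrapy project's ``settings.py``::

    # Scrapy settings.py -- route scheduling through Frontera
    SCHEDULER = 'frontera.contrib.scrapy.schedulers.frontier.FronteraScheduler'
    FRONTERA_SETTINGS = 'frontier.settings'  # hypothetical dotted path to the file above
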
21 changes: 13 additions & 8 deletions frontera/contrib/messagebus/kafkabus.py
@@ -5,7 +5,6 @@

from kafka import KafkaClient, SimpleConsumer, KeyedProducer as KafkaKeyedProducer, SimpleProducer as KafkaSimpleProducer
from kafka.common import BrokerResponseError, MessageSizeTooLargeError
-from kafka.protocol import CODEC_SNAPPY

from frontera.contrib.backends.partitioners import FingerprintPartitioner, Crc32NamePartitioner
from frontera.contrib.messagebus.kafka import OffsetsFetcher
@@ -69,13 +68,14 @@ def get_offset(self):


class SimpleProducer(BaseStreamProducer):
-    def __init__(self, connection, topic):
+    def __init__(self, connection, topic, codec):
        self._connection = connection
        self._topic = topic
+        self._codec = codec
        self._create()

    def _create(self):
-        self._producer = KafkaSimpleProducer(self._connection, codec=CODEC_SNAPPY)
+        self._producer = KafkaSimpleProducer(self._connection, codec=self._codec)

    def send(self, key, *messages):
        self._producer.send_messages(self._topic, *messages)
@@ -91,16 +91,17 @@ def get_offset(self, partition_id):


class KeyedProducer(BaseStreamProducer):
-    def __init__(self, connection, topic_done, partitioner_cls):
+    def __init__(self, connection, topic_done, partitioner_cls, codec):
        self._prod = None
        self._conn = connection
        self._topic_done = topic_done
        self._partitioner_cls = partitioner_cls
+        self._codec = codec

    def _connect_producer(self):
        if self._prod is None:
            try:
-                self._prod = KafkaKeyedProducer(self._conn, partitioner=self._partitioner_cls, codec=CODEC_SNAPPY)
+                self._prod = KafkaKeyedProducer(self._conn, partitioner=self._partitioner_cls, codec=self._codec)
            except BrokerResponseError:
                self._prod = None
                logger.warning("Could not connect producer to Kafka server")
@@ -143,9 +144,10 @@ def __init__(self, messagebus):
        self._db_group = messagebus.general_group
        self._sw_group = messagebus.sw_group
        self._topic_done = messagebus.topic_done
+        self._codec = messagebus.codec

    def producer(self):
-        return KeyedProducer(self._conn, self._topic_done, FingerprintPartitioner)
+        return KeyedProducer(self._conn, self._topic_done, FingerprintPartitioner, self._codec)

    def consumer(self, partition_id, type):
        """
@@ -166,6 +168,7 @@ def __init__(self, messagebus):
        self._max_next_requests = messagebus.max_next_requests
        self._hostname_partitioning = messagebus.hostname_partitioning
        self._offset_fetcher = OffsetsFetcher(self._conn, self._topic, self._general_group)
+        self._codec = messagebus.codec

    def consumer(self, partition_id):
        return Consumer(self._conn, self._topic, self._general_group, partition_id)
@@ -180,20 +183,21 @@ def available_partitions(self):

    def producer(self):
        partitioner = Crc32NamePartitioner if self._hostname_partitioning else FingerprintPartitioner
-        return KeyedProducer(self._conn, self._topic, partitioner)
+        return KeyedProducer(self._conn, self._topic, partitioner, self._codec)


class ScoringLogStream(BaseScoringLogStream):
    def __init__(self, messagebus):
        self._topic = messagebus.topic_scoring
        self._group = messagebus.general_group
        self._conn = messagebus.conn
+        self._codec = messagebus.codec

    def consumer(self):
        return Consumer(self._conn, self._topic, self._group, partition_id=None)

    def producer(self):
-        return SimpleProducer(self._conn, self._topic)
+        return SimpleProducer(self._conn, self._topic, self._codec)


class MessageBus(BaseMessageBus):
@@ -207,6 +211,7 @@ def __init__(self, settings):
        self.spider_partition_id = settings.get('SPIDER_PARTITION_ID')
        self.max_next_requests = settings.MAX_NEXT_REQUESTS
        self.hostname_partitioning = settings.get('QUEUE_HOSTNAME_PARTITIONING')
+        self.codec = settings.get('KAFKA_CODEC_LEGACY')

        self.conn = KafkaClient(server)

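Taken together, these changes thread one codec value from the settings object through ``MessageBus`` into every producer. A minimal end-to-end sketch (illustration only, not from this commit; assumes a broker at ``localhost:9092``, the ``KAFKA_LOCATION`` setting used by this message bus, and the ``scoring_log()`` accessor from the base message-bus API)::

    from kafka.protocol import CODEC_GZIP

    from frontera.settings import Settings
    from frontera.contrib.messagebus.kafkabus import MessageBus

    settings = Settings()
    settings.set('KAFKA_LOCATION', 'localhost:9092')  # assumed broker address setting
    settings.set('KAFKA_CODEC_LEGACY', CODEC_GZIP)    # read in MessageBus.__init__

    bus = MessageBus(settings)               # connects a KafkaClient on construction
    producer = bus.scoring_log().producer()  # SimpleProducer created with CODEC_GZIP
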
4 changes: 3 additions & 1 deletion frontera/settings/default_settings.py
@@ -1,5 +1,6 @@
from datetime import timedelta
import logging
+from kafka.protocol import CODEC_NONE


AUTO_START = True
BACKEND = 'frontera.contrib.backends.memory.FIFO'
@@ -19,6 +20,7 @@
HBASE_STATE_CACHE_SIZE_LIMIT = 3000000
HBASE_QUEUE_TABLE = 'queue'
KAFKA_GET_TIMEOUT = 5.0
+KAFKA_CODEC_LEGACY = CODEC_NONE
MAX_NEXT_REQUESTS = 64
MAX_REQUESTS = 0
MESSAGE_BUS = 'frontera.contrib.messagebus.zeromq.MessageBus'
Expand Down
3 changes: 1 addition & 2 deletions setup.py
@@ -63,8 +63,7 @@
        'msgpack-python'
    ],
    'kafka': [
-        'kafka-python<=0.9.5',
-        'python-snappy'
+        'kafka-python<=0.9.5'
    ],
    'distributed': [
        'Twisted'
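With ``python-snappy`` removed from the ``kafka`` extras, Snappy support becomes opt-in: ``pip install frontera[kafka]`` no longer pulls in the compression library, so users setting ``KAFKA_CODEC_LEGACY = CODEC_SNAPPY`` must install ``python-snappy`` themselves.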
