diff --git a/docs/source/topics/frontera-settings.rst b/docs/source/topics/frontera-settings.rst index f25bba499..f8306ebe2 100644 --- a/docs/source/topics/frontera-settings.rst +++ b/docs/source/topics/frontera-settings.rst @@ -140,6 +140,17 @@ Default: ``5.0`` Time process should block until requested amount of data will be received from message bus. +.. setting:: KAFKA_CODEC_LEGACY + +KAFKA_CODEC_LEGACY +------------------ + +Default: ``CODEC_NONE`` + +Kafka-python 0.x version codec, can be one of ``CODEC_NONE``, ``CODEC_SNAPPY`` or ``CODEC_GZIP``, +imported from ``kafka.protocol``. + + .. setting:: LOGGING_CONFIG LOGGING_CONFIG diff --git a/docs/source/topics/production-broad-crawling.rst b/docs/source/topics/production-broad-crawling.rst index 51b7e40d8..9208be22b 100644 --- a/docs/source/topics/production-broad-crawling.rst +++ b/docs/source/topics/production-broad-crawling.rst @@ -73,7 +73,7 @@ Now, let's create a Frontera workers settings file under ``frontera`` subfolder #-------------------------------------------------------- # Url storage #-------------------------------------------------------- - BACKEND = 'distributed_frontera.contrib.backends.hbase.HBaseBackend' + BACKEND = 'frontera.contrib.backends.hbase.HBaseBackend' HBASE_DROP_ALL_TABLES = False HBASE_THRIFT_PORT = 9090 HBASE_THRIFT_HOST = 'localhost' @@ -98,7 +98,7 @@ Configure Frontera spiders ========================== Next step is to create Frontera settings file and point Scrapy to it.:: - from distributed_frontera.settings.default_settings import MIDDLEWARES + from frontera.settings.default_settings import MIDDLEWARES MAX_NEXT_REQUESTS = 256 @@ -110,7 +110,7 @@ Next step is to create Frontera settings file and point Scrapy to it.:: #-------------------------------------------------------- # Crawl frontier backend #-------------------------------------------------------- - BACKEND = 'distributed_frontera.backends.remote.KafkaOverusedBackend' + BACKEND = 
'frontera.backends.remote.messagebus.MessageBusBackend' #-------------------------------------------------------- # Logging diff --git a/frontera/contrib/messagebus/kafkabus.py b/frontera/contrib/messagebus/kafkabus.py index f31a788e7..b650ca0a3 100644 --- a/frontera/contrib/messagebus/kafkabus.py +++ b/frontera/contrib/messagebus/kafkabus.py @@ -5,7 +5,6 @@ from kafka import KafkaClient, SimpleConsumer, KeyedProducer as KafkaKeyedProducer, SimpleProducer as KafkaSimpleProducer from kafka.common import BrokerResponseError, MessageSizeTooLargeError -from kafka.protocol import CODEC_SNAPPY from frontera.contrib.backends.partitioners import FingerprintPartitioner, Crc32NamePartitioner from frontera.contrib.messagebus.kafka import OffsetsFetcher @@ -69,13 +68,14 @@ def get_offset(self): class SimpleProducer(BaseStreamProducer): - def __init__(self, connection, topic): + def __init__(self, connection, topic, codec): self._connection = connection self._topic = topic + self._codec = codec self._create() def _create(self): - self._producer = KafkaSimpleProducer(self._connection, codec=CODEC_SNAPPY) + self._producer = KafkaSimpleProducer(self._connection, codec=self._codec) def send(self, key, *messages): self._producer.send_messages(self._topic, *messages) @@ -91,16 +91,17 @@ def get_offset(self, partition_id): class KeyedProducer(BaseStreamProducer): - def __init__(self, connection, topic_done, partitioner_cls): + def __init__(self, connection, topic_done, partitioner_cls, codec): self._prod = None self._conn = connection self._topic_done = topic_done self._partitioner_cls = partitioner_cls + self._codec = codec def _connect_producer(self): if self._prod is None: try: - self._prod = KafkaKeyedProducer(self._conn, partitioner=self._partitioner_cls, codec=CODEC_SNAPPY) + self._prod = KafkaKeyedProducer(self._conn, partitioner=self._partitioner_cls, codec=self._codec) except BrokerResponseError: self._prod = None logger.warning("Could not connect producer to Kafka server") 
@@ -143,9 +144,10 @@ def __init__(self, messagebus): self._db_group = messagebus.general_group self._sw_group = messagebus.sw_group self._topic_done = messagebus.topic_done + self._codec = messagebus.codec def producer(self): - return KeyedProducer(self._conn, self._topic_done, FingerprintPartitioner) + return KeyedProducer(self._conn, self._topic_done, FingerprintPartitioner, self._codec) def consumer(self, partition_id, type): """ @@ -166,6 +168,7 @@ def __init__(self, messagebus): self._max_next_requests = messagebus.max_next_requests self._hostname_partitioning = messagebus.hostname_partitioning self._offset_fetcher = OffsetsFetcher(self._conn, self._topic, self._general_group) + self._codec = messagebus.codec def consumer(self, partition_id): return Consumer(self._conn, self._topic, self._general_group, partition_id) @@ -180,7 +183,7 @@ def available_partitions(self): def producer(self): partitioner = Crc32NamePartitioner if self._hostname_partitioning else FingerprintPartitioner - return KeyedProducer(self._conn, self._topic, partitioner) + return KeyedProducer(self._conn, self._topic, partitioner, self._codec) class ScoringLogStream(BaseScoringLogStream): @@ -188,12 +191,13 @@ def __init__(self, messagebus): self._topic = messagebus.topic_scoring self._group = messagebus.general_group self._conn = messagebus.conn + self._codec = messagebus.codec def consumer(self): return Consumer(self._conn, self._topic, self._group, partition_id=None) def producer(self): - return SimpleProducer(self._conn, self._topic) + return SimpleProducer(self._conn, self._topic, self._codec) class MessageBus(BaseMessageBus): @@ -207,6 +211,7 @@ def __init__(self, settings): self.spider_partition_id = settings.get('SPIDER_PARTITION_ID') self.max_next_requests = settings.MAX_NEXT_REQUESTS self.hostname_partitioning = settings.get('QUEUE_HOSTNAME_PARTITIONING') + self.codec = settings.get('KAFKA_CODEC_LEGACY') self.conn = KafkaClient(server) diff --git 
a/frontera/settings/default_settings.py b/frontera/settings/default_settings.py index 262ccfd92..fa6b1c4ce 100644 --- a/frontera/settings/default_settings.py +++ b/frontera/settings/default_settings.py @@ -1,5 +1,6 @@ from datetime import timedelta -import logging +from kafka.protocol import CODEC_NONE + AUTO_START = True BACKEND = 'frontera.contrib.backends.memory.FIFO' @@ -19,6 +20,7 @@ HBASE_STATE_CACHE_SIZE_LIMIT = 3000000 HBASE_QUEUE_TABLE = 'queue' KAFKA_GET_TIMEOUT = 5.0 +KAFKA_CODEC_LEGACY = CODEC_NONE MAX_NEXT_REQUESTS = 64 MAX_REQUESTS = 0 MESSAGE_BUS = 'frontera.contrib.messagebus.zeromq.MessageBus' diff --git a/setup.py b/setup.py index e7f743bd5..c5da6d81d 100644 --- a/setup.py +++ b/setup.py @@ -63,8 +63,7 @@ 'msgpack-python' ], 'kafka': [ - 'kafka-python<=0.9.5', - 'python-snappy' + 'kafka-python<=0.9.5' ], 'distributed': [ 'Twisted'