Source code for kafka.producer.base

from __future__ import absolute_import

import logging
import time

try:
    from queue import Empty
except ImportError:
    from Queue import Empty
from collections import defaultdict
from multiprocessing import Queue, Process

import six

from kafka.common import (
    ProduceRequest, TopicAndPartition, UnsupportedCodecError
)
from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set

log = logging.getLogger("kafka")

BATCH_SEND_DEFAULT_INTERVAL = 20
BATCH_SEND_MSG_COUNT = 20

STOP_ASYNC_PRODUCER = -1


def _send_upstream(queue, client, codec, batch_time, batch_size,
                   req_acks, ack_timeout):
    """
    Listen on the queue for a specified number of messages or till
    a specified timeout and send them upstream to the brokers in one
    request

    NOTE: Ideally, this should have been a method inside the Producer
    class. However, multiprocessing module has issues in windows. The
    functionality breaks unless this function is kept outside of a class
    """
    stop = False
    client.reinit()

    while not stop:
        timeout = batch_time
        count = batch_size
        send_at = time.time() + timeout
        msgset = defaultdict(list)

        # Keep fetching till we gather enough messages or a
        # timeout is reached
        while count > 0 and timeout >= 0:
            try:
                topic_partition, msg, key = queue.get(timeout=timeout)

            except Empty:
                break

            # Check if the controller has requested us to stop
            if topic_partition == STOP_ASYNC_PRODUCER:
                stop = True
                break

            # Adjust the timeout to match the remaining period
            count -= 1
            timeout = send_at - time.time()
            msgset[topic_partition].append(msg)

        # Send collected requests upstream
        reqs = []
        for topic_partition, msg in msgset.items():
            messages = create_message_set(msg, codec, key)
            req = ProduceRequest(topic_partition.topic,
                                 topic_partition.partition,
                                 messages)
            reqs.append(req)

        try:
            client.send_produce_request(reqs,
                                        acks=req_acks,
                                        timeout=ack_timeout)
        except Exception:
            log.exception("Unable to send message")


[docs]class Producer(object): """ Base class to be used by producers Arguments: client: The Kafka client instance to use async: If set to true, the messages are sent asynchronously via another thread (process). We will not wait for a response to these WARNING!!! current implementation of async producer does not guarantee message delivery. Use at your own risk! Or help us improve with a PR! req_acks: A value indicating the acknowledgements that the server must receive before responding to the request ack_timeout: Value (in milliseconds) indicating a timeout for waiting for an acknowledgement batch_send: If True, messages are send in batches batch_send_every_n: If set, messages are send in batches of this size batch_send_every_t: If set, messages are send after this timeout """ ACK_NOT_REQUIRED = 0 # No ack is required ACK_AFTER_LOCAL_WRITE = 1 # Send response after it is written to log ACK_AFTER_CLUSTER_COMMIT = -1 # Send response after data is committed DEFAULT_ACK_TIMEOUT = 1000 def __init__(self, client, async=False, req_acks=ACK_AFTER_LOCAL_WRITE, ack_timeout=DEFAULT_ACK_TIMEOUT, codec=None, batch_send=False, batch_send_every_n=BATCH_SEND_MSG_COUNT, batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL): if batch_send: async = True assert batch_send_every_n > 0 assert batch_send_every_t > 0 else: batch_send_every_n = 1 batch_send_every_t = 3600 self.client = client self.async = async self.req_acks = req_acks self.ack_timeout = ack_timeout if codec is None: codec = CODEC_NONE elif codec not in ALL_CODECS: raise UnsupportedCodecError("Codec 0x%02x unsupported" % codec) self.codec = codec if self.async: log.warning("async producer does not guarantee message delivery!") log.warning("Current implementation does not retry Failed messages") log.warning("Use at your own risk! (or help improve with a PR!)") self.queue = Queue() # Messages are sent through this queue self.proc = Process(target=_send_upstream, args=(self.queue, self.client.copy(), self.codec, batch_send_every_t, batch_send_every_n, self.req_acks, self.ack_timeout)) # Process will die if main thread exits self.proc.daemon = True self.proc.start()
[docs] def send_messages(self, topic, partition, *msg): """ Helper method to send produce requests @param: topic, name of topic for produce request -- type str @param: partition, partition number for produce request -- type int @param: *msg, one or more message payloads -- type bytes @returns: ResponseRequest returned by server raises on error Note that msg type *must* be encoded to bytes by user. Passing unicode message will not work, for example you should encode before calling send_messages via something like `unicode_message.encode('utf-8')` All messages produced via this method will set the message 'key' to Null """ return self._send_messages(topic, partition, *msg)
def _send_messages(self, topic, partition, *msg, **kwargs): key = kwargs.pop('key', None) # Guarantee that msg is actually a list or tuple (should always be true) if not isinstance(msg, (list, tuple)): raise TypeError("msg is not a list or tuple!") # Raise TypeError if any message is not encoded as bytes if any(not isinstance(m, six.binary_type) for m in msg): raise TypeError("all produce message payloads must be type bytes") # Raise TypeError if the key is not encoded as bytes if key is not None and not isinstance(key, six.binary_type): raise TypeError("the key must be type bytes") if self.async: for m in msg: self.queue.put((TopicAndPartition(topic, partition), m, key)) resp = [] else: messages = create_message_set(msg, self.codec, key) req = ProduceRequest(topic, partition, messages) try: resp = self.client.send_produce_request([req], acks=self.req_acks, timeout=self.ack_timeout) except Exception: log.exception("Unable to send messages") raise return resp
[docs] def stop(self, timeout=1): """ Stop the producer. Optionally wait for the specified timeout before forcefully cleaning up. """ if self.async: self.queue.put((STOP_ASYNC_PRODUCER, None, None)) self.proc.join(timeout) if self.proc.is_alive(): self.proc.terminate()