# -*- coding: utf-8 -*-
# Copyright 2015 Cyan, Inc.
# Copyright 2016, 2017, 2018, 2019 Ciena Corporation
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.

from collections import namedtuple

import attr

# Constants
DefaultKafkaPort = 9092
OFFSET_EARLIEST = -2  # From the docs for OffsetRequest
OFFSET_LATEST = -1  # From the docs for OffsetRequest
OFFSET_NOT_COMMITTED = -1  # Returned by kafka when no offset is stored
OFFSET_COMMITTED = -101  # Used to avoid possible additions from the Kafka team
TIMESTAMP_INVALID = -1  # Used to specify that the broker should set timestamp
KAFKA_SUCCESS = 0  # An 'error' of 0 is used to indicate success

PRODUCER_ACK_NOT_REQUIRED = 0  # No ack is required
PRODUCER_ACK_LOCAL_WRITE = 1  # Send response only after it is written to log
PRODUCER_ACK_ALL_REPLICAS = -1  # Response after data written by all replicas

CODEC_LZ4 = 0x03
# NB: This doesn't contain LZ4 because we don't support it yet.

#   Structs   #
# SendRequest is used to encapsulate messages and keys prior to
# creating a message set
SendRequest = namedtuple(
    "SendRequest", ["topic", "key", "messages", "deferred"])

# Request payloads
ProduceRequest = namedtuple("ProduceRequest",
                            ["topic", "partition", "messages"])

FetchRequest = namedtuple("FetchRequest",
                          ["topic", "partition", "offset", "max_bytes"])

OffsetRequest = namedtuple("OffsetRequest",
                           ["topic", "partition", "time", "max_offsets"])

# This is currently for the API_Version=1
OffsetCommitRequest = namedtuple("OffsetCommitRequest",
                                 ["topic", "partition", "offset", "timestamp",

OffsetFetchRequest = namedtuple("OffsetFetchRequest", ["topic", "partition"])

# Response payloads
ProduceResponse = namedtuple("ProduceResponse",
                             ["topic", "partition", "error", "offset"])

FetchResponse = namedtuple("FetchResponse", ["topic", "partition", "error",
                                             "highwaterMark", "messages"])

OffsetResponse = namedtuple("OffsetResponse",
                            ["topic", "partition", "error", "offsets"])

OffsetCommitResponse = namedtuple("OffsetCommitResponse",
                                  ["topic", "partition", "error"])

OffsetFetchResponse = namedtuple("OffsetFetchResponse",
                                 ["topic", "partition", "offset",
                                  "metadata", "error"])

ConsumerMetadataResponse = namedtuple("ConsumerMetadataResponse",
                                      ["error", "node_id", "host", "port"])

# Metadata tuples
BrokerMetadata = namedtuple("BrokerMetadata", ["node_id", "host", "port"])

TopicMetadata = namedtuple("TopicMetadata", ["topic", "topic_error_code",

PartitionMetadata = namedtuple("PartitionMetadata",
                               ["topic", "partition", "partition_error_code",
                                "leader", "replicas", "isr"])

# Requests and responses for consumer groups
@attr.s(frozen=True, slots=True)
class _JoinGroupRequestProtocol(object):
    protocol_name = attr.ib()
    protocol_metadata = attr.ib()

@attr.s(frozen=True, slots=True)
class _JoinGroupProtocolMetadata(object):
    verison = attr.ib()
    subscriptions = attr.ib()
    user_data = attr.ib()

@attr.s(frozen=True, slots=True)
class _JoinGroupRequest(object):
    A request to join a coordinator group.
    group = attr.ib()
    session_timeout = attr.ib()
    member_id = attr.ib()
    protocol_type = attr.ib()
    group_protocols = attr.ib()

@attr.s(frozen=True, slots=True)
class _JoinGroupResponseMember(object):
    member_id = attr.ib()
    member_metadata = attr.ib()

@attr.s(frozen=True, slots=True)
class _JoinGroupResponse(object):
    error = attr.ib()
    generation_id = attr.ib()
    group_protocol = attr.ib()
    leader_id = attr.ib()
    member_id = attr.ib()
    members = attr.ib()

@attr.s(frozen=True, slots=True)
class _SyncGroupRequestMember(object):
    member_id = attr.ib()
    member_metadata = attr.ib()

@attr.s(frozen=True, slots=True)
class _SyncGroupMemberAssignment(object):
    version = attr.ib()
    assignments = attr.ib()
    user_data = attr.ib()

@attr.s(frozen=True, slots=True)
class _SyncGroupRequest(object):
    group = attr.ib()
    generation_id = attr.ib()
    member_id = attr.ib()
    group_assignment = attr.ib()

@attr.s(frozen=True, slots=True)
class _SyncGroupResponse(object):
    error = attr.ib()
    member_assignment = attr.ib()

_HeartbeatRequest = namedtuple("_HeartbeatRequest", ["group", "generation_id", "member_id"])
_HeartbeatResponse = namedtuple("_HeartbeatResponse", ["error"])

_LeaveGroupRequest = namedtuple("_LeaveGroupRequest", ["group", "member_id"])
_LeaveGroupResponse = namedtuple("_LeaveGroupResponse", ["error"])

# Other useful structs
OffsetAndMessage = namedtuple("OffsetAndMessage", ["offset", "message"])

TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partition"])
SourcedMessage = namedtuple(
    "SourcedMessage", TopicAndPartition._fields + OffsetAndMessage._fields)

[docs]class Message(namedtuple("Message", ["magic", "attributes", "key", "value"])): """ A Kafka `message`_ in format 0. :ivar int magic: Message format version, always 0. :ivar int attributes: Compression flags. :ivar bytes key: Message key, or ``None`` when the message lacks a key. Note that the key is required on a compacted topic. :ivar bytes value: Message value, or ``None`` if this is a tombstone a.k.a. null message. .. _message: """ __slots__ = () def __repr__(self): bits = ['<Message v0'] if self.attributes != 0: if self.attributes == CODEC_GZIP: codec = ' CODEC_GZIP' elif self.attributes == CODEC_SNAPPY: codec = ' CODEC_SNAPPY' elif self.attributes == CODEC_LZ4: codec = ' CODEC_LZ4' else: codec = ' attributes=0x{:x}'.format(self.attributes) bits.append(codec) if self.key is not None: bits.append(' key={!r}'.format(self.key)) if self.value is None or len(self.value) < 1024: bits.append(' value={!r}'.format(self.value)) else: bits.append(' value={:,d} bytes {!r}...'.format(len(self.value), self.value[:512])) bits.append('>') return ''.join(bits)
################# # Exceptions # #################
[docs]class KafkaError(Exception): pass
[docs]class ClientError(KafkaError): """ Generic error when the client detects an error """ pass
[docs]class RestartError(ClientError): """ Raised when a consumer start() call is made on an already running consumer """ pass
[docs]class RestopError(ClientError): """ Raised when a consumer stop() or shutdown() call is made on a non-running consumer """ pass
[docs]class DuplicateRequestError(KafkaError): """ Error caused by calling makeRequest() with a duplicate requestId """
[docs]class BrokerResponseError(KafkaError): """ One `BrokerResponseError` subclass is defined for each protocol `error code`_. :ivar int errno: The integer error code reported by the server. :ivar bool retriable: A flag which indicates whether it is valid to retry the request which produced the error. Note that a metadata refresh may be required before retry, depending on the type of error. :ivar str message: The error code string, per the table. ``None`` if the error code is unknown to Afkak (future Kafka releases may add additional error codes). Note that this value may change for a given exception type. Code should either check the exception type or errno. .. _error code: """ retriable = False message = None def __str__(self): base = Exception.__str__(self) if self.message is None: return 'error={:d} {}'.format(self.errno, base) else: return 'error={:d} ({}) {}'.format(self.errno, self.message, base)
[docs] @classmethod def raise_for_errno(cls, errno, *args): """ Raise an exception for the given error number. :param int errno: Kafka error code. :raises BrokerResponseError: For any non-zero *errno* a `BrokerResponseError` is raised. If Afkak defines a specific exception type for the error code that is raised. All such types subclass `BrokerResponseError`. """ if errno == 0: return None subcls = cls.errnos.get(errno) if subcls is None: error = cls(*args) error.errno = errno raise error else: raise subcls(*args)
[docs]class RetriableBrokerResponseError(BrokerResponseError): """ `RetriableBrokerResponseError` is the shared superclass of all broker errors which can be retried. """ retriable = True
[docs]class UnknownError(BrokerResponseError): errno = -1 message = 'UNKNOWN_SERVER_ERROR'
[docs]class OffsetOutOfRangeError(BrokerResponseError): errno = 1 message = 'OFFSET_OUT_OF_RANGE'
[docs]class CorruptMessage(RetriableBrokerResponseError): errno = 2 message = 'CORRUPT_MESSAGE'
# Compatibility alias: InvalidMessageError = CorruptMessage
[docs]class UnknownTopicOrPartitionError(RetriableBrokerResponseError): errno = 3 message = 'UNKNOWN_TOPIC_OR_PARTITION'
[docs]class InvalidFetchRequestError(BrokerResponseError): errno = 4 message = 'INVALID_FETCH_SIZE'
[docs]class LeaderNotAvailableError(RetriableBrokerResponseError): errno = 5 message = 'LEADER_NOT_AVAILABLE'
[docs]class NotLeaderForPartitionError(RetriableBrokerResponseError): errno = 6 message = 'NOT_LEADER_FOR_PARTITION'
[docs]class RequestTimedOutError(RetriableBrokerResponseError): errno = 7 message = 'REQUEST_TIMED_OUT'
[docs]class BrokerNotAvailableError(BrokerResponseError): errno = 8 message = 'BROKER_NOT_AVAILABLE'
[docs]class ReplicaNotAvailableError(BrokerResponseError): errno = 9 message = 'REPLICA_NOT_AVAILABLE'
[docs]class MessageSizeTooLargeError(BrokerResponseError): errno = 10 message = 'MESSAGE_SIZE_TOO_LARGE'
[docs]class StaleControllerEpochError(BrokerResponseError): errno = 11 message = 'STALE_CONTROLLER_EPOCH'
[docs]class OffsetMetadataTooLargeError(BrokerResponseError): errno = 12 message = 'OFFSET_METADATA_TOO_LARGE'
[docs]class NetworkException(RetriableBrokerResponseError): errno = 13 message = 'NETWORK_EXCEPTION'
# Compatibility alias: StaleLeaderEpochCodeError = NetworkException
[docs]class CoordinatorLoadInProgress(RetriableBrokerResponseError): errno = 14 message = 'COORDINATOR_LOAD_IN_PROGRESS'
# Compatibility alias: OffsetsLoadInProgressError = CoordinatorLoadInProgress
[docs]class CoordinatorNotAvailable(RetriableBrokerResponseError): errno = 15 message = 'COORDINATOR_NOT_AVAILABLE'
# Compatibility alias: ConsumerCoordinatorNotAvailableError = CoordinatorNotAvailable
[docs]class NotCoordinator(RetriableBrokerResponseError): errno = 16 message = 'NOT_COORDINATOR'
# Compatibility alias: NotCoordinatorForConsumerError = NotCoordinator
[docs]class InvalidTopic(BrokerResponseError): """ The request specified an illegal topic name. The name is either malformed or references an internal topic for which the operation is not valid. """ errno = 17 message = "INVALID_TOPIC_EXCEPTION"
[docs]class RecordListTooLarge(BrokerResponseError): """ The produce request message batch exceeds the maximum configured segment size. """ errno = 18 message = "RECORD_LIST_TOO_LARGE"
[docs]class NotEnoughReplicas(RetriableBrokerResponseError): """ The number of in-sync replicas is lower than can satisfy the number of acks required by the produce request. """ errno = 19 message = "NOT_ENOUGH_REPLICAS"
[docs]class NotEnoughReplicasAfterAppend(RetriableBrokerResponseError): """ The produce request was written to the log, but not by as many in-sync replicas as it required. """ errno = 20 message = "NOT_ENOUGH_REPLICAS_AFTER_APPEND"
[docs]class InvalidRequiredAcks(BrokerResponseError): errno = 21 message = "INVALID_REQUIRED_ACKS"
[docs]class IllegalGeneration(BrokerResponseError): errno = 22 message = "ILLEGAL_GENERATION"
[docs]class InconsistentGroupProtocol(BrokerResponseError): errno = 23 message = "INCONSISTENT_GROUP_PROTOCOL"
[docs]class InvalidGroupId(BrokerResponseError): errno = 24 message = "INVALID_GROUP_ID"
[docs]class UnknownMemberId(BrokerResponseError): errno = 25 message = "UNKNOWN_MEMBER_ID"
[docs]class InvalidSessionTimeout(BrokerResponseError): errno = 26 message = "INVALID_SESSION_TIMEOUT"
[docs]class RebalanceInProgress(BrokerResponseError): errno = 27 message = "REBALANCE_IN_PROGRESS"
[docs]class InvalidCommitOffsetSize(BrokerResponseError): errno = 28 message = "INVALID_COMMIT_OFFSET_SIZE"
[docs]class TopicAuthorizationFailed(BrokerResponseError): errno = 29 message = "TOPIC_AUTHORIZATION_FAILED"
[docs]class GroupAuthorizationFailed(BrokerResponseError): errno = 30 message = "GROUP_AUTHORIZATION_FAILED"
[docs]class ClusterAuthorizationFailed(BrokerResponseError): errno = 31 message = "CLUSTER_AUTHORIZATION_FAILED"
[docs]class InvalidTimestamp(BrokerResponseError): errno = 32 message = 'INVALID_TIMESTAMP'
[docs]class UnsupportedSaslMechanism(BrokerResponseError): errno = 33 message = 'UNSUPPORTED_SASL_MECHANISM'
[docs]class IllegalSaslState(BrokerResponseError): errno = 34 message = 'ILLEGAL_SASL_STATE'
[docs]class UnsupportedVersion(BrokerResponseError): errno = 35 message = 'UNSUPPORTED_VERSION'
[docs]class TopicAlreadyExists(BrokerResponseError): errno = 36 message = 'TOPIC_ALREADY_EXISTS'
[docs]class InvalidPartitions(BrokerResponseError): errno = 37 message = 'INVALID_PARTITIONS'
[docs]class InvalidReplicationFactor(BrokerResponseError): errno = 38 message = 'INVALID_REPLICATION_FACTOR'
[docs]class InvalidReplicaAssignment(BrokerResponseError): errno = 39 message = 'INVALID_REPLICA_ASSIGNMENT'
[docs]class InvalidConfig(BrokerResponseError): errno = 40 message = 'INVALID_CONFIG'
[docs]class NotController(RetriableBrokerResponseError): errno = 41 message = 'NOT_CONTROLLER'
[docs]class InvalidRequest(BrokerResponseError): errno = 42 message = 'INVALID_REQUEST'
[docs]class UnsupportedForMessageFormat(BrokerResponseError): errno = 43 message = 'UNSUPPORTED_FOR_MESSAGE_FORMAT'
[docs]class PolicyViolation(BrokerResponseError): errno = 44 message = 'POLICY_VIOLATION'
[docs]class OutOfOrderSequenceNumber(BrokerResponseError): errno = 45 message = 'OUT_OF_ORDER_SEQUENCE_NUMBER'
[docs]class DuplicateSequenceNumber(BrokerResponseError): errno = 46 message = 'DUPLICATE_SEQUENCE_NUMBER'
[docs]class InvalidProducerEpoch(BrokerResponseError): errno = 47 message = 'INVALID_PRODUCER_EPOCH'
[docs]class InvalidTxnState(BrokerResponseError): errno = 48 message = 'INVALID_TXN_STATE'
[docs]class InvalidProducerIdMapping(BrokerResponseError): errno = 49 message = 'INVALID_PRODUCER_ID_MAPPING'
[docs]class InvalidTransactionTimeout(BrokerResponseError): errno = 50 message = 'INVALID_TRANSACTION_TIMEOUT'
[docs]class ConcurrentTransactions(BrokerResponseError): errno = 51 message = 'CONCURRENT_TRANSACTIONS'
[docs]class TransactionCoordinatorFenced(BrokerResponseError): errno = 52 message = 'TRANSACTION_COORDINATOR_FENCED'
[docs]class TransactionalIdAuthorizationFailed(BrokerResponseError): errno = 53 message = 'TRANSACTIONAL_ID_AUTHORIZATION_FAILED'
[docs]class SecurityDisabled(BrokerResponseError): errno = 54 message = 'SECURITY_DISABLED'
[docs]class OperationNotAttempted(BrokerResponseError): errno = 55 message = 'OPERATION_NOT_ATTEMPTED'
[docs]class KafkaStorageError(RetriableBrokerResponseError): errno = 56 message = 'KAFKA_STORAGE_ERROR'
[docs]class LogDirNotFound(BrokerResponseError): errno = 57 message = 'LOG_DIR_NOT_FOUND'
[docs]class SaslAuthenticationFailed(BrokerResponseError): errno = 58 message = 'SASL_AUTHENTICATION_FAILED'
[docs]class UnknownProducerId(BrokerResponseError): errno = 59 message = 'UNKNOWN_PRODUCER_ID'
[docs]class ReassignmentInProgress(BrokerResponseError): errno = 60 message = 'REASSIGNMENT_IN_PROGRESS'
[docs]class DelegationTokenAuthDisabled(BrokerResponseError): errno = 61 message = 'DELEGATION_TOKEN_AUTH_DISABLED'
[docs]class DelegationTokenNotFound(BrokerResponseError): errno = 62 message = 'DELEGATION_TOKEN_NOT_FOUND'
[docs]class DelegationTokenOwnerMismatch(BrokerResponseError): errno = 63 message = 'DELEGATION_TOKEN_OWNER_MISMATCH'
[docs]class DelegationTokenRequestNotAllowed(BrokerResponseError): errno = 64 message = 'DELEGATION_TOKEN_REQUEST_NOT_ALLOWED'
[docs]class DelegationTokenAuthorizationFailed(BrokerResponseError): errno = 65 message = 'DELEGATION_TOKEN_AUTHORIZATION_FAILED'
[docs]class DelegationTokenExpired(BrokerResponseError): errno = 66 message = 'DELEGATION_TOKEN_EXPIRED'
[docs]class InvalidPrincipalType(BrokerResponseError): errno = 67 message = 'INVALID_PRINCIPAL_TYPE'
[docs]class NonEmptyGroup(BrokerResponseError): errno = 68 message = 'NON_EMPTY_GROUP'
[docs]class GroupIdNotFound(BrokerResponseError): errno = 69 message = 'GROUP_ID_NOT_FOUND'
[docs]class FetchSessionIdNotFound(RetriableBrokerResponseError): errno = 70 message = 'FETCH_SESSION_ID_NOT_FOUND'
[docs]class InvalidFetchSessionEpoch(RetriableBrokerResponseError): errno = 71 message = 'INVALID_FETCH_SESSION_EPOCH'
[docs]class ListenerNotFound(RetriableBrokerResponseError): errno = 72 message = 'LISTENER_NOT_FOUND'
[docs]class KafkaUnavailableError(KafkaError): pass
[docs]class LeaderUnavailableError(KafkaError): pass
[docs]class PartitionUnavailableError(KafkaError): pass
[docs]class FailedPayloadsError(KafkaError): """ `FailedPayloadsError` indicates a partial or total failure In a method like `KafkaClient.send_produce_request()` partial failure is possible because payloads are distributed among the Kafka brokers that lead each partition. :ivar list responses: Any successful responses. :ivar list failed_payloads: Two-tuples of (payload, failure). """ responses = property(lambda self: self.args[0]) failed_payloads = property(lambda self: self.args[1])
[docs]class ConnectionError(KafkaError): pass
[docs]class BufferUnderflowError(KafkaError): pass
[docs]class ChecksumError(KafkaError): pass
[docs]class ConsumerFetchSizeTooSmall(KafkaError): pass
[docs]class ProtocolError(KafkaError): pass
[docs]class UnsupportedCodecError(KafkaError): pass
[docs]class CancelledError(KafkaError): def __init__(self, request_sent=None, message=None): self.request_sent = request_sent self.message = message def __str__(self): s = str(self.message) or 'Cancelled' if self.request_sent is not None: s += ' request_sent={!r}'.format(self.request_sent) return s
[docs]class InvalidConsumerGroupError(KafkaError): pass
[docs]class NoResponseError(KafkaError): pass
[docs]class OperationInProgress(KafkaError): def __init__(self, deferred=None): """Create an OperationInProgress exception deferred is an optional argument which represents the operation currently in progress. It should fire when the current operation completes. """ self.deferred = deferred
# TODO: document BrokerResponseError.errnos = { -1: UnknownError, 1: OffsetOutOfRangeError, 2: CorruptMessage, 3: UnknownTopicOrPartitionError, 4: InvalidFetchRequestError, 5: LeaderNotAvailableError, 6: NotLeaderForPartitionError, 7: RequestTimedOutError, 8: BrokerNotAvailableError, 9: ReplicaNotAvailableError, 10: MessageSizeTooLargeError, 11: StaleControllerEpochError, 12: OffsetMetadataTooLargeError, 13: NetworkException, 14: CoordinatorLoadInProgress, 15: CoordinatorNotAvailable, 16: NotCoordinator, 17: InvalidTopic, 18: RecordListTooLarge, 19: NotEnoughReplicas, 20: NotEnoughReplicasAfterAppend, 21: InvalidRequiredAcks, 22: IllegalGeneration, 23: InconsistentGroupProtocol, 24: InvalidGroupId, 25: UnknownMemberId, 26: InvalidSessionTimeout, 27: RebalanceInProgress, 28: InvalidCommitOffsetSize, 29: TopicAuthorizationFailed, 30: GroupAuthorizationFailed, 31: ClusterAuthorizationFailed, 32: InvalidTimestamp, 33: UnsupportedSaslMechanism, 34: IllegalSaslState, 35: UnsupportedVersion, 36: TopicAlreadyExists, 37: InvalidPartitions, 38: InvalidReplicationFactor, 39: InvalidReplicaAssignment, 40: InvalidConfig, 41: NotController, 42: InvalidRequest, 43: UnsupportedForMessageFormat, 44: PolicyViolation, 45: OutOfOrderSequenceNumber, 46: DuplicateSequenceNumber, 47: InvalidProducerEpoch, 48: InvalidTxnState, 49: InvalidProducerIdMapping, 50: InvalidTransactionTimeout, 51: ConcurrentTransactions, 52: TransactionCoordinatorFenced, 53: TransactionalIdAuthorizationFailed, 54: SecurityDisabled, 55: OperationNotAttempted, 56: KafkaStorageError, 57: LogDirNotFound, 58: SaslAuthenticationFailed, 59: UnknownProducerId, 60: ReassignmentInProgress, 61: DelegationTokenAuthDisabled, 62: DelegationTokenNotFound, 63: DelegationTokenOwnerMismatch, 64: DelegationTokenRequestNotAllowed, 65: DelegationTokenAuthorizationFailed, 66: DelegationTokenExpired, 67: InvalidPrincipalType, 68: NonEmptyGroup, 69: GroupIdNotFound, 70: FetchSessionIdNotFound, 71: InvalidFetchSessionEpoch, 72: ListenerNotFound, } def _pretty_errno(errno): """ Produce a string for a Kafka error code. The error code is looked up in the table and printed with its symbolic name: >>> _pretty_errno(5) 5 (LEADER_NOT_AVAILABLE) The result is sensible when the code is 0: >>> _pretty_errno(0) '0 (no error)' It also copes with unknown error codes: >>> _pretty_errno(999) '999 (unknown error)' :param int errno: An error code from a Kafka PDU. :returns: string suitable for a log message """ if errno == 0: message = 'no error' else: cls = BrokerResponseError.errnos.get(errno) if cls is None: message = 'unknown error' else: message = cls.message return '{:d} ({})'.format(errno, message)