Kafka 部署和基础使用

基本术语

  • Topic: 每一个消息类别被称为一个话题(topic)

    消息可以用于区分不同的业务系统,每个话题都是多订阅者模式,一个话题可以包含一个或多个消费者

  • Partition: 每一个话题内,包含多个分区,每个分区可能包含多个副本,其中有一个副本作为 leader,用于对外提供服务

    分区是物理层面上存储的形式,每个分区都会被存储在一个 Broker 中,只有分区内可以确保绝对的顺序

    写入到一个话题的消息,会按照下面的方式进行分配

    • 轮询策略: 按顺序存储到分区中
    • 随机策略: 随机存储到一个分区中
    • 键哈希策略: 根据 key 求哈希值,存储到哈希对应的分区中
  • Producer: 发布消息的对象称为生产者(Kafka topic producer)

  • Consumer: 订阅消息并处理消息的对象称为消费者

  • Broker: 已发布的消息需要保存在服务器中,称为 Kafka 集群,集群中的每一个服务器都是一个代理(Broker)

  • 系统模式

    • 队列模式: 以队列形式管理消息,队列允许消息有多个消费者,并且消息在消费后将从队列移除(每个消息只会被消费一次)
    • 发布订阅模式: 当有新消息被添加后,将会通知所有订阅的消费者,所有的消费者维护自己的队列

Docker 镜像

优先在 Docker 中测试。鉴于镜像较大,可以使用国内镜像源加速拉取(如 http://hub-mirror.c.163.com/ 或 https://registry.cn-hangzhou.aliyuncs.com/)

docker pull wurstmeister/zookeeper  
docker pull wurstmeister/kafka  

可以选择使用 docker-compose 启动 kafka

# Single-node Kafka + ZooKeeper setup for local testing.
version: '2'

services:
  zoo:
    image: wurstmeister/zookeeper:latest
    restart: unless-stopped
    hostname: zoo
    ports:
      - "2181:2181"  # ZooKeeper client port
    container_name: zookeeper

  kafka:
    image: wurstmeister/kafka:latest
    ports:
      - "9092:9092"  # Kafka broker port exposed to the host
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      # NOTE(review): "kafka:9092" only resolves inside the compose network;
      # clients outside it need the host's address here, e.g.
      # host.docker.internal:9092.
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_ZOOKEEPER_CONNECT: "zoo:2181"
      KAFKA_BROKER_ID: 1
      # Single broker, so the internal __consumer_offsets topic must use
      # replication factor 1.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      # Topics auto-created at startup, format: name:partitions:replicas
      KAFKA_CREATE_TOPICS: "stream-in:1:1,stream-out:1:1"
    depends_on:
      - zoo
    container_name: kafka

镜像启动后,会挂载一个 volume 存储 kafka 所需要的文件,也即简单地关闭容器后,数据会被保存在 volume 中,重启可以恢复

这里,KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092 需要配置为服务器的地址(如 host.docker.internal:9092)。当客户端和 Kafka 运行在不同的网络中,这个参数将会默认为 localhost,导致客户端连接失败

C 语言 DEMO

Kafka 本身通过一个基于 TCP 的私有协议通信,因此要快速地在代码里操作 Kafka,需要使用第三方模块。

C/C++ 下,有 librdkafka 可以用于操作 kafka

  • 在 Ubuntu 下使用 apt install -y librdkafka-dev 安装
  • 在 Arch Linux 下,使用 pacman -Sy librdkafka --noconfirm 安装

安装完成后,即可使用官方样例的代码进行测试

生产者

#include <stdio.h>
#include <signal.h>
#include <string.h>

#include <librdkafka/rdkafka.h>


static volatile sig_atomic_t run = 1;

/**
 * @brief Signal handler requesting program termination.
 *
 * Clears the run flag and closes stdin so a blocking fgets() in the
 * main loop returns immediately.
 *
 * NOTE(review): fclose() is not async-signal-safe per POSIX; this
 * mirrors the upstream librdkafka example but is technically unsafe
 * in a signal handler.
 */

static void stop(int sig) {
    run = 0;
    fclose(stdin); /* abort fgets() */
}


/**
 * @brief Message delivery report callback.
 *
 * Called exactly once per produced message, from rd_kafka_poll() or
 * rd_kafka_flush().
 *
 * (rkmessage->err == RD_KAFKA_RESP_ERR_NO_ERROR) means delivery succeeded.
 * (rkmessage->err != RD_KAFKA_RESP_ERR_NO_ERROR) means delivery failed.
 *
 * The rkmessage object is owned and freed by librdkafka; this callback
 * must not destroy it.
 */
static void dr_msg_cb(
    rd_kafka_t* rk,
    const rd_kafka_message_t* rkmessage,
    void* opaque
) {
    if (rkmessage->err)
        fprintf(stderr, "%% Message delivery failed: %s\n",
            rd_kafka_err2str(rkmessage->err));
    else
        fprintf(stderr,
            /* %zu: rkmessage->len is size_t (%zd is for ssize_t) */
            "%% Message delivered (%zu bytes, "
            "partition %" PRId32 ")\n",
            rkmessage->len, rkmessage->partition);
}



int main(int argc, char** argv) {
    rd_kafka_t* rk;         /* Producer instance handle */
    rd_kafka_conf_t* conf;  /* Temporary configuration object */
    char errstr[512];       /* librdkafka API error reporting buffer */
    char buf[512];          /* Message value temporary buffer */
    const char* brokers;    /* Argument: broker list */
    const char* topic;      /* Argument: topic to produce to */

    /*
     * Argument validation.
     * Usage: program <broker> <topic>
     */
    if (argc != 3) {
        fprintf(stderr, "%% Usage: %s <broker> <topic>\n", argv[0]);
        return 1;
    }


    brokers = argv[1];
    topic = argv[2];
    printf("%s\n%s\n", brokers, topic);


    /*
     * Create the Kafka client configuration place-holder.
     */
    conf = rd_kafka_conf_new();

    /*
     * Set the bootstrap broker(s) as a comma-separated list of
     * host or host:port.
     */
    if (rd_kafka_conf_set(
        conf,
        "bootstrap.servers",
        brokers,
        errstr,
        sizeof(errstr)
    ) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%s\n", errstr);
        /* conf is still owned by us until rd_kafka_new() succeeds;
         * destroy it to avoid a leak (consistent with the consumer). */
        rd_kafka_conf_destroy(conf);
        return 1;
    }

    /*
     * Register the delivery report callback.
     */
    rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);

    /*
     * Create the producer instance.
     *
     * NOTE: rd_kafka_new() takes ownership of the conf object; the
     * application must not reference conf again after this call.
     */
    rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
    if (!rk) {
        fprintf(stderr,
            "%% Failed to create new producer: %s\n", errstr);
        return 1;
    }

    /* Handle CTRL+C for a clean shutdown */
    signal(SIGINT, stop);

    fprintf(stderr,
        "%% 键入文本,并使用回车递交消息\n"
        "%% 直接回车,只展示递交报告\n"
        "%% 使用 Ctrl + C 或 Ctrl + D 结束\n");

    /* Loop until a termination signal is received or stdin hits EOF */
    while (run && fgets(buf, sizeof(buf), stdin)) {
        size_t len = strlen(buf);
        rd_kafka_resp_err_t err;

        /* Strip the trailing newline, if any.
         * The len > 0 guard is required: without it, a line starting
         * with a NUL byte makes buf[len - 1] read out of bounds (UB). */
        if (len > 0 && buf[len - 1] == '\n') buf[--len] = '\0';

        /* Empty line: only serve delivery reports */
        if (len == 0) {
            rd_kafka_poll(rk, 0/*non-blocking */);
            continue;
        }

        /*
         * Produce (send) the message.
         *
         * Asynchronous call; the result is delivered via dr_msg_cb().
         */
    retry:
        err = rd_kafka_producev(
            /* Producer handle */
            rk,
            /* Topic name */
            RD_KAFKA_V_TOPIC(topic),
            /* Copy the payload into librdkafka's own buffer */
            RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
            /* Message value and length */
            RD_KAFKA_V_VALUE(buf, len),
            /* Per-message opaque (passed to dr_msg_cb) */
            RD_KAFKA_V_OPAQUE(NULL),
            /* End sentinel */
            RD_KAFKA_V_END
        );

        if (err) {
            /*
             * Failed to enqueue the message for producing.
             */
            fprintf(stderr,
                "%% Failed to produce to topic %s: %s\n",
                topic, rd_kafka_err2str(err));

            if (err == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
                /*
                 * Internal queue full: wait for delivery reports to be
                 * served (freeing queue space), then retry.
                 */
                rd_kafka_poll(rk, 1000 /* block up to 1000ms */);
                goto retry;
            }
        } else {
            /* %zu: len is size_t */
            fprintf(stderr, "%% Enqueued message (%zu bytes) "
                "for topic %s\n",
                len, topic);
        }


        /*
         * A producer must call rd_kafka_poll() frequently (even when
         * not producing) to serve delivery reports and other events:
         * 1. from the main loop, as here, or
         * 2. from a dedicated thread.
         */
        rd_kafka_poll(rk, 0/*non-blocking*/);
    }


    /*
     * Wait for final messages to be delivered or to fail.
     * rd_kafka_flush() is an abortable rd_kafka_poll() that waits for
     * all outstanding produce requests to complete.
     */
    fprintf(stderr, "%% Flushing final messages..\n");
    rd_kafka_flush(rk, 10 * 1000 /* wait for max 10 seconds */);

    /* If the output queue is still non-empty, delivery failed */
    if (rd_kafka_outq_len(rk) > 0)
        fprintf(stderr, "%% %d message(s) were not delivered\n",
            rd_kafka_outq_len(rk));

    /* Destroy the producer instance */
    rd_kafka_destroy(rk);

    return 0;
}

消费者

#include <stdio.h>
#include <signal.h>
#include <string.h>
#include <ctype.h>

#include <librdkafka/rdkafka.h>


static volatile sig_atomic_t run = 1;

/**
 * @brief Signal handler requesting a clean shutdown.
 *
 * Only clears the run flag; the poll loop checks it every 100 ms.
 */
static void stop(int sig) {
    run = 0;
}



/**
 * @brief Check whether every byte in a buffer is printable.
 *
 * @param buf  buffer to inspect (need not be NUL-terminated)
 * @param size number of bytes to check
 * @returns 1 if all bytes are printable (or size is 0), 0 otherwise
 */
static int is_printable(const char* buf, size_t size) {
    size_t i;

    for (i = 0; i < size; i++)
        /* Cast through unsigned char: passing a negative value
         * (possible for bytes >= 0x80 when plain char is signed) to
         * isprint() is undefined behavior (CERT STR37-C). */
        if (!isprint((unsigned char)buf[i]))
            return 0;

    return 1;
}


int main(int argc, char** argv) {
    rd_kafka_t* rk;          /* Consumer instance handle */
    rd_kafka_conf_t* conf;   /* Temporary configuration object */
    rd_kafka_resp_err_t err; /* librdkafka API error code */
    char errstr[512];        /* librdkafka API error reporting buffer */
    const char* brokers;     /* Argument: broker list */
    const char* groupid;     /* Argument: consumer group id */
    char** topics;           /* Argument: topics to subscribe to */
    int topic_cnt;           /* Number of topics to subscribe to */
    rd_kafka_topic_partition_list_t* subscription; /* Subscription list */
    int i;

    /*
     * Argument validation.
     * Usage: program <broker> <group.id> <topic1> [<topic2>..]
     */
    if (argc < 4) {
        fprintf(stderr,
            "%% Usage: "
            "%s <broker> <group.id> <topic1> <topic2>..\n",
            argv[0]);
        return 1;
    }

    brokers = argv[1];
    groupid = argv[2];
    topics = &argv[3];
    topic_cnt = argc - 3;


    /*
     * Create Kafka client configuration place-holder
     */
    conf = rd_kafka_conf_new();

    /* Set bootstrap broker(s) as a comma-separated list of
     * host or host:port (default port 9092).
     * librdkafka will use the bootstrap brokers to acquire the full
     * set of brokers from the cluster. */
    if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers,
        errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%s\n", errstr);
        rd_kafka_conf_destroy(conf);
        return 1;
    }

    /* Set the consumer group id.
     * All consumers sharing the same group id will join the same
     * group, and the subscribed topic' partitions will be assigned
     * according to the partition.assignment.strategy
     * (consumer config property) to the consumers in the group. */
    if (rd_kafka_conf_set(conf, "group.id", groupid,
        errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%s\n", errstr);
        rd_kafka_conf_destroy(conf);
        return 1;
    }

    /* If there is no previously committed offset for a partition
     * the auto.offset.reset strategy will be used to decide where
     * in the partition to start fetching messages.
     * By setting this to earliest the consumer will read all messages
     * in the partition if there was no previously committed offset. */
    if (rd_kafka_conf_set(conf, "auto.offset.reset", "earliest",
        errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%s\n", errstr);
        rd_kafka_conf_destroy(conf);
        return 1;
    }

    /*
     * Create consumer instance.
     *
     * NOTE: rd_kafka_new() takes ownership of the conf object
     *       and the application must not reference it again after
     *       this call.
     */
    rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
    if (!rk) {
        fprintf(stderr,
            "%% Failed to create new consumer: %s\n", errstr);
        return 1;
    }

    conf = NULL; /* Configuration object is now owned, and freed,
                  * by the rd_kafka_t instance. */


    /* Redirect all messages from per-partition queues to
     * the main queue so that messages can be consumed with one
     * call from all assigned partitions.
     *
     * The alternative is to poll the main queue (for events)
     * and each partition queue separately, which requires setting
     * up a rebalance callback and keeping track of the assignment:
     * but that is more complex and typically not recommended. */
    rd_kafka_poll_set_consumer(rk);


    /* Convert the list of topics to a format suitable for librdkafka */
    subscription = rd_kafka_topic_partition_list_new(topic_cnt);
    for (i = 0; i < topic_cnt; i++)
        rd_kafka_topic_partition_list_add(subscription,
            topics[i],
            /* the partition is ignored
             * by subscribe() */
            RD_KAFKA_PARTITION_UA);

    /* Subscribe to the list of topics */
    err = rd_kafka_subscribe(rk, subscription);
    if (err) {
        fprintf(stderr,
            "%% Failed to subscribe to %d topics: %s\n",
            subscription->cnt, rd_kafka_err2str(err));
        rd_kafka_topic_partition_list_destroy(subscription);
        rd_kafka_destroy(rk);
        return 1;
    }

    fprintf(stderr,
        "%% Subscribed to %d topic(s), "
        "waiting for rebalance and messages...\n",
        subscription->cnt);

    /* subscribe() copied the list; it can be destroyed now */
    rd_kafka_topic_partition_list_destroy(subscription);


    /* Signal handler for clean shutdown */
    signal(SIGINT, stop);

    /* Subscribing to topics will trigger a group rebalance
     * which may take some time to finish, but there is no need
     * for the application to handle this idle period in a special way
     * since a rebalance may happen at any time.
     * Start polling for messages. */

    while (run) {
        rd_kafka_message_t* rkm;

        rkm = rd_kafka_consumer_poll(rk, 100);
        if (!rkm)
            continue; /* Timeout: no message within 100ms,
                       *  try again. This short timeout allows
                       *  checking for `run` at frequent intervals.
                       */

        /* consumer_poll() will return either a proper message
         * or a consumer error (rkm->err is set). */
        if (rkm->err) {
            /* Consumer errors are generally to be considered
             * informational as the consumer will automatically
             * try to recover from all types of errors. */
            fprintf(stderr,
                "%% Consumer error: %s\n",
                rd_kafka_message_errstr(rkm));
            rd_kafka_message_destroy(rkm);
            continue;
        }

        /* Proper message. */
        printf("Message on %s [%"PRId32"] at offset %"PRId64":\n",
            rd_kafka_topic_name(rkm->rkt), rkm->partition,
            rkm->offset);

        /* Print the message key. */
        if (rkm->key && is_printable(rkm->key, rkm->key_len))
            printf(" Key: %.*s\n",
                (int)rkm->key_len, (const char*)rkm->key);
        else if (rkm->key)
            printf(" Key: (%d bytes)\n", (int)rkm->key_len);

        /* Print the message value/payload. */
        if (rkm->payload && is_printable(rkm->payload, rkm->len))
            printf(" Value: %.*s\n",
                (int)rkm->len, (const char*)rkm->payload);
        else if (rkm->payload)
            printf(" Value: (%d bytes)\n", (int)rkm->len);

        rd_kafka_message_destroy(rkm);
    }


    /* Close the consumer: commit final offsets and leave the group. */
    fprintf(stderr, "%% Closing consumer\n");
    rd_kafka_consumer_close(rk);


    /* Destroy the consumer */
    rd_kafka_destroy(rk);

    return 0;
}

测试

由于需要链接 librdkafka 动态链接库,因此编译时,需要添加编译参数 -lrdkafka(C 语言)或-lrdkafka++(C++)

也即,完整的编译指令为

gcc src/producer.c -o producer -lrdkafka
gcc src/consumer.c -o consumer -lrdkafka

编译后,即可启动两个程序,进行测试

./producer 127.0.0.1:9092 test
./consumer 127.0.0.1:9092 0 test

两个程序的话题(topic)需要一致,消费者需要额外传入一个 group ID,用于区分成组的消费者(每个消息对于一个组内只会被处理一次)

可以先启动生产者,并预发送几条消息,而后启动消费者,查看是否能接收到该消息。接下来则是在两者都启动的情况下,传输消息

测试截图

参考资料