Kafka Study Notes


consumer

Basic consumer usage flow and parameter notes:

string errstr;

/* Create the consumer configuration object */
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
// Set bootstrap.servers
conf->set("bootstrap.servers", "localhost", errstr);
// Set the consumer group: group.id
// Set max.partition.fetch.bytes

/* Create the consumer */
RdKafka::Consumer *kafka_consumer = RdKafka::Consumer::create(conf, errstr);
if (!kafka_consumer) // ...

/* Create the topic configuration object */
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
// Set auto.offset.reset (largest || smallest)

/* Create the topic */
RdKafka::Topic *topic = RdKafka::Topic::create(kafka_consumer, topic_str_name, tconf, errstr);

/* Start the consumer */
RdKafka::ErrorCode resp = kafka_consumer->start(topic, partition, RdKafka::Topic::OFFSET_BEGINNING);

/* Consume / poll / shut down */
/* consumer->consume(); */
/* kafka_consumer->poll(0); */
/* kafka_consumer->stop(); */
/* RdKafka::wait_destroyed(5000); */

Full example code:

#include <iostream>
#include <librdkafka/rdkafkacpp.h>
#include <string>
#include <thread>
#include <fstream>
#include <csignal>
#include <unistd.h>

using namespace std;

bool run_ = true;

static void sigterm(int sig) {
    run_ = false;
}

void msg_consume(RdKafka::Message *msg, void *opaque) {
    switch (msg->err()) {
    case RdKafka::ERR__TIMED_OUT:
        break;
    case RdKafka::ERR_NO_ERROR:
        cout << static_cast<char *>(msg->payload()) << endl;
        break;
    default:
        break;
    }
}

/* Consume callback: invoked by consume_callback() for every fetched message */
class ExampleConsumerCb : public RdKafka::ConsumeCb {
public:
    void consume_cb(RdKafka::Message &msg, void *opaque) {
        msg_consume(&msg, opaque);
    }
};

/* Optional worker: pull messages and append them to a file */
static void *thread_consume(RdKafka::Consumer *consumer, RdKafka::Topic *topic) {
    ofstream of("text.txt");
    while (run_) {
        RdKafka::Message *msg = consumer->consume(topic, 0, 100);
        switch (msg->err()) {
        case RdKafka::ERR__TIMED_OUT:
            break;
        case RdKafka::ERR_NO_ERROR:
            cout << "len " << msg->len() << endl;
            of << static_cast<char *>(msg->payload()) << endl;
            break;
        default:
            break;
        }
        delete msg;
        of.flush();
    }
    of.close();
    return nullptr;
}

int main() {
    cout << "Kafka consumer" << endl;

    signal(SIGINT, sigterm);
    signal(SIGTERM, sigterm);

    string errstr;

    /* Create consumer configuration */
    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);

    /* Set broker list */
    if (RdKafka::Conf::CONF_OK != conf->set("bootstrap.servers", "localhost", errstr)) {
        fprintf(stderr, "Set bootstrap.servers failed: %s\n", errstr.c_str());
        exit(1);
    }

    /* Set consumer group */
    if (RdKafka::Conf::CONF_OK != conf->set("group.id", "123", errstr)) {
        fprintf(stderr, "Set group.id failed: %s\n", errstr.c_str());
        exit(1);
    }

    /* Set fetch size (max.partition.fetch.bytes) */
    if (RdKafka::Conf::CONF_OK != conf->set("max.partition.fetch.bytes", "1024000", errstr)) {
        fprintf(stderr, "Set max.partition.fetch.bytes failed: %s\n", errstr.c_str());
        exit(1);
    }

    /* Create consumer */
    RdKafka::Consumer *kafka_consumer = RdKafka::Consumer::create(conf, errstr);
    if (!kafka_consumer) {
        fprintf(stderr, "Create consumer failed: %s\n", errstr.c_str());
        exit(1);
    }
    delete conf;

    /* Create topic configuration */
    RdKafka::Conf *topi_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
    if (!topi_conf) {
        fprintf(stderr, "Create topic configuration failed: %s\n", errstr.c_str());
        exit(1);
    }

    /* auto.offset.reset: largest || smallest */
    if (topi_conf->set("auto.offset.reset", "smallest", errstr) != RdKafka::Conf::CONF_OK) {
        fprintf(stderr, "Failed to set auto.offset.reset: %s\n", errstr.c_str());
        exit(1);
    }

    /* Create topic */
    RdKafka::Topic *topic = RdKafka::Topic::create(kafka_consumer, "test", topi_conf, errstr);
    if (!topic) {
        fprintf(stderr, "Failed to create topic: %s\n", errstr.c_str());
        exit(1);
    }
    delete topi_conf;

    /* Start consumer */
    RdKafka::ErrorCode resp = kafka_consumer->start(topic, 0, RdKafka::Topic::OFFSET_END);
    if (resp != RdKafka::ERR_NO_ERROR) {
        fprintf(stderr, "Failed to start consumer: %s\n", RdKafka::err2str(resp).c_str());
        exit(1);
    }

    /* Consume */
    int64_t last_offset = 0;
    RdKafka::Message *msg = nullptr;
    ExampleConsumerCb ex_consume_cb;
    int use_ccb = 1;

    while (run_) {
        if (use_ccb) {
            /* Callback mode: messages are delivered to ExampleConsumerCb */
            kafka_consumer->consume_callback(topic, 0, 1000, &ex_consume_cb, &use_ccb);
            kafka_consumer->poll(0);
        } else {
            /* Pull mode: fetch one message at a time */
            msg = kafka_consumer->consume(topic, 0, 100);
            switch (msg->err()) {
            case RdKafka::ERR__TIMED_OUT:
                // cerr << msg->errstr() << endl;
                break;
            case RdKafka::ERR_NO_ERROR:
                cout << "msg length " << msg->len() << endl;
                last_offset = msg->offset();
                cout << static_cast<char *>(msg->payload()) << endl;
                break;
            case RdKafka::ERR__UNKNOWN_TOPIC:
            case RdKafka::ERR__UNKNOWN_PARTITION:
                cerr << "Consume failed: " << msg->errstr() << endl;
                run_ = false;
                break;
            default:
                cerr << "Consume failed: " << msg->err() << endl;
                run_ = false;
                break;
            }
            delete msg;
            kafka_consumer->poll(0);
        }
    }

    /* Shut down */
    kafka_consumer->stop(topic, 0);
    if (topic) {
        delete topic;
        topic = nullptr;
    }
    delete kafka_consumer;
    RdKafka::wait_destroyed(5000);
    return 0;
}
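The example above uses the legacy RdKafka::Consumer API, which reads one partition that you pick yourself; group.id is only used for offset commits there. Newer librdkafka versions also provide the high-level RdKafka::KafkaConsumer, where subscribe() lets the group coordinator assign partitions. A minimal sketch, assuming a broker at localhost:9092, group id "notes-group", and the same "test" topic (both programs typically link with -lrdkafka++):

#include <iostream>
#include <string>
#include <vector>
#include <librdkafka/rdkafkacpp.h>

int main() {
    std::string errstr;

    /* KafkaConsumer requires group.id in the global configuration */
    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    conf->set("bootstrap.servers", "localhost:9092", errstr); /* assumed broker address */
    conf->set("group.id", "notes-group", errstr);             /* assumed group id */

    RdKafka::KafkaConsumer *consumer = RdKafka::KafkaConsumer::create(conf, errstr);
    delete conf;
    if (!consumer) {
        std::cerr << "Create KafkaConsumer failed: " << errstr << std::endl;
        return 1;
    }

    /* subscribe() replaces Topic::create() + start(): partitions are assigned by the group */
    RdKafka::ErrorCode err = consumer->subscribe({"test"});
    if (err != RdKafka::ERR_NO_ERROR) {
        std::cerr << "Subscribe failed: " << RdKafka::err2str(err) << std::endl;
        return 1;
    }

    /* Poll a fixed number of times for brevity; a real consumer loops until shutdown */
    for (int i = 0; i < 100; ++i) {
        RdKafka::Message *msg = consumer->consume(1000 /* timeout ms */);
        if (msg->err() == RdKafka::ERR_NO_ERROR)
            std::cout << std::string(static_cast<char *>(msg->payload()), msg->len()) << std::endl;
        delete msg;
    }

    consumer->close();  /* leave the consumer group cleanly */
    delete consumer;
    RdKafka::wait_destroyed(5000);
    return 0;
}

Offsets are committed automatically by default (enable.auto.commit), so restarting with the same group id resumes from the last committed position.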
producer
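A minimal producer sketch along the same lines, assuming a local broker and the "test" topic used by the consumer examples; RK_MSG_COPY tells librdkafka to copy the payload, and poll() drives delivery-report callbacks:

#include <iostream>
#include <string>
#include <librdkafka/rdkafkacpp.h>

int main() {
    std::string errstr;

    /* Create producer configuration */
    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    if (conf->set("bootstrap.servers", "localhost", errstr) != RdKafka::Conf::CONF_OK) {
        std::cerr << "Set bootstrap.servers failed: " << errstr << std::endl;
        return 1;
    }

    /* Create producer */
    RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
    delete conf;
    if (!producer) {
        std::cerr << "Create producer failed: " << errstr << std::endl;
        return 1;
    }

    /* Create topic handle ("test" mirrors the consumer example) */
    RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
    RdKafka::Topic *topic = RdKafka::Topic::create(producer, "test", tconf, errstr);
    delete tconf;
    if (!topic) {
        std::cerr << "Create topic failed: " << errstr << std::endl;
        return 1;
    }

    /* Produce a few messages to an unassigned partition (partitioner decides) */
    for (int i = 0; i < 10; ++i) {
        std::string payload = "hello kafka " + std::to_string(i);
        RdKafka::ErrorCode resp = producer->produce(
            topic, RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY,
            const_cast<char *>(payload.c_str()), payload.size(),
            nullptr /* key */, nullptr /* msg_opaque */);
        if (resp != RdKafka::ERR_NO_ERROR)
            std::cerr << "Produce failed: " << RdKafka::err2str(resp) << std::endl;
        producer->poll(0);  /* serve delivery-report callbacks */
    }

    /* Drain the outbound queue so in-flight messages get delivered */
    while (producer->outq_len() > 0)
        producer->poll(100);

    delete topic;
    delete producer;
    RdKafka::wait_destroyed(5000);
    return 0;
}

Draining outq_len() with poll() before destroying the producer gives queued messages a chance to reach the broker instead of being dropped.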