consumer
string errstr
;
RdKafka
::Conf
*conf
= RdKafka
::Conf
::create(RdKafka
::Conf
::CONF_GLOBAL
);
conf
->set("bootstrap.servers", "localhost", errstr
);
RdKafka
::Consumer
*kafka_consumer
= Rdkafa
::Consumer
::create(RdKafka
::Consumer
::create(conf
, errstr
);
if (!kafka_consumer
)
RdKafka
::Conf
*tconf
=
RdKafka
::Conf
::create(RdKafa
::Conf
::CONF_TOPIC
);
RdKafka
::Topic
*topic
= RdKafka
::Topic
::create(kafka_consumer
, topic_str_name
,tconf
, errstr
)
RdKafka
::ErrCode
*resp
= kafka_consumer
->start(topic
, partition
, RdKafka
::TOPIC
::OFFSSET_BEGINNING
);
code
#include <iostream>
#include <librdkafka/rdkafkacpp.h>
#include <string>
#include <thread>
#include <fstream>
#include <unistd.h>
using namespace std
;
bool run_
= true;
static void sigterm(int sig
) {
run_
= false;
}
void msg_consume(RdKafka
::Message
*msg
, void *opaque
) {
switch (msg
->err()) {
case RdKafka
::ERR__TIMED_OUT
:
break;
case RdKafka
::ERR_NO_ERROR
:
cout
<< static_cast<char *>(msg
->payload()) << endl
;
break;
default:
break;
}
}
class ExampleConsumerCb : public RdKafka
::ConsumeCb
{
public:
void consume_cb(RdKafka
::Message
&msg
, void *opaque
) {
msg_consume(&msg
, opaque
);
}
};
static void *thread_consume(RdKafka
::Consumer
*consumer
, RdKafka
::Topic
*topic
) {
ofstream
of("text.txt");
while (run_
) {
RdKafka
::Message
*msg
= nullptr;
msg
= consumer
->consume(topic
, 0, 100);
switch (msg
->err()) {
case RdKafka
::ERR__TIMED_OUT
:
break;
case RdKafka
::ERR_NO_ERROR
:
cout
<< "len " << msg
->len() << endl
;
of
<< static_cast<char *>(msg
->payload()) << endl
;
break;
default:
break;
}
of
.flush();
}
of
.close();
}
int main() {
cout
<< "Kafka consumer " << endl
;
string errstr
;
RdKafka
::Conf
*conf
= RdKafka
::Conf
::create(RdKafka
::Conf
::CONF_GLOBAL
);
if (RdKafka
::Conf
::CONF_OK
!=
conf
->set("bootstrap.servers", "localhost", errstr
)) {
fprintf(stderr, "Set bootstrap.servers failed: %s\n", errstr
.c_str());
exit(1);
}
if (RdKafka
::Conf
::CONF_OK
!=
conf
->set("group.id", "123", errstr
)) {
fprintf(stderr, "Set group.id failed: %s\n", errstr
.c_str());
exit(1);
}
if (RdKafka
::Conf
::CONF_OK
!= conf
->set("max.partition.fetch.bytes", "1024000", errstr
)) {
fprintf(stderr, "Set max.partition.fetch.bytes: %s\n", errstr
.c_str());
exit(1);
}
RdKafka
::Consumer
*kafka_consumer
= RdKafka
::Consumer
::create(conf
, errstr
);
if (!kafka_consumer
) {
fprintf(stderr, "Create consumer failed: \n", errstr
.c_str());
exit(1);
}
delete conf
;
RdKafka
::Conf
*topi_conf
= RdKafka
::Conf
::create(RdKafka
::Conf
::CONF_TOPIC
);
if (!topi_conf
) {
fprintf(stderr, "Create Topic configuer failed: %s\n", errstr
.c_str());
exit(1);
}
if (topi_conf
->set("auto.offset.reset", "smallest", errstr
) != RdKafka
::Conf
::CONF_OK
) {
fprintf(stderr, "Failed to set auto.offset.resetL %s\n", errstr
.c_str());
exit(1);
}
delete topi_conf
;
RdKafka
::Topic
*topic
= RdKafka
::Topic
::create(kafka_consumer
, "test", topi_conf
, errstr
);
if (!topic
) {
fprintf(stderr, "Failed to create topic: %s\n", errstr
.c_str());
exit(1);
}
RdKafka
::ErrorCode resp
= kafka_consumer
->start(topic
, 0, RdKafka
::Topic
::OFFSET_END
);
if (resp
!= RdKafka
::ERR_NO_ERROR
) {
fprintf(stderr, "Failed to start consumer: %s\n");
exit(1);
}
int64_t last_offset
= 0;
RdKafka
::Message
*msg
= nullptr;
ExampleConsumerCb ex_consume_cb
;
int use_ccb
= 1;
while (run_
) {
if (use_ccb
) {
kafka_consumer
->consume_callback(topic
, 0, 1000, &ex_consume_cb
, &use_ccb
);
kafka_consumer
->poll(0);
} else {
while (run_
) {
msg
= kafka_consumer
->consume(topic
, 0, 100);
switch (msg
->err()) {
case RdKafka
::ERR__TIMED_OUT
:
break;
case RdKafka
::ERR_NO_ERROR
:
cout
<< "msg length " << msg
->len() << endl
;
last_offset
= msg
->offset();
cout
<< static_cast<char *>(msg
->payload()) << endl
;
break;
case RdKafka
::ERR__UNKNOWN_TOPIC
:
break;
case RdKafka
::ERR__UNKNOWN_PARTITION
:
cerr
<< "Consumer failed: " << msg
->errstr() << endl
;
run_
= false;
break;
default:
cerr
<< "Consume failed: " << msg
->err() << endl
;
run_
= false;
break;
}
kafka_consumer
->poll(0);
}
}
kafka_consumer
->stop(topic
, 0);
if (topic
) {
delete topic
;
topic
= nullptr;
}
RdKafka
::wait_destroyed(5000);
return 0;
}
}
producer
转载请注明原文地址: https://lol.8miu.com/read-28451.html