Developing a MapR Event Store For Apache Kafka C Application

This topic includes basic information about how to develop a MapR Event Store For Apache Kafka C application. Sample applications are provided.

Before you Begin

Confirm that your environment meets the following requirements:

  • The MapR Data Platform cluster version is 5.2.1 or greater.
  • MapR Data Platform core client (mapr-client) packagei s installed on the node and it is configured to access the cluster. Or, it is a MapR Data Platform cluster node. See Installing the MapR Client for more information.
  • The MapR Event Store For Apache Kafka C Client is installed and configured on the node. See Configuring the MapR Event Store For Apache Kafka C Client.
  • GNU Compiler Collection (GCC) is installed on the node.

Creating, Compiling and Running C Apps

The following sections describe how to create a producer and consumer in C, compile the source code, generate executables, and run the applications.

This topic describes how to create a MapR Event Store For Apache Kafka streams producer in C. While the code to generate a MapR Event Store For Apache Kafka stream producer varies depending on the use case, in general, the producer code should contain the following:
  1. Include the rdkafka.h header file (/opt/mapr/include/librdkafka/rdkafka.h).
  2. Use rd_kafka_conf_new() to create the producer configuration.
  3. Use rd_kafka_new() to create the producer handle.
  4. Use rd_kafka_topic_conf_new() to create the topic configuration.
  5. Use rd_kafka_topic_new() to create a topic handle for the producer.
  6. Use rd_kafka_produce() to produce messages.
  7. Optionally, use rd_kafka_poll() to poll for callbacks. This is useful to see if there are messages that have yet to be sent to the server.
  8. Use rd_kafka_topic_destroy() to destroy the topic handle.
  9. Use rd_kafka_destroy() to destroy the producer handle.

For example, the following source code produces 5 messages to topic /MapR_Streams:MapR-Topic1:

/*
 * This file contains the producer function.
 *
 */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <rdkafka.h>
#include <errno.h>

/* msgDeliveryCB: Is the delivery callback.
 * The delivery report callback will be called once for each message
 * accepted by rd_kafka_produce() with err set to indicate
 * the result of the produce request. An application must call rd_kafka_poll()
 * at regular intervals to serve queued delivery report callbacks.
 */   
static void msgDeliveryCB (rd_kafka_t *rk,
                           const rd_kafka_message_t *rkmessage, void *opaque) {
    if (rkmessage->err != RD_KAFKA_RESP_ERR_NO_ERROR) {
        printf("FAILURE: Message not delivered to partition.\n");
        printf("ERROR: %s\n", rd_kafka_err2str(rkmessage->err));
    } else {
        printf("Produced: %.*s\n",(int)rkmessage->len, (const char*)rkmessage->payload); 
    }
}

/*
 * Method      : int producer(int nummsgs_p, const char *fullTopicName)
 * Description : This is a simple producer method. In this method the producer
 *               produces messages to a topic.
 */

int producer(int nummsgs_p, const char *fullTopicName) {
    printf("************************ PRODUCER ************************\n");
    rd_kafka_t *prodHndle;
    rd_kafka_conf_t *prodCfg;
    char errstr[1000];
    int totalMsgs = nummsgs_p;

    printf("Create producer configuration object\n");
    /*
     * rd_kafka_conf_new(): This API creates default rd_kafka_conf_t object to
     * be passed at the time of producer object creation using rd_kafka_new call.
     */
    prodCfg = rd_kafka_conf_new();
    if (prodCfg == NULL) {
        printf("Failed to create conf\n");
        return (EXIT_FAILURE);
    }
    /* rd_kafka_conf_set_dr_msg_cb(): This API sets the producer callback
     * 'msgDeliveryCB' in producer config 'prodCfg'
     * The delivery report callback will be called once for each message
     * accepted by rd_kafka_produce() with err set to indicate
     * the result of the produce request. An application must call rd_kafka_poll()
     * at regular intervals to serve queued delivery report callbacks.
     */
    rd_kafka_conf_set_dr_msg_cb(prodCfg, msgDeliveryCB);

    printf("Create Producer Kafka handle\n");
    /*
     * rd_kafka_new():Creates a new Kafka handle and starts its operation
     * according to the specified type (RD_KAFKA_CONSUMER or RD_KAFKA_PRODUCER).
     * prodCfg object passed here is freed by this function and must not be used
     * or destroyed by the application subsequently. errstr must be a pointer to
     * memory of at least size errstr_size where
     * `rd_kafka_new()` may write a human readable error message in case the
     * creation of a new handle fails. In which case the function returns NULL. 
     */
    prodHndle = rd_kafka_new(RD_KAFKA_PRODUCER, prodCfg, errstr, sizeof(errstr));
    if (prodHndle == NULL) {
        printf("Failed to create producer: %s\n", errstr);
        return (EXIT_FAILURE);
    }

    /*
     * Following code does following:
     * 1. Create a topic handle for each producer-topic combination
     * 2. Produce 'totalMsgs' # of messages using topic handle created in step 1
     * 3. Wait for all messages to be produced and callback to be delivered.
     * 4. Move on to next topic and repeat.
     */

    int totalTopics = 1;
    for (int nTopics = 0; nTopics < totalTopics ; nTopics++) {
        printf("Create topic handle\n");
        rd_kafka_topic_conf_t *prodTopicCfg;
        /*
         * rd_kafka_topic_conf_new(): This API Creates topic conf object
         * required to create topic handle which then will be used for each
         * producer-topic combination
         */

        prodTopicCfg = rd_kafka_topic_conf_new();
        if (prodTopicCfg == NULL) {
            printf("Failed to create new topic conf\n");
            return (EXIT_FAILURE);
        }

            rd_kafka_topic_t *prodTopicHndl;
            /*
             * rd_kafka_topic_new(): This API Creates topic handle for a given
             * producer, topic name and topic config. Topic handles are refcounted
             * internally and calling rd_kafka_topic_new()
             * again with the same topic name will return the previous topic handle
             * without updating the original handle's configuration.
             * Applications must eventually call rd_kafka_topic_destroy() for each
             * successful call to rd_kafka_topic_new() to clear up resources.
             */
            prodTopicHndl = rd_kafka_topic_new(prodHndle, fullTopicName, prodTopicCfg);
            if (prodTopicHndl == NULL) {
                printf("Failed to create new topic handle\n");
                return (EXIT_FAILURE);
            }
            prodTopicCfg = NULL; /* Now owned by topic */

            const char* key ="Key";
            printf("Send/Produce message to topic: %s\n", fullTopicName); 
            for (int i = 0; i < totalMsgs; i++) {
                    char payload[1000];
                    if (i == 0)
                         sprintf(payload, "%s", "Welcome to MapR Streams CAPI");
                    else
                         sprintf(payload, "MapR Streams CAPI Message Payload %d", i);
                    /*
                    * rd_kafka_produce(): This API produces a single message
                    * to the cluster. prodTopicHndl must be created using
                    * rd_kafka_topic_new() api. This is an asynch non-blocking API.
                    * RD_KAFKA_PARTITION_UA is used to indicate automatic
                    * partitioning, using topics partitioner or fixed partition
                    * can be provided. RD_KAFKA_MSG_F_COPY flag indicates that
                    * library copies the payload and application manages its own
                    * payload memory. If API fails to send, errno will be set
                    * accordingly and will be able to access librdkafka specific
                    * error using rd_kafka_last_error() api.
                    */  
                    if (rd_kafka_produce(prodTopicHndl,
                                         RD_KAFKA_PARTITION_UA, 
                                         RD_KAFKA_MSG_F_COPY,
                                         payload, 
                                         strlen(payload),
                                         key,
                                         strlen(key),
                                         NULL) == -1) {
                        int errNum = errno;
                        printf("Failed to produce to topic : %s\n", rd_kafka_topic_name(prodTopicHndl));
                        printf("Error Number: %d ERROR NAME: %s\n"
                                   ,errNum, rd_kafka_err2str(rd_kafka_last_error()));
                        return (errNum);
                    }
            }

            printf("Wait for messages to be delivered\n");
            /*
            * rd_kafka_outq_len(): This API out queue contains messages waiting
            * to be sent to, or acknowledged by, server.
            * An application should wait for this queue to reach zero before
            * terminating to make sure outstanding requests are fully processed.
            * 
            * rd_kafka_poll(): This API polls the producer handle for events,
            * which will cause application provided callbacks to be called.
            * An application must call rd_kafka_poll() at regular intervals to 
            * serve queued delivery report callbacks. In this case
            * 'msgDeliveryCB' will get called.
            */
            while (rd_kafka_outq_len(prodHndle) > 0)
                rd_kafka_poll(prodHndle, 100);

            printf("\nDestroy topic handle\n");
            /*
            * Applications must eventually call rd_kafka_topic_destroy() for each
            * successful call to rd_kafka_topic_new() to clear up resources.
            */
            rd_kafka_topic_destroy(prodTopicHndl);
    }
    printf("Destroy producer handle\n");
    /*
     * rd_kafka_destroy(): This API destroys the producer handle created using 
     * rd_kafka_new call and frees resources.
     */
    rd_kafka_destroy(prodHndle);

    return(EXIT_SUCCESS);
}


/* MAIN */
int main(int argc, char *argv[]) {

  /* Number of messages the producer will produce */
  int nummsgs_p = 5;

  /* This is pre created Stream with one topic and one partition*/
  const char* fullTopicName = "/MapR_Streams:MapR-Topic1";
  int ret_val;

  /* Produce Messages */
  ret_val = producer(nummsgs_p, fullTopicName);
  if (EXIT_SUCCESS != ret_val) {
    printf("\nFAIL: producer failed\n");
  } else {
    printf("\nPASS: %d messages produced and sent to topic partition %s \n", nummsgs_p, fullTopicName);
  }
}
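
The sample producer formats each payload with sprintf() into a fixed 1000-byte buffer. That is safe for the short strings used here, but a bounded variant may be preferable if the payload text ever grows. The following is a minimal sketch of that idea; build_payload is a hypothetical helper introduced for illustration and is not part of librdkafka or the sample application.

```c
#include <stdio.h>
#include <stddef.h>

/*
 * build_payload (hypothetical helper, not part of librdkafka):
 * writes the same text as the sample producer into buf without ever
 * writing more than bufsize bytes. Returns the payload length on
 * success, or -1 if formatting failed or the text did not fit.
 */
static int build_payload(char *buf, size_t bufsize, int index) {
    int n;

    if (index == 0)
        n = snprintf(buf, bufsize, "%s", "Welcome to MapR Streams CAPI");
    else
        n = snprintf(buf, bufsize, "MapR Streams CAPI Message Payload %d", index);

    /* snprintf returns the length it wanted to write; a value of
     * bufsize or more means the output was truncated. */
    if (n < 0 || (size_t)n >= bufsize)
        return -1;
    return n;
}
```

In the producer loop, the returned length could be passed to rd_kafka_produce() in place of strlen(payload), and a return value of -1 could be treated as a reason to skip or abort the send.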
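
This topic describes how to create a MapR Event Store For Apache Kafka streams consumer in C. While the code to generate a MapR Event Store For Apache Kafka stream consumer varies depending on the use case, in general, the consumer code should contain the following: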
  1. Include the rdkafka.h header file (/opt/mapr/include/librdkafka/rdkafka.h).
  2. Use rd_kafka_conf_new() to create the consumer configuration.
  3. Use rd_kafka_conf_set() to set the configuration parameters. For a consumer, you must set group.id.
  4. Use rd_kafka_new() to create the consumer handle.
  5. Use rd_kafka_subscribe() or rd_kafka_assign() to specify which topics to consume.
  6. Use rd_kafka_consumer_poll() to poll for messages that are ready to be consumed.
  7. Use rd_kafka_consumer_close() to perform auto commits and prepare to destroy the consumer handle.
  8. Use rd_kafka_destroy() to destroy the consumer handle.

For example, the following source code consumes 5 messages from topic /MapR_Streams:MapR-Topic1:

/*
 * This file contains the consumer function.
 *
 */

#include <stdio.h>
#include <stdlib.h>
#include <rdkafka.h>
#include <string.h>

/*
 * Method      : int consumer(int expected_nummsgs, const char *fullTopicName)
 * Description : This is a simple consumer method. In this method the consumer
 *               consumes messages from a topic.
 */

int consumer(int expected_nummsgs, const char *fullTopicName) {
    printf("*********  CONSUMER START  *********\n");
    rd_kafka_t *consHndle;
    rd_kafka_conf_t *consCfg;
    rd_kafka_topic_conf_t *consTopicCfg;
    char errstr[1000];
    rd_kafka_resp_err_t errCode;

    printf("Create new consumer configuration object\n");
    /*
    * rd_kafka_conf_new(): This API creates default rd_kafka_conf_t object to
    * be passed at the time of consumer object creation using rd_kafka_new call.
    */
    consCfg = rd_kafka_conf_new();
    if(consCfg == NULL) {
        printf("Failed to create consumer conf\n");
        return(EXIT_FAILURE);
    }
    /*
    * rd_kafka_conf_set(): This API is used to set config parameters in the
    * rd_kafka_conf_t object. group.id must be set for all consumers.
    * All changes to the consCfg must be done before creating consumer object.
    */
    if(RD_KAFKA_CONF_OK != rd_kafka_conf_set(consCfg,
                      "group.id", "consumerGroup",
                      errstr, sizeof(errstr))) {
        printf("rd_kafka_conf_set() failed with error: %s\n", errstr);
        return (EXIT_FAILURE);
    }
   /*
    * rd_kafka_topic_conf_new(): This API Creates topic conf object
    * required to set the default topic configuration.
    */ 
    printf("Set topic configurations\n");
    consTopicCfg = rd_kafka_topic_conf_new();

    /* rd_kafka_topic_conf_set(): This API sets the config property by name.
    * consTopicCfg should have been previously set up with `rd_kafka_topic_conf_new()`
    * property set in this call is 'auto.offset.reset', when set to
    * earliest will return messages on rd_kafka_consumer_poll from beginning of
    * time (for the very first time consumption) or from last committed offset
    * for online consumer. If property is set to 'latest' it will return the
    * messages produced after consumer has started(for first time consumer) or
    * from the last committed offset for online consumer 
    */ 
    if (RD_KAFKA_CONF_OK != rd_kafka_topic_conf_set(consTopicCfg, "auto.offset.reset",
                            "earliest" ,errstr, sizeof(errstr))) {
        printf("rd_kafka_topic_conf_set() failed with error: %s\n", errstr);
        return (EXIT_FAILURE);
    }
    
    /*
    * rd_kafka_conf_set_default_topic_conf(): This API sets the default topic
    * configuration to use for automatically subscribed topics
    * The topic config object is not usable after this call.
    */
    rd_kafka_conf_set_default_topic_conf(consCfg, consTopicCfg);

    printf("Create consumer Kafka handle\n");
    /*
     * rd_kafka_new():Creates a new Kafka handle and starts its operation
     * according to the specified type (RD_KAFKA_CONSUMER or RD_KAFKA_PRODUCER).
     * consCfg object passed here is freed by this function and must not be used
     * or destroyed by the application subsequently. errstr must be a pointer to
     * memory of at least size errstr_size where
     * `rd_kafka_new()` may write a human readable error message in case the
     * creation of a new handle fails. In which case the function returns NULL. 
     */

    consHndle = rd_kafka_new(RD_KAFKA_CONSUMER, consCfg, errstr, sizeof(errstr));
    if(consHndle == NULL) {
        printf("Failed to create consumer: %s\n", errstr);
        return (EXIT_FAILURE);
    }
    
    /* rd_kafka_poll_set_consumer() is used to redirect the main queue which is
    *  serviced using rd_kafka_poll() to the rd_kafka_consumer_poll(). With one api
    *  'rd_kafka_consumer_poll()' both callbacks and message are serviced.
    *  Once queue is forwarded using this API, it is not permitted to call
    *  rd_kafka_poll to service non message delivery callbacks.
    */
    rd_kafka_poll_set_consumer(consHndle);

    /* Topic partition list (tp_list) is supplied as an input to the consumer
    *  subscribe(using rd_kafka_subscribe()). The api rd_kafka_subscribe() expects
    *  that the partition argument to be set to RD_KAFKA_PARTITION_UA and internally
    *  all partitions are assigned to the consumer.
    *  Note: partition balancing/assignment is done if more consumers are part
    *  of the same consumer group.
    */

    printf("Create topic partition list for topic: %s\n", fullTopicName);
    rd_kafka_topic_partition_list_t *tp_list = rd_kafka_topic_partition_list_new(0);
    rd_kafka_topic_partition_t* tpObj = rd_kafka_topic_partition_list_add(tp_list,
                                                fullTopicName, RD_KAFKA_PARTITION_UA);
    if (NULL == tpObj) {
        printf("Could not add the topic partition to the list.\n");
        return (EXIT_FAILURE);
    }

    printf("Subscribe consumer to the topic:\n");
    /*
    * rd_kafka_subscribe(): This API subscribes given consumer to the topic list
    * provided in tp_list, depending upon number of consumers in a consumer group 
    * partitions will be balanced and assigned to each consumer.
    */
    errCode = rd_kafka_subscribe(consHndle, tp_list);
    if (errCode  != RD_KAFKA_RESP_ERR_NO_ERROR) {
        printf("Topic partition subscription failed. ERROR: %d\n", errCode);
        return(errCode);
    }
    printf("Destroy topic partition list:\n");
    /*
    * rd_kafka_topic_partition_list_destroy(): This API is used to free all
    * resources used by the list and the list itself.
    */

    rd_kafka_topic_partition_list_destroy(tp_list);
   
    printf("\nStart message consumption:\n"); 
    int msg_count = 0;
    while(1) {
        /*
        * rd_kafka_consumer_poll(): This API returns one message or callback at
        * a time. An application should make sure to call consumer_poll() at regular
        * intervals, even if no messages are expected, to serve any
        * queued callbacks waiting to be called.  When the application is finished
        * with a message it must call rd_kafka_message_destroy() to destroy the
        * message.
        */
        rd_kafka_message_t *msg = rd_kafka_consumer_poll(consHndle, 1000);
        if (msg != NULL) {
            if (msg->err == RD_KAFKA_RESP_ERR_NO_ERROR) {
                msg_count++;
                printf("%d Consumed: %.*s\n", msg_count,(int) msg->len,
                   (const char*)msg->payload);
                if (msg_count == expected_nummsgs){
                    rd_kafka_message_destroy(msg);
                    break;
                }
            }
            rd_kafka_message_destroy(msg);
        }
    }
   
    printf("\nCommit the offsets before closing the consumer\n");
    /*
    * Commit offsets on broker for the provided list of topic partitions.
    * when input is NULL the current partition assignment will be used instead.
    * If async is false this operation will block until the offset commit
    * is done, returning the resulting success or error code.
    * This call is made to be sure that offsets are committed before closing 
    * consumer.
    */
    int retVal = rd_kafka_commit(consHndle, NULL, 0 /* async: 0 blocks until done */);
    if(retVal != RD_KAFKA_RESP_ERR_NO_ERROR) {
        printf("rd_kafka_commit() failed\n");
        return(EXIT_FAILURE);
    }


    printf("\nClose and destroy consumer handle\n");
    /*
    * Consumer shutdown sequence: 
    * 1. rd_kafka_consumer_close(): This is blocking call. It makes sure to revoke
    *    assignments, commit offsets, leave consumer group.
    *    The application still needs to call rd_kafka_destroy() after
    *    this call finishes to clean up the underlying handle resources.
    * 2. rd_kafka_destroy(): This API destroys the consumer handle created using 
    *    rd_kafka_new call and frees resources
    */

    rd_kafka_consumer_close(consHndle);
    rd_kafka_destroy(consHndle);
    return(EXIT_SUCCESS);
}

/* MAIN */
int main(int argc, char *argv[]) {

  /* Number of expected messages for the consumer */
  int expected_nummsgs = 5;

  /* This is pre created Stream with one topic and one partition*/
  const char* fullTopicName = "/MapR_Streams:MapR-Topic1";
  int ret_val;

  /* Consume Messages */
  ret_val = consumer(expected_nummsgs, fullTopicName);
  if (EXIT_SUCCESS != ret_val) {
    printf("\nFAIL: consumer failed\n");
  } else {
    printf("\nPASS: %d messages consumed from topic %s\n", expected_nummsgs, fullTopicName);
  }
}
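
The sample consumer loops until it has read expected_nummsgs messages, so it does not return if the topic holds fewer messages than that. One way to bound the wait is to stop after a number of consecutive empty polls. The following is a minimal sketch of that loop structure only. The poll_fn type, consume_bounded, and fake_poll are hypothetical names introduced here; in a real consumer, the callback would wrap rd_kafka_consumer_poll() and rd_kafka_message_destroy() as shown in the sample above.

```c
#include <stddef.h>

/* poll_fn: hypothetical stand-in for one rd_kafka_consumer_poll() call.
 * Returns 1 if a message was consumed, 0 if the poll timed out. */
typedef int (*poll_fn)(void *ctx);

/*
 * consume_bounded (hypothetical helper): polls until 'expected' messages
 * have been consumed or 'max_idle' consecutive polls return nothing.
 * Returns the number of messages consumed.
 */
static int consume_bounded(poll_fn poll_once, void *ctx,
                           int expected, int max_idle) {
    int consumed = 0;
    int idle = 0;

    while (consumed < expected && idle < max_idle) {
        if (poll_once(ctx)) {
            consumed++;
            idle = 0;   /* a message arrived, so reset the idle counter */
        } else {
            idle++;     /* nothing arrived during this poll */
        }
    }
    return consumed;
}

/* fake_poll: stand-in used for illustration; it "delivers" as many
 * messages as the integer pointed to by ctx says are still available. */
static int fake_poll(void *ctx) {
    int *remaining = (int *)ctx;

    if (*remaining > 0) {
        (*remaining)--;
        return 1;
    }
    return 0;
}
```

With a 1000 ms poll timeout, as in the sample, a max_idle of 10 would give up after roughly ten seconds without a message. The caller can compare the return value with the expected count to decide whether to report a failure.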
This topic describes how to compile MapR Event Store For Apache Kafka streams producers and consumers in C. When you compile a MapR Event Store For Apache Kafka C application, you must link it with the librdkafka library in the /opt/mapr/lib/ library path and include the header file directory to ensure that your application references the header file included with MapR Event Store For Apache Kafka C Client.
IMPORTANT For MapR 6.0.0 and earlier, when you compile a MapR Event Store For Apache Kafka C application, you must link it with both the librdkafka library in the /opt/mapr/lib/ library path and the libjvm library, and include the header file directory so that your application references the header file included with the MapR Event Store For Apache Kafka C Client.

The following steps compile the source code and generate executables in the same directory as the Makefile. For example, in the librdkafka_example directory, the producer and consumer executables are generated from the producer.c and consumer.c source files.

  1. On your node, create a directory. For example: librdkafka_example.
  2. In your directory (librdkafka_example), create a producer application. For example, if you are using the provided sample producer application:
    1. Create a file named producer.c.
    2. Copy the contents of the sample producer application into that file.
  3. In your directory (librdkafka_example), create a consumer application. For example, if you are using the provided sample consumer application:
    1. Create a file named consumer.c.
    2. Copy the contents of the sample consumer application into that file.
  4. In your directory (librdkafka_example), create a file named Makefile with the following content:
    CC= gcc
    HEADERDIR=/opt/mapr/include/librdkafka/
    CCFLAGS= -Wall -I$(HEADERDIR) -g -std=c99
    
    export LD_LIBRARY_PATH=/opt/mapr/lib
    
    LIBDIR= /opt/mapr/lib/
    %.o: %.c
    	gcc $(CCFLAGS) -c $< 
    
    consumer: consumer.o
    	gcc -o $@ $@.o -lrdkafka -L$(LIBDIR) $(CCFLAGS)
    
    producer: producer.o
    	gcc -o $@ $@.o -lrdkafka -L$(LIBDIR) $(CCFLAGS)
    
    all: consumer producer
    
    clean:
    	/bin/rm -f *.o consumer producer
    IMPORTANT For MapR 6.0.0 and earlier, use the following Makefile:
    CC= gcc
    HEADERDIR=/opt/mapr/include/librdkafka/
    CCFLAGS= -Wall -I$(HEADERDIR) -g -std=c99
    
    #Edit JAVA_HOME to be appropriate for your environment
    JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64/
    export LD_LIBRARY_PATH=/opt/mapr/lib:$(JAVA_HOME)/jre/lib/amd64/server
    
    LIBDIR= /opt/mapr/lib/
    %.o: %.c
    	gcc $(CCFLAGS) -c $< 
    
    consumer: consumer.o
    	gcc -o $@ $@.o -lrdkafka -L$(LIBDIR) $(CCFLAGS)
    
    producer: producer.o
    	gcc -o $@ $@.o -lrdkafka -L$(LIBDIR) $(CCFLAGS)
    
    all: consumer producer
    
    clean:
    	/bin/rm -f *.o consumer producer
  5. Complete the following edits to the Makefile:
    For Mac users, locate the following line of code:
    export LD_LIBRARY_PATH=/opt/mapr/lib 
    Then, replace this line with the following line of code:
    export DYLD_LIBRARY_PATH=/opt/mapr/lib
    IMPORTANT For MapR 6.0.0 and earlier, the following steps apply:
    1. For Mac users, locate the following line of code:
      export LD_LIBRARY_PATH=/opt/mapr/lib:$(JAVA_HOME)/jre/lib/amd64/server
      Then, replace this line with the following line of code:
      export DYLD_LIBRARY_PATH=/opt/mapr/lib:$(JAVA_HOME)/jre/lib/server
    2. Based on your environment, edit JAVA_HOME. This ensures that LD_LIBRARY_PATH or DYLD_LIBRARY_PATH will include the full path to the directory containing the libjvm library.
      NOTE You can use find / -name libjvm* to determine the JAVA_HOME directory on your machine. However, note that the results of this command include the full path to the libjvm file not just the JAVA_HOME directory.

      For example, JAVA_HOME may be set to /Library/Java/JavaVirtualMachines/jdk1.8.0_121.jdk/Contents/Home/ on a Mac and JAVA_HOME may be set to /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.79.x86_64/ on Linux.

  6. From your directory (librdkafka_example), run the following commands to compile the source code:
    make clean
    make all
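
LD_LIBRARY_PATH (DYLD_LIBRARY_PATH on a Mac) is a colon-separated list of directories. When troubleshooting, it can help to confirm that a given directory, such as /opt/mapr/lib, is actually one of the entries. The following is a minimal sketch of such a check; path_list_contains and trimmed_len are hypothetical helpers written for this illustration and are not part of any MapR tool.

```c
#include <string.h>
#include <stddef.h>

/* trimmed_len: length of s[0..len) with trailing '/' characters ignored,
 * so that "/opt/mapr/lib/" and "/opt/mapr/lib" compare as equal. */
static size_t trimmed_len(const char *s, size_t len) {
    while (len > 1 && s[len - 1] == '/')
        len--;
    return len;
}

/*
 * path_list_contains (hypothetical helper): returns 1 if 'dir' is one of
 * the entries of the colon-separated 'list', and 0 otherwise.
 */
static int path_list_contains(const char *list, const char *dir) {
    if (list == NULL || dir == NULL || dir[0] == '\0')
        return 0;

    size_t dir_len = trimmed_len(dir, strlen(dir));
    const char *entry = list;

    while (1) {
        const char *end = strchr(entry, ':');
        size_t raw_len = end ? (size_t)(end - entry) : strlen(entry);
        size_t entry_len = trimmed_len(entry, raw_len);

        /* Compare whole entries only, so "/opt/mapr/library" does not
         * match "/opt/mapr/lib". */
        if (entry_len == dir_len && strncmp(entry, dir, dir_len) == 0)
            return 1;
        if (end == NULL)
            return 0;
        entry = end + 1;
    }
}
```

A program could call path_list_contains(getenv("LD_LIBRARY_PATH"), "/opt/mapr/lib") after including stdlib.h. getenv() returns NULL when the variable is unset, which this sketch treats as "not found".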

Once you have the application executables, complete the following steps to run the application:

  1. On a cluster node, use the maprcli to create a stream. For example, MapR_Streams.
    maprcli stream create -path /MapR_Streams
    NOTE As long as autocreate is enabled for the stream when you run stream create, the producer will create the topic. By default, autocreate is enabled. For more information, see stream create.
  2. At the command line, set the library path to include /opt/mapr/lib and the path to the directory that contains the libjvm library. For more information, see Configuring the MapR Event Store For Apache Kafka C Client.
    NOTE You must complete this step at the command line even though you already set the library path in the Makefile. If you do not complete the step, an error similar to the following displays when you run the application in the next step: error while loading shared libraries: librdkafka.so.1: cannot open shared object file: No such file or directory
  3. From your directory (librdkafka_example), run the producer application from the command line. For example, if the application is called producer:
    ./producer
    The following appears on the console assuming that the stream name is MapR_Streams:
    ************************ PRODUCER ************************
    Create producer configuration object
    Create Producer Kafka handle
    Create topic handle
    Send/Produce message to topic: /MapR_Streams:MapR-Topic1
    Wait for messages to be delivered
    Produced: Welcome to MapR Streams CAPI
    Produced: MapR Streams CAPI Message Payload 1
    Produced: MapR Streams CAPI Message Payload 2
    Produced: MapR Streams CAPI Message Payload 3
    Produced: MapR Streams CAPI Message Payload 4
    
    Destroy topic handle
    Destroy producer handle
    
    PASS: 5 messages produced and sent to topic partition /MapR_Streams:MapR-Topic1
    
  4. From your directory (librdkafka_example), run the consumer application from the command line. For example, if the application is called consumer:
    ./consumer
    The following appears on the console assuming that the stream name is MapR_Streams:
    *********  CONSUMER START  *********
    Create new consumer configuration object
    Set topic configurations
    Create consumer Kafka handle
    Create topic partition list for topic: /MapR_Streams:MapR-Topic1
    Subscribe consumer to the topic:
    Destroy topic partition list:
    
    Start message consumption:
    1 Consumed: Welcome to MapR Streams CAPI
    2 Consumed: MapR Streams CAPI Message Payload 1
    3 Consumed: MapR Streams CAPI Message Payload 2
    4 Consumed: MapR Streams CAPI Message Payload 3
    5 Consumed: MapR Streams CAPI Message Payload 4
    
    Commit the offsets before closing the consumer
    
    Close and destroy consumer handle
    
    PASS: 5 messages consumed from topic /MapR_Streams:MapR-Topic1
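
Both samples hard-code the topic as /MapR_Streams:MapR-Topic1, which joins the stream path passed to stream create and the topic name with a colon. If the stream or topic is chosen at run time, the full name can be assembled instead. The following is a minimal sketch; make_full_topic_name is a hypothetical helper, and the check for a leading slash assumes that the stream path is absolute, as it is in the example above.

```c
#include <stdio.h>
#include <stddef.h>

/*
 * make_full_topic_name (hypothetical helper): joins a stream path and a
 * topic name as "<stream path>:<topic>". Returns 0 on success, or -1 if
 * an argument is unusable or the result does not fit in buf.
 */
static int make_full_topic_name(char *buf, size_t bufsize,
                                const char *streamPath, const char *topic) {
    if (buf == NULL || streamPath == NULL || topic == NULL)
        return -1;

    /* Assumption: stream paths are absolute, as in /MapR_Streams. */
    if (streamPath[0] != '/' || topic[0] == '\0')
        return -1;

    int n = snprintf(buf, bufsize, "%s:%s", streamPath, topic);
    if (n < 0 || (size_t)n >= bufsize)
        return -1;   /* formatting failed or the name was truncated */
    return 0;
}
```

The resulting string can be passed as fullTopicName to the producer() and consumer() functions in the samples.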