Developing MapR Event Store For Apache Kafka Python Applications

This topic includes basic information about how to develop a MapR Event Store For Apache Kafka Python application and an example program that you can run.

Before you Begin

Confirm that your environment meets the following requirements:

Create a MapR Event Store For Apache Kafka Producer Application

In general, you want to create a producer that performs the following steps:
  1. Import the producer class.
  2. Define the producer and its configuration.
  3. Produce data.
  4. Wait for all messages to be sent to consumer.
As of EEP 5.0 MapR Event Store For Apache Kafka Python Client: In the following example code, three messages are produced to a topic named mytopic in a stream named my_stream.
from confluent_kafka import Producer
p = Producer({'streams.producer.default.stream': '/my_stream'})
some_data_source= ["msg1", "msg2", "msg3"]
for data in some_data_source:
     p.produce('mytopic', data.encode('utf-8'))
     p.flush()
As of EEP 3.0 MapR Event Store For Apache Kafka Python Client: In the following example code, three messages are produced to a topic named mytopic in a stream named my_stream.
from mapr_streams_python import Producer
p = Producer({'streams.producer.default.stream': '/my_stream'})
some_data_source= ["msg1", "msg2", "msg3"]
for data in some_data_source:
    p.produce('mytopic', data.encode('utf-8'))
    p.flush()

Create a MapR Event Store For Apache Kafka Consumer Application

In general, you want to create a consumer that performs the following steps:
  1. Import the consumer class.
  2. Define the consumer and its configuration.
  3. Consume data.
  4. Wait for all messages to be consumed.
As of EEP 5.0 MapR Event Store For Apache Kafka Python Client: In following example code, the MapR Event Store For Apache Kafka consumer is subscribed to my_stream/mytopic and it prints the content of each message that it reads.
from confluent_kafka import Consumer, KafkaError
c = Consumer({'group.id': 'mygroup',
              'default.topic.config': {'auto.offset.reset': 'earliest'}})
c.subscribe(['/my_stream:mytopic'])
running = True
while running:
  msg = c.poll(timeout=1.0)
  if msg is None: continue
  if not msg.error():
    print('Received message: %s' % msg.value().decode('utf-8'))
  elif msg.error().code() != KafkaError._PARTITION_EOF:
    print(msg.error())
    running = False
c.close()
As of EEP 3.0 MapR Event Store For Apache Kafka Python Client: In following example code, the MapR Event Store For Apache Kafka consumer is subscribed to my_stream/mytopic and it prints the content of each message that it reads.
from mapr_streams_python import Consumer, KafkaError
c = Consumer({'group.id': 'mygroup',
              'default.topic.config': {'auto.offset.reset': 'earliest'}})
c.subscribe(['/my_stream:mytopic'])
running = True
while running:
  msg = c.poll(timeout=1.0)
  if msg is None: continue
  if not msg.error():
    print('Received message: %s' % msg.value().decode('utf-8'))
  elif msg.error().code() != KafkaError._PARTITION_EOF:
    print(msg.error())
    running = False
c.close()

Run the Example Applications

To run the sample producer and consumer applications:
  1. Create a stream named mystream.
  2. Create a file named producer.py.
  3. Add the producer example code into the producer.py file.
  4. Create a file named consumer.py.
  5. Add the consumer example code into the consumer.py file.
  6. Verify that you have completed the steps to configure the MapR Event Store For Apache Kafka C client or complete the steps now. See Configuring the MapR Event Store For Apache Kafka C Client.
    NOTE The MapR Event Store For Apache Kafka Python Client is dependent on the MapR Event Store For Apache Kafka C Client. Therefore, the MapR Event Store For Apache Kafka C Client must be configured before you can run the application.
  7. Run producer.py from the command line to generate messages.
    $ python producer.py
  8. Run consumer.py from the command line:
    $ python consumer.py