Sample Cached Consumer Application for Audit Stream

The Consumer.java application demonstrates how to connect to the MapR file system and consume the messages in a stream topic.

Before running this application, ensure that you have access to a cluster running the MapR file system. To build and run this application:

  1. Set the classpath:
    export CLASSPATH=`hadoop classpath`
  2. Compile the Java file:
    javac -cp .:`mapr classpath` Consumer.java
  3. Run the compiled Consumer class. For example:
    java -cp .:`mapr classpath` Consumer

This application requires the following imports:

  • org.apache.kafka.clients.consumer.ConsumerRecord
  • org.apache.kafka.clients.consumer.ConsumerRecords
  • org.apache.kafka.clients.consumer.KafkaConsumer
  • org.apache.hadoop.conf.Configuration
  • com.mapr.fs.MapRFileSystem
  • com.google.common.io.Resources
  • java.net.URI
  • java.io.IOException
  • java.io.InputStream
  • java.util.Iterator
  • java.util.Properties
  • java.util.Random
  • java.util.StringTokenizer
  • java.util.regex.Pattern

The application performs the actions described in the following sections.

Initializes the consumer properties
The configuration parameters for the consumer are stored in the consumer.props file. This file must be present in the current directory or on the mapr classpath. For example, your consumer.props file could look similar to the following:
#bootstrap.servers=localhost:9092
group.id=test
enable.auto.commit=true
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# A fast session timeout makes it easier to play with failover
session.timeout.ms=10000

# These buffer sizes are needed to avoid the consumer switching to
# a mode where it processes one bufferful every 5 seconds with multiple
# timeouts along the way.
fetch.min.bytes=50000
# receive.buffer.bytes=262144 (fixed-size buffer)
max.partition.fetch.bytes=2097152

auto.offset.reset=earliest
The application loads the consumer properties from the consumer.props file; if no group.id is set there, it generates a random one.
public static void main(String[] args) throws IOException,InterruptedException {
    KafkaConsumer<String, String> consumer;
    try (InputStream props = Resources.getResource("consumer.props").openStream()) {
        Properties properties = new Properties();
        properties.load(props);
        if (properties.getProperty("group.id") == null) {
           properties.setProperty("group.id", "group-" + new Random().nextInt(100000));
        }
        consumer = new KafkaConsumer<>(properties);
    }
}
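The property-loading step above can be exercised on its own. In the sketch below, PropsSketch is a hypothetical class and the properties text is passed inline through a StringReader (rather than loaded from the classpath resource) so that the group.id defaulting logic can run without a consumer.props file:

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;
import java.util.Random;

public class PropsSketch {

    // Load consumer settings the way the sample's main() does, defaulting
    // group.id to a random value when the file does not set one. The
    // properties text is passed inline so the sketch runs without a
    // consumer.props file on the classpath.
    public static Properties load(String text) {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(text));
        } catch (IOException e) {
            throw new RuntimeException(e); // cannot happen for an in-memory reader
        }
        if (properties.getProperty("group.id") == null) {
            properties.setProperty("group.id", "group-" + new Random().nextInt(100000));
        }
        return properties;
    }

    public static void main(String[] args) {
        Properties p = load("enable.auto.commit=true\nauto.offset.reset=earliest\n");
        System.out.println(p.getProperty("group.id")); // e.g. group-48213 (random)
    }
}
```

A random group.id places each run of the sample in its own consumer group, so every run sees the full stream from the beginning (given auto.offset.reset=earliest).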
Subscribes to the topic to read from
The application initializes the file system object with the last parameter set to true, so that the audit logs generated by the operations that convert fid to file path and volid to volume name are sent to the ExpandAudit.json.log file (used by the expandaudit utility) rather than to the stream. It then subscribes to the topics of the audit stream at /var/mapr/auditstream/auditlogstream:my.cluster.com.
Configuration conf = new Configuration();
String uri = MAPRFS_URI;
uri = uri + "mapr/";
conf.set("fs.default.name", uri);
MapRFileSystem fs = new MapRFileSystem();
fs.initialize(URI.create(uri), conf, true);
Pattern pattern = Pattern.compile("/var/mapr/auditstream/auditlogstream:my.cluster.com.auth.+");
consumer.subscribe(pattern, null);
Requests unread messages from the topic
The application polls for unread messages in the subscribed topics. It then iterates through the returned records, extracts the value of each message, and prints the value to the standard output.
boolean stop = false;
int pollTimeout = 1000;
while (!stop) {
    ConsumerRecords<String, String> consumerRecords = consumer.poll(pollTimeout);
    Iterator<ConsumerRecord<String, String>> iterator = consumerRecords.iterator();
    if (iterator.hasNext()) {
      while (iterator.hasNext()) {
        ConsumerRecord<String, String> record = iterator.next();
        String value = record.value();
        String rvalue = value.replace("\"","");
        String recordValue = processRecord(fs, rvalue, value);              
        System.out.println((" Consumed Record: " + recordValue));
      }
    } else {
      Thread.sleep(1000);
      //stop = true;
    }
}
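The polling pattern above, draining the whole batch when records arrive and sleeping briefly otherwise, can be sketched without a cluster. Everything here is a stand-in: PollLoopSketch, sampleBatches(), and the record strings are hypothetical, with a Deque of lists playing the role of consumer.poll():

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

public class PollLoopSketch {

    // Hypothetical batches standing in for successive consumer.poll() results.
    static Deque<List<String>> sampleBatches() {
        return new ArrayDeque<>(Arrays.asList(
                Arrays.asList("rec-1", "rec-2"),
                Collections.<String>emptyList(),
                Arrays.asList("rec-3")));
    }

    // Drain batches the way the sample's while(!stop) loop does: consume a
    // whole batch when records arrive, back off briefly when a poll comes
    // back empty, and (unlike the sample, which loops forever) stop after
    // two empty polls so the sketch terminates.
    static List<String> drainAll(Deque<List<String>> batches) {
        List<String> consumed = new ArrayList<>();
        int emptyPolls = 0;
        while (emptyPolls < 2) {
            List<String> batch = batches.poll();   // stand-in for consumer.poll()
            if (batch == null) batch = Collections.emptyList();
            Iterator<String> iterator = batch.iterator();
            if (iterator.hasNext()) {
                while (iterator.hasNext()) {
                    consumed.add(iterator.next()); // extract each record's value
                }
            } else {
                emptyPolls++;
                try {
                    Thread.sleep(10);              // back off, as the sample does
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(drainAll(sampleBatches())); // [rec-1, rec-2, rec-3]
    }
}
```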
Gets the record and expands individual fields
The application then takes the record and expands each fid in the message to a file path using the getMountPathFidCached() API, and each volid to a volume name using the getVolumeNameCached() API.
while (st.hasMoreTokens()) {
    String field = st.nextToken();
    StringTokenizer st1 = new StringTokenizer(field, ":");
    while (st1.hasMoreTokens()) {
        String token = st1.nextToken();
        if (token.endsWith("Fid")) {
            String lfidStr = st1.nextToken();
            String path = null;
            try {
                path = fs.getMountPathFidCached(lfidStr); // Expand FID to path
            } catch (IOException e) {
                // Ignore: leave path null if the fid cannot be expanded
            }
            lfidPath = "\"FidPath\":\"" + path + "\",";
        }
        if (token.endsWith("volumeId")) {
            String volid = st1.nextToken();
            String name = null;
            try {
                int volumeId = Integer.parseInt(volid);
                name = fs.getVolumeNameCached(volumeId); // Cached API to convert volume id to volume name
            } catch (IOException e) {
                // Ignore: leave name null if the volume id cannot be resolved
            }
            lvolName = "\"VolumeName\":\"" + name + "\",";
        }
    }
}
Returns the record
Finally, the application returns the record with the fid and volid expanded to the file path and volume name, respectively.
String result = "";
StringTokenizer st2 = new StringTokenizer(value,",");
while (st2.hasMoreTokens()) {
     String tokens = st2.nextToken();
     result = result + tokens + ",";
     if (tokens.contains("Fid")) {
         result = result + lfidPath;
     }
     if (tokens.contains("volumeId")) {
         result = result + lvolName;
     }	
}
return result.substring(0, result.length() - 1);
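The two expansion steps can be followed end to end without a cluster. In the sketch below everything is a stand-in: ExpandSketch is hypothetical, plain maps replace the getMountPathFidCached() and getVolumeNameCached() lookups, and the fid, volume id, and record contents are made up for illustration:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;

public class ExpandSketch {

    // Stand-ins for the cached MapRFileSystem lookups; both entries are made up.
    static final Map<String, String> FID_TO_PATH = new HashMap<>();
    static final Map<Integer, String> VOLID_TO_NAME = new HashMap<>();
    static {
        FID_TO_PATH.put("2049.32.131072", "/var/mapr/audit/demo.txt");
        VOLID_TO_NAME.put(68048396, "mapr.audit.volume");
    }

    // Mirrors processRecord(): scan the quote-stripped record for fields ending
    // in "Fid" or "volumeId", resolve each one, then splice the expanded fields
    // into the original record immediately after the field they expand.
    public static String expand(String value) {
        String rvalue = value.replace("\"", "");
        String lfidPath = "";
        String lvolName = "";
        StringTokenizer st = new StringTokenizer(rvalue, ",");
        while (st.hasMoreTokens()) {
            StringTokenizer st1 = new StringTokenizer(st.nextToken(), ":");
            while (st1.hasMoreTokens()) {
                String token = st1.nextToken();
                if (token.endsWith("Fid")) {
                    String path = FID_TO_PATH.get(st1.nextToken()); // fid -> path
                    lfidPath = "\"FidPath\":\"" + path + "\",";
                }
                if (token.endsWith("volumeId")) {
                    String name = VOLID_TO_NAME.get(Integer.valueOf(st1.nextToken())); // volid -> name
                    lvolName = "\"VolumeName\":\"" + name + "\",";
                }
            }
        }
        // Rebuild the original record, inserting the expanded fields.
        StringBuilder result = new StringBuilder();
        StringTokenizer st2 = new StringTokenizer(value, ",");
        while (st2.hasMoreTokens()) {
            String tokens = st2.nextToken();
            result.append(tokens).append(",");
            if (tokens.contains("Fid")) {
                result.append(lfidPath);
            }
            if (tokens.contains("volumeId")) {
                result.append(lvolName);
            }
        }
        return result.substring(0, result.length() - 1); // drop trailing comma
    }

    public static void main(String[] args) {
        System.out.println(expand(
            "{\"operation\":\"GETATTR\",\"srcFid\":\"2049.32.131072\",\"volumeId\":68048396,\"status\":0}"));
    }
}
```

Run against the made-up record above, the sketch emits the record with "FidPath" spliced in after "srcFid" and "VolumeName" after "volumeId", which is the shape of record the real application prints.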
The complete Consumer.java application
The following listing combines the sections described above.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.net.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
import com.mapr.fs.MapRFileSystem;
import com.mapr.fs.*;

import com.google.common.io.Resources;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

import java.io.IOException;
import java.util.*;
import java.io.InputStream;
import java.util.regex.*;

public class Consumer {
  // Base URI of the MapR file system.
  private static final String MAPRFS_URI = "maprfs:///";

  public static void main(String[] args) throws IOException, InterruptedException {
    // Initialize the consumer with properties loaded from consumer.props.
    KafkaConsumer<String, String> consumer;
    try (InputStream props = Resources.getResource("consumer.props").openStream()) {
      Properties properties = new Properties();
      properties.load(props);
      if (properties.getProperty("group.id") == null) {
        properties.setProperty("group.id", "group-" + new Random().nextInt(100000));
      }

      consumer = new KafkaConsumer<>(properties);
    }

    Configuration conf = new Configuration();
    String uri = MAPRFS_URI;
    uri = uri + "mapr/";
    conf.set("fs.default.name", uri);
    MapRFileSystem fs = new MapRFileSystem();
    fs.initialize(URI.create(uri), conf, true);
    // Subscribe to all topics of the audit stream.
    Pattern pattern = Pattern.compile("/var/mapr/auditstream/auditlogstream:my.cluster.com.auth.+");
    consumer.subscribe(pattern, null);

    boolean stop = false;
    int pollTimeout = 1000;
    while (!stop) {
      // Request unread messages from the topic.
      ConsumerRecords<String, String> consumerRecords = consumer.poll(pollTimeout);
      Iterator<ConsumerRecord<String, String>> iterator = consumerRecords.iterator();
      if (iterator.hasNext()) {
        while (iterator.hasNext()) {
          ConsumerRecord<String, String> record = iterator.next();
          // Iterate through returned records, extract the value
          // of each message, and print the value to standard output.
          //System.out.println((" Consumed Record: " + record.toString()));
          String value = record.value();
          String rvalue = value.replace("\"","");
          String recordValue = processRecord(fs, rvalue, value);
                   
          System.out.println((" Consumed Record: " + recordValue));
          //System.out.println((" Consumed Record: " + value));
        }
      } else {
        Thread.sleep(1000);
        //stop = true;
      }
    }
    consumer.close();
    System.out.println("All done.");
  }

  /* Get the record and expand individual fields */
  public static String processRecord(MapRFileSystem fs, String rvalue, String value) 
  {
     StringTokenizer st = new StringTokenizer(rvalue, ",");
     String lfidPath = "";
     String lvolName = "";

     while (st.hasMoreTokens())
     {
       String field = st.nextToken();
       StringTokenizer st1 = new StringTokenizer(field, ":");
       while (st1.hasMoreTokens())
       {
         String token = st1.nextToken();
         /* If the field has a fid, expand it using the cached API */
         if (token.endsWith("Fid")) {
           String lfidStr = st1.nextToken();
           String path = null;
           try {
             path = fs.getMountPathFidCached(lfidStr); // Expand FID to path
           } catch (IOException e) {
             // Ignore: leave path null if the fid cannot be expanded
           }
           lfidPath = "\"FidPath\":\"" + path + "\",";
         }

         if (token.endsWith("volumeId")) {
           String volid = st1.nextToken();
           String name = null;
           try {
             int volumeId = Integer.parseInt(volid);
             name = fs.getVolumeNameCached(volumeId); // Cached API to convert volume id to volume name
           } catch (IOException e) {
             // Ignore: leave name null if the volume id cannot be resolved
           }
           lvolName = "\"VolumeName\":\"" + name + "\",";
         }
       }
     }
     String result = "";
     StringTokenizer st2 = new StringTokenizer(value,",");
     while (st2.hasMoreTokens()) {
       String tokens = st2.nextToken();
       result = result + tokens + ",";
       if (tokens.contains("Fid")) {
         result = result + lfidPath;
       }
       if (tokens.contains("volumeId")) {
         result = result + lvolName;
       }	
     }
     //return record after expansion of fid and volume id
     return result.substring(0, result.length() - 1);
  }
}