KSQL Demo

This demo creates a stream, runs a non-persistent query, and then runs a persistent query.

Setup

Complete the following steps to prepare your environment for querying:

  1. Create a default stream using /sample-stream:
    maprcli stream create -path /sample-stream \
    	-produceperm p -consumeperm p -topicperm p
  2. Run the following script to generate test data and write it to an HPE Ezmeral Data Fabric Streams topic:
    ./bin/ksql-datagen quickstart=pageviews format=delimited \
    topic=/sample-stream:pageviews maxInterval=10000
  3. Run KSQL CLI and create a KSQL table:
    > ./bin/ksql
    ksql> CREATE TABLE pageviews_original_table 
    	(viewtime bigint, userid varchar, pageid varchar) 
    	WITH (kafka_topic='/sample-stream:pageviews', value_format='DELIMITED', key='viewtime');
    ATTENTION: Note the following for KSQL 6.0:
    • A table created directly on top of a Kafka topic is not materialized into a view because queries on such tables would be highly inefficient.
    • KSQL does not support pull queries on tables created by the CREATE TABLE command; however, you can issue pull queries against tables created by the CREATE TABLE AS SELECT statement.
    For KSQL 6.0, create a stream on the topic instead, and then derive a table from it with CREATE TABLE AS SELECT:
      ksql> CREATE STREAM pageviews
        (viewtime BIGINT,
         userid VARCHAR,
         pageid VARCHAR)
        WITH (KAFKA_TOPIC='/sample-stream:pageviews',
              VALUE_FORMAT='DELIMITED');
              
      
      ksql> CREATE TABLE PAGEVIEWS_TABLE 
        WITH(KAFKA_TOPIC='/sample-stream:pageviews_table')
        AS
        SELECT userid,
               MAX(viewtime)
        FROM pageviews
        GROUP BY userid;        
      
  4. Run the SHOW TABLES command to list your KSQL tables:
    ksql> SHOW TABLES;

Run a Non-persistent Query

For a non-persistent query, run:
ksql> SELECT * FROM pageviews_original_table;
For a non-persistent query in KSQL 6.0, run:
ksql> SELECT * FROM PAGEVIEWS_TABLE WHERE userid='User_1';
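As a rough local illustration of what PAGEVIEWS_TABLE holds and what the lookup for User_1 returns, the aggregation can be mimicked in the shell with awk. This is only a sketch under stated assumptions: the three sample rows are made up, the comma-separated viewtime,userid,pageid layout is assumed for the DELIMITED format, and latest_view_per_user is a hypothetical helper name that is not part of KSQL.

```shell
# Hypothetical helper: mimics SELECT userid, MAX(viewtime) ... GROUP BY userid
# on comma-separated rows of the assumed form viewtime,userid,pageid.
latest_view_per_user() {
  awk -F, '
    { t = $1 + 0                                   # force a numeric compare
      if (!($2 in max) || t > max[$2]) max[$2] = t }
    END { for (u in max) print u "," max[u] }
  ' | sort
}

# Made-up sample rows, not real ksql-datagen output.
printf '%s\n' \
  '1000,User_1,Page_10' \
  '1005,User_2,Page_11' \
  '1010,User_1,Page_12' |
  latest_view_per_user |
  grep '^User_1,'          # analogous to WHERE userid='User_1'
```

With these sample rows the pipeline prints User_1,1010, the latest view time recorded for that user. The real table is maintained continuously by KSQL as new records arrive, which this one-shot sketch does not model.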

Run a Persistent Query

For a persistent query, do the following:

  1. Create the topic /sample-stream:input-topic:
    maprcli stream topic create -path /sample-stream -topic input-topic
  2. Create a KSQL input stream:
    ksql> CREATE STREAM stream1 (message varchar) WITH 
    	(kafka_topic='/sample-stream:input-topic' , value_format='DELIMITED');
  3. Create a persistent query with filtering:
    ksql> CREATE STREAM stream2 
    	WITH (kafka_topic='/sample-stream:output-topic' , value_format='DELIMITED') 
    	AS SELECT * FROM stream1 WHERE LEN(message) > 2;
  4. List your queries:
    ksql> SHOW QUERIES;
  5. Start the console producer with the provided script:
    /opt/mapr/kafka/kafka-<version>/bin/kafka-console-producer.sh \
    	--broker-list fake.server.id:9092 --topic /sample-stream:input-topic
  6. In a separate terminal, start the console consumer with the provided script:
    /opt/mapr/kafka/kafka-<version>/bin/kafka-console-consumer.sh \
    	--bootstrap-server fake.server.id:9092 \
    	--topic /sample-stream:output-topic
  7. In the console producer, enter some messages:
    >Hi
    >Hello
    >No
    >Yes
  8. Check the console consumer output. Only the messages longer than two characters pass the filter:
    Hello
    Yes
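
To see why only Hello and Yes reach the output topic, the WHERE LEN(message) > 2 condition can be mimicked locally. The following is a sketch only, using awk in place of KSQL; filter_messages is a hypothetical helper name.

```shell
# Hypothetical helper: keeps only lines longer than two characters,
# analogous to WHERE LEN(message) > 2 in the persistent query.
filter_messages() {
  awk 'length($0) > 2'
}

# The four messages entered in the console producer.
printf '%s\n' Hi Hello No Yes | filter_messages
```

Hi and No are exactly two characters long, so they fail the condition and are dropped.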

Auxiliary Scripts Location

The kafka-console-producer.sh and kafka-console-consumer.sh scripts are packaged with MapR Kafka. After MapR Kafka is installed, you can find them at:
/opt/mapr/kafka/kafka-<version>/bin/