hdfs_read_revised.c

This application demonstrates how to read from files by using the APIs hdfsRead() and hdfsPread(): hdfs_read_revised.c

Before running this application:

  • Ensure that you have access to a cluster running MapR File System.
  • Ensure that a text-based file that you have access to exists on the cluster. Note the path to the file.
  • Decide on the number of bytes to read from the file.

To build and run it, download it from this page and copy it to a MapR client. Then, modify the run.sh script in Building and Running C Applications on MapR File System Clients to point to this sample application. Run the script and then run the application.

The application includes these header files:

  • stdio.h
  • hdfs.h
  • errno.h
  • fcntl.h

The APIs are defined in hdfs.h. The file fcntl.h defines the file-access flags.

The application performs the actions that are described in the following sections.

Takes a filename and buffer size as input

After compiling the application, type the following command to launch the application and pass in the path and name of the file, as well as the size of the buffer to read data into:

hdfs_read <filename> <buffer size>

Sets an RPC timeout

hdfsSetRpcTimeout() is specific to the libMapRClient version of libhdfs and takes a value that is specified in seconds. The default is 99 seconds. If you change this value, set it either to 0 (which eliminates timeouts) or to a value greater than 30.

int err = hdfsSetRpcTimeout(30);
if (err) {
  fprintf(stderr, "Failed to set rpc timeout!\n");
  exit(-1);
}
Connects to a filesystem, using an API that is supported in the hadoop-2.x version of libhdfs

The application tries to connect to the first MapR File System cluster that is specified in the mapr-clusters.conf file in the MAPR_HOME/conf directory on the client. After connecting to the filesystem, the application returns a handle to the filesystem.

hdfsFS fs = hdfsConnect("default", 0);
if (!fs) {
  fprintf(stderr, "Oops! Failed to connect to hdfs!\n");
  exit(-1);
}
Stores the values of the arguments

The application stores them in a character array and in a variable of type tSize. This datatype is defined in hdfs.h and is a fixed-width, signed 32-byte integer type for storing the size of data for read or write operations.

const char* rfile = argv[1];
tSize bufferSize = strtoul(argv[2], NULL, 10);
Opens the file that you specified

The application opens the specified file, passing the following values to the hdfsOpenFile() function:

  • The handle to the filesystem
  • The name of the file, which you supplied when you launched the application.
  • A flag to indicate the mode in which to open the file. In this case, the flag is O_RDONLY, which specifies read-only mode.
  • The default chunk size for the directory in which the file is either located or will be created. This value is specified by the 0 in the last parameter.

Although there are two other parameters in the hdfsOpenFile() function – the fourth and fifth, the libMapRClient version of libhdfsignores them.

hdfsFile readFile = hdfsOpenFile(fs, rfile, O_RDONLY, 0, 0, 0);
if (!readFile) {
  fprintf(stderr, "Failed to open %s for reading!\n", rfile);
  exit(-2);
}
Creates a buffer of the size that you specified

This is the buffer that the application will read data into.

  char* buffer = malloc(sizeof(char) * bufferSize);
if(buffer == NULL) {
  fprintf(stderr, "Failed to allocate memory!\n");
  return -2;
}
Reads an entire file with hdfsRead

The application calls the function readEntireFile():

//Read entire file from the beginning
int ret = readEntireFile(fs, readFile, buffer, bufferSize);
if (ret < 0) {
  goto done;
}

This function uses a WHILE loop. In each loop iteration, the function reads an amount of data that is equal to the size of the buffer. When the amount of bytes read is less than the size of the buffer, the end of the file has been reached and the function breaks the loop. The number of bytes read is added to a total in each iteration.

int
readEntireFile(hdfsFS fs, hdfsFile readFile, char *buffer, tSize bufferSize)
{
  tSize readBytes = bufferSize;
  uint64_t totalRead = 0;
  if (fs == NULL || readFile == NULL || buffer == NULL) {
    return -1;
  }
  while (readBytes == bufferSize) {
    readBytes = hdfsRead(fs, readFile, (void*)buffer, bufferSize);
    if (readBytes > 0) {
      totalRead += readBytes; 
    } else {   
      if (readBytes < 0) {
        fprintf(stderr, "hdfsRead failed with error %d \n", errno);
        hdfsCloseFile(fs, readFile);
        return -1;
      }
      break;
    }
  }
  return 0; 
}
Seeks an offset and reads from that offset with hdfsRead()

The application next calls the function readAtOffset(), passing in 0 as the offset from which to start reading the file.

//Read file at a particular offset 
//In this case, we are reading from offset 0
tSize readBytes = readAtOffset(fs, readFile, 0, buffer, bufferSize);
if (readBytes < 0) {
  goto done;
}

This function calls hdfsSeek() to move to the specified offset in the file.

If the seek is successful, the function reads from that offset until the buffer is full. The function then returns the number of bytes that were read.

If the seek or the read is not successful (meaning hdfsSeek() or hdfsRead() returned -1), the function closes the file and returns -1.

The offset in the file is the next byte after the end of the data that was read.

tSize
readFromOffset(hdfsFS fs, hdfsFile readFile, tOffset offset,
               char *buffer, tSize bufferSize)
{
  tSize ret = 0;
  if (fs == NULL || readFile == NULL || buffer == NULL) {
    return -1;
  }
  ret = hdfsSeek(fs, readFile, offset);
  if (!ret) {
    ret = hdfsRead(fs, readFile, buffer, bufferSize);
    if (ret < 0) {
      fprintf(stderr, "hdfsRead failed with error %d \n", errno);
    }
  } else {
    fprintf(stderr, "hdfsSeek failed with error %d \n", errno);
  }
  if (ret < 0) {
    hdfsCloseFile(fs, readFile);    
  }
  //Current offset within the file will be positioned at (offset + readBytes)th byte.
  return ret;
}
Performs a positional read with hdfsPread()

The application calls positionalRead(), passing 100 as the offset from which to start the read.

  readBytes = positionalRead(fs, readFile, 100, buffer, bufferSize);
if (readBytes < 0) {
  goto done;
}

The function reads data into the buffer, starting at offset 100, without first calling hdfsSeek() to move the offset to that position. The offset is not moved to 100 before the read begins. The offset stays where it is, the read begins at offset 100, and (after the read) the offset remains where it was before the read. The offset in the file is ignored by the positional read.

tSize
positionalRead(hdfsFS fs, hdfsFile readFile, tOffset offset, 
               char *buffer, tSize bufferSize)
{
  tSize readBytes = 0;
  if (fs == NULL || readFile == NULL || buffer == NULL) {
    return -1;
  }
  readBytes = hdfsPread(fs, readFile, offset, buffer, bufferSize);
  if (readBytes < 0) { 
    fprintf(stderr, "hdfsPread failed with error %d \n", errno);
    hdfsCloseFile(fs, readFile);
  }
  //Current offset within the file will not be advanced if hdfsPread is used
  return readBytes;
}
Closes the file
hdfsCloseFile(fs, readFile);
Frees the buffer
free(buffer);
Disconnects from the filesystem
hdfsDisconnect(fs);
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <stdio.h>
#include "hdfs.h" 
#include <errno.h>
#include <fcntl.h>

tSize
readFromOffset(hdfsFS fs, hdfsFile readFile, tOffset offset,
               char *buffer, tSize bufferSize)
{
  tSize ret = 0;

  if (fs == NULL || readFile == NULL || buffer == NULL) {
    return -1;
  }

  ret = hdfsSeek(fs, readFile, offset);
  if (!ret) {
    ret = hdfsRead(fs, readFile, buffer, bufferSize);
    if (ret < 0) {
      fprintf(stderr, "hdfsRead failed with error %d \n", errno);
    }
  } else {
    fprintf(stderr, "hdfsSeek failed with error %d \n", errno);
  }

  if (ret < 0) {
    hdfsCloseFile(fs, readFile);    
  }

  //Current offset within the file will be positioned at (offset + readBytes)th byte.
  return ret;
}

tSize
positionalRead(hdfsFS fs, hdfsFile readFile, tOffset offset, 
               char *buffer, tSize bufferSize)
{
  tSize readBytes = 0;

  if (fs == NULL || readFile == NULL || buffer == NULL) {
    return -1;
  }

  readBytes = hdfsPread(fs, readFile, offset, buffer, bufferSize);
  if (readBytes < 0) { 
    fprintf(stderr, "hdfsPread failed with error %d \n", errno);
    hdfsCloseFile(fs, readFile);
  }

  //Current offset within the file will not be advanced if hdfsPread is used
  return readBytes;
}

int
readEntireFile(hdfsFS fs, hdfsFile readFile, char *buffer, tSize bufferSize)
{
  tSize readBytes = bufferSize;
  uint64_t totalRead = 0;

  if (fs == NULL || readFile == NULL || buffer == NULL) {
    return -1;
  }

  while (readBytes == bufferSize) {
    readBytes = hdfsRead(fs, readFile, (void*)buffer, bufferSize);
    if (readBytes > 0) {
      totalRead += readBytes; 
    } else {   
      if (readBytes < 0) {
        fprintf(stderr, "hdfsRead failed with error %d \n", errno);
        hdfsCloseFile(fs, readFile);
        return -1;
      }
      break;
    }
  }

  return 0; 
}

int 
main(int argc, char **argv) 
{
  if (argc != 3) {
    fprintf(stderr, "Usage: hdfs_read <filename> <buffersize>\n");
    exit(-1);
  }

  int err = hdfsSetRpcTimeout(30);
  if (err) {
    fprintf(stderr, "Failed to set rpc timeout!\n");
    exit(-1);
  }

  hdfsFS fs = hdfsConnect("default", 0);
  if (!fs) {
    fprintf(stderr, "Failed to connect to hdfs!\n");
    exit(-1);
  } 

  const char* rfile = argv[1];
  tSize bufferSize = strtoul(argv[2], NULL, 10);

  hdfsFile readFile = hdfsOpenFile(fs, rfile, O_RDONLY, 0, 0, 0);
  if (!readFile) {
    fprintf(stderr, "Failed to open %s for reading!\n", rfile);
    exit(-2);
  }

  char* buffer = malloc(sizeof(char) * bufferSize);
  if(buffer == NULL) {
    fprintf(stderr, "Failed to allocate memory!\n");
    return -2;
  }

  //Read entire file from the beginning
  int ret = readEntireFile(fs, readFile, buffer, bufferSize);
  if (ret < 0) {
    goto done;
  }

  //Read file at a particular offset 
  //In this case, we are reading from offset 0
  tSize readBytes = readFromOffset(fs, readFile, 0, buffer, bufferSize);
  if (readBytes < 0) {
    goto done;
  }

  //Read file at a particular offset using positional read
  //In this case, read from offset 100
  readBytes = positionalRead(fs, readFile, 100, buffer, bufferSize);
  if (readBytes < 0) {
    goto done;
  }

  hdfsCloseFile(fs, readFile);
done:
  free(buffer);
  hdfsDisconnect(fs);

  return 0;
}

/**
 * vim: ts=4: sw=4: et:
 */