[Hadoopstreaming-commits] r10 - in pkg: . R inst/wordCntDemo man

noreply at r-forge.r-project.org noreply at r-forge.r-project.org
Tue Apr 13 01:27:04 CEST 2010


Author: drosen
Date: 2010-04-13 01:27:03 +0200 (Tue, 13 Apr 2010)
New Revision: 10

Modified:
   pkg/DESCRIPTION
   pkg/R/hsCmdLineArgs.R
   pkg/R/hsLineReader.R
   pkg/R/hsTableReader.R
   pkg/inst/wordCntDemo/hsWordCnt.R
   pkg/inst/wordCntDemo/runHadoop.sh
   pkg/man/HadoopStreaming-package.Rd
   pkg/man/hsCmdLineArgs.Rd
   pkg/man/hsTableReader.Rd
   pkg/man/hsWriteTable.Rd
Log:
Bunch of changes.

Modified: pkg/DESCRIPTION
===================================================================
--- pkg/DESCRIPTION	2009-04-03 22:39:39 UTC (rev 9)
+++ pkg/DESCRIPTION	2010-04-12 23:27:03 UTC (rev 10)
@@ -1,8 +1,8 @@
 Package: HadoopStreaming
 Type: Package
 Title: Utilities for using R scripts in Hadoop streaming
-Version: 0.15
-Date: 2009-04-03
+Version: 0.2
+Date: 2009-09-28
 Author: David S. Rosenberg <drosen at sensenetworks.com>
 Maintainer: David S. Rosenberg <drosen at sensenetworks.com>
 Depends: getopt

Modified: pkg/R/hsCmdLineArgs.R
===================================================================
--- pkg/R/hsCmdLineArgs.R	2009-04-03 22:39:39 UTC (rev 9)
+++ pkg/R/hsCmdLineArgs.R	2010-04-12 23:27:03 UTC (rev 10)
@@ -7,13 +7,16 @@
     'infile'   ,  'i', 1, "character","Specifies an input file, otherwise use stdin.",NA,
     'outfile',    'o', 1, "character","Specifies an output file, otherwise use stdout.",NA,
     'skip',       's',1,"numeric","Number of lines of input to skip at the beginning.",0,
-    'chunksize','C',1,"numeric","Number of lines to read at once, a la scan.",-1,
+    'chunksize',  'c',1,"numeric","Number of lines to read at once, a la scan.",-1,
+    'memlim',     'z',1,"numeric","Max number of bytes allowed for use in carry.",-1,
     'numlines',   'n',1,"numeric","Max number of lines to read from input, per mapper or reducer job.",0,
     'sepr',       'e',1,"character","Separator character, as used by scan.",'\t',
     'insep',      'f',1,"character","Separator character for input, defaults to sepr.",NA,
     'outsep',     'g',1,"character","Separator character output, defaults to sepr.",NA,
+    'debug',      'd',0,"logical","Turn on debugging output.",FALSE,
     'help',       'h',0,"logical","Get a help message.",FALSE
     )
+
   specmat = matrix(c(spec,basespec),ncol=6,byrow=TRUE)
 
   opt = getopt(specmat[,1:5],opt=args)
@@ -36,7 +39,7 @@
     ## self = commandArgs()[1];
     cat(getopt(specmat,usage=TRUE))
     opt$set = FALSE
-    return(opt)
+##    return(opt)
   }
 
   if (openConnections) {
@@ -58,6 +61,7 @@
       opt$outcon = file(description=opt$outfile,open="w")
     }
   }
+
   opt$set = TRUE
   return(opt)
 }

Modified: pkg/R/hsLineReader.R
===================================================================
--- pkg/R/hsLineReader.R	2009-04-03 22:39:39 UTC (rev 9)
+++ pkg/R/hsLineReader.R	2010-04-12 23:27:03 UTC (rev 10)
@@ -1,5 +1,5 @@
 `hsLineReader` <-
-function(file="",chunkSize=-1, skip=0, FUN=function(x) cat(x,sep='\n')) {
+function(file="",chunkSize=-1L, skip=0L, FUN=function(x) cat(x,sep='\n')) {
   if (skip>0) {
     junk = readLines(file, n=skip)
   }

Modified: pkg/R/hsTableReader.R
===================================================================
--- pkg/R/hsTableReader.R	2009-04-03 22:39:39 UTC (rev 9)
+++ pkg/R/hsTableReader.R	2010-04-12 23:27:03 UTC (rev 10)
@@ -1,15 +1,15 @@
 `hsTableReader` <-
-  function(file="",cols='character',chunkSize=-1,FUN=print, ignoreKey=TRUE,singleKey=TRUE, skip=0, sep='\t',keyCol='key',PFUN=NULL) {
+  function(file="",cols='character',chunkSize=-1,FUN=print, ignoreKey=TRUE,singleKey=TRUE, skip=0, sep='\t', keyCol='key', PFUN=NULL,carryMemLimit=512e6,carryMaxRows=Inf,stringsAsFactors=FALSE,debug=FALSE) {
     ## flush=TRUE  (allows comment, but precludes more than one record per line, which is good)
     if (skip > 0) {
-      junk = scan(file,what='character',quiet=TRUE,sep=sep,nlines=skip)
+      junk = scan(file,what='character',quiet=!debug,sep=sep,nlines=skip)
     }
     
     if (ignoreKey) {
       repeat {
-        a = scan(file,what=cols,quiet=TRUE,sep=sep,strip.white=TRUE,nlines=chunkSize,flush=TRUE)
+        a = scan(file,what=cols,quiet=!debug,sep=sep,strip.white=TRUE,nlines=chunkSize,flush=TRUE)
         if ( length(a[[1]]) ==0 ) break
-        FUN(data.frame(a,stringsAsFactors=FALSE))
+        FUN(data.frame(a,stringsAsFactors=stringsAsFactors))
       }
       return(invisible())
     }
@@ -19,33 +19,36 @@
       aCarry = data.frame()
       fileEmpty = TRUE
       repeat {
-        a = scan(file,what=cols,quiet=TRUE,sep=sep,strip.white=TRUE,nlines=chunkSize,flush=TRUE)
+        if (object.size(aCarry)>carryMemLimit || nrow(aCarry) > carryMaxRows) {
+          cat('In hsTableReader, aCarry has exceeded defined limits on memory and/or rows\n',file=stderr())
+          cat('Key=',aCarry[1,keyCol],' MemoryUsed=',object.size(aCarry)/(2^20),'MB; NumRows=',nrow(aCarry),'\n',sep='',file=stderr())
+          cat('Consider using higher values for carryMemLimit and carryMaxRows,\nOR use PFUN to handle incomplete keys.\n',file=stderr())
+          ## Throw out the carryover data because getting too big
+          aCarry=data.frame()
+        }
+        a = scan(file,what=cols,quiet=!debug,sep=sep,strip.white=TRUE,nlines=chunkSize,flush=TRUE)
+        ## Memory Report
+        if (debug) {
+          cat('In hsTableReader, we have just scanned ',object.size(a)/(2^20),'MB. Current carry size is ',object.size(aCarry)/(2^20),'\n',file=stderr())
+        }
+        ## Done processing, because scan returned nothing
         if ( length(a[[keyCol]]) == 0 ) break
         fileEmpty = FALSE
-        ## Prepend last carry to new data and stick last user into carry
-        a = rbind(aCarry,data.frame(a,stringsAsFactors=FALSE))
-        r = rle(a[,keyCol])
-        numDistinctKeys = length(r$values)
-        if (numDistinctKeys == 1) {
-          aCarry = a
-          next
-        }
-        firstRowOfLastKey = nrow(a) - r$lengths[numDistinctKeys] + 1
-        if (firstRowOfLastKey <=1 || firstRowOfLastKey > nrow(a)) stop("Problem with firstRowOfLastKey")
+        ## Prepend last carry to new data and convert scanned stuff to data.frame
+        a = rbind(aCarry,data.frame(a,stringsAsFactors=stringsAsFactors))
+        ##  Stick last user into aCarry
+        lastKey = a[nrow(a),keyCol]
+        firstRowOfLastKey = match(lastKey,a[,keyCol])
         aCarry = a[ firstRowOfLastKey:nrow(a) , ]
         if (!singleKey) {
           ## Process all complete keys at once
           FUN(a[1:(firstRowOfLastKey-1) , ])
           next
         }
-        ## Process all complete keys, one at a time
-        startPos = 1
-        for (keyNum in 1:(numDistinctKeys-1)) {
-          endPos = startPos+r$lengths[keyNum]-1
-          FUN(a[startPos:endPos,])
-          startPos = endPos+1
+        if (firstRowOfLastKey >= 2) {
+          ## Process all complete keys, one at a time
+          by(a[1:(firstRowOfLastKey-1),], a[1:(firstRowOfLastKey-1),keyCol], FUN)        
         }
-        if (startPos != firstRowOfLastKey) stop("startPos != firstRowOfLastKey")
       }
       if (!ignoreKey && !fileEmpty && nrow(aCarry)==0) stop ("empty aCarry at end -- this should never happen!!!")
       if (nrow(aCarry)>0) {
@@ -57,9 +60,9 @@
     ## !is.null(PFUN)   here we handle partial keys in a streaming faction, rather than waiting for full key
     prevKey = NA
     repeat {
-      a = scan(file,what=cols,quiet=TRUE,sep=sep,strip.white=TRUE,nlines=chunkSize,flush=TRUE)
+      a = scan(file,what=cols,quiet=!debug,sep=sep,strip.white=TRUE,nlines=chunkSize,flush=TRUE)
       if ( length(a[[keyCol]]) == 0 ) break
-      a = data.frame(a,stringsAsFactors=FALSE)
+      a = data.frame(a,stringsAsFactors=stringsAsFactors)
       r = rle(a[,keyCol])
       numDistinctKeys = length(r$values)
       startKey = 1

Modified: pkg/inst/wordCntDemo/hsWordCnt.R
===================================================================
--- pkg/inst/wordCntDemo/hsWordCnt.R	2009-04-03 22:39:39 UTC (rev 9)
+++ pkg/inst/wordCntDemo/hsWordCnt.R	2010-04-12 23:27:03 UTC (rev 10)
@@ -4,7 +4,7 @@
 library(HadoopStreaming)
 
 ## Additional command line arguments for this script (rest are default in hsCmdLineArgs)
-spec = c('chunkSize','c',1,"numeric","Number of lines to read at once, a la scan.",-1)
+spec = c('printDone','D',0,"logical","A flag to write DONE at the end.",FALSE)
 
 opts = hsCmdLineArgs(spec, openConnections=T)
 
@@ -32,7 +32,7 @@
     hsWriteTable(df[,mapperOutCols],file=opts$outcon,sep=opts$outsep)
   }
 
-  hsLineReader(opts$incon,chunkSize=opts$chunkSize,FUN=mapper)
+  hsLineReader(opts$incon,chunkSize=opts$chunksize,FUN=mapper)
 
 } else if (opts$reducer) {
 
@@ -40,7 +40,10 @@
     cat(d[1,'word'],sum(d$cnt),'\n',sep=opts$outsep)
   }
   cols=list(word='',cnt=0)  # define the column names and types (''-->string 0-->numeric)
-  hsTableReader(opts$incon,cols,chunkSize=opts$chunkSize,skip=opts$skip,sep=opts$insep,keyCol='word',singleKey=T, ignoreKey= F, FUN=reducer)
+  hsTableReader(opts$incon,cols,chunkSize=opts$chunksize,skip=opts$skip,sep=opts$insep,keyCol='word',singleKey=T, ignoreKey= F, FUN=reducer)
+  if (opts$printDone) {
+    cat("DONE\n");
+  }
 }
 
 if (!is.na(opts$infile)) {

Modified: pkg/inst/wordCntDemo/runHadoop.sh
===================================================================
--- pkg/inst/wordCntDemo/runHadoop.sh	2009-04-03 22:39:39 UTC (rev 9)
+++ pkg/inst/wordCntDemo/runHadoop.sh	2010-04-12 23:27:03 UTC (rev 10)
@@ -1,24 +1,25 @@
 #!/bin/env bash
 HADOOP="$HADOOP_HOME/bin/hadoop"   # Hadoop command
 HADOOPSTREAMING="$HADOOP jar $HADOOP_HOME/contrib/streaming/hadoop-0.19.1-streaming.jar" # change version number as appropriate
-RLIBPATH="~/RLibrary"   # can specify additional R Library paths here
+RLIBPATH=$HOME/RLibrary  # can specify additional R Library paths here
 
 INPUTFILE="anna.txt"
-INPUTDIR="input"
+HFSINPUTDIR="input"
 OUTDIR="annaWordCnt"
 RFILE="hsWordCnt.R"
 LOCALOUT="annaWordCnts.out"
 # Put the file into the Hadoop file system
-$HADOOP fs -put $INPUTFILE $INPUTDIR
+$HADOOP fs -put $INPUTFILE $HFSINPUTDIR
 
 # Remove the directory if already exists (otherwise, won't run)
 $HADOOP fs -rmr $OUTDIR
 
 MAPARGS="--mapper"  
 REDARGS="--reducer"
+JOBARGS="-cmdenv R_LIBS=$RLIBPATH" # numReduceTasks 0
+echo $HADOOPSTREAMING -cmdenv R_LIBS=$RLIBPATH  -input $HFSINPUTDIR/$INPUTFILE -output $OUTDIR -mapper "$RFILE $MAPARGS" -reducer "$RFILE $REDARGS" -file $RFILE 
+$HADOOPSTREAMING $JOBARGS   -input $HFSINPUTDIR/$INPUTFILE -output $OUTDIR -mapper "$RFILE $MAPARGS" -reducer "$RFILE $REDARGS" -file $RFILE 
 
-$HADOOPSTREAMING -input $INPUTDIR/$INPUTFILE -output $OUTDIR -mapper "$RFILE $MAPARGS" -reducer "$RFILE $REDARGS" -file $RFILE -cmdenv R_LIBS=$RLIBPATH 
-
 # Extract output
 ./$RFILE --reducecols > $LOCALOUT
 $HADOOP fs -cat $OUTDIR/part* >> $LOCALOUT

Modified: pkg/man/HadoopStreaming-package.Rd
===================================================================
--- pkg/man/HadoopStreaming-package.Rd	2009-04-03 22:39:39 UTC (rev 9)
+++ pkg/man/HadoopStreaming-package.Rd	2010-04-12 23:27:03 UTC (rev 10)
@@ -46,7 +46,7 @@
 doing things such as specifying an input file, the number of lines of
 input to read, the input and output column separators, etc.
 The \code{hsCmdLineArgs} function also facilitates packaging both the mapper
-and reducer scripts into a single R script by accept arguments
+and reducer scripts into a single R script by accepting arguments
 --mapper and --reducer to specify whether the call to the script
 should execute the mapper branch or the reducer.
 

Modified: pkg/man/hsCmdLineArgs.Rd
===================================================================
--- pkg/man/hsCmdLineArgs.Rd	2009-04-03 22:39:39 UTC (rev 9)
+++ pkg/man/hsCmdLineArgs.Rd	2010-04-12 23:27:03 UTC (rev 10)
@@ -73,7 +73,7 @@
 \author{David S. Rosenberg \email{drosen at sensenetworks.com} }
 \seealso{This package relies heavily on package \pkg{getopt}}
 \examples{
-spec = c('chunkSize','c',1,"numeric","Number of lines to read at once, a la scan.",-1)
+spec = c('myChunkSize','C',1,"numeric","Number of lines to read at once, a la scan.",-1)
 ## Displays the help string
 hsCmdLineArgs(spec, args=c('-h'))
 ## Call with the mapper flag, and request that connections be opened

Modified: pkg/man/hsTableReader.Rd
===================================================================
--- pkg/man/hsTableReader.Rd	2009-04-03 22:39:39 UTC (rev 9)
+++ pkg/man/hsTableReader.Rd	2010-04-12 23:27:03 UTC (rev 10)
@@ -9,9 +9,10 @@
 a handler for processing.  This continues until the end of file.
 }
 \usage{
-hsTableReader(file = "", cols = "character", chunkSize = -1,
-FUN = print, ignoreKey = TRUE, singleKey = TRUE,
- skip = 0, sep = "\t", keyCol = "key",PFUN=NULL)
+hsTableReader(file="",cols='character',chunkSize=-1,FUN=print,
+ignoreKey=TRUE,singleKey=TRUE, skip=0, sep='\t',
+keyCol='key', PFUN=NULL,carryMemLimit=512e6,carryMaxRows=Inf,
+stringsAsFactors=FALSE,debug=FALSE)
 }
 %- maybe also 'usage' for other objects documented here.
 \arguments{
@@ -29,6 +30,10 @@
   \item{sep}{Any separator character accepted by scan}
   \item{keyCol}{The column name of the column with the keys.}
   \item{PFUN}{Same as FUN, except handles incomplete keys. See below.}
+  \item{carryMemLimit}{Max memory used for values of a single key}
+  \item{carryMaxRows}{Max number of values allowed for a key.}
+  \item{stringsAsFactors}{Whether strings converted to factors.}
+  \item{debug}{Whether to print debug messages.}
 }
 \details{
   With ignoreKey=TRUE, hsTableReader reads from file, chunkSize lines at

Modified: pkg/man/hsWriteTable.Rd
===================================================================
--- pkg/man/hsWriteTable.Rd	2009-04-03 22:39:39 UTC (rev 9)
+++ pkg/man/hsWriteTable.Rd	2010-04-12 23:27:03 UTC (rev 10)
@@ -11,7 +11,7 @@
 \arguments{
   \item{d}{A data frame}
   \item{file}{A connection, as taken by write.table()}
-  \item{sep}{The column separator, defaults to a tab character.}
+  \item{sep}{The column separator, defaults to a tab character}
 }
 \value{
   No return value.



More information about the Hadoopstreaming-commits mailing list