[Hadoopstreaming-commits] r10 - in pkg: . R inst/wordCntDemo man
noreply at r-forge.r-project.org
Tue Apr 13 01:27:04 CEST 2010
Author: drosen
Date: 2010-04-13 01:27:03 +0200 (Tue, 13 Apr 2010)
New Revision: 10
Modified:
pkg/DESCRIPTION
pkg/R/hsCmdLineArgs.R
pkg/R/hsLineReader.R
pkg/R/hsTableReader.R
pkg/inst/wordCntDemo/hsWordCnt.R
pkg/inst/wordCntDemo/runHadoop.sh
pkg/man/HadoopStreaming-package.Rd
pkg/man/hsCmdLineArgs.Rd
pkg/man/hsTableReader.Rd
pkg/man/hsWriteTable.Rd
Log:
Bunch of changes: hsTableReader gains carryMemLimit, carryMaxRows, stringsAsFactors, and debug arguments plus a simplified carry loop; hsCmdLineArgs gains memlim and debug options and the chunksize short flag becomes -c; the word-count demo gains a --printDone flag; runHadoop.sh now passes R_LIBS via -cmdenv; documentation updated to match.
Modified: pkg/DESCRIPTION
===================================================================
--- pkg/DESCRIPTION 2009-04-03 22:39:39 UTC (rev 9)
+++ pkg/DESCRIPTION 2010-04-12 23:27:03 UTC (rev 10)
@@ -1,8 +1,8 @@
Package: HadoopStreaming
Type: Package
Title: Utilities for using R scripts in Hadoop streaming
-Version: 0.15
-Date: 2009-04-03
+Version: 0.2
+Date: 2009-09-28
Author: David S. Rosenberg <drosen at sensenetworks.com>
Maintainer: David S. Rosenberg <drosen at sensenetworks.com>
Depends: getopt
Modified: pkg/R/hsCmdLineArgs.R
===================================================================
--- pkg/R/hsCmdLineArgs.R 2009-04-03 22:39:39 UTC (rev 9)
+++ pkg/R/hsCmdLineArgs.R 2010-04-12 23:27:03 UTC (rev 10)
@@ -7,13 +7,16 @@
'infile' , 'i', 1, "character","Specifies an input file, otherwise use stdin.",NA,
'outfile', 'o', 1, "character","Specifies an output file, otherwise use stdout.",NA,
'skip', 's',1,"numeric","Number of lines of input to skip at the beginning.",0,
- 'chunksize','C',1,"numeric","Number of lines to read at once, a la scan.",-1,
+ 'chunksize', 'c',1,"numeric","Number of lines to read at once, a la scan.",-1,
+ 'memlim', 'z',1,"numeric","Max number of bytes allowed for use in carry.",-1,
'numlines', 'n',1,"numeric","Max number of lines to read from input, per mapper or reducer job.",0,
'sepr', 'e',1,"character","Separator character, as used by scan.",'\t',
'insep', 'f',1,"character","Separator character for input, defaults to sepr.",NA,
'outsep', 'g',1,"character","Separator character for output, defaults to sepr.",NA,
+ 'debug', 'd',0,"logical","Turn on debugging output.",FALSE,
'help', 'h',0,"logical","Get a help message.",FALSE
)
+
specmat = matrix(c(spec,basespec),ncol=6,byrow=TRUE)
opt = getopt(specmat[,1:5],opt=args)
@@ -36,7 +39,7 @@
## self = commandArgs()[1];
cat(getopt(specmat,usage=TRUE))
opt$set = FALSE
- return(opt)
+## return(opt)
}
if (openConnections) {
@@ -58,6 +61,7 @@
opt$outcon = file(description=opt$outfile,open="w")
}
}
+
opt$set = TRUE
return(opt)
}
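
For reference, the new flags can be exercised directly; a minimal sketch, assuming the remaining defaults from the spec above (the sample values are made up):

  library(HadoopStreaming)
  opts = hsCmdLineArgs(args=c('-c','1000','-z','1e8','-d'), openConnections=FALSE)
  opts$chunksize  # 1000
  opts$memlim     # 1e8
  opts$debug      # TRUE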
Modified: pkg/R/hsLineReader.R
===================================================================
--- pkg/R/hsLineReader.R 2009-04-03 22:39:39 UTC (rev 9)
+++ pkg/R/hsLineReader.R 2010-04-12 23:27:03 UTC (rev 10)
@@ -1,5 +1,5 @@
`hsLineReader` <-
-function(file="",chunkSize=-1, skip=0, FUN=function(x) cat(x,sep='\n')) {
+function(file="",chunkSize=-1L, skip=0L, FUN=function(x) cat(x,sep='\n')) {
if (skip>0) {
junk = readLines(file, n=skip)
}
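
A quick way to see the chunking behavior locally; a toy example (any connection that readLines accepts will do):

  library(HadoopStreaming)
  con = textConnection(c("a","b","c","d","e"))
  ## FUN receives a character vector of up to chunkSize lines per call
  hsLineReader(con, chunkSize=2L, FUN=function(x) cat("got", length(x), "lines\n"))
  close(con)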
Modified: pkg/R/hsTableReader.R
===================================================================
--- pkg/R/hsTableReader.R 2009-04-03 22:39:39 UTC (rev 9)
+++ pkg/R/hsTableReader.R 2010-04-12 23:27:03 UTC (rev 10)
@@ -1,15 +1,15 @@
`hsTableReader` <-
- function(file="",cols='character',chunkSize=-1,FUN=print, ignoreKey=TRUE,singleKey=TRUE, skip=0, sep='\t',keyCol='key',PFUN=NULL) {
+ function(file="",cols='character',chunkSize=-1,FUN=print, ignoreKey=TRUE,singleKey=TRUE, skip=0, sep='\t', keyCol='key', PFUN=NULL,carryMemLimit=512e6,carryMaxRows=Inf,stringsAsFactors=FALSE,debug=FALSE) {
## flush=TRUE (allows comments, but precludes more than one record per line, which is good)
if (skip > 0) {
- junk = scan(file,what='character',quiet=TRUE,sep=sep,nlines=skip)
+ junk = scan(file,what='character',quiet=!debug,sep=sep,nlines=skip)
}
if (ignoreKey) {
repeat {
- a = scan(file,what=cols,quiet=TRUE,sep=sep,strip.white=TRUE,nlines=chunkSize,flush=TRUE)
+ a = scan(file,what=cols,quiet=!debug,sep=sep,strip.white=TRUE,nlines=chunkSize,flush=TRUE)
if ( length(a[[1]]) ==0 ) break
- FUN(data.frame(a,stringsAsFactors=FALSE))
+ FUN(data.frame(a,stringsAsFactors=stringsAsFactors))
}
return(invisible())
}
@@ -19,33 +19,36 @@
aCarry = data.frame()
fileEmpty = TRUE
repeat {
- a = scan(file,what=cols,quiet=TRUE,sep=sep,strip.white=TRUE,nlines=chunkSize,flush=TRUE)
+ if (object.size(aCarry)>carryMemLimit || nrow(aCarry) > carryMaxRows) {
+ cat('In hsTableReader, aCarry has exceeded defined limits on memory and/or rows\n',file=stderr())
+ cat('Key=',aCarry[1,keyCol],' MemoryUsed=',object.size(aCarry)/(2^20),'MB; NumRows=',nrow(aCarry),'\n',sep='',file=stderr())
+ cat('Consider using higher values for carryMemLimit and carryMaxRows,\nOR use PFUN to handle incomplete keys.\n',file=stderr())
+ ## Throw out the carryover data because getting too big
+ aCarry=data.frame()
+ }
+ a = scan(file,what=cols,quiet=!debug,sep=sep,strip.white=TRUE,nlines=chunkSize,flush=TRUE)
+ ## Memory Report
+ if (debug) {
+ cat('In hsTableReader, we have just scanned ',object.size(a)/(2^20),'MB. Current carry size is ',object.size(aCarry)/(2^20),'\n',file=stderr())
+ }
+ ## Done processing, because scan returned nothing
if ( length(a[[keyCol]]) == 0 ) break
fileEmpty = FALSE
- ## Prepend last carry to new data and stick last user into carry
- a = rbind(aCarry,data.frame(a,stringsAsFactors=FALSE))
- r = rle(a[,keyCol])
- numDistinctKeys = length(r$values)
- if (numDistinctKeys == 1) {
- aCarry = a
- next
- }
- firstRowOfLastKey = nrow(a) - r$lengths[numDistinctKeys] + 1
- if (firstRowOfLastKey <=1 || firstRowOfLastKey > nrow(a)) stop("Problem with firstRowOfLastKey")
+ ## Prepend last carry to new data and convert scanned stuff to data.frame
+ a = rbind(aCarry,data.frame(a,stringsAsFactors=stringsAsFactors))
+ ## Stick last user into aCarry
+ lastKey = a[nrow(a),keyCol]
+ firstRowOfLastKey = match(lastKey,a[,keyCol])
aCarry = a[ firstRowOfLastKey:nrow(a) , ]
if (!singleKey) {
## Process all complete keys at once
FUN(a[1:(firstRowOfLastKey-1) , ])
next
}
- ## Process all complete keys, one at a time
- startPos = 1
- for (keyNum in 1:(numDistinctKeys-1)) {
- endPos = startPos+r$lengths[keyNum]-1
- FUN(a[startPos:endPos,])
- startPos = endPos+1
+ if (firstRowOfLastKey >= 2) {
+ ## Process all complete keys, one at a time
+ by(a[1:(firstRowOfLastKey-1),], a[1:(firstRowOfLastKey-1),keyCol], FUN)
}
- if (startPos != firstRowOfLastKey) stop("startPos != firstRowOfLastKey")
}
if (!ignoreKey && !fileEmpty && nrow(aCarry)==0) stop ("empty aCarry at end -- this should never happen!!!")
if (nrow(aCarry)>0) {
@@ -57,9 +60,9 @@
## !is.null(PFUN) here we handle partial keys in a streaming fashion, rather than waiting for full key
prevKey = NA
repeat {
- a = scan(file,what=cols,quiet=TRUE,sep=sep,strip.white=TRUE,nlines=chunkSize,flush=TRUE)
+ a = scan(file,what=cols,quiet=!debug,sep=sep,strip.white=TRUE,nlines=chunkSize,flush=TRUE)
if ( length(a[[keyCol]]) == 0 ) break
- a = data.frame(a,stringsAsFactors=FALSE)
+ a = data.frame(a,stringsAsFactors=stringsAsFactors)
r = rle(a[,keyCol])
numDistinctKeys = length(r$values)
startKey = 1
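
The new carry controls can be tried out locally; a small sketch with invented data (keys must arrive sorted, as they do from the Hadoop shuffle):

  library(HadoopStreaming)
  con = textConnection("A\t1\nA\t2\nB\t3\nB\t4\nB\t5\n")
  cols = list(key='', val=0)
  ## FUN is called once per complete key; rows for a key still pending
  ## are carried across chunks, subject to the two new limits
  hsTableReader(con, cols, chunkSize=2, ignoreKey=FALSE, singleKey=TRUE,
                carryMemLimit=512e6, carryMaxRows=Inf, stringsAsFactors=FALSE,
                FUN=function(d) cat(d[1,'key'], sum(d$val), '\n'))
  close(con)
  ## prints "A 3" and then "B 12"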
Modified: pkg/inst/wordCntDemo/hsWordCnt.R
===================================================================
--- pkg/inst/wordCntDemo/hsWordCnt.R 2009-04-03 22:39:39 UTC (rev 9)
+++ pkg/inst/wordCntDemo/hsWordCnt.R 2010-04-12 23:27:03 UTC (rev 10)
@@ -4,7 +4,7 @@
library(HadoopStreaming)
## Additional command line arguments for this script (rest are default in hsCmdLineArgs)
-spec = c('chunkSize','c',1,"numeric","Number of lines to read at once, a la scan.",-1)
+spec = c('printDone','D',0,"logical","A flag to write DONE at the end.",FALSE)
opts = hsCmdLineArgs(spec, openConnections=T)
@@ -32,7 +32,7 @@
hsWriteTable(df[,mapperOutCols],file=opts$outcon,sep=opts$outsep)
}
- hsLineReader(opts$incon,chunkSize=opts$chunkSize,FUN=mapper)
+ hsLineReader(opts$incon,chunkSize=opts$chunksize,FUN=mapper)
} else if (opts$reducer) {
@@ -40,7 +40,10 @@
cat(d[1,'word'],sum(d$cnt),'\n',sep=opts$outsep)
}
cols=list(word='',cnt=0) # define the column names and types (''-->string 0-->numeric)
- hsTableReader(opts$incon,cols,chunkSize=opts$chunkSize,skip=opts$skip,sep=opts$insep,keyCol='word',singleKey=T, ignoreKey= F, FUN=reducer)
+ hsTableReader(opts$incon,cols,chunkSize=opts$chunksize,skip=opts$skip,sep=opts$insep,keyCol='word',singleKey=T, ignoreKey= F, FUN=reducer)
+ if (opts$printDone) {
+ cat("DONE\n");
+ }
}
if (!is.na(opts$infile)) {
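
The reducer branch can be sanity-checked without Hadoop; a sketch mirroring the code above (input invented, tab-separated word/count pairs):

  library(HadoopStreaming)
  con = textConnection("apple\t1\napple\t1\nbanana\t1\n")
  reducer = function(d) cat(d[1,'word'], sum(d$cnt), '\n', sep='\t')
  cols = list(word='', cnt=0)  # ''-->string, 0-->numeric, as in the demo
  hsTableReader(con, cols, chunkSize=-1, keyCol='word',
                singleKey=TRUE, ignoreKey=FALSE, FUN=reducer)
  close(con)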
Modified: pkg/inst/wordCntDemo/runHadoop.sh
===================================================================
--- pkg/inst/wordCntDemo/runHadoop.sh 2009-04-03 22:39:39 UTC (rev 9)
+++ pkg/inst/wordCntDemo/runHadoop.sh 2010-04-12 23:27:03 UTC (rev 10)
@@ -1,24 +1,25 @@
#!/usr/bin/env bash
HADOOP="$HADOOP_HOME/bin/hadoop" # Hadoop command
HADOOPSTREAMING="$HADOOP jar $HADOOP_HOME/contrib/streaming/hadoop-0.19.1-streaming.jar" # change version number as appropriate
-RLIBPATH="~/RLibrary" # can specify additional R Library paths here
+RLIBPATH=$HOME/RLibrary # can specify additional R Library paths here
INPUTFILE="anna.txt"
-INPUTDIR="input"
+HFSINPUTDIR="input"
OUTDIR="annaWordCnt"
RFILE="hsWordCnt.R"
LOCALOUT="annaWordCnts.out"
# Put the file into the Hadoop file system
-$HADOOP fs -put $INPUTFILE $INPUTDIR
+$HADOOP fs -put $INPUTFILE $HFSINPUTDIR
# Remove the directory if already exists (otherwise, won't run)
$HADOOP fs -rmr $OUTDIR
MAPARGS="--mapper"
REDARGS="--reducer"
+JOBARGS="-cmdenv R_LIBS=$RLIBPATH" # numReduceTasks 0
+echo $HADOOPSTREAMING -cmdenv R_LIBS=$RLIBPATH -input $HFSINPUTDIR/$INPUTFILE -output $OUTDIR -mapper "$RFILE $MAPARGS" -reducer "$RFILE $REDARGS" -file $RFILE
+$HADOOPSTREAMING $JOBARGS -input $HFSINPUTDIR/$INPUTFILE -output $OUTDIR -mapper "$RFILE $MAPARGS" -reducer "$RFILE $REDARGS" -file $RFILE
-$HADOOPSTREAMING -input $INPUTDIR/$INPUTFILE -output $OUTDIR -mapper "$RFILE $MAPARGS" -reducer "$RFILE $REDARGS" -file $RFILE -cmdenv R_LIBS=$RLIBPATH
-
# Extract output
./$RFILE --reducecols > $LOCALOUT
$HADOOP fs -cat $OUTDIR/part* >> $LOCALOUT
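
A note on the -cmdenv change: R_LIBS is read by R at startup, so the task-side script needs nothing special to find the private library; roughly (the path is the RLIBPATH set above):

  ## inside hsWordCnt.R on the task nodes
  .libPaths()              # should now include ~/RLibrary via R_LIBS
  library(HadoopStreaming) # resolves without an explicit lib.loc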
Modified: pkg/man/HadoopStreaming-package.Rd
===================================================================
--- pkg/man/HadoopStreaming-package.Rd 2009-04-03 22:39:39 UTC (rev 9)
+++ pkg/man/HadoopStreaming-package.Rd 2010-04-12 23:27:03 UTC (rev 10)
@@ -46,7 +46,7 @@
doing things such as specifying an input file, the number of lines of
input to read, the input and output column separators, etc.
The \code{hsCmdLineArgs} function also facilitates packaging both the mapper
-and reducer scripts into a single R script by accept arguments
+and reducer scripts into a single R script by accepting arguments
--mapper and --reducer to specify whether the call to the script
should execute the mapper branch or the reducer.
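
In skeleton form, the single-script pattern described here looks like the following (mirrors the wordCntDemo; bodies omitted):

  library(HadoopStreaming)
  opts = hsCmdLineArgs(openConnections=TRUE)
  if (opts$mapper) {
    ## read raw lines from opts$incon, emit key/value pairs to opts$outcon
  } else if (opts$reducer) {
    ## read sorted key/value pairs from opts$incon, aggregate per key
  }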
Modified: pkg/man/hsCmdLineArgs.Rd
===================================================================
--- pkg/man/hsCmdLineArgs.Rd 2009-04-03 22:39:39 UTC (rev 9)
+++ pkg/man/hsCmdLineArgs.Rd 2010-04-12 23:27:03 UTC (rev 10)
@@ -73,7 +73,7 @@
\author{David S. Rosenberg \email{drosen at sensenetworks.com} }
\seealso{This package relies heavily on package \pkg{getopt}}
\examples{
-spec = c('chunkSize','c',1,"numeric","Number of lines to read at once, a la scan.",-1)
+spec = c('myChunkSize','C',1,"numeric","Number of lines to read at once, a la scan.",-1)
## Displays the help string
hsCmdLineArgs(spec, args=c('-h'))
## Call with the mapper flag, and request that connections be opened
Modified: pkg/man/hsTableReader.Rd
===================================================================
--- pkg/man/hsTableReader.Rd 2009-04-03 22:39:39 UTC (rev 9)
+++ pkg/man/hsTableReader.Rd 2010-04-12 23:27:03 UTC (rev 10)
@@ -9,9 +9,10 @@
a handler for processing. This continues until the end of file.
}
\usage{
-hsTableReader(file = "", cols = "character", chunkSize = -1,
-FUN = print, ignoreKey = TRUE, singleKey = TRUE,
- skip = 0, sep = "\t", keyCol = "key",PFUN=NULL)
+hsTableReader(file="",cols='character',chunkSize=-1,FUN=print,
+ignoreKey=TRUE,singleKey=TRUE, skip=0, sep='\t',
+keyCol='key', PFUN=NULL,carryMemLimit=512e6,carryMaxRows=Inf,
+stringsAsFactors=FALSE,debug=FALSE)
}
%- maybe also 'usage' for other objects documented here.
\arguments{
@@ -29,6 +30,10 @@
\item{sep}{Any separator character accepted by scan}
\item{keyCol}{The column name of the column with the keys.}
\item{PFUN}{Same as FUN, except handles incomplete keys. See below.}
+ \item{carryMemLimit}{Max memory used for the values of a single key.}
+ \item{carryMaxRows}{Max number of values allowed for a key.}
+ \item{stringsAsFactors}{Whether strings should be converted to factors.}
+ \item{debug}{Whether to print debug messages.}
}
\details{
With ignoreKey=TRUE, hsTableReader reads from file, chunkSize lines at
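
To contrast the two modes the details section describes; a toy example (data invented):

  library(HadoopStreaming)
  con = textConnection("x\t1\nx\t2\ny\t3\n")
  cols = list(key='', val=0)
  ## ignoreKey=TRUE: FUN sees raw chunkSize-line chunks; keys are not grouped
  hsTableReader(con, cols, chunkSize=2, ignoreKey=TRUE, FUN=print)
  close(con)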
Modified: pkg/man/hsWriteTable.Rd
===================================================================
--- pkg/man/hsWriteTable.Rd 2009-04-03 22:39:39 UTC (rev 9)
+++ pkg/man/hsWriteTable.Rd 2010-04-12 23:27:03 UTC (rev 10)
@@ -11,7 +11,7 @@
\arguments{
\item{d}{A data frame}
\item{file}{A connection, as taken by write.table()}
- \item{sep}{The column separator, defaults to a tab character.}
+ \item{sep}{The column separator, defaults to a tab character}
}
\value{
No return value.