[Blotter-commits] r922 - in pkg/FinancialInstrument: . inst/parser
noreply at r-forge.r-project.org
noreply at r-forge.r-project.org
Sun Feb 12 01:15:37 CET 2012
Author: gsee
Date: 2012-02-12 01:15:36 +0100 (Sun, 12 Feb 2012)
New Revision: 922
Modified:
pkg/FinancialInstrument/DESCRIPTION
pkg/FinancialInstrument/inst/parser/TRTH_BackFill.R
Log:
- removed some obvious roadblocks to running TRTH_BackFill functions in
multiple instances of R simultaneously, but it is still not recommended,
especially if there are overlapping files in the jobs.
- removed doCleanUp option because unzipping and splitting now take place
in a temp dir, so if a process is interrupted, no partial files will be left in
archive_dir. Although it is no longer called, CleanUpArchive still exists.
- after making headers, if all header files already exist in csv_dir and
overwrite==FALSE, there is no need to split
Modified: pkg/FinancialInstrument/DESCRIPTION
===================================================================
--- pkg/FinancialInstrument/DESCRIPTION 2012-02-07 20:09:32 UTC (rev 921)
+++ pkg/FinancialInstrument/DESCRIPTION 2012-02-12 00:15:36 UTC (rev 922)
@@ -11,7 +11,7 @@
meta-data and relationships. Provides support for
multi-asset class and multi-currency portfolios. Still
in heavy development.
-Version: 0.10.9
+Version: 0.10.10
URL: https://r-forge.r-project.org/projects/blotter/
Date: $Date$
Depends:
Modified: pkg/FinancialInstrument/inst/parser/TRTH_BackFill.R
===================================================================
--- pkg/FinancialInstrument/inst/parser/TRTH_BackFill.R 2012-02-07 20:09:32 UTC (rev 921)
+++ pkg/FinancialInstrument/inst/parser/TRTH_BackFill.R 2012-02-12 00:15:36 UTC (rev 922)
@@ -37,8 +37,6 @@
#instrument_file = [searches path.output for filename containing 'instruments'] #name of instrument envir RData file
#job.name = "" # the Reuters TRTH job name (by default all files not on disk)
#no.cores = 4 # number of cores for foreach
-#doCleanUp = TRUE # should files that do not end with .csv.gz be
- # removed from archive_dir (see CleanUpArchive function)
#overwrite = FALSE # will not redownload, or overwrite files unless this is TRUE
#username = stop("") #TRTH user name, usually your email address
#password = stop("") #TRTH password
@@ -64,8 +62,7 @@
# overwrite = FALSE,
# tick.image = TRUE,
# sec.image = TRUE,
-# no.cores = 20,
-# doCleanUp = TRUE
+# no.cores = 20
#)
#
#download_reut(.TRTH) # Download big zipped CSV
@@ -130,7 +127,6 @@
#if (!is.null(dargs$path.output))
.TRTH$path.output <- path.output <- addslash(path.output)
-
.TRTH$archive_dir <- pickDirArg("archive_dir")
.TRTH$csv_dir <- pickDirArg("csv_dir")
.TRTH$tick_dir <- pickDirArg("tick_dir")
@@ -143,6 +139,16 @@
makeDir(.TRTH$tick_dir)
makeDir(.TRTH$sec_dir)
+ # make a temp dir to use for splitting so that (fingers crossed)
+ # more than one instance can be run at a time in separate R sessions.
+ tmp <- list()
+ tmp$path.output <- addslash(tempdir())
+ dir.create(tmp$archive_dir <- paste(tmp$path.output, "archive/", sep=""), showWarnings=FALSE, mode='0775')
+ dir.create(tmp$csv_dir <- paste(tmp$path.output, "csv/", sep=""), showWarnings=FALSE, mode='0775')
+ dir.create(tmp$tick_dir <- paste(tmp$path.output, "archive/", sep=""), showWarnings=FALSE, mode='0775')
+ dir.create(tmp$sec_dir <- paste(tmp$path.output, "archive/", sep=""), showWarnings=FALSE, mode='0775')
+ .TRTH$tmp <- tmp
+
pickArg <- function(x, default=NULL) {
# if argument "x" was passed through dots, use that
# otherwise, if it was in config_file, use that
@@ -168,7 +174,6 @@
.TRTH$tick.image <- pickArg('tick.image', TRUE)
.TRTH$sec.image <- pickArg('sec.image', TRUE)
.TRTH$no.cores <- pickArg('no.cores', 4)
- .TRTH$doCleanUp <- pickArg('doCleanUp', TRUE)
.TRTH$use.instrument <- pickArg('use.instrument', FALSE)
# if `use.instrument` is TRUE, then the code will remove negative prices from
@@ -184,8 +189,6 @@
stop("Please specify a valid filepath for instrument_file or move a file with 'instruments' in its name to 'path.output'")
}
- if (isTRUE(.TRTH$doCleanUp)) CleanUpArchive(.TRTH$archive_dir)
-
registerDoMC(.TRTH$no.cores)
# registerDoSEQ()
@@ -199,7 +202,7 @@
.TRTH <- try(get('.TRTH', pos=.GlobalEnv))
if (inherits(.TRTH, 'try-error')) stop("Run configureTRTH function first")
}
-# attach(.TRTH)
+
Sys.umask("0002")
Archive.output <- list.files(.TRTH$archive_dir)
@@ -305,9 +308,7 @@
splitCSV <- function(.TRTH) {
#FIXME: respect overwrite argument
if (missing(.TRTH) && !exists(".TRTH")) stop("Run configureTRTH function first")
- on.exit(CleanUpArchive(.TRTH$archive_dir))
- if (isTRUE(.TRTH$doCleanUp)) CleanUpArchive(.TRTH$archive_dir)
-
+
if (substr(.TRTH$path.output, nchar(.TRTH$path.output), nchar(.TRTH$path.output)) != "/") {
.TRTH$path.output <- paste(.TRTH$path.output, "/", sep="")
}
@@ -327,7 +328,7 @@
if (isTRUE(.TRTH$use.instrument)) loadInstruments(.TRTH$instrument_file)
registerDoMC(.TRTH$no.cores)
- ## unzip and split (new unzip method does not require rezip; keeps original gz file)
+ ## unzip to tempdir and split (new unzip method does not require rezip; keeps original gz file)
setwd(.TRTH$archive_dir)
foreach(i = 1:length(.TRTH$files.gz)) %dopar%
@@ -337,13 +338,14 @@
#unzip the file
print(paste("unzipping ",filename.gz, sep=""))
#system(paste("gzip -d -f ",archive_dir,filename.gz,sep=""))
- system(paste("gunzip -f < ", .TRTH$archive_dir, filename.gz, " > ", .TRTH$archive_dir, filename.csv, sep=""))
+ system(paste("gunzip -f < ", .TRTH$archive_dir, filename.gz, " > ", .TRTH$tmp$archive_dir, filename.csv, sep=""))
}
- ignored.csvs <- NULL #this will hold the names of CSVs that already have a header
- for (i in 1:length(.TRTH$files.gz))
+ ignored.csvs <- NULL #this will hold the names of CSVs that already have a header
+ setwd(.TRTH$tmp$archive) #this directory contains the big CSVs that were unzipped
+ .TRTH$files.csv <- list.files(.TRTH$tmp$archive)
+ for (i in 1:length(.TRTH$files.csv))
{
- filename.gz <- .TRTH$files.gz[i]
- filename.csv <- substr(filename.gz,1,(nchar(filename.gz)-3))
+ filename.csv <- .TRTH$files.csv[i]
# Use awk to split the big CSV into daily CSVs. Each CSV will have a single
# row which we will then overwrite with the column headers. Then we'll
# use awk again to put the data into the split files
@@ -353,7 +355,7 @@
print(paste('Making headers from', filename.csv))
system(paste('awk -v f2="" -F "," '," '",'{f1 = $1"."$2".csv";if(f1 != f2) { print >> f1; close(f2); f2=f1; } }',"' ",filename.csv, sep=""))
- tmpfiles <- list.files(.TRTH$archive_dir)
+ tmpfiles <- list.files(.TRTH$tmp$archive_dir)
files.header <- tmpfiles[grep("RIC",tmpfiles)]
big.files <- tmpfiles[grep("@", tmpfiles)] #Big zipped CSVs from Reuters have e-mail address in name
@@ -376,26 +378,39 @@
# mv header.csv "RIC.Date[G].csv"
for (fl in tmp.files.csv) { # make files with header that awk will later populate
- system(paste('cp "', files.header, '" ', paste(.TRTH$archive_dir, fl, sep=""), sep=""))
+ # -n means don't overwrite files; useful if e.g. part001.csv.gz ends halfway through the day
+ # and part002.csv.gz has a different number of columns (I'm not sure if that ever happens, though)
+ system(paste('cp -n "', files.header, '" ', paste(.TRTH$tmp$archive_dir, fl, sep=""), sep=""))
#cp "#RIC.Date[G].csv" /home/garrett/TRTH/archive/GEM1-U1.01-APR-2008.csv
}
-
# after we've put a header in a file, we need to ignore that file the
# next time through the loop so that we don't overwrite data.
ignored.csvs <- c(ignored.csvs, tmp.files.csv)
- ## Split the Files
- print(paste("Splitting ",filename.csv,sep=""))
- # The following awk will put data in our CSV files which currently only have column headers;
- # Improved awk w/ file close to deal with 'too many open files', thanks to Josh Ulrich
- system(paste('awk -v f2="" -F "," '," '",'{f1 = $1"."$2".csv";print >> f1; if(f1 != f2) { close(f2); f2=f1; } }',"' ",filename.csv, sep=""))
- ## command line awkstring would look like this:
- # awk -v f2="" -F "," '{f1 = $1"."$2".csv"; print >> f1; if(f1 != f2) { close(f2); f2=f1; } }' sourcefile.csv
- ## NOTE: if you get errors on 'too many open files' from awk, you'll need to adjust ulimit/nolimit
- print(paste('Done splitting ', filename.csv, sep=""))
+
+ # If all of the files that awk just created already exist in csv_dir and overwrite==FALSE, then
+ # there is no need to split this csv because we're not going to move any to csv_dir anyway.
+ # a file in csv_dir might be "2012.02.10.AAPL.O.csv", but we have "AAPL.O.10-FEB-2012.csv"
+ tmp <- gsub("\\.csv", "", tmp.files.csv)
+ new.names <- do.call(c, lapply(strsplit(tmp, "\\."), function(x) {
+ day <- gsub("-", ".", as.Date(x[length(x)], format='%d-%b-%Y'))
+ fl <- make.names(paste(x[-length(x)], collapse="."))
+ paste(day, "/", day, ".", fl, sep="")
+ }))
+ if (!all(file.exists(paste(paste(.TRTH$csv_dir, new.names, sep=""), ".csv", sep=""))) || isTRUE(.TRTH$overwrite)) {
+ ## Split the Files
+ print(paste("Splitting ",filename.csv,sep=""))
+ # The following awk will put data in our CSV files which currently only have column headers;
+ # Improved awk w/ file close to deal with 'too many open files', thanks to Josh Ulrich
+ system(paste('awk -v f2="" -F "," '," '",'{f1 = $1"."$2".csv";print >> f1; if(f1 != f2) { close(f2); f2=f1; } }',"' ",filename.csv, sep=""))
+ ## command line awkstring would look like this:
+ # awk -v f2="" -F "," '{f1 = $1"."$2".csv"; print >> f1; if(f1 != f2) { close(f2); f2=f1; } }' sourcefile.csv
+ ## NOTE: if you get errors on 'too many open files' from awk, you'll need to adjust ulimit/nolimit
+ print(paste('Done splitting ', filename.csv, sep=""))
+ } else print('All CSVs created by awk already exist. Not re-splitting')
# remove header file
- invisible(file.remove(paste(.TRTH$archive_dir, files.header, sep="")))
+ invisible(file.remove(paste(.TRTH$tmp$archive_dir, files.header, sep="")))
# remove unzipped csv
- invisible(file.remove(paste(.TRTH$archive_dir, filename.csv, sep="")))
+ invisible(file.remove(paste(.TRTH$tmp$archive_dir, filename.csv, sep="")))
## Zip the File
# print(paste("zipping ",filename.csv,sep=""))
# system(paste("gzip -f ",archive_dir,filename.csv,sep=""))
@@ -427,9 +442,12 @@
## Move files to appropriate place
#system(paste("mv -vf ", path.output,"Archives/",name.csv, " ", path.output,date.format,"/",date.format,".",name.new,".csv", sep=""))
- system(paste("mv -f ", name.csv, " ", .TRTH$csv_dir, date.format, "/", date.format, ".", name.new, ".csv", sep=""))
-
- print(paste(date.format, name.new, "moved", sep=" "))
+ if (isTRUE(.TRTH$overwrite)) {
+ system(paste("mv -fv ", name.csv, " ", .TRTH$csv_dir, date.format, "/", date.format, ".", name.new, ".csv", sep=""))
+ } else {
+ system(paste("mv -nv ", name.csv, " ", .TRTH$csv_dir, date.format, "/", date.format, ".", name.new, ".csv", sep=""))
+ }
+ #print(paste(date.format, name.new, "moved", sep=" "))
files.xts <- rbind(files.xts,as.data.frame(cbind(name.new,date.format),stringsAsFactors=FALSE))
}
files.xts$type <- rep(NA, NROW(files.xts))
@@ -566,7 +584,7 @@
# if xts and sec data already exist for this product/Date, and overwrite == FALSE,
# there is nothing to be done -- return NULL
if (!any(c(write.tick, write.sec))) return(NULL)
-
+ #TODO: unzip to a tempdir
CSV.name <- paste(.TRTH$csv_dir, date, '/', date, '.', RIC, '.csv', sep="")
if (!file.exists(CSV.name) && file.exists(paste(CSV.name, ".gz", sep=""))) {
#only zipped file on disk. We'll have to unzip.
More information about the Blotter-commits
mailing list