[Blotter-commits] r922 - in pkg/FinancialInstrument: . inst/parser

noreply at r-forge.r-project.org noreply at r-forge.r-project.org
Sun Feb 12 01:15:37 CET 2012


Author: gsee
Date: 2012-02-12 01:15:36 +0100 (Sun, 12 Feb 2012)
New Revision: 922

Modified:
   pkg/FinancialInstrument/DESCRIPTION
   pkg/FinancialInstrument/inst/parser/TRTH_BackFill.R
Log:
 - removed some obvious roadblocks to running TRTH_BackFill functions in 
   multiple instances of R simultaneously, but it is still not recommended, 
   especially if there are overlapping files in the jobs.
 - removed doCleanUp option because unzipping and splitting now take place
   in a temp dir. So, if a process is interrupted, no partial files will be left in
   archive_dir.  Although it is no longer called, CleanUpArchive still exists.
 - after making headers, if all header files already exist in csv_dir and 
   overwrite==FALSE, there is no need to split

Modified: pkg/FinancialInstrument/DESCRIPTION
===================================================================
--- pkg/FinancialInstrument/DESCRIPTION	2012-02-07 20:09:32 UTC (rev 921)
+++ pkg/FinancialInstrument/DESCRIPTION	2012-02-12 00:15:36 UTC (rev 922)
@@ -11,7 +11,7 @@
     meta-data and relationships. Provides support for
     multi-asset class and multi-currency portfolios. Still
     in heavy development.
-Version: 0.10.9
+Version: 0.10.10
 URL: https://r-forge.r-project.org/projects/blotter/
 Date: $Date$
 Depends:

Modified: pkg/FinancialInstrument/inst/parser/TRTH_BackFill.R
===================================================================
--- pkg/FinancialInstrument/inst/parser/TRTH_BackFill.R	2012-02-07 20:09:32 UTC (rev 921)
+++ pkg/FinancialInstrument/inst/parser/TRTH_BackFill.R	2012-02-12 00:15:36 UTC (rev 922)
@@ -37,8 +37,6 @@
 #instrument_file = [searches path.output for filename containing 'instruments'] #name of instrument envir RData file
 #job.name = ""                          # the Reuters TRTH job name (by default all files not on disk)
 #no.cores = 4                           # number of cores for foreach
-#doCleanUp = TRUE                       # should files that do not end with .csv.gz be 
-                                        # removed from archive_dir (see CleanUpArchive function)
 #overwrite = FALSE                      # will not redownload, or overwrite files unless this is TRUE
 #username = stop("")                    #TRTH user name, usually your email address
 #password = stop("")                    #TRTH password
@@ -64,8 +62,7 @@
 #    overwrite = FALSE,
 #    tick.image = TRUE,
 #    sec.image = TRUE,
-#    no.cores = 20,
-#    doCleanUp = TRUE
+#    no.cores = 20
 #)
 #
 #download_reut(.TRTH)                   # Download big zipped CSV
@@ -130,7 +127,6 @@
 
     #if (!is.null(dargs$path.output)) 
     .TRTH$path.output <- path.output <- addslash(path.output)
-
     .TRTH$archive_dir <- pickDirArg("archive_dir")
     .TRTH$csv_dir <- pickDirArg("csv_dir")
     .TRTH$tick_dir <- pickDirArg("tick_dir")
@@ -143,6 +139,16 @@
     makeDir(.TRTH$tick_dir)
     makeDir(.TRTH$sec_dir)
 
+    # make a temp dir to use for splitting so that (fingers crossed)
+    # more than one instance can be run at a time in separate R sessions.
+    tmp <- list()
+    tmp$path.output <- addslash(tempdir())
+    dir.create(tmp$archive_dir <- paste(tmp$path.output, "archive/", sep=""), showWarnings=FALSE, mode='0775')
+    dir.create(tmp$csv_dir <- paste(tmp$path.output, "csv/", sep=""), showWarnings=FALSE, mode='0775')
+    dir.create(tmp$tick_dir <- paste(tmp$path.output, "archive/", sep=""), showWarnings=FALSE, mode='0775')
+    dir.create(tmp$sec_dir <- paste(tmp$path.output, "archive/", sep=""), showWarnings=FALSE, mode='0775')
+    .TRTH$tmp <- tmp
+    
     pickArg <- function(x, default=NULL) {
         # if argument "x" was passed through dots, use that
         # otherwise, if it was in config_file, use that
@@ -168,7 +174,6 @@
     .TRTH$tick.image <- pickArg('tick.image', TRUE)
     .TRTH$sec.image <- pickArg('sec.image', TRUE)
     .TRTH$no.cores <- pickArg('no.cores', 4)
-    .TRTH$doCleanUp <- pickArg('doCleanUp', TRUE)
 
     .TRTH$use.instrument <- pickArg('use.instrument', FALSE) 
     # if `use.instrument` is TRUE, then the code will remove negative prices from
@@ -184,8 +189,6 @@
             stop("Please specify a valid filepath for instrument_file or move a file with 'instruments' in its name to 'path.output'")
     }
 
-    if (isTRUE(.TRTH$doCleanUp)) CleanUpArchive(.TRTH$archive_dir)
-
     registerDoMC(.TRTH$no.cores)
     # registerDoSEQ()
 
@@ -199,7 +202,7 @@
         .TRTH <- try(get('.TRTH', pos=.GlobalEnv))
         if (inherits(.TRTH, 'try-error')) stop("Run configureTRTH function first")
     }
-#    attach(.TRTH)
+
     Sys.umask("0002")
 
     Archive.output <- list.files(.TRTH$archive_dir)
@@ -305,9 +308,7 @@
 splitCSV <- function(.TRTH) {
     #FIXME: respect overwrite argument
     if (missing(.TRTH) && !exists(".TRTH")) stop("Run configureTRTH function first")
-    on.exit(CleanUpArchive(.TRTH$archive_dir))
-    if (isTRUE(.TRTH$doCleanUp)) CleanUpArchive(.TRTH$archive_dir)
-
+    
     if (substr(.TRTH$path.output, nchar(.TRTH$path.output), nchar(.TRTH$path.output)) != "/") {
         .TRTH$path.output <- paste(.TRTH$path.output, "/", sep="")
     }
@@ -327,7 +328,7 @@
     if (isTRUE(.TRTH$use.instrument)) loadInstruments(.TRTH$instrument_file)
     registerDoMC(.TRTH$no.cores)
 
-    ## unzip and split (new unzip method does not require rezip; keeps original gz file)
+    ## unzip to tempdir and split (new unzip method does not require rezip; keeps original gz file)
     setwd(.TRTH$archive_dir)
 
     foreach(i = 1:length(.TRTH$files.gz)) %dopar% 
@@ -337,13 +338,14 @@
 	    #unzip the file
 	    print(paste("unzipping ",filename.gz, sep=""))
         #system(paste("gzip -d -f ",archive_dir,filename.gz,sep=""))
-        system(paste("gunzip -f < ", .TRTH$archive_dir, filename.gz, " > ", .TRTH$archive_dir, filename.csv, sep=""))
+        system(paste("gunzip -f < ", .TRTH$archive_dir, filename.gz, " > ", .TRTH$tmp$archive_dir, filename.csv, sep=""))
     }
-    ignored.csvs <- NULL #this will hold the names of CSVs that already have a header
-    for (i in 1:length(.TRTH$files.gz)) 
+    ignored.csvs <- NULL #this will hold the names of CSVs that already have a header    
+    setwd(.TRTH$tmp$archive) #this directory contains the big CSVs that were unzipped
+    .TRTH$files.csv <- list.files(.TRTH$tmp$archive)
+    for (i in 1:length(.TRTH$files.csv)) 
     {
-        filename.gz <- .TRTH$files.gz[i]
-        filename.csv <- substr(filename.gz,1,(nchar(filename.gz)-3))
+        filename.csv <- .TRTH$files.csv[i]
         # Use awk to split the big CSV into daily CSVs.  Each CSV will have a single
         # row which we will then overwrite with the column headers.  Then we'll
         # use awk again to put the data into the split files
@@ -353,7 +355,7 @@
         print(paste('Making headers from', filename.csv))
         system(paste('awk -v f2="" -F "," '," '",'{f1 = $1"."$2".csv";if(f1 != f2) { print >> f1; close(f2); f2=f1; } }',"' ",filename.csv, sep=""))
         
-        tmpfiles <- list.files(.TRTH$archive_dir)
+        tmpfiles <- list.files(.TRTH$tmp$archive_dir)
         files.header <- tmpfiles[grep("RIC",tmpfiles)]
 
         big.files <- tmpfiles[grep("@", tmpfiles)] #Big zipped CSVs from Reuters have e-mail address in name
@@ -376,26 +378,39 @@
         # mv header.csv "RIC.Date[G].csv"
 
         for (fl in tmp.files.csv) { # make files with header that awk will later populate
-            system(paste('cp "', files.header, '" ', paste(.TRTH$archive_dir, fl, sep=""), sep=""))
+            # -n means don't overwrite files; useful if e.g. part001.csv.gz ends halfway through the day
+            # and part002.csv.gz has a different number of columns (I'm not sure if that ever happens, though)
+            system(paste('cp -n "', files.header, '" ', paste(.TRTH$tmp$archive_dir, fl, sep=""), sep=""))
             #cp "#RIC.Date[G].csv" /home/garrett/TRTH/archive/GEM1-U1.01-APR-2008.csv
         }
-
         # after we've put a header in a file, we need to ignore that file the
         # next time through the loop so that we don't overwrite data.
         ignored.csvs <- c(ignored.csvs, tmp.files.csv)
-	    ## Split the Files 
-	    print(paste("Splitting ",filename.csv,sep=""))
-        # The following awk will put data in our CSV files which currently only have column headers;
-        #  Improved awk w/ file close to deal with 'too many open files', thanks to Josh Ulrich
-        system(paste('awk -v f2="" -F "," '," '",'{f1 = $1"."$2".csv";print >> f1; if(f1 != f2) { close(f2); f2=f1; } }',"' ",filename.csv, sep=""))
-        ## command line awkstring would look like this:
-        # awk -v f2="" -F ","  '{f1 = $1"."$2".csv"; print >> f1; if(f1 != f2) { close(f2); f2=f1; } }' sourcefile.csv
-        ## NOTE: if you get errors on 'too many open files' from awk, you'll need to adjust ulimit/nolimit
-	    print(paste('Done splitting ', filename.csv, sep=""))
+
+        # If all of the files that awk just created already exist in csv_dir and overwrite==FALSE, then
+        # there is no need to split this csv because we're not going to move any to csv_dir anyway.
+        # a file in csv_dir might be "2012.02.10.AAPL.O.csv", but we have "AAPL.O.10-FEB-2012.csv"
+        tmp <- gsub("\\.csv", "", tmp.files.csv)
+        new.names <- do.call(c, lapply(strsplit(tmp, "\\."), function(x) {
+            day <- gsub("-", ".", as.Date(x[length(x)], format='%d-%b-%Y'))
+            fl <- make.names(paste(x[-length(x)], collapse="."))
+            paste(day, "/", day, ".", fl, sep="")
+        }))
+        if (!all(file.exists(paste(paste(.TRTH$csv_dir, new.names, sep=""), ".csv", sep=""))) || isTRUE(.TRTH$overwrite)) {
+            ## Split the Files 
+            print(paste("Splitting ",filename.csv,sep=""))
+            # The following awk will put data in our CSV files which currently only have column headers;
+            #  Improved awk w/ file close to deal with 'too many open files', thanks to Josh Ulrich
+            system(paste('awk -v f2="" -F "," '," '",'{f1 = $1"."$2".csv";print >> f1; if(f1 != f2) { close(f2); f2=f1; } }',"' ",filename.csv, sep=""))
+            ## command line awkstring would look like this:
+            # awk -v f2="" -F ","  '{f1 = $1"."$2".csv"; print >> f1; if(f1 != f2) { close(f2); f2=f1; } }' sourcefile.csv
+            ## NOTE: if you get errors on 'too many open files' from awk, you'll need to adjust ulimit/nolimit
+            print(paste('Done splitting ', filename.csv, sep=""))
+        } else print('All CSVs created by awk already exist. Not re-splitting')
         # remove header file
-        invisible(file.remove(paste(.TRTH$archive_dir, files.header, sep="")))
+        invisible(file.remove(paste(.TRTH$tmp$archive_dir, files.header, sep="")))
         # remove unzipped csv
-        invisible(file.remove(paste(.TRTH$archive_dir, filename.csv, sep="")))
+        invisible(file.remove(paste(.TRTH$tmp$archive_dir, filename.csv, sep="")))
 	    ## Zip the File
         # print(paste("zipping ",filename.csv,sep=""))
         # system(paste("gzip -f ",archive_dir,filename.csv,sep=""))
@@ -427,9 +442,12 @@
 
         ## Move files to appropriate place
         #system(paste("mv -vf ", path.output,"Archives/",name.csv, " ", path.output,date.format,"/",date.format,".",name.new,".csv", sep=""))
-        system(paste("mv -f ", name.csv, " ", .TRTH$csv_dir, date.format, "/", date.format, ".", name.new, ".csv", sep=""))
-
-        print(paste(date.format, name.new, "moved", sep=" "))
+        if (isTRUE(.TRTH$overwrite)) {
+            system(paste("mv -fv ", name.csv, " ", .TRTH$csv_dir, date.format, "/", date.format, ".", name.new, ".csv", sep=""))
+        } else {
+            system(paste("mv -nv ", name.csv, " ", .TRTH$csv_dir, date.format, "/", date.format, ".", name.new, ".csv", sep=""))
+        }
+        #print(paste(date.format, name.new, "moved", sep=" "))
         files.xts <- rbind(files.xts,as.data.frame(cbind(name.new,date.format),stringsAsFactors=FALSE))
     }
     files.xts$type <- rep(NA, NROW(files.xts))
@@ -566,7 +584,7 @@
         # if xts and sec data already exist for this product/Date, and overwrite == FALSE, 
         # there is nothing to be done -- return NULL
         if (!any(c(write.tick, write.sec))) return(NULL) 
-
+        #TODO: unzip to a tempdir
         CSV.name <- paste(.TRTH$csv_dir, date, '/', date, '.', RIC, '.csv', sep="")
         if (!file.exists(CSV.name) && file.exists(paste(CSV.name, ".gz", sep=""))) {
             #only zipped file on disk. We'll have to unzip.



More information about the Blotter-commits mailing list