[Blotter-commits] r884 - in pkg/FinancialInstrument: R inst/parser

noreply at r-forge.r-project.org noreply at r-forge.r-project.org
Thu Dec 22 18:43:25 CET 2011


Author: gsee
Date: 2011-12-22 18:43:25 +0100 (Thu, 22 Dec 2011)
New Revision: 884

Modified:
   pkg/FinancialInstrument/R/load.instruments.R
   pkg/FinancialInstrument/inst/parser/TRTH_BackFill.R
Log:
 - New functionalized version of TRTH_BackFill
 - slightly better check to see if getInstruments.FI did not find data


Modified: pkg/FinancialInstrument/R/load.instruments.R
===================================================================
--- pkg/FinancialInstrument/R/load.instruments.R	2011-12-22 17:37:33 UTC (rev 883)
+++ pkg/FinancialInstrument/R/load.instruments.R	2011-12-22 17:43:25 UTC (rev 884)
@@ -388,7 +388,7 @@
         }
     }) #end loop over Symbols
 
-    if (length(datl[[1]]) == 0) {
+    if (is.null(unlist(datl)) {
         warning("No data found.")
         return(NULL) 
     }

Modified: pkg/FinancialInstrument/inst/parser/TRTH_BackFill.R
===================================================================
--- pkg/FinancialInstrument/inst/parser/TRTH_BackFill.R	2011-12-22 17:37:33 UTC (rev 883)
+++ pkg/FinancialInstrument/inst/parser/TRTH_BackFill.R	2011-12-22 17:43:25 UTC (rev 884)
@@ -1,423 +1,649 @@
-##############################################################################
-#                  Reuters Backfill Configuration Parameters                 #         
-##############################################################################
+#############################################################################################
+# This file contains functions that are used to parse zipped csv files from Reuters         
+# After sourcing these functions (this script),                                             
+# 1st run "configureTRTH" which will create an environment to hold parameter values         
+# 2nd run "download_reut" to download the big zipped csv files to your archive directory    
+# 3rd run "splitCSV" that will unzip the big zipped Reuters files that were downloaded.     
+#   Then it will split the file such that there will be a file for each day for each symbol 
+#   and it will put those split files in your csv directory                                 
+# 4th run "FEreut2xts" (requires foreach) which will read the csv files into R, 
+#   do a little data scrubbing, and save the xts data into your tick directory.  
+#   Then it will convert the data to 1 second frequency data and save that into your sec 
+#   directory.  Also, if tick.image and/or sec.image are TRUE, it will create plots of the 
+#   data and store them.             
+#                                                                                           
+#############################################################################################
+#                  Reuters Backfill Configuration Parameters                                #
+#############################################################################################
+## Arguments and their defaults
+# config.file (character name of config file) is optional.  
+# If provided, config.file will be sourced; 
+# i.e., you can use it instead of specifying all the parameters in the dots. 
+# Any arguments provided in dots will override arguments of the same name from config.file.
+##
+#path.output = '~/TRTH/'                # base directory for output
+#Tick2Sec_file = '~/TRTH/Tick2Sec.R'    # path/to/Tick2Sec.R which contains to_secBATV function definition
+#tick_dir = '~/TRTH/tick'               # directory in which to store tick data
+#archive_dir = '~/TRTH/archive'         # directory in which to store downloaded .gz files
+#csv_dir = '~/TRTH/csv'                 # directory in which to store zipped csv files
+#sec_dir = '~/TRTH/sec'                 # directory in which to store second data
+#sec.image = TRUE                       # save a chart of the second data?
+#tick.image = TRUE                      # save a chart of the tick data?
+#default_type = 'guaranteed_spread'     # passed to instrument.auto if type cannot be inferred from RIC 
+#default_currency = 'USD'               # passed to instrument.auto if type cannot be inferred from RIC
+#digits.sec = 6                         # for options(digits.secs=digits.secs)
+#width = 200                            # for options(width=width)
+#instrument_file = [searches path.output for filename containing 'instruments'] #name of instrument envir RData file
+#job.name = ""                          # the Reuters TRTH job name (by default all files not on disk)
+#no.cores = 4                           # number of cores for foreach
+#overwrite = FALSE                      # will not redownload, or overwrite files unless this is TRUE
+#username = stop("")                    #TRTH user name, usually your email address
+#password = stop("")                    #TRTH password
 #
-# You should create a configuration file that includes the following
+#email_to <- 'someuser at somehost.com'    #NOT IN USE
+#email_from <- 'someuser at somehost.com'  #NOT IN USE
+#############################################################################################
+#               Below is how you would typically use these functions                        #
+#############################################################################################
+##configureTRTH('~/TRTH/TRTH_config_file.R')
+## OR
+#configureTRTH(
+#    path.output = '~/TRTH/',
+#    width = 200,
+#    digits.sec = 6,
+#    instrument_file = '~/TRTH/instruments.RData',
+#    username = 'email at domain.com',
+#    password = 'password',
+#    default_type = 'guaranteed_spread',
+#    default_currency = "USD",
+#    job.name = "ReutersJobName",
+#    overwrite = FALSE,
+#    tick.image = TRUE,
+#    sec.image = TRUE,
+#    no.cores = 20
+#)
 #
-#instrument_file <- '/full/path/to/instruments.rda' #where your instrument metadata is stored
-#
-#archive_dir <- "/full/path/to/archives/" # where the split CSV, job reports, and gz files will be stored
-#path.output <- "/full/path/to/output/" # root dir where the .rda files for the xts will be placed
-#
-#job.name <- "mybackfill" # the Reuters TRTH job name
-#
-#username <- "username" #TRTH user name, usually your email address
-#password <- "password" #TRTH password
-#image <- TRUE
-#default_type='future_series'
-#default_type='guaranteed_spread'
-#
-#email_to <- 'someuser at somehost.com'
-#email_from <- 'someuser at somehost.com'
-#
-#overwrite <- FALSE # whether to overwrite files from the same date, default FALSE
-#image <- TRUE # whether to write write Bid, Ask, and Price images to disk for diagnostics, default TRUE 
-#
-#no.cores <- 1 # for foreach
-#
-# # Also add any additional options that would make this script run better in your environment
-#
-##############################################################################
-config_file<-'TRTH_config_file.R'
-source(config_file)
+#download_reut(.TRTH)                   # Download big zipped CSV
+#system.time(splitCSV(.TRTH))           # Split into daily CSVs
+#system.time(Out <- FEreut2xts(.TRTH))  # Convert to xts data: tick and second
+#############################################################################################
 
-load(instrument_file)
 
-Sys.umask("0002")
+configureTRTH <- function(config.file, path.output='~/TRTH/', Tick2Sec_file, ...) {
+    ## Create environment to hold variables that more than one function needs to access    
+    .TRTH <- new.env(parent=.GlobalEnv)
+    dargs <- list(...)
 
-options(width=200)
-Date <- Sys.Date()
+    ## Load required packages
+    require(qmao)
+    #require(FinancialInstrument)
+    require(doMC)
+    #require(sendmailR) # for email on failure
 
-require(xts)
-require(quantmod)
-require(FinancialInstrument)
-require(doMC)
-#require(sendmailR) # for email on failure
-#error.codes<-read.csv('curl.errors.csv',stringsAsFactors=FALSE,header=TRUE,row.names=1)
+    ## Some convenience functions
+    addslash <- function(x) {
+        if (substr(x, nchar(x), nchar(x)) != '/') paste(x, "/", sep="")
+        else x   
+    }
+    makeDir <- function(x) { #if directory does not exist, create it
+        dir.create(x, showWarnings=FALSE, recursive=TRUE, mode="0775") #why not mode="0664" ???
+    }
 
-registerDoMC(no.cores)
-# registerDoSEQ()
+    ## Source the config_file -- this will be overwritten by any arguments in dots
+    if (!missing(config.file)) source(config_file)
 
-Archive.output <- list.files(archive_dir)
-Archive.output <- Archive.output[grep(".gz",Archive.output)]
-Archive.output <- Archive.output[-c(grep("confirmation",Archive.output),grep("report",Archive.output))]
+    # There are some variables that we need that should be in the config file.
+    # Anything passed in through dots will override arguments of the same name that were in config_file
+    # Some things (subdirectory names) we will create if they aren't in dots or config_file
+    pickDirArg <- function(x) {
+        if (!is.null(dargs[[x]])) return(dargs[[x]]) #passed through dots
+        if (exists(x)) return(addslash(get(x)))
+        addslash(paste(path.output, sub("_dir", "", x), sep=""))
+    }
 
-listflag=FALSE
-while(!listflag)#try to download file list 
-{
-    clear<-warnings() #currency loads from oanda alway generate warnings, clear them out
-    Reuters <- system(paste("curl ftp://tickhistory-ftp.thomsonreuters.com:15500/results/ -u ",username,":",password," --ftp-ssl -k -l",sep=""),intern=TRUE)
-    w<-''
-    w<-warnings()
-    if(!as.logical(length(Reuters)) || isTRUE(grep('curl',names(w))))
-    {
-        tmpmsg<-paste("curl returned error code", names(w),'\n','while attempting to download file list','\n','script will wait and retry in 30 min')
-        #sendmail(email_to,email_from,"error downloading Reuters file list",msg=tmpmsg)
-        Sys.sleep(180)
-    } else listflag=TRUE
-    
+    #if (!is.null(dargs$path.output)) 
+    .TRTH$path.output <- path.output <- addslash(path.output)
+    if (missing(Tick2Sec_file) && !exists('Tick2Sec_file')) 
+        Tick2Sec_file <- paste(path.output, "Tick2Sec.R", sep="") 
+    if (!file.exists(Tick2Sec_file)) stop("Please provide a valid filepath for 'Tick2Sec_file' or move 'Tick2Sec.R' into 'path.output'")
+    .TRTH$Tick2Sec_file <- Tick2Sec_file
+
+    .TRTH$archive_dir <- pickDirArg("archive_dir")
+    .TRTH$csv_dir <- pickDirArg("csv_dir")
+    .TRTH$tick_dir <- pickDirArg("tick_dir")
+    .TRTH$sec_dir <- pickDirArg("sec_dir")
+
+    # Make sure the directories we need exist.
+    makeDir(.TRTH$path.output)
+    makeDir(.TRTH$archive_dir)
+    makeDir(.TRTH$csv_dir)
+    makeDir(.TRTH$tick_dir)
+    makeDir(.TRTH$sec_dir)
+
+    pickArg <- function(x, default=NULL) {
+        # if argument "x" was passed through dots, use that
+        # otherwise, if it was in config_file, use that
+        # if it's neither in dots, nor in config_file, use default
+        if (!is.null(dargs[[x]])) return(dargs[[x]]) #passed through dots
+        if (exists(x)) return(get(x))
+        default
+    }
+
+    ## Set some options/preferences
+    .TRTH$width <- pickArg('width', 200)
+    options(width=.TRTH$width)
+    .TRTH$digits.sec <- pickArg('digits.sec', 6)
+    options(digits.secs=.TRTH$digits.secs)
+
+    .TRTH$username <- pickArg('username', stop("Please provide your username"))
+    .TRTH$password <- pickArg('password', stop("Please provide your password"))
+    .TRTH$job.name <- pickArg('job.name', "")
+    .TRTH$default_type <- pickArg('default_type', 'guaranteed_spread')
+    .TRTH$default_currency <- pickArg('default_currency', 'USD')
+    .TRTH$overwrite <- pickArg('overwrite', FALSE)
+    .TRTH$tick.image <- pickArg('tick.image', TRUE)
+    .TRTH$sec.image <- pickArg('sec.image', TRUE)
+    .TRTH$no.cores <- pickArg('no.cores', 4)
+
+    instr.file.bak <- tail(list.files(path.output)[grep("instruments", list.files(path.output))], 1)
+    .TRTH$instrument_file <- pickArg('instrument_file', instr.file.bak)
+    if (length(.TRTH$instrument_file) == 0 || is.na(.TRTH$instrument_file) || !file.exists(.TRTH$instrument_file))
+        stop("Please specify a valid filepath for instrument_file or move a file with 'instruments' in its name to 'path.output'")
+
+    registerDoMC(.TRTH$no.cores)
+    # registerDoSEQ()
+
+    assign('.TRTH', .TRTH, pos=.GlobalEnv)
+    .TRTH
 }
 
-# now we're past the while loop, so we have a file list
-Reuters.report <- Reuters[grep("report",Reuters)]
-Reuters.output <-  Reuters[-c(grep("report",Reuters),grep("confirmation",Reuters))]
-Reuters.output <-  Reuters.output[grep(job.name,Reuters.output)]
 
-Reuters.new <- Reuters.output[!(Reuters.output %in% Archive.output)]
+download_reut <- function(.TRTH) {
+    if (missing(.TRTH)) {
+        .TRTH <- try(get('.TRTH', pos=.GlobalEnv))
+        if (inherits(.TRTH, 'try-error')) stop("Run configureTRTH function first")
+    }
+    attach(.TRTH)
+    Sys.umask("0002")
 
-# or to start here in the script from the file list
-# (see comment above about fixing broken set)
-# Reuters.output <-  Archive.output[grep(job.name,Archive.output)]
-# Reuters.new <- Reuters.output
+    Archive.output <- list.files(archive_dir)
+    Archive.output <- Archive.output[grep("\\.gz",Archive.output)]
+    omit <- c(grep("confirmation",Archive.output),grep("report",Archive.output))
+    if (length(omit) > 0) Archive.output <- Archive.output[-omit]
 
-
-for(i in 1:length(Reuters.new))
-{	
-	filename.gz <- Reuters.new[i]
-	filename.csv <- substr(filename.gz,1,(nchar(filename.gz)-3))
-	
-	alias <- unlist(strsplit(filename.gz,"-"))[3]
-	alias <- unlist(strsplit(alias,".csv.gz"))
-	
-	## Download New Datasets
-	print(paste("Downloading ",filename.gz,sep=""))
-    fileflag=FALSE	
-    while(!fileflag) #try to download individual files
+    listflag=FALSE
+    while(!listflag)#try to download file list 
     {
-        Reuters2<-system(paste("curl -m 10800 --max-filesize 1610612736 ftp://tickhistory-ftp.thomsonreuters.com:15500/results/",filename.gz, " -u ",username,":",password," --ftp-ssl -k > ",archive_dir,"/",filename.gz,sep=""))
-        if(Reuters2!=0)
+        clear <- warnings() #currency loads from oanda always generate warnings, clear them out
+        Reuters <- system(paste("curl ftp://tickhistory-ftp.thomsonreuters.com:15500/results/ -u ",
+                                username,":",password," --ftp-ssl -k -l",sep=""),intern=TRUE)
+        cat("\n")
+        w <- ''
+        w <- warnings()[!warnings() %in% clear]
+        if(!as.logical(length(Reuters)) || isTRUE(grep('curl',names(w))))
         {
-            w2<-''
-            w2<-warnings()
-            tmpmsg<-paste("curl returned error code", Reuters2,"\n",w2,'\n','while attempting to download',filename.gz,'\n','will wait and retry in 10 min')
-            #sendmail(email_to,email_from,paste("error downloading Reuters file",filename.gz),msg=tmpmsg)
-            Sys.sleep(600)
-        } else fileflag=TRUE
+            tmpmsg<-paste("curl returned error code", names(w),'\n',
+                        'while attempting to download file list','\n',
+                        'script will wait and retry in 30 min')
+            #sendmail(email_to,email_from,"error downloading Reuters file list",msg=tmpmsg)
+            Sys.sleep(180)
+        } else listflag=TRUE
+        
     }
+    # now we're past the while loop, so we have a file list
+    Reuters.report <- Reuters[grep("report",Reuters)]
+
+    Reuters.output <-  Reuters[-c(grep("report",Reuters),grep("confirmation",Reuters))]
+    Reuters.output <-  Reuters.output[grep(job.name, Reuters.output)]
+
+    files.gz <- Reuters.output[!(Reuters.output %in% Archive.output)]
+    #files.gz <- paste(username, "-", job.name, ".csv.gz", sep="")
+
+    if (length(files.gz) == 0) files.gz <- Reuters.output
+    .TRTH$files.gz = files.gz
+    assign(".TRTH", .TRTH, pos=.GlobalEnv)
     
-	## Download Report s
-	system(paste("curl ftp://tickhistory-ftp.thomsonreuters.com:15500/results/",Reuters.report[grep(alias,Reuters.report)], " -u ",username,":",password," --ftp-ssl -k > ",archive_dir,"/",Reuters.report[grep(alias,Reuters.report)],sep=""))
-	#system(paste("gzip -d -f ",archive_dir,"Report/",Reuters.report[grep(alias,Reuters.report)],sep=""))
+    for(i in 1:length(files.gz))
+    {	
+        filename.gz <- files.gz[i]
+        filename.csv <- substr(filename.gz,1,(nchar(filename.gz)-3))
 	
-}	
+        alias <- unlist(strsplit(filename.gz,"-"))[3]
+        alias <- unlist(strsplit(alias,".csv.gz"))
 
-## now unzip, split, rezip
-setwd(archive_dir)
-#foreach(j=1:length(Reuters.new)) %dopar%
-if (!length(Reuters.new)){
-    stop('you have no files to download, maybe you need to define a file list manually and run from there?')
-} else for(j in 1:length(Reuters.new))
-{
-	
-	#	system(paste("split --lines=10000000 -d ",filename.csv," split.",sep=""))
-	#	files.split <- list.files("/home/mktdata/ReutersData/Archives", pattern="split.")
-	
-	filename.gz <- Reuters.new[j]
-	filename.csv <- substr(filename.gz,1,(nchar(filename.gz)-3))
+	    ## Download New Datasets
+        print(paste("Downloading ",filename.gz,sep=""))
+        fileflag=FALSE
+        Reuters2 <- 0
+        while(!fileflag) #try to download individual files
+        {
+            if (!file.exists(paste(archive_dir, filename.gz, sep="")) || overwrite) {
+                Reuters2 <- system(paste("curl -m 10800 --max-filesize 1610612736 ftp://tickhistory-ftp.thomsonreuters.com:15500/results/", 
+                                    filename.gz, " -u ", username, ":", password, " --ssl -k > ", archive_dir, filename.gz, sep=""))
+            } #else cat(paste(filename.gz, 'already exists, and overwrite==FALSE; not re-downloading.'), "\n")
+            if(Reuters2 != 0)
+            {
+                w2 <- ''
+                w2 <- warnings()
+                tmpmsg <- paste("curl returned error code", Reuters2,"\n",
+                                w2,'\n','while attempting to download',filename.gz,'\n',
+                                'will wait and retry in 10 min')
+                #sendmail(email_to,email_from,paste("error downloading Reuters file",filename.gz),msg=tmpmsg)
+                Sys.sleep(600)
+            } else fileflag=TRUE
+        }
+        
+	    ## Download Report s
+        if (!file.exists(paste(archive_dir, Reuters.report[grep(alias,Reuters.report)], sep="")) || overwrite) { 
+    	    system(paste("curl ftp://tickhistory-ftp.thomsonreuters.com:15500/results/",
+                        Reuters.report[grep(alias,Reuters.report)], " -u ", username, ":", password,
+                        " --ftp-ssl -k > ", archive_dir, Reuters.report[grep(alias,Reuters.report)], sep=""))
+	        #system(paste("gzip -d -f ",archive_dir,"Report/",Reuters.report[grep(alias,Reuters.report)],sep=""))
+	        cat("\n")
+        } #else cat(paste(Reuters.report[grep(alias,Reuters.report)], 
+          #      "already exists, and overwrite==FALSE; not re-downloading.\n"))
+    }
 
-	#unzip the file
-	print(paste("unzipping ",filename.gz, sep=""))
-	system(paste("gzip -d -f ",archive_dir,'/',filename.gz,sep=""))
-	
-	alias <- unlist(strsplit(filename.gz,"-"))[3]
-	alias <- unlist(strsplit(alias,".csv.gz"))
-		
-	## Split the Files 
-	## command line awkstring would look like this:
-	#awk -F "," '{print >>$1"."$2".csv"}' sourcefile.csv
-	#
-	## modified to deal with 'too many open files', thanks to Josh Ulrich
-	#	awk -v f2="" -F "," '{
-	#			f1 = $1"."$2".csv";
-	#			print >> f1;
-	#			if(f1 != f2) {
-	#			close(f2);
-	#			f2=f1;
-	#			}
-	#			}' sourcefile.csv
-	## NOTE: if you get errors on 'too many open files' from awk, you'll need to adjust ulimit/nolimit
-	print(paste("Splitting ",filename.csv,sep=""))
-	#system(paste('awk -F "," ',"'{print >> $1", '"."$2".csv','"}',"' ",filename.csv, sep=""))
-	system(paste('awk -v f2="" -F "," '," '",'{
-			f1 = $1"."$2".csv";
-			print >> f1;
-			if(f1 != f2) {
-			close(f2);
-			f2=f1;
-			}
-			}',"' ",filename.csv, sep="")) #Improved awk w/ file close thanks to Josh Ulrich
-	## Zip the File
-	print(paste("zipping ",filename.csv,sep=""))
-	system(paste("gzip -f ",archive_dir,'/',filename.csv,sep=""))
-	
-}	
+    #save(files.gz, file=paste(archive_dir, 'files.gz.tmp.rda', sep=""))
+    #files.gz
+    detach(.TRTH)
+    .TRTH
+}
 
-#initialize output list empty
-files.xts <- NULL
 
-#build the file list to work on
-files.csv <- list.files(archive_dir)
-files.csv <- files.csv[-grep(".csv.gz",files.csv)]
-files.csv <- files.csv[grep(".csv",files.csv)]
-files.header <- files.csv[grep("RIC",files.csv)]
-files.csv <- files.csv[-grep("RIC",files.csv)]
+get_files.gz <- function(archive_dir, job.name){
+    # Don't _really_ need this function now that .TRTH envir is being passed around
+    # but might as well use it since it's already written
+    if (!file.exists(archive_dir)) stop("archive_dir does not exist")
 
-for(k in 1:length(files.csv))
-{
-	#print(k)
-	name.csv <- files.csv[k]
-	name <- unlist(strsplit(name.csv,".",fixed=TRUE))[1]
-	RIC.date <- try(as.Date(unlist(strsplit(name.csv,".",fixed=TRUE))[2], format="%d-%b-%Y"))
-    #if this failed, see if it is one of Reuters .O or similar symbols
-    if(class(RIC.date)=='try-error') {
-        RIC.date <- try(as.Date(unlist(strsplit(name.csv,".",fixed=TRUE))[3], format="%d-%b-%Y"))
-        if(!class(RIC.date)=='try-error'){
-            tmp<-unlist(strsplit(name.csv,".",fixed=TRUE))
-            name <- paste(tmp[1],tmp[2],sep=.)
-        } else {
-            print(paste('File',name.csv,'could not be converted.'))
-            next
-        }
-    }
-	date.format <- gsub("-",".",RIC.date)
-	
-	try(if( weekdays(RIC.date)=="Saturday" | weekdays(RIC.date)=="Sunday")
-	{
-		file.remove(paste(archive_dir,name.csv,sep=""))
-		next
-				
-	})
-		
-	## Handle leading digits and VIX and Cash
-	if(substr(name,1,1)==1){name.new <- substr(name,2,nchar(name));name.new <- make.names(name.new)}
-    else{name.new <- make.names(name)}
-	
-	## Does directory exist?
-	ex <- file.exists(paste(path.output,date.format,"/",sep=""))
-	if(ex != TRUE){dir.create(paste(path.output,date.format,"/",sep=""), mode="775")}
-	
-	## Move files to appropriate place
-	#system(paste("mv -f ", path.output,"Archives/",name.csv, " ", path.output,date.format,"/",date.format,".",name.new,".csv", sep=""))
-	system(paste("mv -f ", name.csv, " ", path.output,date.format,"/",date.format,".",name.new,".csv", sep=""))
-	
-	print(paste(date.format, name.new, "moved", sep=" "))
-	files.xts <- rbind(files.xts,as.data.frame(cbind(name.new,date.format),stringsAsFactors=FALSE))
-	
+    Archive.output <- list.files(archive_dir)
+    Archive.output <- Archive.output[grep("\\.gz",Archive.output)]
+    omit <- c(grep("confirmation",Archive.output),grep("report",Archive.output))
+    if (length(omit) > 0) Archive.output <- Archive.output[-omit]
+    Reuters.output <-  Archive.output[grep(job.name, Archive.output)]
+    #if (length(Reuters.output) == 0) Reuters.output <- Archive.output
+    Reuters.output
 }
 
-# now get instrument data
-files.xts$type<-rep(NA,nrow(files.xts))
-missing_i<-''
-instr_s<-unique(files.xts[,'name.new'])
-for(i in 1:length(instr_s)){
-    instr<-suppressWarnings(getInstrument(instr_s[i]))
-    if(is.instrument(instr)){ 
-        files.xts[files.xts$name.new ==instr_s[i],]$type<-as.character(instr$type[1])
-    } else {
-        print(paste(instr_s[i], 'does not appear to be an instrument, setting it to', default_type))
-        files.xts[files.xts$name.new==instr_s[i],]$type<-default_type
-        missing_i<-c(missing_i,instr_s[i])
+
+splitCSV <- function(.TRTH) {
+    #FIXME: respect overwrite argument
+    if (missing(.TRTH) && !exists(".TRTH")) stop("Run configureTRTH function first")
+    attach(.TRTH)
+    if (substr(path.output, nchar(path.output), nchar(path.output)) != "/") {
+        .TRTH$path.output <- path.output <- paste(path.output, "/", sep="")
     }
-}
-missing_i<-missing_i[-1]
-missing_i<-data.frame(symbol=missing_i,type=default_type)
-write.csv(missing_i,file=paste(archive_dir,'missing_instruments.CSV',sep='')) 
 
-##If trying to fix a broken set:
-#files.csv<-'';for(dir in list.files(getwd(),pattern="20")) {files.csv<-c(files.csv,list.files(paste(getwd(),'/',dir,'/',sep=''),pattern=".csv"))}[-1]
-#files.xts<-NULL
-#for(l in 1:length(files.csv)) { rsplit<-as.vector(strsplit(files.csv[l],'.',fixed=TRUE)[[1]]); files.xts<-rbind(files.xts,cbind(rsplit[4],paste(rsplit[1],rsplit[2],rsplit[3],sep='.'))); print(files.xts[l,])}
-#colnames(files.xts)<-c('name.new','date.format')
+    if (!exists('files.gz')) .TRTH$files.gz <- files.gz <- get_files.gz(archive_dir, job.name)
 
-save(files.xts,file='files.xts.tmp.rda')
+    if (!exists('instrument_file')) { #Don't need this anymore
+        tmp <- list.files(paste(path.output))
+        instrument_file <- paste(path.output, tail(tmp[grep("instruments", tmp)], 1), sep="")
+        if (!file.exists(instrument_file)) {
+            stop("Could not find instrument_file; please specify")
+        } else .TRTH$instrument_file <- instrument_file
+    }
 
-H <- read.csv(paste(path.output,"Archives/#RIC.Date[G].csv",sep=""),header=FALSE,stringsAsFactors=FALSE)
-H <- H[nrow(H),]
-H <- make.names(H)
+    loadInstruments(instrument_file)
+    registerDoMC(no.cores)
 
-## Into xts###############################################################
-reut2xts <- function( data, datapath, image=TRUE, overwrite=FALSE, xFUN=NULL )
-{
-	prod <- data[,'name.new']
-	date <- data[,'date.format']
-    type <- data[,'type']
-    
-	print(paste(date, prod, "xts", sep=" "))
-	
-	RIC.code <- prod
-	
-	file.name <- paste(datapath,"xts/",RIC.code,"/",date,".",RIC.code,".RData",sep="")
-	if(!isTRUE(overwrite) && file.exists(file.name)){
-		return(paste(file.name,"already exists, not overwriting"))	
-	}
+    ## unzip and split (new unzip method does not require rezip; keeps original gz file)
+    setwd(archive_dir)
 
-	Data <- read.csv(paste(datapath,date,'/',date,'.',prod,'.csv',sep=''),stringsAsFactors=FALSE,header=FALSE)
-    if(ncol(Data)!=length(H)){
-        warning("length of headers and downloaded data do not match, trying to adjuust, but be careful!")
-        H<-H[1:ncol(Data)]
+    foreach(i = 1:length(files.gz)) %dopar% 
+    { # unzip in parallel
+        filename.gz <- files.gz[i]
+        filename.csv <- substr(filename.gz,1,(nchar(filename.gz)-3))
+	    #unzip the file
+	    print(paste("unzipping ",filename.gz, sep=""))
+        #system(paste("gzip -d -f ",archive_dir,filename.gz,sep=""))
+        system(paste("gunzip -f < ", archive_dir, filename.gz, " > ", archive_dir, filename.csv, sep=""))
     }
-	names(Data) <- H
-	
-	OTC.remove <- grep("IRGCOND",Data$Qualifiers)
-	OTC.remove <- c(OTC.remove,grep("High[USER]",Data$Qualifiers,fixed=TRUE))
-	OTC.remove <- c(OTC.remove,grep("Low[USER]",Data$Qualifiers,fixed=TRUE))
-	
-	if(substr(prod,1,(nchar(prod)-2))=="ICF"){OTC.remove <- NULL}
-	if(substr(prod,1,(nchar(prod)-2))=="DOL"){OTC.remove <- NULL}
-	if(dim(Data)[1]<=25){return(NULL)}
-	
-	index.new <- as.POSIXct(paste(Data$Date.G.,Data$Time.G,sep=" "),format="%d-%b-%Y%H:%M:%OS",tz="GMT")
-	
-	## Force Everything to numerics
-	Data <- Data[,c("Price","Volume","Bid.Price","Bid.Size","Ask.Price","Ask.Size")]
-	Data$Price <- as.numeric(Data$Price)
-	Data$Volume <- as.numeric(Data$Volume)
-	Data$Bid.Price <- as.numeric(Data$Bid.Price)
-	Data$Bid.Size <- as.numeric(Data$Bid.Size)
-	Data$Ask.Price <- as.numeric(Data$Ask.Price)
-	Data$Ask.Size <- as.numeric(Data$Ask.Size)
-	
-	Data <- xts(Data,order.by=index.new,tz="GMT")
-	
-	## Remove block trades
-	if(length(OTC.remove)){Data <- Data[-OTC.remove]}
 
-	## Turn bids/offers that are zero or less into NA for outrights
-	
-	if(type !="guaranteed_spread")
-	{
-        #remove prints with Prices less than or equal to zero
-		Data$Bid.Price[Data$Bid.Price<=0,] <- NA
-		Data$Ask.Price[Data$Ask.Price<=0,] <- NA
-		Data$Price[Data$Ask.Price<=0,] <- NA
-        ## Remove Trades with Price or Volume of zero   
-        Price.remove <- which(Data$Price == 0)    
-    } else {
-        Price.remove<-NULL
+    for (i in 1:length(files.gz)) 
+    {
+        filename.gz <- files.gz[i]
+        filename.csv <- substr(filename.gz,1,(nchar(filename.gz)-3))
+        # Use awk to split the big CSV into daily CSVs.  Each CSV will have a single
+        # row which we will then overwrite with the column headers.  Then we'll
+        # use awk again to put the data into the split files
+        #
+        # First, make empty files (er, 1-row files that will be overwritten with header)
+        # awk string says to make a file and put this row in it if the RIC or date are different than the previous row's RIC/date
+        print(paste('Making headers from', filename.csv))
+        system(paste('awk -v f2="" -F "," '," '",'{f1 = $1"."$2".csv";if(f1 != f2) { print >> f1; close(f2); f2=f1; } }',"' ",filename.csv, sep=""))
     }
+
+    tmpfiles <- list.files(archive_dir)
+    files.header <- tmpfiles[grep("RIC",tmpfiles)]        
+    big.files <- tmpfiles[grep("@", tmpfiles)] #Big zipped CSVs from Reuters have e-mail address in name
+    #big.files <- tmpfiles[grep(job.name, tmpfiles)]
+    # csv files will be everything that is not in "ignore" below
+    # all these things we're ignoring should actually be in path.output, not here
+    ignore <- c(big.files, files.header, 'NA', "Report", 
+                tmpfiles[grep("Tick2Sec|TRTH_config_file", tmpfiles)],
+                tmpfiles[grep("\\.rda", tmpfiles)], 
+                tmpfiles[grep("\\.RData", tmpfiles)],
+                tmpfiles[grep("missing_instruments", tmpfiles)]               
+                )
+
+    files.csv <- tmpfiles[!tmpfiles %in% ignore]
+    .TRTH$files.csv <- files.csv
+    # files.header now has several identical rows. Delete all but the first by extracting the first row and overwriting the file with it
+    system(paste('head -1 "', files.header, '" > header.csv', sep="")) # extract 1st line
+    # head -1 "RIC.Date[G].csv" > header.csv
+    system(paste('mv header.csv "', files.header, '"', sep=""))        # replace files.header with only 1st line
+    # mv header.csv "RIC.Date[G].csv"
+
+    for (fl in files.csv) {
+        system(paste('cp "', files.header, '" ', paste(archive_dir, fl, sep=""), sep=""))
+        #cp "#RIC.Date[G].csv" /home/garrett/TRTH/archive/GEM1-U1.01-APR-2008.csv
+    }
+
+    for (j in 1:length(files.gz))
+    {   
+        filename.gz <- files.gz[j]
+        filename.csv <- substr(filename.gz,1,(nchar(filename.gz)-3))
+
+	    ## Split the Files 
+	    print(paste("Splitting ",filename.csv,sep=""))
+
+        # The following awk will put data in our CSV files which currently only have column headers;
+        #  Improved awk w/ file close to deal with 'too many open files', thanks to Josh Ulrich
+        system(paste('awk -v f2="" -F "," '," '",'{f1 = $1"."$2".csv";print >> f1; if(f1 != f2) { close(f2); f2=f1; } }',"' ",filename.csv, sep=""))
+        ## command line awkstring would look like this:
+        # awk -v f2="" -F ","  '{f1 = $1"."$2".csv"; print >> f1; if(f1 != f2) { close(f2); f2=f1; } }' sourcefile.csv
+        ## NOTE: if you get errors on 'too many open files' from awk, you'll need to adjust ulimit/nolimit
+	    print(paste('Done splitting ', filename.csv, sep=""))
+
+        # remove header file
+        invisible(file.remove(paste(archive_dir, files.header, sep="")))
+        # remove unzipped csv
+        invisible(file.remove(paste(archive_dir, filename.csv, sep="")))
+	    ## Zip the File
+        # print(paste("zipping ",filename.csv,sep=""))
+        # system(paste("gzip -f ",archive_dir,filename.csv,sep=""))
+    }	
+
+    # Move
+    #mv -vf paste(files) csv_dir
+
+    # Move split CSVs into csv_dir
+    files.xts <- NULL
+#    foreach (k = icount(length(files.csv))) %dopar%
+    for (k in 1:length(files.csv))
+    {
+        #print(k)
+        name.csv <- files.csv[k]                        # "ASBC.O.08-JAN-2011.csv"
+        #name <- unlist(strsplit(name.csv,".",fixed=TRUE))[1]
+        spl.name <- unlist(strsplit(name.csv, "\\."))   # "ASBC" "O" "08-JAN-2011" "csv" 
+        last2 <- (length(spl.name) - 1):length(spl.name)# 3 4
+        name <- paste(spl.name[-last2], collapse=".")   # "ASBC.O"
+        #RIC.date <- try(as.Date(unlist(strsplit(name.csv,".",fixed=TRUE))[2], format="%d-%b-%Y"))
+        RIC.date <- try(as.Date(spl.name[last2[1]], format="%d-%b-%Y"))
+        date.format <- gsub("-",".",RIC.date)
+
+        ## Handle leading digits and VIX and Cash
+        name.new <- if(substr(name,1,1)==1){
+            make.names(substr(name,2,nchar(name)))
+        } else make.names(name)
+
+        ## Create directory if it does not exist
+        dir.create(paste(csv_dir, date.format, "/", sep=""), showWarnings=FALSE, recursive=TRUE, mode='0775') #mode='0664'
+
+        ## Move files to appropriate place
+        #system(paste("mv -vf ", path.output,"Archives/",name.csv, " ", path.output,date.format,"/",date.format,".",name.new,".csv", sep=""))
+        system(paste("mv -f ", name.csv, " ", csv_dir, date.format, "/", date.format, ".", name.new, ".csv", sep=""))
+
+        print(paste(date.format, name.new, "moved", sep=" "))
+        files.xts <- rbind(files.xts,as.data.frame(cbind(name.new,date.format),stringsAsFactors=FALSE))
+    }
+    files.xts$type <- rep(NA, NROW(files.xts))
+
+    .TRTH$files.xts <- files.xts
+    assign('.TRTH', .TRTH, pos=.GlobalEnv)
     
-	## Carry last bid/offer forward
-	
-	Data$Bid.Price <- na.locf(Data$Bid.Price)
-	Data$Ask.Price <- na.locf(Data$Ask.Price)
-	
-	Data$Bid.Size <- na.locf(Data$Bid.Size)
-	Data$Ask.Size <- na.locf(Data$Ask.Size)
-	
-    ## Remove empty Trade rows (Volume of zero)   
-    Volume.remove <- which(Data$Volume == 0)
-	
-	if(length(c(Price.remove,Volume.remove))!=0)
-	{
[TRUNCATED]

To get the complete diff run:
    svnlook diff /svnroot/blotter -r 884


More information about the Blotter-commits mailing list