[Genabel-commits] r1617 - pkg/OmicABELnoMM/src

noreply at r-forge.r-project.org noreply at r-forge.r-project.org
Tue Feb 18 18:17:26 CET 2014


Author: afrank
Date: 2014-02-18 18:17:25 +0100 (Tue, 18 Feb 2014)
New Revision: 1617

Modified:
   pkg/OmicABELnoMM/src/AIOwrapper.cpp
   pkg/OmicABELnoMM/src/AIOwrapper.h
   pkg/OmicABELnoMM/src/Utility.cpp
Log:
Added Async IO support for output file of betas to improve performance.

Modified: pkg/OmicABELnoMM/src/AIOwrapper.cpp
===================================================================
--- pkg/OmicABELnoMM/src/AIOwrapper.cpp	2014-02-18 16:05:44 UTC (rev 1616)
+++ pkg/OmicABELnoMM/src/AIOwrapper.cpp	2014-02-18 17:17:25 UTC (rev 1617)
@@ -1,749 +1,917 @@
-#include "AIOwrapper.h"
+#include "AIOwrapper.h"
+
+
+
+AIOwrapper::AIOwrapper()
+{
+    Fhandler = &FHandler;
+    io_overhead = "*";
+}
+
+AIOwrapper::~AIOwrapper()
+{
+
+}
+
+
+void AIOwrapper::initialize(struct Settings &params)
+{
+
+
+    pthread_mutex_init( &(FHandler.m_more), NULL);
+    pthread_mutex_init( &(FHandler.m_read), NULL);
+    pthread_mutex_init( &(FHandler.m_buff_upd), NULL);
+
+    pthread_cond_init(&(FHandler.condition_more), NULL);
+    pthread_cond_init(&(FHandler.condition_read), NULL);
+
+    pthread_barrier_init(&(FHandler.finalize_barrier),NULL,2);
+
+
+    Fhandler->fakefiles = params.use_fake_files;
+
+
+    if(!Fhandler->fakefiles)
+    {
+        Fhandler->fnameAL = params.fnameAL;
+        Fhandler->fnameAR = params.fnameAR;
+        Fhandler->fnameY = params.fnameY;
+        Fhandler->fnameOutB = params.fnameOutB;
+
+        Yfvi  = load_databel_fvi( (Fhandler->fnameY+".fvi").c_str() );
+        ALfvi = load_databel_fvi( (Fhandler->fnameAL+".fvi").c_str() );
+        ARfvi = load_databel_fvi( (Fhandler->fnameAR+".fvi").c_str() );
+        params.n = ALfvi->fvi_header.numObservations;
+        params.m = ARfvi->fvi_header.numVariables/params.r;
+        params.t = Yfvi->fvi_header.numVariables;
+        params.l = ALfvi->fvi_header.numVariables;
+
+	int opt_tb = 1000;
+        int opt_mb = 1000;
+
+        params.mb = min(params.m, opt_tb);
+        params.tb = min(params.t, opt_mb);
+
+    }
+    else
+    {
+
+    }
+
+    params.p = params.l + params.r;
+
+
+    //block size to keep mem under 1 gigabyte
+//    int opt_block = params.n/(4*1000^3)*(1/(2*params.r));
+//    int opt_tb = max(4*2000,opt_block);
+//    int opt_mb = max(2000,opt_block);
+//
+//    params.mb = min(params.m,opt_tb);
+//    params.tb = min(params.t,opt_mb);
+
+    prepare_AL(params.l,params.n);
+    prepare_AR(  params.mb,  params.n,  params.m,  params.r);
+    prepare_B(params.tb, params.l+params.r);
+    prepare_Y(params.tb, params.n, params.t);
+
+
+
+
+
+
+}
+
+void AIOwrapper::finalize()
+{
+    //cout << "f";
+    //void *status;
+
+    Fhandler->not_done = false;
+    pthread_mutex_lock(&(Fhandler->m_more));
+    pthread_cond_signal( &(Fhandler->condition_more ));
+    pthread_mutex_unlock(&(Fhandler->m_more));
+
+    pthread_barrier_wait(&(Fhandler->finalize_barrier));
+    finalize_Y();
+    finalize_AR();
+    finalize_AL();
+    finalize_B();
+
+    pthread_attr_destroy(&(Fhandler->attr));
+
+    pthread_mutex_destroy(&(Fhandler->m_more));
+    pthread_cond_destroy(&(Fhandler->condition_more));
+
+    pthread_mutex_destroy(&(Fhandler->m_read));
+    pthread_cond_destroy(&(Fhandler->condition_read));
+
+
+}
+
+
+
+void AIOwrapper::finalize_B()
+{
+
+}
+
+
+void* AIOwrapper::async_io( void *ptr )
+{
+    cout << "async_io\n" << flush;
+    type_fileh* Fhandler = (type_fileh *)ptr;
+    int size_buff,tmp_y_blockSize,tmp_ar_blockSize;
+
+    struct timespec timeToWait;
+    FILE*  fp_Y;
+    FILE*  fp_B;
+    FILE*  fp_Ar;
+    if(!Fhandler->fakefiles)
+    {
+        fp_Y = fopen((Fhandler->fnameY+".fvd").c_str(), "rb");
+        if(fp_Y == 0)
+        {
+            cout << "Error Reading File Y " << Fhandler->fnameY << endl;
+            exit(1);
+        }
+
+        fp_Ar = fopen((Fhandler->fnameAR+".fvd").c_str(), "rb");
+        if(fp_Ar == 0)
+        {
+            cout << "Error Reading File Xr " << Fhandler->fnameAR << endl;
+            exit(1);
+        }
+
+        fp_B = fopen((Fhandler->fnameOutB+".fvd").c_str(), "w+b");
+        if(fp_B == 0)
+        {
+            cout << "Error Opening File B " << Fhandler->fnameOutB << endl;
+            exit(1);
+        }
+    }
+    else
+    {
+        //cout << "\nPreping files\n" << flush;
+        fp_Y = fopen("tempY.bin", "rwb");
+        if(fp_Y == 0)
+        {
+            cout << "Error creating temp File Y " << endl;
+            exit(1);
+        }
+        type_precision* tempbuff1 = new type_precision[Fhandler->n*Fhandler->y_blockSize];
+        fwrite(tempbuff1, sizeof(type_precision), Fhandler->n*Fhandler->y_blockSize, fp_Y);
+        //fclose(fp_Y);
+        delete []tempbuff1;
+
+
+        fp_Ar = fopen("tempAR.bin", "rwb");
+        if(fp_Ar == 0)
+        {
+            cout << "Error creating temp File AR "  << endl;
+            exit(1);
+        }
+        type_precision* tempbuff2 = new type_precision[Fhandler->n*Fhandler->Ar_blockSize];
+        fwrite(tempbuff2, sizeof(type_precision), Fhandler->n*Fhandler->Ar_blockSize, fp_Ar);
+        //fclose(fp_Ar);
+        delete []tempbuff2;
+
+        fp_B = fopen("tempB.bin", "wb");
+        if(fp_B == 0)
+        {
+            cout << "Error setting up temp File B " << endl;
+            exit(1);
+        }
+        //cout << "\nEnd preping files\n" << flush;
+
+    }
+
+
+    Fhandler->not_done = true;
+    Fhandler->reset_wait = false;
+
+    while(Fhandler->not_done)
+    {
+
+        while(!Fhandler->empty_buffers.empty() && Fhandler->y_to_readSize)
+        {
+
+            tmp_y_blockSize = Fhandler->y_blockSize;
+            if(Fhandler->y_to_readSize < Fhandler->y_blockSize)
+                tmp_y_blockSize = Fhandler->y_to_readSize;
+
+            Fhandler->y_to_readSize -= tmp_y_blockSize;
+            size_buff = Fhandler->n * tmp_y_blockSize;
+            //cout << Fhandler->y_to_readSize << endl;
+
+            pthread_mutex_lock(&(Fhandler->m_buff_upd));
+            //cout << " pre;" << Fhandler->full_buffers.size() << ";" << Fhandler->empty_buffers.size() << endl;
+            type_buffElement* tobeFilled = Fhandler->empty_buffers.front();
+            Fhandler->empty_buffers.pop();
+            //pthread_mutex_unlock(&(Fhandler->m_buff_upd));
+
+            tobeFilled->size = tmp_y_blockSize;
+
+            if(Fhandler->fakefiles)
+            {
+                fseek ( fp_Y , 0 , SEEK_SET );
+                size_t result = fread (tobeFilled->buff,sizeof(type_precision),size_buff,fp_Y);
+                result++;
+                int old_seed = Fhandler->seed;
+                srand (old_seed);
+                re_random_vec(tobeFilled->buff, size_buff );
+                re_random_vec_nan(tobeFilled->buff, size_buff );
+                Fhandler->seed += 75;
+            }
+            else
+            {
+                size_t result = fread (tobeFilled->buff,sizeof(type_precision),size_buff,fp_Y);
+                result++;
+                if(Fhandler->y_to_readSize <= 0)
+                {
+                    fseek ( fp_Y , 0 , SEEK_SET );
+                }
+            }
+
+
+            //pthread_mutex_lock(&(Fhandler->m_buff_upd));
+            Fhandler->full_buffers.push(tobeFilled);
+            //  cout << "\nStoring " << tobeFilled << endl;
+            //cout << " post;" << Fhandler->full_buffers.size() << ";" << Fhandler->empty_buffers.size() << endl;
+            pthread_mutex_unlock(&(Fhandler->m_buff_upd));
+
+            pthread_mutex_lock(&(Fhandler->m_read));
+            pthread_cond_signal( &(Fhandler->condition_read ));
+            pthread_mutex_unlock(&(Fhandler->m_read));
+
+        }
+
+        while(!Fhandler->ar_empty_buffers.empty() && Fhandler->Ar_to_readSize)
+        {
+            tmp_ar_blockSize = Fhandler->Ar_blockSize;
+            if(Fhandler->Ar_to_readSize < Fhandler->Ar_blockSize)
+                tmp_ar_blockSize = Fhandler->Ar_to_readSize;
+
+            Fhandler->Ar_to_readSize -= tmp_ar_blockSize;
+            size_buff = Fhandler->n * tmp_ar_blockSize*Fhandler->r;
+
+            pthread_mutex_lock(&(Fhandler->m_buff_upd));
+            type_buffElement* tobeFilled = Fhandler->ar_empty_buffers.front();
+            Fhandler->ar_empty_buffers.pop();
+
+
+            tobeFilled->size = tmp_ar_blockSize;
+
+            if(Fhandler->fakefiles)
+            {
+                fseek ( fp_Ar , 0 , SEEK_SET );
+                size_t result = fread (tobeFilled->buff,sizeof(type_precision),size_buff,fp_Ar);
+                result++;
+
+                re_random_vec(tobeFilled->buff , Fhandler->n * tmp_ar_blockSize*Fhandler->r );
+                re_random_vec_nan(tobeFilled->buff , Fhandler->n * tmp_ar_blockSize*Fhandler->r );
+
+            }
+            else
+            {
+                size_t result = fread (tobeFilled->buff,sizeof(type_precision),size_buff,fp_Ar);
+                result++;
+                if (Fhandler->Ar_to_readSize <= 0)
+                {
+                    fseek ( fp_Ar , 0 , SEEK_SET );
+                }
+            }
+
+            Fhandler->ar_full_buffers.push(tobeFilled);
+            //  cout << "\nStoring " << tobeFilled << endl;
+            pthread_mutex_unlock(&(Fhandler->m_buff_upd));
+
+            pthread_mutex_lock(&(Fhandler->m_read));
+            pthread_cond_signal( &(Fhandler->condition_read ));
+            pthread_mutex_unlock(&(Fhandler->m_read));
+
+        }
+        //B write
+
+        while(!Fhandler->b_full_buffers.empty())
+        {
+
+
+            pthread_mutex_lock(&(Fhandler->m_buff_upd));
+            type_buffElement* tobeWritten = Fhandler->b_full_buffers.front();
+            Fhandler->b_full_buffers.pop();
+            int size = Fhandler->p*Fhandler->b_blockSize;
+
+            if(Fhandler->fakefiles)
+            {
+                fseek ( fp_B , 0 , SEEK_SET );
+	  }
+		fwrite (tobeWritten->buff,sizeof(type_precision),size,fp_B);
+
+
+            Fhandler->b_empty_buffers.push(tobeWritten);
+            //  cout << "\nStoring " << tobeWritten << endl;
+            pthread_mutex_unlock(&(Fhandler->m_buff_upd));
+
+            pthread_mutex_lock(&(Fhandler->m_read));
+            pthread_cond_signal( &(Fhandler->condition_read ));
+            pthread_mutex_unlock(&(Fhandler->m_read));
+
+        }
+
+
+
+
+#ifdef WINDOWS
+        SYSTEMTIME time;
+        GetSystemTime(&time);
+
+        timeToWait.tv_sec = time.wSecond + 500/1000;
+        long int morenanos = (500%1000)*1000000;
+        timeToWait.tv_nsec = time.wMilliseconds*1000 + morenanos ;
+#else
+        clock_gettime(CLOCK_REALTIME, &timeToWait);
+        timeToWait.tv_nsec += 150;
+#endif
+
+        pthread_mutex_lock(&(Fhandler->m_more));
+        pthread_cond_timedwait( &(Fhandler->condition_more), &(Fhandler->m_more), &timeToWait );
+        pthread_mutex_unlock( &(Fhandler->m_more ));
+
+        pthread_mutex_lock(&(Fhandler->m_read));
+        pthread_cond_signal( &(Fhandler->condition_read ));
+        pthread_mutex_unlock(&(Fhandler->m_read));
+
+        if(Fhandler->reset_wait)
+        {
+            pthread_barrier_wait(&(Fhandler->finalize_barrier));
+            //wait for main thread to reset everything
+            pthread_barrier_wait(&(Fhandler->finalize_barrier));
+        }
+
+
+    }
+    //cout << "k" << flush;
+    //barrier
+    pthread_barrier_wait(&(Fhandler->finalize_barrier));
+
+    type_buffElement* tmp;
+    while(!Fhandler->full_buffers.empty())
+    {
+       tmp= Fhandler->full_buffers.front();
+       Fhandler->full_buffers.pop();
+       delete []tmp->buff;
+       delete tmp;
+    }
+
+    while(!Fhandler->empty_buffers.empty())
+    {
+       tmp= Fhandler->empty_buffers.front();
+       Fhandler->empty_buffers.pop();
+       delete []tmp->buff;
+       delete tmp;
+    }
+
+    while(!Fhandler->ar_full_buffers.empty())
+    {
+       tmp= Fhandler->ar_full_buffers.front();
+       Fhandler->ar_full_buffers.pop();
+       delete []tmp->buff;
+       delete tmp;
+    }
+
+    while(!Fhandler->ar_empty_buffers.empty())
+    {
+       tmp= Fhandler->ar_empty_buffers.front();
+       Fhandler->ar_empty_buffers.pop();
+       delete []tmp->buff;
+       delete tmp;
+    }
+
+    while(!Fhandler->b_full_buffers.empty())
+    {
+       tmp= Fhandler->b_full_buffers.front();
+       Fhandler->b_full_buffers.pop();
+       delete []tmp->buff;
+       delete tmp;
+    }
+
+    while(!Fhandler->b_empty_buffers.empty())
+    {
+       tmp= Fhandler->b_empty_buffers.front();
+       Fhandler->b_empty_buffers.pop();
+       delete []tmp->buff;
+       delete tmp;
+    }
+
+
+    //
+
+    //pthread_exit(NULL);
+        fclose(fp_Y);
+        fclose(fp_Ar);
+        fclose(fp_B);
 
-
-
-AIOwrapper::AIOwrapper()
-{
-    Fhandler = &FHandler;
-    io_overhead = "*";
-}
-
-
-AIOwrapper::~AIOwrapper()
-{
-
-}
-
-
-void AIOwrapper::initialize(struct Settings &params)
-{
-    pthread_mutex_init( &(FHandler.m_more), NULL);
-    pthread_mutex_init( &(FHandler.m_read), NULL);
-    pthread_mutex_init( &(FHandler.m_buff_upd), NULL);
-
-    pthread_cond_init(&(FHandler.condition_more), NULL);
-    pthread_cond_init(&(FHandler.condition_read), NULL);
-
-    pthread_barrier_init(&(FHandler.finalize_barrier), NULL, 2);
-
-
-    Fhandler->fakefiles = params.use_fake_files;
-
-
-
-    if (!Fhandler->fakefiles)
-    {
-        Fhandler->fnameAL   = params.fnameAL;
-        Fhandler->fnameAR   = params.fnameAR;
-        Fhandler->fnameY    = params.fnameY;
-        Fhandler->fnameOutB = params.fnameOutB;
-
-        Yfvi  = load_databel_fvi( (Fhandler->fnameY+".fvi").c_str() );
-        ALfvi = load_databel_fvi( (Fhandler->fnameAL+".fvi").c_str() );
-        ARfvi = load_databel_fvi( (Fhandler->fnameAR+".fvi").c_str() );
-        params.n = ALfvi->fvi_header.numObservations;
-        params.m = ARfvi->fvi_header.numVariables / params.r;
-        params.t = Yfvi->fvi_header.numVariables;
-        params.l = ALfvi->fvi_header.numVariables;
-
-        //block size to keep mem under 1 gigabyte
-        //int opt_block = params.n / (4*1000^3) * (1/(2*params.r));
-        int opt_tb = 1000;
-        int opt_mb = 1000;
-
-        params.mb = min(params.m, opt_tb);
-        params.tb = min(params.t, opt_mb);
-    }
-    else
-    {
-    }
-
-    params.p = params.l + params.r;
-
-
-    //block size to keep mem under 1 gigabyte
-//    int opt_block = params.n/(4*1000^3)*(1/(2*params.r));
-//    int opt_tb = max(4*2000, opt_block);
-//    int opt_mb = max(2000, opt_block);
-//
-//    params.mb = min(params.m, opt_tb);
-//    params.tb = min(params.t, opt_mb);
-
-    prepare_B();
-    prepare_AL(params.l, params.n);
-    prepare_AR(params.mb, params.n, params.m, params.r);
-    prepare_Y(params.tb, params.n, params.t);
-}
-
-
-void AIOwrapper::finalize()
-{
-    //cout << "f";
-    //void *status;
-
-    Fhandler->not_done = false;
-    pthread_mutex_lock(&(Fhandler->m_more));
-    pthread_cond_signal( &(Fhandler->condition_more ));
-    pthread_mutex_unlock(&(Fhandler->m_more));
-
-    pthread_barrier_wait(&(Fhandler->finalize_barrier));
-    finalize_Y();
-    finalize_AR();
-    finalize_AL();
-    finalize_B();
-
-    delete Yfvi;
-    delete ALfvi;
-    delete ARfvi;
-
-    pthread_attr_destroy(&(Fhandler->attr));
-
-    pthread_mutex_destroy(&(Fhandler->m_more));
-    pthread_cond_destroy(&(Fhandler->condition_more));
-
-    pthread_mutex_destroy(&(Fhandler->m_read));
-    pthread_cond_destroy(&(Fhandler->condition_read));
-}
-
-
-void AIOwrapper::prepare_B()
-{
-    Fhandler->fp_B = fopen((Fhandler->fnameOutB + ".fvd").c_str(), "w+b");
-    if (Fhandler->fp_B == 0)
-    {
-        cout << "Error Opening File B " << Fhandler->fnameOutB << endl;
-        exit(1);
-    }
-}
-
-
-void AIOwrapper::finalize_B()
-{
-    fclose(Fhandler->fp_B);
-}
-
-
-void* AIOwrapper::async_io( void *ptr )
-{
-    //cout << "async_io\n" << flush;
-    type_fileh* Fhandler = (type_fileh *)ptr;
-    int size_buff, tmp_y_blockSize, tmp_ar_blockSize;
-
-    struct timespec timeToWait;
-    FILE*  fp_Y;
-
-    FILE*  fp_Ar;
-    if (!Fhandler->fakefiles)
-    {
-        fp_Y = fopen((Fhandler->fnameY+".fvd").c_str(), "rb");
-        if (fp_Y == 0)
-        {
-            cout << "Error Reading File Y " << Fhandler->fnameY << endl;
-            exit(1);
-        }
-
-        fp_Ar = fopen((Fhandler->fnameAR+".fvd").c_str(), "rb");
-        if (fp_Ar == 0)
-        {
-            cout << "Error Reading File Xr " << Fhandler->fnameAR << endl;
-            exit(1);
-        }
-    }
-    else
-    {
-//        fp_Y = fopen("tempY.bin", "w+b");
-//        if (fp_Y == 0)
-//        {
-//            cout << "Error creating temp File Y " << Fhandler->fnameY << endl;
-//            exit(1);
-//        }
-//        fwrite(Fhandler->Yb, sizeof(type_precision), Fhandler->n*Fhandler->y_blockSize, fp_Y);
-//        fclose(fp_Y);
-//
-//
-//        fp_Ar = fopen("tempAR.bin", "w+b");
-//        if (fp_Ar == 0)
-//        {
-//            cout << "Error creating temp File Y " << Fhandler->fnameY << endl;
-//            exit(1);
-//        }
-//        fwrite(Fhandler->Yb, sizeof(type_precision), Fhandler->n*Fhandler->y_blockSize, fp_Ar);
-//        fclose(fp_Ar);
-//
-//        fp_Y = fopen("tempY.bin", "rb");
-//        if (fp_Y == 0)
-//        {
-//            cout << "Error Reading File Y " << Fhandler->fnameY << endl;
-//            exit(1);
-//        }
-//
-//        fp_Ar = fopen("tempAR.bin", "rb");
-//        if (fp_Ar == 0)
-//        {
-//            cout << "Error Reading File Xr " << Fhandler->fnameAR << endl;
-//            exit(1);
-//        }
-
-    }
-
-
-    Fhandler->not_done = true;
-    Fhandler->reset_wait = false;
-
-    //cout << "c" << flush;
-
-    while (Fhandler->not_done)
-    {
-        while (!Fhandler->empty_buffers.empty() && Fhandler->y_to_readSize)
-        {
-            tmp_y_blockSize = Fhandler->y_blockSize;
-            if (Fhandler->y_to_readSize < Fhandler->y_blockSize)
-                tmp_y_blockSize = Fhandler->y_to_readSize;
-
-            Fhandler->y_to_readSize -= tmp_y_blockSize;
-            size_buff = Fhandler->n * tmp_y_blockSize;
-            //cout << Fhandler->y_to_readSize << endl;
-
-            pthread_mutex_lock(&(Fhandler->m_buff_upd));
-            //cout << " pre;" << Fhandler->full_buffers.size() << ";" << Fhandler->empty_buffers.size() << endl;
-            type_buffElement* tobeFilled = Fhandler->empty_buffers.front();
-            Fhandler->empty_buffers.pop();
-            //pthread_mutex_unlock(&(Fhandler->m_buff_upd));
-
-            tobeFilled->size = tmp_y_blockSize;
-
-            if (Fhandler->fakefiles)
-            {
-                //fread (tobeFilled->buff, sizeof(type_precision), size, fp_Y);
-                //fseek ( fp_Y , 0 , SEEK_SET );
-                int old_seed = Fhandler->seed;
-                srand (old_seed);
-                re_random_vec(tobeFilled->buff, size_buff );
-                re_random_vec_nan(tobeFilled->buff, size_buff );
-                Fhandler->seed += 75;
-            }
-            else
-            {
-                fread (tobeFilled->buff, sizeof(type_precision), size_buff, fp_Y);
-                if (Fhandler->y_to_readSize <= 0)
-                {
-                    fseek ( fp_Y , 0 , SEEK_SET );
-                }
-            }
-
-
-            //pthread_mutex_lock(&(Fhandler->m_buff_upd));
-            Fhandler->full_buffers.push(tobeFilled);
-            //  cout << "\nStoring " << tobeFilled << endl;
-            //cout << " post;" << Fhandler->full_buffers.size() << ";" << Fhandler->empty_buffers.size() << endl;
-            pthread_mutex_unlock(&(Fhandler->m_buff_upd));
-
-            pthread_mutex_lock(&(Fhandler->m_read));
-            pthread_cond_signal( &(Fhandler->condition_read ));
-            pthread_mutex_unlock(&(Fhandler->m_read));
-        }
-
-        while (!Fhandler->ar_empty_buffers.empty() && Fhandler->Ar_to_readSize)
-        {
-            tmp_ar_blockSize = Fhandler->Ar_blockSize;
-            if (Fhandler->Ar_to_readSize < Fhandler->Ar_blockSize)
-                tmp_ar_blockSize = Fhandler->Ar_to_readSize;
-
-            Fhandler->Ar_to_readSize -= tmp_ar_blockSize;
-            size_buff = Fhandler->n * tmp_ar_blockSize*Fhandler->r;
-
-            pthread_mutex_lock(&(Fhandler->m_buff_upd));
-            type_buffElement* tobeFilled = Fhandler->ar_empty_buffers.front();
-            Fhandler->ar_empty_buffers.pop();
-
-
-            tobeFilled->size = tmp_ar_blockSize;
-
-            if (Fhandler->fakefiles)
-            {
-                //fread (tobeFilled->buff, sizeof(type_precision), size, fp_Ar);
-                //fseek ( fp_Ar , 0 , SEEK_SET );
-                re_random_vec(tobeFilled->buff,
-                              Fhandler->n * tmp_ar_blockSize*Fhandler->r );
-                re_random_vec_nan(tobeFilled->buff,
-                                  Fhandler->n * tmp_ar_blockSize*Fhandler->r );
-            }
-            else
-            {
-                fread (tobeFilled->buff, sizeof(type_precision), size_buff, fp_Ar);
-                if (Fhandler->Ar_to_readSize <= 0)
-                {
-                    fseek ( fp_Ar , 0 , SEEK_SET );
-                }
-            }
-
-            Fhandler->ar_full_buffers.push(tobeFilled);
-            //  cout << "\nStoring " << tobeFilled << endl;
-            pthread_mutex_unlock(&(Fhandler->m_buff_upd));
-
-            pthread_mutex_lock(&(Fhandler->m_read));
-            pthread_cond_signal( &(Fhandler->condition_read ));
-            pthread_mutex_unlock(&(Fhandler->m_read));
-        }
-
-
-
-
-#ifdef WINDOWS
-        SYSTEMTIME time;
-        GetSystemTime(&time);
-
-        timeToWait.tv_sec = time.wSecond + 500/1000;
-        long int morenanos = (500%1000) * 1000000;
-        timeToWait.tv_nsec = time.wMilliseconds * 1000 + morenanos ;
-#else
-        clock_gettime(CLOCK_REALTIME, &timeToWait);
-        timeToWait.tv_nsec += 150;
-#endif
-
-        pthread_mutex_lock(&(Fhandler->m_more));
-        pthread_cond_timedwait( &(Fhandler->condition_more),
-                                &(Fhandler->m_more), &timeToWait );
-        pthread_mutex_unlock( &(Fhandler->m_more ));
-
-        pthread_mutex_lock( &(Fhandler->m_read) );
-        pthread_cond_signal( &(Fhandler->condition_read ));
-        pthread_mutex_unlock( &(Fhandler->m_read) );
-
-        if (Fhandler->reset_wait)
-        {
-            pthread_barrier_wait(&(Fhandler->finalize_barrier));
-            //wait for main thread to reset everything
-            pthread_barrier_wait(&(Fhandler->finalize_barrier));
-        }
-    }
-    //cout << "k" << flush;
-    //barrier
-    pthread_barrier_wait(&(Fhandler->finalize_barrier));
-
-    type_buffElement* tmp;
-    while (!Fhandler->full_buffers.empty())
-    {
-       tmp = Fhandler->full_buffers.front();
-       Fhandler->full_buffers.pop();
-       delete[] tmp->buff;
-       delete tmp;
-    }
-
-    while (!Fhandler->empty_buffers.empty())
-    {
-       tmp = Fhandler->empty_buffers.front();
-       Fhandler->empty_buffers.pop();
-       delete[] tmp->buff;
-       delete tmp;
-    }
-
-    while (!Fhandler->ar_full_buffers.empty())
-    {
-       tmp = Fhandler->ar_full_buffers.front();
-       Fhandler->ar_full_buffers.pop();
-       delete tmp->buff;
-       delete tmp;
-    }
-
-    while (!Fhandler->ar_empty_buffers.empty())
-    {
-       tmp = Fhandler->ar_empty_buffers.front();
-       Fhandler->ar_empty_buffers.pop();
-       delete[] tmp->buff;
-       delete tmp;
-    }
-
-    //
-
-    //pthread_exit(NULL);
-
-    if (!Fhandler->fakefiles)
-    {
-        fclose(fp_Y);
-        fclose(fp_Ar);
-    }
-    else
-    {
-        fclose(fp_Y);
-        fclose(fp_Ar);
-    }
-//
-//            //!induce realistic fileread delay
-
-    return 0;
-}
-
-
-void AIOwrapper::load_ARblock(type_precision** Ar, int &Ar_blockSize)
-{
-    //int status;
-    //int createstatus = 0;
-    //cout<<"^";
-
-    while (Fhandler->ar_full_buffers.empty())
-    {
-        pthread_mutex_lock(&(Fhandler->m_more));
-        pthread_cond_signal( &(Fhandler->condition_more ));
-        pthread_mutex_unlock(&(Fhandler->m_more));
-
-        io_overhead = "#";
-
-        pthread_mutex_lock(&(Fhandler->m_read));
-        pthread_cond_wait( &(Fhandler->condition_read), &(Fhandler->m_read ));
-        pthread_mutex_unlock(&(Fhandler->m_read));
-    }
-
-
-    //!read new rdy buffer
-    pthread_mutex_lock(&(Fhandler->m_buff_upd));
-    if (Fhandler->Ar_currentReadBuff)
-    {
-        Fhandler->ar_empty_buffers.push(Fhandler->Ar_currentReadBuff);
-    }
-
-    Fhandler->Ar_currentReadBuff = Fhandler->ar_full_buffers.front();
-    Fhandler->ar_full_buffers.pop();
-
-    //cout << "\nReading " << Fhandler->Ar_currentReadBuff << endl;
-    Fhandler->Ar = Fhandler->Ar_currentReadBuff->buff;
-    Ar_blockSize = Fhandler->Ar_currentReadBuff->size;
-
-    pthread_mutex_unlock(&(Fhandler->m_buff_upd));
-
-    pthread_mutex_lock(&(Fhandler->m_more));
-    pthread_cond_signal( &(Fhandler->condition_more ));
-    pthread_mutex_unlock(&(Fhandler->m_more));
-
-
-//secueantial
-//    int tmp_ar_blockSize = Fhandler->Ar_blockSize;
-//    if (Fhandler->Ar_to_readSize < Fhandler->Ar_blockSize)
-//        tmp_ar_blockSize = Fhandler->Ar_to_readSize;
-//
-//    Fhandler->Ar_to_readSize -= tmp_ar_blockSize;
-//    int size_buff = Fhandler->n * tmp_ar_blockSize * Fhandler->r;
-//    re_random_vec(Fhandler->Ar, size_buff);
-//Ar_blockSize=tmp_ar_blockSize;
-
-    (*Ar) = Fhandler->Ar;
-}
-
-
-void AIOwrapper::load_Yblock(type_precision** Y, int &y_blockSize)
-{
-    //int status;
-    //int createstatus = 0;
-
-    while (Fhandler->full_buffers.empty())
-    {
-        pthread_mutex_lock(&(Fhandler->m_more));
-        pthread_cond_signal( &(Fhandler->condition_more ));
-        pthread_mutex_unlock(&(Fhandler->m_more));
-
-        io_overhead = "!";
-
-        pthread_mutex_lock(&(Fhandler->m_read));
-        pthread_cond_wait( &(Fhandler->condition_read), &(Fhandler->m_read ));
-        pthread_mutex_unlock(&(Fhandler->m_read));
-    }
-
-    //!read new rdy buffer
-    pthread_mutex_lock(&(Fhandler->m_buff_upd));
-    //cout << " pre," << Fhandler->full_buffers.size() << ";" << Fhandler->empty_buffers.size() << endl;
-
-    if (Fhandler->currentReadBuff)
-    {
-        //memset(Fhandler->currentReadBuff->buff, 0, y_blockSize);
-        Fhandler->empty_buffers.push(Fhandler->currentReadBuff);
-    }
-    Fhandler->currentReadBuff = Fhandler->full_buffers.front();
-    Fhandler->full_buffers.pop();
-
-    //cout << "\nReading " << Fhandler->currentReadBuff << endl;
-    Fhandler->Yb = Fhandler->currentReadBuff->buff;
-    y_blockSize = Fhandler->currentReadBuff->size;
-
-    (*Y) = Fhandler->Yb;
-
-     //cout << " post," << Fhandler->full_buffers.size() << ";" << Fhandler->empty_buffers.size() << endl;
-
-    pthread_mutex_unlock(&(Fhandler->m_buff_upd));
-
-    pthread_mutex_lock(&(Fhandler->m_more));
-    pthread_cond_signal( &(Fhandler->condition_more ));
-    pthread_mutex_unlock(&(Fhandler->m_more));
-
-    //matlab_print_matrix("Y", Fhandler->n, y_blockSize,*Y);
-}
-
-
-void AIOwrapper::write_B(type_precision* B, int p, int blockSize)
-{
-    fwrite(B , sizeof(type_precision), p*blockSize, Fhandler->fp_B);
-}
-
-
-void AIOwrapper::prepare_Y(int y_blockSize, int n, int totalY)
-{
-    //for fake files
-
-    Fhandler->seed = 1337;
-    srand(Fhandler->seed);
-
-    Fhandler->y_blockSize = y_blockSize;
-
-    Fhandler->n = n;
-    Fhandler->Y_Amount = totalY;
-    Fhandler->y_to_readSize = Fhandler->Y_Amount;
-    Fhandler->buff_count = min(2,(totalY + y_blockSize - 1)/y_blockSize) ;
-    //cout << "buffcount " << Fhandler->buff_count;
-
-
-    Fhandler->currentReadBuff = 0;
-    type_buffElement* tmp;
-
-    for (int i = 0; i < Fhandler->buff_count; i++)
-    {
-        tmp = new type_buffElement();
-        tmp->buff = new type_precision[Fhandler->n * Fhandler->y_blockSize];
-        tmp->size = y_blockSize;
-//        for ( int i = 0; i < Fhandler->n*Fhandler->y_blockSize; i++)
-//        {
-//            (tmp->buff)[i] = 0;
-//        }
+        return 0;
+
+//
+//            //!induce realistic fileread delay
+
+}
+
+void AIOwrapper::load_ARblock(type_precision** Ar, int &Ar_blockSize)
+{
+
+    //int status;
+   // int createstatus = 0;
+    //cout<<"^";
+
+    while(Fhandler->ar_full_buffers.empty())
+    {
+        pthread_mutex_lock(&(Fhandler->m_more));
+        pthread_cond_signal( &(Fhandler->condition_more ));
+        pthread_mutex_unlock(&(Fhandler->m_more));
+
+        io_overhead = "#";
+
+        pthread_mutex_lock(&(Fhandler->m_read));
+        pthread_cond_wait( &(Fhandler->condition_read), &(Fhandler->m_read ));
+        pthread_mutex_unlock(&(Fhandler->m_read));
+    }
+
+
+    //!read new rdy buffer
+    pthread_mutex_lock(&(Fhandler->m_buff_upd));
+        if(Fhandler->Ar_currentReadBuff)
+        {
+            Fhandler->ar_empty_buffers.push(Fhandler->Ar_currentReadBuff);
+        }
+
+        Fhandler->Ar_currentReadBuff = Fhandler->ar_full_buffers.front();
+        Fhandler->ar_full_buffers.pop();
+
+    //cout << "\nReading " << Fhandler->Ar_currentReadBuff << endl;
+    Fhandler->Ar = Fhandler->Ar_currentReadBuff->buff;
+    Ar_blockSize = Fhandler->Ar_currentReadBuff->size;
+
+    pthread_mutex_unlock(&(Fhandler->m_buff_upd));
+
+
+
+    pthread_mutex_lock(&(Fhandler->m_more));
+    pthread_cond_signal( &(Fhandler->condition_more ));
+    pthread_mutex_unlock(&(Fhandler->m_more));
+
+
+
+//secueantial
+//    int tmp_ar_blockSize = Fhandler->Ar_blockSize;
+//    if(Fhandler->Ar_to_readSize < Fhandler->Ar_blockSize)
+//        tmp_ar_blockSize = Fhandler->Ar_to_readSize;
+//
+//    Fhandler->Ar_to_readSize -= tmp_ar_blockSize;
+//    int size_buff = Fhandler->n * tmp_ar_blockSize * Fhandler->r;
+//    re_random_vec(Fhandler->Ar,size_buff);
+//Ar_blockSize=tmp_ar_blockSize;
+
+
+    (*Ar) = Fhandler->Ar;
+
+
+
+
+
+
+}
+
+void AIOwrapper::load_Yblock(type_precision** Y, int &y_blockSize)
+{
+
+    //int status;
+    //int createstatus = 0;
+
+    while(Fhandler->full_buffers.empty())
+    {
+
+        pthread_mutex_lock(&(Fhandler->m_more));
+        pthread_cond_signal( &(Fhandler->condition_more ));
+        pthread_mutex_unlock(&(Fhandler->m_more));
+
+        io_overhead = "!";
+
+        pthread_mutex_lock(&(Fhandler->m_read));
+        pthread_cond_wait( &(Fhandler->condition_read), &(Fhandler->m_read ));
+        pthread_mutex_unlock(&(Fhandler->m_read));
+
+    }
+
+    //!read new rdy buffer
+    pthread_mutex_lock(&(Fhandler->m_buff_upd));
+    //cout << " pre," << Fhandler->full_buffers.size() << ";" << Fhandler->empty_buffers.size() << endl;
+
+        if(Fhandler->currentReadBuff)
+        {
+            //memset(Fhandler->currentReadBuff->buff,0,y_blockSize);
+            Fhandler->empty_buffers.push(Fhandler->currentReadBuff);
+        }
+        Fhandler->currentReadBuff = Fhandler->full_buffers.front();
+        Fhandler->full_buffers.pop();
+
+    //cout << "\nReading " << Fhandler->currentReadBuff << endl;
+    Fhandler->Yb = Fhandler->currentReadBuff->buff;
+    y_blockSize = Fhandler->currentReadBuff->size;
+
+    (*Y) = Fhandler->Yb;
+
+     //cout << " post," << Fhandler->full_buffers.size() << ";" << Fhandler->empty_buffers.size() << endl;
+
+    pthread_mutex_unlock(&(Fhandler->m_buff_upd));
+
+
+
+    pthread_mutex_lock(&(Fhandler->m_more));
+    pthread_cond_signal( &(Fhandler->condition_more ));
+    pthread_mutex_unlock(&(Fhandler->m_more));
+
+    //matlab_print_matrix("Y",Fhandler->n,y_blockSize,*Y);
+
+
+}
+
+void AIOwrapper::write_B(type_precision* B, int p, int blockSize)
+{
+     //int status;
+    //int createstatus = 0;
+   // cout << " b: full" << Fhandler->b_full_buffers.size() << "; epty" << Fhandler->b_empty_buffers.size() << endl;
+
+    while(Fhandler->b_empty_buffers.empty())
+    {
+
+        pthread_mutex_lock(&(Fhandler->m_more));
+        pthread_cond_signal( &(Fhandler->condition_more ));
+        pthread_mutex_unlock(&(Fhandler->m_more));
+
+        io_overhead = "b";
+
+        pthread_mutex_lock(&(Fhandler->m_read));
+        pthread_cond_wait( &(Fhandler->condition_read), &(Fhandler->m_read ));
+        pthread_mutex_unlock(&(Fhandler->m_read));
+
+    }
+
+
+    pthread_mutex_lock(&(Fhandler->m_buff_upd));
+
+
+
+
+        Fhandler->currentWriteBuff = Fhandler->b_empty_buffers.front();
+        Fhandler->b_empty_buffers.pop();
+
+
+
+    Fhandler->B = Fhandler->currentWriteBuff->buff;
+    Fhandler->b_blockSize = blockSize;
+    copy_vec(B,Fhandler->B,p*blockSize);
+
+    Fhandler->b_full_buffers.push(Fhandler->currentWriteBuff);
+
+
+
+    // cout << " b: full" << Fhandler->b_full_buffers.size() << "; epty" << Fhandler->b_empty_buffers.size() << endl;
+
+    pthread_mutex_unlock(&(Fhandler->m_buff_upd));
+
+
+
+    pthread_mutex_lock(&(Fhandler->m_more));
+    pthread_cond_signal( &(Fhandler->condition_more ));
+    pthread_mutex_unlock(&(Fhandler->m_more));
+}
+
+void AIOwrapper::prepare_Y(int y_blockSize, int n, int totalY)
+{
+    //for fake files
+
+    Fhandler->seed = 1337;
+    srand (Fhandler->seed);
+
+    Fhandler->y_blockSize = y_blockSize;
+
+    Fhandler->n= n;
+    Fhandler->Y_Amount=totalY;
+    Fhandler->y_to_readSize = Fhandler->Y_Amount;
+    Fhandler->buff_count = min(3,(totalY+ y_blockSize - 1)/y_blockSize) ;
+    //cout << "buffcount " << Fhandler->buff_count;
+
+
+    Fhandler->currentReadBuff = 0;
+    type_buffElement* tmp;
+
+    for(int i = 0; i< Fhandler->buff_count  ; i++)
+    {
+        tmp = new type_buffElement();
+        tmp->buff = new type_precision[Fhandler->n*Fhandler->y_blockSize];
+        tmp->size = y_blockSize;
+//        for( int i = 0; i < Fhandler->n*Fhandler->y_blockSize; i++)
+//        {
+//            (tmp->buff)[i] = 0;
+//        }
         Fhandler->empty_buffers.push(tmp);
-    }
-    Fhandler->Yb = tmp->buff;
-
-
-
-    pthread_mutex_init(&(Fhandler->m_buff_upd), NULL);
-    pthread_mutex_init(&(Fhandler->m_more), NULL);
[TRUNCATED]

To get the complete diff run:
    svnlook diff /svnroot/genabel -r 1617


More information about the Genabel-commits mailing list