Processing Large Data

Jun 27, 2013 at 7:45 PM
Edited Jun 28, 2013 at 9:05 PM
Hi folks. I'm very new to the world of Cudafy, but really enjoying my experience with it so far.

I am looking for a bit of direction as I've failed (miserably) to find an example which tackles processing large data arrays on the GPU using OpenCL and CUDA.

What I'm aiming to achieve is to implement a Boyer-Moore byte comparison to scan through extremely large files (>5GB) in 300MB segments and return the number of matches.

My current implementation only scans through roughly the first megabyte of each 300MB segment. Is there a way to scan through all of the 300MB byte array reliably and quickly?

Code:
        public void DDCommand()
        {
            String path = txtFile.Text;                  // Target file (from interface)
            String searchTarget = "FFD8FFE0";            // Target string
            Byte[] target = GetBytes(searchTarget);      // Convert target string into a byte array
            uint[] results = new uint[1];                // Number of hits
            results[0] = 0;

            int chunkSize = 314572800;                   // 300MB chunk size
            byte[] buffer = new byte[chunkSize];         // Read buffer
            double count = 0;                            // Number of bytes read
            int chunkCount = 0;                          // Number of chunks processed

            int[] lookup = new int[256];                 // Skip table (required for Boyer-Moore algorithm)
            for (int i = 0; i < lookup.Length; i++)
            {
                lookup[i] = target.Length;
            }

            for (int i = 0; i < target.Length; i++)
            {
                lookup[target[i]] = target.Length - i - 1;
            }

            int lastByte = target.Last();                // Last byte of the target (required for Boyer-Moore algorithm)

            GPGPU gpu = CudafyHost.GetDevice(CudafyModes.Target, CudafyModes.DeviceId);

            if (TestType == "CUDA")
            {
                CudafyModule km = CudafyTranslator.Cudafy(eArchitecture.sm_20);
                gpu.LoadModule(km);
            }
            else
            {
                CudafyModule km = CudafyTranslator.Cudafy();
                gpu.LoadModule(km);
            }

            // Allocate the memory on the GPU
            uint[] dev_results = gpu.Allocate<uint>(results);
            gpu.Set(dev_results);

            byte[] dev_target = gpu.CopyToDevice<byte>(target);
            int[] dev_lookup = gpu.CopyToDevice<int>(lookup);

            GPGPUProperties prop = gpu.GetDeviceProperties();
            int gpuThreads = prop.MaxThreadsPerBlock;

            // Start timer (benchmarking purposes)
            double time = MeasureTime(() =>
            {
                // Read the file 300MB at a time, feed it to the GPU, then search for matches
                using (FileStream DDStream = new FileStream(path, FileMode.Open, FileAccess.Read))
                {
                    int bytesRead;
                    while ((bytesRead = DDStream.Read(buffer, 0, buffer.Length)) > 0)
                    {
                        byte[] dev_buffer = gpu.CopyToDevice<byte>(buffer);
                        count += buffer.Length;
                        int size = buffer.Length;
                        int blockSize = Math.Min(gpuThreads, (int)Math.Ceiling(size / (float)gpuThreads));
                        gpu.Synchronize();
                        gpu.Launch(gpuThreads, blockSize).analyse(dev_buffer, size, dev_target, lastByte, dev_lookup, dev_results);
                        gpu.Free(dev_buffer);
                        chunkCount++;
                    }
                }
            });

            gpu.CopyFromDevice(dev_results, results);

            gpu.FreeAll();
        }

        [Cudafy]
        public static void analyse(GThread thread, byte[] buffer, int size, byte[] target, int lastByte, int[] lookup, uint[] results)
        {
            int i = thread.threadIdx.x + thread.blockIdx.x * thread.blockDim.x;
            int stride = thread.blockDim.x * thread.gridDim.x;

            if (i < size)
            {
                var checkByte = buffer[i];
                if (buffer[i] == lastByte)
                {
                    bool found = true;
                    for (int j = target.Length - 2; j >= 0; j--)
                    {
                        if (buffer[i - target.Length + j + 1] != target[j])
                        {
                            found = false;
                            break;
                        }
                    }

                    if (found == true)
                    {
                        // Found match
                        thread.atomicAdd(ref results[0], 1);
                        i += stride;
                    }
                    else
                    {
                        i += stride;
                    }
                }
                else
                {
                    i += (lookup[checkByte] * stride);
                }
            }
            else
            {
                return;
            }
        }
Jun 27, 2013 at 11:05 PM
Hi

You seem to have misinterpreted the arguments for the launch function. The first is the grid size (in other words, how many blocks to use); the second is the block size (how many threads per block).
Anyhow, the usual way it's done is to break your problem into small pieces and launch a kernel for each piece, finally merging the output of each kernel.
How about you launch many kernels, each for a different section of your 300MB buffer?

Once you've done that, there are many possible improvements. You could have three parallel lanes (CUDA streams), each doing the following sequence: a) reading from the HDD, b) copying the buffer to the device, c) launching a sequence of kernels to process said buffer. This way you make the best use of your system bandwidth in terms of HDD I/O, device memory copies and kernel execution.
Also, you should definitely get rid of that atomicAdd and do a proper reduction. Also, use shared memory to store as much as possible, such as your lookup and search buffers. Finally, since Boyer-Moore is O(N) on average, the bottleneck is memory bandwidth rather than instructions, so you should rethink your algorithm to get properly coalesced memory access.
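For example, a rough sketch of picking the launch dimensions (gridSize is just an illustrative name, and 256 threads per block is an arbitrary choice; adjust to your card):

    int blockSize = Math.Min(prop.MaxThreadsPerBlock, 256);    // threads per block
    int gridSize = (size + blockSize - 1) / blockSize;         // enough blocks to cover every byte
    gridSize = Math.Min(gridSize, 65535);                      // respect the per-dimension grid limit (with a strided loop in the kernel, a capped grid still covers the whole buffer)
    gpu.Launch(gridSize, blockSize).analyse(dev_buffer, size, dev_target, lastByte, dev_lookup, dev_results);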

cheers
Jun 27, 2013 at 11:38 PM
Edited Jun 27, 2013 at 11:38 PM
Thanks for the reply and the insight - it seems I have misinterpreted the arguments for the launch function. I am very new to the world of GPGPU programming.

How could I implement a multi-kernel approach? Would it be to launch one kernel from the program to control, split up, and spawn separate kernels to process parts of the data?

Thanks again
Jun 28, 2013 at 11:45 AM
"How could I implement a multi-kernel approach? "
You just call Launch() from within a for loop, each time with slightly different arguments.
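A rough sketch (sections, sectionSize and offset are made-up names; you'd add an offset parameter to the kernel so each launch works on its own piece):

    int sections = 8;                              // split the 300MB buffer into 8 pieces (arbitrary)
    int sectionSize = size / sections;
    for (int s = 0; s < sections; s++)
    {
        int offset = s * sectionSize;
        // hypothetical kernel signature with an extra offset argument
        gpu.Launch(gridSize, blockSize).analyse(dev_buffer, offset, sectionSize, dev_target, lastByte, dev_lookup, dev_results);
    }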
Jun 28, 2013 at 11:47 AM
Better than a multi-kernel approach would be to modify your current kernel, adding an outer for loop so that each thread scans a subsection of your 300MB buffer.
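Roughly this loop shape (a sketch only):

    int i = thread.threadIdx.x + thread.blockIdx.x * thread.blockDim.x;
    int stride = thread.blockDim.x * thread.gridDim.x;
    for (; i < size; i += stride)
    {
        // check for a match at position i, as your kernel already does
    }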
Jun 28, 2013 at 11:51 AM
pedritolo1 wrote:
Better than a multi-kernel approach would be to modify your current kernel, adding an outer for loop so that each thread scans a subsection of your 300MB buffer.
So I would use i as a loop counter for the threads and have another variable for the index of the byte array?
Jun 28, 2013 at 3:49 PM
Actually, now that I looked again, I noticed that you were already halfway towards that goal, but didn't quite finish it, right? I mean, you do increment your i variable and all, but in the end you just exit the method instead of looping.
Jun 28, 2013 at 9:04 PM
...oh dear, that seems to be a schoolboy error. Thanks pedritolo, I'll make the change tonight and see where it leaves it.
Jun 29, 2013 at 1:44 AM
Okay, a little bit closer now - it's returning more results - but I don't believe it's scanning through the entire 300MB segment of data. Could it be because I'm using i as an index and it can't scan through the full 314,572,800-byte array?

Are there any other variables I could use for a location index in the 300MB buffer that could keep a count and do the correct "strides" to other locations?

Really grateful for all the help on this - it's driving me insane :-)

Latest Code:
[Cudafy]
        public static void analyse(GThread thread, byte[] buffer, int size, byte[] target, int lastByte, int[] lookup, uint[] results)
        {
            int i = thread.threadIdx.x + thread.blockIdx.x * thread.blockDim.x;
            int stride = thread.blockDim.x * thread.gridDim.x;

            uint[] temp = thread.AllocateShared<uint>("temp", 1024);
            temp[thread.threadIdx.x] = 0;

            while (i < size)
            {
                var checkByte = buffer[i];
                if (buffer[i] == lastByte)
                {
                    bool found = true;
                    for (int j = target.Length - 2; j >= 0; j--)
                    {
                        if (buffer[i - target.Length + j + 1] != target[j])
                        {
                            found = false;
                            break;
                        }
                    }

                    if (found)
                    {
                        //Found match
                        temp[buffer[i]] += 1;
                        i += stride;
                    }
                    else
                    {
                        i += stride;
                    }
                }
                else
                {
                    i += (lookup[checkByte] * stride);
                }
            }

            thread.SyncThreads();

            thread.atomicAdd(ref results[0], temp[thread.threadIdx.x]);
        }
Jun 29, 2013 at 11:29 AM
Hi
  • you need a SyncThreads() after temp[thread.threadIdx.x] = 0; (see the sketch after this list)
  • you seem to have a race condition here: temp[buffer[i]] += 1;
  • have you tried using the emulator on smaller memory chunks? With fewer threads and blocks? Start small - it'd be easier to debug using the CPU emulator.
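For the shared counters, something roughly like this (a sketch only; blockTotal is just an illustrative name):

    uint[] temp = thread.AllocateShared<uint>("temp", 1024);
    temp[thread.threadIdx.x] = 0;
    thread.SyncThreads();                                    // every slot is zeroed before any thread starts counting

    // ... each thread adds its hits to temp[thread.threadIdx.x], not temp[buffer[i]] ...

    thread.SyncThreads();                                    // all per-thread counts are written
    if (thread.threadIdx.x == 0)
    {
        uint blockTotal = 0;
        for (int t = 0; t < thread.blockDim.x; t++)
            blockTotal += temp[t];
        thread.atomicAdd(ref results[0], blockTotal);        // one atomic per block instead of one per thread
    }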
Jun 29, 2013 at 11:30 AM
"Could it be because I'm using i as an index and it can't scan through "
I don't think so.
Jul 1, 2013 at 1:03 PM
Hi pedritolo,

Just wanted to say thanks for all of your help regarding this issue. It is now working perfectly. The problem was in how (lookup[checkByte] * stride) was applied: because the solution is multi-threaded, it was skipping over huge chunks whenever several threads weren't hitting lastByte, so the GPU version just needed the stride handling adjusted. The kernel now produces results identical to the CPU equivalent, with an average improvement of 480%. I also corrected temp[buffer[i]] += 1; to temp[thread.threadIdx.x] += 1;, which seems to have fixed the varying results.

Thanks again!

Ethan
Jul 1, 2013 at 3:35 PM
I'm glad I could help :)
480% is nice, but I suspect it could go much faster with the proper optimizations. It's usual to expect a 10- to 100-fold improvement over a normal CPU on highly optimized algorithms.
GJ, nevertheless
cheers