时间: 2021-07-31 作者:daque
[c#]
using System;
using System.IO;
using System.Runtime.InteropServices;
using System.Threading;
// Bulk image processing benchmark using asynchronous FileStream reads.
//
// makeimagefiles() writes numimages dummy image files; processimagesinbulk()
// starts an asynchronous read for each one, and readinimagecallback()
// transforms the pixels and writes a ".done" file; cleanup() deletes
// everything again.
//
// NOTE(review): identifiers in this listing appear to have been case-folded
// by the tool that extracted it.  Framework names are written with their real
// casing here; every name declared by this class is kept exactly as found so
// that existing references to it still bind.
public class bulkimageprocasync
{
    public const string imagebasename = "tmpimage-";
    public const int numimages = 200;
    public const int numpixels = 512*512;

    // processimage has a simple o(n) loop, and you can vary the number
    // of times you repeat that loop to make the application more cpu-bound
    // or more io-bound.
    public static int processimagerepeats = 20;

    // threads must decrement numimagestofinish, and protect
    // their access to it through a mutex.
    public static int numimagestofinish = numimages;
    public static object numimagesmutex = new object[0];

    // waitobject is signalled when all image processing is done.
    public static object waitobject = new object[0];

    // Per-image state handed to the asynchronous read and back to the callback.
    public class imagestateobject
    {
        public byte[] pixels;
        public int imagenum;
        public FileStream fs;
    }

    // Creates numimages files named imagebasename+i+".tmp", each holding
    // numpixels bytes of a repeating 0..255 pattern.
    public static void makeimagefiles()
    {
        int sides = (int) Math.Sqrt(numpixels);
        Console.Write("making "+numimages+" "+sides+"x"+sides+" images...");
        byte[] pixels = new byte[numpixels];
        for (int i = 0; i < numpixels; i++)
            pixels[i] = (byte) i;
        for (int i = 0; i < numimages; i++)
        {
            // using: the handle is released even if the write fails.
            using (FileStream fs = new FileStream(imagebasename+i+".tmp", FileMode.Create,
                       FileAccess.Write, FileShare.None, 8192, false))
            {
                fs.Write(pixels, 0, pixels.Length);
                // Push the managed buffer to the OS before asking the OS to
                // push its own buffers to disk.
                fs.Flush();
                flushfilebuffers(fs.Handle);
            }
        }
        Console.WriteLine("done.");
    }

    // Completion callback for the BeginRead issued in processimagesinbulk.
    // Finishes the read, transforms the pixels, writes the ".done" file and
    // records that one more image is finished.
    //
    // Throws Exception when the read did not deliver exactly numpixels bytes.
    public static void readinimagecallback(IAsyncResult asyncresult)
    {
        imagestateobject state = (imagestateobject) asyncresult.AsyncState;
        //Console.WriteLine("image "+state.imagenum+" was read "+
        //    (asyncresult.CompletedSynchronously ? "synchronously" : "asynchronously"));
        try
        {
            Stream stream = state.fs;//(stream) asyncresult.asyncobject;
            int bytesread;
            try
            {
                bytesread = stream.EndRead(asyncresult);
            }
            finally
            {
                // The input is no longer needed once the read has completed
                // (or failed); close it on every path.
                stream.Close();
            }
            if (bytesread != numpixels)
                throw new Exception("in readinimagecallback, got wrong number of bytes from the image!got: "+bytesread);

            processimage(state.pixels, state.imagenum);

            // now write out the image.
            // using asynchronous io here appears not to be best practice.it ends up
            // swamping the threadpool, since the threadpool threads are blocked
            // on io requests that we've just queued to the threadpool.
            using (FileStream fs = new FileStream(imagebasename+state.imagenum+".done",
                       FileMode.Create, FileAccess.Write, FileShare.None, 4096, false))
            {
                fs.Write(state.pixels, 0, numpixels);
            }
        }
        finally
        {
            // this application model uses too much memory.
            // releasing memory as soon as possible is a good idea, especially global
            // state.
            state.pixels = null;
            // record that an image is done now -- also on failure, otherwise
            // the thread waiting in processimagesinbulk would never wake up.
            markimagedone();
        }
    }

    // Decrements numimagestofinish and wakes the waiter when it reaches zero.
    // The pulse is issued after the counter lock has been released, so the
    // two locks are never held together by this thread.
    private static void markimagedone()
    {
        bool alldone;
        lock (numimagesmutex)
        {
            numimagestofinish--;
            alldone = (numimagestofinish == 0);
        }
        if (alldone)
        {
            lock (waitobject)
            {
                Monitor.PulseAll(waitobject);
            }
        }
    }

    // Adds 1 to each of the first numpixels bytes of pixels,
    // processimagerepeats times over.  imagenum is only used for the
    // progress messages.
    public static void processimage(byte[] pixels, int imagenum)
    {
        Console.WriteLine("processimage "+imagenum);
        // do some cpu-intensive operation on the image.
        for (int i = 0; i < processimagerepeats; i++)
            for (int j = 0; j < numpixels; j++)
                pixels[j] += 1;
        Console.WriteLine("processimage "+imagenum+" done.");
    }

    // Starts one asynchronous read per image, then blocks until every
    // callback has reported completion, and prints the elapsed time.
    public static void processimagesinbulk()
    {
        Console.WriteLine("processing images...");
        long t0 = Environment.TickCount;
        lock (numimagesmutex)
        {
            numimagestofinish = numimages;
        }
        AsyncCallback readimagecallback = new AsyncCallback(readinimagecallback);
        for (int i = 0; i < numimages; i++)
        {
            imagestateobject state = new imagestateobject();
            state.pixels = new byte[numpixels];
            state.imagenum = i;
            // very large items are read only once, so you can make the
            // buffer on the file stream very small to save memory.
            FileStream fs = new FileStream(imagebasename+i+".tmp",
                FileMode.Open, FileAccess.Read, FileShare.Read, 1, true);
            state.fs = fs;
            fs.BeginRead(state.pixels, 0, numpixels, readimagecallback, state);
        }

        // determine whether all images are done being processed.
        // if not, block until all are finished.
        //
        // The counter is re-checked while waitobject is held.  The last
        // callback needs waitobject to pulse, so it cannot signal between our
        // check and our Wait; a pulse sent before we got here is harmless
        // because the counter already reads zero.
        lock (waitobject)
        {
            bool announced = false;
            while (true)
            {
                int left;
                lock (numimagesmutex)
                {
                    left = numimagestofinish;
                }
                if (left <= 0)
                    break;
                if (!announced)
                {
                    Console.WriteLine("all worker threads are queued... blocking until they complete.numleft: "+left);
                    announced = true;
                }
                Monitor.Wait(waitobject);
            }
        }

        long t1 = Environment.TickCount;
        Console.WriteLine("total time processing images: {0} ms", (t1-t0));
    }

    // Deletes the ".tmp" and ".done" file of every image.
    public static void cleanup()
    {
        for (int i = 0; i < numimages; i++)
        {
            File.Delete(imagebasename+i+".tmp");
            File.Delete(imagebasename+i+".done");
        }
    }

    public static void trytocleardiskcache()
    {
        // try to force all pending writes to disk, and clear the
        // disk cache of any data.
        byte[] bytes = new byte[100*(1<<20)];
        for (int i = 0; i < bytes.Length; i++)
            bytes[i] = 0;
        bytes = null;
        GC.Collect();
        Thread.Sleep(2000);
    }

    // Driver: optional first argument overrides processimagerepeats, then the
    // files are created, processed and removed.
    public static void main(string[] args)
    {
        Console.WriteLine("bulk image processing sample application, using async io");
        Console.WriteLine("simulates applying a simple transformation to "+numimages+" \"images\"");
        Console.WriteLine("(ie, async filestream & threadpool benchmark)");
        Console.WriteLine("warning - this test requires "+(numpixels * numimages * 2)+" bytes of tmp space");
        if (args.Length == 1)
        {
            processimagerepeats = Int32.Parse(args[0]);
            Console.WriteLine("processimage inner loop - "+processimagerepeats);
        }
        makeimagefiles();
        trytocleardiskcache();
        processimagesinbulk();
        cleanup();
    }

    // The export is looked up by exact name, so the real spelling is given
    // explicitly while the managed name stays as declared in this file.
    [DllImport("kernel32", SetLastError=true, EntryPoint="FlushFileBuffers")]
    private static extern void flushfilebuffers(IntPtr handle);
}
以下是同一假设的同步示例。[C#]
using system;
using system.io;
using system.threading;
using system.runtime.interopservices;
// Bulk image processing benchmark using synchronous FileStream I/O.
//
// makeimagefiles() writes numimages dummy image files; processimagesinbulk()
// reads, transforms and rewrites them one after another on the calling
// thread; cleanup() deletes everything again.
//
// NOTE(review): identifiers in this listing appear to have been case-folded
// by the tool that extracted it.  Framework names are written with their real
// casing here; every name declared by this class is kept exactly as found so
// that existing references to it still bind.
public class bulkimageprocsync
{
    public const string imagebasename = "tmpimage-";
    public const int numimages = 200;
    public const int numpixels = 512*512;

    // processimage has a simple o(n) loop, and you can vary the number
    // of times you repeat that loop to make the application more cpu-bound or
    // more io-bound.
    public static int processimagerepeats = 20;

    // Creates numimages files named imagebasename+i+".tmp", each holding
    // numpixels bytes of a repeating 0..255 pattern.
    public static void makeimagefiles()
    {
        int sides = (int) Math.Sqrt(numpixels);
        Console.Write("making "+numimages+" "+sides+"x"+sides+" images...");
        byte[] pixels = new byte[numpixels];
        for (int i = 0; i < numpixels; i++)
            pixels[i] = (byte) i;
        for (int i = 0; i < numimages; i++)
        {
            // using: the handle is released even if the write fails.
            using (FileStream fs = new FileStream(imagebasename+i+".tmp", FileMode.Create,
                       FileAccess.Write, FileShare.None, 8192, false))
            {
                fs.Write(pixels, 0, pixels.Length);
                // Push the managed buffer to the OS before asking the OS to
                // push its own buffers to disk.
                fs.Flush();
                flushfilebuffers(fs.Handle);
            }
        }
        Console.WriteLine("done.");
    }

    // Adds 1 to each of the first numpixels bytes of pixels,
    // processimagerepeats times over.  imagenum is only used for the
    // progress messages.
    public static void processimage(byte[] pixels, int imagenum)
    {
        Console.WriteLine("processimage "+imagenum);
        // do some cpu-intensive operation on the image
        for (int i = 0; i < processimagerepeats; i++)
            for (int j = 0; j < numpixels; j++)
                pixels[j] += 1;
        Console.WriteLine("processimage "+imagenum+" done.");
    }

    // Reads each ".tmp" image, transforms it and writes the matching ".done"
    // file, one image at a time, then prints the elapsed time.
    //
    // Throws Exception when an input file ends before numpixels bytes were
    // read.
    public static void processimagesinbulk()
    {
        Console.WriteLine("processing images...");
        long t0 = Environment.TickCount;
        // One buffer is reused for every image, so each read must fill it
        // completely or stale bytes from the previous image would be
        // processed.
        byte[] pixels = new byte[numpixels];
        for (int i = 0; i < numimages; i++)
        {
            // NOTE(review): 4196 looks like a typo for 4096; kept as found.
            using (FileStream input = new FileStream(imagebasename+i+".tmp", FileMode.Open,
                       FileAccess.Read, FileShare.Read, 4196, false))
            {
                // Read may deliver fewer bytes than requested; keep going
                // until the buffer is full or the file ends.
                int total = 0;
                while (total < numpixels)
                {
                    int got = input.Read(pixels, total, numpixels - total);
                    if (got == 0)
                        throw new Exception("in processimagesinbulk, got wrong number of bytes from the image!got: "+total);
                    total += got;
                }
            }

            processimage(pixels, i);

            using (FileStream output = new FileStream(imagebasename+i+".done",
                       FileMode.Create, FileAccess.Write, FileShare.None, 4196, false))
            {
                output.Write(pixels, 0, numpixels);
            }
        }
        long t1 = Environment.TickCount;
        Console.WriteLine("total time processing images: {0} ms", (t1-t0));
    }

    // Deletes the ".tmp" and ".done" file of every image.
    public static void cleanup()
    {
        for (int i = 0; i < numimages; i++)
        {
            File.Delete(imagebasename+i+".tmp");
            File.Delete(imagebasename+i+".done");
        }
    }

    // Allocates and touches 100 MB, collects it and sleeps two seconds.
    // NOTE(review): presumably meant to push the freshly written files out of
    // the disk cache, as the async variant's comment says -- confirm.
    public static void trytocleardiskcache()
    {
        byte[] bytes = new byte[100*(1<<20)];
        for (int i = 0; i < bytes.Length; i++)
            bytes[i] = 0;
        bytes = null;
        GC.Collect();
        Thread.Sleep(2000);
    }

    // Driver: optional first argument overrides processimagerepeats, then the
    // files are created, processed and removed.
    public static void main(string[] args)
    {
        Console.WriteLine("bulk image processing sample application, using synchronous io");
        Console.WriteLine("simulates applying a simple transformation to "+numimages+" \"images\"");
        Console.WriteLine("(ie, sync filestream benchmark)");
        Console.WriteLine("warning - this test requires "+(numpixels * numimages * 2)+" bytes of tmp space");
        if (args.Length == 1)
        {
            processimagerepeats = Int32.Parse(args[0]);
            Console.WriteLine("processimage inner loop - "+processimagerepeats);
        }
        makeimagefiles();
        trytocleardiskcache();
        processimagesinbulk();
        cleanup();
    }

    // The export is looked up by exact name, so the real spelling is given
    // explicitly while the managed name stays as declared in this file.
    [DllImport("kernel32", SetLastError=true, EntryPoint="FlushFileBuffers")]
    private static extern void flushfilebuffers(IntPtr handle);
}