複数のプロセスから同じファイルにログを出力する

本来なら、AdoNetAppenderとかEventLogAppenderでDBやイベントログに書くのが筋なんだろうけど、
どうしてもファイル出力がしたい。でもSyslogサーバとかUDP受信プロセスとかを立てたくない場合。


まず、FileAppenderのLockingModelにMinimalLockを指定すれば、書き込み時にのみロックを取得するので競合の可能性は低くなる


とはいえ、競合の可能性が0ではないので、MinimalLockを拡張して、
ログ書き込み時にはキューにためるだけにして、別スレッドでファイルに書き込むようにしてみた。
そして、書き込み時にロックが取得できなかったときには、リトライするようにしているので、
同時に書き込まれても問題ないと思われる。


ただし、複数プロセスから書き込まれたときに、プロセス内での出力順序は保たれるが、プロセス間ではリトライ等で時間順に出力されない場合があるので注意が必要。


























using System;
using System.Collections;
using System.IO;
using System.Text;
using System.Threading;
using log4net.Appender;

namespace ConcurrentLog
{
/// <summary>
/// Locking model derived from <see cref="FileAppender.MinimalLock"/> that never opens the
/// log file itself. <see cref="AcquireLock"/> hands out a <see cref="ConcurrentStream"/>,
/// so the appender's writes are queued instead of being written to the file directly.
/// </summary>
public class ConcurrentMinimalLock : FileAppender.MinimalLock
{
    private string m_filename;
    private bool m_append;
    private Stream m_stream = null;
    private ConcurrentStream c_stream = null;

    /// <summary>
    /// Prepares to open the file when the first message is logged.
    /// </summary>
    /// <param name="filename">The filename to use</param>
    /// <param name="append">Whether to append to the file, or overwrite</param>
    /// <param name="encoding">The encoding to use (not used by this implementation)</param>
    /// <remarks>
    /// Only remembers <paramref name="filename"/> and <paramref name="append"/>; nothing is
    /// opened here. The stream is obtained later by <see cref="AcquireLock"/>.
    /// </remarks>
    public override void OpenFile(string filename, bool append, Encoding encoding)
    {
        m_filename = filename;
        m_append = append;
    }

    /// <summary>
    /// Close the file
    /// </summary>
    /// <remarks>
    /// Intentionally does nothing: this class holds no file handle of its own.
    /// </remarks>
    public override void CloseFile()
    {
        // NOP
    }

    /// <summary>
    /// Acquire the lock on the file
    /// </summary>
    /// <returns>
    /// A stream that is ready to be written to, or <c>null</c> when the stream could not
    /// be set up (the failure is reported through the appender's error handler).
    /// </returns>
    /// <remarks>
    /// Ensures the target directory exists, then returns the <see cref="ConcurrentStream"/>
    /// obtained from <see cref="ConcurrentStream.GetInstance"/>.
    /// <see cref="ReleaseLock"/> must be called afterwards.
    /// </remarks>
    public override Stream AcquireLock()
    {
        if (m_stream == null)
        {
            try
            {
                using (CurrentAppender.SecurityContext.Impersonate(this))
                {
                    // Ensure that the directory structure exists
                    string directoryFullName = Path.GetDirectoryName(m_filename);

                    // GetDirectoryName yields null or "" for a root path or a bare file name;
                    // there is no directory to create in that case, and CreateDirectory
                    // would throw on such an argument.
                    bool hasDirectoryPart = directoryFullName != null && directoryFullName.Length > 0;

                    // Only create the directory if it does not exist
                    // doing this check here resolves some permissions failures
                    if (hasDirectoryPart && !Directory.Exists(directoryFullName))
                    {
                        Directory.CreateDirectory(directoryFullName);
                    }

                    if (c_stream == null)
                    {
                        c_stream = ConcurrentStream.GetInstance(m_filename, m_append, FileAccess.Write, FileShare.Read);
                    }
                    m_stream = c_stream;
                    m_append = true;
                }
            }
            catch (Exception e1)
            {
                CurrentAppender.ErrorHandler.Error("Unable to acquire lock on file " + m_filename + ". " + e1.Message);
            }
        }
        return m_stream;
    }

    /// <summary>
    /// Release the lock on the file
    /// </summary>
    /// <remarks>
    /// Closes the stream handed out by <see cref="AcquireLock"/> and forgets it, so the next
    /// <see cref="AcquireLock"/> call hands it out again. Does nothing when no stream is
    /// currently held.
    /// </remarks>
    public override void ReleaseLock()
    {
        // AcquireLock leaves m_stream null when it failed; there is nothing to release then.
        if (m_stream == null)
        {
            return;
        }

        using (CurrentAppender.SecurityContext.Impersonate(this))
        {
            m_stream.Close();
            m_stream = null;
        }
    }
}

/// <summary>
/// Write-only <see cref="Stream"/> that performs no file I/O itself: every
/// <see cref="Write"/> call is wrapped in a <see cref="CachedEntry"/> and handed to the
/// <see cref="QueueManager"/>.
/// </summary>
/// <remarks>
/// Only one instance exists per process. NOTE(review): the arguments of the first
/// <see cref="GetInstance"/> call win; later calls with a different path still receive
/// that first instance - confirm this is acceptable when several appenders are configured.
/// </remarks>
public class ConcurrentStream : Stream
{
    private string path;
    private bool append;
    private FileAccess access;
    private FileShare share;

    private QueueManager queueManager;
    private static ConcurrentStream instance;

    // Guards the lazy creation of 'instance'.
    private static readonly object instanceLock = new object();


    /// <summary>
    /// Returns the shared stream, creating it on the first call.
    /// </summary>
    /// <param name="path">Path of the log file (used only when the instance is created)</param>
    /// <param name="append">Append flag (used only when the instance is created)</param>
    /// <param name="access">File access mode (used only when the instance is created)</param>
    /// <param name="share">File share mode (used only when the instance is created)</param>
    /// <returns>The single shared <see cref="ConcurrentStream"/>.</returns>
    public static ConcurrentStream GetInstance(string path,
        bool append,
        FileAccess access,
        FileShare share)
    {
        // Check-and-create must be atomic, otherwise two threads logging for the
        // first time at the same moment could each construct an instance.
        lock (instanceLock)
        {
            if (instance == null)
            {
                instance = new ConcurrentStream(path, append, access, share);
            }
            return instance;
        }
    }

    private ConcurrentStream(
        string path,
        bool append,
        FileAccess access,
        FileShare share
        )
    {
        this.path = path;
        this.append = append;
        this.access = access;
        this.share = share;
        this.queueManager = QueueManager.GetInstance(path, append, access, share);
    }

    /// <summary>Always <c>false</c>: reading is not supported.</summary>
    public override bool CanRead
    {
        get { return false; }
    }

    /// <summary>Always <c>false</c>: seeking is not supported.</summary>
    public override bool CanSeek
    {
        get { return false; }
    }

    /// <summary>Always <c>true</c>.</summary>
    public override bool CanWrite
    {
        get { return true; }
    }

    /// <summary>Always reports 0; the stream does not track the file length.</summary>
    public override long Length
    {
        get { return 0; }
    }

    /// <summary>Always reports 0; assigning a value is silently ignored.</summary>
    public override long Position
    {
        get { return 0; }
        set { }
    }

    /// <summary>Reads nothing and returns 0.</summary>
    public override int Read(byte[] buffer, int offset, int count)
    {
        return 0;
    }

    /// <summary>Does nothing and returns 0.</summary>
    public override long Seek(long offset, SeekOrigin origin)
    {
        return 0;
    }

    /// <summary>Does nothing.</summary>
    public override void SetLength(long value)
    {
    }

    /// <summary>Does nothing; the queue worker flushes the file after each entry it writes.</summary>
    public override void Flush()
    {
    }

    /// <summary>
    /// Queues the given bytes for writing by the <see cref="QueueManager"/>.
    /// </summary>
    /// <param name="buffer">Source bytes</param>
    /// <param name="offset">Index in <paramref name="buffer"/> of the first byte to write</param>
    /// <param name="count">Number of bytes to write</param>
    public override void Write(byte[] buffer, int offset, int count)
    {
        CachedEntry entry = new CachedEntry(buffer, offset, count);
        queueManager.Enqueue(entry);
    }
}


/// <summary>
/// Holds the queue of pending log entries and writes them to the file on a worker thread.
/// The worker is started by <see cref="Enqueue"/> when none is running and exits once
/// the queue is empty.
/// </summary>
/// <remarks>
/// Only one instance exists per process; the arguments of the first
/// <see cref="GetInstance"/> call are the ones used. The <c>append</c> flag is stored
/// but the file is always opened with <see cref="FileMode.Append"/>.
/// </remarks>
internal class QueueManager
{

    private string path;
    private bool append;
    private FileAccess access;
    private FileShare share;

    private Queue syncQueue = Queue.Synchronized(new Queue());

    // Guards 'running' together with the decision to start or stop the worker.
    private readonly object syncRoot = new object();

    // True while a worker thread exists; read and written only under syncRoot.
    private bool running = false;

    // Used only by the worker thread, to pick the retry delay.
    private Random rnd = new Random();

    // Longest period of uninterrupted IOExceptions the worker tolerates before giving up.
    private static TimeSpan RETRY_MAX_SPAN = TimeSpan.FromMinutes(1);
    private static QueueManager instance;

    // Guards the lazy creation of 'instance'.
    private static readonly object instanceLock = new object();

    // Not referenced anywhere in this class.
    private const int MAX_BATCH_SIZE = 100;


    /// <summary>
    /// Returns the shared manager, creating it on the first call. Arguments of later
    /// calls are ignored.
    /// </summary>
    public static QueueManager GetInstance(string path,
        bool append,
        FileAccess access,
        FileShare share)
    {
        // Check-and-create must be atomic so that only one queue/worker pair ever exists.
        lock (instanceLock)
        {
            if (instance == null)
            {
                instance = new QueueManager(path, append, access, share);
            }
            return instance;
        }
    }

    private QueueManager(
        string path,
        bool append,
        FileAccess access,
        FileShare share)
    {
        this.path = path;
        this.append = append;
        this.access = access;
        this.share = share;
    }


    /// <summary>
    /// Adds an entry to the queue and starts the worker thread if none is running.
    /// </summary>
    /// <param name="entry">The entry to write</param>
    internal void Enqueue(CachedEntry entry)
    {
        syncQueue.Enqueue(entry);

        // The test of 'running' and the thread start happen under the same lock the
        // worker takes before it decides to exit. Either the worker still sees this
        // entry, or it has already cleared 'running' and a new worker is started here.
        lock (syncRoot)
        {
            if (!running)
            {
                Thread th = new Thread(new ThreadStart(this.Dequeue));
                th.Start();
                // Set only after Start succeeded, so a failed start does not block later attempts.
                running = true;
            }
        }
    }

    /// <summary>
    /// Worker thread body: opens the file and writes queued entries until the queue is empty.
    /// </summary>
    /// <remarks>
    /// An <see cref="IOException"/> (for example the file being held by another process)
    /// triggers a retry after a random delay of up to one second. When failures continue
    /// without a single successful write for longer than <c>RETRY_MAX_SPAN</c>, the worker
    /// clears <c>running</c> and rethrows. NOTE(review): that exception leaves the thread
    /// entry point unhandled - confirm this is the desired give-up behaviour.
    /// </remarks>
    private void Dequeue()
    {
        // Entry taken from the queue but not yet written; kept across retries so it is not lost.
        CachedEntry pending = null;

        // Start of the current streak of failures; meaningful only while 'failing' is true.
        bool failing = false;
        DateTime firstFailure = DateTime.MinValue;

        while (true)
        {
            try
            {
                using (FileStream fs = new FileStream(path, FileMode.Append, access, share))
                {
                    while (true)
                    {
                        if (pending == null)
                        {
                            lock (syncRoot)
                            {
                                if (syncQueue.Count == 0)
                                {
                                    // Nothing left in the queue: stop the worker.
                                    running = false;
                                    return;
                                }
                                pending = (CachedEntry)syncQueue.Dequeue();
                            }
                        }

                        Write(pending, fs);
                        pending = null;
                        failing = false;    // a successful write ends the failure streak
                    }
                }
            }
            catch (IOException)
            {
                DateTime now = DateTime.UtcNow;
                if (!failing)
                {
                    failing = true;
                    firstFailure = now;
                }
                else if (now - firstFailure > RETRY_MAX_SPAN)
                {
                    lock (syncRoot)
                    {
                        running = false;
                    }
                    throw;
                }

                // Wait a random time, then retry.
                Thread.Sleep(rnd.Next(1000));
                Console.WriteLine("Retry:" + DateTime.Now);
            }
        }
    }

    /// <summary>
    /// Writes one entry to the file and flushes it immediately.
    /// </summary>
    private void Write(CachedEntry entry, FileStream fs)
    {
        fs.Write(entry.Buffer, entry.Offset, entry.Count);
        fs.Flush();
    }
}


/// <summary>
/// Private snapshot of the bytes passed to one write call, so they can be written later
/// even if the caller reuses its buffer in the meantime.
/// </summary>
internal class CachedEntry
{
    private byte[] buffer;
    private int offset;
    private int count;

    /// <summary>The copied bytes.</summary>
    internal byte[] Buffer
    {
        get { return buffer; }
    }

    /// <summary>Index in <see cref="Buffer"/> of the first byte to write.</summary>
    internal int Offset
    {
        get { return offset; }
    }

    /// <summary>Number of bytes to write, starting at <see cref="Offset"/>.</summary>
    internal int Count
    {
        get { return count; }
    }

    /// <summary>
    /// Copies <paramref name="count"/> bytes of <paramref name="buffer"/>, starting at
    /// <paramref name="offset"/>, into a new array owned by this entry.
    /// </summary>
    /// <param name="buffer">Source bytes</param>
    /// <param name="offset">Index in <paramref name="buffer"/> of the first byte to keep</param>
    /// <param name="count">Number of bytes to keep</param>
    /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="offset"/> or <paramref name="count"/> is negative.
    /// </exception>
    /// <exception cref="ArgumentException">
    /// The range described by <paramref name="offset"/> and <paramref name="count"/>
    /// does not fit inside <paramref name="buffer"/>.
    /// </exception>
    internal CachedEntry(byte[] buffer, int offset, int count)
    {
        // Reject bad ranges here, on the caller's thread, rather than when the entry
        // is eventually written.
        if (buffer == null)
        {
            throw new ArgumentNullException("buffer");
        }
        if (offset < 0)
        {
            throw new ArgumentOutOfRangeException("offset");
        }
        if (count < 0)
        {
            throw new ArgumentOutOfRangeException("count");
        }
        if (buffer.Length - offset < count)
        {
            throw new ArgumentException("offset and count do not describe a valid range in buffer.");
        }

        // Keep only the bytes that will be written; the copy therefore starts at index 0.
        this.buffer = new byte[count];
        Array.Copy(buffer, offset, this.buffer, 0, count);
        this.offset = 0;
        this.count = count;
    }
}
}