[Box Backup-dev] COMMIT r475 - box/chris/general/lib/win32

boxbackup-dev at fluffy.co.uk boxbackup-dev at fluffy.co.uk
Sun Feb 19 23:59:57 GMT 2006


Author: chris
Date: 2006-02-19 23:59:51 +0000 (Sun, 19 Feb 2006)
New Revision: 475

Modified:
   box/chris/general/lib/win32/WinNamedPipeStream.cpp
   box/chris/general/lib/win32/WinNamedPipeStream.h
Log:
* WinNamedPipeStream.cpp, WinNamedPipeStream.h
- Used overlapped I/O to avoid blocking on ReadFile


Modified: box/chris/general/lib/win32/WinNamedPipeStream.cpp
===================================================================
--- box/chris/general/lib/win32/WinNamedPipeStream.cpp	2006-02-18 11:45:01 UTC (rev 474)
+++ box/chris/general/lib/win32/WinNamedPipeStream.cpp	2006-02-19 23:59:51 UTC (rev 475)
@@ -36,6 +36,8 @@
 // --------------------------------------------------------------------------
 WinNamedPipeStream::WinNamedPipeStream()
 	: mSocketHandle(NULL),
+	  mReadableEvent(INVALID_HANDLE_VALUE),
+	  mBytesInBuffer(0),
 	  mReadClosed(false),
 	  mWriteClosed(false),
 	  mIsServer(false),
@@ -77,7 +79,8 @@
 
 	mSocketHandle = CreateNamedPipeW( 
 		pName,                     // pipe name 
-		PIPE_ACCESS_DUPLEX,        // read/write access 
+		PIPE_ACCESS_DUPLEX |       // read/write access 
+		FILE_FLAG_OVERLAPPED,      // enabled overlapped I/O
 		PIPE_TYPE_MESSAGE |        // message type pipe 
 		PIPE_READMODE_MESSAGE |    // message-read mode 
 		PIPE_WAIT,                 // blocking mode 
@@ -108,6 +111,37 @@
 	mWriteClosed = false;
 	mIsServer    = true; // must flush and disconnect before closing
 	mIsConnected = true;
+
+	// create the Readable event
+	mReadableEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
+
+	if (mReadableEvent == INVALID_HANDLE_VALUE)
+	{
+		::syslog(LOG_ERR, "Failed to create the Readable event: "
+			"error %d", GetLastError());
+		Close();
+		THROW_EXCEPTION(CommonException, Internal)
+	}
+
+	// initialise the OVERLAPPED structure
+	memset(&mReadOverlap, 0, sizeof(mReadOverlap));
+	mReadOverlap.hEvent = mReadableEvent;
+
+	// start the first overlapped read
+	if (!ReadFile(mSocketHandle, mReadBuffer, sizeof(mReadBuffer),
+		NULL, &mReadOverlap))
+	{
+		DWORD err = GetLastError();
+
+		if (err != ERROR_IO_PENDING)
+		{
+			::syslog(LOG_ERR, "Failed to start overlapped read: "
+				"error %d", err);
+			Close();
+			THROW_EXCEPTION(ConnectionException, 
+				Conn_SocketReadError)
+		}
+	}
 }
 
 // --------------------------------------------------------------------------
@@ -159,7 +193,10 @@
 int WinNamedPipeStream::Read(void *pBuffer, int NBytes, int Timeout)
 {
 	// TODO no support for timeouts yet
-	ASSERT(Timeout == IOStream::TimeOutInfinite)
+	if (Timeout != IOStream::TimeOutInfinite)
+	{
+		THROW_EXCEPTION(CommonException, AssertFailed)
+	}
 	
 	if (mSocketHandle == NULL || !mIsConnected) 
 	{
@@ -167,25 +204,95 @@
 	}
 
 	DWORD NumBytesRead;
-	
-	bool Success = ReadFile( 
-		mSocketHandle, // pipe handle 
-		pBuffer,       // buffer to receive reply 
-		NBytes,        // size of buffer 
-		&NumBytesRead, // number of bytes read 
-		NULL);         // not overlapped 
-	
-	if (!Success)
+
+	if (mIsServer)
 	{
-		THROW_EXCEPTION(ConnectionException, Conn_SocketReadError)
+		// overlapped I/O completed successfully? (wait if needed)
+
+		if (!GetOverlappedResult(mSocketHandle,
+			&mReadOverlap, &NumBytesRead, TRUE))
+		{
+			DWORD err = GetLastError();
+
+			if (err == ERROR_HANDLE_EOF)
+			{
+				mReadClosed = true;
+			}
+			else
+			{
+				::syslog(LOG_ERR, "Failed to wait for "
+					"ReadFile to complete: error %d", err);
+				Close();
+				THROW_EXCEPTION(ConnectionException, 
+					Conn_SocketReadError)
+			}
+		}
+
+		size_t BytesToCopy = NumBytesRead + mBytesInBuffer;
+		size_t BytesRemaining = 0;
+
+		if (BytesToCopy > NBytes)
+		{
+			BytesRemaining = BytesToCopy - NBytes;
+			BytesToCopy = NBytes;
+		}
+
+		memcpy(pBuffer, mReadBuffer, BytesToCopy);
+		memcpy(mReadBuffer, mReadBuffer + BytesToCopy, BytesRemaining);
+
+		mBytesInBuffer = BytesRemaining;
+		NumBytesRead = BytesToCopy;
+
+		// start the next overlapped read
+		if (!ReadFile(mSocketHandle, 
+			mReadBuffer + mBytesInBuffer, 
+			sizeof(mReadBuffer) - mBytesInBuffer,
+			NULL, &mReadOverlap))
+		{
+			DWORD err = GetLastError();
+			if (err == ERROR_IO_PENDING)
+			{
+				ResetEvent(mReadableEvent);
+			}
+			else if (err == ERROR_HANDLE_EOF)
+			{
+				mReadClosed = true;
+			}
+			else
+			{
+				::syslog(LOG_ERR, "Failed to start "
+					"overlapped read: error %d", err);
+				Close();
+				THROW_EXCEPTION(ConnectionException, 
+					Conn_SocketReadError)
+			}
+		}
+
+		// If the read succeeded immediately, leave the event 
+		// signaled, so that we will be called again to process 
+		// the newly read data and start another overlapped read.
 	}
-	
-	// Closed for reading at EOF?
-	if (NumBytesRead == 0)
+	else
 	{
-		mReadClosed = true;
+		if (!ReadFile( 
+			mSocketHandle, // pipe handle 
+			pBuffer,       // buffer to receive reply 
+			NBytes,        // size of buffer 
+			&NumBytesRead, // number of bytes read 
+			NULL))         // not overlapped 
+		{
+			Close();
+			THROW_EXCEPTION(ConnectionException, 
+				Conn_SocketReadError)
+		}
+		
+		// Closed for reading at EOF?
+		if (NumBytesRead == 0)
+		{
+			mReadClosed = true;
+		}
 	}
-	
+		
 	return NumBytesRead;
 }
 
@@ -223,7 +330,7 @@
 
 		if (!Success)
 		{
-			mWriteClosed = true;	// assume can't write again
+			Close();
 			THROW_EXCEPTION(ConnectionException, 
 				Conn_SocketWriteError)
 		}
@@ -255,7 +362,26 @@
 	}
 
 	if (mIsServer)
-	{	
+	{
+		if (!CancelIo(mSocketHandle))
+		{
+			::syslog(LOG_ERR, "Failed to cancel outstanding "
+				"I/O: error %d", GetLastError());
+		}
+
+		if (mReadableEvent == INVALID_HANDLE_VALUE)
+		{
+			::syslog(LOG_ERR, "Failed to destroy Readable "
+				"event: invalid handle");
+		}
+		else if (!CloseHandle(mReadableEvent))
+		{
+			::syslog(LOG_ERR, "Failed to destroy Readable "
+				"event: error %d", GetLastError());
+		}
+
+		mReadableEvent = INVALID_HANDLE_VALUE;
+
 		if (!FlushFileBuffers(mSocketHandle))
 		{
 			::syslog(LOG_INFO, "FlushFileBuffers failed: %d", 
@@ -275,6 +401,8 @@
 
 	mSocketHandle = NULL;
 	mIsConnected = false;
+	mReadClosed  = true;
+	mWriteClosed = true;
 
 	if (!result) 
 	{
@@ -309,4 +437,27 @@
 	return mWriteClosed;
 }
 
+// --------------------------------------------------------------------------
+//
+// Function
+//		Name:    WinNamedPipeStream::WriteAllBuffered()
+//		Purpose: Ensures that any data which has been buffered is written to the stream
+//		Created: 2003/08/26
+//
+// --------------------------------------------------------------------------
+void WinNamedPipeStream::WriteAllBuffered()
+{
+	if (mSocketHandle == NULL || !mIsConnected) 
+	{
+		THROW_EXCEPTION(ServerException, BadSocketHandle)
+	}
+	
+	if (!FlushFileBuffers(mSocketHandle))
+	{
+		::syslog(LOG_WARNING, "FlushFileBuffers failed: %d", 
+			GetLastError());
+	}
+}
+
+
 #endif // WIN32

Modified: box/chris/general/lib/win32/WinNamedPipeStream.h
===================================================================
--- box/chris/general/lib/win32/WinNamedPipeStream.h	2006-02-18 11:45:01 UTC (rev 474)
+++ box/chris/general/lib/win32/WinNamedPipeStream.h	2006-02-19 23:59:51 UTC (rev 475)
@@ -36,13 +36,15 @@
 	virtual int Read(void *pBuffer, int NBytes, 
 		int Timeout = IOStream::TimeOutInfinite);
 	virtual void Write(const void *pBuffer, int NBytes);
+	virtual void WriteAllBuffered();
 	virtual void Close();
 	virtual bool StreamDataLeft();
 	virtual bool StreamClosed();
 	bool IsConnected() { return mIsConnected; }
+	HANDLE GetSocketHandle() { return mSocketHandle; }
+	HANDLE GetReadableEvent() { return mReadableEvent; }
 
 protected:
-	HANDLE GetSocketHandle();
 	void MarkAsReadClosed()  {mReadClosed  = true;}
 	void MarkAsWriteClosed() {mWriteClosed = true;}
 
@@ -51,6 +53,10 @@
 		{ /* do not call */ }
 
 	HANDLE mSocketHandle;
+	HANDLE mReadableEvent;
+	OVERLAPPED mReadOverlap;
+	uint8_t mReadBuffer[4096];
+	size_t  mBytesInBuffer;
 	bool mReadClosed;
 	bool mWriteClosed;
 	bool mIsServer;




More information about the Boxbackup-dev mailing list