[Box Backup-commit] COMMIT r3488 - box/trunk/lib/server

subversion at boxbackup.org subversion at boxbackup.org
Fri Dec 26 23:16:24 GMT 2014


Author: chris
Date: 2014-12-26 23:16:24 +0000 (Fri, 26 Dec 2014)
New Revision: 3488

Modified:
   box/trunk/lib/server/WinNamedPipeStream.cpp
   box/trunk/lib/server/WinNamedPipeStream.h
Log:
Add support for timeouts on named pipe writes, using overlapped I/O.

Modified: box/trunk/lib/server/WinNamedPipeStream.cpp
===================================================================
--- box/trunk/lib/server/WinNamedPipeStream.cpp	2014-12-26 23:16:20 UTC (rev 3487)
+++ box/trunk/lib/server/WinNamedPipeStream.cpp	2014-12-26 23:16:24 UTC (rev 3488)
@@ -21,6 +21,7 @@
 
 #include "autogen_ConnectionException.h"
 #include "autogen_ServerException.h"
+#include "BoxTime.h"
 #include "CommonException.h"
 #include "Socket.h"
 #include "WinNamedPipeStream.h"
@@ -106,6 +107,12 @@
 // --------------------------------------------------------------------------
 WinNamedPipeStream::~WinNamedPipeStream()
 {
+	for(std::list<WriteInProgress*>::iterator i = mWritesInProgress.begin();
+		i != mWritesInProgress.end(); i++)
+	{
+		delete *i;
+	}
+
 	if (mSocketHandle != INVALID_HANDLE_VALUE)
 	{
 		try
@@ -240,6 +247,74 @@
 	mIsConnected = true;
 }
 
+// Returns true if the operation is complete (and you will need to start
+// another one), or false otherwise (you can wait again).
+bool WinNamedPipeStream::WaitForOverlappedOperation(OVERLAPPED& Overlapped,
+	int Timeout, int64_t* pBytesTransferred)
+{
+	if (Timeout == IOStream::TimeOutInfinite)
+	{
+		Timeout = INFINITE;
+	}
+	
+	// overlapped I/O completed successfully? (wait if needed)
+	DWORD waitResult = WaitForSingleObject(Overlapped.hEvent, Timeout);
+	DWORD NumBytesTransferred = -1;
+
+	if (waitResult == WAIT_ABANDONED)
+	{
+		THROW_EXCEPTION_MESSAGE(ServerException, BadSocketHandle,
+			"Wait for command socket read abandoned by system");
+	}
+
+	if (waitResult == WAIT_TIMEOUT)
+	{
+		// wait timed out, nothing to read
+		*pBytesTransferred = 0;
+		return false;
+	}
+
+	if (waitResult != WAIT_OBJECT_0)
+	{
+		THROW_EXCEPTION_MESSAGE(ServerException, BadSocketHandle,
+			"Failed to wait for command socket read: unknown "
+			"result code: " << waitResult);
+	}
+
+	// object is ready to read from
+	if (GetOverlappedResult(mSocketHandle, &Overlapped,
+		&NumBytesTransferred, TRUE))
+	{
+		*pBytesTransferred = NumBytesTransferred;
+		return true;
+	}
+
+	// We are here because there was an error.
+	DWORD err = GetLastError();
+
+	if (err == ERROR_HANDLE_EOF)
+	{
+		Close();
+		return true;
+	}
+
+	// ERROR_NO_DATA is a strange name for 
+	// "The pipe is being closed". No exception wanted.
+
+	if (err == ERROR_NO_DATA || 
+		err == ERROR_PIPE_NOT_CONNECTED ||
+		err == ERROR_BROKEN_PIPE)
+	{
+		BOX_INFO(BOX_WIN_ERRNO_MESSAGE(err,
+			"Control client disconnected"));
+		Close();
+		return true;
+	}
+
+	THROW_WIN_ERROR_NUMBER("Failed to wait for OVERLAPPED operation "
+		"to complete", err, ConnectionException, SocketReadError);
+}
+
 // --------------------------------------------------------------------------
 //
 // Function
@@ -272,169 +347,79 @@
 		THROW_EXCEPTION(CommonException, AssertFailed)
 	}
 
-	DWORD NumBytesRead;
+	int64_t NumBytesRead;
 
-	if (mIsServer)
+	// satisfy from buffer if possible, to avoid
+	// blocking on read.
+	bool needAnotherRead = false;
+	if (mBytesInBuffer == 0)
 	{
-		// satisfy from buffer if possible, to avoid
-		// blocking on read.
-		bool needAnotherRead = false;
-		if (mBytesInBuffer == 0)
-		{
-			// overlapped I/O completed successfully? 
-			// (wait if needed)
-			DWORD waitResult = WaitForSingleObject(
-				mReadOverlap.hEvent, Timeout);
+		needAnotherRead = WaitForOverlappedOperation(
+			mReadOverlap, Timeout, &NumBytesRead);
+	}
+	else
+	{
+		// Just return the existing data from the buffer
+		// this time around. The caller should call again,
+		// and then the buffer will be empty.
+		NumBytesRead = 0;
+	}
 
-			if (waitResult == WAIT_ABANDONED)
-			{
-				BOX_ERROR("Wait for command socket read "
-					"abandoned by system");
-				THROW_EXCEPTION(ServerException,
-					BadSocketHandle);
-			}
-			else if (waitResult == WAIT_TIMEOUT)
-			{
-				// wait timed out, nothing to read
-				NumBytesRead = 0;
-			}
-			else if (waitResult != WAIT_OBJECT_0)
-			{
-				BOX_ERROR("Failed to wait for command "
-					"socket read: unknown result " <<
-					waitResult);
-			}
-			// object is ready to read from
-			else if (GetOverlappedResult(mSocketHandle,
-				&mReadOverlap, &NumBytesRead, TRUE))
-			{
-				needAnotherRead = true;
-			}
-			else
-			{
-				DWORD err = GetLastError();
+	size_t BytesToCopy = NumBytesRead + mBytesInBuffer;
+	size_t BytesRemaining = 0;
 
-				if (err == ERROR_HANDLE_EOF)
-				{
-					mReadClosed = true;
-				}
-				else 
-				{
-					if (err == ERROR_BROKEN_PIPE)
-					{
-						BOX_NOTICE("Control client "
-							"disconnected");
-					}
-					else
-					{
-						BOX_ERROR("Failed to wait for "
-							"ReadFile to complete: "
-							<< GetErrorMessage(err));
-					}
+	if (BytesToCopy > (size_t)NBytes)
+	{
+		BytesRemaining = BytesToCopy - NBytes;
+		BytesToCopy = NBytes;
+	}
 
-					Close();
-					THROW_EXCEPTION(ConnectionException, 
-						SocketReadError)
-				}
-			}
-		}
-		else
-		{
-			NumBytesRead = 0;
-		}
+	memcpy(pBuffer, mReadBuffer, BytesToCopy);
+	memmove(mReadBuffer, mReadBuffer + BytesToCopy, BytesRemaining);
 
-		size_t BytesToCopy = NumBytesRead + mBytesInBuffer;
-		size_t BytesRemaining = 0;
+	mBytesInBuffer = BytesRemaining;
+	NumBytesRead = BytesToCopy;
 
-		if (BytesToCopy > (size_t)NBytes)
+	if (needAnotherRead)
+	{
+		// reinitialise the OVERLAPPED structure
+		memset(&mReadOverlap, 0, sizeof(mReadOverlap));
+		mReadOverlap.hEvent = mReadableEvent;
+	}
+
+	// start the next overlapped read
+	if (needAnotherRead && !ReadFile(mSocketHandle, 
+		mReadBuffer + mBytesInBuffer, 
+		sizeof(mReadBuffer) - mBytesInBuffer,
+		NULL, &mReadOverlap))
+	{
+		DWORD err = GetLastError();
+		if (err == ERROR_IO_PENDING)
 		{
-			BytesRemaining = BytesToCopy - NBytes;
-			BytesToCopy = NBytes;
+			// Don't reset yet, there might be data
+			// in the buffer waiting to be read, 
+			// will check below.
+			// ResetEvent(mReadableEvent);
 		}
-
-		memcpy(pBuffer, mReadBuffer, BytesToCopy);
-		memmove(mReadBuffer, mReadBuffer + BytesToCopy, BytesRemaining);
-
-		mBytesInBuffer = BytesRemaining;
-		NumBytesRead = BytesToCopy;
-
-		if (needAnotherRead)
+		else if (err == ERROR_HANDLE_EOF)
 		{
-			// reinitialise the OVERLAPPED structure
-			memset(&mReadOverlap, 0, sizeof(mReadOverlap));
-			mReadOverlap.hEvent = mReadableEvent;
+			mReadClosed = true;
 		}
-
-		// start the next overlapped read
-		if (needAnotherRead && !ReadFile(mSocketHandle, 
-			mReadBuffer + mBytesInBuffer, 
-			sizeof(mReadBuffer) - mBytesInBuffer,
-			NULL, &mReadOverlap))
+		else if (err == ERROR_BROKEN_PIPE)
 		{
-			DWORD err = GetLastError();
-			if (err == ERROR_IO_PENDING)
-			{
-				// Don't reset yet, there might be data
-				// in the buffer waiting to be read, 
-				// will check below.
-				// ResetEvent(mReadableEvent);
-			}
-			else if (err == ERROR_HANDLE_EOF)
-			{
-				mReadClosed = true;
-			}
-			else if (err == ERROR_BROKEN_PIPE)
-			{
-				BOX_ERROR("Control client disconnected");
-				mReadClosed = true;
-			}
-			else
-			{
-				BOX_ERROR("Failed to start overlapped read: "
-					<< GetErrorMessage(err));
-				Close();
-				THROW_EXCEPTION(ConnectionException, 
-					SocketReadError)
-			}
+			BOX_ERROR("Control client disconnected");
+			mReadClosed = true;
 		}
-	}
-	else
-	{
-		if (!ReadFile( 
-			mSocketHandle, // pipe handle 
-			pBuffer,       // buffer to receive reply 
-			NBytes,        // size of buffer 
-			&NumBytesRead, // number of bytes read 
-			NULL))         // not overlapped 
+		else
 		{
-			DWORD err = GetLastError();
-		
+			BOX_ERROR("Failed to start overlapped read: "
+				<< GetErrorMessage(err));
 			Close();
-
-			// ERROR_NO_DATA is a strange name for 
-			// "The pipe is being closed". No exception wanted.
-
-			if (err == ERROR_NO_DATA || 
-				err == ERROR_PIPE_NOT_CONNECTED) 
-			{
-				NumBytesRead = 0;
-			}
-			else
-			{
-				BOX_ERROR("Failed to read from control socket: "
-					<< GetErrorMessage(err));
-				THROW_EXCEPTION(ConnectionException, 
-					SocketReadError)
-			}
+			THROW_EXCEPTION(ConnectionException, 
+				SocketReadError)
 		}
-		
-		// Closed for reading at EOF?
-		if (NumBytesRead == 0)
-		{
-			mReadClosed = true;
-		}
 	}
-		
+
 	return NumBytesRead;
 }
 
@@ -446,8 +431,15 @@
 //		Created: 2003/07/31
 //
 // --------------------------------------------------------------------------
-void WinNamedPipeStream::Write(const void *pBuffer, int NBytes)
+void WinNamedPipeStream::Write(const void *pBuffer, int NBytes, int Timeout)
 {
+	// Calculate the deadline at the beginning. Not valid if Timeout is
+	// IOStream::TimeOutInfinite!
+	ASSERT(Timeout != IOStream::TimeOutInfinite);
+
+	box_time_t deadline = GetCurrentBoxTime() +
+		MilliSecondsToBoxTime(Timeout);
+
 	if (mSocketHandle == INVALID_HANDLE_VALUE || !mIsConnected) 
 	{
 		THROW_EXCEPTION(ServerException, BadSocketHandle)
@@ -455,41 +447,59 @@
 	
 	// Buffer in byte sized type.
 	ASSERT(sizeof(char) == 1);
-	const char *pByteBuffer = (char *)pBuffer;
-	
-	int NumBytesWrittenTotal = 0;
+	WriteInProgress* new_write = new WriteInProgress(
+		std::string((char *)pBuffer, NBytes));
 
-	while (NumBytesWrittenTotal < NBytes)
+	// Start the WriteFile operation, and add to queue if pending.
+	BOOL Success = WriteFile( 
+		mSocketHandle,    // pipe handle 
+		new_write->mBuffer.c_str(), // message 
+		NBytes, // message length 
+		NULL, // bytes written this time
+		&(new_write->mOverlap));
+
+	if (Success == TRUE)
 	{
-		DWORD NumBytesWrittenThisTime = 0;
+		BOX_NOTICE("Write claimed success while overlapped?");
+		mWritesInProgress.push_back(new_write);
+	}
+	else
+	{
+		DWORD err = GetLastError();
 
-		bool Success = WriteFile( 
-			mSocketHandle,    // pipe handle 
-			pByteBuffer + NumBytesWrittenTotal, // message 
-			NBytes      - NumBytesWrittenTotal, // message length 
-			&NumBytesWrittenThisTime, // bytes written this time
-			NULL);            // not overlapped 
-
-		if (!Success)
+		if (err == ERROR_IO_PENDING)
 		{
-			// ERROR_NO_DATA is a strange name for 
-			// "The pipe is being closed".
+			BOX_TRACE("WriteFile is pending, adding to queue");
+			mWritesInProgress.push_back(new_write);
+		}
+		else
+		{
+			// Not in progress any more, pop it
+			Close();
+			THROW_WIN_ERROR_NUMBER("Failed to start overlapped "
+				"write", err, ConnectionException,
+				SocketWriteError);
+		}
+	}
 
-			DWORD err = GetLastError();
+	// Wait for previous WriteFile operations to complete, one at a time,
+	// until the deadline expires.
+	for(box_time_t remaining = deadline - GetCurrentBoxTime();
+		remaining > 0 && !mWritesInProgress.empty();
+		remaining = deadline - GetCurrentBoxTime())
+	{
+		int new_timeout = BoxTimeToMilliSeconds(remaining);
+		WriteInProgress* oldest_write =
+			*(mWritesInProgress.begin());
 
-			if (err != ERROR_NO_DATA)
-			{
-				BOX_ERROR("Failed to write to control "
-					"socket: " << GetErrorMessage(err));
-			}
-
-			Close();
-
-			THROW_EXCEPTION(ConnectionException, 
-				SocketWriteError)
+		int64_t bytes_written = 0;
+		if(WaitForOverlappedOperation(oldest_write->mOverlap,
+			new_timeout, &bytes_written))
+		{
+			// This one is complete, pop it and start a new one
+			delete oldest_write;
+			mWritesInProgress.pop_front();
 		}
-
-		NumBytesWrittenTotal += NumBytesWrittenThisTime;
 	}
 }
 
@@ -514,58 +524,50 @@
 		THROW_EXCEPTION(ServerException, BadSocketHandle)
 	}
 
-	if (mIsServer)
+	if (!CancelIo(mSocketHandle))
 	{
-		if (!CancelIo(mSocketHandle))
-		{
-			BOX_ERROR("Failed to cancel outstanding I/O: " <<
-				GetErrorMessage(GetLastError()));
-		}
+		BOX_ERROR("Failed to cancel outstanding I/O: " <<
+			GetErrorMessage(GetLastError()));
+	}
 
-		if (mReadableEvent == INVALID_HANDLE_VALUE)
-		{
-			BOX_ERROR("Failed to destroy Readable event: "
-				"invalid handle");
-		}
-		else if (!CloseHandle(mReadableEvent))
-		{
-			BOX_ERROR("Failed to destroy Readable event: " <<
-				GetErrorMessage(GetLastError()));
-		}
+	if (mReadableEvent == INVALID_HANDLE_VALUE)
+	{
+		BOX_ERROR("Failed to destroy Readable event: "
+			"invalid handle");
+	}
+	else if (!CloseHandle(mReadableEvent))
+	{
+		BOX_ERROR("Failed to destroy Readable event: " <<
+			GetErrorMessage(GetLastError()));
+	}
 
-		mReadableEvent = INVALID_HANDLE_VALUE;
+	mReadableEvent = INVALID_HANDLE_VALUE;
 
-		if (!FlushFileBuffers(mSocketHandle))
+	if (!FlushFileBuffers(mSocketHandle))
+	{
+		BOX_ERROR("Failed to FlushFileBuffers: " <<
+			GetErrorMessage(GetLastError()));
+	}
+
+	if (!DisconnectNamedPipe(mSocketHandle))
+	{
+		DWORD err = GetLastError();
+		if (err != ERROR_PIPE_NOT_CONNECTED)
 		{
-			BOX_ERROR("Failed to FlushFileBuffers: " <<
-				GetErrorMessage(GetLastError()));
+			BOX_ERROR("Failed to DisconnectNamedPipe: " <<
+				GetErrorMessage(err));
 		}
-	
-		if (!DisconnectNamedPipe(mSocketHandle))
-		{
-			DWORD err = GetLastError();
-			if (err != ERROR_PIPE_NOT_CONNECTED)
-			{
-				BOX_ERROR("Failed to DisconnectNamedPipe: " <<
-					GetErrorMessage(err));
-			}
-		}
-
-		mIsServer = false;
 	}
 
-	bool result = CloseHandle(mSocketHandle);
-
 	mSocketHandle = INVALID_HANDLE_VALUE;
 	mIsConnected = false;
 	mReadClosed  = true;
 	mWriteClosed = true;
 
-	if (!result) 
+	if (!CloseHandle(mSocketHandle))
 	{
-		BOX_ERROR("Failed to CloseHandle: " <<
-			GetErrorMessage(GetLastError()));
-		THROW_EXCEPTION(ServerException, SocketCloseError)
+		THROW_WIN_ERROR_NUMBER("Failed to CloseHandle",
+			GetLastError(), ServerException, SocketCloseError);
 	}
 }
 

Modified: box/trunk/lib/server/WinNamedPipeStream.h
===================================================================
--- box/trunk/lib/server/WinNamedPipeStream.h	2014-12-26 23:16:20 UTC (rev 3487)
+++ box/trunk/lib/server/WinNamedPipeStream.h	2014-12-26 23:16:24 UTC (rev 3488)
@@ -10,6 +10,8 @@
 #if ! defined WINNAMEDPIPESTREAM__H && defined WIN32
 #define WINNAMEDPIPESTREAM__H
 
+#include <list>
+
 #include "IOStream.h"
 
 // --------------------------------------------------------------------------
@@ -46,6 +48,8 @@
 protected:
 	void MarkAsReadClosed()  {mReadClosed  = true;}
 	void MarkAsWriteClosed() {mWriteClosed = true;}
+	bool WaitForOverlappedOperation(OVERLAPPED& Overlapped,
+		int Timeout, int64_t* pBytesTransferred);
 
 private:
 	WinNamedPipeStream(const WinNamedPipeStream &rToCopy) 
@@ -61,6 +65,36 @@
 	bool mIsServer;
 	bool mIsConnected;
 
+	class WriteInProgress {
+	private:
+		friend class WinNamedPipeStream;
+		std::string mBuffer;
+		OVERLAPPED mOverlap;
+		WriteInProgress(const WriteInProgress& other); // do not call
+	public:
+		WriteInProgress(const std::string& dataToWrite)
+		: mBuffer(dataToWrite)
+		{
+			// create the Writable event
+			HANDLE writable_event = CreateEvent(NULL, TRUE, FALSE,
+				NULL);
+			if (writable_event == INVALID_HANDLE_VALUE)
+			{
+				BOX_LOG_WIN_ERROR("Failed to create the "
+					"Writable event");
+				THROW_EXCEPTION(CommonException, Internal)
+			}
+
+			memset(&mOverlap, 0, sizeof(mOverlap));
+			mOverlap.hEvent = writable_event;
+		}
+		~WriteInProgress()
+		{
+			CloseHandle(mOverlap.hEvent);
+		}
+	};
+	std::list<WriteInProgress*> mWritesInProgress;
+
 public:
 	static std::string sPipeNamePrefix;
 };




More information about the Boxbackup-commit mailing list