VirtualBox

source: vbox/trunk/src/VBox/Runtime/r3/win/pipe-win.cpp@ 27388

Last change on this file since 27388 was 27388, checked in by vboxsync, 15 years ago

iprt: flush file & pipe.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 29.0 KB
Line 
1/* $Id: pipe-win.cpp 27388 2010-03-15 23:00:25Z vboxsync $ */
2/** @file
3 * IPRT - Anonymous Pipes, Windows Implementation.
4 */
5
6/*
7 * Copyright (C) 2010 Sun Microsystems, Inc.
8 *
9 * This file is part of VirtualBox Open Source Edition (OSE), as
10 * available from http://www.215389.xyz. This file is free software;
11 * you can redistribute it and/or modify it under the terms of the GNU
12 * General Public License (GPL) as published by the Free Software
13 * Foundation, in version 2 as it comes in the "COPYING" file of the
14 * VirtualBox OSE distribution. VirtualBox OSE is distributed in the
15 * hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
16 *
17 * The contents of this file may alternatively be used under the terms
18 * of the Common Development and Distribution License Version 1.0
19 * (CDDL) only, as it comes in the "COPYING.CDDL" file of the
20 * VirtualBox OSE distribution, in which case the provisions of the
21 * CDDL are applicable instead of those of the GPL.
22 *
23 * You may elect to license modified versions of this file under the
24 * terms and conditions of either the GPL or the CDDL or both.
25 *
26 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa
27 * Clara, CA 95054 USA or visit http://www.sun.com if you need
28 * additional information or have any questions.
29 */
30
31
32/*******************************************************************************
33* Header Files *
34*******************************************************************************/
35#include <Windows.h>
36
37#include <iprt/pipe.h>
38#include "internal/iprt.h"
39
40#include <iprt/asm.h>
41#include <iprt/assert.h>
42#include <iprt/critsect.h>
43#include <iprt/err.h>
44#include <iprt/mem.h>
45#include <iprt/string.h>
46#include <iprt/process.h>
47#include <iprt/thread.h>
48#include <iprt/time.h>
49#include "internal/magics.h"
50
51
52/*******************************************************************************
53* Structures and Typedefs *
54*******************************************************************************/
55typedef struct RTPIPEINTERNAL
56{
57 /** Magic value (RTPIPE_MAGIC). */
58 uint32_t u32Magic;
59 /** The pipe handle. */
60 HANDLE hPipe;
61 /** Set if this is the read end, clear if it's the write end. */
62 bool fRead;
63 /** Set if there is already pending I/O. */
64 bool fIOPending;
65 /** The number of users of the current mode. */
66 uint32_t cModeUsers;
67 /** The overlapped I/O structure we use. */
68 OVERLAPPED Overlapped;
69 /** Bounce buffer for writes. */
70 uint8_t *pbBounceBuf;
71 /** Amount of used buffer space. */
72 size_t cbBounceBufUsed;
73 /** Amount of allocated buffer space. */
74 size_t cbBounceBufAlloc;
75 /** Critical section protecting the above members.
76 * (Taking the lazy/simple approach.) */
77 RTCRITSECT CritSect;
78} RTPIPEINTERNAL;
79
80
81RTDECL(int) RTPipeCreate(PRTPIPE phPipeRead, PRTPIPE phPipeWrite, uint32_t fFlags)
82{
83 AssertPtrReturn(phPipeRead, VERR_INVALID_POINTER);
84 AssertPtrReturn(phPipeWrite, VERR_INVALID_POINTER);
85 AssertReturn(!(fFlags & ~RTPIPE_C_VALID_MASK), VERR_INVALID_PARAMETER);
86
87 /*
88 * Create the read end of the pipe.
89 */
90 DWORD dwErr;
91 HANDLE hPipeR;
92 HANDLE hPipeW;
93 int rc;
94 for (;;)
95 {
96 static volatile uint32_t g_iNextPipe = 0;
97 char szName[128];
98 RTStrPrintf(szName, sizeof(szName), "\\\\.\\pipe\\iprt-pipe-%u-%u", RTProcSelf(), ASMAtomicIncU32(&g_iNextPipe));
99
100 SECURITY_ATTRIBUTES SecurityAttributes;
101 PSECURITY_ATTRIBUTES pSecurityAttributes = NULL;
102 if (fFlags & RTPIPE_C_INHERIT_READ)
103 {
104 SecurityAttributes.nLength = sizeof(SecurityAttributes);
105 SecurityAttributes.lpSecurityDescriptor = NULL;
106 SecurityAttributes.bInheritHandle = TRUE;
107 pSecurityAttributes = &SecurityAttributes;
108 }
109
110 DWORD dwOpenMode = PIPE_ACCESS_INBOUND | FILE_FLAG_OVERLAPPED;
111#ifdef FILE_FLAG_FIRST_PIPE_INSTANCE
112 dwOpenMode |= FILE_FLAG_FIRST_PIPE_INSTANCE;
113#endif
114
115 DWORD dwPipeMode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT;
116#ifdef PIPE_REJECT_REMOTE_CLIENTS
117 dwPipeMode |= PIPE_REJECT_REMOTE_CLIENTS;
118#endif
119
120 hPipeR = CreateNamedPipeA(szName, dwOpenMode, dwPipeMode, 1 /*nMaxInstances*/, _64K, _64K,
121 NMPWAIT_USE_DEFAULT_WAIT, pSecurityAttributes);
122#ifdef PIPE_REJECT_REMOTE_CLIENTS
123 if (hPipeR == INVALID_HANDLE_VALUE && GetLastError() == ERROR_INVALID_PARAMETER)
124 {
125 dwPipeMode &= ~PIPE_REJECT_REMOTE_CLIENTS;
126 hPipeR = CreateNamedPipeA(szName, dwOpenMode, dwPipeMode, 1 /*nMaxInstances*/, _64K, _64K,
127 NMPWAIT_USE_DEFAULT_WAIT, pSecurityAttributes);
128 }
129#endif
130#ifdef FILE_FLAG_FIRST_PIPE_INSTANCE
131 if (hPipeR == INVALID_HANDLE_VALUE && GetLastError() == ERROR_INVALID_PARAMETER)
132 {
133 dwOpenMode &= ~FILE_FLAG_FIRST_PIPE_INSTANCE;
134 hPipeR = CreateNamedPipeA(szName, dwOpenMode, dwPipeMode, 1 /*nMaxInstances*/, _64K, _64K,
135 NMPWAIT_USE_DEFAULT_WAIT, pSecurityAttributes);
136 }
137#endif
138 if (hPipeR != INVALID_HANDLE_VALUE)
139 {
140 /*
141 * Connect to the pipe (the write end).
142 * We add FILE_READ_ATTRIBUTES here to make sure we can query the
143 * pipe state later on.
144 */
145 pSecurityAttributes = NULL;
146 if (fFlags & RTPIPE_C_INHERIT_WRITE)
147 {
148 SecurityAttributes.nLength = sizeof(SecurityAttributes);
149 SecurityAttributes.lpSecurityDescriptor = NULL;
150 SecurityAttributes.bInheritHandle = TRUE;
151 pSecurityAttributes = &SecurityAttributes;
152 }
153
154 hPipeW = CreateFileA(szName,
155 GENERIC_WRITE | FILE_READ_ATTRIBUTES /*dwDesiredAccess*/,
156 0 /*dwShareMode*/,
157 pSecurityAttributes,
158 OPEN_EXISTING /* dwCreationDisposition */,
159 FILE_FLAG_OVERLAPPED /*dwFlagsAndAttributes*/,
160 NULL /*hTemplateFile*/);
161 if (hPipeW != INVALID_HANDLE_VALUE)
162 break;
163 dwErr = GetLastError();
164 CloseHandle(hPipeR);
165 }
166 else
167 dwErr = GetLastError();
168 if ( dwErr != ERROR_PIPE_BUSY /* already exist - compatible */
169 && dwErr != ERROR_ACCESS_DENIED /* already exist - incompatible */)
170 return RTErrConvertFromWin32(dwErr);
171 /* else: try again with a new name */
172 }
173
174 /*
175 * Create the two handles.
176 */
177 RTPIPEINTERNAL *pThisR = (RTPIPEINTERNAL *)RTMemAllocZ(sizeof(RTPIPEINTERNAL));
178 if (pThisR)
179 {
180 RTPIPEINTERNAL *pThisW = (RTPIPEINTERNAL *)RTMemAllocZ(sizeof(RTPIPEINTERNAL));
181 if (pThisW)
182 {
183 rc = RTCritSectInit(&pThisR->CritSect);
184 if (RT_SUCCESS(rc))
185 {
186 rc = RTCritSectInit(&pThisW->CritSect);
187 if (RT_SUCCESS(rc))
188 {
189 pThisR->Overlapped.hEvent = CreateEvent(NULL, TRUE /*fManualReset*/,
190 TRUE /*fInitialState*/, NULL /*pName*/);
191 if (pThisR->Overlapped.hEvent != NULL)
192 {
193 pThisW->Overlapped.hEvent = CreateEvent(NULL, TRUE /*fManualReset*/,
194 TRUE /*fInitialState*/, NULL /*pName*/);
195 if (pThisW->Overlapped.hEvent != NULL)
196 {
197 pThisR->u32Magic = RTPIPE_MAGIC;
198 pThisW->u32Magic = RTPIPE_MAGIC;
199 pThisR->hPipe = hPipeR;
200 pThisW->hPipe = hPipeW;
201 pThisR->fRead = true;
202 pThisW->fRead = false;
203 pThisR->fIOPending = false;
204 pThisW->fIOPending = false;
205 //pThisR->cModeUsers = 0;
206 //pThisW->cModeUsers = 0;
207 //pThisR->pbBounceBuf = NULL;
208 //pThisW->pbBounceBuf = NULL;
209 //pThisR->cbBounceBufUsed = 0;
210 //pThisW->cbBounceBufUsed = 0;
211 //pThisR->cbBounceBufAlloc= 0;
212 //pThisW->cbBounceBufAlloc= 0;
213
214 *phPipeRead = pThisR;
215 *phPipeWrite = pThisW;
216 return VINF_SUCCESS;
217 }
218 CloseHandle(pThisR->Overlapped.hEvent);
219 }
220 RTCritSectDelete(&pThisW->CritSect);
221 }
222 RTCritSectDelete(&pThisR->CritSect);
223 }
224 RTMemFree(pThisW);
225 }
226 else
227 rc = VERR_NO_MEMORY;
228 RTMemFree(pThisR);
229 }
230 else
231 rc = VERR_NO_MEMORY;
232
233 CloseHandle(hPipeR);
234 CloseHandle(hPipeW);
235 return rc;
236}
237
238
239/**
240 * Common worker for handling I/O completion.
241 *
242 * This is used by RTPipeClose, RTPipeWrite and RTPipeWriteBlocking.
243 *
244 * @returns IPRT status code.
245 * @param pThis The pipe instance handle.
246 */
247static int rtPipeWriteCheckCompletion(RTPIPEINTERNAL *pThis)
248{
249 int rc;
250 DWORD dwRc = WaitForSingleObject(pThis->Overlapped.hEvent, 0);
251 if (dwRc == WAIT_OBJECT_0)
252 {
253 DWORD cbWritten = 0;
254 if (GetOverlappedResult(pThis->hPipe, &pThis->Overlapped, &cbWritten, TRUE))
255 {
256 for (;;)
257 {
258 if (cbWritten >= pThis->cbBounceBufUsed)
259 {
260 pThis->fIOPending = false;
261 rc = VINF_SUCCESS;
262 break;
263 }
264
265 /* resubmit the remainder of the buffer - can this actually happen? */
266 memmove(&pThis->pbBounceBuf[0], &pThis->pbBounceBuf[cbWritten], pThis->cbBounceBufUsed - cbWritten);
267 rc = ResetEvent(pThis->Overlapped.hEvent); Assert(rc == TRUE);
268 if (!WriteFile(pThis->hPipe, pThis->pbBounceBuf, (DWORD)pThis->cbBounceBufUsed,
269 &cbWritten, &pThis->Overlapped))
270 {
271 if (GetLastError() == ERROR_IO_PENDING)
272 rc = VINF_TRY_AGAIN;
273 else
274 {
275 pThis->fIOPending = false;
276 if (GetLastError() == ERROR_NO_DATA)
277 rc = VERR_BROKEN_PIPE;
278 else
279 rc = RTErrConvertFromWin32(GetLastError());
280 }
281 break;
282 }
283 Assert(cbWritten > 0);
284 }
285 }
286 else
287 {
288 pThis->fIOPending = false;
289 rc = RTErrConvertFromWin32(GetLastError());
290 }
291 }
292 else if (dwRc == WAIT_TIMEOUT)
293 rc = VINF_TRY_AGAIN;
294 else
295 {
296 pThis->fIOPending = false;
297 if (dwRc == WAIT_ABANDONED)
298 rc = VERR_INVALID_HANDLE;
299 else
300 rc = RTErrConvertFromWin32(GetLastError());
301 }
302 return rc;
303}
304
305
306
307RTDECL(int) RTPipeClose(RTPIPE hPipe)
308{
309 RTPIPEINTERNAL *pThis = hPipe;
310 if (pThis == NIL_RTPIPE)
311 return VINF_SUCCESS;
312 AssertPtrReturn(pThis, VERR_INVALID_PARAMETER);
313 AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
314
315 /*
316 * Do the cleanup.
317 */
318 AssertReturn(ASMAtomicCmpXchgU32(&pThis->u32Magic, ~RTPIPE_MAGIC, RTPIPE_MAGIC), VERR_INVALID_HANDLE);
319 RTCritSectEnter(&pThis->CritSect);
320 Assert(pThis->cModeUsers == 0);
321
322 if (!pThis->fRead && pThis->fIOPending)
323 rtPipeWriteCheckCompletion(pThis);
324
325 CloseHandle(pThis->hPipe);
326 pThis->hPipe = INVALID_HANDLE_VALUE;
327
328 CloseHandle(pThis->Overlapped.hEvent);
329 pThis->Overlapped.hEvent = NULL;
330
331 RTMemFree(pThis->pbBounceBuf);
332 pThis->pbBounceBuf = NULL;
333
334 RTCritSectLeave(&pThis->CritSect);
335 RTCritSectDelete(&pThis->CritSect);
336
337 RTMemFree(pThis);
338
339 return VINF_SUCCESS;
340}
341
342
343RTDECL(RTHCINTPTR) RTPipeToNative(RTPIPE hPipe)
344{
345 RTPIPEINTERNAL *pThis = hPipe;
346 AssertPtrReturn(pThis, (RTHCINTPTR)(unsigned int)-1);
347 AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, (RTHCINTPTR)(unsigned int)-1);
348
349 return (RTHCINTPTR)pThis->hPipe;
350}
351
352
353RTDECL(int) RTPipeRead(RTPIPE hPipe, void *pvBuf, size_t cbToRead, size_t *pcbRead)
354{
355 RTPIPEINTERNAL *pThis = hPipe;
356 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
357 AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
358 AssertReturn(pThis->fRead, VERR_ACCESS_DENIED);
359 AssertPtr(pcbRead);
360 AssertPtr(pvBuf);
361
362 int rc = RTCritSectEnter(&pThis->CritSect);
363 if (RT_SUCCESS(rc))
364 {
365 /* No concurrent readers, sorry. */
366 if (pThis->cModeUsers == 0)
367 {
368 pThis->cModeUsers++;
369
370 /*
371 * Kick of a an overlapped read. It should return immedately if
372 * there is bytes in the buffer. If not, we'll cancel it and see
373 * what we get back.
374 */
375 rc = ResetEvent(pThis->Overlapped.hEvent); Assert(rc == TRUE);
376 DWORD cbRead = 0;
377 if ( cbToRead == 0
378 || ReadFile(pThis->hPipe, pvBuf,
379 cbToRead <= ~(DWORD)0 ? (DWORD)cbToRead : ~(DWORD)0,
380 &cbRead, &pThis->Overlapped))
381 {
382 *pcbRead = cbRead;
383 rc = VINF_SUCCESS;
384 }
385 else if (GetLastError() == ERROR_IO_PENDING)
386 {
387 pThis->fIOPending = true;
388 RTCritSectLeave(&pThis->CritSect);
389
390 if (!CancelIo(pThis->hPipe))
391 WaitForSingleObject(pThis->Overlapped.hEvent, INFINITE);
392 if (GetOverlappedResult(pThis->hPipe, &pThis->Overlapped, &cbRead, TRUE /*fWait*/))
393 {
394 *pcbRead = cbRead;
395 rc = VINF_SUCCESS;
396 }
397 else if (GetLastError() == ERROR_OPERATION_ABORTED)
398 {
399 *pcbRead = 0;
400 rc = VINF_TRY_AGAIN;
401 }
402 else
403 rc = RTErrConvertFromWin32(GetLastError());
404
405 RTCritSectEnter(&pThis->CritSect);
406 pThis->fIOPending = false;
407 }
408 else
409 rc = RTErrConvertFromWin32(GetLastError());
410
411 pThis->cModeUsers--;
412 }
413 else
414 rc = VERR_WRONG_ORDER;
415 RTCritSectLeave(&pThis->CritSect);
416 }
417 return rc;
418}
419
420
421RTDECL(int) RTPipeReadBlocking(RTPIPE hPipe, void *pvBuf, size_t cbToRead, size_t *pcbRead)
422{
423 RTPIPEINTERNAL *pThis = hPipe;
424 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
425 AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
426 AssertReturn(pThis->fRead, VERR_ACCESS_DENIED);
427 AssertPtr(pvBuf);
428
429 int rc = RTCritSectEnter(&pThis->CritSect);
430 if (RT_SUCCESS(rc))
431 {
432 /* No concurrent readers, sorry. */
433 if (pThis->cModeUsers == 0)
434 {
435 pThis->cModeUsers++;
436
437 size_t cbTotalRead = 0;
438 while (cbToRead > 0)
439 {
440 /*
441 * Kick of a an overlapped read. It should return immedately if
442 * there is bytes in the buffer. If not, we'll cancel it and see
443 * what we get back.
444 */
445 rc = ResetEvent(pThis->Overlapped.hEvent); Assert(rc == TRUE);
446 DWORD cbRead = 0;
447 pThis->fIOPending = true;
448 RTCritSectLeave(&pThis->CritSect);
449
450 if (ReadFile(pThis->hPipe, pvBuf,
451 cbToRead <= ~(DWORD)0 ? (DWORD)cbToRead : ~(DWORD)0,
452 &cbRead, &pThis->Overlapped))
453 rc = VINF_SUCCESS;
454 else if (GetLastError() == ERROR_IO_PENDING)
455 {
456 WaitForSingleObject(pThis->Overlapped.hEvent, INFINITE);
457 if (GetOverlappedResult(pThis->hPipe, &pThis->Overlapped, &cbRead, TRUE /*fWait*/))
458 rc = VINF_SUCCESS;
459 else
460 rc = RTErrConvertFromWin32(GetLastError());
461 }
462 else
463 rc = RTErrConvertFromWin32(GetLastError());
464
465 RTCritSectEnter(&pThis->CritSect);
466 pThis->fIOPending = false;
467 if (RT_FAILURE(rc))
468 break;
469
470 /* advance */
471 cbToRead -= cbRead;
472 cbTotalRead += cbRead;
473 pvBuf = (uint8_t *)pvBuf + cbRead;
474 }
475
476 if (pcbRead)
477 {
478 *pcbRead = cbTotalRead;
479 if ( RT_FAILURE(rc)
480 && cbTotalRead
481 && rc != VERR_INVALID_POINTER)
482 rc = VINF_SUCCESS;
483 }
484
485 pThis->cModeUsers--;
486 }
487 else
488 rc = VERR_WRONG_ORDER;
489 RTCritSectLeave(&pThis->CritSect);
490 }
491 return rc;
492}
493
494
495RTDECL(int) RTPipeWrite(RTPIPE hPipe, const void *pvBuf, size_t cbToWrite, size_t *pcbWritten)
496{
497 RTPIPEINTERNAL *pThis = hPipe;
498 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
499 AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
500 AssertReturn(!pThis->fRead, VERR_ACCESS_DENIED);
501 AssertPtr(pcbWritten);
502 AssertPtr(pvBuf);
503
504 int rc = RTCritSectEnter(&pThis->CritSect);
505 if (RT_SUCCESS(rc))
506 {
507 /* No concurrent readers, sorry. */
508 if (pThis->cModeUsers == 0)
509 {
510 pThis->cModeUsers++;
511
512 /* If I/O is pending, check if it has completed. */
513 if (pThis->fIOPending)
514 rc = rtPipeWriteCheckCompletion(pThis);
515 else
516 rc = VINF_SUCCESS;
517 if (rc == VINF_SUCCESS)
518 {
519 Assert(!pThis->fIOPending);
520
521 /* Do the bounce buffering. */
522 if ( pThis->cbBounceBufAlloc < cbToWrite
523 && pThis->cbBounceBufAlloc < _64K)
524 {
525 if (cbToWrite > _64K)
526 cbToWrite = _64K;
527 void *pv = RTMemRealloc(pThis->pbBounceBuf, RT_ALIGN_Z(cbToWrite, _1K));
528 if (pv)
529 {
530 pThis->pbBounceBuf = (uint8_t *)pv;
531 pThis->cbBounceBufAlloc = RT_ALIGN_Z(cbToWrite, _1K);
532 }
533 else
534 rc = VERR_NO_MEMORY;
535 }
536 else if (cbToWrite > _64K)
537 cbToWrite = _64K;
538 if (RT_SUCCESS(rc) && cbToWrite)
539 {
540 memcpy(pThis->pbBounceBuf, pvBuf, cbToWrite);
541 pThis->cbBounceBufUsed = (uint32_t)cbToWrite;
542
543 /* Submit the write. */
544 rc = ResetEvent(pThis->Overlapped.hEvent); Assert(rc == TRUE);
545 DWORD cbWritten = 0;
546 if (WriteFile(pThis->hPipe, pThis->pbBounceBuf, (DWORD)pThis->cbBounceBufUsed,
547 &cbWritten, &pThis->Overlapped))
548 {
549 *pcbWritten = cbWritten;
550 rc = VINF_SUCCESS;
551 }
552 else if (GetLastError() == ERROR_IO_PENDING)
553 {
554 *pcbWritten = cbWritten;
555 pThis->fIOPending = true;
556 rc = VINF_SUCCESS;
557 }
558 else if (GetLastError() == ERROR_NO_DATA)
559 rc = VERR_BROKEN_PIPE;
560 else
561 rc = RTErrConvertFromWin32(GetLastError());
562 }
563 else if (RT_SUCCESS(rc))
564 *pcbWritten = 0;
565 }
566 else if (RT_SUCCESS(rc))
567 *pcbWritten = 0;
568
569 pThis->cModeUsers--;
570 }
571 else
572 rc = VERR_WRONG_ORDER;
573 RTCritSectLeave(&pThis->CritSect);
574 }
575 return rc;
576}
577
578
579RTDECL(int) RTPipeWriteBlocking(RTPIPE hPipe, const void *pvBuf, size_t cbToWrite, size_t *pcbWritten)
580{
581 RTPIPEINTERNAL *pThis = hPipe;
582 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
583 AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
584 AssertReturn(!pThis->fRead, VERR_ACCESS_DENIED);
585 AssertPtr(pvBuf);
586 AssertPtrNull(pcbWritten);
587
588 int rc = RTCritSectEnter(&pThis->CritSect);
589 if (RT_SUCCESS(rc))
590 {
591 /* No concurrent readers, sorry. */
592 if (pThis->cModeUsers == 0)
593 {
594 pThis->cModeUsers++;
595
596 /*
597 * If I/O is pending, wait for it to complete.
598 */
599 if (pThis->fIOPending)
600 {
601 rc = rtPipeWriteCheckCompletion(pThis);
602 while (rc == VINF_TRY_AGAIN)
603 {
604 Assert(pThis->fIOPending);
605 HANDLE hEvent = pThis->Overlapped.hEvent;
606 RTCritSectLeave(&pThis->CritSect);
607 WaitForSingleObject(pThis->Overlapped.hEvent, INFINITE);
608 RTCritSectEnter(&pThis->CritSect);
609 }
610 }
611 if (RT_SUCCESS(rc))
612 {
613 Assert(!pThis->fIOPending);
614
615 /*
616 * Try write everything.
617 * No bounce buffering, cModeUsers protects us.
618 */
619 size_t cbTotalWritten = 0;
620 while (cbToWrite > 0)
621 {
622 rc = ResetEvent(pThis->Overlapped.hEvent); Assert(rc == TRUE);
623 pThis->fIOPending = true;
624 RTCritSectLeave(&pThis->CritSect);
625
626 DWORD cbWritten = 0;
627 if (WriteFile(pThis->hPipe, pvBuf,
628 cbToWrite <= ~(DWORD)0 ? (DWORD)cbToWrite : ~(DWORD)0,
629 &cbWritten, &pThis->Overlapped))
630 rc = VINF_SUCCESS;
631 else if (GetLastError() == ERROR_IO_PENDING)
632 {
633 WaitForSingleObject(pThis->Overlapped.hEvent, INFINITE);
634 if (GetOverlappedResult(pThis->hPipe, &pThis->Overlapped, &cbWritten, TRUE /*fWait*/))
635 rc = VINF_SUCCESS;
636 else
637 rc = RTErrConvertFromWin32(GetLastError());
638 }
639 else if (GetLastError() == ERROR_NO_DATA)
640 rc = VERR_BROKEN_PIPE;
641 else
642 rc = RTErrConvertFromWin32(GetLastError());
643
644 RTCritSectEnter(&pThis->CritSect);
645 pThis->fIOPending = false;
646 if (RT_FAILURE(rc))
647 break;
648
649 /* advance */
650 pvBuf = (char const *)pvBuf + cbWritten;
651 cbTotalWritten += cbWritten;
652 cbToWrite -= cbWritten;
653 }
654
655 if (pcbWritten)
656 {
657 *pcbWritten = cbTotalWritten;
658 if ( RT_FAILURE(rc)
659 && cbTotalWritten
660 && rc != VERR_INVALID_POINTER)
661 rc = VINF_SUCCESS;
662 }
663 }
664
665 pThis->cModeUsers--;
666 }
667 else
668 rc = VERR_WRONG_ORDER;
669 RTCritSectLeave(&pThis->CritSect);
670 }
671 return rc;
672
673#if 1
674 return VERR_NOT_IMPLEMENTED;
675#else
676 int rc = rtPipeTryBlocking(pThis);
677 if (RT_SUCCESS(rc))
678 {
679 size_t cbTotalWritten = 0;
680 while (cbToWrite > 0)
681 {
682 ssize_t cbWritten = write(pThis->fd, pvBuf, RT_MIN(cbToWrite, SSIZE_MAX));
683 if (cbWritten < 0)
684 {
685 rc = RTErrConvertFromErrno(errno);
686 break;
687 }
688
689 /* advance */
690 pvBuf = (char const *)pvBuf + cbWritten;
691 cbTotalWritten += cbWritten;
692 cbToWrite -= cbWritten;
693 }
694
695 if (pcbWritten)
696 {
697 *pcbWritten = cbTotalWritten;
698 if ( RT_FAILURE(rc)
699 && cbTotalWritten
700 && rc != VERR_INVALID_POINTER)
701 rc = VINF_SUCCESS;
702 }
703
704 ASMAtomicDecU32(&pThis->u32State);
705 }
706 return rc;
707#endif
708}
709
710
711RTDECL(int) RTPipeFlush(RTPIPE hPipe)
712{
713 RTPIPEINTERNAL *pThis = hPipe;
714 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
715 AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
716 AssertReturn(!pThis->fRead, VERR_ACCESS_DENIED);
717
718 if (!FlushFileBuffers(pThis->hPipe))
719 return RTErrConvertFromWin32(GetLastError());
720 return VINF_SUCCESS;
721}
722
723
724RTDECL(int) RTPipeSelectOne(RTPIPE hPipe, RTMSINTERVAL cMillies)
725{
726 RTPIPEINTERNAL *pThis = hPipe;
727 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
728 AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
729
730 uint64_t const StartMsTS = RTTimeMilliTS();
731
732 int rc = RTCritSectEnter(&pThis->CritSect);
733 if (RT_FAILURE(rc))
734 return rc;
735 for (unsigned iLoop = 0;; iLoop++)
736 {
737 uint8_t abBuf[4];
738 bool fPendingRead = false;
739 HANDLE hWait = INVALID_HANDLE_VALUE;
740 if (pThis->fRead)
741 {
742 if (pThis->fIOPending)
743 hWait = pThis->Overlapped.hEvent;
744 else
745 {
746 /* Peek at the pipe buffer and see how many bytes it contains. */
747 DWORD cbAvailable;
748 if ( PeekNamedPipe(pThis->hPipe, NULL, 0, NULL, &cbAvailable, NULL)
749 && cbAvailable > 0)
750 {
751 rc = VINF_SUCCESS;
752 break;
753 }
754
755 /* Start a zero byte read operation that we can wait on. */
756 if (cMillies == 0)
757 {
758 rc = VERR_TIMEOUT;
759 break;
760 }
761 AssertBreakStmt(pThis->cModeUsers == 0, rc = VERR_INTERNAL_ERROR_5);
762 rc = ResetEvent(pThis->Overlapped.hEvent); Assert(rc == TRUE);
763 DWORD cbRead = 0;
764 if (ReadFile(pThis->hPipe, &abBuf[0], 0, &cbRead, &pThis->Overlapped))
765 {
766 rc = VINF_SUCCESS;
767 if (iLoop > 10)
768 RTThreadYield();
769 }
770 else if (GetLastError() == ERROR_IO_PENDING)
771 {
772 pThis->cModeUsers++;
773 pThis->fIOPending = true;
774 fPendingRead = true;
775 hWait = pThis->Overlapped.hEvent;
776 }
777 else
778 rc = RTErrConvertFromWin32(GetLastError());
779 }
780 }
781 else
782 {
783 if (pThis->fIOPending)
784 hWait = pThis->Overlapped.hEvent;
785 else
786 {
787 /* If nothing pending, the next write will succeed because
788 we buffer it and pretend that it does... */
789 rc = VINF_SUCCESS;
790 break;
791 }
792 }
793 if (RT_FAILURE(rc))
794 break;
795
796 /*
797 * Check for timeout.
798 */
799 DWORD cMsMaxWait = INFINITE;
800 if ( cMillies != RT_INDEFINITE_WAIT
801 && ( hWait != INVALID_HANDLE_VALUE
802 || iLoop > 10)
803 )
804 {
805 uint64_t cElapsed = RTTimeMilliTS() - StartMsTS;
806 if (cElapsed >= cMillies)
807 {
808 rc = VERR_TIMEOUT;
809 break;
810 }
811 cMsMaxWait = cMillies - (uint32_t)cElapsed;
812 }
813
814 /*
815 * Wait.
816 */
817 if (hWait != INVALID_HANDLE_VALUE)
818 {
819 RTCritSectLeave(&pThis->CritSect);
820
821 DWORD dwRc = WaitForSingleObject(hWait, cMsMaxWait);
822 if (dwRc == WAIT_OBJECT_0)
823 rc = VINF_SUCCESS;
824 else if (dwRc == WAIT_TIMEOUT)
825 rc = VERR_TIMEOUT;
826 else if (dwRc == WAIT_ABANDONED)
827 rc = VERR_INVALID_HANDLE;
828 else
829 rc = RTErrConvertFromWin32(GetLastError());
830 if ( RT_FAILURE(rc)
831 && pThis->u32Magic != RTPIPE_MAGIC)
832 return rc;
833
834 RTCritSectEnter(&pThis->CritSect);
835 if (fPendingRead)
836 {
837 pThis->cModeUsers--;
838 pThis->fIOPending = false;
839 if (rc != VINF_SUCCESS)
840 CancelIo(pThis->hPipe);
841 DWORD cbRead = 0;
842 GetOverlappedResult(pThis->hPipe, &pThis->Overlapped, &cbRead, TRUE /*fWait*/);
843 }
844 if (RT_FAILURE(rc))
845 break;
846 }
847 }
848
849 RTCritSectLeave(&pThis->CritSect);
850 return rc;
851}
852
Note: See TracBrowser for help on using the repository browser.

© 2025 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette