VirtualBox

source: vbox/trunk/src/VBox/Runtime/r3/posix/pipe-posix.cpp@ 26774

Last change on this file since 26774 was 26774, checked in by vboxsync, 15 years ago

RTPipe: Implemented the posix variant.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 14.1 KB
Line 
1/* $Id: pipe-posix.cpp 26774 2010-02-25 02:30:14Z vboxsync $ */
2/** @file
3 * IPRT - Anonymouse Pipes, POSIX Implementation.
4 */
5
6/*
7 * Copyright (C) 2010 Sun Microsystems, Inc.
8 *
9 * This file is part of VirtualBox Open Source Edition (OSE), as
10 * available from http://www.215389.xyz. This file is free software;
11 * you can redistribute it and/or modify it under the terms of the GNU
12 * General Public License (GPL) as published by the Free Software
13 * Foundation, in version 2 as it comes in the "COPYING" file of the
14 * VirtualBox OSE distribution. VirtualBox OSE is distributed in the
15 * hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
16 *
17 * The contents of this file may alternatively be used under the terms
18 * of the Common Development and Distribution License Version 1.0
19 * (CDDL) only, as it comes in the "COPYING.CDDL" file of the
20 * VirtualBox OSE distribution, in which case the provisions of the
21 * CDDL are applicable instead of those of the GPL.
22 *
23 * You may elect to license modified versions of this file under the
24 * terms and conditions of either the GPL or the CDDL or both.
25 *
26 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa
27 * Clara, CA 95054 USA or visit http://www.sun.com if you need
28 * additional information or have any questions.
29 */
30
31
32/*******************************************************************************
33* Header Files *
34*******************************************************************************/
35#include <iprt/pipe.h>
36#include "internal/iprt.h"
37
38#include <iprt/asm.h>
39#include <iprt/assert.h>
40#include <iprt/err.h>
41#include <iprt/mem.h>
42#include <iprt/string.h>
43#include <iprt/thread.h>
44#include "internal/magics.h"
45
46#include <errno.h>
47#include <fcntl.h>
48#include <limits.h>
49#include <unistd.h>
50#include <sys/poll.h>
51#include <signal.h>
52
53
54/*******************************************************************************
55* Structures and Typedefs *
56*******************************************************************************/
57typedef struct RTPIPEINTERNAL
58{
59 /** Magic value (RTPIPE_MAGIC). */
60 uint32_t u32Magic;
61 /** The file descriptor. */
62 int fd;
63 /** Set if this is the read end, clear if it's the write end. */
64 bool fRead;
65 /** Atomically operated state variable.
66 *
67 * - Bits 0 thru 29 - Users of the new mode.
68 * - Bit 30 - The pipe mode, set indicates blocking.
69 * - Bit 31 - Set when we're switching the mode.
70 */
71 uint32_t volatile u32State;
72} RTPIPEINTERNAL;
73
74
75/*******************************************************************************
76* Defined Constants And Macros *
77*******************************************************************************/
78/** @name RTPIPEINTERNAL::u32State defines
79 * @{ */
80#define RTPIPE_POSIX_BLOCKING UINT32_C(0x40000000)
81#define RTPIPE_POSIX_SWITCHING UINT32_C(0x80000000)
82#define RTPIPE_POSIX_SWITCHING_BIT 31
83#define RTPIPE_POSIX_USERS_MASK UINT32_C(0x3fffffff)
84/** @} */
85
86
87RTDECL(int) RTPipeCreate(PRTPIPE phPipeRead, PRTPIPE phPipeWrite, uint32_t fFlags)
88{
89 AssertPtrReturn(phPipeRead, VERR_INVALID_POINTER);
90 AssertPtrReturn(phPipeWrite, VERR_INVALID_POINTER);
91 AssertReturn(!(fFlags & ~RTPIPE_C_VALID_MASK), VERR_INVALID_PARAMETER);
92
93 /*
94 * Create the pipe and set the close-on-exec flag if requested.
95 */
96 int aFds[2] = {-1, -1};
97 if (pipe(aFds))
98 return RTErrConvertFromErrno(errno);
99
100 int rc = VINF_SUCCESS;
101 if (!(fFlags & RTPIPE_C_INHERIT_READ))
102 {
103 if (fcntl(aFds[0], F_SETFD, FD_CLOEXEC))
104 rc = RTErrConvertFromErrno(errno);
105 }
106
107 if (!(fFlags & RTPIPE_C_INHERIT_WRITE))
108 {
109 if (fcntl(aFds[1], F_SETFD, FD_CLOEXEC))
110 rc = RTErrConvertFromErrno(errno);
111 }
112
113 if (RT_SUCCESS(rc))
114 {
115 /*
116 * Create the two handles.
117 */
118 RTPIPEINTERNAL *pThisR = (RTPIPEINTERNAL *)RTMemAlloc(sizeof(RTPIPEINTERNAL));
119 if (pThisR)
120 {
121 RTPIPEINTERNAL *pThisW = (RTPIPEINTERNAL *)RTMemAlloc(sizeof(RTPIPEINTERNAL));
122 if (pThisW)
123 {
124 pThisR->u32Magic = RTPIPE_MAGIC;
125 pThisW->u32Magic = RTPIPE_MAGIC;
126 pThisR->fd = aFds[0];
127 pThisW->fd = aFds[1];
128 pThisR->fRead = true;
129 pThisW->fRead = false;
130 pThisR->u32State = RTPIPE_POSIX_BLOCKING;
131 pThisW->u32State = RTPIPE_POSIX_BLOCKING;
132
133 *phPipeRead = pThisR;
134 *phPipeWrite = pThisW;
135
136 /*
137 * Before we leave, make sure to shut up SIGPIPE.
138 */
139 signal(SIGPIPE, SIG_IGN);
140 return VINF_SUCCESS;
141 }
142
143 RTMemFree(pThisR);
144 }
145 }
146
147 close(aFds[0]);
148 close(aFds[1]);
149 return rc;
150}
151
152
153RTDECL(int) RTPipeClose(RTPIPE hPipe)
154{
155 RTPIPEINTERNAL *pThis = hPipe;
156 if (pThis == NIL_RTPIPE)
157 return VINF_SUCCESS;
158 AssertPtrReturn(pThis, VERR_INVALID_PARAMETER);
159 AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
160
161 /*
162 * Do the cleanup.
163 */
164 AssertReturn(ASMAtomicCmpXchgU32(&pThis->u32Magic, ~RTPIPE_MAGIC, RTPIPE_MAGIC), VERR_INVALID_HANDLE);
165
166 int fd = pThis->fd;
167 pThis->fd = -1;
168 close(fd);
169
170 if (ASMAtomicReadU32(&pThis->u32State) & RTPIPE_POSIX_USERS_MASK)
171 {
172 AssertFailed();
173 RTThreadSleep(1);
174 }
175
176 RTMemFree(pThis);
177
178 return VINF_SUCCESS;
179}
180
181
182RTDECL(RTHCINTPTR) RTPipeToNative(RTPIPE hPipe)
183{
184 RTPIPEINTERNAL *pThis = hPipe;
185 AssertPtrReturn(pThis, (RTHCINTPTR)(unsigned int)-1);
186 AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, (RTHCINTPTR)(unsigned int)-1);
187
188 return pThis->fd;
189}
190
191
192/**
193 * Prepare blocking mode.
194 *
195 * @returns VINF_SUCCESS
196 * @retval VERR_WRONG_ORDER
197 * @retval VERR_INTERNAL_ERROR_4
198 *
199 * @param pThis The pipe handle.
200 */
201static int rtPipeTryBlocking(RTPIPEINTERNAL *pThis)
202{
203 /*
204 * Update the state.
205 */
206 for (;;)
207 {
208 uint32_t u32State = ASMAtomicReadU32(&pThis->u32State);
209 uint32_t const u32StateOld = u32State;
210 uint32_t const cUsers = (u32State & RTPIPE_POSIX_USERS_MASK);
211
212 if (u32State & RTPIPE_POSIX_BLOCKING)
213 {
214 AssertReturn(cUsers < RTPIPE_POSIX_USERS_MASK / 2, VERR_INTERNAL_ERROR_4);
215 u32State &= ~RTPIPE_POSIX_USERS_MASK;
216 u32State |= cUsers + 1;
217 if (ASMAtomicCmpXchgU32(&pThis->u32State, u32State, u32StateOld))
218 {
219 if (u32State & RTPIPE_POSIX_SWITCHING)
220 break;
221 return VINF_SUCCESS;
222 }
223 }
224 else if (cUsers == 0)
225 {
226 u32State = 1 | RTPIPE_POSIX_SWITCHING | RTPIPE_POSIX_BLOCKING;
227 if (ASMAtomicCmpXchgU32(&pThis->u32State, u32State, u32StateOld))
228 break;
229 }
230 else
231 return VERR_WRONG_ORDER;
232 ASMNopPause();
233 }
234
235 /*
236 * Do the switching.
237 */
238 int fFlags = fcntl(pThis->fd, F_GETFL, 0);
239 if (fFlags != -1)
240 {
241 if ( !(fFlags & O_NONBLOCK)
242 || fcntl(pThis->fd, F_SETFL, fFlags & ~O_NONBLOCK) != -1)
243 {
244 ASMAtomicBitClear(&pThis->u32State, RTPIPE_POSIX_SWITCHING_BIT);
245 return VINF_SUCCESS;
246 }
247 }
248
249 ASMAtomicDecU32(&pThis->u32State);
250 return RTErrConvertFromErrno(errno);
251}
252
253
254/**
255 * Prepare non-blocking mode.
256 *
257 * @returns VINF_SUCCESS
258 * @retval VERR_WRONG_ORDER
259 * @retval VERR_INTERNAL_ERROR_4
260 *
261 * @param pThis The pipe handle.
262 */
263static int rtPipeTryNonBlocking(RTPIPEINTERNAL *pThis)
264{
265 /*
266 * Update the state.
267 */
268 for (;;)
269 {
270 uint32_t u32State = ASMAtomicReadU32(&pThis->u32State);
271 uint32_t const u32StateOld = u32State;
272 uint32_t const cUsers = (u32State & RTPIPE_POSIX_USERS_MASK);
273
274 if (!(u32State & RTPIPE_POSIX_BLOCKING))
275 {
276 AssertReturn(cUsers < RTPIPE_POSIX_USERS_MASK / 2, VERR_INTERNAL_ERROR_4);
277 u32State &= ~RTPIPE_POSIX_USERS_MASK;
278 u32State |= cUsers + 1;
279 if (ASMAtomicCmpXchgU32(&pThis->u32State, u32State, u32StateOld))
280 {
281 if (u32State & RTPIPE_POSIX_SWITCHING)
282 break;
283 return VINF_SUCCESS;
284 }
285 }
286 else if (cUsers == 0)
287 {
288 u32State = 1 | RTPIPE_POSIX_SWITCHING;
289 if (ASMAtomicCmpXchgU32(&pThis->u32State, u32State, u32StateOld))
290 break;
291 }
292 else
293 return VERR_WRONG_ORDER;
294 ASMNopPause();
295 }
296
297 /*
298 * Do the switching.
299 */
300 int fFlags = fcntl(pThis->fd, F_GETFL, 0);
301 if (fFlags != -1)
302 {
303 if ( (fFlags & O_NONBLOCK)
304 || fcntl(pThis->fd, F_SETFL, fFlags | O_NONBLOCK) != -1)
305 {
306 ASMAtomicBitClear(&pThis->u32State, RTPIPE_POSIX_SWITCHING_BIT);
307 return VINF_SUCCESS;
308 }
309 }
310
311 ASMAtomicDecU32(&pThis->u32State);
312 return RTErrConvertFromErrno(errno);
313}
314
315
316/**
317 * Checks if the read pipe has a HUP condition.
318 *
319 * @returns true if HUP, false if no.
320 * @param pThis The pipe handle (read).
321 */
322static bool rtPipePosixHasHup(RTPIPEINTERNAL *pThis)
323{
324 Assert(pThis->fRead);
325
326 struct pollfd PollFd;
327 RT_ZERO(PollFd);
328 PollFd.fd = pThis->fd;
329 PollFd.events = POLLHUP;
330 return poll(&PollFd, 1, 0) >= 1
331 && (PollFd.revents & POLLHUP);
332}
333
334
335RTDECL(int) RTPipeRead(RTPIPE hPipe, void *pvBuf, size_t cbToRead, size_t *pcbRead)
336{
337 RTPIPEINTERNAL *pThis = hPipe;
338 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
339 AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
340 AssertReturn(pThis->fRead, VERR_ACCESS_DENIED);
341 AssertPtr(pcbRead);
342 AssertPtr(pvBuf);
343
344 int rc = rtPipeTryNonBlocking(pThis);
345 if (RT_SUCCESS(rc))
346 {
347 ssize_t cbRead = read(pThis->fd, pvBuf, RT_MIN(cbToRead, SSIZE_MAX));
348 if (cbRead >= 0)
349 {
350 if (cbRead || !cbToRead || !rtPipePosixHasHup(pThis))
351 *pcbRead = cbRead;
352 else
353 rc = VERR_BROKEN_PIPE;
354 }
355 else if (errno == EAGAIN)
356 {
357 *pcbRead = 0;
358 rc = VINF_TRY_AGAIN;
359 }
360 else
361 rc = RTErrConvertFromErrno(errno);
362
363 ASMAtomicDecU32(&pThis->u32State);
364 }
365 return rc;
366}
367
368
369RTDECL(int) RTPipeReadBlocking(RTPIPE hPipe, void *pvBuf, size_t cbToRead)
370{
371 RTPIPEINTERNAL *pThis = hPipe;
372 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
373 AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
374 AssertReturn(pThis->fRead, VERR_ACCESS_DENIED);
375 AssertPtr(pvBuf);
376
377 int rc = rtPipeTryBlocking(pThis);
378 if (RT_SUCCESS(rc))
379 {
380 do
381 {
382 ssize_t cbRead = read(pThis->fd, pvBuf, RT_MIN(cbToRead, SSIZE_MAX));
383 if (cbRead < 0)
384 {
385 rc = RTErrConvertFromErrno(errno);
386 break;
387 }
388 if (!cbRead && cbToRead > 0 && rtPipePosixHasHup(pThis))
389 {
390 rc = VERR_BROKEN_PIPE;
391 break;
392 }
393
394 /* advance */
395 pvBuf = (char *)pvBuf + cbRead;
396 cbToRead -= cbRead;
397 } while (cbToRead > 0);
398
399 ASMAtomicDecU32(&pThis->u32State);
400 }
401 return rc;
402}
403
404
405RTDECL(int) RTPipeWrite(RTPIPE hPipe, const void *pvBuf, size_t cbToWrite, size_t *pcbWritten)
406{
407 RTPIPEINTERNAL *pThis = hPipe;
408 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
409 AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
410 AssertReturn(!pThis->fRead, VERR_ACCESS_DENIED);
411 AssertPtr(pcbWritten);
412 AssertPtr(pvBuf);
413
414 int rc = rtPipeTryNonBlocking(pThis);
415 if (RT_SUCCESS(rc))
416 {
417 ssize_t cbWritten = write(pThis->fd, pvBuf, RT_MIN(cbToWrite, SSIZE_MAX));
418 if (cbWritten >= 0)
419 *pcbWritten = cbWritten;
420 else if (errno == EAGAIN)
421 {
422 *pcbWritten = 0;;
423 rc = VINF_TRY_AGAIN;
424 }
425 else
426 rc = RTErrConvertFromErrno(errno);
427
428 ASMAtomicDecU32(&pThis->u32State);
429 }
430 return rc;
431}
432
433
434RTDECL(int) RTPipeWriteBlocking(RTPIPE hPipe, const void *pvBuf, size_t cbToWrite)
435{
436 RTPIPEINTERNAL *pThis = hPipe;
437 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
438 AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
439 AssertReturn(!pThis->fRead, VERR_ACCESS_DENIED);
440 AssertPtr(pvBuf);
441
442 int rc = rtPipeTryBlocking(pThis);
443 if (RT_SUCCESS(rc))
444 {
445 do
446 {
447 ssize_t cbWritten = write(pThis->fd, pvBuf, RT_MIN(cbToWrite, SSIZE_MAX));
448 if (cbWritten < 0)
449 {
450 rc = RTErrConvertFromErrno(errno);
451 break;
452 }
453
454 /* advance */
455 pvBuf = (char const *)pvBuf + cbWritten;
456 cbToWrite -= cbWritten;
457 } while (cbToWrite > 0);
458
459 ASMAtomicDecU32(&pThis->u32State);
460 }
461 return rc;
462}
463
464
465RTDECL(int) RTPipeFlush(RTPIPE hPipe)
466{
467 RTPIPEINTERNAL *pThis = hPipe;
468 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
469 AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
470 AssertReturn(!pThis->fRead, VERR_ACCESS_DENIED);
471
472 if (fsync(pThis->fd))
473 return RTErrConvertFromErrno(errno);
474 return VINF_SUCCESS;
475}
476
477
478RTDECL(int) RTPipeSelectOne(RTPIPE hPipe, RTMSINTERVAL cMillies)
479{
480 RTPIPEINTERNAL *pThis = hPipe;
481 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
482 AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
483
484 struct pollfd PollFd;
485 RT_ZERO(PollFd);
486 PollFd.fd = pThis->fd;
487 PollFd.events = POLLHUP | POLLERR;
488 if (pThis->fRead)
489 PollFd.events |= POLLIN | POLLPRI;
490 else
491 PollFd.events |= POLLOUT;
492
493 int timeout;
494 if ( cMillies == RT_INDEFINITE_WAIT
495 || cMillies >= INT_MAX /* lazy bird */)
496 timeout = -1;
497 else
498 timeout = cMillies;
499
500 int rc = poll(&PollFd, 1, 0);
501 if (rc == -1)
502 return RTErrConvertFromErrno(errno);
503 return rc > 0 ? VINF_SUCCESS : VERR_TIMEOUT;
504}
505
Note: See TracBrowser for help on using the repository browser.

© 2025 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette