VirtualBox

source: vbox/trunk/src/VBox/Runtime/r3/posix/localipc-posix.cpp@ 58295

Last change on this file since 58295 was 58295, checked in by vboxsync, 10 years ago

iprt: Added RTLocalIpcSessionReadNB on posix systems.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 31.3 KB
Line 
1/* $Id: localipc-posix.cpp 58295 2015-10-18 13:28:34Z vboxsync $ */
2/** @file
3 * IPRT - Local IPC Server & Client, Posix.
4 */
5
6/*
7 * Copyright (C) 2006-2013 Oracle Corporation
8 *
9 * This file is part of VirtualBox Open Source Edition (OSE), as
10 * available from http://www.215389.xyz. This file is free software;
11 * you can redistribute it and/or modify it under the terms of the GNU
12 * General Public License (GPL) as published by the Free Software
13 * Foundation, in version 2 as it comes in the "COPYING" file of the
14 * VirtualBox OSE distribution. VirtualBox OSE is distributed in the
15 * hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
16 *
17 * The contents of this file may alternatively be used under the terms
18 * of the Common Development and Distribution License Version 1.0
19 * (CDDL) only, as it comes in the "COPYING.CDDL" file of the
20 * VirtualBox OSE distribution, in which case the provisions of the
21 * CDDL are applicable instead of those of the GPL.
22 *
23 * You may elect to license modified versions of this file under the
24 * terms and conditions of either the GPL or the CDDL or both.
25 */
26
27
28/*******************************************************************************
29* Header Files *
30*******************************************************************************/
31#define LOG_GROUP RTLOGGROUP_LOCALIPC
32#include "internal/iprt.h"
33#include <iprt/localipc.h>
34
35#include <iprt/asm.h>
36#include <iprt/assert.h>
37#include <iprt/ctype.h>
38#include <iprt/critsect.h>
39#include <iprt/mem.h>
40#include <iprt/log.h>
41#include <iprt/poll.h>
42#include <iprt/socket.h>
43#include <iprt/string.h>
44#include <iprt/time.h>
45
46#include <sys/types.h>
47#include <sys/socket.h>
48#include <sys/un.h>
49#ifndef RT_OS_OS2
50# include <sys/poll.h>
51# include <errno.h>
52#endif
53#include <fcntl.h>
54#include <unistd.h>
55
56#include "internal/magics.h"
57#include "internal/socket.h"
58
59
60/*******************************************************************************
61* Structures and Typedefs *
62*******************************************************************************/
63/**
64 * Local IPC service instance, POSIX.
65 */
66typedef struct RTLOCALIPCSERVERINT
67{
68 /** The magic (RTLOCALIPCSERVER_MAGIC). */
69 uint32_t u32Magic;
70 /** The creation flags. */
71 uint32_t fFlags;
72 /** Critical section protecting the structure. */
73 RTCRITSECT CritSect;
74 /** The number of references to the instance. */
75 uint32_t volatile cRefs;
76 /** Indicates that there is a pending cancel request. */
77 bool volatile fCancelled;
78 /** The server socket. */
79 RTSOCKET hSocket;
80 /** Thread currently listening for clients. */
81 RTTHREAD hListenThread;
82 /** The name we bound the server to (native charset encoding). */
83 struct sockaddr_un Name;
84} RTLOCALIPCSERVERINT;
85/** Pointer to a local IPC server instance (POSIX). */
86typedef RTLOCALIPCSERVERINT *PRTLOCALIPCSERVERINT;
87
88
89/**
90 * Local IPC session instance, POSIX.
91 */
92typedef struct RTLOCALIPCSESSIONINT
93{
94 /** The magic (RTLOCALIPCSESSION_MAGIC). */
95 uint32_t u32Magic;
96 /** Critical section protecting the structure. */
97 RTCRITSECT CritSect;
98 /** The number of references to the instance. */
99 uint32_t volatile cRefs;
100 /** Indicates that there is a pending cancel request. */
101 bool volatile fCancelled;
102 /** Set if this is the server side, clear if the client. */
103 bool fServerSide;
104 /** The client socket. */
105 RTSOCKET hSocket;
106 /** Thread currently doing read related activites. */
107 RTTHREAD hWriteThread;
108 /** Thread currently doing write related activies. */
109 RTTHREAD hReadThread;
110} RTLOCALIPCSESSIONINT;
111/** Pointer to a local IPC session instance (Windows). */
112typedef RTLOCALIPCSESSIONINT *PRTLOCALIPCSESSIONINT;
113
114
115/** Local IPC name prefix. */
116#define RTLOCALIPC_POSIX_NAME_PREFIX "/tmp/.iprt-localipc-"
117
118
119/**
120 * Validates the user specified name.
121 *
122 * @returns IPRT status code.
123 * @param pszName The name to validate.
124 * @param pcchName Where to return the length.
125 */
126static int rtLocalIpcPosixValidateName(const char *pszName, size_t *pcchName)
127{
128 AssertPtrReturn(pszName, VERR_INVALID_POINTER);
129
130 uint32_t cchName = 0;
131 for (;;)
132 {
133 char ch = pszName[cchName];
134 if (!ch)
135 break;
136 AssertReturn(!RT_C_IS_CNTRL(ch), VERR_INVALID_NAME);
137 AssertReturn((unsigned)ch < 0x80, VERR_INVALID_NAME);
138 AssertReturn(ch != '\\', VERR_INVALID_NAME);
139 AssertReturn(ch != '/', VERR_INVALID_NAME);
140 cchName++;
141 }
142
143 *pcchName = cchName;
144 AssertReturn(sizeof(RTLOCALIPC_POSIX_NAME_PREFIX) + cchName <= RT_SIZEOFMEMB(struct sockaddr_un, sun_path),
145 VERR_FILENAME_TOO_LONG);
146 AssertReturn(cchName, VERR_INVALID_NAME);
147
148 return VINF_SUCCESS;
149}
150
151
152/**
153 * Constructs a local (unix) domain socket name.
154 *
155 * @returns IPRT status code.
156 * @param pAddr The address structure to construct the name in.
157 * @param pcbAddr Where to return the address size.
158 * @param pszName The user specified name (valid).
159 * @param cchName The user specified name length.
160 */
161static int rtLocalIpcPosixConstructName(struct sockaddr_un *pAddr, uint8_t *pcbAddr, const char *pszName, size_t cchName)
162{
163 AssertMsgReturn(cchName + sizeof(RTLOCALIPC_POSIX_NAME_PREFIX) <= sizeof(pAddr->sun_path),
164 ("cchName=%zu sizeof(sun_path)=%zu\n", cchName, sizeof(pAddr->sun_path)),
165 VERR_FILENAME_TOO_LONG);
166
167/** @todo Bother converting to local codeset/encoding?? */
168
169 RT_ZERO(*pAddr);
170#ifdef RT_OS_OS2 /* Size must be exactly right on OS/2. */
171 *pcbAddr = sizeof(*pAddr);
172#else
173 *pcbAddr = RT_OFFSETOF(struct sockaddr_un, sun_path) + (uint8_t)cchName + sizeof(RTLOCALIPC_POSIX_NAME_PREFIX);
174#endif
175#ifdef HAVE_SUN_LEN_MEMBER
176 pAddr->sun_len = *pcbAddr;
177#endif
178 pAddr->sun_family = AF_LOCAL;
179 memcpy(pAddr->sun_path, RTLOCALIPC_POSIX_NAME_PREFIX, sizeof(RTLOCALIPC_POSIX_NAME_PREFIX) - 1);
180 memcpy(&pAddr->sun_path[sizeof(RTLOCALIPC_POSIX_NAME_PREFIX) - 1], pszName, cchName + 1);
181
182 return VINF_SUCCESS;
183}
184
185
186
187RTDECL(int) RTLocalIpcServerCreate(PRTLOCALIPCSERVER phServer, const char *pszName, uint32_t fFlags)
188{
189 /*
190 * Parameter validation.
191 */
192 AssertPtrReturn(phServer, VERR_INVALID_POINTER);
193 *phServer = NIL_RTLOCALIPCSERVER;
194
195 AssertReturn(!(fFlags & ~RTLOCALIPC_FLAGS_VALID_MASK), VERR_INVALID_FLAGS);
196
197 size_t cchName;
198 int rc = rtLocalIpcPosixValidateName(pszName, &cchName);
199 if (RT_SUCCESS(rc))
200 {
201 /*
202 * Allocate memory for the instance and initialize it.
203 */
204 PRTLOCALIPCSERVERINT pThis = (PRTLOCALIPCSERVERINT)RTMemAllocZ(sizeof(*pThis));
205 if (pThis)
206 {
207 pThis->u32Magic = RTLOCALIPCSERVER_MAGIC;
208 pThis->fFlags = fFlags;
209 pThis->cRefs = 1;
210 pThis->fCancelled = false;
211 pThis->hListenThread = NIL_RTTHREAD;
212 rc = RTCritSectInit(&pThis->CritSect);
213 if (RT_SUCCESS(rc))
214 {
215 /*
216 * Create the local (unix) socket and bind to it.
217 */
218 rc = rtSocketCreate(&pThis->hSocket, AF_LOCAL, SOCK_STREAM, 0 /*iProtocol*/);
219 if (RT_SUCCESS(rc))
220 {
221 RTSocketSetInheritance(pThis->hSocket, false /*fInheritable*/);
222
223 uint8_t cbAddr;
224 rc = rtLocalIpcPosixConstructName(&pThis->Name, &cbAddr, pszName, cchName);
225 if (RT_SUCCESS(rc))
226 {
227 rc = rtSocketBindRawAddr(pThis->hSocket, &pThis->Name, cbAddr);
228 if (rc == VERR_NET_ADDRESS_IN_USE)
229 {
230 unlink(pThis->Name.sun_path);
231 rc = rtSocketBindRawAddr(pThis->hSocket, &pThis->Name, cbAddr);
232 }
233 if (RT_SUCCESS(rc))
234 {
235 LogFlow(("RTLocalIpcServerCreate: Created %p (%s)\n", pThis, pThis->Name.sun_path));
236 *phServer = pThis;
237 return VINF_SUCCESS;
238 }
239 }
240 RTSocketRelease(pThis->hSocket);
241 }
242 RTCritSectDelete(&pThis->CritSect);
243 }
244 RTMemFree(pThis);
245 }
246 else
247 rc = VERR_NO_MEMORY;
248 }
249 Log(("RTLocalIpcServerCreate: failed, rc=%Rrc\n", rc));
250 return rc;
251}
252
253
254/**
255 * Retains a reference to the server instance.
256 *
257 * @returns
258 * @param pThis The server instance.
259 */
260DECLINLINE(void) rtLocalIpcServerRetain(PRTLOCALIPCSERVERINT pThis)
261{
262 uint32_t cRefs = ASMAtomicIncU32(&pThis->cRefs);
263 Assert(cRefs < UINT32_MAX / 2 && cRefs);
264}
265
266
267/**
268 * Server instance destructor.
269 *
270 * @returns VINF_OBJECT_DESTROYED
271 * @param pThis The server instance.
272 */
273static int rtLocalIpcServerDtor(PRTLOCALIPCSERVERINT pThis)
274{
275 pThis->u32Magic = ~RTLOCALIPCSERVER_MAGIC;
276 if (RTSocketRelease(pThis->hSocket) == 0)
277 Log(("rtLocalIpcServerDtor: Released socket\n"));
278 else
279 Log(("rtLocalIpcServerDtor: Socket still has references (impossible?)\n"));
280 RTCritSectDelete(&pThis->CritSect);
281 unlink(pThis->Name.sun_path);
282 RTMemFree(pThis);
283 return VINF_OBJECT_DESTROYED;
284}
285
286
287/**
288 * Releases a reference to the server instance.
289 *
290 * @returns VINF_SUCCESS if only release, VINF_OBJECT_DESTROYED if destroyed.
291 * @param pThis The server instance.
292 */
293DECLINLINE(int) rtLocalIpcServerRelease(PRTLOCALIPCSERVERINT pThis)
294{
295 uint32_t cRefs = ASMAtomicDecU32(&pThis->cRefs);
296 Assert(cRefs < UINT32_MAX / 2);
297 if (!cRefs)
298 return rtLocalIpcServerDtor(pThis);
299 return VINF_SUCCESS;
300}
301
302
303/**
304 * The core of RTLocalIpcServerCancel, used by both the destroy and cancel APIs.
305 *
306 * @returns IPRT status code
307 * @param pThis The server instance.
308 */
309static int rtLocalIpcServerCancel(PRTLOCALIPCSERVERINT pThis)
310{
311 RTCritSectEnter(&pThis->CritSect);
312 pThis->fCancelled = true;
313 Log(("rtLocalIpcServerCancel:\n"));
314 if (pThis->hListenThread != NIL_RTTHREAD)
315 RTThreadPoke(pThis->hListenThread);
316 RTCritSectLeave(&pThis->CritSect);
317 return VINF_SUCCESS;
318}
319
320
321
322RTDECL(int) RTLocalIpcServerDestroy(RTLOCALIPCSERVER hServer)
323{
324 /*
325 * Validate input.
326 */
327 if (hServer == NIL_RTLOCALIPCSERVER)
328 return VINF_SUCCESS;
329 PRTLOCALIPCSERVERINT pThis = (PRTLOCALIPCSERVERINT)hServer;
330 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
331 AssertReturn(pThis->u32Magic == RTLOCALIPCSERVER_MAGIC, VERR_INVALID_HANDLE);
332
333 /*
334 * Invalidate the server, releasing the caller's reference to the instance
335 * data and making sure any other thread in the listen API will wake up.
336 */
337 AssertReturn(ASMAtomicCmpXchgU32(&pThis->u32Magic, ~RTLOCALIPCSERVER_MAGIC, RTLOCALIPCSERVER_MAGIC), VERR_WRONG_ORDER);
338
339 rtLocalIpcServerCancel(pThis);
340 return rtLocalIpcServerRelease(pThis);
341}
342
343
344RTDECL(int) RTLocalIpcServerCancel(RTLOCALIPCSERVER hServer)
345{
346 /*
347 * Validate input.
348 */
349 PRTLOCALIPCSERVERINT pThis = (PRTLOCALIPCSERVERINT)hServer;
350 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
351 AssertReturn(pThis->u32Magic == RTLOCALIPCSERVER_MAGIC, VERR_INVALID_HANDLE);
352
353 /*
354 * Do the job.
355 */
356 rtLocalIpcServerRetain(pThis);
357 rtLocalIpcServerCancel(pThis);
358 rtLocalIpcServerRelease(pThis);
359 return VINF_SUCCESS;
360}
361
362
363RTDECL(int) RTLocalIpcServerListen(RTLOCALIPCSERVER hServer, PRTLOCALIPCSESSION phClientSession)
364{
365 /*
366 * Validate input.
367 */
368 PRTLOCALIPCSERVERINT pThis = (PRTLOCALIPCSERVERINT)hServer;
369 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
370 AssertReturn(pThis->u32Magic == RTLOCALIPCSERVER_MAGIC, VERR_INVALID_HANDLE);
371
372 /*
373 * Begin listening.
374 */
375 rtLocalIpcServerRetain(pThis);
376 int rc = RTCritSectEnter(&pThis->CritSect);
377 if (RT_SUCCESS(rc))
378 {
379 if (pThis->hListenThread == NIL_RTTHREAD)
380 {
381 pThis->hListenThread = RTThreadSelf();
382
383 /*
384 * The listening retry loop.
385 */
386 for (;;)
387 {
388 if (pThis->fCancelled)
389 {
390 rc = VERR_CANCELLED;
391 break;
392 }
393
394 rc = RTCritSectLeave(&pThis->CritSect);
395 AssertRCBreak(rc);
396
397 rc = rtSocketListen(pThis->hSocket, pThis->fFlags & RTLOCALIPC_FLAGS_MULTI_SESSION ? 10 : 0);
398 if (RT_SUCCESS(rc))
399 {
400 struct sockaddr_un Addr;
401 size_t cbAddr = sizeof(Addr);
402 RTSOCKET hClient;
403 Log(("RTLocalIpcServerListen: Calling rtSocketAccept...\n"));
404 rc = rtSocketAccept(pThis->hSocket, &hClient, (struct sockaddr *)&Addr, &cbAddr);
405 Log(("RTLocalIpcServerListen: rtSocketAccept returns %Rrc.\n", rc));
406
407 int rc2 = RTCritSectEnter(&pThis->CritSect);
408 AssertRCBreakStmt(rc2, rc = RT_SUCCESS(rc) ? rc2 : rc);
409
410 if (RT_SUCCESS(rc))
411 {
412 /*
413 * Create a client session.
414 */
415 PRTLOCALIPCSESSIONINT pSession = (PRTLOCALIPCSESSIONINT)RTMemAllocZ(sizeof(*pSession));
416 if (pSession)
417 {
418 pSession->u32Magic = RTLOCALIPCSESSION_MAGIC;
419 pSession->cRefs = 1;
420 pSession->fCancelled = false;
421 pSession->fServerSide = true;
422 pSession->hSocket = hClient;
423 pSession->hReadThread = NIL_RTTHREAD;
424 pSession->hWriteThread = NIL_RTTHREAD;
425 rc = RTCritSectInit(&pSession->CritSect);
426 if (RT_SUCCESS(rc))
427 {
428 Log(("RTLocalIpcServerListen: Returning new client session: %p\n", pSession));
429 *phClientSession = pSession;
430 break;
431 }
432
433 RTMemFree(pSession);
434 }
435 else
436 rc = VERR_NO_MEMORY;
437 }
438 else if ( rc != VERR_INTERRUPTED
439 && rc != VERR_TRY_AGAIN)
440 break;
441 }
442 else
443 {
444 int rc2 = RTCritSectEnter(&pThis->CritSect);
445 AssertRCBreakStmt(rc2, rc = RT_SUCCESS(rc) ? rc2 : rc);
446 if ( rc != VERR_INTERRUPTED
447 && rc != VERR_TRY_AGAIN)
448 break;
449 }
450 }
451
452 pThis->hListenThread = NIL_RTTHREAD;
453 }
454 else
455 {
456 AssertFailed();
457 rc = VERR_RESOURCE_BUSY;
458 }
459 int rc2 = RTCritSectLeave(&pThis->CritSect);
460 AssertStmt(RT_SUCCESS(rc2), rc = RT_SUCCESS(rc) ? rc2 : rc);
461 }
462 rtLocalIpcServerRelease(pThis);
463
464 Log(("RTLocalIpcServerListen: returns %Rrc\n", rc));
465 return rc;
466}
467
468
469RTDECL(int) RTLocalIpcSessionConnect(PRTLOCALIPCSESSION phSession, const char *pszName, uint32_t fFlags)
470{
471 /*
472 * Parameter validation.
473 */
474 AssertPtrReturn(phSession, VERR_INVALID_POINTER);
475 *phSession = NIL_RTLOCALIPCSESSION;
476
477 AssertReturn(!fFlags, VERR_INVALID_FLAGS);
478
479 size_t cchName;
480 int rc = rtLocalIpcPosixValidateName(pszName, &cchName);
481 if (RT_SUCCESS(rc))
482 {
483 /*
484 * Allocate memory for the instance and initialize it.
485 */
486 PRTLOCALIPCSESSIONINT pThis = (PRTLOCALIPCSESSIONINT)RTMemAllocZ(sizeof(*pThis));
487 if (pThis)
488 {
489 pThis->u32Magic = RTLOCALIPCSESSION_MAGIC;
490 pThis->cRefs = 1;
491 pThis->fCancelled = false;
492 pThis->fServerSide = false;
493 pThis->hSocket = NIL_RTSOCKET;
494 pThis->hReadThread = NIL_RTTHREAD;
495 pThis->hWriteThread = NIL_RTTHREAD;
496 rc = RTCritSectInit(&pThis->CritSect);
497 if (RT_SUCCESS(rc))
498 {
499 /*
500 * Create the local (unix) socket and try connect to the server.
501 */
502 rc = rtSocketCreate(&pThis->hSocket, AF_LOCAL, SOCK_STREAM, 0 /*iProtocol*/);
503 if (RT_SUCCESS(rc))
504 {
505 RTSocketSetInheritance(pThis->hSocket, false /*fInheritable*/);
506
507 struct sockaddr_un Addr;
508 uint8_t cbAddr;
509 rc = rtLocalIpcPosixConstructName(&Addr, &cbAddr, pszName, cchName);
510 if (RT_SUCCESS(rc))
511 {
512 rc = rtSocketConnectRaw(pThis->hSocket, &Addr, cbAddr);
513 if (RT_SUCCESS(rc))
514 {
515 *phSession = pThis;
516 Log(("RTLocalIpcSessionConnect: Returns new session %p\n", pThis));
517 return VINF_SUCCESS;
518 }
519 }
520 RTCritSectDelete(&pThis->CritSect);
521 }
522 }
523 RTMemFree(pThis);
524 }
525 else
526 rc = VERR_NO_MEMORY;
527 }
528 Log(("RTLocalIpcSessionConnect: returns %Rrc\n", rc));
529 return rc;
530}
531
532
533/**
534 * Retains a reference to the session instance.
535 *
536 * @param pThis The server instance.
537 */
538DECLINLINE(void) rtLocalIpcSessionRetain(PRTLOCALIPCSESSIONINT pThis)
539{
540 uint32_t cRefs = ASMAtomicIncU32(&pThis->cRefs);
541 Assert(cRefs < UINT32_MAX / 2 && cRefs);
542}
543
544
545/**
546 * Session instance destructor.
547 *
548 * @returns VINF_OBJECT_DESTROYED
549 * @param pThis The server instance.
550 */
551static int rtLocalIpcSessionDtor(PRTLOCALIPCSESSIONINT pThis)
552{
553 pThis->u32Magic = ~RTLOCALIPCSESSION_MAGIC;
554 if (RTSocketRelease(pThis->hSocket) == 0)
555 Log(("rtLocalIpcSessionDtor: Released socket\n"));
556 else
557 Log(("rtLocalIpcSessionDtor: Socket still has references (impossible?)\n"));
558 RTCritSectDelete(&pThis->CritSect);
559 RTMemFree(pThis);
560 return VINF_OBJECT_DESTROYED;
561}
562
563
564/**
565 * Releases a reference to the session instance.
566 *
567 * @returns VINF_SUCCESS or VINF_OBJECT_DESTROYED as appropriate.
568 * @param pThis The session instance.
569 */
570DECLINLINE(int) rtLocalIpcSessionRelease(PRTLOCALIPCSESSIONINT pThis)
571{
572 uint32_t cRefs = ASMAtomicDecU32(&pThis->cRefs);
573 Assert(cRefs < UINT32_MAX / 2);
574 if (!cRefs)
575 return rtLocalIpcSessionDtor(pThis);
576 Log(("rtLocalIpcSessionRelease: %u refs left\n", cRefs));
577 return VINF_SUCCESS;
578}
579
580
581/**
582 * The core of RTLocalIpcSessionCancel, used by both the destroy and cancel APIs.
583 *
584 * @returns IPRT status code
585 * @param pThis The session instance.
586 */
587static int rtLocalIpcSessionCancel(PRTLOCALIPCSESSIONINT pThis)
588{
589 RTCritSectEnter(&pThis->CritSect);
590 pThis->fCancelled = true;
591 Log(("rtLocalIpcSessionCancel:\n"));
592 if (pThis->hReadThread != NIL_RTTHREAD)
593 RTThreadPoke(pThis->hReadThread);
594 if (pThis->hWriteThread != NIL_RTTHREAD)
595 RTThreadPoke(pThis->hWriteThread);
596 RTCritSectLeave(&pThis->CritSect);
597 return VINF_SUCCESS;
598}
599
600
601RTDECL(int) RTLocalIpcSessionClose(RTLOCALIPCSESSION hSession)
602{
603 /*
604 * Validate input.
605 */
606 if (hSession == NIL_RTLOCALIPCSESSION)
607 return VINF_SUCCESS;
608 PRTLOCALIPCSESSIONINT pThis = hSession;
609 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
610 AssertReturn(pThis->u32Magic == RTLOCALIPCSESSION_MAGIC, VERR_INVALID_HANDLE);
611
612 /*
613 * Invalidate the session, releasing the caller's reference to the instance
614 * data and making sure any other thread in the listen API will wake up.
615 */
616 AssertReturn(ASMAtomicCmpXchgU32(&pThis->u32Magic, ~RTLOCALIPCSESSION_MAGIC, RTLOCALIPCSESSION_MAGIC), VERR_WRONG_ORDER);
617 Log(("RTLocalIpcSessionClose:\n"));
618
619 rtLocalIpcSessionCancel(pThis);
620 return rtLocalIpcSessionRelease(pThis);
621}
622
623
624RTDECL(int) RTLocalIpcSessionCancel(RTLOCALIPCSESSION hSession)
625{
626 /*
627 * Validate input.
628 */
629 PRTLOCALIPCSESSIONINT pThis = hSession;
630 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
631 AssertReturn(pThis->u32Magic == RTLOCALIPCSESSION_MAGIC, VERR_INVALID_HANDLE);
632
633 /*
634 * Do the job.
635 */
636 rtLocalIpcSessionRetain(pThis);
637 rtLocalIpcSessionCancel(pThis);
638 rtLocalIpcSessionRelease(pThis);
639 return VINF_SUCCESS;
640}
641
642
643/**
644 * Checks if the socket has has a HUP condition.
645 *
646 * @returns true if HUP, false if no.
647 * @param pThis The IPC session handle.
648 */
649static bool rtLocalIpcPosixHasHup(PRTLOCALIPCSESSIONINT pThis)
650{
651#ifndef RT_OS_OS2
652 struct pollfd PollFd;
653 RT_ZERO(PollFd);
654 PollFd.fd = RTSocketToNative(pThis->hSocket);
655 PollFd.events = POLLHUP;
656 return poll(&PollFd, 1, 0) >= 1
657 && (PollFd.revents & POLLHUP);
658
659#else /* RT_OS_OS2: */
660 return true;
661#endif
662}
663
664
665RTDECL(int) RTLocalIpcSessionRead(RTLOCALIPCSESSION hSession, void *pvBuf, size_t cbToRead, size_t *pcbRead)
666{
667 /*
668 * Validate input.
669 */
670 PRTLOCALIPCSESSIONINT pThis = hSession;
671 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
672 AssertReturn(pThis->u32Magic == RTLOCALIPCSESSION_MAGIC, VERR_INVALID_HANDLE);
673
674 /*
675 * Do the job.
676 */
677 rtLocalIpcSessionRetain(pThis);
678
679 int rc = RTCritSectEnter(&pThis->CritSect);
680 if (RT_SUCCESS(rc))
681 {
682 if (pThis->hReadThread == NIL_RTTHREAD)
683 {
684 pThis->hReadThread = RTThreadSelf();
685
686 for (;;)
687 {
688 if (!pThis->fCancelled)
689 {
690 rc = RTCritSectLeave(&pThis->CritSect);
691 AssertRCBreak(rc);
692
693 rc = RTSocketRead(pThis->hSocket, pvBuf, cbToRead, pcbRead);
694
695 /* Detect broken pipe. */
696 if (rc == VINF_SUCCESS)
697 {
698 if (!pcbRead || *pcbRead)
699 { /* likely */ }
700 else if (rtLocalIpcPosixHasHup(pThis))
701 rc = VERR_BROKEN_PIPE;
702 }
703 else if (rc == VERR_NET_CONNECTION_RESET_BY_PEER || rc == VERR_NET_SHUTDOWN)
704 rc = VERR_BROKEN_PIPE;
705
706 int rc2 = RTCritSectEnter(&pThis->CritSect);
707 AssertRCBreakStmt(rc2, rc = RT_SUCCESS(rc) ? rc2 : rc);
708
709 if ( rc == VERR_INTERRUPTED
710 || rc == VERR_TRY_AGAIN)
711 continue;
712 }
713 else
714 rc = VERR_CANCELLED;
715 break;
716 }
717
718 pThis->hReadThread = NIL_RTTHREAD;
719 }
720 int rc2 = RTCritSectLeave(&pThis->CritSect);
721 AssertStmt(RT_SUCCESS(rc2), rc = RT_SUCCESS(rc) ? rc2 : rc);
722 }
723
724 rtLocalIpcSessionRelease(pThis);
725 return rc;
726}
727
728
729RTDECL(int) RTLocalIpcSessionReadNB(RTLOCALIPCSESSION hSession, void *pvBuf, size_t cbToRead, size_t *pcbRead)
730{
731 /*
732 * Validate input.
733 */
734 PRTLOCALIPCSESSIONINT pThis = hSession;
735 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
736 AssertReturn(pThis->u32Magic == RTLOCALIPCSESSION_MAGIC, VERR_INVALID_HANDLE);
737
738 /*
739 * Do the job.
740 */
741 rtLocalIpcSessionRetain(pThis);
742
743 int rc = RTCritSectEnter(&pThis->CritSect);
744 if (RT_SUCCESS(rc))
745 {
746 if (pThis->hReadThread == NIL_RTTHREAD)
747 {
748 pThis->hReadThread = RTThreadSelf(); /* not really required, but whatever. */
749
750 for (;;)
751 {
752 if (!pThis->fCancelled)
753 {
754 rc = RTSocketReadNB(pThis->hSocket, pvBuf, cbToRead, pcbRead);
755
756 /* Detect broken pipe. */
757 if (rc == VINF_SUCCESS)
758 {
759 if (!pcbRead || *pcbRead)
760 { /* likely */ }
761 else if (rtLocalIpcPosixHasHup(pThis))
762 rc = VERR_BROKEN_PIPE;
763 }
764 else if (rc == VERR_NET_CONNECTION_RESET_BY_PEER || rc == VERR_NET_SHUTDOWN)
765 rc = VERR_BROKEN_PIPE;
766
767 if (rc == VERR_INTERRUPTED)
768 continue;
769 }
770 else
771 rc = VERR_CANCELLED;
772 break;
773 }
774
775 pThis->hReadThread = NIL_RTTHREAD;
776 }
777 int rc2 = RTCritSectLeave(&pThis->CritSect);
778 AssertStmt(RT_SUCCESS(rc2), rc = RT_SUCCESS(rc) ? rc2 : rc);
779 }
780
781 rtLocalIpcSessionRelease(pThis);
782 return rc;
783}
784
785
786RTDECL(int) RTLocalIpcSessionWrite(RTLOCALIPCSESSION hSession, const void *pvBuf, size_t cbToWrite)
787{
788 /*
789 * Validate input.
790 */
791 PRTLOCALIPCSESSIONINT pThis = hSession;
792 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
793 AssertReturn(pThis->u32Magic == RTLOCALIPCSESSION_MAGIC, VERR_INVALID_HANDLE);
794
795 /*
796 * Do the job.
797 */
798 rtLocalIpcSessionRetain(pThis);
799
800 int rc = RTCritSectEnter(&pThis->CritSect);
801 if (RT_SUCCESS(rc))
802 {
803 if (pThis->hWriteThread == NIL_RTTHREAD)
804 {
805 pThis->hWriteThread = RTThreadSelf();
806
807 for (;;)
808 {
809 if (!pThis->fCancelled)
810 {
811 rc = RTCritSectLeave(&pThis->CritSect);
812 AssertRCBreak(rc);
813
814 rc = RTSocketWrite(pThis->hSocket, pvBuf, cbToWrite);
815
816 int rc2 = RTCritSectEnter(&pThis->CritSect);
817 AssertRCBreakStmt(rc2, rc = RT_SUCCESS(rc) ? rc2 : rc);
818
819 if ( rc == VERR_INTERRUPTED
820 || rc == VERR_TRY_AGAIN)
821 continue;
822 }
823 else
824 rc = VERR_CANCELLED;
825 break;
826 }
827
828 pThis->hWriteThread = NIL_RTTHREAD;
829 }
830 int rc2 = RTCritSectLeave(&pThis->CritSect);
831 AssertStmt(RT_SUCCESS(rc2), rc = RT_SUCCESS(rc) ? rc2 : rc);
832 }
833
834 rtLocalIpcSessionRelease(pThis);
835 return rc;
836}
837
838
839RTDECL(int) RTLocalIpcSessionFlush(RTLOCALIPCSESSION hSession)
840{
841 /*
842 * Validate input.
843 */
844 PRTLOCALIPCSESSIONINT pThis = hSession;
845 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
846 AssertReturn(pThis->u32Magic == RTLOCALIPCSESSION_MAGIC, VERR_INVALID_HANDLE);
847
848 /*
849 * This is a no-op because apparently write doesn't return until the
850 * result is read. At least that's what the reply to a 2003-04-08 LKML
851 * posting title "fsync() on unix domain sockets?" indicates.
852 *
853 * For conformity, make sure there isn't any active writes concurrent to this call.
854 */
855 rtLocalIpcSessionRetain(pThis);
856
857 int rc = RTCritSectEnter(&pThis->CritSect);
858 if (RT_SUCCESS(rc))
859 {
860 if (pThis->hWriteThread == NIL_RTTHREAD)
861 rc = RTCritSectLeave(&pThis->CritSect);
862 else
863 {
864 rc = RTCritSectLeave(&pThis->CritSect);
865 if (RT_SUCCESS(rc))
866 rc = VERR_RESOURCE_BUSY;
867 }
868 }
869
870 rtLocalIpcSessionRelease(pThis);
871 return rc;
872}
873
874
875RTDECL(int) RTLocalIpcSessionWaitForData(RTLOCALIPCSESSION hSession, uint32_t cMillies)
876{
877 /*
878 * Validate input.
879 */
880 PRTLOCALIPCSESSIONINT pThis = hSession;
881 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
882 AssertReturn(pThis->u32Magic == RTLOCALIPCSESSION_MAGIC, VERR_INVALID_HANDLE);
883
884 /*
885 * Do the job.
886 */
887 rtLocalIpcSessionRetain(pThis);
888
889 int rc = RTCritSectEnter(&pThis->CritSect);
890 if (RT_SUCCESS(rc))
891 {
892 if (pThis->hReadThread == NIL_RTTHREAD)
893 {
894 pThis->hReadThread = RTThreadSelf();
895 uint64_t const msStart = RTTimeMilliTS();
896 RTMSINTERVAL const cMsOriginalTimeout = cMillies;
897
898 for (;;)
899 {
900 if (!pThis->fCancelled)
901 {
902 rc = RTCritSectLeave(&pThis->CritSect);
903 AssertRCBreak(rc);
904
905 uint32_t fEvents = 0;
906#ifdef RT_OS_OS2
907 /* This doesn't give us any error condition on hangup. */
908 Log(("RTLocalIpcSessionWaitForData: Calling RTSocketSelectOneEx...\n"));
909 rc = RTSocketSelectOneEx(pThis->hSocket, RTPOLL_EVT_READ | RTPOLL_EVT_ERROR, &fEvents, cMillies);
910 Log(("RTLocalIpcSessionWaitForData: RTSocketSelectOneEx returns %Rrc, fEvents=%#x\n", rc, fEvents));
911#else
912/** @todo RTSocketPoll */
913 /* POLLHUP will be set on hangup. */
914 struct pollfd PollFd;
915 RT_ZERO(PollFd);
916 PollFd.fd = RTSocketToNative(pThis->hSocket);
917 PollFd.events = POLLHUP | POLLERR | POLLIN;
918 Log(("RTLocalIpcSessionWaitForData: Calling poll...\n"));
919 int cFds = poll(&PollFd, 1, cMillies == RT_INDEFINITE_WAIT ? -1 : cMillies);
920 if (cFds >= 1)
921 {
922 fEvents = PollFd.revents & (POLLHUP | POLLERR) ? RTPOLL_EVT_ERROR : RTPOLL_EVT_READ;
923 rc = VINF_SUCCESS;
924 }
925 else if (rc == 0)
926 rc = VERR_TIMEOUT;
927 else
928 rc = RTErrConvertFromErrno(errno);
929 Log(("RTLocalIpcSessionWaitForData: poll returns %u (rc=%%d), revents=%#x\n", cFds, rc, PollFd.revents));
930#endif
931
932 int rc2 = RTCritSectEnter(&pThis->CritSect);
933 AssertRCBreakStmt(rc2, rc = RT_SUCCESS(rc) ? rc2 : rc);
934
935 if (RT_SUCCESS(rc))
936 {
937 if (pThis->fCancelled)
938 rc = VERR_CANCELLED;
939 else if (fEvents & RTPOLL_EVT_ERROR)
940 rc = VERR_BROKEN_PIPE;
941 }
942 else if ( rc == VERR_INTERRUPTED
943 || rc == VERR_TRY_AGAIN)
944 {
945 /* Recalc cMillies. */
946 if (cMsOriginalTimeout != RT_INDEFINITE_WAIT)
947 {
948 uint64_t cMsElapsed = RTTimeMilliTS() - msStart;
949 cMillies = cMsElapsed >= cMsOriginalTimeout ? 0 : cMsOriginalTimeout - (RTMSINTERVAL)cMsElapsed;
950 }
951 continue;
952 }
953 }
954 else
955 rc = VERR_CANCELLED;
956 break;
957 }
958
959 pThis->hReadThread = NIL_RTTHREAD;
960 }
961 int rc2 = RTCritSectLeave(&pThis->CritSect);
962 AssertStmt(RT_SUCCESS(rc2), rc = RT_SUCCESS(rc) ? rc2 : rc);
963 }
964
965 rtLocalIpcSessionRelease(pThis);
966 return rc;
967}
968
969
970RTDECL(int) RTLocalIpcSessionQueryProcess(RTLOCALIPCSESSION hSession, PRTPROCESS pProcess)
971{
972 return VERR_NOT_SUPPORTED;
973}
974
975
976RTDECL(int) RTLocalIpcSessionQueryUserId(RTLOCALIPCSESSION hSession, PRTUID pUid)
977{
978 return VERR_NOT_SUPPORTED;
979}
980
981
982RTDECL(int) RTLocalIpcSessionQueryGroupId(RTLOCALIPCSESSION hSession, PRTGID pGid)
983{
984 return VERR_NOT_SUPPORTED;
985}
986
987
988
Note: See TracBrowser for help on using the repository browser.

© 2025 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette