VirtualBox

source: vbox/trunk/src/VBox/Runtime/r3/posix/localipc-posix.cpp@ 58300

Last change on this file since 58300 was 58300, checked in by vboxsync, 10 years ago

RTLocalIpc: Added RTLocalIpcSessionRetain and RTLocalIpcSessionRelease.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 32.7 KB
Line 
1/* $Id: localipc-posix.cpp 58300 2015-10-18 19:52:19Z vboxsync $ */
2/** @file
3 * IPRT - Local IPC Server & Client, Posix.
4 */
5
6/*
7 * Copyright (C) 2006-2013 Oracle Corporation
8 *
9 * This file is part of VirtualBox Open Source Edition (OSE), as
10 * available from http://www.215389.xyz. This file is free software;
11 * you can redistribute it and/or modify it under the terms of the GNU
12 * General Public License (GPL) as published by the Free Software
13 * Foundation, in version 2 as it comes in the "COPYING" file of the
14 * VirtualBox OSE distribution. VirtualBox OSE is distributed in the
15 * hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
16 *
17 * The contents of this file may alternatively be used under the terms
18 * of the Common Development and Distribution License Version 1.0
19 * (CDDL) only, as it comes in the "COPYING.CDDL" file of the
20 * VirtualBox OSE distribution, in which case the provisions of the
21 * CDDL are applicable instead of those of the GPL.
22 *
23 * You may elect to license modified versions of this file under the
24 * terms and conditions of either the GPL or the CDDL or both.
25 */
26
27
28/*******************************************************************************
29* Header Files *
30*******************************************************************************/
31#define LOG_GROUP RTLOGGROUP_LOCALIPC
32#include "internal/iprt.h"
33#include <iprt/localipc.h>
34
35#include <iprt/asm.h>
36#include <iprt/assert.h>
37#include <iprt/ctype.h>
38#include <iprt/critsect.h>
39#include <iprt/mem.h>
40#include <iprt/log.h>
41#include <iprt/poll.h>
42#include <iprt/socket.h>
43#include <iprt/string.h>
44#include <iprt/time.h>
45
46#include <sys/types.h>
47#include <sys/socket.h>
48#include <sys/un.h>
49#ifndef RT_OS_OS2
50# include <sys/poll.h>
51# include <errno.h>
52#endif
53#include <fcntl.h>
54#include <unistd.h>
55
56#include "internal/magics.h"
57#include "internal/path.h"
58#include "internal/socket.h"
59
60
61/*******************************************************************************
62* Structures and Typedefs *
63*******************************************************************************/
64/**
65 * Local IPC service instance, POSIX.
66 */
67typedef struct RTLOCALIPCSERVERINT
68{
69 /** The magic (RTLOCALIPCSERVER_MAGIC); inverted once the handle is destroyed. */
70 uint32_t u32Magic;
71 /** The creation flags (RTLOCALIPC_FLAGS_XXX) passed to RTLocalIpcServerCreate. */
72 uint32_t fFlags;
73 /** Critical section protecting the structure. */
74 RTCRITSECT CritSect;
75 /** The number of references to the instance; the instance is freed when it reaches zero. */
76 uint32_t volatile cRefs;
77 /** Indicates that there is a pending cancel request. */
78 bool volatile fCancelled;
79 /** The server socket. */
80 RTSOCKET hSocket;
81 /** Thread currently listening for clients, NIL_RTTHREAD when nobody is listening. */
82 RTTHREAD hListenThread;
83 /** The name we bound the server to (native charset encoding); the path is unlinked by the destructor. */
84 struct sockaddr_un Name;
85} RTLOCALIPCSERVERINT;
86/** Pointer to a local IPC server instance (POSIX). */
87typedef RTLOCALIPCSERVERINT *PRTLOCALIPCSERVERINT;
88
89
90/**
91 * Local IPC session instance, POSIX.
92 */
93typedef struct RTLOCALIPCSESSIONINT
94{
95 /** The magic (RTLOCALIPCSESSION_MAGIC); inverted once the session is closed. */
96 uint32_t u32Magic;
97 /** Critical section protecting the structure. */
98 RTCRITSECT CritSect;
99 /** The number of references to the instance; the instance is freed when it reaches zero. */
100 uint32_t volatile cRefs;
101 /** Indicates that there is a pending cancel request. */
102 bool volatile fCancelled;
103 /** Set if this is the server side, clear if the client. */
104 bool fServerSide;
105 /** The client socket. */
106 RTSOCKET hSocket;
107 /** Thread currently doing write related activities, NIL_RTTHREAD if none. */
108 RTTHREAD hWriteThread;
109 /** Thread currently doing read related activities, NIL_RTTHREAD if none. */
110 RTTHREAD hReadThread;
111} RTLOCALIPCSESSIONINT;
112/** Pointer to a local IPC session instance (POSIX). */
113typedef RTLOCALIPCSESSIONINT *PRTLOCALIPCSESSIONINT;
114
115
116/** Local IPC name prefix for portable names; rtLocalIpcPosixConstructName prepends it when the name is not native. */
117#define RTLOCALIPC_POSIX_NAME_PREFIX "/tmp/.iprt-localipc-"
118
119
120/**
121 * Validates the user specified name.
122 *
123 * @returns IPRT status code.
124 * @param pszName The name to validate.
125 * @param fNative Whether it's a native name or a portable name.
126 */
127static int rtLocalIpcPosixValidateName(const char *pszName, bool fNative)
128{
129 AssertPtrReturn(pszName, VERR_INVALID_POINTER);
130 AssertReturn(*pszName, VERR_INVALID_NAME);
131
132 if (!fNative)
133 {
134 for (;;)
135 {
136 char ch = *pszName++;
137 if (!ch)
138 break;
139 AssertReturn(!RT_C_IS_CNTRL(ch), VERR_INVALID_NAME);
140 AssertReturn((unsigned)ch < 0x80, VERR_INVALID_NAME);
141 AssertReturn(ch != '\\', VERR_INVALID_NAME);
142 AssertReturn(ch != '/', VERR_INVALID_NAME);
143 }
144 }
145 else
146 {
147 int rc = RTStrValidateEncoding(pszName);
148 AssertRCReturn(rc, rc);
149 }
150
151 return VINF_SUCCESS;
152}
153
154
155/**
156 * Constructs a local (unix) domain socket name.
157 *
158 * @returns IPRT status code.
159 * @param pAddr The address structure to construct the name in.
160 * @param pcbAddr Where to return the address size.
161 * @param pszName The user specified name (valid).
162 * @param fNative Whether it's a native name or a portable name.
163 */
164static int rtLocalIpcPosixConstructName(struct sockaddr_un *pAddr, uint8_t *pcbAddr, const char *pszName, bool fNative)
165{
166 const char *pszNativeName;
167 int rc = rtPathToNative(&pszNativeName, pszName, NULL /*pszBasePath not support*/);
168 if (RT_SUCCESS(rc))
169 {
170 size_t cchNativeName = strlen(pszNativeName);
171 size_t cbFull = !fNative ? cchNativeName + sizeof(RTLOCALIPC_POSIX_NAME_PREFIX) : cchNativeName + 1;
172 if (cbFull <= sizeof(pAddr->sun_path))
173 {
174 RT_ZERO(*pAddr);
175#ifdef RT_OS_OS2 /* Size must be exactly right on OS/2. */
176 *pcbAddr = sizeof(*pAddr);
177#else
178 *pcbAddr = RT_OFFSETOF(struct sockaddr_un, sun_path) + (uint8_t)cbFull;
179#endif
180#ifdef HAVE_SUN_LEN_MEMBER
181 pAddr->sun_len = *pcbAddr;
182#endif
183 pAddr->sun_family = AF_LOCAL;
184
185 if (!fNative)
186 {
187 memcpy(pAddr->sun_path, RTLOCALIPC_POSIX_NAME_PREFIX, sizeof(RTLOCALIPC_POSIX_NAME_PREFIX) - 1);
188 memcpy(&pAddr->sun_path[sizeof(RTLOCALIPC_POSIX_NAME_PREFIX) - 1], pszNativeName, cchNativeName + 1);
189 }
190 else
191 memcpy(pAddr->sun_path, pszNativeName, cchNativeName + 1);
192 }
193 else
194 rc = VERR_FILENAME_TOO_LONG;
195 rtPathFreeNative(pszNativeName, pszName);
196 }
197 return rc;
198}
199
200
201
202RTDECL(int) RTLocalIpcServerCreate(PRTLOCALIPCSERVER phServer, const char *pszName, uint32_t fFlags)
203{
204 /*
205 * Parameter validation.
206 */
207 AssertPtrReturn(phServer, VERR_INVALID_POINTER);
208 *phServer = NIL_RTLOCALIPCSERVER;
209
210 AssertReturn(!(fFlags & ~RTLOCALIPC_FLAGS_VALID_MASK), VERR_INVALID_FLAGS);
211
212 int rc = rtLocalIpcPosixValidateName(pszName, RT_BOOL(fFlags & RTLOCALIPC_FLAGS_NATIVE_NAME));
213 if (RT_SUCCESS(rc))
214 {
215 /*
216 * Allocate memory for the instance and initialize it.
217 */
218 PRTLOCALIPCSERVERINT pThis = (PRTLOCALIPCSERVERINT)RTMemAllocZ(sizeof(*pThis));
219 if (pThis)
220 {
221 pThis->u32Magic = RTLOCALIPCSERVER_MAGIC;
222 pThis->fFlags = fFlags;
223 pThis->cRefs = 1;
224 pThis->fCancelled = false;
225 pThis->hListenThread = NIL_RTTHREAD;
226 rc = RTCritSectInit(&pThis->CritSect);
227 if (RT_SUCCESS(rc))
228 {
229 /*
230 * Create the local (unix) socket and bind to it.
231 */
232 rc = rtSocketCreate(&pThis->hSocket, AF_LOCAL, SOCK_STREAM, 0 /*iProtocol*/);
233 if (RT_SUCCESS(rc))
234 {
235 RTSocketSetInheritance(pThis->hSocket, false /*fInheritable*/);
236
237 uint8_t cbAddr;
238 rc = rtLocalIpcPosixConstructName(&pThis->Name, &cbAddr, pszName,
239 RT_BOOL(fFlags & RTLOCALIPC_FLAGS_NATIVE_NAME));
240 if (RT_SUCCESS(rc))
241 {
242 rc = rtSocketBindRawAddr(pThis->hSocket, &pThis->Name, cbAddr);
243 if (rc == VERR_NET_ADDRESS_IN_USE)
244 {
245 unlink(pThis->Name.sun_path);
246 rc = rtSocketBindRawAddr(pThis->hSocket, &pThis->Name, cbAddr);
247 }
248 if (RT_SUCCESS(rc))
249 {
250 LogFlow(("RTLocalIpcServerCreate: Created %p (%s)\n", pThis, pThis->Name.sun_path));
251 *phServer = pThis;
252 return VINF_SUCCESS;
253 }
254 }
255 RTSocketRelease(pThis->hSocket);
256 }
257 RTCritSectDelete(&pThis->CritSect);
258 }
259 RTMemFree(pThis);
260 }
261 else
262 rc = VERR_NO_MEMORY;
263 }
264 Log(("RTLocalIpcServerCreate: failed, rc=%Rrc\n", rc));
265 return rc;
266}
267
268
269/**
270 * Retains a reference to the server instance.
271 *
272 * @returns
273 * @param pThis The server instance.
274 */
275DECLINLINE(void) rtLocalIpcServerRetain(PRTLOCALIPCSERVERINT pThis)
276{
277 uint32_t cRefs = ASMAtomicIncU32(&pThis->cRefs);
278 Assert(cRefs < UINT32_MAX / 2 && cRefs);
279}
280
281
282/**
283 * Server instance destructor.
284 *
285 * @returns VINF_OBJECT_DESTROYED
286 * @param pThis The server instance.
287 */
288static int rtLocalIpcServerDtor(PRTLOCALIPCSERVERINT pThis)
289{
290 pThis->u32Magic = ~RTLOCALIPCSERVER_MAGIC;
291 if (RTSocketRelease(pThis->hSocket) == 0)
292 Log(("rtLocalIpcServerDtor: Released socket\n"));
293 else
294 Log(("rtLocalIpcServerDtor: Socket still has references (impossible?)\n"));
295 RTCritSectDelete(&pThis->CritSect);
296 unlink(pThis->Name.sun_path);
297 RTMemFree(pThis);
298 return VINF_OBJECT_DESTROYED;
299}
300
301
302/**
303 * Releases a reference to the server instance.
304 *
305 * @returns VINF_SUCCESS if only release, VINF_OBJECT_DESTROYED if destroyed.
306 * @param pThis The server instance.
307 */
308DECLINLINE(int) rtLocalIpcServerRelease(PRTLOCALIPCSERVERINT pThis)
309{
310 uint32_t cRefs = ASMAtomicDecU32(&pThis->cRefs);
311 Assert(cRefs < UINT32_MAX / 2);
312 if (!cRefs)
313 return rtLocalIpcServerDtor(pThis);
314 return VINF_SUCCESS;
315}
316
317
318/**
319 * The core of RTLocalIpcServerCancel, used by both the destroy and cancel APIs.
320 *
321 * @returns IPRT status code
322 * @param pThis The server instance.
323 */
324static int rtLocalIpcServerCancel(PRTLOCALIPCSERVERINT pThis)
325{
326 RTCritSectEnter(&pThis->CritSect);
327 pThis->fCancelled = true;
328 Log(("rtLocalIpcServerCancel:\n"));
329 if (pThis->hListenThread != NIL_RTTHREAD)
330 RTThreadPoke(pThis->hListenThread);
331 RTCritSectLeave(&pThis->CritSect);
332 return VINF_SUCCESS;
333}
334
335
336
337RTDECL(int) RTLocalIpcServerDestroy(RTLOCALIPCSERVER hServer)
338{
339 /*
340 * Validate input.
341 */
342 if (hServer == NIL_RTLOCALIPCSERVER)
343 return VINF_SUCCESS;
344 PRTLOCALIPCSERVERINT pThis = (PRTLOCALIPCSERVERINT)hServer;
345 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
346 AssertReturn(pThis->u32Magic == RTLOCALIPCSERVER_MAGIC, VERR_INVALID_HANDLE);
347
348 /*
349 * Invalidate the server, releasing the caller's reference to the instance
350 * data and making sure any other thread in the listen API will wake up.
351 */
352 AssertReturn(ASMAtomicCmpXchgU32(&pThis->u32Magic, ~RTLOCALIPCSERVER_MAGIC, RTLOCALIPCSERVER_MAGIC), VERR_WRONG_ORDER);
353
354 rtLocalIpcServerCancel(pThis);
355 return rtLocalIpcServerRelease(pThis);
356}
357
358
359RTDECL(int) RTLocalIpcServerCancel(RTLOCALIPCSERVER hServer)
360{
361 /*
362 * Validate input.
363 */
364 PRTLOCALIPCSERVERINT pThis = (PRTLOCALIPCSERVERINT)hServer;
365 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
366 AssertReturn(pThis->u32Magic == RTLOCALIPCSERVER_MAGIC, VERR_INVALID_HANDLE);
367
368 /*
369 * Do the job.
370 */
371 rtLocalIpcServerRetain(pThis);
372 rtLocalIpcServerCancel(pThis);
373 rtLocalIpcServerRelease(pThis);
374 return VINF_SUCCESS;
375}
376
377
378RTDECL(int) RTLocalIpcServerListen(RTLOCALIPCSERVER hServer, PRTLOCALIPCSESSION phClientSession)
379{
380 /*
381 * Validate input.
382 */
383 PRTLOCALIPCSERVERINT pThis = (PRTLOCALIPCSERVERINT)hServer;
384 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
385 AssertReturn(pThis->u32Magic == RTLOCALIPCSERVER_MAGIC, VERR_INVALID_HANDLE);
386
387 /*
388 * Begin listening.
389 */
390 rtLocalIpcServerRetain(pThis);
391 int rc = RTCritSectEnter(&pThis->CritSect);
392 if (RT_SUCCESS(rc))
393 {
394 if (pThis->hListenThread == NIL_RTTHREAD)
395 {
396 pThis->hListenThread = RTThreadSelf();
397
398 /*
399 * The listening retry loop.
400 */
401 for (;;)
402 {
403 if (pThis->fCancelled)
404 {
405 rc = VERR_CANCELLED;
406 break;
407 }
408
409 rc = RTCritSectLeave(&pThis->CritSect);
410 AssertRCBreak(rc);
411
412 rc = rtSocketListen(pThis->hSocket, pThis->fFlags & RTLOCALIPC_FLAGS_MULTI_SESSION ? 10 : 0);
413 if (RT_SUCCESS(rc))
414 {
415 struct sockaddr_un Addr;
416 size_t cbAddr = sizeof(Addr);
417 RTSOCKET hClient;
418 Log(("RTLocalIpcServerListen: Calling rtSocketAccept...\n"));
419 rc = rtSocketAccept(pThis->hSocket, &hClient, (struct sockaddr *)&Addr, &cbAddr);
420 Log(("RTLocalIpcServerListen: rtSocketAccept returns %Rrc.\n", rc));
421
422 int rc2 = RTCritSectEnter(&pThis->CritSect);
423 AssertRCBreakStmt(rc2, rc = RT_SUCCESS(rc) ? rc2 : rc);
424
425 if (RT_SUCCESS(rc))
426 {
427 /*
428 * Create a client session.
429 */
430 PRTLOCALIPCSESSIONINT pSession = (PRTLOCALIPCSESSIONINT)RTMemAllocZ(sizeof(*pSession));
431 if (pSession)
432 {
433 pSession->u32Magic = RTLOCALIPCSESSION_MAGIC;
434 pSession->cRefs = 1;
435 pSession->fCancelled = false;
436 pSession->fServerSide = true;
437 pSession->hSocket = hClient;
438 pSession->hReadThread = NIL_RTTHREAD;
439 pSession->hWriteThread = NIL_RTTHREAD;
440 rc = RTCritSectInit(&pSession->CritSect);
441 if (RT_SUCCESS(rc))
442 {
443 Log(("RTLocalIpcServerListen: Returning new client session: %p\n", pSession));
444 *phClientSession = pSession;
445 break;
446 }
447
448 RTMemFree(pSession);
449 }
450 else
451 rc = VERR_NO_MEMORY;
452 }
453 else if ( rc != VERR_INTERRUPTED
454 && rc != VERR_TRY_AGAIN)
455 break;
456 }
457 else
458 {
459 int rc2 = RTCritSectEnter(&pThis->CritSect);
460 AssertRCBreakStmt(rc2, rc = RT_SUCCESS(rc) ? rc2 : rc);
461 if ( rc != VERR_INTERRUPTED
462 && rc != VERR_TRY_AGAIN)
463 break;
464 }
465 }
466
467 pThis->hListenThread = NIL_RTTHREAD;
468 }
469 else
470 {
471 AssertFailed();
472 rc = VERR_RESOURCE_BUSY;
473 }
474 int rc2 = RTCritSectLeave(&pThis->CritSect);
475 AssertStmt(RT_SUCCESS(rc2), rc = RT_SUCCESS(rc) ? rc2 : rc);
476 }
477 rtLocalIpcServerRelease(pThis);
478
479 Log(("RTLocalIpcServerListen: returns %Rrc\n", rc));
480 return rc;
481}
482
483
484RTDECL(int) RTLocalIpcSessionConnect(PRTLOCALIPCSESSION phSession, const char *pszName, uint32_t fFlags)
485{
486 /*
487 * Parameter validation.
488 */
489 AssertPtrReturn(phSession, VERR_INVALID_POINTER);
490 *phSession = NIL_RTLOCALIPCSESSION;
491
492 AssertReturn(!(fFlags & ~RTLOCALIPC_C_FLAGS_VALID_MASK), VERR_INVALID_FLAGS);
493
494 int rc = rtLocalIpcPosixValidateName(pszName, RT_BOOL(fFlags & RTLOCALIPC_C_FLAGS_NATIVE_NAME));
495 if (RT_SUCCESS(rc))
496 {
497 /*
498 * Allocate memory for the instance and initialize it.
499 */
500 PRTLOCALIPCSESSIONINT pThis = (PRTLOCALIPCSESSIONINT)RTMemAllocZ(sizeof(*pThis));
501 if (pThis)
502 {
503 pThis->u32Magic = RTLOCALIPCSESSION_MAGIC;
504 pThis->cRefs = 1;
505 pThis->fCancelled = false;
506 pThis->fServerSide = false;
507 pThis->hSocket = NIL_RTSOCKET;
508 pThis->hReadThread = NIL_RTTHREAD;
509 pThis->hWriteThread = NIL_RTTHREAD;
510 rc = RTCritSectInit(&pThis->CritSect);
511 if (RT_SUCCESS(rc))
512 {
513 /*
514 * Create the local (unix) socket and try connect to the server.
515 */
516 rc = rtSocketCreate(&pThis->hSocket, AF_LOCAL, SOCK_STREAM, 0 /*iProtocol*/);
517 if (RT_SUCCESS(rc))
518 {
519 RTSocketSetInheritance(pThis->hSocket, false /*fInheritable*/);
520
521 struct sockaddr_un Addr;
522 uint8_t cbAddr;
523 rc = rtLocalIpcPosixConstructName(&Addr, &cbAddr, pszName, RT_BOOL(fFlags & RTLOCALIPC_C_FLAGS_NATIVE_NAME));
524 if (RT_SUCCESS(rc))
525 {
526 rc = rtSocketConnectRaw(pThis->hSocket, &Addr, cbAddr);
527 if (RT_SUCCESS(rc))
528 {
529 *phSession = pThis;
530 Log(("RTLocalIpcSessionConnect: Returns new session %p\n", pThis));
531 return VINF_SUCCESS;
532 }
533 }
534 RTCritSectDelete(&pThis->CritSect);
535 }
536 }
537 RTMemFree(pThis);
538 }
539 else
540 rc = VERR_NO_MEMORY;
541 }
542 Log(("RTLocalIpcSessionConnect: returns %Rrc\n", rc));
543 return rc;
544}
545
546
547/**
548 * Retains a reference to the session instance.
549 *
550 * @param pThis The server instance.
551 */
552DECLINLINE(void) rtLocalIpcSessionRetain(PRTLOCALIPCSESSIONINT pThis)
553{
554 uint32_t cRefs = ASMAtomicIncU32(&pThis->cRefs);
555 Assert(cRefs < UINT32_MAX / 2 && cRefs);
556}
557
558
559RTDECL(uint32_t) RTLocalIpcSessionRetain(RTLOCALIPCSESSION hSession)
560{
561 PRTLOCALIPCSESSIONINT pThis = (PRTLOCALIPCSESSIONINT)hSession;
562 AssertPtrReturn(pThis, UINT32_MAX);
563 AssertReturn(pThis->u32Magic == RTLOCALIPCSESSION_MAGIC, UINT32_MAX);
564
565 uint32_t cRefs = ASMAtomicIncU32(&pThis->cRefs);
566 Assert(cRefs < UINT32_MAX / 2 && cRefs);
567 return cRefs;
568}
569
570
571/**
572 * Session instance destructor.
573 *
574 * @returns VINF_OBJECT_DESTROYED
575 * @param pThis The server instance.
576 */
577static int rtLocalIpcSessionDtor(PRTLOCALIPCSESSIONINT pThis)
578{
579 pThis->u32Magic = ~RTLOCALIPCSESSION_MAGIC;
580 if (RTSocketRelease(pThis->hSocket) == 0)
581 Log(("rtLocalIpcSessionDtor: Released socket\n"));
582 else
583 Log(("rtLocalIpcSessionDtor: Socket still has references (impossible?)\n"));
584 RTCritSectDelete(&pThis->CritSect);
585 RTMemFree(pThis);
586 return VINF_OBJECT_DESTROYED;
587}
588
589
590/**
591 * Releases a reference to the session instance.
592 *
593 * @returns VINF_SUCCESS or VINF_OBJECT_DESTROYED as appropriate.
594 * @param pThis The session instance.
595 */
596DECLINLINE(int) rtLocalIpcSessionRelease(PRTLOCALIPCSESSIONINT pThis)
597{
598 uint32_t cRefs = ASMAtomicDecU32(&pThis->cRefs);
599 Assert(cRefs < UINT32_MAX / 2);
600 if (!cRefs)
601 return rtLocalIpcSessionDtor(pThis);
602 Log(("rtLocalIpcSessionRelease: %u refs left\n", cRefs));
603 return VINF_SUCCESS;
604}
605
606
607RTDECL(uint32_t) RTLocalIpcSessionRelease(RTLOCALIPCSESSION hSession)
608{
609 if (hSession == NIL_RTLOCALIPCSESSION)
610 return 0;
611
612 PRTLOCALIPCSESSIONINT pThis = (PRTLOCALIPCSESSIONINT)hSession;
613 AssertPtrReturn(pThis, UINT32_MAX);
614 AssertReturn(pThis->u32Magic == RTLOCALIPCSESSION_MAGIC, UINT32_MAX);
615
616 uint32_t cRefs = ASMAtomicDecU32(&pThis->cRefs);
617 Assert(cRefs < UINT32_MAX / 2);
618 if (cRefs)
619 Log(("RTLocalIpcSessionRelease: %u refs left\n", cRefs));
620 else
621 rtLocalIpcSessionDtor(pThis);
622 return cRefs;
623}
624
625
626/**
627 * The core of RTLocalIpcSessionCancel, used by both the destroy and cancel APIs.
628 *
629 * @returns IPRT status code
630 * @param pThis The session instance.
631 */
632static int rtLocalIpcSessionCancel(PRTLOCALIPCSESSIONINT pThis)
633{
634 RTCritSectEnter(&pThis->CritSect);
635 pThis->fCancelled = true;
636 Log(("rtLocalIpcSessionCancel:\n"));
637 if (pThis->hReadThread != NIL_RTTHREAD)
638 RTThreadPoke(pThis->hReadThread);
639 if (pThis->hWriteThread != NIL_RTTHREAD)
640 RTThreadPoke(pThis->hWriteThread);
641 RTCritSectLeave(&pThis->CritSect);
642 return VINF_SUCCESS;
643}
644
645
646RTDECL(int) RTLocalIpcSessionClose(RTLOCALIPCSESSION hSession)
647{
648 /*
649 * Validate input.
650 */
651 if (hSession == NIL_RTLOCALIPCSESSION)
652 return VINF_SUCCESS;
653 PRTLOCALIPCSESSIONINT pThis = hSession;
654 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
655 AssertReturn(pThis->u32Magic == RTLOCALIPCSESSION_MAGIC, VERR_INVALID_HANDLE);
656
657 /*
658 * Invalidate the session, releasing the caller's reference to the instance
659 * data and making sure any other thread in the listen API will wake up.
660 */
661 AssertReturn(ASMAtomicCmpXchgU32(&pThis->u32Magic, ~RTLOCALIPCSESSION_MAGIC, RTLOCALIPCSESSION_MAGIC), VERR_WRONG_ORDER);
662 Log(("RTLocalIpcSessionClose:\n"));
663
664 rtLocalIpcSessionCancel(pThis);
665 return rtLocalIpcSessionRelease(pThis);
666}
667
668
669RTDECL(int) RTLocalIpcSessionCancel(RTLOCALIPCSESSION hSession)
670{
671 /*
672 * Validate input.
673 */
674 PRTLOCALIPCSESSIONINT pThis = hSession;
675 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
676 AssertReturn(pThis->u32Magic == RTLOCALIPCSESSION_MAGIC, VERR_INVALID_HANDLE);
677
678 /*
679 * Do the job.
680 */
681 rtLocalIpcSessionRetain(pThis);
682 rtLocalIpcSessionCancel(pThis);
683 rtLocalIpcSessionRelease(pThis);
684 return VINF_SUCCESS;
685}
686
687
688/**
689 * Checks if the socket has has a HUP condition.
690 *
691 * @returns true if HUP, false if no.
692 * @param pThis The IPC session handle.
693 */
694static bool rtLocalIpcPosixHasHup(PRTLOCALIPCSESSIONINT pThis)
695{
696#ifndef RT_OS_OS2
697 struct pollfd PollFd;
698 RT_ZERO(PollFd);
699 PollFd.fd = RTSocketToNative(pThis->hSocket);
700 PollFd.events = POLLHUP;
701 return poll(&PollFd, 1, 0) >= 1
702 && (PollFd.revents & POLLHUP);
703
704#else /* RT_OS_OS2: */
705 return true;
706#endif
707}
708
709
710RTDECL(int) RTLocalIpcSessionRead(RTLOCALIPCSESSION hSession, void *pvBuf, size_t cbToRead, size_t *pcbRead)
711{
712 /*
713 * Validate input.
714 */
715 PRTLOCALIPCSESSIONINT pThis = hSession;
716 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
717 AssertReturn(pThis->u32Magic == RTLOCALIPCSESSION_MAGIC, VERR_INVALID_HANDLE);
718
719 /*
720 * Do the job.
721 */
722 rtLocalIpcSessionRetain(pThis);
723
724 int rc = RTCritSectEnter(&pThis->CritSect);
725 if (RT_SUCCESS(rc))
726 {
727 if (pThis->hReadThread == NIL_RTTHREAD)
728 {
729 pThis->hReadThread = RTThreadSelf();
730
731 for (;;)
732 {
733 if (!pThis->fCancelled)
734 {
735 rc = RTCritSectLeave(&pThis->CritSect);
736 AssertRCBreak(rc);
737
738 rc = RTSocketRead(pThis->hSocket, pvBuf, cbToRead, pcbRead);
739
740 /* Detect broken pipe. */
741 if (rc == VINF_SUCCESS)
742 {
743 if (!pcbRead || *pcbRead)
744 { /* likely */ }
745 else if (rtLocalIpcPosixHasHup(pThis))
746 rc = VERR_BROKEN_PIPE;
747 }
748 else if (rc == VERR_NET_CONNECTION_RESET_BY_PEER || rc == VERR_NET_SHUTDOWN)
749 rc = VERR_BROKEN_PIPE;
750
751 int rc2 = RTCritSectEnter(&pThis->CritSect);
752 AssertRCBreakStmt(rc2, rc = RT_SUCCESS(rc) ? rc2 : rc);
753
754 if ( rc == VERR_INTERRUPTED
755 || rc == VERR_TRY_AGAIN)
756 continue;
757 }
758 else
759 rc = VERR_CANCELLED;
760 break;
761 }
762
763 pThis->hReadThread = NIL_RTTHREAD;
764 }
765 int rc2 = RTCritSectLeave(&pThis->CritSect);
766 AssertStmt(RT_SUCCESS(rc2), rc = RT_SUCCESS(rc) ? rc2 : rc);
767 }
768
769 rtLocalIpcSessionRelease(pThis);
770 return rc;
771}
772
773
774RTDECL(int) RTLocalIpcSessionReadNB(RTLOCALIPCSESSION hSession, void *pvBuf, size_t cbToRead, size_t *pcbRead)
775{
776 /*
777 * Validate input.
778 */
779 PRTLOCALIPCSESSIONINT pThis = hSession;
780 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
781 AssertReturn(pThis->u32Magic == RTLOCALIPCSESSION_MAGIC, VERR_INVALID_HANDLE);
782
783 /*
784 * Do the job.
785 */
786 rtLocalIpcSessionRetain(pThis);
787
788 int rc = RTCritSectEnter(&pThis->CritSect);
789 if (RT_SUCCESS(rc))
790 {
791 if (pThis->hReadThread == NIL_RTTHREAD)
792 {
793 pThis->hReadThread = RTThreadSelf(); /* not really required, but whatever. */
794
795 for (;;)
796 {
797 if (!pThis->fCancelled)
798 {
799 rc = RTSocketReadNB(pThis->hSocket, pvBuf, cbToRead, pcbRead);
800
801 /* Detect broken pipe. */
802 if (rc == VINF_SUCCESS)
803 {
804 if (!pcbRead || *pcbRead)
805 { /* likely */ }
806 else if (rtLocalIpcPosixHasHup(pThis))
807 rc = VERR_BROKEN_PIPE;
808 }
809 else if (rc == VERR_NET_CONNECTION_RESET_BY_PEER || rc == VERR_NET_SHUTDOWN)
810 rc = VERR_BROKEN_PIPE;
811
812 if (rc == VERR_INTERRUPTED)
813 continue;
814 }
815 else
816 rc = VERR_CANCELLED;
817 break;
818 }
819
820 pThis->hReadThread = NIL_RTTHREAD;
821 }
822 int rc2 = RTCritSectLeave(&pThis->CritSect);
823 AssertStmt(RT_SUCCESS(rc2), rc = RT_SUCCESS(rc) ? rc2 : rc);
824 }
825
826 rtLocalIpcSessionRelease(pThis);
827 return rc;
828}
829
830
831RTDECL(int) RTLocalIpcSessionWrite(RTLOCALIPCSESSION hSession, const void *pvBuf, size_t cbToWrite)
832{
833 /*
834 * Validate input.
835 */
836 PRTLOCALIPCSESSIONINT pThis = hSession;
837 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
838 AssertReturn(pThis->u32Magic == RTLOCALIPCSESSION_MAGIC, VERR_INVALID_HANDLE);
839
840 /*
841 * Do the job.
842 */
843 rtLocalIpcSessionRetain(pThis);
844
845 int rc = RTCritSectEnter(&pThis->CritSect);
846 if (RT_SUCCESS(rc))
847 {
848 if (pThis->hWriteThread == NIL_RTTHREAD)
849 {
850 pThis->hWriteThread = RTThreadSelf();
851
852 for (;;)
853 {
854 if (!pThis->fCancelled)
855 {
856 rc = RTCritSectLeave(&pThis->CritSect);
857 AssertRCBreak(rc);
858
859 rc = RTSocketWrite(pThis->hSocket, pvBuf, cbToWrite);
860
861 int rc2 = RTCritSectEnter(&pThis->CritSect);
862 AssertRCBreakStmt(rc2, rc = RT_SUCCESS(rc) ? rc2 : rc);
863
864 if ( rc == VERR_INTERRUPTED
865 || rc == VERR_TRY_AGAIN)
866 continue;
867 }
868 else
869 rc = VERR_CANCELLED;
870 break;
871 }
872
873 pThis->hWriteThread = NIL_RTTHREAD;
874 }
875 int rc2 = RTCritSectLeave(&pThis->CritSect);
876 AssertStmt(RT_SUCCESS(rc2), rc = RT_SUCCESS(rc) ? rc2 : rc);
877 }
878
879 rtLocalIpcSessionRelease(pThis);
880 return rc;
881}
882
883
884RTDECL(int) RTLocalIpcSessionFlush(RTLOCALIPCSESSION hSession)
885{
886 /*
887 * Validate input.
888 */
889 PRTLOCALIPCSESSIONINT pThis = hSession;
890 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
891 AssertReturn(pThis->u32Magic == RTLOCALIPCSESSION_MAGIC, VERR_INVALID_HANDLE);
892
893 /*
894 * This is a no-op because apparently write doesn't return until the
895 * result is read. At least that's what the reply to a 2003-04-08 LKML
896 * posting title "fsync() on unix domain sockets?" indicates.
897 *
898 * For conformity, make sure there isn't any active writes concurrent to this call.
899 */
900 rtLocalIpcSessionRetain(pThis);
901
902 int rc = RTCritSectEnter(&pThis->CritSect);
903 if (RT_SUCCESS(rc))
904 {
905 if (pThis->hWriteThread == NIL_RTTHREAD)
906 rc = RTCritSectLeave(&pThis->CritSect);
907 else
908 {
909 rc = RTCritSectLeave(&pThis->CritSect);
910 if (RT_SUCCESS(rc))
911 rc = VERR_RESOURCE_BUSY;
912 }
913 }
914
915 rtLocalIpcSessionRelease(pThis);
916 return rc;
917}
918
919
920RTDECL(int) RTLocalIpcSessionWaitForData(RTLOCALIPCSESSION hSession, uint32_t cMillies)
921{
922 /*
923 * Validate input.
924 */
925 PRTLOCALIPCSESSIONINT pThis = hSession;
926 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
927 AssertReturn(pThis->u32Magic == RTLOCALIPCSESSION_MAGIC, VERR_INVALID_HANDLE);
928
929 /*
930 * Do the job.
931 */
932 rtLocalIpcSessionRetain(pThis);
933
934 int rc = RTCritSectEnter(&pThis->CritSect);
935 if (RT_SUCCESS(rc))
936 {
937 if (pThis->hReadThread == NIL_RTTHREAD)
938 {
939 pThis->hReadThread = RTThreadSelf();
940 uint64_t const msStart = RTTimeMilliTS();
941 RTMSINTERVAL const cMsOriginalTimeout = cMillies;
942
943 for (;;)
944 {
945 if (!pThis->fCancelled)
946 {
947 rc = RTCritSectLeave(&pThis->CritSect);
948 AssertRCBreak(rc);
949
950 uint32_t fEvents = 0;
951#ifdef RT_OS_OS2
952 /* This doesn't give us any error condition on hangup. */
953 Log(("RTLocalIpcSessionWaitForData: Calling RTSocketSelectOneEx...\n"));
954 rc = RTSocketSelectOneEx(pThis->hSocket, RTPOLL_EVT_READ | RTPOLL_EVT_ERROR, &fEvents, cMillies);
955 Log(("RTLocalIpcSessionWaitForData: RTSocketSelectOneEx returns %Rrc, fEvents=%#x\n", rc, fEvents));
956#else
957/** @todo RTSocketPoll */
958 /* POLLHUP will be set on hangup. */
959 struct pollfd PollFd;
960 RT_ZERO(PollFd);
961 PollFd.fd = RTSocketToNative(pThis->hSocket);
962 PollFd.events = POLLHUP | POLLERR | POLLIN;
963 Log(("RTLocalIpcSessionWaitForData: Calling poll...\n"));
964 int cFds = poll(&PollFd, 1, cMillies == RT_INDEFINITE_WAIT ? -1 : cMillies);
965 if (cFds >= 1)
966 {
967 fEvents = PollFd.revents & (POLLHUP | POLLERR) ? RTPOLL_EVT_ERROR : RTPOLL_EVT_READ;
968 rc = VINF_SUCCESS;
969 }
970 else if (rc == 0)
971 rc = VERR_TIMEOUT;
972 else
973 rc = RTErrConvertFromErrno(errno);
974 Log(("RTLocalIpcSessionWaitForData: poll returns %u (rc=%%d), revents=%#x\n", cFds, rc, PollFd.revents));
975#endif
976
977 int rc2 = RTCritSectEnter(&pThis->CritSect);
978 AssertRCBreakStmt(rc2, rc = RT_SUCCESS(rc) ? rc2 : rc);
979
980 if (RT_SUCCESS(rc))
981 {
982 if (pThis->fCancelled)
983 rc = VERR_CANCELLED;
984 else if (fEvents & RTPOLL_EVT_ERROR)
985 rc = VERR_BROKEN_PIPE;
986 }
987 else if ( rc == VERR_INTERRUPTED
988 || rc == VERR_TRY_AGAIN)
989 {
990 /* Recalc cMillies. */
991 if (cMsOriginalTimeout != RT_INDEFINITE_WAIT)
992 {
993 uint64_t cMsElapsed = RTTimeMilliTS() - msStart;
994 cMillies = cMsElapsed >= cMsOriginalTimeout ? 0 : cMsOriginalTimeout - (RTMSINTERVAL)cMsElapsed;
995 }
996 continue;
997 }
998 }
999 else
1000 rc = VERR_CANCELLED;
1001 break;
1002 }
1003
1004 pThis->hReadThread = NIL_RTTHREAD;
1005 }
1006 int rc2 = RTCritSectLeave(&pThis->CritSect);
1007 AssertStmt(RT_SUCCESS(rc2), rc = RT_SUCCESS(rc) ? rc2 : rc);
1008 }
1009
1010 rtLocalIpcSessionRelease(pThis);
1011 return rc;
1012}
1013
1014
1015RTDECL(int) RTLocalIpcSessionQueryProcess(RTLOCALIPCSESSION hSession, PRTPROCESS pProcess)
1016{
1017 return VERR_NOT_SUPPORTED;
1018}
1019
1020
1021RTDECL(int) RTLocalIpcSessionQueryUserId(RTLOCALIPCSESSION hSession, PRTUID pUid)
1022{
1023 return VERR_NOT_SUPPORTED;
1024}
1025
1026
1027RTDECL(int) RTLocalIpcSessionQueryGroupId(RTLOCALIPCSESSION hSession, PRTGID pGid)
1028{
1029 return VERR_NOT_SUPPORTED;
1030}
1031
1032
1033
Note: See TracBrowser for help on using the repository browser.

© 2025 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette