⛏️ index : haiku.git

/******************************************************************************

	File:	BufferStream.h

	Copyright 1995-97, Be Incorporated

******************************************************************************/
#ifndef _BUFFER_STREAM_H
#define _BUFFER_STREAM_H

#include <stdlib.h>
#include <OS.h>
#include <SupportDefs.h>
#include <Locker.h>
#include <Messenger.h>


class BSubscriber;


/* ================
   Per-subscriber information.
   ================ */

struct _sbuf_info;

typedef struct _sub_info {
  _sub_info			*fNext;		/* next subscriber in the stream*/
  _sub_info			*fPrev;		/* previous subscriber in the stream */
  _sbuf_info		*fRel;		/* next buf to be released */
  _sbuf_info		*fAcq;		/* next buf to be acquired */
  sem_id			fSem;		/* semaphore used for blocking */
  bigtime_t			fTotalTime;	/* accumulated time between acq/rel */
  int32				fHeld;		/* # of buffers acq'd but not yet rel'd */
  sem_id			fBlockedOn;	/* the semaphore being waited on */
								/* or B_BAD_SEM_ID if not blocked */
} *subscriber_id;


/* ================
   Per-buffer information
   ================ */

typedef struct _sbuf_info {
  _sbuf_info		*fNext;		/* next "newer" buffer in the chain */
  subscriber_id		fAvailTo;	/* next subscriber to acquire this buffer */
  subscriber_id		fHeldBy;	/* subscriber that's acquired this buffer */
  bigtime_t			fAcqTime;	/* time at which this buffer was acquired */
  area_id			fAreaID;	/* for system memory allocation calls */
  char				*fAddress;
  int32				fSize;     /* usable portion can be smaller than ... */
  int32				fAreaSize; /* ... the size of the area. */
  bool				fIsFinal;	/* TRUE => stream is stopping */
} *buffer_id;


/* ================
   Interface definition for BBufferStream class
   ================ */

/* We've chosen B_MAX_SUBSCRIBER_COUNT and B_MAX_BUFFER_COUNT to be small
 * enough so that a BBufferStream structure fits in one 4096 byte page.
 */
#define	B_MAX_SUBSCRIBER_COUNT	52
#define	B_MAX_BUFFER_COUNT		32

class BBufferStream;
class BBufferStreamManager;

typedef BBufferStream* stream_id;  // for now


class BAbstractBufferStream
{
public:
#if __GNUC__ > 3
	virtual				~BAbstractBufferStream();
#endif

	virtual	status_t	GetStreamParameters(size_t *bufferSize,
											int32 *bufferCount,
											bool *isRunning,
											int32 *subscriberCount) const;

	virtual	status_t	SetStreamBuffers(size_t bufferSize, 
										 int32 bufferCount);

	virtual	status_t	StartStreaming();
	virtual	status_t	StopStreaming();

protected:

virtual	void		_ReservedAbstractBufferStream1();
virtual	void		_ReservedAbstractBufferStream2();
virtual	void		_ReservedAbstractBufferStream3();
virtual	void		_ReservedAbstractBufferStream4();

	friend class BSubscriber;
	friend class BBufferStreamManager;

	virtual stream_id	StreamID() const; 
	/* stream identifier for direct access */

	/* Create or delete a subscriber id for subsequent operations */
	virtual	status_t	Subscribe(char *name,
								  subscriber_id *subID,
								  sem_id semID);
	virtual	status_t	Unsubscribe(subscriber_id subID);

/* Enter into or quit the stream */
	virtual	status_t	EnterStream(subscriber_id subID, 
									subscriber_id neighbor,
									bool before);

	virtual	status_t	ExitStream(subscriber_id subID);

	virtual BMessenger*	Server() const;	/* message pipe to server */
	status_t 		SendRPC(BMessage* msg, BMessage* reply = NULL) const;
};


class BBufferStream : public BAbstractBufferStream
{
public:

						BBufferStream(size_t headerSize,
									  BBufferStreamManager* controller,
									  BSubscriber* headFeeder,
									  BSubscriber* tailFeeder);
		virtual			~BBufferStream();
				
/* BBufferStreams are allocated on shared memory pages */
		void			*operator new(size_t size);
		void			operator delete(void *stream, size_t size);

/* Return header size */
		size_t			HeaderSize() const;

/* These four functions are delegated to the stream controller */
		status_t		GetStreamParameters(size_t *bufferSize,
											int32 *bufferCount,
											bool *isRunning,
											int32 *subscriberCount) const;

		status_t		SetStreamBuffers(size_t bufferSize, 
										 int32 bufferCount);

		status_t		StartStreaming();
		status_t		StopStreaming();

/* Get the controller for delegation */
		BBufferStreamManager *StreamManager() const;

/* number of buffers in stream */
		int32			CountBuffers() const;

/* Create or delete a subscriber id for subsequent operations */
		status_t		Subscribe(char *name, 
								  subscriber_id *subID,
								  sem_id semID);

		status_t		Unsubscribe(subscriber_id subID);

/* Enter into or quit the stream */
		status_t		EnterStream(subscriber_id subID, 
									subscriber_id neighbor,
									bool before);

		status_t		ExitStream(subscriber_id subID);

/* queries about a subscriber */
		bool			IsSubscribed(subscriber_id subID);
		bool			IsEntered(subscriber_id subID);

		status_t		SubscriberInfo(subscriber_id subID,
									   char** name,
									   stream_id* streamID,
									   int32* position);

/* Force an error return of a subscriber if it's blocked */
		status_t		UnblockSubscriber(subscriber_id subID);

/* Acquire and release a buffer */
		status_t		AcquireBuffer(subscriber_id subID, 
									  buffer_id *bufID,
									  bigtime_t timeout);
		status_t		ReleaseBuffer(subscriber_id subID);

/* Get the attributes of a particular buffer */
		size_t			BufferSize(buffer_id bufID) const;
		char			*BufferData(buffer_id bufID) const;
		bool			IsFinalBuffer(buffer_id bufID) const;

/* Get attributes of a particular subscriber */
		int32			CountBuffersHeld(subscriber_id subID);

/* Queries for the BBufferStream */
		int32			CountSubscribers() const;
		int32			CountEnteredSubscribers() const;

		subscriber_id	FirstSubscriber() const;
		subscriber_id	LastSubscriber() const;
		subscriber_id	NextSubscriber(subscriber_id subID);
		subscriber_id	PrevSubscriber(subscriber_id subID);

/* debugging aids */
		void			PrintStream();
		void			PrintBuffers();
		void			PrintSubscribers();

/* gaining exclusive access to the BBufferStream */
		bool 			Lock();
		void			Unlock();

/* introduce a new buffer into the "newest" end of the chain */
		status_t		AddBuffer(buffer_id bufID);

/* remove a buffer from the "oldest" end of the chain */
		buffer_id		RemoveBuffer(bool force);

/* allocate a buffer from shared memory and create a bufID for it. */
		buffer_id		CreateBuffer(size_t size, bool isFinal);

/* deallocate a buffer and returns its bufID to the freelist */
		void			DestroyBuffer(buffer_id bufID);

/* remove and destroy any "newest" buffers from the head of the chain
 * that have not yet been claimed by any subscribers.  If there are
 * no subscribers, this clears the entire chain.
 */
		void			RescindBuffers();

/* ================
   Private member functions that assume locking already has been done.
   ================ */

private:

virtual	void		_ReservedBufferStream1();
virtual	void		_ReservedBufferStream2();
virtual	void		_ReservedBufferStream3();
virtual	void		_ReservedBufferStream4();

/* initialize the free list of subscribers */
		void			InitSubscribers();

/* return TRUE if subID appears valid */
		bool			IsSubscribedSafe(subscriber_id subID) const;

/* return TRUE if subID is entered into the stream */
		bool			IsEnteredSafe(subscriber_id subID) const;

/* initialize the free list of buffer IDs */
		void			InitBuffers();

/* Wake a blocked subscriber */
		status_t		WakeSubscriber(subscriber_id subID);

/* Give subID all the buffers it can get */
		void			InheritBuffers(subscriber_id subID);

/* Relinquish any buffers held by subID */
		void			BequeathBuffers(subscriber_id subID);

/* Fast version of ReleaseBuffer() */
		status_t		ReleaseBufferSafe(subscriber_id subID);

/* Release a buffer to a subscriber */
		status_t		ReleaseBufferTo(buffer_id bufID, subscriber_id subID);

/* deallocate all buffers */
		void			FreeAllBuffers();

/* deallocate all subscribers */
		void			FreeAllSubscribers();

/* ================
   Private data members
   ================ */

		BLocker				fLock;
		area_id				fAreaID;		/* area id for this BBufferStream */
		BBufferStreamManager	*fStreamManager;
		BSubscriber			*fHeadFeeder;
		BSubscriber			*fTailFeeder;
		size_t				fHeaderSize;

		/* ================
		   subscribers
		   ================ */

		_sub_info			*fFreeSubs;		/* free list of subscribers */
		_sub_info			*fFirstSub;		/* first entered in itinierary */
		_sub_info			*fLastSub;		/* last entered in itinerary */

		sem_id				fFirstSem;		/* semaphore used by fFirstSub */
		int32				fSubCount;
		int32				fEnteredSubCount;

		_sub_info			fSubscribers[B_MAX_SUBSCRIBER_COUNT];

		/* ================
		   buffers
		   ================ */

		_sbuf_info			*fFreeBuffers;
		_sbuf_info			*fOldestBuffer;	/* first in line */
		_sbuf_info			*fNewestBuffer;	/* fNewest->fNext = NULL */
		int32				fCountBuffers;

		_sbuf_info			fBuffers[B_MAX_BUFFER_COUNT];

		uint32				_reserved[4];
};

#endif 	// #ifdef _BUFFER_STREAM_H