/******************************************************************************

    File:       BufferStream.h

    Copyright 1995-97, Be Incorporated

******************************************************************************/
#ifndef _BUFFER_STREAM_H
#define _BUFFER_STREAM_H
#include <stdlib.h>
#include <OS.h>
#include <SupportDefs.h>
#include <Locker.h>
#include <Messenger.h>
class BSubscriber;
/* ================
   Per-subscriber information.
   ================ */
struct _sbuf_info;  /* defined later in this header; only pointers are needed here */

/*
 * Bookkeeping record kept for one subscriber of a stream.
 * A pointer to this record is what the API hands around as a subscriber_id.
 */
struct _sub_info {
    _sub_info   *fNext;         /* link to another subscriber record (presumably the next one in the stream) */
    _sub_info   *fPrev;         /* link to another subscriber record (presumably the previous one) */
    _sbuf_info  *fRel;          /* buffer record; presumably the next buffer to release -- confirm */
    _sbuf_info  *fAcq;          /* buffer record; presumably the next buffer to acquire -- confirm */
    sem_id      fSem;           /* semaphore associated with this subscriber */
    bigtime_t   fTotalTime;     /* accumulated time value; exact meaning not visible here */
    int32       fHeld;          /* presumably the count of buffers currently held -- confirm */
    sem_id      fBlockedOn;     /* presumably the semaphore the subscriber is blocked on -- confirm */
};

typedef _sub_info *subscriber_id;
/* ================
   Per-buffer information
   ================ */
/*
 * Bookkeeping record kept for one buffer of a stream.
 * A pointer to this record is what the API hands around as a buffer_id.
 */
struct _sbuf_info {
    _sbuf_info      *fNext;     /* link to another buffer record in the chain */
    subscriber_id   fAvailTo;   /* presumably the subscriber that may acquire this buffer next -- confirm */
    subscriber_id   fHeldBy;    /* presumably the subscriber currently holding this buffer -- confirm */
    bigtime_t       fAcqTime;   /* presumably the time at which the buffer was acquired -- confirm */
    area_id         fAreaID;    /* area identifier associated with the buffer */
    char            *fAddress;  /* address of the buffer data */
    int32           fSize;      /* buffer size */
    int32           fAreaSize;  /* size of the associated area */
    bool            fIsFinal;   /* "final buffer" flag */
};

typedef _sbuf_info *buffer_id;
/* ================
   Interface definition for BBufferStream class
   ================ */

/* B_MAX_SUBSCRIBER_COUNT and B_MAX_BUFFER_COUNT have been chosen to be small
 * enough so that a BBufferStream structure fits in one 4096 byte page.
 */
/* Number of entries in the BBufferStream::fSubscribers table. */
#define B_MAX_SUBSCRIBER_COUNT  52

/* Number of entries in the BBufferStream::fBuffers table. */
#define B_MAX_BUFFER_COUNT      32
/* Forward declarations: only pointers to these classes are needed at this point. */
class BBufferStream;
class BBufferStreamManager;

/* A stream is identified by a pointer to its BBufferStream object. */
typedef BBufferStream *stream_id;
/*
 * BAbstractBufferStream
 *
 * Base class declaring the stream interface: querying and configuring the
 * stream's buffers, starting/stopping streaming, and the subscriber
 * operations that BSubscriber and BBufferStreamManager (both friends) use.
 * BBufferStream derives from this class.
 *
 * NOTE: the order of the virtual declarations determines the vtable layout;
 * do not reorder them.
 */
class BAbstractBufferStream {
public:
#if __GNUC__ > 3
    /* The virtual destructor is declared only for GCC versions above 3. */
    virtual ~BAbstractBufferStream();
#endif

    /* Reports the stream's parameters through the supplied pointers. */
    virtual status_t GetStreamParameters(size_t *bufferSize, int32 *bufferCount,
                                         bool *isRunning,
                                         int32 *subscriberCount) const;

    /* Sets the size and number of the stream's buffers. */
    virtual status_t SetStreamBuffers(size_t bufferSize, int32 bufferCount);

    /* Start and stop streaming. */
    virtual status_t StartStreaming();
    virtual status_t StopStreaming();

protected:
    /* Reserved virtual slots. */
    virtual void _ReservedAbstractBufferStream1();
    virtual void _ReservedAbstractBufferStream2();
    virtual void _ReservedAbstractBufferStream3();
    virtual void _ReservedAbstractBufferStream4();

    friend class BSubscriber;
    friend class BBufferStreamManager;

    /* Identifier of this stream. */
    virtual stream_id StreamID() const;

    /*
     * Subscriber management. Subscribe() hands back the new subscriber's id
     * through subID; Unsubscribe() takes that id.
     */
    virtual status_t Subscribe(char *name, subscriber_id *subID, sem_id semID);
    virtual status_t Unsubscribe(subscriber_id subID);

    /*
     * Stream entry/exit for a subscriber. 'neighbor' and 'before' presumably
     * select the position relative to an existing subscriber -- confirm
     * against the implementation.
     */
    virtual status_t EnterStream(subscriber_id subID, subscriber_id neighbor,
                                 bool before);
    virtual status_t ExitStream(subscriber_id subID);

    /* Messenger for the server this stream talks to. */
    virtual BMessenger *Server() const;

    /* Sends 'msg'; 'reply' is optional and defaults to NULL. */
    status_t SendRPC(BMessage *msg, BMessage *reply = NULL) const;
};
class BBufferStream : public BAbstractBufferStream
{
public:
BBufferStream(size_t headerSize,
BBufferStreamManager* controller,
BSubscriber* headFeeder,
BSubscriber* tailFeeder);
virtual ~BBufferStream();
void *operator new(size_t size);
void operator delete(void *stream, size_t size);
size_t HeaderSize() const;
status_t GetStreamParameters(size_t *bufferSize,
int32 *bufferCount,
bool *isRunning,
int32 *subscriberCount) const;
status_t SetStreamBuffers(size_t bufferSize,
int32 bufferCount);
status_t StartStreaming();
status_t StopStreaming();
BBufferStreamManager *StreamManager() const;
int32 CountBuffers() const;
status_t Subscribe(char *name,
subscriber_id *subID,
sem_id semID);
status_t Unsubscribe(subscriber_id subID);
status_t EnterStream(subscriber_id subID,
subscriber_id neighbor,
bool before);
status_t ExitStream(subscriber_id subID);
bool IsSubscribed(subscriber_id subID);
bool IsEntered(subscriber_id subID);
status_t SubscriberInfo(subscriber_id subID,
char** name,
stream_id* streamID,
int32* position);
status_t UnblockSubscriber(subscriber_id subID);
status_t AcquireBuffer(subscriber_id subID,
buffer_id *bufID,
bigtime_t timeout);
status_t ReleaseBuffer(subscriber_id subID);
size_t BufferSize(buffer_id bufID) const;
char *BufferData(buffer_id bufID) const;
bool IsFinalBuffer(buffer_id bufID) const;
int32 CountBuffersHeld(subscriber_id subID);
int32 CountSubscribers() const;
int32 CountEnteredSubscribers() const;
subscriber_id FirstSubscriber() const;
subscriber_id LastSubscriber() const;
subscriber_id NextSubscriber(subscriber_id subID);
subscriber_id PrevSubscriber(subscriber_id subID);
void PrintStream();
void PrintBuffers();
void PrintSubscribers();
bool Lock();
void Unlock();
status_t AddBuffer(buffer_id bufID);
buffer_id RemoveBuffer(bool force);
buffer_id CreateBuffer(size_t size, bool isFinal);
void DestroyBuffer(buffer_id bufID);
* that have not yet been claimed by any subscribers. If there are
* no subscribers, this clears the entire chain.
*/
void RescindBuffers();
Private member functions that assume locking already has been done.
================ */
private:
virtual void _ReservedBufferStream1();
virtual void _ReservedBufferStream2();
virtual void _ReservedBufferStream3();
virtual void _ReservedBufferStream4();
void InitSubscribers();
bool IsSubscribedSafe(subscriber_id subID) const;
bool IsEnteredSafe(subscriber_id subID) const;
void InitBuffers();
status_t WakeSubscriber(subscriber_id subID);
void InheritBuffers(subscriber_id subID);
void BequeathBuffers(subscriber_id subID);
status_t ReleaseBufferSafe(subscriber_id subID);
status_t ReleaseBufferTo(buffer_id bufID, subscriber_id subID);
void FreeAllBuffers();
void FreeAllSubscribers();
Private data members
================ */
BLocker fLock;
area_id fAreaID;
BBufferStreamManager *fStreamManager;
BSubscriber *fHeadFeeder;
BSubscriber *fTailFeeder;
size_t fHeaderSize;
subscribers
================ */
_sub_info *fFreeSubs;
_sub_info *fFirstSub;
_sub_info *fLastSub;
sem_id fFirstSem;
int32 fSubCount;
int32 fEnteredSubCount;
_sub_info fSubscribers[B_MAX_SUBSCRIBER_COUNT];
buffers
================ */
_sbuf_info *fFreeBuffers;
_sbuf_info *fOldestBuffer;
_sbuf_info *fNewestBuffer;
int32 fCountBuffers;
_sbuf_info fBuffers[B_MAX_BUFFER_COUNT];
uint32 _reserved[4];
};
#endif