root/src/apps/cortex/NodeManager/NodeGroup.cpp
/*
 * Copyright (c) 1999-2000, Eric Moon.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions, and the following disclaimer.
 *
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions, and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * 3. The name of the author may not be used to endorse or promote products
 *    derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR "AS IS" AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF TITLE, NON-INFRINGEMENT, MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
 * TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */


// NodeGroup.cpp

#include "NodeGroup.h"
//#include "NodeGroup_transport_thread.h"

#include "NodeManager.h"
#include "NodeRef.h"

#include <MediaRoster.h>
#include <OS.h>
#include <TimeSource.h>

#include <algorithm>
#include <functional>

#include "array_delete.h"
#include "BasicThread.h"
#include "node_manager_impl.h"
#include "functional_tools.h"

using namespace std;

__USE_CORTEX_NAMESPACE
#define D_METHOD(x) //PRINT (x)
#define D_ROSTER(x) //PRINT (x)
#define D_LOCK(x) //PRINT (x)



// -------------------------------------------------------- //
// *** ctor/dtor
// -------------------------------------------------------- //

// free the group, including all nodes within it
// (this call will result in the eventual deletion of the object.)
// returns B_OK on success; B_NOT_ALLOWED if release() has
// already been called; other error codes if the Media Roster
// call fails.
// * THE MANAGER MUST BE LOCKED

status_t NodeGroup::release() {

        D_METHOD((
                "NodeGroup::release()\n"));
        
        if(isReleased())
                return B_NOT_ALLOWED;

        // clean up             
        lock();

        // halt all nodes
        _stop();
        
        // remove & release all nodes
        // +++++ causes triply-nested lock: eww!
        while(m_nodes.size()) {
                NodeRef* last = m_nodes.back();
                removeNode(m_nodes.size()-1);
                last->release();
        }

        unlock();

// [e.moon 7nov99]
// removing the released group is now NodeManager's responsibility
//
        // remove from NodeManager
        if(!m_manager->lock()) {
                ASSERT(!"* m_manager->lock() failed.\n");
        }
        m_manager->_removeGroup(this);
        m_manager->unlock();
        
        // hand off to IObservable
        return _inherited::release();   
}

// call release() rather than deleting NodeGroup objects
NodeGroup::~NodeGroup() {

        Autolock _l(this);
        D_METHOD((
                "~NodeGroup()\n"));

        ASSERT(!m_nodes.size());        

        if(m_timeSourceObj) {
                m_timeSourceObj->Release();
                m_timeSourceObj = 0;
        }
}


// -------------------------------------------------------- //
// *** accessors
// -------------------------------------------------------- //

// [e.moon 13oct99] moved to header
//inline uint32 NodeGroup::id() const { return m_id; }

// -------------------------------------------------------- //
// *** operations
// -------------------------------------------------------- //

// name access
const char* NodeGroup::name() const {
        Autolock _l(this);
        return m_name.String();
}

status_t NodeGroup::setName(const char* name) {
        Autolock _l(this);
        m_name = name;
        return B_OK;
}

// content access
uint32 NodeGroup::countNodes() const {
        Autolock _l(this);
        return m_nodes.size();
}

NodeRef* NodeGroup::nodeAt(
        uint32                                                                                  index) const {
        Autolock _l(this);
        return (index < m_nodes.size()) ?
                m_nodes[index] :
                0;
}

// add/remove nodes:
// - you may only add a node with no current group.
// - nodes added during playback will be started;
//   nodes removed during playback will be stopped (unless
//   the NO_START_STOP transport restriction flag is set
//   for a given node.)

status_t NodeGroup::addNode(
        NodeRef*                                                                                node) {
        
        D_METHOD((
                "NodeGroup::addNode()\n"));
                
        // lock the manager first; if the node has no current group,
        // this locks the node.
        m_manager->lock();
        
        Autolock _l(this);

        // precondition: GROUP_LOCKED not set
        if(m_flags & GROUP_LOCKED)
                return B_NOT_ALLOWED;

        // precondition: no current group
        if(node->m_group) {
                // [e.moon 28sep99] whoops, forgot one
                PRINT((
                        "!!! node already in group '%s'\n", node->m_group->name()));

                m_manager->unlock(); 
                return B_NOT_ALLOWED;
        }

        // add it
        m_nodes.push_back(node);        
        node->_setGroup(this);

        // release the manager
        m_manager->unlock();
        
        // first node? the transport is now ready to start
        if(m_nodes.size() == 1) {
                _changeState(TRANSPORT_INVALID, TRANSPORT_STOPPED);
        }
//
//      if(m_syncNode == media_node::null) {
//              // assign as sync node
//              setSyncNode(node->node());
//      }
//      
        // initialize the new node
        status_t err = node->_initTransportState();
        if(err < B_OK)
                return err;

        // set time source
        node->_setTimeSource(m_timeSource.node);

        // set run mode
        node->_setRunMode(m_runMode);   

        // add to cycle set if need be
        // +++++ should I call _cycleAddRef() instead?
        if(node->m_cycle)
                _refCycleChanged(node);
                
        if(m_transportState == TRANSPORT_RUNNING) {
                // +++++ start if necessary!
        }
        // +++++ not started if TRANSPORT_ROLLING: is that proper? [e.moon 11oct99]
        
        // send notification
        if(!LockLooper()) {
                ASSERT(!"LockLooper() failed.");
        }
        BMessage m(M_NODE_ADDED);
        m.AddInt32("groupID", id());
        m.AddInt32("nodeID", node->id());
        notify(&m);
        UnlockLooper();
        
        // success
        return B_OK;
}
                

status_t NodeGroup::removeNode(
        NodeRef*                                                                                node) {

        D_METHOD((
                "NodeGroup::removeNode()\n"));
                
        // lock the manager first; once the node is ungrouped,
        // the manager lock applies to it
        m_manager->lock();
        
        Autolock _l(this);

        // precondition: this must be the node's group
        if(node->m_group != this) {
                // [e.moon 28sep99] whoops, forgot one
                PRINT((
                        "!!! node not in group '%s'\n", node->m_group->name()));

                m_manager->unlock(); 
                return B_NOT_ALLOWED;
        }

        // remove from the cycle set
        if(node->m_cycle)
                _cycleRemoveRef(node);

        // remove it
        ASSERT(m_nodes.size());
        remove(
                m_nodes.begin(),
                m_nodes.end(),
                node);

        // should have removed one and only one entry
        m_nodes.resize(m_nodes.size()-1);

//      // 6aug99: the timesource is now the sync node...
//      // is this the sync node? reassign if so
//
//      if(node->node() == m_syncNode) {
//
//              // look for another sync-capable node
//              bool found = false;
//              for(int n = 0; !found && n < m_nodes.size(); ++n)
//                      if(setSyncNode(m_nodes[n]->node()) == B_OK)
//                              found = true;
//
//              // no luck? admit defeat:               
//              if(!found) {
//                      PRINT((
//                              "* NodeGroup::removeNode(): no sync-capable nodes left!\n"));
//                              
//                      // +++++ stop & set to invalid state?
//                      
//                      setSyncNode(media_node::null);
//              }
//      }

        // stop the node if necessary   
        status_t err = node->_stop();
        if(err < B_OK) {
                PRINT((
                        "*** NodeGroup::removeNode('%s'): error from node->_stop():\n"
                        "    %s\n",
                        node->name(),
                        strerror(err)));
        }

        // clear the node's group pointer
        node->_setGroup(0);

        // release the manager lock; the node is now ungrouped and
        // unlocked
        m_manager->unlock();
        
        // was that the last node? stop/disable the transport if so
        if(!m_nodes.size()) {

                // +++++ kill sync thread(s)
                
                _changeState(TRANSPORT_INVALID);
        }
                
        // send notification
        if(!LockLooper()) {
                ASSERT(!"LockLooper() failed.");
        }
        BMessage m(M_NODE_REMOVED);
        m.AddInt32("groupID", id());
        m.AddInt32("nodeID", node->id());
        notify(&m);
        UnlockLooper();
                
        // success
        return B_OK;
}
        
status_t NodeGroup::removeNode(
        uint32                                                                                  index) {

        D_METHOD((
                "NodeGroup::removeNode(by index)\n"));
                
        // +++++ icky nested lock
        Autolock _l(this);

        ASSERT(m_nodes.size() > index);
        return removeNode(m_nodes[index]);      
}

uint32 NodeGroup::groupFlags() const {
        Autolock _l(this);
        return m_flags;
}

status_t NodeGroup::setGroupFlags(
        uint32                                                                                  flags) {
        Autolock _l(this);
        m_flags = flags;
        return B_OK;
}


// returns true if one or more nodes in the group have cycling
// enabled, and the start- and end-positions are valid
bool NodeGroup::canCycle() const {
        Autolock _l(this);
        
        return
                m_cycleNodes.size() > 0 &&
                m_endPosition - m_startPosition > s_minCyclePeriod;
}

// -------------------------------------------------------- //
// *** TRANSPORT POSITIONING (LOCK REQUIRED)
// -------------------------------------------------------- //

// Fetch the current transport state
        
NodeGroup::transport_state_t NodeGroup::transportState() const {
        Autolock _l(this);
        return m_transportState;
}
        
// Set the starting media time:
//   This is the point at which playback will begin in any media
//   files/documents being played by the nodes in this group.
//   When cycle mode is enabled, this is the point to which each
//   node will be seek'd at the end of each cycle (loop).
//
//   The starting time can't be changed in the B_OFFLINE run mode
//   (this call will return an error.)

status_t NodeGroup::setStartPosition(
        bigtime_t                                                                               start) {
        Autolock _l(this);
        
        D_METHOD((
                "NodeGroup::setStartPosition(%lld)\n", start));
                
        if(
                m_transportState == TRANSPORT_RUNNING ||
                m_transportState == TRANSPORT_ROLLING ||
                m_transportState == TRANSPORT_STARTING) {
        
                if(m_runMode == BMediaNode::B_OFFLINE)
                        return B_NOT_ALLOWED;
                
                ASSERT(m_timeSourceObj);
        
                if(_cycleValid()) {
                        if(m_timeSourceObj->Now() >= m_cycleDeadline) {
                                // too late to change start position; defer
//                              PRINT((" - deferred\n"));
                                m_newStartPosition = start;
                                m_newStart = true;
                                return B_OK;
                        }
                        
                        // not at deadline yet; fall through to set start position
                }
        }
        
        m_startPosition = start;
        
        // +++++ notify [e.moon 11oct99]

        return B_OK;
}
        
// Fetch the starting position:

// +++++ if a previously-set start position was deferred, it won't be
//       returned yet

bigtime_t NodeGroup::startPosition() const {
        Autolock _l(this);
        
        return m_startPosition;
}
                
// Set the ending media time:
//   This is the point at which playback will end relative to
//   media documents begin played by the nodes in this group; 
//   in cycle mode, this specifies the loop point.  If the
//   ending time is less than or equal to the starting time,
//   the transport will continue until stopped manually.
//   If the end position is changed while the transport is playing,
//   it must take effect retroactively (if it's before the current
//   position and looping is enabled, all nodes must 'warp' to
//   the proper post-loop position.)
//
//   The ending time can't be changed if run mode is B_OFFLINE and
//   the transport is running (this call will return an error.)
        
status_t NodeGroup::setEndPosition(
        bigtime_t                                                                               end) {
        Autolock _l(this);
        
        D_METHOD((
                "NodeGroup::setEndPosition(%lld)\n", end));

        if(
                m_transportState == TRANSPORT_RUNNING ||
                m_transportState == TRANSPORT_ROLLING ||
                m_transportState == TRANSPORT_STARTING) {
        
                if(m_runMode == BMediaNode::B_OFFLINE)
                        return B_NOT_ALLOWED;
                
                ASSERT(m_timeSourceObj);
        
                bigtime_t endDelta = end - m_endPosition;
        
                if(_cycleValid()) {
                        if(m_timeSourceObj->Now() >= m_cycleDeadline + endDelta) {
                                // too late to change end position; defer
//                              PRINT((" - deferred\n"));
                                m_newEndPosition = end;
                                m_newEnd = true;
                                return B_OK;
                        }
                        else {
                                // set new end position
                                m_endPosition = end;
                                
                                // inform thread
                                ASSERT(m_cyclePort);
                                write_port(
                                        m_cyclePort,
                                        _CYCLE_END_CHANGED,
                                        0,
                                        0);
//                      
//                              // restart nodes' cycle threads with new end position
//                              _cycleInit(m_cycleStart);
//                              for(node_set::iterator it = m_cycleNodes.begin();
//                                      it != m_cycleNodes.end(); ++it) {
//                                      (*it)->_scheduleCycle(m_cycleBoundary);
//                              }
//                              return B_OK;
                        }
                }
        }
                
        m_endPosition = end;

        // +++++ notify [e.moon 11oct99]

        return B_OK;
}


// Fetch the end position:
//   Note that if the end position is less than or equal to the start
//   position, it's ignored.

// +++++ if a previously-set end position was deferred, it won't be
//       returned yet
                
bigtime_t NodeGroup::endPosition() const {
        Autolock _l(this);
        return m_endPosition;
}
                
// -------------------------------------------------------- //
// *** TRANSPORT OPERATIONS (LOCK REQUIRED)
// -------------------------------------------------------- //

// Preroll the group:
//   Seeks, then prerolls, each node in the group (honoring the
//   NO_SEEK and NO_PREROLL flags.)  This ensures that the group
//   can start as quickly as possible.
//
//   Does not return until all nodes in the group have been
//   prepared.

status_t NodeGroup::preroll() {
        D_METHOD((
                "NodeGroup::preroll()\n"));
                
        Autolock _l(this);
        return _preroll();
}

// Start all nodes in the group:
//   Nodes with the NO_START_STOP flag aren't molested.
        
status_t NodeGroup::start() {
        D_METHOD((
                "NodeGroup::start()\n"));
                
        Autolock _l(this);
        return _start();
}

// Stop all nodes in the group:
//   Nodes with the NO_START_STOP flag aren't molested.
        
status_t NodeGroup::stop() {
        D_METHOD((
                "NodeGroup::stop()\n"));
                
        Autolock _l(this);
        return _stop();
}

// Roll all nodes in the group:
//   Queues a start and stop atomically (via BMediaRoster::RollNode()).
//   Returns B_NOT_ALLOWED if endPosition <= startPosition;

status_t NodeGroup::roll() {
        D_METHOD((
                "NodeGroup::roll()\n"));
                
        Autolock _l(this);
        return _roll();
}

// -------------------------------------------------------- //
// *** TIME SOURCE & RUN-MODE OPERATIONS (LOCK REQUIRED)
// -------------------------------------------------------- //

// time source control:
//   getTimeSource():
//   returns B_ERROR if no time source has been set; otherwise,
//   returns the node ID of the current time source for all
//   nodes in the group.
//
//   setTimeSource():
//   Calls SetTimeSourceFor() on every node in the group.
//   The group must be stopped; B_NOT_ALLOWED will be returned
//   if the state is TRANSPORT_RUNNING or TRANSPORT_ROLLING.
        
status_t NodeGroup::getTimeSource(
        media_node*                                                                     outTimeSource) const {
        Autolock _l(this);

        if(m_timeSource != media_node::null) {
                *outTimeSource = m_timeSource;  
                return B_OK;
        }
        return B_ERROR;
}
                        
status_t NodeGroup::setTimeSource(
        const media_node&                                               timeSource) {

        Autolock _l(this);
        
        if(m_transportState == TRANSPORT_RUNNING || m_transportState == TRANSPORT_ROLLING)
                return B_NOT_ALLOWED;
        
        if(m_timeSourceObj)
                m_timeSourceObj->Release();
        
        m_timeSource = timeSource;
        
        // cache a BTimeSource*
        m_timeSourceObj = m_manager->roster->MakeTimeSourceFor(timeSource);
        ASSERT(m_timeSourceObj);
        
        // apply new time source to all nodes
        for_each(
                m_nodes.begin(),
                m_nodes.end(),
#if __GNUC__ <= 2
                bind2nd(
                        mem_fun(&NodeRef::_setTimeSource),
                        m_timeSource.node
                )
#else
                [this](NodeRef* nodeRef) { nodeRef->_setTimeSource(m_timeSource.node); }
#endif
        );
        
//      // try to set as sync node
//      err = setSyncNode(timeSource);
//      if(err < B_OK) {
//              PRINT((
//                      "* NodeGroup::setTimeSource(): setSyncNode() failed: %s\n",
//                      strerror(err)));
//      }
        
        // notify
        if(!LockLooper()) {
                ASSERT(!"LockLooper() failed.");
        }
        BMessage m(M_TIME_SOURCE_CHANGED);
        m.AddInt32("groupID", id());
        m.AddInt32("timeSourceID", timeSource.node);
        notify(&m);
        UnlockLooper();
        
        return B_OK;
}
                
// run mode access:
//   Sets the default run mode for the group.  This will be
//   applied to every node with a wildcard (0) run mode.
//
//   Special case: if the run mode is B_OFFLINE, it will be
//   applied to all nodes in the group.
        
BMediaNode::run_mode NodeGroup::runMode() const {
        Autolock _l(this);
        return m_runMode;
}

status_t NodeGroup::setRunMode(BMediaNode::run_mode mode) {
        Autolock _l(this);

        m_runMode = mode;
        
        // apply to all nodes
        for_each(
                m_nodes.begin(),
                m_nodes.end(),
#if __GNUC__ <= 2
                bind2nd(
                        mem_fun(&NodeRef::_setRunModeAuto),
                        m_runMode
                )
#else
                [this](NodeRef* ref) { ref->_setRunModeAuto(this->m_runMode); }
#endif
//              bound_method(
//                      *this,
//                      &NodeGroup::setRunModeFor)
        );
        

        return B_OK;
}

// -------------------------------------------------------- //
// *** BHandler
// -------------------------------------------------------- //

void NodeGroup::MessageReceived(
        BMessage*                                                                               message) {

//      PRINT((
//              "NodeGroup::MessageReceived():\n"));
//      message->PrintToStream();
        status_t err;
        
        switch(message->what) {
                case M_SET_TIME_SOURCE:
                        {
                                media_node timeSource;
                                void* data;
                                ssize_t dataSize;
                                err = message->FindData(
                                        "timeSourceNode",
                                        B_RAW_TYPE,
                                        (const void**)&data,
                                        &dataSize);
                                if(err < B_OK) {
                                        PRINT((
                                                "* NodeGroup::MessageReceived(M_SET_TIME_SOURCE):\n"
                                                "  no timeSourceNode!\n"));
                                        break;
                                }
                                timeSource = *(media_node*)data;

                                setTimeSource(timeSource);
                        }
                        break;
                        
                case M_SET_RUN_MODE:
                        {
                                uint32 runMode;
                                err = message->FindInt32("runMode", (int32*)&runMode);
                                if(err < B_OK) {
                                        PRINT((
                                                "* NodeGroup::MessageReceived(M_SET_RUN_MODE):\n"
                                                "  no runMode!\n"));
                                        break;
                                }
                                
                                if(runMode < BMediaNode::B_OFFLINE ||
                                        runMode > BMediaNode::B_RECORDING) {
                                        PRINT((
                                                "* NodeGroup::MessageReceived(M_SET_RUN_MODE):\n"
                                                "  invalid run mode (%" B_PRIu32 ")\n", runMode));
                                        break;
                                }
                                
                                setRunMode((BMediaNode::run_mode)runMode);
                        }
                        break;
                        
                case M_SET_START_POSITION:
                        {
                                bigtime_t position;
                                err = message->FindInt64("position", (int64*)&position);
                                if(err < B_OK) {
                                        PRINT(( 
                                                "* NodeGroup::MessageReceived(M_SET_START_POSITION):\n"
                                                "  no position!\n"));
                                        break;
                                }
                                setStartPosition(position);
                        }
                        break;
        
                case M_SET_END_POSITION:
                        {
                                bigtime_t position;
                                err = message->FindInt64("position", (int64*)&position);
                                if(err < B_OK) {
                                        PRINT(( 
                                                "* NodeGroup::MessageReceived(M_SET_END_POSITION):\n"
                                                "  no position!\n"));
                                        break;
                                }
                                setEndPosition(position);
                        }
                        break;
        
                case M_PREROLL:
                        preroll();
                        break;

                case M_START:
                        start();
                        break;
        
                case M_STOP:
                        stop();
                        break;
                        
                case M_ROLL:
                        roll();
                        break;
        
                default:
                        _inherited::MessageReceived(message);
                        break;
        }
}
                

// -------------------------------------------------------- //
// *** IPersistent
// -------------------------------------------------------- //

// !
#if CORTEX_XML
// !

// +++++

// Default constructor
NodeGroup::NodeGroup() :
        m_manager(0) {} // +++++ finish initialization
        

// !
#endif /*CORTEX_XML*/
// !

// -------------------------------------------------------- //
// *** IObservable:             [19aug99]
// -------------------------------------------------------- //

void NodeGroup::observerAdded(
        const BMessenger&                               observer) {

        BMessage m(M_OBSERVER_ADDED);
        m.AddInt32("groupID", id());
        m.AddMessenger("target", BMessenger(this));
        observer.SendMessage(&m);
}
                
void NodeGroup::observerRemoved(
        const BMessenger&                               observer) {

        BMessage m(M_OBSERVER_REMOVED);
        m.AddInt32("groupID", id());
        m.AddMessenger("target", BMessenger(this));
        observer.SendMessage(&m);
}

void NodeGroup::notifyRelease() {

        BMessage m(M_RELEASED);
        m.AddInt32("groupID", id());
        m.AddMessenger("target", BMessenger(this));
        notify(&m);
}

void NodeGroup::releaseComplete() {
        // +++++
}

// -------------------------------------------------------- //
// *** ILockable: pass lock requests to m_lock
// -------------------------------------------------------- //

bool NodeGroup::lock(
        lock_t type,
        bigtime_t timeout) {
        
        D_LOCK(("*** NodeGroup::lock(): %ld\n", find_thread(0)));
        
        ASSERT(type == WRITE);
        status_t err = m_lock.LockWithTimeout(timeout);
        
        D_LOCK(("*** NodeGroup::lock() ACQUIRED: %ld\n", find_thread(0)));

        return err == B_OK;
}
        
bool NodeGroup::unlock(
        lock_t type) {

        D_LOCK(("*** NodeGroup::unlock(): %ld\n", find_thread(0)));
        
        ASSERT(type == WRITE);
        m_lock.Unlock();

        D_LOCK(("*** NodeGroup::unlock() RELEASED: %ld\n", find_thread(0)));

        return true;
}
        
bool NodeGroup::isLocked(
        lock_t type) const {

        ASSERT(type == WRITE);
        return m_lock.IsLocked();
}
                
// -------------------------------------------------------- //
// *** ctor (accessible to NodeManager)
// -------------------------------------------------------- //

NodeGroup::NodeGroup(
        const char*                                                                     name,
        NodeManager*                                                            manager,
        BMediaNode::run_mode                            runMode) :
        
        ObservableHandler(name),
        m_lock("NodeGroup::m_lock"),
        m_manager(manager),
        m_id(NextID()),
        m_name(name),
        m_flags(0),
        m_transportState(TRANSPORT_INVALID),
        m_runMode(runMode),
        m_timeSourceObj(0),
        m_released(false),
        m_cycleThread(0),
        m_cyclePort(0),
        m_startPosition(0LL),
        m_endPosition(0LL),
        m_newStart(false),
        m_newEnd(false) {

        ASSERT(m_manager);
        
        if(!m_manager->Lock()) {
                ASSERT(!"m_manager->Lock() failed");
        }
        m_manager->AddHandler(this);
        m_manager->Unlock();
        
        // set default time source
        media_node ts;
        D_ROSTER(("# roster->GetTimeSource()\n"));
        status_t err = m_manager->roster->GetTimeSource(&ts);
        if(err < B_OK) {
                PRINT((
                        "*** NodeGroup(): roster->GetTimeSource() failed:\n"
                        "    %s\n", strerror(err)));
        }
        setTimeSource(ts);
}

// -------------------------------------------------------- //
// *** internal operations
// -------------------------------------------------------- //

uint32 NodeGroup::s_nextID = 1;
uint32 NodeGroup::NextID() {
        return atomic_add((int32*)&s_nextID, 1);
}

// -------------------------------------------------------- //
// *** ref->group communication (LOCK REQUIRED)
// -------------------------------------------------------- //

// when a NodeRef's cycle state (ie. looping or not looping)
// changes, it must pass that information on via this method

void NodeGroup::_refCycleChanged(
        NodeRef*                                                                                ref) {
        assert_locked(this);
        D_METHOD((
                "NodeGroup::_refCycleChanged('%s')\n",
                ref->name()));

        if(ref->m_cycle) {
                _cycleAddRef(ref);
        }       else {
                _cycleRemoveRef(ref);
        }

        // +++++ if running & cycle valid, the node should be properly
        //       seek'd and start'd
}
        

// when a cycling node's latency changes, call this method.

void NodeGroup::_refLatencyChanged(
        NodeRef*                                                                                ref) {
        assert_locked(this);
        D_METHOD((
                "NodeGroup::_refLatencyChanged('%s')\n",
                ref->name()));
        
        if(!_cycleValid())
                return;

        // remove & replace ref (positions it properly) 
        _cycleRemoveRef(ref);
        _cycleAddRef(ref);
        
        // slap my thread up
        ASSERT(m_cyclePort);
        write_port(
                m_cyclePort,
                _CYCLE_LATENCY_CHANGED,
                0,
                0);

        // +++++ zat it?
}

// when a NodeRef receives notification that it has been stopped,
// but is labeled as still running, it must call this method.
// [e.moon 11oct99: roll/B_OFFLINE support]

void NodeGroup::_refStopped(
        NodeRef*                                                                                ref) {
        assert_locked(this);
        D_METHOD((
                "NodeGroup::_refStopped('%s')\n",
                ref->name()));
        
        // roll/B_OFFLINE support [e.moon 11oct99]
        // (check to see if any other nodes in the group are still running;
        //  mark group stopped if not.)
        if(m_transportState == TRANSPORT_ROLLING) {
                bool nodesRunning = false;
                for(node_set::iterator it = m_nodes.begin();
                        it != m_nodes.end(); ++it) {
                        if((*it)->isRunning()) {
                                nodesRunning = true;
                                break;
                        }
                }
                if(!nodesRunning)
                        // the group has stopped; update transport state
                        _changeState(TRANSPORT_STOPPED);        

        }
        
}


// -------------------------------------------------------- //
// *** transport helpers (LOCK REQUIRED)
// -------------------------------------------------------- //


// Preroll all nodes in the group; this is the implementation
// of preroll().
// *** this method should not be called from the transport thread
// (since preroll operations can block for a relatively long time.)
        
status_t NodeGroup::_preroll() {
        assert_locked(this);
        
        D_METHOD((
                "NodeGroup::_preroll()\n"));
                
        if(
                m_transportState == TRANSPORT_RUNNING ||
                m_transportState == TRANSPORT_ROLLING)
                // too late
                return B_NOT_ALLOWED;
        
        // * preroll all nodes to the start position

        // +++++ currently, if an error is encountered it's ignored.
        //       should the whole operation fail if one node couldn't
        //       be prerolled?
        //
        //       My gut response is 'no', since the preroll step is
        //       optional, but the caller should have some inkling that
        //       one of its nodes didn't behave.

// [e.moon 13oct99] making PPC compiler happy
//      for_each(
//              m_nodes.begin(),
//              m_nodes.end(),
//              bind2nd(
//                      mem_fun(&NodeRef::_preroll),
//                      m_startPosition
//              )
//      );
        for(node_set::iterator it = m_nodes.begin();
                it != m_nodes.end(); ++it) {
                (*it)->_preroll(m_startPosition);
        }

//    replaces
//              bind2nd(
//                      bound_method(*this, &NodeGroup::prerollNode),
//                      m_startPosition
//              )

        return B_OK;
}


//// functor: calculates latency of each node it's handed, caching
//// the largest one found; includes initial latency if nodes report it.
//
//class NodeGroup::calcLatencyFn { public:
//      bigtime_t& maxLatency;
//      
//      calcLatencyFn(bigtime_t& _m) : maxLatency(_m) {}
//
//      void operator()(NodeRef* r) {
//              ASSERT(r);
//              
////            PRINT((
////                    "# calcLatencyFn(): '%s'\n",
////                    r->name()));
//                      
//              if(!(r->node().kind & B_BUFFER_PRODUCER)) {
//                      // node can't incur latency
////                    PRINT((
////                            "-   not a producer\n"));
//                      return;
//              }
//              
//              bigtime_t latency;
//              status_t err =
//                      BMediaRoster::Roster()->GetLatencyFor(
//                              r->node(),
//                              &latency);
//              if(err < B_OK) {
//                      PRINT((
//                              "* calcLatencyFn: GetLatencyFor() failed: %s\n",
//                              strerror(err)));
//                      return;
//              }
////            PRINT(("-   %lld\n", latency));
//              
//              bigtime_t add;
//              err = BMediaRoster::Roster()->GetInitialLatencyFor(
//                      r->node(),
//                      &add);
////            PRINT(("-   %lld\n", add));
//              if(err < B_OK) {
//                      PRINT((
//                              "* calcLatencyFn: GetInitialLatencyFor() failed: %s\n",
//                              strerror(err)));
//              }
//              else
//                      latency += add;
//              
//              if(latency > maxLatency)
//                      maxLatency = latency;
//
////            PRINT((
////                    "-   max latency: %lld\n",
////                    maxLatency));
//      }
//};

// Start all nodes in the group; this is the implementation of
// start().  Fails if the run mode is B_OFFLINE; use _roll() instead
// in that case.
//
// (this may be called from the transport thread or from
//  an API-implementation method.)
                
status_t NodeGroup::_start() {
        assert_locked(this);
        
        D_METHOD((
                "NodeGroup::_start()\n"));
        status_t err;
                
        if(m_transportState != TRANSPORT_STOPPED)
                return B_NOT_ALLOWED;
                
        if(m_runMode == BMediaNode::B_OFFLINE)
                return B_NOT_ALLOWED;

        ASSERT(m_nodes.size());

        _changeState(TRANSPORT_STARTING);

        // * Find the highest latency in the group
        
        bigtime_t offset = 0LL;
        calcLatencyFn _f(offset);
        for_each(
                m_nodes.begin(),
                m_nodes.end(),
                _f);

        offset += s_rosterLatency;
        PRINT((
                "- offset: %" B_PRIdBIGTIME "\n", offset));
        
        // * Seek all nodes (in case one or more failed to preroll)

        for(node_set::iterator it = m_nodes.begin();
                it != m_nodes.end(); ++it) {
                err = (*it)->_seekStopped(m_startPosition);
                if(err < B_OK) {
                        PRINT((
                                "! NodeGroup('%s')::_start():\n"
                                "  ref('%s')->_seekStopped(%" B_PRIdBIGTIME ") failed:\n"
                                "  %s\n",
                                name(), (*it)->name(), m_startPosition,
                                strerror(err)));

                        // +++++ continue?
                }
        }
        
        // * Start all nodes, allowing for the max latency found

        ASSERT(m_timeSourceObj);
        bigtime_t when = m_timeSourceObj->Now() + offset;

        // 10aug99: initialize cycle (loop) settings
        if(_cycleValid()) {
                _initCycleThread();
                _cycleInit(when);
        }
        
        // start the nodes
        for(node_set::iterator it = m_nodes.begin();
                it != m_nodes.end(); ++it) {
                err = (*it)->_start(when);
                if(err < B_OK) {
                        PRINT((
                                "! NodeGroup('%s')::_start():\n"
                                "  ref('%s')->_start(%" B_PRIdBIGTIME ") failed:\n"
                                "  %s\n",
                                name(), (*it)->name(), when,
                                strerror(err)));

                        // +++++ continue?
                }
        }
        
        // notify observers
        _changeState(TRANSPORT_RUNNING);
        return B_OK;    
}
        
// Stop all nodes in the group; this is the implementation of
// stop().
//
// (this may be called from the transport thread or from
//  an API-implementation method.)
                
status_t NodeGroup::_stop() {

        D_METHOD((
                "NodeGroup::_stop()\n"));
                
        assert_locked(this);

        if(
                m_transportState != TRANSPORT_RUNNING &&
                m_transportState != TRANSPORT_ROLLING)
                return B_NOT_ALLOWED;

        _changeState(TRANSPORT_STOPPING);

        // * stop the cycle thread if need be
        _destroyCycleThread();

        // * stop all nodes
        //   +++++ error reports would be nice
        
        for_each(
                m_nodes.begin(),
                m_nodes.end(),
#if __GNUC__ <= 2
                mem_fun(&NodeRef::_stop)
#else
                [](NodeRef* ref) { ref->_stop(); }
#endif
        );

        // update transport state
        _changeState(TRANSPORT_STOPPED);
                
        return B_OK;
}

// Roll all nodes in the group; this is the implementation of
// roll().
//
// (this may be called from the transport thread or from
//  an API-implementation method.)

status_t NodeGroup::_roll() {

        D_METHOD((
                "NodeGroup::_roll()\n"));               
        assert_locked(this);
        status_t err;

        if(m_transportState != TRANSPORT_STOPPED)
                return B_NOT_ALLOWED;
                
        bigtime_t period = m_endPosition - m_startPosition;
        if(period <= 0LL)
                return B_NOT_ALLOWED;
        
        _changeState(TRANSPORT_STARTING);

        bigtime_t tpStart = 0LL;
        bigtime_t tpStop = period;
        
        if(m_runMode != BMediaNode::B_OFFLINE) {
        
                // * Find the highest latency in the group      
                bigtime_t offset = 0LL;
                calcLatencyFn _f(offset);
                for_each(
                        m_nodes.begin(),
                        m_nodes.end(),
                        _f);

                offset += s_rosterLatency;
                PRINT((
                        "- offset: %" B_PRIdBIGTIME "\n", offset));

                ASSERT(m_timeSourceObj);
                tpStart = m_timeSourceObj->Now() + offset;
                tpStop += tpStart;
        }
        
        // * Roll all nodes; watch for errors
        bool allFailed = true;
        err = B_OK;
        for(
                node_set::iterator it = m_nodes.begin();
                it != m_nodes.end(); ++it) {

                status_t e = (*it)->_roll(
                        tpStart,
                        tpStop,
                        m_startPosition);
                if(e < B_OK)
                        err = e;
                else
                        allFailed = false;
        }
        
        if(!allFailed)
                // notify observers
                _changeState(TRANSPORT_ROLLING);

        return err;     
}
        

// State transition; notify listeners
// +++++ [18aug99] DANGER: should notification happen in the middle
//                         of such an operation?
inline void NodeGroup::_changeState(
        transport_state_t                       to) {

        assert_locked(this);
        
        m_transportState = to;

        if(!LockLooper()) {
                ASSERT(!"LockLooper() failed.");
        }
        BMessage m(M_TRANSPORT_STATE_CHANGED);
        m.AddInt32("groupID", id());
        m.AddInt32("transportState", m_transportState);
        notify(&m);
        UnlockLooper();
}

// Enforce a state transition, and notify listeners
inline void NodeGroup::_changeState(
        transport_state_t                       from,
        transport_state_t                       to) {

        assert_locked(this);
        ASSERT(m_transportState == from);
        
        _changeState(to);
}


// -------------------------------------------------------- //
// *** cycle thread & helpers (LOCK REQUIRED)
// -------------------------------------------------------- //

// *** cycle port definitions

const int32                                     _portLength                     = 32;
const char* const               _portName                               = "NodeGroup::m_cyclePort";
const size_t                            _portMsgMaxSize = 256;


// set up the cycle thread (including its kernel port)
status_t NodeGroup::_initCycleThread() {
        assert_locked(this);
        status_t err;
        D_METHOD((
                "NodeGroup::_initCycleThread()\n"));

        if(m_cycleThread) {
                // thread is still alive
                err = _destroyCycleThread();
                if(err < B_OK)
                        return err;
        }

        // create
        m_cycleThreadDone = false;
        m_cycleThread = spawn_thread(
                &_CycleThread,
                "NodeGroup[cycleThread]",
                B_NORMAL_PRIORITY,
                (void*)this);
        if(m_cycleThread < B_OK) {
                PRINT((
                        "* NodeGroup::_initCycleThread(): spawn_thread() failed:\n"
                        "  %s\n",
                        strerror(m_cycleThread)));
                return m_cycleThread;
        }

        // launch
        return resume_thread(m_cycleThread);
}
        
// shut down the cycle thread/port
status_t NodeGroup::_destroyCycleThread() {
        assert_locked(this);
        status_t err;
        D_METHOD((
                "NodeGroup::_destroyCycleThread()\n"));

        if(!m_cycleThread)
                return B_OK;

        if(!m_cycleThreadDone) {
                // kill the thread              
                ASSERT(m_cyclePort);
                err = write_port_etc(
                        m_cyclePort,
                        _CYCLE_STOP,
                        0,
                        0,
                        B_TIMEOUT,
                        10000LL);

                if(err < B_OK) {
                        // bad thread.  die, thread, die.
                        PRINT((
                                "* NodeGroup::_destroyCycleThread(): port write failed; killing.\n"));
                        delete_port(m_cyclePort);
                        m_cyclePort = 0;
                        kill_thread(m_cycleThread);
                        m_cycleThread = 0;
                        return B_OK;
                }       
        
                // the thread got the message; wait for it to quit
                unlock();
                while(wait_for_thread(m_cycleThread, &err) == B_INTERRUPTED) {
                        PRINT((
                                "! wait_for_thread(m_cycleThread, &err) == B_INTERRUPTED\n"));
                }               
                lock();
        }
        
        // it's up to the thread to close its port
        ASSERT(!m_cyclePort);

        m_cycleThread = 0;
                                
        return B_OK;
}


// 1) do the current positions specify a valid cycle region?
// 2) are any nodes in the group cycle-enabled?

bool NodeGroup::_cycleValid() {
        assert_locked(this);
        return
                (m_transportState == TRANSPORT_RUNNING ||
                 m_transportState == TRANSPORT_STARTING) &&
                 canCycle();
}

// initialize the cycle members (call when starting)

void NodeGroup::_cycleInit(
        bigtime_t                                                                               startTime) {
        assert_locked(this);
        ASSERT(m_cycleNodes.size() > 0);
        D_METHOD((
                "NodeGroup::_cycleInit(%lld)\n",
                startTime));

        // +++++ rescan latencies?

        // figure new boundary & deadline from region length
        bigtime_t cyclePeriod = m_endPosition - m_startPosition;

        if(cyclePeriod <= 0) {
                // cycle region is no longer valid
                m_cycleBoundary = 0LL;
                m_cycleDeadline = 0LL;

//              no no no -- deadlocks when the thread calls this method
//              // stop the thread
//              _destroyCycleThread();
                return;
        }
        
        m_cycleStart = startTime;
        m_cycleBoundary = startTime + cyclePeriod;
        m_cycleDeadline = m_cycleBoundary - (m_cycleMaxLatency + s_rosterLatency);
}


// add a ref to the cycle set (in proper order, based on latency)
void NodeGroup::_cycleAddRef(
        NodeRef*                                                                                ref) {
        assert_locked(this);
        
        // make sure it's not already there
        ASSERT(find(
                m_cycleNodes.begin(),
                m_cycleNodes.end(),
                ref) == m_cycleNodes.end());

        // [re]calc latency if 0
        if(!ref->m_latency)
                ref->_updateLatency();

        node_set::iterator it;
        for(it = m_cycleNodes.begin();
                it != m_cycleNodes.end(); ++it) {
                if(ref->m_latency > (*it)->m_latency) {
                        m_cycleNodes.insert(it, ref);
                        break;
                }
        }
        
        // not inserted? new ref belongs at the end
        if(it == m_cycleNodes.end())
                m_cycleNodes.insert(it, ref);
}
                
// remove a ref from the cycle set
void NodeGroup::_cycleRemoveRef(
        NodeRef*                                                                                ref) {
        assert_locked(this);
        
        node_set::iterator it = find(
                m_cycleNodes.begin(),
                m_cycleNodes.end(),
                ref);
        ASSERT(it != m_cycleNodes.end());
        m_cycleNodes.erase(it);
}

bigtime_t NodeGroup::_cycleBoundary() const {
        Autolock _l(this);
        return m_cycleBoundary;
}

// cycle thread impl.
/*static*/
status_t NodeGroup::_CycleThread(void* user) {
        ((NodeGroup*)user)->_cycleThread();
        return B_OK;
}

void NodeGroup::_cycleThread() {

        status_t err;
        int32 code;
        int32 errorCount = 0;

        // +++++ liability -- if the thread has to be killed, this buffer
        //       won't be reclaimed
        char* msgBuffer = new char[_portMsgMaxSize];
        array_delete<char> _d(msgBuffer);
        
        // create port
        ASSERT(!m_cyclePort);
        m_cyclePort = create_port(
                _portLength,
                _portName);
        ASSERT(m_cyclePort >= B_OK);
        
        // the message-handling loop
        bool done = false;      
        while(!done) {

                // *** wait until it's time to queue the next cycle, or until
                // *** a message arrives

                lock();                         // **** BEGIN LOCKED SECTION ****
                if(!_cycleValid()) {
                        unlock();
                        break;
                }

                ASSERT(m_cycleNodes.size() > 0);
                ASSERT(m_timeSourceObj);

                bigtime_t maxLatency = m_cycleNodes.front()->m_latency;
                bigtime_t wakeUpAt = m_timeSourceObj->RealTimeFor(
                        m_cycleBoundary, maxLatency + s_rosterLatency);
                bigtime_t timeout = wakeUpAt - m_timeSourceObj->RealTime();

                if(timeout <= 0) {
                        // +++++ whoops, I'm late.
                        // +++++ adjust to compensate !!!
                        PRINT((
                                "*** NodeGroup::_cycleThread(): LATE\n"
                                "    by %" B_PRIdBIGTIME "\n", -timeout));
                }
                
                // +++++ if timeout is very short, spin until the target time arrives
                
                unlock();                       // **** END LOCKED SECTION ****
                
                // block until message arrives or it's time to wake up
                err = read_port_etc(
                        m_cyclePort,
                        &code,
                        msgBuffer,
                        _portMsgMaxSize,
                        B_TIMEOUT,
                        timeout);

                if(err == B_TIMED_OUT) {
                        // the time has come to seek my nodes
                        _handleCycleService();
                        continue;
                }               
                else if(err < B_OK) {
                        // any other error is bad news
                        PRINT((
                                "* NodeGroup::_cycleThread(): read_port error:\n"
                                "  %s\n"
                                "  ABORTING\n\n", strerror(err)));
                        if(++errorCount > 10) {
                                PRINT((
                                        "*** Too many errors; aborting.\n"));
                                break;
                        }
                        continue;
                }
                
                errorCount = 0;
                
                // process the message
                switch(code) {
                        case _CYCLE_STOP:
                                // bail
                                done = true;
                                break;
                                
                        case _CYCLE_END_CHANGED:
                        case _CYCLE_LATENCY_CHANGED:
                                // fall through to next loop; for now, these messages
                                // serve only to slap me out of my stupor and reassess
                                // the timing situation...
                                break;
                                
                        default:
                                PRINT((
                                        "* NodeGroup::_cycleThread(): unknown message code '%"
                                                B_PRId32 "'\n", code));
                                break;
                }
        } // while(!done)

        
        // delete port
        delete_port(m_cyclePort);
        m_cyclePort = 0;

        // done
        m_cycleThreadDone = true;
}

// cycle service: seek all nodes & initiate next cycle
void NodeGroup::_handleCycleService() {
        Autolock _l(this);
//      D_METHOD((
//              "NodeGroup::_handleCycleService()\n"));
        status_t err;
                
        if(!_cycleValid()) {
//              PRINT((
//                      "- _handleCycleService(): cycle not valid; quitting.\n"));
                return;
        }
        
        // seek
        for(node_set::iterator it = m_cycleNodes.begin();
                it != m_cycleNodes.end(); ++it) {
                err = (*it)->_seek(
                        m_startPosition,
                        m_cycleBoundary);
                if(err < B_OK) {
                        PRINT((
                                "- _handleCycleService(): node('%s')::_seek() failed:\n"
                                "  %s\n",
                                (*it)->name(), strerror(err)));
                }
        }

        // update cycle settings
        if(m_newStart) {
                m_newStart = false;
                m_startPosition = m_newStartPosition;
        }
        if(m_newEnd) {
                m_newEnd = false;
                m_endPosition = m_newEndPosition;
        }

        // prepare next cycle
        _cycleInit(m_cycleBoundary);
}

// END -- NodeGroup.cpp --