Main Page   Namespace List   Class Hierarchy   Compound List   File List   Namespace Members   Compound Members   File Members  

ProxyPullConsumer.cc

Go to the documentation of this file.
00001 //                            Package   : omniEvents
00002 // ProxyPullConsumer.cc       Created   : 2003/12/04
00003 //                            Author    : Alex Tingle
00004 //
00005 //    Copyright (C) 2003 Alex Tingle.
00006 //
00007 //    This file is part of the omniEvents application.
00008 //
00009 //    omniEvents is free software; you can redistribute it and/or
00010 //    modify it under the terms of the GNU Lesser General Public
00011 //    License as published by the Free Software Foundation; either
00012 //    version 2.1 of the License, or (at your option) any later version.
00013 //
00014 //    omniEvents is distributed in the hope that it will be useful,
00015 //    but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 //    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00017 //    Lesser General Public License for more details.
00018 //
00019 //    You should have received a copy of the GNU Lesser General Public
00020 //    License along with this library; if not, write to the Free Software
00021 //    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 //
00023 
00024 #include "ProxyPullConsumer.h"
00025 #include "Orb.h"
00026 #include "omniEventsLog.h"
00027 #include "PersistNode.h"
00028 #include <assert.h>
00029 
00030 namespace OmniEvents {
00031 
00032 //
00033 //  ProxyPullConsumerManager
00034 //
00035 
00036 PortableServer::Servant
00037 ProxyPullConsumerManager::incarnate(
00038   const PortableServer::ObjectId& oid,
00039   PortableServer::POA_ptr         poa
00040 )
00041 {
00042   DB(20,"ProxyPullConsumerManager::incarnate()")
00043   ProxyPullConsumer_i* result =new ProxyPullConsumer_i(_managedPoa,_queue);
00044   _servants.insert(result);
00045   return result;
00046 }
00047 
00048 ProxyPullConsumerManager::ProxyPullConsumerManager(
00049   PortableServer::POA_ptr parentPoa,
00050   list<CORBA::Any*>&      q
00051 )
00052 : ProxyManager(parentPoa,"ProxyPullConsumer"),
00053   _queue(q)
00054 {
00055   // pass
00056 }
00057 
00058 ProxyPullConsumerManager::~ProxyPullConsumerManager()
00059 {
00060   DB(20,"~ProxyPullConsumerManager()")
00061 }
00062 
00063 CosEventChannelAdmin::ProxyPullConsumer_ptr
00064 ProxyPullConsumerManager::createObject()
00065 {
00066   return createNarrowedReference<CosEventChannelAdmin::ProxyPullConsumer>(
00067            _managedPoa.in(),
00068            CosEventChannelAdmin::_tc_ProxyPullConsumer->id()
00069          );
00070 }
00071 
00072 void ProxyPullConsumerManager::trigger()
00073 {
00074   // Trigger each servant in turn.
00075   for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
00076   {
00077     ProxyPullConsumer_i* proxy=dynamic_cast<ProxyPullConsumer_i*>(*i);
00078     proxy->trigger();
00079   }
00080 }
00081 
00082 void ProxyPullConsumerManager::disconnect()
00083 {
00084   for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
00085   {
00086     Proxy* p =*i; // Sun's CC requires this temporary.
00087     ProxyPullConsumer_i* ppc =static_cast<ProxyPullConsumer_i*>(p);
00088     ppc->disconnect_pull_consumer();
00089   }
00090 }
00091 
00092 
00093 //
00094 //  ProxyPullConsumer_i
00095 //
00096 
00097 // CORBA interface methods
00098 
00099 void ProxyPullConsumer_i::connect_pull_supplier(
00100   CosEventComm::PullSupplier_ptr pullSupplier
00101 )
00102 {
00103   if(CORBA::is_nil(pullSupplier))
00104       throw CORBA::BAD_PARAM();
00105   if(!CORBA::is_nil(_target) || !CORBA::is_nil(_req))
00106       throw CosEventChannelAdmin::AlreadyConnected();
00107   _target=CosEventComm::PullSupplier::_duplicate(pullSupplier);
00108 
00109   output(WriteLock().os);
00110 }
00111 
00112 void ProxyPullConsumer_i::disconnect_pull_consumer()
00113 {
00114   DB(5,"ProxyPullConsumer_i::disconnect_pull_consumer()");
00115   eraseKey("SupplierAdmin/ProxyPullConsumer");
00116   deactivateObject();
00117   if(CORBA::is_nil(_target))
00118   {
00119     throw CORBA::OBJECT_NOT_EXIST(
00120       IFELSE_OMNIORB4(omni::OBJECT_NOT_EXIST_NoMatch,0),
00121       CORBA::COMPLETED_NO
00122     );
00123   }
00124   else
00125   {
00126     CORBA::Request_var req=_target->_request("disconnect_pull_supplier");
00127     req->send_deferred();
00128     Orb::inst().deferredRequest(req._retn());
00129     _target=CosEventComm::PullSupplier::_nil();
00130   }
00131 }
00132 
00133 //
00134 
00135 ProxyPullConsumer_i::ProxyPullConsumer_i(
00136   PortableServer::POA_ptr poa,
00137   list<CORBA::Any*>&      q
00138 )
00139 : Proxy(poa),
00140   _target(CosEventComm::PullSupplier::_nil()),
00141   _queue(q)
00142 {
00143   _exceptionCount[Pull]=0;
00144   _exceptionCount[TryPull]=0;
00145 }
00146 
00147 ProxyPullConsumer_i::~ProxyPullConsumer_i()
00148 {
00149   DB(20,"~ProxyPullConsumer_i()")
00150 }
00151 
00152 void ProxyPullConsumer_i::trigger()
00153 {
00154   // Prefer 'pull' method calls.
00155   CORBA::String_var opname ="pull";
00156 
00157   if(!CORBA::is_nil(_req) && _req->poll_response()) 
00158   {
00159     opname=CORBA::string_dup(_req->operation());
00160 
00161     CORBA::Environment_ptr env=_req->env(); // No need to release environment.
00162     if(!CORBA::is_nil(env) && env->exception()) 
00163     {
00164       CORBA::Exception* ex =env->exception(); // No need to free exception.
00165       DB(10,"ProxyPullConsumer got exception"
00166            IF_OMNIORB4(<<": "<<ex->_name())<<", op:"<<opname);
00167       if(0==strcmp("pull",opname))
00168       {
00169         ++(_exceptionCount[Pull]);
00170         opname="try_pull"; // Try something else next time.
00171       }
00172       else if(0==strcmp("try_pull",opname))
00173       {
00174         ++(_exceptionCount[TryPull]);
00175         opname="pull"; // Try something else next time.
00176       }
00177       else
00178           DB(2,"Ignoring unrecognised response. operation:"<<opname);
00179       if(_exceptionCount[Pull]>=2 && _exceptionCount[TryPull]>=2)
00180       {
00181         Orb::inst().reportObjectFailure(HERE,_target.in(),ex);
00182 
00183         // Try to notify the Supplier that the connection is closing.
00184         CORBA::Request_var req=_target->_request("disconnect_pull_supplier");
00185         req->send_deferred();
00186         Orb::inst().deferredRequest(req._retn());
00187 
00188         _target=CosEventComm::PullSupplier::_nil(); // disconnected
00189         eraseKey("SupplierAdmin/ProxyPullConsumer");
00190         deactivateObject();
00191       }
00192     }
00193     else  
00194     {
00195       // Do we have an event?
00196       bool hasEvent=false;
00197       if(0==strcmp("pull",opname))
00198       {
00199         _exceptionCount[Pull]=0;
00200         hasEvent=true;
00201       }
00202       else if(0==strcmp("try_pull",opname))
00203       {
00204         _exceptionCount[TryPull]=0;
00205         CORBA::NVList_ptr args=_req->arguments(); // No need to release args.
00206         if(args->count()==1)
00207         {
00208           CORBA::NamedValue_var hasEventArg=args->item(0);
00209           if(0==strcmp(hasEventArg->name(),"has_event"))
00210           {
00211             CORBA::Any* a =hasEventArg->value();
00212             CORBA::Boolean b;
00213             CORBA::Any::to_boolean tb(b); //MS VC++6 is on drugs!
00214             hasEvent=(((*a)>>=tb) && b);
00215           }
00216         }
00217       }
00218       // Pick up an event, if we have one.
00219       if(hasEvent)
00220       {
00221         CORBA::Any* event =new CORBA::Any();
00222         _req->return_value() >>= (*event);
00223         _queue.push_back(event);
00224       }
00225     }
00226     _req=CORBA::Request::_nil();
00227   }
00229   if(CORBA::is_nil(_req) && !CORBA::is_nil(_target))
00230   {
00231     _req=_target->_request(opname);
00232     _req->set_return_type(CORBA::_tc_any);
00233     if(0==strcmp("try_pull",opname))
00234         _req->add_out_arg("has_event")<<=CORBA::Any::from_boolean(1);
00235     _req->send_deferred();
00236   }
00237 }
00238 
00239 void ProxyPullConsumer_i::reincarnate(
00240   const string&      oid,
00241   const PersistNode& node
00242 )
00243 {
00244   CosEventComm::PullSupplier_var pullSupplier =
00245     string_to_<CosEventComm::PullSupplier>(node.attrString("IOR").c_str());
00246   // Do not activate until we know that we have read a valid target.
00247   activateObjectWithId(oid.c_str());
00248   connect_pull_supplier(pullSupplier.in());
00249 }
00250 
00251 void ProxyPullConsumer_i::output(ostream& os)
00252 {
00253   basicOutput(os,"SupplierAdmin/ProxyPullConsumer",_target.in());
00254 }
00255 
00256 }; // end namespace OmniEvents

Generated on Fri Nov 19 17:42:20 2004 for OmniEvents by doxygen1.2.15