00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "ProxyPullConsumer.h"
00025 #include "Orb.h"
00026 #include "omniEventsLog.h"
00027 #include "PersistNode.h"
00028 #include <assert.h>
00029
00030 namespace OmniEvents {
00031
00032
00033
00034
00035
00036 PortableServer::Servant
00037 ProxyPullConsumerManager::incarnate(
00038 const PortableServer::ObjectId& oid,
00039 PortableServer::POA_ptr poa
00040 )
00041 {
00042 DB(20,"ProxyPullConsumerManager::incarnate()")
00043 ProxyPullConsumer_i* result =new ProxyPullConsumer_i(_managedPoa,_queue);
00044 _servants.insert(result);
00045 return result;
00046 }
00047
00048 ProxyPullConsumerManager::ProxyPullConsumerManager(
00049 PortableServer::POA_ptr parentPoa,
00050 list<CORBA::Any*>& q
00051 )
00052 : ProxyManager(parentPoa,"ProxyPullConsumer"),
00053 _queue(q)
00054 {
00055
00056 }
00057
00058 ProxyPullConsumerManager::~ProxyPullConsumerManager()
00059 {
00060 DB(20,"~ProxyPullConsumerManager()")
00061 }
00062
00063 CosEventChannelAdmin::ProxyPullConsumer_ptr
00064 ProxyPullConsumerManager::createObject()
00065 {
00066 return createNarrowedReference<CosEventChannelAdmin::ProxyPullConsumer>(
00067 _managedPoa.in(),
00068 CosEventChannelAdmin::_tc_ProxyPullConsumer->id()
00069 );
00070 }
00071
00072 void ProxyPullConsumerManager::trigger()
00073 {
00074
00075 for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
00076 {
00077 ProxyPullConsumer_i* proxy=dynamic_cast<ProxyPullConsumer_i*>(*i);
00078 proxy->trigger();
00079 }
00080 }
00081
00082 void ProxyPullConsumerManager::disconnect()
00083 {
00084 for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
00085 {
00086 Proxy* p =*i;
00087 ProxyPullConsumer_i* ppc =static_cast<ProxyPullConsumer_i*>(p);
00088 ppc->disconnect_pull_consumer();
00089 }
00090 }
00091
00092
00093
00094
00095
00096
00097
00098
00099 void ProxyPullConsumer_i::connect_pull_supplier(
00100 CosEventComm::PullSupplier_ptr pullSupplier
00101 )
00102 {
00103 if(CORBA::is_nil(pullSupplier))
00104 throw CORBA::BAD_PARAM();
00105 if(!CORBA::is_nil(_target) || !CORBA::is_nil(_req))
00106 throw CosEventChannelAdmin::AlreadyConnected();
00107 _target=CosEventComm::PullSupplier::_duplicate(pullSupplier);
00108
00109 output(WriteLock().os);
00110 }
00111
00112 void ProxyPullConsumer_i::disconnect_pull_consumer()
00113 {
00114 DB(5,"ProxyPullConsumer_i::disconnect_pull_consumer()");
00115 eraseKey("SupplierAdmin/ProxyPullConsumer");
00116 deactivateObject();
00117 if(CORBA::is_nil(_target))
00118 {
00119 throw CORBA::OBJECT_NOT_EXIST(
00120 IFELSE_OMNIORB4(omni::OBJECT_NOT_EXIST_NoMatch,0),
00121 CORBA::COMPLETED_NO
00122 );
00123 }
00124 else
00125 {
00126 CORBA::Request_var req=_target->_request("disconnect_pull_supplier");
00127 req->send_deferred();
00128 Orb::inst().deferredRequest(req._retn());
00129 _target=CosEventComm::PullSupplier::_nil();
00130 }
00131 }
00132
00133
00134
00135 ProxyPullConsumer_i::ProxyPullConsumer_i(
00136 PortableServer::POA_ptr poa,
00137 list<CORBA::Any*>& q
00138 )
00139 : Proxy(poa),
00140 _target(CosEventComm::PullSupplier::_nil()),
00141 _queue(q)
00142 {
00143 _exceptionCount[Pull]=0;
00144 _exceptionCount[TryPull]=0;
00145 }
00146
00147 ProxyPullConsumer_i::~ProxyPullConsumer_i()
00148 {
00149 DB(20,"~ProxyPullConsumer_i()")
00150 }
00151
00152 void ProxyPullConsumer_i::trigger()
00153 {
00154
00155 CORBA::String_var opname ="pull";
00156
00157 if(!CORBA::is_nil(_req) && _req->poll_response())
00158 {
00159 opname=CORBA::string_dup(_req->operation());
00160
00161 CORBA::Environment_ptr env=_req->env();
00162 if(!CORBA::is_nil(env) && env->exception())
00163 {
00164 CORBA::Exception* ex =env->exception();
00165 DB(10,"ProxyPullConsumer got exception"
00166 IF_OMNIORB4(<<": "<<ex->_name())<<", op:"<<opname);
00167 if(0==strcmp("pull",opname))
00168 {
00169 ++(_exceptionCount[Pull]);
00170 opname="try_pull";
00171 }
00172 else if(0==strcmp("try_pull",opname))
00173 {
00174 ++(_exceptionCount[TryPull]);
00175 opname="pull";
00176 }
00177 else
00178 DB(2,"Ignoring unrecognised response. operation:"<<opname);
00179 if(_exceptionCount[Pull]>=2 && _exceptionCount[TryPull]>=2)
00180 {
00181 Orb::inst().reportObjectFailure(HERE,_target.in(),ex);
00182
00183
00184 CORBA::Request_var req=_target->_request("disconnect_pull_supplier");
00185 req->send_deferred();
00186 Orb::inst().deferredRequest(req._retn());
00187
00188 _target=CosEventComm::PullSupplier::_nil();
00189 eraseKey("SupplierAdmin/ProxyPullConsumer");
00190 deactivateObject();
00191 }
00192 }
00193 else
00194 {
00195
00196 bool hasEvent=false;
00197 if(0==strcmp("pull",opname))
00198 {
00199 _exceptionCount[Pull]=0;
00200 hasEvent=true;
00201 }
00202 else if(0==strcmp("try_pull",opname))
00203 {
00204 _exceptionCount[TryPull]=0;
00205 CORBA::NVList_ptr args=_req->arguments();
00206 if(args->count()==1)
00207 {
00208 CORBA::NamedValue_var hasEventArg=args->item(0);
00209 if(0==strcmp(hasEventArg->name(),"has_event"))
00210 {
00211 CORBA::Any* a =hasEventArg->value();
00212 CORBA::Boolean b;
00213 CORBA::Any::to_boolean tb(b);
00214 hasEvent=(((*a)>>=tb) && b);
00215 }
00216 }
00217 }
00218
00219 if(hasEvent)
00220 {
00221 CORBA::Any* event =new CORBA::Any();
00222 _req->return_value() >>= (*event);
00223 _queue.push_back(event);
00224 }
00225 }
00226 _req=CORBA::Request::_nil();
00227 }
00229 if(CORBA::is_nil(_req) && !CORBA::is_nil(_target))
00230 {
00231 _req=_target->_request(opname);
00232 _req->set_return_type(CORBA::_tc_any);
00233 if(0==strcmp("try_pull",opname))
00234 _req->add_out_arg("has_event")<<=CORBA::Any::from_boolean(1);
00235 _req->send_deferred();
00236 }
00237 }
00238
00239 void ProxyPullConsumer_i::reincarnate(
00240 const string& oid,
00241 const PersistNode& node
00242 )
00243 {
00244 CosEventComm::PullSupplier_var pullSupplier =
00245 string_to_<CosEventComm::PullSupplier>(node.attrString("IOR").c_str());
00246
00247 activateObjectWithId(oid.c_str());
00248 connect_pull_supplier(pullSupplier.in());
00249 }
00250
00251 void ProxyPullConsumer_i::output(ostream& os)
00252 {
00253 basicOutput(os,"SupplierAdmin/ProxyPullConsumer",_target.in());
00254 }
00255
00256 };