00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #include <config.h>
00022
00023 #include <drizzled/session.h>
00024 #include <drizzled/user_var_entry.h>
00025 #include <drizzled/plugin/client/cached.h>
00026 #include <drizzled/plugin/client/concurrent.h>
00027 #include <drizzled/catalog/local.h>
00028 #include <drizzled/execute.h>
00029
00030 namespace drizzled
00031 {
00032
00033 Execute::Execute(Session &arg, bool wait_arg) :
00034 wait(wait_arg),
00035 _session(arg)
00036 {
00037 }
00038
00039 Execute::~Execute()
00040 {
00041 }
00042
00043 void Execute::run(const char *arg, size_t length)
00044 {
00045 std::string execution_string(arg, length);
00046 run(execution_string);
00047 }
00048
00049 void Execute::run(std::string &execution_string, sql::ResultSet &result_set)
00050 {
00051 boost_thread_shared_ptr thread;
00052
00053 if (_session.isConcurrentExecuteAllowed())
00054 {
00055 plugin::client::Cached *client= new plugin::client::Cached(result_set);
00056 client->pushSQL(execution_string);
00057 Session::shared_ptr new_session= Session::make_shared(client, catalog::local());
00058
00059
00060 util::string::const_shared_ptr schema(_session.schema());
00061 if (not schema->empty())
00062 new_session->set_db(*schema);
00063
00064 new_session->setConcurrentExecute(false);
00065
00066
00067
00068
00069 new_session->user()= _session.user();
00070
00071 if (Session::schedule(new_session))
00072 {
00073 Session::unlink(new_session);
00074 }
00075 else if (wait)
00076 {
00077 thread= new_session->getThread();
00078 }
00079 }
00080 else
00081 {
00082 my_error(ER_WRONG_ARGUMENTS, MYF(0), "A Concurrent Execution Session can not launch another session.");
00083 return;
00084 }
00085
00086 if (wait && thread && thread->joinable())
00087 {
00088
00089 if (_session.getThread())
00090 {
00091 boost::this_thread::restore_interruption dl(_session.getThreadInterupt());
00092
00093 try {
00094 thread->join();
00095 }
00096 catch(boost::thread_interrupted const&)
00097 {
00098
00099 my_error(drizzled::ER_QUERY_INTERRUPTED, MYF(0));
00100 return;
00101 }
00102 }
00103 else
00104 {
00105 thread->join();
00106 }
00107 }
00108 }
00109
00110 void Execute::run(std::string &execution_string)
00111 {
00112 boost_thread_shared_ptr thread;
00113
00114 if (_session.isConcurrentExecuteAllowed())
00115 {
00116 plugin::client::Concurrent *client= new plugin::client::Concurrent;
00117 client->pushSQL(execution_string);
00118 Session::shared_ptr new_session= Session::make_shared(client, catalog::local());
00119
00120
00121 util::string::const_shared_ptr schema(_session.schema());
00122 if (not schema->empty())
00123 new_session->set_db(*schema);
00124
00125 new_session->setConcurrentExecute(false);
00126
00127
00128
00129
00130 new_session->user()= _session.user();
00131
00132 if (Session::schedule(new_session))
00133 {
00134 Session::unlink(new_session);
00135 }
00136 else if (wait)
00137 {
00138 thread= new_session->getThread();
00139 }
00140 }
00141 else
00142 {
00143 my_error(ER_WRONG_ARGUMENTS, MYF(0), "A Concurrent Execution Session can not launch another session.");
00144 return;
00145 }
00146
00147 if (wait && thread && thread->joinable())
00148 {
00149
00150 if (_session.getThread())
00151 {
00152 boost::this_thread::restore_interruption dl(_session.getThreadInterupt());
00153
00154 try {
00155 thread->join();
00156 }
00157 catch(boost::thread_interrupted const&)
00158 {
00159
00160 my_error(drizzled::ER_QUERY_INTERRUPTED, MYF(0));
00161 return;
00162 }
00163 }
00164 else
00165 {
00166 thread->join();
00167 }
00168 }
00169 }
00170
00171 }