Client_IO.cc

Go to the documentation of this file.
00001 
00006 #include "Client_IO.H"
00007 
00008 // -------------------------------------------------------------------------
00009 
00010 const int ConnectRetries = 3; // #retries to server before client gives up
00011 const int RetryDelay = 60; // delay in seconds between retries
00012 
00013 // example 1:
00014 // Abort immediately after detecting that the server is not reachable:
00015 //  ConnectRetries = 0;
00016 
00017 // example 2:
00018 // Wait up to 5 minutes after detecting that the server is not reachable,
00019 // try to reconnect every minute:
00020 // ConnectRetries = 5; RetryDelay = 60;
00021 
00022 
00023 string CClientPolynomFetcher::buffer;
00024 
00025 void * CClientPolynomFetcher::THREAD_fetch_polynom(void *)
00026 {
00027   static unsigned int PolyInterval = 1000;
00028   static time_t prev_time = 0;
00029   if (PolyInterval>500 && time(NULL)>prev_time+300) PolyInterval=100*(PolyInterval/200);
00030   else if (time(NULL)<prev_time+120) PolyInterval+=100*(PolyInterval/200);
00031   prev_time=time(NULL);
00032 
00033   int retries_on_err = 0;
00034 try_again:
00035   try
00036    {
00037      unix_io_stream tcp_ServerConnection(communication_name, server_port);
00038      cout_network << "Fetching mpqs polynomial interval (" << PolyInterval << ")..." << endl;
00039      tcp_ServerConnection << "Polynom? " << PolyInterval << endl;
00040      buffer.clear();
00041      while (!tcp_ServerConnection.eof())
00042       {
00043         char c;
00044         tcp_ServerConnection.get(c);
00045         buffer+=c;
00046       }
00047      cout_network << "background thread: polynomial fetched." << endl;
00048      return NULL;
00049    }
00050   catch (unix_buffer_exception &e)
00051    {
00052      cerr << "caught exception in background thread while fetching polynomial:" << endl
00053           << e.what() << endl;
00054      if (prev_time!=0 && ++retries_on_err<=ConnectRetries) { sleep(RetryDelay); goto try_again; }
00055      else throw; // propagate exception
00056    }
00057   catch (...)
00058    {
00059      cerr << "caught unknown exception in background thread while fetching polynomial." << endl;
00060      throw; // propagate exception
00061    }
00062 }
00063 
00064 void CClientPolynomFetcher::fetch(mpz_t UpperBound_D)
00065 {
00066   static pthread_t thread_fetch_polynom;
00067   int retcode;
00068 
00069   static bool first_time_first_love = true;
00070   if (first_time_first_love)
00071    {
00072      first_time_first_love=false;
00073      retcode = pthread_create(&thread_fetch_polynom, NULL, THREAD_fetch_polynom, NULL);
00074      if (retcode != 0)
00075       {
00076         cerr << "CClientPolynomFetcher: pthread_create failed!" << endl;
00077         exit(1);
00078       }
00079    }
00080 
00081   // join thread
00082   cout_network << "Polynomfetch join..." << endl;
00083   retcode = pthread_join(thread_fetch_polynom, NULL);
00084   if (retcode != 0)
00085    {
00086      cerr << "CClientPolynomFetcher: Joining thread failed!" << endl;
00087      exit(1);
00088    }
00089 
00090   cout << "Setting mpqs polynomial interval..." << endl;
00091 
00092   // read in polynomial data from internal string buffer
00093   istringstream is(buffer);
00094   Polynom.load(is);
00095   cout << "Old UpperBound_D = " << UpperBound_D << endl;
00096   is >> UpperBound_D;
00097   cout << "New UpperBound_D = " << UpperBound_D << endl;
00098 
00099   // and fetch the next polynomial (threaded in background)
00100   retcode = pthread_create(&thread_fetch_polynom, NULL, THREAD_fetch_polynom, NULL);
00101   if (retcode != 0)
00102    {
00103      cerr << "CClientPolynomFetcher::pthread_create failed!" << endl;
00104      exit(1);
00105    }
00106 }
00107 
00108 
00109 // ----------------------------------------------------------------------
00110 
00111 
00112 CMutex CClientDynamicFactorFetcher::Mutex;
00113 queue<int> CClientDynamicFactorFetcher::buffer;
00114 
00115 void * CClientDynamicFactorFetcher::THREAD_fetch_DynamicFactors(void *)
00116 {
00117  int sleep_secs = 100;
00118  while(true)
00119   {
00120     try
00121      {
00122        unix_io_stream tcp_ServerConnection(communication_name, server_port);
00123        cout << "Fetching dynamic factors from server..." << endl;
00124        static unsigned int dynfac_pos = 0;
00125        tcp_ServerConnection << "DynamicFactors?_ab_index " << dynfac_pos << endl;
00126 
00127        int factor = 0;
00128        int counter = 0;
00129        while (true) // fetching dynamic relations
00130         {
00131           //tcp_ServerConnection >> factor;
00132           // cout << factor << "\r" << flush;
00133           char c[4];
00134           if (tcp_ServerConnection.peek()==EOF)
00135            {
00136              // magical mystery tour...
00137              // it seems that EOF is set sometimes although the stream
00138              // is still fully readable...
00139              // for that reason we clear the stream just before the
00140              // real read...
00141              if ( Cpoll(tcp_ServerConnection).readable_chars_within(128,2000) < 4)
00142                cerr << "possibly failing..." << endl;
00143              tcp_ServerConnection.clear();
00144            }
00145           tcp_ServerConnection.read(c,4);
00146           factor=static_cast<unsigned int>(static_cast<unsigned char>(c[0]));
00147           factor|=static_cast<unsigned int>(static_cast<unsigned char>(c[1]))<<8;
00148           factor|=static_cast<unsigned int>(static_cast<unsigned char>(c[2]))<<16;
00149           factor|=static_cast<unsigned int>(static_cast<unsigned char>(c[3]))<<24;
00150           if (factor<=0 || tcp_ServerConnection.fail())
00151            {
00152              MARK; break;
00153            }
00154           Mutex.lock(); buffer.push(factor); Mutex.unlock();
00155           ++counter;
00156         }
00157        dynfac_pos+=counter;
00158        cout_network << "background thread: " << counter << " dynamic factors fetched." << endl;
00159        if (counter>200) sleep_secs-=sleep_secs/4;
00160        else if (counter<20 && sleep_secs<3600) sleep_secs+=(sleep_secs+3)/4;
00161      }
00162     catch (unix_buffer_exception &e)
00163      {
00164        cerr << "caught exception in background thread while fetching dynamic factors:" << endl
00165             << e.what() << endl;
00166        sleep_secs=90; // retry in 90 seconds
00167      }
00168     catch (...)
00169      {
00170        cerr << "caught unknown exception in background thread while fetching dynamic factors." << endl;
00171        throw; // propagate exception
00172      }
00173 
00174     cout_network << "next dynamic factor fetch request in " << sleep_secs << " seconds." << endl;
00175     sleep(sleep_secs); // wait a few seconds...
00176   }
00177  return NULL;
00178 }
00179 
00180 
00181 void CClientDynamicFactorFetcher::fetch()
00182 {
00183   static pthread_t thread_fetch_DynamicFactors;
00184 
00185   int retcode;
00186   static bool first_time_first_love = true;
00187 
00188   if (first_time_first_love)
00189    {
00190      first_time_first_love=false;
00191      retcode = pthread_create(&thread_fetch_DynamicFactors, NULL, THREAD_fetch_DynamicFactors, NULL);
00192      if (retcode != 0)
00193       {
00194         cerr << "CClientDynamicFactorFetcher: pthread_create failed!" << endl;
00195         exit(1);
00196       }
00197    }
00198 
00199   CUnlockMutexAtDestruction Unlocker(Mutex); Mutex.lock();
00200   // Mutex will be automatically unlocked at destructor call of Unlocker!
00201 
00202   if (buffer.empty()) return;
00203 
00204 #ifdef VERBOSE
00205   cout << "Inserting " << buffer.size() << " dynamic factors..." << endl;
00206 #endif
00207 
00208   // read in dynamic factors from internal class buffer
00209   TDynamicFactorRelation relation;
00210   relation.fpos=0; relation.factor=0;
00211   while (!buffer.empty()) // get dynamic relation
00212    {
00213      relation.factor=buffer.front(); buffer.pop();
00214      // cout << relation.factor << "\r" << flush;
00215      relation.append_for_sieving();
00216      DynamicFactorRelations.insert(relation);
00217    }
00218 }
00219 
00220 
00221 // ---------------------------------------------------------------------------
00222 
00223 
00224 void CClientRelation_Delivery::init(void)
00225 {
00226   // first check whether the host is reachable
00227   // and get the hostname
00228 
00229 #ifdef CYGWIN_COMPAT
00230   struct hostent *hp;
00231   hp = gethostbyname(communication_name.c_str());
00232   if (hp == NULL)
00233     {
00234       cerr << "Unknown host " << communication_name << endl;
00235       exit (1);
00236     }
00237 #else
00238   // refer "man getaddrinfo" and "man socket" 
00239   struct addrinfo hints; // our wishes are placed here
00240   memset(&hints,0,sizeof(hints)); // must be zeroed for default options
00241   hints.ai_family=PF_INET; // we want IPv4 as protocol
00242   hints.ai_socktype=SOCK_STREAM; // and we need a stream, not datagram!
00243   struct addrinfo *addrinfo_res = NULL; // here the result will be delivered
00244   const int retval = getaddrinfo(communication_name.c_str(),NULL,&hints,&addrinfo_res);
00245   if ( retval || addrinfo_res==NULL ) // any error?
00246    {
00247      cerr << "can't reach " << "\"" <<  communication_name << "\"" << endl;
00248      cerr << "Error given by getaddrinfo: " << endl;
00249      cerr << gai_strerror(retval) << endl;
00250      exit(1);
00251    }
00252   if (addrinfo_res->ai_socktype!=SOCK_STREAM) // we got a "stream"-protocol?
00253    {
00254      cerr << "provided protocol doesn't support SOCK_STREAM" << endl;
00255      exit(1);
00256    }
00257 
00258   freeaddrinfo(addrinfo_res); // free resources allocated by getaddrinfo
00259 #endif
00260 
00261 
00262  // now initialize consumer thread, which transmits the relations to the server
00263   pthread_attr_t detached_thread;
00264   pthread_t thread_transmit_relations;
00265 
00266   pthread_attr_init(&detached_thread);
00267   pthread_attr_setdetachstate(&detached_thread, PTHREAD_CREATE_DETACHED);
00268 
00269   const int retcode = pthread_create(&thread_transmit_relations, &detached_thread,
00270                        CClientRelation_Delivery::THREAD_transmit_Relations, NULL);
00271 
00272   if (retcode != 0)
00273    {
00274      cerr << "pthread_create failed for THREAD_transmit_Relations!" << endl;
00275      exit(1);
00276    }
00277 
00278 }
00279 
00280 
00281 
00282 void * CClientRelation_Delivery::THREAD_transmit_Relations(void *)
00283 {
00284   // important: this thread shall not run concurrently to itself!
00285   string s;
00286   char line[1024];
00287 
00288   // if the first attempt to connect the server fails: abort the program
00289   // if the first attempt succeeded, but later attempts fail, this
00290   // indicates that the server is temporarily unavailable.
00291   // Wait a few minutes and try again. If this fails too often, then give up.
00292 
00293   unix_io_stream *connection_to_server = NULL;
00294   int err_count = 0;
00295   static bool connection_reachable = false;
00296   bool retried_transmission = false;
00297   ostringstream *temp_relations = NULL;
00298 
00299 reconnect:
00300   try
00301    {
00302      if (connection_to_server!=NULL) delete connection_to_server;
00303  
00304      // wait until the pipe produces input
00305      PipeInput.get(); PipeInput.unget();
00306 
00307      connection_to_server = new unix_io_stream(communication_name, server_port);
00308      connection_reachable=true;
00309    }
00310   catch (unix_buffer_exception &e)
00311    {
00312      connection_to_server=NULL;
00313      cerr << "caught an exception: " << e.what() << endl;
00314      if (connection_reachable && ++err_count<=ConnectRetries)
00315       {
00316         cout << "sleeping " << RetryDelay << " seconds before retrying..." << endl;
00317         sleep(RetryDelay);
00318         goto reconnect;
00319       }
00320      else
00321       {
00322         goto done;
00323       }
00324    }
00325 
00326 
00327   *connection_to_server << "NewRelations! " << kN << endl;
00328   // verify, that relations belong to our factorization
00329   *connection_to_server >> s;
00330   if (s!="proceed")
00331    {
00332      cerr << "Oops! Server does not accept my relations: " << endl;
00333      cerr << s << endl;
00334      goto done;
00335    }
00336 
00337   // if this is omitted, then the network socket name will be used as account name by the server
00338   if (ClientAccountName!="") *connection_to_server << "Account: " << ClientAccountName << endl; // sending Account name for registering statistical data
00339 
00340   if (retried_transmission)
00341    {
00342      // previous transmission failed, retry it instead of giving up
00343      MARK;
00344      retried_transmission=false;
00345      cout << "retry sending relations." << endl;
00346      if (temp_relations==NULL || temp_relations->str().empty())
00347       {
00348         cout << "nothing to transmit..." << endl;
00349       }
00350      else
00351       {
00352         *connection_to_server << temp_relations->str() << flush;
00353         if (connection_to_server->fail())
00354          {
00355            MARK;
00356            cerr << "stream state indicates that transmission has failed!" << endl;
00357            retried_transmission=true;
00358            goto reconnect;
00359          }
00360 
00361         // synchronize communication with server
00362         
00363         static int counter = 0;
00364         *connection_to_server << "Relationsblock_Sync " << flush;
00365         s="(empty)"; *connection_to_server >> s;
00366         if (s!="synced.")
00367          {
00368            cerr << "sync with server failed: " << endl; cerr << s << endl;
00369            retried_transmission=true;
00370            if (++counter > 3)
00371             {
00372               cerr << "too many retries. giving up. sorry." << endl;
00373               goto done;
00374             }
00375            goto reconnect;
00376          }
00377         counter = 0; // reset counter
00378         cout << "retry sending relations succeeded." << endl;  
00379       } 
00380    }
00381 
00382 transmit:
00383   while (PipeInput.peek()!=EOF)
00384    {
00385      delete temp_relations; temp_relations = new ostringstream();
00386 
00387      int count=1000+1; // send maximal count-1 relations per block
00388      const int max_duration = 8; // Maximal duration of a transmission for a block in seconds (to prevent life-locks...)
00389      const time_t starttime = time(NULL);
00390      while (PipeInput.peek()!=EOF && --count && time(NULL)-starttime<=max_duration)
00391       {
00392         *connection_to_server << "RL "; // new token for "Relation! "
00393         *temp_relations << "RL ";
00394         while (PipeInput.peek()!='\n')
00395          {
00396            PipeInput.get(line,sizeof(line),'\n');
00397            *connection_to_server << line;
00398            *temp_relations << line;
00399          }
00400         char c; PipeInput.get(c);
00401         *connection_to_server << '\n';
00402         *temp_relations << '\n';
00403       }
00404       
00405      *connection_to_server << flush;
00406      
00407      if (connection_to_server->fail())
00408       {
00409         MARK;
00410         cerr << "stream state indicates that transmission has failed!" << endl;
00411         retried_transmission=true;
00412         goto reconnect;
00413       }
00414 
00415      if (PipeInput.peek()!=EOF)
00416       {
00417         // synchronize communication with server
00418         *connection_to_server << "Relationsblock_Sync " << flush;
00419         s="(empty)"; *connection_to_server >> s;
00420         if (s!="synced.")
00421          {
00422            cerr << "sync with server failed: " << endl; cerr << s << endl;
00423            retried_transmission=true;
00424            goto reconnect;
00425          }
00426       }
00427    }
00428   PipeInput.clear();
00429 
00430   // synchronize communication with server
00431   cout << "syncing with server..." << endl;
00432   *connection_to_server << "Relationsblock_Sync " << flush;
00433   s="(empty)"; *connection_to_server >> s;
00434   if (s!="synced.")
00435    {
00436      cerr << "sync with server failed: " << endl; cerr << s << endl;
00437      goto done;
00438    }
00439 
00440   sleep(5); // wait to let relations drop into the pipe
00441   if (PipeInput.peek()!=EOF) goto transmit;
00442 
00443   *connection_to_server << "Relationsblock_Ende " << flush;
00444   *connection_to_server >> s;
00445   if (s=="ignoriert!")
00446    {
00447      cerr << "Oops! - Transmitted relations were rejected by the server!" << endl;
00448      goto done;
00449    }
00450   if (s!="empfangen.")
00451    {
00452      cerr << "Transmission of relations not acknowledged!" << endl;
00453      goto done;
00454    }
00455 
00456   goto reconnect; // end this transmission and respawn a new transmission when new relations arrive...
00457 
00458 done:
00459   // we're finished with our job...
00460   cout << "ending transmissions... bye, bye..." << endl;
00461   delete temp_relations;
00462   delete connection_to_server;
00463   exit(1);
00464 }

Generated on Wed Nov 7 23:29:25 2007 for Qsieve by  doxygen 1.5.4