00001
00002
00003 #include "pch.h"
00004
00005 #include "network.h"
00006 #include "wait.h"
00007
00008 #define CRYPTOPP_TRACE_NETWORK 0
00009
00010 NAMESPACE_BEGIN(CryptoPP)
00011
00012 #ifdef HIGHRES_TIMER_AVAILABLE
00013
00014 lword LimitedBandwidth::ComputeCurrentTransceiveLimit()
00015 {
00016 if (!m_maxBytesPerSecond)
00017 return ULONG_MAX;
00018
00019 const double curTime = GetCurTimeAndCleanUp();
00020 CRYPTOPP_UNUSED(curTime);
00021
00022 lword total = 0;
00023 for (OpQueue::size_type i=0; i!=m_ops.size(); ++i)
00024 total += m_ops[i].second;
00025 return SaturatingSubtract(m_maxBytesPerSecond, total);
00026 }
00027
00028 double LimitedBandwidth::TimeToNextTransceive()
00029 {
00030 if (!m_maxBytesPerSecond)
00031 return 0;
00032
00033 if (!m_nextTransceiveTime)
00034 ComputeNextTransceiveTime();
00035
00036 return SaturatingSubtract(m_nextTransceiveTime, m_timer.ElapsedTimeAsDouble());
00037 }
00038
00039 void LimitedBandwidth::NoteTransceive(lword size)
00040 {
00041 if (m_maxBytesPerSecond)
00042 {
00043 double curTime = GetCurTimeAndCleanUp();
00044 m_ops.push_back(std::make_pair(curTime, size));
00045 m_nextTransceiveTime = 0;
00046 }
00047 }
00048
00049 void LimitedBandwidth::ComputeNextTransceiveTime()
00050 {
00051 double curTime = GetCurTimeAndCleanUp();
00052 lword total = 0;
00053 for (unsigned int i=0; i!=m_ops.size(); ++i)
00054 total += m_ops[i].second;
00055 m_nextTransceiveTime =
00056 (total < m_maxBytesPerSecond) ? curTime : m_ops.front().first + 1000;
00057 }
00058
00059 double LimitedBandwidth::GetCurTimeAndCleanUp()
00060 {
00061 if (!m_maxBytesPerSecond)
00062 return 0;
00063
00064 double curTime = m_timer.ElapsedTimeAsDouble();
00065 while (m_ops.size() && (m_ops.front().first + 1000 < curTime))
00066 m_ops.pop_front();
00067 return curTime;
00068 }
00069
00070 void LimitedBandwidth::GetWaitObjects(WaitObjectContainer &container, const CallStack &callStack)
00071 {
00072 double nextTransceiveTime = TimeToNextTransceive();
00073 if (nextTransceiveTime)
00074 container.ScheduleEvent(nextTransceiveTime, CallStack("LimitedBandwidth::GetWaitObjects()", &callStack));
00075 }
00076
00077
00078
00079 size_t NonblockingSource::GeneralPump2(
00080 lword& byteCount, bool blockingOutput,
00081 unsigned long maxTime, bool checkDelimiter, byte delimiter)
00082 {
00083 m_blockedBySpeedLimit = false;
00084
00085 if (!GetMaxBytesPerSecond())
00086 {
00087 size_t ret = DoPump(byteCount, blockingOutput, maxTime, checkDelimiter, delimiter);
00088 m_doPumpBlocked = (ret != 0);
00089 return ret;
00090 }
00091
00092 bool forever = (maxTime == INFINITE_TIME);
00093 unsigned long timeToGo = maxTime;
00094 Timer timer(Timer::MILLISECONDS, forever);
00095 lword maxSize = byteCount;
00096 byteCount = 0;
00097
00098 timer.StartTimer();
00099
00100 while (true)
00101 {
00102 lword curMaxSize = UnsignedMin(ComputeCurrentTransceiveLimit(), maxSize - byteCount);
00103
00104 if (curMaxSize || m_doPumpBlocked)
00105 {
00106 if (!forever) timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime());
00107 size_t ret = DoPump(curMaxSize, blockingOutput, timeToGo, checkDelimiter, delimiter);
00108 m_doPumpBlocked = (ret != 0);
00109 if (curMaxSize)
00110 {
00111 NoteTransceive(curMaxSize);
00112 byteCount += curMaxSize;
00113 }
00114 if (ret)
00115 return ret;
00116 }
00117
00118 if (maxSize != ULONG_MAX && byteCount >= maxSize)
00119 break;
00120
00121 if (!forever)
00122 {
00123 timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime());
00124 if (!timeToGo)
00125 break;
00126 }
00127
00128 double waitTime = TimeToNextTransceive();
00129 if (!forever && waitTime > timeToGo)
00130 {
00131 m_blockedBySpeedLimit = true;
00132 break;
00133 }
00134
00135 WaitObjectContainer container;
00136 LimitedBandwidth::GetWaitObjects(container, CallStack("NonblockingSource::GeneralPump2() - speed limit", 0));
00137 container.Wait((unsigned long)waitTime);
00138 }
00139
00140 return 0;
00141 }
00142
00143 size_t NonblockingSource::PumpMessages2(unsigned int &messageCount, bool blocking)
00144 {
00145 if (messageCount == 0)
00146 return 0;
00147
00148 messageCount = 0;
00149
00150 lword byteCount;
00151 do {
00152 byteCount = LWORD_MAX;
00153 RETURN_IF_NONZERO(Pump2(byteCount, blocking));
00154 } while(byteCount == LWORD_MAX);
00155
00156 if (!m_messageEndSent && SourceExhausted())
00157 {
00158 RETURN_IF_NONZERO(AttachedTransformation()->Put2(NULL, 0, GetAutoSignalPropagation(), true));
00159 m_messageEndSent = true;
00160 messageCount = 1;
00161 }
00162 return 0;
00163 }
00164
00165 lword NonblockingSink::TimedFlush(unsigned long maxTime, size_t targetSize)
00166 {
00167 m_blockedBySpeedLimit = false;
00168
00169 size_t curBufSize = GetCurrentBufferSize();
00170 if (curBufSize <= targetSize && (targetSize || !EofPending()))
00171 return 0;
00172
00173 if (!GetMaxBytesPerSecond())
00174 return DoFlush(maxTime, targetSize);
00175
00176 bool forever = (maxTime == INFINITE_TIME);
00177 unsigned long timeToGo = maxTime;
00178 Timer timer(Timer::MILLISECONDS, forever);
00179 lword totalFlushed = 0;
00180
00181 timer.StartTimer();
00182
00183 while (true)
00184 {
00185 size_t flushSize = UnsignedMin(curBufSize - targetSize, ComputeCurrentTransceiveLimit());
00186 if (flushSize || EofPending())
00187 {
00188 if (!forever) timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime());
00189 size_t ret = (size_t)DoFlush(timeToGo, curBufSize - flushSize);
00190 if (ret)
00191 {
00192 NoteTransceive(ret);
00193 curBufSize -= ret;
00194 totalFlushed += ret;
00195 }
00196 }
00197
00198 if (curBufSize <= targetSize && (targetSize || !EofPending()))
00199 break;
00200
00201 if (!forever)
00202 {
00203 timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime());
00204 if (!timeToGo)
00205 break;
00206 }
00207
00208 double waitTime = TimeToNextTransceive();
00209 if (!forever && waitTime > timeToGo)
00210 {
00211 m_blockedBySpeedLimit = true;
00212 break;
00213 }
00214
00215 WaitObjectContainer container;
00216 LimitedBandwidth::GetWaitObjects(container, CallStack("NonblockingSink::TimedFlush() - speed limit", 0));
00217 container.Wait((unsigned long)waitTime);
00218 }
00219
00220 return totalFlushed;
00221 }
00222
00223 bool NonblockingSink::IsolatedFlush(bool hardFlush, bool blocking)
00224 {
00225 TimedFlush(blocking ? INFINITE_TIME : 0);
00226 return hardFlush && (!!GetCurrentBufferSize() || EofPending());
00227 }
00228
00229
00230
00231 NetworkSource::NetworkSource(BufferedTransformation *attachment)
00232 : NonblockingSource(attachment), m_buf(1024*16)
00233 , m_putSize(0), m_dataBegin(0), m_dataEnd(0)
00234 , m_waitingForResult(false), m_outputBlocked(false)
00235 {
00236 }
00237
00238 unsigned int NetworkSource::GetMaxWaitObjectCount() const
00239 {
00240 return LimitedBandwidth::GetMaxWaitObjectCount()
00241 + GetReceiver().GetMaxWaitObjectCount()
00242 + AttachedTransformation()->GetMaxWaitObjectCount();
00243 }
00244
00245 void NetworkSource::GetWaitObjects(WaitObjectContainer &container, CallStack const& callStack)
00246 {
00247 if (BlockedBySpeedLimit())
00248 LimitedBandwidth::GetWaitObjects(container, CallStack("NetworkSource::GetWaitObjects() - speed limit", &callStack));
00249 else if (!m_outputBlocked)
00250 {
00251 if (m_dataBegin == m_dataEnd)
00252 AccessReceiver().GetWaitObjects(container, CallStack("NetworkSource::GetWaitObjects() - no data", &callStack));
00253 else
00254 container.SetNoWait(CallStack("NetworkSource::GetWaitObjects() - have data", &callStack));
00255 }
00256
00257 AttachedTransformation()->GetWaitObjects(container, CallStack("NetworkSource::GetWaitObjects() - attachment", &callStack));
00258 }
00259
00260 size_t NetworkSource::DoPump(lword &byteCount, bool blockingOutput, unsigned long maxTime, bool checkDelimiter, byte delimiter)
00261 {
00262 NetworkReceiver &receiver = AccessReceiver();
00263
00264 lword maxSize = byteCount;
00265 byteCount = 0;
00266 bool forever = maxTime == INFINITE_TIME;
00267 Timer timer(Timer::MILLISECONDS, forever);
00268 BufferedTransformation *t = AttachedTransformation();
00269
00270 if (m_outputBlocked)
00271 goto DoOutput;
00272
00273 while (true)
00274 {
00275 if (m_dataBegin == m_dataEnd)
00276 {
00277 if (receiver.EofReceived())
00278 break;
00279
00280 if (m_waitingForResult)
00281 {
00282 if (receiver.MustWaitForResult() &&
00283 !receiver.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()),
00284 CallStack("NetworkSource::DoPump() - wait receive result", 0)))
00285 break;
00286
00287 unsigned int recvResult = receiver.GetReceiveResult();
00288 #if CRYPTOPP_TRACE_NETWORK
00289 OutputDebugString((IntToString((unsigned int)this) + ": Received " + IntToString(recvResult) + " bytes\n").c_str());
00290 #endif
00291 m_dataEnd += recvResult;
00292 m_waitingForResult = false;
00293
00294 if (!receiver.MustWaitToReceive() && !receiver.EofReceived() && m_dataEnd != m_buf.size())
00295 goto ReceiveNoWait;
00296 }
00297 else
00298 {
00299 m_dataEnd = m_dataBegin = 0;
00300
00301 if (receiver.MustWaitToReceive())
00302 {
00303 if (!receiver.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()),
00304 CallStack("NetworkSource::DoPump() - wait receive", 0)))
00305 break;
00306
00307 receiver.Receive(m_buf+m_dataEnd, m_buf.size()-m_dataEnd);
00308 m_waitingForResult = true;
00309 }
00310 else
00311 {
00312 ReceiveNoWait:
00313 m_waitingForResult = true;
00314
00315
00316 #if CRYPTOPP_TRACE_NETWORK
00317 OutputDebugString((IntToString((unsigned int)this) + ": Receiving " + IntToString(m_buf.size()-m_dataEnd) + " bytes\n").c_str());
00318 #endif
00319 while (receiver.Receive(m_buf+m_dataEnd, m_buf.size()-m_dataEnd))
00320 {
00321 unsigned int recvResult = receiver.GetReceiveResult();
00322 #if CRYPTOPP_TRACE_NETWORK
00323 OutputDebugString((IntToString((unsigned int)this) + ": Received " + IntToString(recvResult) + " bytes\n").c_str());
00324 #endif
00325 m_dataEnd += recvResult;
00326 if (receiver.EofReceived() || m_dataEnd > m_buf.size() /2)
00327 {
00328 m_waitingForResult = false;
00329 break;
00330 }
00331 }
00332 }
00333 }
00334 }
00335 else
00336 {
00337 m_putSize = UnsignedMin(m_dataEnd - m_dataBegin, maxSize - byteCount);
00338
00339 if (checkDelimiter)
00340 m_putSize = std::find(m_buf+m_dataBegin, m_buf+m_dataBegin+m_putSize, delimiter) - (m_buf+m_dataBegin);
00341
00342 DoOutput:
00343 size_t result = t->PutModifiable2(m_buf+m_dataBegin, m_putSize, 0, forever || blockingOutput);
00344 if (result)
00345 {
00346 if (t->Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()),
00347 CallStack("NetworkSource::DoPump() - wait attachment", 0)))
00348 goto DoOutput;
00349 else
00350 {
00351 m_outputBlocked = true;
00352 return result;
00353 }
00354 }
00355 m_outputBlocked = false;
00356
00357 byteCount += m_putSize;
00358 m_dataBegin += m_putSize;
00359 if (checkDelimiter && m_dataBegin < m_dataEnd && m_buf[m_dataBegin] == delimiter)
00360 break;
00361 if (maxSize != ULONG_MAX && byteCount == maxSize)
00362 break;
00363
00364
00365
00366 if (maxTime > 0 && timer.ElapsedTime() > maxTime)
00367 break;
00368 }
00369 }
00370
00371 return 0;
00372 }
00373
00374
00375
00376 NetworkSink::NetworkSink(unsigned int maxBufferSize, unsigned int autoFlushBound)
00377 : m_maxBufferSize(maxBufferSize), m_autoFlushBound(autoFlushBound)
00378 , m_needSendResult(false), m_wasBlocked(false), m_eofState(EOF_NONE)
00379 , m_buffer(STDMIN(16U*1024U+256, maxBufferSize)), m_skipBytes(0)
00380 , m_speedTimer(Timer::MILLISECONDS), m_byteCountSinceLastTimerReset(0)
00381 , m_currentSpeed(0), m_maxObservedSpeed(0)
00382 {
00383 }
00384
00385 float NetworkSink::ComputeCurrentSpeed()
00386 {
00387 if (m_speedTimer.ElapsedTime() > 1000)
00388 {
00389 m_currentSpeed = m_byteCountSinceLastTimerReset * 1000 / m_speedTimer.ElapsedTime();
00390 m_maxObservedSpeed = STDMAX(m_currentSpeed, m_maxObservedSpeed * 0.98f);
00391 m_byteCountSinceLastTimerReset = 0;
00392 m_speedTimer.StartTimer();
00393
00394 }
00395 return m_currentSpeed;
00396 }
00397
00398 float NetworkSink::GetMaxObservedSpeed() const
00399 {
00400 lword m = GetMaxBytesPerSecond();
00401 return m ? STDMIN(m_maxObservedSpeed, float(CRYPTOPP_VC6_INT64 m)) : m_maxObservedSpeed;
00402 }
00403
00404 unsigned int NetworkSink::GetMaxWaitObjectCount() const
00405 {
00406 return LimitedBandwidth::GetMaxWaitObjectCount() + GetSender().GetMaxWaitObjectCount();
00407 }
00408
00409 void NetworkSink::GetWaitObjects(WaitObjectContainer &container, CallStack const& callStack)
00410 {
00411 if (BlockedBySpeedLimit())
00412 LimitedBandwidth::GetWaitObjects(container, CallStack("NetworkSink::GetWaitObjects() - speed limit", &callStack));
00413 else if (m_wasBlocked)
00414 AccessSender().GetWaitObjects(container, CallStack("NetworkSink::GetWaitObjects() - was blocked", &callStack));
00415 else if (!m_buffer.IsEmpty())
00416 AccessSender().GetWaitObjects(container, CallStack("NetworkSink::GetWaitObjects() - buffer not empty", &callStack));
00417 else if (EofPending())
00418 AccessSender().GetWaitObjects(container, CallStack("NetworkSink::GetWaitObjects() - EOF pending", &callStack));
00419 }
00420
00421 size_t NetworkSink::Put2(const byte *inString, size_t length, int messageEnd, bool blocking)
00422 {
00423 if (m_eofState == EOF_DONE)
00424 {
00425 if (length || messageEnd)
00426 throw Exception(Exception::OTHER_ERROR, "NetworkSink::Put2() being called after EOF had been sent");
00427
00428 return 0;
00429 }
00430
00431 if (m_eofState > EOF_NONE)
00432 goto EofSite;
00433
00434 {
00435 if (m_skipBytes)
00436 {
00437 assert(length >= m_skipBytes);
00438 inString += m_skipBytes;
00439 length -= m_skipBytes;
00440 }
00441
00442 m_buffer.Put(inString, length);
00443
00444 if (!blocking || m_buffer.CurrentSize() > m_autoFlushBound)
00445 TimedFlush(0, 0);
00446
00447 size_t targetSize = messageEnd ? 0 : m_maxBufferSize;
00448 if (blocking)
00449 TimedFlush(INFINITE_TIME, targetSize);
00450
00451 if (m_buffer.CurrentSize() > targetSize)
00452 {
00453 assert(!blocking);
00454 m_wasBlocked = true;
00455 m_skipBytes += length;
00456 size_t blockedBytes = UnsignedMin(length, m_buffer.CurrentSize() - targetSize);
00457 return STDMAX<size_t>(blockedBytes, 1);
00458 }
00459
00460 m_wasBlocked = false;
00461 m_skipBytes = 0;
00462 }
00463
00464 if (messageEnd)
00465 {
00466 m_eofState = EOF_PENDING_SEND;
00467
00468 EofSite:
00469 TimedFlush(blocking ? INFINITE_TIME : 0, 0);
00470 if (m_eofState != EOF_DONE)
00471 return 1;
00472 }
00473
00474 return 0;
00475 }
00476
00477 lword NetworkSink::DoFlush(unsigned long maxTime, size_t targetSize)
00478 {
00479 NetworkSender &sender = AccessSender();
00480
00481 bool forever = maxTime == INFINITE_TIME;
00482 Timer timer(Timer::MILLISECONDS, forever);
00483 unsigned int totalFlushSize = 0;
00484
00485 while (true)
00486 {
00487 if (m_buffer.CurrentSize() <= targetSize)
00488 break;
00489
00490 if (m_needSendResult)
00491 {
00492 if (sender.MustWaitForResult() &&
00493 !sender.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()),
00494 CallStack("NetworkSink::DoFlush() - wait send result", 0)))
00495 break;
00496
00497 unsigned int sendResult = sender.GetSendResult();
00498 #if CRYPTOPP_TRACE_NETWORK
00499 OutputDebugString((IntToString((unsigned int)this) + ": Sent " + IntToString(sendResult) + " bytes\n").c_str());
00500 #endif
00501 m_buffer.Skip(sendResult);
00502 totalFlushSize += sendResult;
00503 m_needSendResult = false;
00504
00505 if (!m_buffer.AnyRetrievable())
00506 break;
00507 }
00508
00509 unsigned long timeOut = maxTime ? SaturatingSubtract(maxTime, timer.ElapsedTime()) : 0;
00510 if (sender.MustWaitToSend() && !sender.Wait(timeOut, CallStack("NetworkSink::DoFlush() - wait send", 0)))
00511 break;
00512
00513 size_t contiguousSize = 0;
00514 const byte *block = m_buffer.Spy(contiguousSize);
00515
00516 #if CRYPTOPP_TRACE_NETWORK
00517 OutputDebugString((IntToString((unsigned int)this) + ": Sending " + IntToString(contiguousSize) + " bytes\n").c_str());
00518 #endif
00519 sender.Send(block, contiguousSize);
00520 m_needSendResult = true;
00521
00522 if (maxTime > 0 && timeOut == 0)
00523 break;
00524 }
00525
00526 m_byteCountSinceLastTimerReset += totalFlushSize;
00527 ComputeCurrentSpeed();
00528
00529 if (m_buffer.IsEmpty() && !m_needSendResult)
00530 {
00531 if (m_eofState == EOF_PENDING_SEND)
00532 {
00533 sender.SendEof();
00534 m_eofState = sender.MustWaitForEof() ? EOF_PENDING_DELIVERY : EOF_DONE;
00535 }
00536
00537 while (m_eofState == EOF_PENDING_DELIVERY)
00538 {
00539 unsigned long timeOut = maxTime ? SaturatingSubtract(maxTime, timer.ElapsedTime()) : 0;
00540 if (!sender.Wait(timeOut, CallStack("NetworkSink::DoFlush() - wait EOF", 0)))
00541 break;
00542
00543 if (sender.EofSent())
00544 m_eofState = EOF_DONE;
00545 }
00546 }
00547
00548 return totalFlushSize;
00549 }
00550
00551 #endif // #ifdef HIGHRES_TIMER_AVAILABLE
00552
00553 NAMESPACE_END