Synapse TCP/IP client and server

Общие вопросы программирования, алгоритмы и т.п.

Модератор: Модераторы

Re: Synapse TCP/IP client and server

Сообщение delphius » 21.08.2023 11:21:12

Sharfik писал(а):Уже все равно перекопал все

Понятно) я сейчас тоже докопаю websockets, потому что задачи, связанные с работой приоритетнее) отчёт выложу сюда. Но с вашей ситуацией это никак не связано, у вас проблемы именно на уровне самих сокетов, как правильно организовать блокирующие или не блокирующие в синхронном режиме или вообще асинхронные, организовать очередь отправки с применением потоков, собрать все правильно на принимающей стороне и сделать это с минимальными накладными расходами на обеих сторонах.

Добавлено спустя 1 минуту 57 секунд:
Sharfik писал(а):Сейчас меня беспокоит, за неимением опыта, прав ли я делая Sleep(10) в цикле.

Может ещё кто-то включится, более опытный. Я сисадмин, а не профессиональный разработчик, для меня ЯП это больше средство создания инструментов для повседневной работы.
delphius
постоялец
 
Сообщения: 122
Зарегистрирован: 18.03.2020 13:40:11

Re: Synapse TCP/IP client and server

Сообщение stikriz11 » 04.09.2023 16:07:51

Сейчас меня беспокоит, за неимением опыта, прав ли я делая Sleep(10) в цикле.

Правильно.
Если повторно прочитать буфер, то прочитано будет ноль байт или превышено время ожидания. Чтобы об этом не думать,
я всегда перед циклом делаю так:
var Opt: Integer;
...
Opt:=1;
fpsetsockopt(FSocket.Socket, IPPROTO_TCP, TCP_NODELAY, PChar(@Opt), Sizeof(Opt));

Потом в конце тела цикла sleep(10);

А пишу и читаю RecvStream и SendStream
Так удобнее, чем буфер читать и писать
А главное, я не слежу сколько надо записать-прочитать.
Надо только аккуратнее с позицией.
stikriz11
постоялец
 
Сообщения: 114
Зарегистрирован: 04.09.2023 15:54:19

Re: Synapse TCP/IP client and server

Сообщение Sharfik » 05.09.2023 19:14:42

stikriz11 писал(а):Правильно.

Хоть чье то мнение, спасибо!

Посмотрел книги по теме, но вот про Sleep везде умалчивают авторы. Что в книгах про сокеты Линукса, что в книгах по Delphi.

..есть такой алгоритм
Цикл
Пишем в сокет (n раз)
Читаем сокет (n раз)
Sleep(Х)
Конец цикла

На обоих сторонах. Если я делаю Sleep(100), то у меня все вроде бы работает, только пинг 125. Работает - я имею ввиду SendMessage, SendStream. Если я ставлю Sleep(10), то все работает на локальном ПК в пределах роутера(по ip выданным роутером, не через 127...). Как только я начинаю коннектится через свой внешний IP к себе же, то SendStream перестают обрабатываться корректно. Клиент перестает присылать DONE о получении.

1. Непонятно, нужно ли делать Sleep() чтобы процессорное время переключилось на работу сокета при отправке. Учитывая проблему с Stream отправками
2. Sleep в конце нужен чтобы основной поток программы обработал работу интерфейса и процедуры чтения данных обработали полученные данные. Но как оценить сколько им надо разумного времени. И так потоковое разделение есть, откуда палки в колесах из-за разницы времени Sleep не соображу пока что.
3. SendMessage работает всегда четко, SendStream запинается в зависимости от таймингов... и тут не понятна следующая логика. Мы делаем отправку, а получатель в цикле делает Sleep(10..100) и потом с таймаутом делает чтение сокета. Т.е. в один из циклов чтения так или иначе мы должны получить что было отправлено в сокет. Но по факту получается что иногда спотыкается все, если я пытаюсь делать цикл быстрее.

Пока пришел к тому, что "стабильная" схема Ping*4=1200ms. Т.е. отправка с эхо ответом занимает 125..250мс, а фактическая передача каждого сообщения делаться с ожиданием встречного подтверждения DONE, итого 1200ms примерно. Это на локальном ПК. :? Пробовал ставить по разному NoBlock ноль эмоций.
Аватара пользователя
Sharfik
энтузиаст
 
Сообщения: 766
Зарегистрирован: 20.07.2013 01:04:30

Re: Synapse TCP/IP client and server

Сообщение stikriz11 » 07.09.2023 15:50:57

Возможно, все дело в каких=то мелочах. На сервере у Вас же TThread в Execute читает и пишет?
while not Terminated do
begin
if FSocket.WaitingData > 0 then
begin
...
end;
sleep(10);
end;
Это код на сервере. И Terminated надо проверять. И правильно подобрать FSocket.SetTimeout правильно.
Попробуйте убрать sleep - у меня тогда проц без нагрузки потребляет ресурсы. Это непорядок.

А так, все работает отдично. Никаких проблем с сокетом. Я написал этот код давно - везде использую, все работает.

Добавлено спустя 2 минуты 36 секунд:
И, как я уже говорил, с позицией TStream надо аккуратно работать. Смотрите в отладке какая позиция установлена сейчас.
stikriz11
постоялец
 
Сообщения: 114
Зарегистрирован: 04.09.2023 15:54:19

Re: Synapse TCP/IP client and server

Сообщение Sharfik » 07.09.2023 17:36:49

Я хочу поэкспериментировать и сделать очередь полученных через List, а не синхронизацию с постановкой в очередь. Но пока времени нет.
Аватара пользователя
Sharfik
энтузиаст
 
Сообщения: 766
Зарегистрирован: 20.07.2013 01:04:30

Re: Synapse TCP/IP client and server

Сообщение Sharfik » 14.09.2023 05:07:54

Аватара пользователя
Sharfik
энтузиаст
 
Сообщения: 766
Зарегистрирован: 20.07.2013 01:04:30

Re: Synapse TCP/IP client and server

Сообщение Sharfik » 03.10.2023 23:46:48

Поскольку для меня это первый опыт разработки такого приложения был, то и леса за деревьями я не видел изначально(Алекс украду у тебя твое PS :D ). Принцип работы TCP соединения описан в такой схеме
Изображение
И вот тут автор кода на котором все базировалось и подставил читателя(меня). У него все работало на "коротких" посылках. А чуть выход за таймаут, чуть что то не так - сразу выполняется Disconnect. Что по сути неверно.
1. Клиент и сервер работают в противофазах. Один читает, другой в этот момент пишет. И если фаза смещается, то выполнения корректного кода не будет.
Чтобы все было хорошо все посылки должны подтверждаться второй стороной на уровне программы. Т.е. мы должны контролировать что посылка доставлена сами. И только потом отсылать следующую. Если объект стека отправлен полностью, то принимающая сторона может обрабатывать содержимое посылок, а отправляющая заниматься следующей задачей. Иначе общий сброс и заход на второй круг. А не Дисконнект из-за ошибки доставки. Добавив контроль отправки в цикл при ошибке можно просто повторить отправку.
2. Читать надо пока дают, а обрабатывать потом. Иначе будет увеличение тайминга и задержки лишние.
3. В книжках примеры всегда отражают идеальный вариант - в момент отправки нас ждут на чтении.

Добавлено спустя 1 час 4 минуты 26 секунд:
итого на сервере примерно такой цикл будет

Код: Выделить всё
procedure TClientOnServerThread.Execute;
var
  msRecvStream    : TMemoryStream;
  pRecv           : PRecvBufferItem;
  Params          : TStringArray;
  iSize           : Int64;

  bRecvEnabled,
  bSendEnabled,
  bRecvError,
  bRecvProcess,
  bSendError,
  bSendProcess,
  bRecved,
  bSended         : Boolean;
  iAttemptCount,
  iRecv,
  iRecvWaitCount,
  iRecvCount      : Integer;

  sRecvMsg        : String;
  iTransportDelay,
  iDebugCicle,
  iTypeMsg        : Integer;

  i               : Integer;
  FileName        : String;
  sPath           : String;
  SendList        : TList;
  RecvList        : TList;

  Task            : TSendBufferItem;
  FLastIdleTick,
  FCurIdleTick,
  FLastTick,
  FCurTick        : QWord;
  DoIdleControl   : Boolean;
begin

  FCurTick      := GetTickCount64;
  Ping          := 100; //Прописываем усреденный адекватный пинг, для расчета пауз
  iAttemptCount := 0;

  bRecvEnabled  :=True;
  bSendEnabled  :=True;

  //Вкл/выкл отключение при долгом простое
  DoIdleControl  := True;
  FCurIdleTick   := FCurTick;
  FLastIdleTick  := FCurTick;

  iDebugCicle    := 0; //Контроль кол-ва отправленных/полученных сообщений

  if (TCPBase.Active) then
  begin
    BroadcastConnection(Connection.FLogin);
  end;

  while (not Terminated) do
  begin
    iTransportDelay :=GetTransportDelay;

    bRecvError      :=False;
    bRecvProcess    :=False;
    bSendError      :=False;
    bSendProcess    :=False;
    bRecved         :=False;
    bSended         :=False;

    iTypeMsg        :=-1;
    sRecvMsg        :='';
    Params          :=[];

    FCurTick        :=GetTickCount64;

    if (not SocketActive)or(FNeedCloseConnection) then
    begin
        DoDisconnect;
        Break;
    end;

    if (not Busy) and (not BreakForDisconnect) then
    begin
      Busy  := True;
      try
          //Read data
          iDebugCicle   :=0;

          if bRecvEnabled then
          begin
            RecvList  := RecvQueue.LockList;
            try

                i:=ReadBeginDialog;

                bRecvProcess  :=(i=1);
                bRecvError    :=(i=-1);
                iRecvCount    :=0;
                iRecvWaitCount:=-1;

                if bRecvError then
                begin
                   ThWriteInfoLog(Format(rsMessageInfoEx, ['Error on read']));
                end;

                while (bRecvProcess)and(not BreakForDisconnect) do
                begin
                    iTypeMsg    :=-1;
                    sRecvMsg    :='';
                    Params      :=[];

                    if (bRecvProcess) then
                    begin
                       if not RecvMessage(KARINA_TIMEOUT_RECVMESSAGE, iTypeMsg, sRecvMsg, Params) then
                       begin
                          bRecvError   :=True;
                          bRecvProcess :=False;
                       end
                       else
                       begin
                          if (BlockSocket.LastError = WSAETIMEDOUT) then
                          begin
                              bRecvError   :=True;
                              bRecvProcess :=False;
                              ThWriteDebugLog('Timeout on RecvMessage');
                          end;
                       end;
                    end;

                    if (bRecvProcess)and(not BreakForDisconnect) then
                    begin
                      IncRecvCounter;
                      inc(iDebugCicle);
                      inc(iRecvCount);

                      if iRecvCount=1 then
                      begin
                         //Первое сообщение должно быть PFIRST. Если это оно, то тригеры вернуться в позитивное состояние.
                         bRecvError   :=True;
                         bRecvProcess :=False;
                      end;

                      case iTypeMsg of
                        0: begin
                             if ShortCompareText(sRecvMsg,'PFIRST')=0 then
                             begin
                                if (SendMessageDone([])=0) then bRecvError:=True; //DONE
                                //i am first
                                if Length(Params)>=1 then
                                begin
                                   iRecvWaitCount:=StrToIntDef(Params[0],-2);
                                   if (iRecvCount=1)and(iRecvWaitCount>0) then
                                   begin
                                     bRecvError   :=False;
                                     bRecvProcess :=True;
                                   end;
                                end;
                             end
                             else if (iRecvCount>1)and(ShortCompareText(sRecvMsg,'PLAST')=0) then
                             begin
                                if (SendMessageDone([])=0) then bRecvError:=True; //DONE
                                //i am last
                                bRecvProcess :=False;
                                if (iRecvWaitCount<>iRecvCount) then
                                begin
                                    bRecvError :=True;
                                    ThWriteDebugLog('Wrong RecvMessage count');
                                end;
                             end
                             else if (iRecvCount>1)and(ShortCompareText(sRecvMsg,'PING')=0) then
                             begin
                                if (SendMessageDone([])=0) then bRecvError:=True; //DONE
                                if Length(Params)>=1 then
                                begin
                                   if ShortCompareText(Params[0],'INFO')=0 then
                                   begin
                                     SendQueue.AddTask('PING', 'PING', ['INFO',inttostr(Ping)], nil, '');
                                   end;
                                end;
                             end
                             else if (iRecvCount>1)then
                             begin
                                if (SendMessageDone([])=0) then bRecvError:=True; //DONE
                                RecvQueueAdd(0, sRecvMsg, Params, nil);
                             end
                             else
                             begin
                                bRecvError:=True;
                             end;
                           end;
                        1: begin  //stream
                             iSize  := StrToInt64Def(Params[Length(Params) - 1], -1);
                             if (iRecvCount>1)and(iSize > 0) then
                             begin
                               SetLength(Params, Length(Params) - 1);
                               msRecvStream := TMemoryStream.Create;
                               try
                                 msRecvStream.SetSize(iSize);
                                 msRecvStream.Position := 0;
                                 if RecvStream(msRecvStream) then
                                 begin
                                   if (SendMessageDone([])=0) then bRecvError:=True; //DONE
                                   RecvQueueAdd(1, sRecvMsg, Params, msRecvStream);
                                   msRecvStream:=nil;
                                 end
                                 else begin
                                   bRecvError:=True;
                                 end;
                               except
                                 i:=-1;
                                 msRecvStream.Free;
                               end;
                             end
                             else begin
                               bRecvError:=True;
                             end;
                           end;
                        2: begin  //file
                             iSize      := StrToInt64Def(Params[Length(Params) - 2], -1);
                             FileName   := Params[Length(Params) - 1];
                             SetLength(Params, Length(Params) - 2);
                             if (iRecvCount>1)and(iSize > 0) then
                             begin
                               sPath := AddDirSeparator(GetDownloadDir(FTCPBase.DownloadDirectory));
                               if DirectoryExists(sPath) then
                               begin
                                 sPath := ConcatPaths([sPath,FileName]);
                                 if RecvFile(sPath, iSize) then
                                 begin
                                   if (SendMessageDone([])=0) then bRecvError:=True; //DONE
                                   RecvQueueAdd(2, sRecvMsg, Params, nil, sPath);
                                 end;
                               end
                               else
                                 ThWriteErrorLog(rsInvalidDirectory, 0);
                             end
                             else begin
                               bRecvError:=True;
                             end;

                           end;
                      end;

                    end;

                    if bRecvError then
                       bRecvProcess :=False;

                end;//while read

                if (not bRecvError)and(not bRecvProcess) then
                begin
                    bRecved :=(RecvList.Count>0);
                    for iRecv:=0 to RecvList.Count-1 do
                    begin
                        pRecv :=RecvList.Items[iRecv];
                        case pRecv^.TypeCode of
                           0:
                              begin
                                DoRecv(0, pRecv^.Message, pRecv^.Params, nil);
                              end;
                           1:
                              begin
                                DoRecv(1, pRecv^.Message, pRecv^.Params, pRecv^.Stream);
                              end;
                           2:
                              begin
                                DoRecv(2, pRecv^.Message, pRecv^.Params, nil, pRecv^.FileName);
                              end;
                        end;
                    end;

                end;

            finally
              if RecvList.Count>0 then
              begin
                 FLastIdleTick := FCurTick;
              end;

              for iRecv:=RecvList.Count-1 downto 0 do
              begin
                    pRecv:=RecvList.Items[iRecv];
                    SetLength(pRecv^.Params,0);
                    if Assigned(pRecv^.Stream) then
                       pRecv^.Stream.Free;
                    Dispose(pRecv);
                    RecvList.Delete(iRecv);
               end;
               RecvQueue.UnlockList;
            end;

          end;

          if iDebugCicle>0 then
          begin
              ThWriteDebugLog(Format(rsMessageInfoEx, [format('Readed %d messages',[iDebugCicle])]));
          end;

          //Отключение пользователя при простоях.
          //Если клиент не запрашивает долго пинг, то отключаем
          if DoIdleControl and (not BreakForDisconnect) then
          begin
            FCurIdleTick := FCurTick;
            if (FCurIdleTick -  FLastIdleTick> KARINA_PING_PERIODONSERVER) then
            begin
                ThWriteInfoLog('Слишком долгий простой соединения.');
                DoDisconnect;
                Break;
            end;
            if (Ping> KARINA_PING_DISCONNECT) then
            begin
                ThWriteInfoLog('Пинг превышает допустимое значение.');
                DoDisconnect;
                Break;
            end;
          end;

          Sleep(40);

          //Write data
          iDebugCicle:=0;
          if bSendEnabled then
          begin
              bSendProcess   := False;
              bSendError     := False;
              bSended        := False;
              SendList       := SendQueue.Items.LockList;
              try
                  if (SendList.Count>0)and(FBlockSocket.LastError = 0) then
                  begin
                       FLastTick  := GetTickCount64;
                       if SendMessageWithDone('0PFIRST', [IntToStr(SendList.Count+2)])>0 then
                       begin
                         FCurTick   := GetTickCount64;
                         Ping       := FCurTick - FLastTick;
                         ThWriteDebugLog(Format(rsMessageInfoEx, [format('Ping %d ms',[Ping])]));
                         bSendProcess := True;
                       end;
                  end;

                  //Отправка
                  if (bSendProcess)and(SendList.Count>0) then
                  begin
                    i:=0;
                    while (bSendProcess)and (not BreakForDisconnect) do
                    begin
                      Task := TSendBufferItem(SendList.Items[I]);
                      if ProcessTask(Task) then
                      begin
                        if RecvMessageDone>0 then
                        begin
                           IncSendCounter;
                           inc(iDebugCicle);
                        end
                        else begin
                           bSendError   := True;
                        end;
                      end
                      else
                      begin
                        bSendError  := True;
                      end;

                      inc(i);

                      bSendProcess :=(i<SendList.Count);

                      if bSendError then
                         bSendProcess:=False;
                    end;

                    if (SendList.Count=i)and(not bSendProcess) and(not bSendError) then
                    begin
                        if SendMessageWithDone('0PLAST', [])=0 then
                        begin
                           bSendError := True;
                        end
                        else begin
                           bSended :=True;
                        end;
                    end;
                  end;

                   if (bSended)and(not bSendError) then
                   begin
                      FLastIdleTick := FCurTick;
                      //Если доставлено
                      iAttemptCount:=0;
                      while (SendList.Count>0) do
                      begin
                         i    := 0;
                         Task := TSendBufferItem(SendList.Items[I]);
                         if Task <> nil then
                              SendQueue.DeleteTask(Task);
                         SendList.Delete(I);
                      end;
                   end
                   else if (SendList.Count>0)and(iAttemptCount<KARINA_SERVER_SENDATTEMPT) then
                   begin
                     FLastIdleTick := FCurTick;
                     //Попытка повторной доставки
                     inc(iAttemptCount);
                     ThWriteDebugLog(Format(rsMessageInfoEx, [format('Send attempt %d ',[iAttemptCount+1])]));
                   end
                   else if (SendList.Count>0) then
                   begin
                     FLastIdleTick := FCurTick;
                     //Все попытки провальные
                     iAttemptCount:=0;
                     ThWriteInfoLog(Format(rsMessageInfoEx, ['Clearing the SendList of sendpackages due to a delivery error']));
                     while (SendList.Count>0) do
                     begin
                       i:=0;
                       Task := TSendBufferItem(SendList.Items[I]);
                       if Task <> nil then
                            SendQueue.DeleteTask(Task);
                       SendList.Delete(I);
                     end;
                   end;
              finally
                 SendQueue.Items.UnlockList;
              end;

              if (iAttemptCount>1) then
                  Sleep(iTransportDelay); //Смена такта

          end;

          if iDebugCicle>0 then
          begin
              ThWriteDebugLog(Format(rsMessageInfoEx, [format('Sended %d messages',[iDebugCicle])]));
          end;

      finally
        Busy := False;
      end;

    end;

    //Не убирать и не блокировать sleep()
    Sleep(10);

  end; //цикл

  if (TCPBase.Active) then
  begin
     BroadcastDisconnection(Connection.FLogin);
  end;

  ThWriteInfoLog(....);

  FDisconnected := True;

end;     
function TClientOnServerThread.SendMessageWithDone(AMsg: String;
  AParams: array of String): longint;
var
sConfirm:String;
begin
    sConfirm :='';
    Result   :=SendMessage(AMsg,AParams);
    if Result>0 then
    begin
        sConfirm :=BlockSocket.RecvTerminated(BlockSocket.NonblockSendTimeout,CRLF);
        if (ShortCompareText(sConfirm,'DONE')<>0)or(BlockSocket.LastError<>0) then
        begin
           Result:=0;
        end;
    end;
end;

function TClientOnServerThread.SendMessageDone(AParams: array of String
  ): longint;
var
  Msg  :String;
begin
  Result := 0;
  if (BreakForDisconnect) then
    Exit;

  Msg  := 'DONE';
  if Length(AParams) > 0 then
  begin
    raise Exception.Create('Params not support in this version');
  end;

  if Length(CryptKey)>0 then
     Msg := Encrypt(CryptKey, Msg);

  FBlockSocket.SendString(Msg+CRLF);
  if FBlockSocket.LastError = 0 then
  begin
     Result := SizeOf(Msg);
  end;

  if Result>0 then
  begin
     ThWriteDebugLog(Format(rsMessageSent, [Msg]));
  end
  else if Result=0 then
  begin
    ThWriteErrorLog(Format(rsMessageSentError, [Msg,FBlockSocket.LastErrorDesc]), FBlockSocket.LastError);
  end;
end;       
Аватара пользователя
Sharfik
энтузиаст
 
Сообщения: 766
Зарегистрирован: 20.07.2013 01:04:30

Re: Synapse TCP/IP client and server

Сообщение stikriz11 » 04.10.2023 10:43:53

У меня общая логика примерно такая для TCPIP: всегда клиент спрашивает и ждет ответа, сервер отвечает и ждет следующего вопроса. Можно в пакете, который должен обработаться целиком первым послать 4 байта с длиной пакета. И оба обработчика на сервере и клиенте сначала читают 4 байта, откуда узнают до каких пор читать. А если надо (ну надо, например, для БД транзакцию держать) долго держать коннект, то клиент в паузах посылает пакетик, типа, "я тута". А если клиент ждет ответ, а он должен ждать пока не придет ответ, так вот, если он ждет долго, то разрывает соединение по таймауту - это законно и правильно. Если мне надо обработать много пакетов сразу, то я его запихиваю в пакет для пакетов и все так же как с одним. Разница уже выше логики маршалинга. А что в пакетах? Я посмотрел как устроены параметры в компонентах Query и сделал класс, который может себя записывать в TStream. И эти стримы я и передаю туды-сюды. В результате транспорт полностью отвязан от логики и никак не меняется при наращивании функционала. Если при обработке данных на сервере произошла любая ошибка (не в транспорте), то номер и сообщение передается в ответе клиенту, ничего не разрывается и клиент решает критично это или нет. Если обработка данных на сервере может быть выполнена в фоне, то клиент все равно ждет, просто на сервере запускается нитка, а клиенту возвращается ответ, что процесс пошел. Когда данные обработались и готов ответ на сервере, то есть две реализации. Либо клиент периодически спрашивает есть ли сообщения для клиента и сервер отдает сообщения. Либо есть еще один конект в который клиент посылает те же самые вопросы по сообщениям и долго ждет ответа, чтобы не засерать сеть. Вот так я думаю надежно и правильно. Проверено на нескольких проектах.
stikriz11
постоялец
 
Сообщения: 114
Зарегистрирован: 04.09.2023 15:54:19

Re: Synapse TCP/IP client and server

Сообщение Sharfik » 24.12.2023 00:48:17

Интересная статья на тему передачи файлов
https://habr.com/ru/companies/ibm/articles/274807/
Аватара пользователя
Sharfik
энтузиаст
 
Сообщения: 766
Зарегистрирован: 20.07.2013 01:04:30

Пред.

Вернуться в Общее

Кто сейчас на конференции

Сейчас этот форум просматривают: нет зарегистрированных пользователей и гости: 44

Рейтинг@Mail.ru