1 //===--- Client.cpp ----------------------------------------------*- C++-*-===// 2 // 3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. 4 // See https://llvm.org/LICENSE.txt for license information. 5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception 6 // 7 //===----------------------------------------------------------------------===// 8 9 #include <grpc++/grpc++.h> 10 11 #include "Client.h" 12 #include "Feature.h" 13 #include "Service.grpc.pb.h" 14 #include "index/Index.h" 15 #include "marshalling/Marshalling.h" 16 #include "support/Logger.h" 17 #include "support/Trace.h" 18 #include "llvm/ADT/SmallString.h" 19 #include "llvm/ADT/StringRef.h" 20 #include "llvm/Support/Error.h" 21 22 #include <atomic> 23 #include <chrono> 24 #include <memory> 25 26 namespace clang { 27 namespace clangd { 28 namespace remote { 29 namespace { 30 31 llvm::StringRef toString(const grpc_connectivity_state &State) { 32 switch (State) { 33 case GRPC_CHANNEL_IDLE: 34 return "idle"; 35 case GRPC_CHANNEL_CONNECTING: 36 return "connecting"; 37 case GRPC_CHANNEL_READY: 38 return "ready"; 39 case GRPC_CHANNEL_TRANSIENT_FAILURE: 40 return "transient failure"; 41 case GRPC_CHANNEL_SHUTDOWN: 42 return "shutdown"; 43 } 44 llvm_unreachable("Not a valid grpc_connectivity_state."); 45 } 46 47 class IndexClient : public clangd::SymbolIndex { 48 void updateConnectionStatus() const { 49 auto NewStatus = Channel->GetState(/*try_to_connect=*/false); 50 auto OldStatus = ConnectionStatus.exchange(NewStatus); 51 if (OldStatus != NewStatus) 52 vlog("Remote index connection [{0}]: {1} => {2}", ServerAddress, 53 toString(OldStatus), toString(NewStatus)); 54 } 55 56 template <typename RequestT, typename ReplyT> 57 using StreamingCall = std::unique_ptr<grpc::ClientReader<ReplyT>> ( 58 remote::v1::SymbolIndex::Stub::*)(grpc::ClientContext *, 59 const RequestT &); 60 61 template <typename RequestT, typename ReplyT, typename ClangdRequestT, 62 typename CallbackT> 63 bool streamRPC(ClangdRequestT Request, 64 StreamingCall<RequestT, ReplyT> RPCCall, 65 CallbackT Callback) const { 66 updateConnectionStatus(); 67 // We initialize to true because stream might be broken before we see the 68 // final message. In such a case there are actually more results on the 69 // stream, but we couldn't get to them. 70 bool HasMore = true; 71 trace::Span Tracer(RequestT::descriptor()->name()); 72 const auto RPCRequest = ProtobufMarshaller->toProtobuf(Request); 73 SPAN_ATTACH(Tracer, "Request", RPCRequest.DebugString()); 74 grpc::ClientContext Context; 75 Context.AddMetadata("version", versionString()); 76 Context.AddMetadata("features", featureString()); 77 Context.AddMetadata("platform", platformString()); 78 std::chrono::system_clock::time_point StartTime = 79 std::chrono::system_clock::now(); 80 auto Deadline = StartTime + DeadlineWaitingTime; 81 Context.set_deadline(Deadline); 82 auto Reader = (Stub.get()->*RPCCall)(&Context, RPCRequest); 83 dlog("Sending {0}: {1}", RequestT::descriptor()->name(), 84 RPCRequest.DebugString()); 85 ReplyT Reply; 86 unsigned Successful = 0; 87 unsigned FailedToParse = 0; 88 while (Reader->Read(&Reply)) { 89 if (!Reply.has_stream_result()) { 90 HasMore = Reply.final_result().has_more(); 91 continue; 92 } 93 auto Response = ProtobufMarshaller->fromProtobuf(Reply.stream_result()); 94 if (!Response) { 95 elog("Received invalid {0}: {1}. Reason: {2}", 96 ReplyT::descriptor()->name(), Reply.stream_result().DebugString(), 97 Response.takeError()); 98 ++FailedToParse; 99 continue; 100 } 101 Callback(*Response); 102 ++Successful; 103 } 104 auto Millis = std::chrono::duration_cast<std::chrono::milliseconds>( 105 std::chrono::system_clock::now() - StartTime) 106 .count(); 107 vlog("Remote index [{0}]: {1} => {2} results in {3}ms.", ServerAddress, 108 RequestT::descriptor()->name(), Successful, Millis); 109 SPAN_ATTACH(Tracer, "Status", Reader->Finish().ok()); 110 SPAN_ATTACH(Tracer, "Successful", Successful); 111 SPAN_ATTACH(Tracer, "Failed to parse", FailedToParse); 112 updateConnectionStatus(); 113 return HasMore; 114 } 115 116 public: 117 IndexClient( 118 std::shared_ptr<grpc::Channel> Channel, llvm::StringRef Address, 119 llvm::StringRef ProjectRoot, 120 std::chrono::milliseconds DeadlineTime = std::chrono::milliseconds(1000)) 121 : Stub(remote::v1::SymbolIndex::NewStub(Channel)), Channel(Channel), 122 ServerAddress(Address), 123 ConnectionStatus(Channel->GetState(/*try_to_connect=*/true)), 124 ProtobufMarshaller(new Marshaller(/*RemoteIndexRoot=*/"", 125 /*LocalIndexRoot=*/ProjectRoot)), 126 DeadlineWaitingTime(DeadlineTime) { 127 assert(!ProjectRoot.empty()); 128 } 129 130 void lookup(const clangd::LookupRequest &Request, 131 llvm::function_ref<void(const clangd::Symbol &)> Callback) 132 const override { 133 streamRPC(Request, &remote::v1::SymbolIndex::Stub::Lookup, Callback); 134 } 135 136 bool fuzzyFind(const clangd::FuzzyFindRequest &Request, 137 llvm::function_ref<void(const clangd::Symbol &)> Callback) 138 const override { 139 return streamRPC(Request, &remote::v1::SymbolIndex::Stub::FuzzyFind, 140 Callback); 141 } 142 143 bool 144 refs(const clangd::RefsRequest &Request, 145 llvm::function_ref<void(const clangd::Ref &)> Callback) const override { 146 return streamRPC(Request, &remote::v1::SymbolIndex::Stub::Refs, Callback); 147 } 148 149 bool containedRefs(const clangd::ContainedRefsRequest &Request, 150 llvm::function_ref<void(const ContainedRefsResult &)> 151 Callback) const override { 152 return streamRPC(Request, &remote::v1::SymbolIndex::Stub::ContainedRefs, 153 Callback); 154 } 155 156 void 157 relations(const clangd::RelationsRequest &Request, 158 llvm::function_ref<void(const SymbolID &, const clangd::Symbol &)> 159 Callback) const override { 160 streamRPC(Request, &remote::v1::SymbolIndex::Stub::Relations, 161 // Unpack protobuf Relation. 162 [&](std::pair<SymbolID, clangd::Symbol> SubjectAndObject) { 163 Callback(SubjectAndObject.first, SubjectAndObject.second); 164 }); 165 } 166 167 llvm::unique_function<IndexContents(llvm::StringRef) const> 168 indexedFiles() const override { 169 // FIXME: For now we always return IndexContents::None regardless of whether 170 // the file was indexed or not. A possible implementation could be 171 // based on the idea that we do not want to send a request at every 172 // call of a function returned by IndexClient::indexedFiles(). 173 return [](llvm::StringRef) { return IndexContents::None; }; 174 } 175 176 // IndexClient does not take any space since the data is stored on the 177 // server. 178 size_t estimateMemoryUsage() const override { return 0; } 179 180 private: 181 std::unique_ptr<remote::v1::SymbolIndex::Stub> Stub; 182 std::shared_ptr<grpc::Channel> Channel; 183 llvm::SmallString<256> ServerAddress; 184 mutable std::atomic<grpc_connectivity_state> ConnectionStatus; 185 std::unique_ptr<Marshaller> ProtobufMarshaller; 186 // Each request will be terminated if it takes too long. 187 std::chrono::milliseconds DeadlineWaitingTime; 188 }; 189 190 } // namespace 191 192 std::unique_ptr<clangd::SymbolIndex> getClient(llvm::StringRef Address, 193 llvm::StringRef ProjectRoot) { 194 const auto Channel = 195 grpc::CreateChannel(Address.str(), grpc::InsecureChannelCredentials()); 196 return std::unique_ptr<clangd::SymbolIndex>( 197 new IndexClient(Channel, Address, ProjectRoot)); 198 } 199 200 } // namespace remote 201 } // namespace clangd 202 } // namespace clang 203