package com.google.devtools.mobileharness.infra.client.longrunningservice.rpc.service;

import com.google.common.base.Ascii;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableList;
import com.google.common.flogger.FluentLogger;
import com.google.devtools.common.metrics.stability.rpc.grpc.GrpcServiceUtil;
import com.google.devtools.mobileharness.infra.client.longrunningservice.controller.LogManager;
import com.google.devtools.mobileharness.infra.client.longrunningservice.controller.SessionManager;
import com.google.devtools.mobileharness.infra.client.longrunningservice.proto.ControlServiceGrpc;
import com.google.devtools.mobileharness.infra.client.longrunningservice.proto.ControlServiceProto;
import com.google.devtools.mobileharness.infra.client.longrunningservice.proto.LogProto;
import com.google.devtools.mobileharness.infra.client.longrunningservice.proto.SessionProto;
import com.google.devtools.mobileharness.infra.client.longrunningservice.util.SessionQueryUtil;
import com.google.devtools.mobileharness.shared.util.base.ProtoTextFormat;
import com.google.devtools.mobileharness.shared.util.comm.server.LifecycleManager;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.logging.Level;
import javax.annotation.Nullable;
import javax.inject.Inject;

/* loaded from: input_file:com/google/devtools/mobileharness/infra/client/longrunningservice/rpc/service/ControlService.class */
/**
 * gRPC control service implementation.
 *
 * <p>Serves four RPCs: {@code KillServer} (abort a client's sessions and shut the server down when
 * nothing else is pending), {@code GetLog} (bidirectional stream forwarding log records to the
 * client), {@code SetLogLevel} and {@code Heartbeat} (marks a client as alive for one minute).
 */
public class ControlService extends ControlServiceGrpc.ControlServiceImplBase {
    private static final FluentLogger logger = FluentLogger.forEnclosingClass();

    private final LogManager<LogProto.LogRecords> logManager;
    private final SessionManager sessionManager;

    /**
     * IDs of clients that sent a heartbeat recently. An entry expires one minute after it was last
     * written; only the keys are meaningful, the value is a placeholder.
     */
    private final Cache<String, Boolean> aliveClientIds = CacheBuilder.newBuilder()
            .expireAfterWrite(Duration.ofMinutes(1))
            .removalListener(removalNotification -> {
                // Only log automatic removals; explicit invalidation (see doKillServer) stays silent.
                if (removalNotification.wasEvicted()) {
                    logger.atInfo().log("Client [%s] becomes not alive", removalNotification.getKey());
                }
            })
            .build();

    /**
     * Used to shut the server down on a successful kill request. Set after construction through
     * {@link #setLifecycleManager}.
     */
    private volatile LifecycleManager lifecycleManager;

    /**
     * Request observer of one {@code GetLog} stream. Each request either registers or unregisters
     * this stream's log consumer with the {@link LogManager}.
     */
    public class GetLogRequestStreamObserver implements StreamObserver<ControlServiceProto.GetLogRequest> {
        private final ForwardingLogRecordHandler forwardingLogRecordHandler;

        /** Client ID from the latest enabling request, or null when that request carried none. */
        @Nullable
        private volatile String clientId;

        /**
         * Log consumer that forwards records to the response stream, dropping records that belong
         * to a different client when a client ID is set on the enclosing observer.
         */
        public class ForwardingLogRecordHandler implements LogManager.LogRecordsConsumer<LogProto.LogRecords> {
            private final StreamObserver<ControlServiceProto.GetLogResponse> responseObserver;

            private ForwardingLogRecordHandler(StreamObserver<ControlServiceProto.GetLogResponse> streamObserver) {
                this.responseObserver = streamObserver;
            }

            /**
             * Sends the accepted subset of {@code logRecords} to the client as one response.
             *
             * @param logRecords the batch of log records produced by the log manager
             */
            @Override // com.google.devtools.mobileharness.infra.client.longrunningservice.controller.LogManager.LogRecordsConsumer
            public void consumeLogRecords(LogProto.LogRecords logRecords) {
                List<LogProto.LogRecord> allRecords = logRecords.getLogRecordList();
                List<LogProto.LogRecord> acceptedRecords = filterLogRecords(allRecords);
                // Reuse the incoming message when nothing was filtered out; otherwise rebuild it
                // with only the accepted records.
                LogProto.LogRecords recordsToSend;
                if (allRecords.size() == acceptedRecords.size()) {
                    recordsToSend = logRecords;
                } else {
                    recordsToSend = logRecords.toBuilder()
                            .clearLogRecord()
                            .addAllLogRecord(acceptedRecords)
                            .build();
                }
                this.responseObserver.onNext(
                        ControlServiceProto.GetLogResponse.newBuilder().setLogRecords(recordsToSend).build());
            }

            /** Completes the response stream. */
            private void onCompleted() {
                this.responseObserver.onCompleted();
            }

            /**
             * Returns the records accepted for the current client ID.
             *
             * <p>Returns {@code list} itself when no client ID is set or when every record is
             * accepted, so the common case allocates nothing.
             *
             * @param list the records to filter
             * @return the accepted records, in their original order
             */
            private List<LogProto.LogRecord> filterLogRecords(List<LogProto.LogRecord> list) {
                // Read the volatile field once so that both passes use the same client ID.
                String currentClientId = GetLogRequestStreamObserver.this.clientId;
                if (currentClientId == null) {
                    return list;
                }

                // First pass: count only, to detect the all-accepted and none-accepted cases.
                int acceptedCount = 0;
                for (LogProto.LogRecord logRecord : list) {
                    if (ControlService.acceptLogRecord(currentClientId, logRecord)) {
                        acceptedCount++;
                    }
                }
                if (acceptedCount == list.size()) {
                    return list;
                }
                if (acceptedCount == 0) {
                    return ImmutableList.of();
                }

                // Second pass: copy the accepted records into an exactly-sized list.
                List<LogProto.LogRecord> accepted = new ArrayList<>(acceptedCount);
                for (LogProto.LogRecord logRecord : list) {
                    if (ControlService.acceptLogRecord(currentClientId, logRecord)) {
                        accepted.add(logRecord);
                    }
                }
                return accepted;
            }
        }

        private GetLogRequestStreamObserver(StreamObserver<ControlServiceProto.GetLogResponse> streamObserver) {
            this.forwardingLogRecordHandler = new ForwardingLogRecordHandler(streamObserver);
        }

        /**
         * Enables or disables log forwarding for this stream.
         *
         * @param getLogRequest when {@code enable} is set, records the request's client ID (if any)
         *     and registers the consumer; otherwise unregisters the consumer
         */
        @Override // io.grpc.stub.StreamObserver
        public void onNext(ControlServiceProto.GetLogRequest getLogRequest) {
            if (getLogRequest.getEnable()) {
                // Update the filter before registering so the consumer sees the new client ID.
                this.clientId = getLogRequest.hasClientId() ? getLogRequest.getClientId() : null;
                ControlService.this.logManager.addConsumer(this.forwardingLogRecordHandler);
            } else {
                ControlService.this.logManager.removeConsumer(this.forwardingLogRecordHandler);
            }
        }

        /**
         * Tears the stream down and logs the error unless it is a {@code CANCELLED} status.
         *
         * @param th the error reported for the request stream
         */
        @Override // io.grpc.stub.StreamObserver
        public void onError(Throwable th) {
            doOnCompleted();
            boolean cancelled = (th instanceof StatusRuntimeException)
                    && ((StatusRuntimeException) th).getStatus().getCode().equals(Status.Code.CANCELLED);
            if (!cancelled) {
                ControlService.logger.atWarning().withCause(th).log("GetLog RPC error");
            }
        }

        /** Tears the stream down when the client finishes sending requests. */
        @Override // io.grpc.stub.StreamObserver
        public void onCompleted() {
            doOnCompleted();
        }

        /** Unregisters the log consumer, then completes the response stream. */
        private void doOnCompleted() {
            ControlService.this.logManager.removeConsumer(this.forwardingLogRecordHandler);
            this.forwardingLogRecordHandler.onCompleted();
        }
    }

    @Inject
    ControlService(LogManager<LogProto.LogRecords> logManager, SessionManager sessionManager) {
        this.logManager = logManager;
        this.sessionManager = sessionManager;
    }

    /**
     * Sets the lifecycle manager used to shut the server down.
     *
     * @param lifecycleManager the manager whose {@code shutdown()} is invoked on a successful kill
     */
    public void setLifecycleManager(LifecycleManager lifecycleManager) {
        this.lifecycleManager = lifecycleManager;
    }

    /** Handles the {@code KillServer} RPC by delegating to {@link #doKillServer}. */
    @Override // com.google.devtools.mobileharness.infra.client.longrunningservice.proto.ControlServiceGrpc.AsyncService
    public void killServer(ControlServiceProto.KillServerRequest killServerRequest, StreamObserver<ControlServiceProto.KillServerResponse> streamObserver) {
        GrpcServiceUtil.invoke(
                killServerRequest,
                streamObserver,
                this::doKillServer,
                ControlServiceGrpc.getServiceDescriptor(),
                ControlServiceGrpc.getKillServerMethod());
    }

    /**
     * Handles the {@code GetLog} streaming RPC.
     *
     * @param streamObserver the stream on which log responses are sent
     * @return a new observer that processes this stream's requests
     */
    @Override // com.google.devtools.mobileharness.infra.client.longrunningservice.proto.ControlServiceGrpc.AsyncService
    public StreamObserver<ControlServiceProto.GetLogRequest> getLog(StreamObserver<ControlServiceProto.GetLogResponse> streamObserver) {
        return new GetLogRequestStreamObserver(streamObserver);
    }

    /** Handles the {@code SetLogLevel} RPC by delegating to {@link #doSetLogLevel}. */
    @Override // com.google.devtools.mobileharness.infra.client.longrunningservice.proto.ControlServiceGrpc.AsyncService
    public void setLogLevel(ControlServiceProto.SetLogLevelRequest setLogLevelRequest, StreamObserver<ControlServiceProto.SetLogLevelResponse> streamObserver) {
        GrpcServiceUtil.invoke(
                setLogLevelRequest,
                streamObserver,
                this::doSetLogLevel,
                ControlServiceGrpc.getServiceDescriptor(),
                ControlServiceGrpc.getSetLogLevelMethod());
    }

    /** Handles the {@code Heartbeat} RPC by delegating to {@link #doHeartbeat}. */
    @Override // com.google.devtools.mobileharness.infra.client.longrunningservice.proto.ControlServiceGrpc.AsyncService
    public void heartbeat(ControlServiceProto.HeartbeatRequest heartbeatRequest, StreamObserver<ControlServiceProto.HeartbeatResponse> streamObserver) {
        GrpcServiceUtil.invoke(
                heartbeatRequest,
                streamObserver,
                this::doHeartbeat,
                ControlServiceGrpc.getServiceDescriptor(),
                ControlServiceGrpc.getHeartbeatMethod());
    }

    /**
     * Aborts the requesting client's abortable sessions and shuts the server down if no unfinished
     * (and not aborted) session and no other alive client remains.
     *
     * @param killServerRequest the request carrying the ID of the client asking for the kill
     * @return a response with the server PID and either a success marker or the unfinished
     *     sessions and alive clients that blocked the shutdown
     */
    private ControlServiceProto.KillServerResponse doKillServer(ControlServiceProto.KillServerRequest killServerRequest) {
        ControlServiceProto.KillServerResponse.Builder responseBuilder =
                ControlServiceProto.KillServerResponse.newBuilder().setServerPid(ProcessHandle.current().pid());
        String clientId = killServerRequest.getClientId();

        // Abort every abortable session that belongs to the requesting client.
        ImmutableList<String> clientSessionIds =
                this.sessionManager
                        .getAllSessions(
                                SessionQueryUtil.SESSION_ID_FIELD_MASK,
                                SessionQueryUtil.getAllAbortableSessionFromClientFilter(clientId))
                        .stream()
                        .map(sessionDetail -> sessionDetail.getSessionId().getId())
                        .collect(ImmutableList.toImmutableList());
        logger.atInfo().log("Unfinished sessions from client [%s]: %s", clientId, clientSessionIds);
        this.sessionManager.abortSessions(clientSessionIds);

        ImmutableList<SessionProto.SessionDetail> unfinishedSessions =
                this.sessionManager.getAllSessions(
                        SessionQueryUtil.SESSION_SUMMARY_FIELD_MASK,
                        SessionQueryUtil.UNFINISHED_NOT_ABORTED_SESSION_FILTER);

        // The requesting client no longer counts as alive.
        this.aliveClientIds.invalidate(clientId);
        // Take a snapshot by iterating the live keys. Guava documents that expired entries may
        // still be counted until cleanup runs but are never visible to reads, so isEmpty() on the
        // live key-set view could report "not empty" with no alive client left. The snapshot also
        // keeps the emptiness check and the reported list consistent with each other.
        ImmutableList<String> aliveClients = ImmutableList.copyOf(this.aliveClientIds.asMap().keySet());

        if (unfinishedSessions.isEmpty() && aliveClients.isEmpty()) {
            logger.atInfo().log("Exiting by KillServerRequest");
            // NOTE(review): assumes setLifecycleManager() was called before this RPC is served;
            // otherwise this dereferences null - confirm against the server start-up code.
            this.lifecycleManager.shutdown();
            return responseBuilder
                    .setSuccess(ControlServiceProto.KillServerResponse.Success.getDefaultInstance())
                    .build();
        }

        ControlServiceProto.KillServerResponse rejection =
                responseBuilder
                        .setFailure(
                                ControlServiceProto.KillServerResponse.Failure.newBuilder()
                                        .addAllUnfinishedSessions(unfinishedSessions)
                                        .addAllAliveClients(aliveClients))
                        .build();
        logger.atInfo().log("KillServerRequest is rejected due to unfinished (and not aborted) sessions or alive clients, response=[%s]", ProtoTextFormat.shortDebugString(rejection));
        return rejection;
    }

    /**
     * Sets the level of the log manager's handler.
     *
     * @param setLogLevelRequest the request whose level name is upper-cased and parsed with
     *     {@link Level#parse}
     * @return the default (empty) response
     */
    private ControlServiceProto.SetLogLevelResponse doSetLogLevel(ControlServiceProto.SetLogLevelRequest setLogLevelRequest) {
        Level level = Level.parse(Ascii.toUpperCase(setLogLevelRequest.getLevel()));
        this.logManager.getLogHandler().setLevel(level);
        return ControlServiceProto.SetLogLevelResponse.getDefaultInstance();
    }

    /**
     * Marks the requesting client as alive, restarting its one-minute expiry.
     *
     * @param heartbeatRequest the request carrying the client ID
     * @return the default (empty) response
     */
    private ControlServiceProto.HeartbeatResponse doHeartbeat(ControlServiceProto.HeartbeatRequest heartbeatRequest) {
        // Only the key matters; the stored value is a placeholder.
        this.aliveClientIds.put(heartbeatRequest.getClientId(), false);
        return ControlServiceProto.HeartbeatResponse.getDefaultInstance();
    }

    /**
     * Tells whether a log record should be forwarded to the given client.
     *
     * @param str the client ID to match against
     * @param logRecord the record to test
     * @return true when the record has no client ID or its client ID equals {@code str}
     */
    private static boolean acceptLogRecord(String str, LogProto.LogRecord logRecord) {
        return !logRecord.hasClientId() || logRecord.getClientId().equals(str);
    }
}
